001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.io.data.loading.api; 020 021import org.apache.commons.lang.Validate; 022import org.apache.reef.annotations.audience.DriverSide; 023import org.apache.reef.driver.context.ContextConfiguration; 024import org.apache.reef.driver.evaluator.AllocatedEvaluator; 025import org.apache.reef.driver.evaluator.EvaluatorRequest; 026import org.apache.reef.driver.evaluator.EvaluatorRequestor; 027import org.apache.reef.driver.evaluator.FailedEvaluator; 028import org.apache.reef.io.data.loading.impl.AvroEvaluatorRequestSerializer; 029import org.apache.reef.io.network.util.Pair; 030import org.apache.reef.tang.Configuration; 031import org.apache.reef.tang.annotations.Parameter; 032import org.apache.reef.tang.annotations.Unit; 033import org.apache.reef.tang.exceptions.BindException; 034import org.apache.reef.wake.EventHandler; 035import org.apache.reef.wake.impl.SingleThreadStage; 036import org.apache.reef.wake.time.Clock; 037import org.apache.reef.wake.time.event.Alarm; 038import org.apache.reef.wake.time.event.StartTime; 039 040import javax.inject.Inject; 041 042import java.util.Set; 043import java.util.concurrent.BlockingQueue; 044import java.util.concurrent.ConcurrentHashMap; 045import java.util.concurrent.ConcurrentMap; 046import java.util.concurrent.LinkedBlockingQueue; 047import java.util.concurrent.atomic.AtomicInteger; 048import java.util.logging.Level; 049import java.util.logging.Logger; 050 051/** 052 * The driver component for the DataLoadingService 053 * Also acts as the central point for resource requests 054 * All the allocated evaluators pass through this and 055 * the ones that need data loading have a context stacked 056 * that enables a task to get access to Data via the 057 * {@link DataSet}. 058 * <p> 059 * TODO: Add timeouts 060 */ 061@DriverSide 062@Unit 063public class DataLoader { 064 065 private static final Logger LOG = Logger.getLogger(DataLoader.class.getName()); 066 067 private final ConcurrentMap<String, Pair<Configuration, Configuration>> submittedDataEvalConfigs = 068 new ConcurrentHashMap<>(); 069 private final ConcurrentMap<String, Configuration> submittedComputeEvalConfigs = new ConcurrentHashMap<>(); 070 private final BlockingQueue<Configuration> failedComputeEvalConfigs = new LinkedBlockingQueue<>(); 071 private final BlockingQueue<Pair<Configuration, Configuration>> failedDataEvalConfigs = new LinkedBlockingQueue<>(); 072 073 private final AtomicInteger numComputeRequestsToSubmit = new AtomicInteger(0); 074 private final AtomicInteger numDataRequestsToSubmit = new AtomicInteger(0); 075 076 private final DataLoadingService dataLoadingService; 077 private int dataEvalMemoryMB; 078 private int dataEvalCore; 079 private final SingleThreadStage<EvaluatorRequest> resourceRequestStage; 080 private final ResourceRequestHandler resourceRequestHandler; 081 private int computeEvalMemoryMB; 082 private int computeEvalCore; 083 private final EvaluatorRequestor requestor; 084 085 /** 086 * Allows to specify compute and data evaluator requests in particular 087 * locations. 088 * 089 * @param clock 090 * the clock 091 * @param requestor 092 * the evaluator requestor 093 * @param dataLoadingService 094 * the data loading service 095 * @param serializedComputeRequests 096 * serialized compute requests (evaluators that will not load data) 097 * @param serializedDataRequests 098 * serialized data requests (evaluators that will load data). It 099 * cannot be empty (to maintain previous functionality) 100 */ 101 @Inject 102 public DataLoader( 103 final Clock clock, 104 final EvaluatorRequestor requestor, 105 final DataLoadingService dataLoadingService, 106 @Parameter(DataLoadingRequestBuilder.DataLoadingComputeRequests.class) 107 final Set<String> serializedComputeRequests, 108 @Parameter(DataLoadingRequestBuilder.DataLoadingDataRequests.class) final Set<String> serializedDataRequests) { 109 // data requests should not be empty. This maintains previous functionality 110 Validate.notEmpty(serializedDataRequests, "Should contain a data request object"); 111 // FIXME: Issue #855: We need this alarm to look busy for REEF. 112 clock.scheduleAlarm(30000, new EventHandler<Alarm>() { 113 @Override 114 public void onNext(final Alarm time) { 115 LOG.log(Level.FINE, "Received Alarm: {0}", time); 116 } 117 }); 118 119 this.requestor = requestor; 120 this.dataLoadingService = dataLoadingService; 121 this.resourceRequestHandler = new ResourceRequestHandler(requestor); 122 // the resource request queue will have as many requests as compute and data requests. 123 this.resourceRequestStage = new SingleThreadStage<>( 124 this.resourceRequestHandler, serializedComputeRequests.size() 125 + serializedDataRequests.size()); 126 127 if (serializedComputeRequests.isEmpty()) { 128 this.computeEvalMemoryMB = -1; 129 this.computeEvalCore = 1; 130 } else { 131 // Deserialize each compute request. 132 // Keep the maximum number of cores and memory requested, in case some 133 // evaluator fails, we will try to reallocate based on that. 134 for (final String serializedComputeRequest : serializedComputeRequests) { 135 final EvaluatorRequest computeRequest = AvroEvaluatorRequestSerializer.fromString(serializedComputeRequest); 136 this.numComputeRequestsToSubmit.addAndGet(computeRequest.getNumber()); 137 this.computeEvalMemoryMB = Math.max(this.computeEvalMemoryMB, computeRequest.getMegaBytes()); 138 this.computeEvalCore = Math.max(this.computeEvalCore, computeRequest.getNumberOfCores()); 139 this.resourceRequestStage.onNext(computeRequest); 140 } 141 } 142 // Deserialize each data requests. 143 // We distribute the partitions evenly across the DCs. 144 // The number of partitions extracted from the dataLoadingService override 145 // the number of evaluators requested (this preserves previous functionality) 146 final int dcs = serializedDataRequests.size(); 147 final int partitionsPerDataCenter = this.dataLoadingService.getNumberOfPartitions() / dcs; 148 int missing = this.dataLoadingService.getNumberOfPartitions() % dcs; 149 for (final String serializedDataRequest : serializedDataRequests) { 150 EvaluatorRequest dataRequest = AvroEvaluatorRequestSerializer.fromString(serializedDataRequest); 151 this.dataEvalMemoryMB = Math.max(this.dataEvalMemoryMB, dataRequest.getMegaBytes()); 152 this.dataEvalCore = Math.max(this.dataEvalCore, dataRequest.getNumberOfCores()); 153 // clone the request but update the number of evaluators based on the number of partitions 154 int number = partitionsPerDataCenter; 155 if (missing > 0) { 156 number++; 157 missing--; 158 } 159 dataRequest = EvaluatorRequest.newBuilder(dataRequest).setNumber(number).build(); 160 this.numDataRequestsToSubmit.addAndGet(number); 161 this.resourceRequestStage.onNext(dataRequest); 162 } 163 } 164 165 public class StartHandler implements EventHandler<StartTime> { 166 @Override 167 public void onNext(final StartTime startTime) { 168 LOG.log(Level.INFO, "StartTime: {0}", startTime); 169 resourceRequestHandler.releaseResourceRequestGate(); 170 } 171 } 172 173 public class EvaluatorAllocatedHandler implements EventHandler<AllocatedEvaluator> { 174 175 @Override 176 public void onNext(final AllocatedEvaluator allocatedEvaluator) { 177 178 final String evalId = allocatedEvaluator.getId(); 179 LOG.log(Level.FINEST, "Allocated evaluator: {0}", evalId); 180 181 if (!failedComputeEvalConfigs.isEmpty()) { 182 LOG.log(Level.FINE, "Failed Compute requests need to be satisfied for {0}", evalId); 183 final Configuration conf = failedComputeEvalConfigs.poll(); 184 if (conf != null) { 185 LOG.log(Level.FINE, "Satisfying failed configuration for {0}", evalId); 186 allocatedEvaluator.submitContext(conf); 187 submittedComputeEvalConfigs.put(evalId, conf); 188 return; 189 } 190 } 191 192 if (!failedDataEvalConfigs.isEmpty()) { 193 LOG.log(Level.FINE, "Failed Data requests need to be satisfied for {0}", evalId); 194 final Pair<Configuration, Configuration> confPair = failedDataEvalConfigs.poll(); 195 if (confPair != null) { 196 LOG.log(Level.FINE, "Satisfying failed configuration for {0}", evalId); 197 allocatedEvaluator.submitContextAndService(confPair.getFirst(), confPair.getSecond()); 198 submittedDataEvalConfigs.put(evalId, confPair); 199 return; 200 } 201 } 202 203 final int evaluatorsForComputeRequest = numComputeRequestsToSubmit.decrementAndGet(); 204 205 if (evaluatorsForComputeRequest >= 0) { 206 LOG.log(Level.FINE, "Evaluators for compute request: {0}", evaluatorsForComputeRequest); 207 try { 208 final Configuration idConfiguration = ContextConfiguration.CONF.set( 209 ContextConfiguration.IDENTIFIER, 210 dataLoadingService.getComputeContextIdPrefix() 211 + evaluatorsForComputeRequest).build(); 212 LOG.log(Level.FINE, "Submitting Compute Context to {0}", evalId); 213 allocatedEvaluator.submitContext(idConfiguration); 214 submittedComputeEvalConfigs.put(allocatedEvaluator.getId(), 215 idConfiguration); 216 // should release the request gate when there are >= 0 compute 217 // requests (now that we can have more than 1) 218 LOG.log( 219 Level.FINE, 220 evaluatorsForComputeRequest > 0 ? "More Compute requests need to be satisfied" 221 : "All Compute requests satisfied." + " Releasing gate"); 222 resourceRequestHandler.releaseResourceRequestGate(); 223 } catch (final BindException e) { 224 throw new RuntimeException( 225 "Unable to bind context id for Compute request", e); 226 } 227 228 } else { 229 230 final int evaluatorsForDataRequest = numDataRequestsToSubmit.decrementAndGet(); 231 LOG.log(Level.FINE, "Evaluators for data request: {0}", evaluatorsForDataRequest); 232 233 final Pair<Configuration, Configuration> confPair = new Pair<>( 234 dataLoadingService.getContextConfiguration(allocatedEvaluator), 235 dataLoadingService.getServiceConfiguration(allocatedEvaluator)); 236 237 LOG.log(Level.FINE, "Submitting data loading context to {0}", evalId); 238 allocatedEvaluator.submitContextAndService(confPair.getFirst(), confPair.getSecond()); 239 submittedDataEvalConfigs.put(allocatedEvaluator.getId(), confPair); 240 241 // release the gate to keep on asking for more "data" evaluators. 242 if (evaluatorsForDataRequest > 0) { 243 LOG.log(Level.FINE, "More Data requests need to be satisfied. Releasing gate"); 244 resourceRequestHandler.releaseResourceRequestGate(); 245 // don't need to release if it's 0 246 } else if (evaluatorsForDataRequest == 0) { 247 LOG.log(Level.FINE, "All Data requests satisfied"); 248 } 249 } 250 } 251 } 252 253 public class EvaluatorFailedHandler implements EventHandler<FailedEvaluator> { 254 @Override 255 public void onNext(final FailedEvaluator failedEvaluator) { 256 257 final String evalId = failedEvaluator.getId(); 258 259 final Configuration computeConfig = submittedComputeEvalConfigs.remove(evalId); 260 if (computeConfig != null) { 261 262 LOG.log(Level.INFO, "Received failed compute evaluator: {0}", evalId); 263 failedComputeEvalConfigs.add(computeConfig); 264 265 requestor.submit(EvaluatorRequest.newBuilder() 266 .setMemory(computeEvalMemoryMB).setNumber(1).setNumberOfCores(computeEvalCore).build()); 267 268 } else { 269 270 final Pair<Configuration, Configuration> confPair = submittedDataEvalConfigs.remove(evalId); 271 if (confPair != null) { 272 273 LOG.log(Level.INFO, "Received failed data evaluator: {0}", evalId); 274 failedDataEvalConfigs.add(confPair); 275 276 requestor.submit(EvaluatorRequest.newBuilder() 277 .setMemory(dataEvalMemoryMB).setNumber(1).setNumberOfCores(dataEvalCore).build()); 278 279 } else { 280 281 LOG.log(Level.SEVERE, "Received unknown failed evaluator " + evalId, 282 failedEvaluator.getEvaluatorException()); 283 284 throw new RuntimeException("Received failed evaluator that I did not submit: " + evalId); 285 } 286 } 287 } 288 } 289}