001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.io.data.loading.api;
020
021import org.apache.commons.lang.Validate;
022import org.apache.reef.annotations.audience.DriverSide;
023import org.apache.reef.driver.context.ContextConfiguration;
024import org.apache.reef.driver.evaluator.AllocatedEvaluator;
025import org.apache.reef.driver.evaluator.EvaluatorRequest;
026import org.apache.reef.driver.evaluator.EvaluatorRequestor;
027import org.apache.reef.driver.evaluator.FailedEvaluator;
028import org.apache.reef.io.data.loading.impl.AvroEvaluatorRequestSerializer;
029import org.apache.reef.io.network.util.Pair;
030import org.apache.reef.tang.Configuration;
031import org.apache.reef.tang.annotations.Parameter;
032import org.apache.reef.tang.annotations.Unit;
033import org.apache.reef.tang.exceptions.BindException;
034import org.apache.reef.wake.EventHandler;
035import org.apache.reef.wake.impl.SingleThreadStage;
036import org.apache.reef.wake.time.Clock;
037import org.apache.reef.wake.time.event.Alarm;
038import org.apache.reef.wake.time.event.StartTime;
039
040import javax.inject.Inject;
041
042import java.util.Set;
043import java.util.concurrent.BlockingQueue;
044import java.util.concurrent.ConcurrentHashMap;
045import java.util.concurrent.ConcurrentMap;
046import java.util.concurrent.LinkedBlockingQueue;
047import java.util.concurrent.atomic.AtomicInteger;
048import java.util.logging.Level;
049import java.util.logging.Logger;
050
051/**
052 * The driver component for the DataLoadingService
053 * Also acts as the central point for resource requests
054 * All the allocated evaluators pass through this and
055 * the ones that need data loading have a context stacked
056 * that enables a task to get access to Data via the
057 * {@link DataSet}.
058 * <p>
059 * TODO: Add timeouts
060 */
061@DriverSide
062@Unit
063public class DataLoader {
064
065  private static final Logger LOG = Logger.getLogger(DataLoader.class.getName());
066
067  private final ConcurrentMap<String, Pair<Configuration, Configuration>> submittedDataEvalConfigs =
068      new ConcurrentHashMap<>();
069  private final ConcurrentMap<String, Configuration> submittedComputeEvalConfigs = new ConcurrentHashMap<>();
070  private final BlockingQueue<Configuration> failedComputeEvalConfigs = new LinkedBlockingQueue<>();
071  private final BlockingQueue<Pair<Configuration, Configuration>> failedDataEvalConfigs = new LinkedBlockingQueue<>();
072
073  private final AtomicInteger numComputeRequestsToSubmit = new AtomicInteger(0);
074  private final AtomicInteger numDataRequestsToSubmit = new AtomicInteger(0);
075
076  private final DataLoadingService dataLoadingService;
077  private int dataEvalMemoryMB;
078  private int dataEvalCore;
079  private final SingleThreadStage<EvaluatorRequest> resourceRequestStage;
080  private final ResourceRequestHandler resourceRequestHandler;
081  private int computeEvalMemoryMB;
082  private int computeEvalCore;
083  private final EvaluatorRequestor requestor;
084
085  /**
086   * Allows to specify compute and data evaluator requests in particular
087   * locations.
088   *
089   * @param clock
090   *          the clock
091   * @param requestor
092   *          the evaluator requestor
093   * @param dataLoadingService
094   *          the data loading service
095   * @param serializedComputeRequests
096   *          serialized compute requests (evaluators that will not load data)
097   * @param serializedDataRequests
098   *          serialized data requests (evaluators that will load data). It
099   *          cannot be empty (to maintain previous functionality)
100   */
101  @Inject
102  public DataLoader(
103      final Clock clock,
104      final EvaluatorRequestor requestor,
105      final DataLoadingService dataLoadingService,
106      @Parameter(DataLoadingRequestBuilder.DataLoadingComputeRequests.class)
107      final Set<String> serializedComputeRequests,
108      @Parameter(DataLoadingRequestBuilder.DataLoadingDataRequests.class) final Set<String> serializedDataRequests) {
109    // data requests should not be empty. This maintains previous functionality
110    Validate.notEmpty(serializedDataRequests, "Should contain a data request object");
111    // FIXME: Issue #855: We need this alarm to look busy for REEF.
112    clock.scheduleAlarm(30000, new EventHandler<Alarm>() {
113      @Override
114      public void onNext(final Alarm time) {
115        LOG.log(Level.FINE, "Received Alarm: {0}", time);
116      }
117    });
118
119    this.requestor = requestor;
120    this.dataLoadingService = dataLoadingService;
121    this.resourceRequestHandler = new ResourceRequestHandler(requestor);
122    // the resource request queue will have as many requests as compute and data requests.
123    this.resourceRequestStage = new SingleThreadStage<>(
124        this.resourceRequestHandler, serializedComputeRequests.size()
125            + serializedDataRequests.size());
126
127    if (serializedComputeRequests.isEmpty()) {
128      this.computeEvalMemoryMB = -1;
129      this.computeEvalCore = 1;
130    } else {
131      // Deserialize each compute request.
132      // Keep the maximum number of cores and memory requested, in case some
133      // evaluator fails, we will try to reallocate based on that.
134      for (final String serializedComputeRequest : serializedComputeRequests) {
135        final EvaluatorRequest computeRequest = AvroEvaluatorRequestSerializer.fromString(serializedComputeRequest);
136        this.numComputeRequestsToSubmit.addAndGet(computeRequest.getNumber());
137        this.computeEvalMemoryMB = Math.max(this.computeEvalMemoryMB, computeRequest.getMegaBytes());
138        this.computeEvalCore = Math.max(this.computeEvalCore, computeRequest.getNumberOfCores());
139        this.resourceRequestStage.onNext(computeRequest);
140      }
141    }
142    // Deserialize each data requests.
143    // We distribute the partitions evenly across the DCs.
144    // The number of partitions extracted from the dataLoadingService override
145    // the number of evaluators requested (this preserves previous functionality)
146    final int dcs = serializedDataRequests.size();
147    final int partitionsPerDataCenter = this.dataLoadingService.getNumberOfPartitions() / dcs;
148    int missing = this.dataLoadingService.getNumberOfPartitions() % dcs;
149    for (final String serializedDataRequest : serializedDataRequests) {
150      EvaluatorRequest dataRequest = AvroEvaluatorRequestSerializer.fromString(serializedDataRequest);
151      this.dataEvalMemoryMB = Math.max(this.dataEvalMemoryMB, dataRequest.getMegaBytes());
152      this.dataEvalCore = Math.max(this.dataEvalCore, dataRequest.getNumberOfCores());
153      // clone the request but update the number of evaluators based on the number of partitions
154      int number = partitionsPerDataCenter;
155      if (missing > 0) {
156        number++;
157        missing--;
158      }
159      dataRequest = EvaluatorRequest.newBuilder(dataRequest).setNumber(number).build();
160      this.numDataRequestsToSubmit.addAndGet(number);
161      this.resourceRequestStage.onNext(dataRequest);
162    }
163  }
164
165  public class StartHandler implements EventHandler<StartTime> {
166    @Override
167    public void onNext(final StartTime startTime) {
168      LOG.log(Level.INFO, "StartTime: {0}", startTime);
169      resourceRequestHandler.releaseResourceRequestGate();
170    }
171  }
172
173  public class EvaluatorAllocatedHandler implements EventHandler<AllocatedEvaluator> {
174
175    @Override
176    public void onNext(final AllocatedEvaluator allocatedEvaluator) {
177
178      final String evalId = allocatedEvaluator.getId();
179      LOG.log(Level.FINEST, "Allocated evaluator: {0}", evalId);
180
181      if (!failedComputeEvalConfigs.isEmpty()) {
182        LOG.log(Level.FINE, "Failed Compute requests need to be satisfied for {0}", evalId);
183        final Configuration conf = failedComputeEvalConfigs.poll();
184        if (conf != null) {
185          LOG.log(Level.FINE, "Satisfying failed configuration for {0}", evalId);
186          allocatedEvaluator.submitContext(conf);
187          submittedComputeEvalConfigs.put(evalId, conf);
188          return;
189        }
190      }
191
192      if (!failedDataEvalConfigs.isEmpty()) {
193        LOG.log(Level.FINE, "Failed Data requests need to be satisfied for {0}", evalId);
194        final Pair<Configuration, Configuration> confPair = failedDataEvalConfigs.poll();
195        if (confPair != null) {
196          LOG.log(Level.FINE, "Satisfying failed configuration for {0}", evalId);
197          allocatedEvaluator.submitContextAndService(confPair.getFirst(), confPair.getSecond());
198          submittedDataEvalConfigs.put(evalId, confPair);
199          return;
200        }
201      }
202
203      final int evaluatorsForComputeRequest = numComputeRequestsToSubmit.decrementAndGet();
204
205      if (evaluatorsForComputeRequest >= 0) {
206        LOG.log(Level.FINE, "Evaluators for compute request: {0}", evaluatorsForComputeRequest);
207        try {
208          final Configuration idConfiguration = ContextConfiguration.CONF.set(
209              ContextConfiguration.IDENTIFIER,
210              dataLoadingService.getComputeContextIdPrefix()
211                  + evaluatorsForComputeRequest).build();
212          LOG.log(Level.FINE, "Submitting Compute Context to {0}", evalId);
213          allocatedEvaluator.submitContext(idConfiguration);
214          submittedComputeEvalConfigs.put(allocatedEvaluator.getId(),
215              idConfiguration);
216          // should release the request gate when there are >= 0 compute
217          // requests (now that we can have more than 1)
218          LOG.log(
219              Level.FINE,
220              evaluatorsForComputeRequest > 0 ? "More Compute requests need to be satisfied"
221                  : "All Compute requests satisfied." + " Releasing gate");
222          resourceRequestHandler.releaseResourceRequestGate();
223        } catch (final BindException e) {
224          throw new RuntimeException(
225              "Unable to bind context id for Compute request", e);
226        }
227
228      } else {
229
230        final int evaluatorsForDataRequest = numDataRequestsToSubmit.decrementAndGet();
231        LOG.log(Level.FINE, "Evaluators for data request: {0}", evaluatorsForDataRequest);
232
233        final Pair<Configuration, Configuration> confPair = new Pair<>(
234            dataLoadingService.getContextConfiguration(allocatedEvaluator),
235            dataLoadingService.getServiceConfiguration(allocatedEvaluator));
236
237        LOG.log(Level.FINE, "Submitting data loading context to {0}", evalId);
238        allocatedEvaluator.submitContextAndService(confPair.getFirst(), confPair.getSecond());
239        submittedDataEvalConfigs.put(allocatedEvaluator.getId(), confPair);
240
241        // release the gate to keep on asking for more "data" evaluators.
242        if (evaluatorsForDataRequest > 0) {
243          LOG.log(Level.FINE, "More Data requests need to be satisfied. Releasing gate");
244          resourceRequestHandler.releaseResourceRequestGate();
245        // don't need to release if it's 0
246        } else if (evaluatorsForDataRequest == 0) {
247          LOG.log(Level.FINE, "All Data requests satisfied");
248        }
249      }
250    }
251  }
252
253  public class EvaluatorFailedHandler implements EventHandler<FailedEvaluator> {
254    @Override
255    public void onNext(final FailedEvaluator failedEvaluator) {
256
257      final String evalId = failedEvaluator.getId();
258
259      final Configuration computeConfig = submittedComputeEvalConfigs.remove(evalId);
260      if (computeConfig != null) {
261
262        LOG.log(Level.INFO, "Received failed compute evaluator: {0}", evalId);
263        failedComputeEvalConfigs.add(computeConfig);
264
265        requestor.submit(EvaluatorRequest.newBuilder()
266            .setMemory(computeEvalMemoryMB).setNumber(1).setNumberOfCores(computeEvalCore).build());
267
268      } else {
269
270        final Pair<Configuration, Configuration> confPair = submittedDataEvalConfigs.remove(evalId);
271        if (confPair != null) {
272
273          LOG.log(Level.INFO, "Received failed data evaluator: {0}", evalId);
274          failedDataEvalConfigs.add(confPair);
275
276          requestor.submit(EvaluatorRequest.newBuilder()
277              .setMemory(dataEvalMemoryMB).setNumber(1).setNumberOfCores(dataEvalCore).build());
278
279        } else {
280
281          LOG.log(Level.SEVERE, "Received unknown failed evaluator " + evalId,
282              failedEvaluator.getEvaluatorException());
283
284          throw new RuntimeException("Received failed evaluator that I did not submit: " + evalId);
285        }
286      }
287    }
288  }
289}