This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.io.data.loading.api;
020
021import org.apache.reef.annotations.audience.DriverSide;
022import org.apache.reef.driver.context.ContextConfiguration;
023import org.apache.reef.driver.evaluator.AllocatedEvaluator;
024import org.apache.reef.driver.evaluator.EvaluatorRequest;
025import org.apache.reef.driver.evaluator.EvaluatorRequestor;
026import org.apache.reef.driver.evaluator.FailedEvaluator;
027import org.apache.reef.io.data.loading.impl.EvaluatorRequestSerializer;
028import org.apache.reef.io.network.util.Pair;
029import org.apache.reef.tang.Configuration;
030import org.apache.reef.tang.annotations.Parameter;
031import org.apache.reef.tang.annotations.Unit;
032import org.apache.reef.tang.exceptions.BindException;
033import org.apache.reef.wake.EventHandler;
034import org.apache.reef.wake.impl.SingleThreadStage;
035import org.apache.reef.wake.time.Clock;
036import org.apache.reef.wake.time.event.Alarm;
037import org.apache.reef.wake.time.event.StartTime;
038
039import javax.inject.Inject;
040import java.util.concurrent.BlockingQueue;
041import java.util.concurrent.ConcurrentHashMap;
042import java.util.concurrent.ConcurrentMap;
043import java.util.concurrent.LinkedBlockingQueue;
044import java.util.concurrent.atomic.AtomicInteger;
045import java.util.logging.Level;
046import java.util.logging.Logger;
047
048/**
049 * The driver component for the DataLoadingService
050 * Also acts as the central point for resource requests
051 * All the allocated evaluators pass through this and
052 * the ones that need data loading have a context stacked
053 * that enables a task to get access to Data via the
054 * {@link DataSet}.
055 * <p/>
056 * TODO: Add timeouts
057 */
058@DriverSide
059@Unit
060public class DataLoader {
061
062  private static final Logger LOG = Logger.getLogger(DataLoader.class.getName());
063
064  private final ConcurrentMap<String, Pair<Configuration, Configuration>> submittedDataEvalConfigs = new ConcurrentHashMap<>();
065  private final ConcurrentMap<String, Configuration> submittedComputeEvalConfigs = new ConcurrentHashMap<>();
066  private final BlockingQueue<Configuration> failedComputeEvalConfigs = new LinkedBlockingQueue<>();
067  private final BlockingQueue<Pair<Configuration, Configuration>> failedDataEvalConfigs = new LinkedBlockingQueue<>();
068
069  private final AtomicInteger numComputeRequestsToSubmit = new AtomicInteger(0);
070
071  private final DataLoadingService dataLoadingService;
072  private final int dataEvalMemoryMB;
073  private final int dataEvalCore;
074  private final EvaluatorRequest computeRequest;
075  private final SingleThreadStage<EvaluatorRequest> resourceRequestStage;
076  private final ResourceRequestHandler resourceRequestHandler;
077  private final int computeEvalMemoryMB;
078  private final int computeEvalCore;
079  private final EvaluatorRequestor requestor;
080
081  @Inject
082  public DataLoader(
083      final Clock clock,
084      final EvaluatorRequestor requestor,
085      final DataLoadingService dataLoadingService,
086      final @Parameter(DataLoadingRequestBuilder.DataLoadingEvaluatorMemoryMB.class) int dataEvalMemoryMB,
087      final @Parameter(DataLoadingRequestBuilder.DataLoadingEvaluatorNumberOfCores.class) int dataEvalCore,
088      final @Parameter(DataLoadingRequestBuilder.DataLoadingComputeRequest.class) String serializedComputeRequest) {
089
090    // FIXME: Issue #855: We need this alarm to look busy for REEF.
091    clock.scheduleAlarm(30000, new EventHandler<Alarm>() {
092      @Override
093      public void onNext(final Alarm time) {
094        LOG.log(Level.FINE, "Received Alarm: {0}", time);
095      }
096    });
097
098    this.requestor = requestor;
099    this.dataLoadingService = dataLoadingService;
100    this.dataEvalMemoryMB = dataEvalMemoryMB;
101    this.dataEvalCore = dataEvalCore;
102    this.resourceRequestHandler = new ResourceRequestHandler(requestor);
103    this.resourceRequestStage = new SingleThreadStage<>(this.resourceRequestHandler, 2);
104
105    if (serializedComputeRequest.equals("NULL")) {
106      this.computeRequest = null;
107      this.computeEvalMemoryMB = -1;
108      computeEvalCore = 1;
109    } else {
110      this.computeRequest = EvaluatorRequestSerializer.deserialize(serializedComputeRequest);
111      this.computeEvalMemoryMB = this.computeRequest.getMegaBytes();
112      this.computeEvalCore = this.computeRequest.getNumberOfCores();
113      this.numComputeRequestsToSubmit.set(this.computeRequest.getNumber());
114
115      this.resourceRequestStage.onNext(this.computeRequest);
116    }
117
118    this.resourceRequestStage.onNext(getDataLoadingRequest());
119  }
120
121  private EvaluatorRequest getDataLoadingRequest() {
122    return EvaluatorRequest.newBuilder()
123        .setNumber(this.dataLoadingService.getNumberOfPartitions())
124        .setMemory(this.dataEvalMemoryMB)
125        .setNumberOfCores(this.dataEvalCore)
126        .build();
127  }
128
129  public class StartHandler implements EventHandler<StartTime> {
130    @Override
131    public void onNext(final StartTime startTime) {
132      LOG.log(Level.INFO, "StartTime: {0}", startTime);
133      resourceRequestHandler.releaseResourceRequestGate();
134    }
135  }
136
137  public class EvaluatorAllocatedHandler implements EventHandler<AllocatedEvaluator> {
138
139    @Override
140    public void onNext(final AllocatedEvaluator allocatedEvaluator) {
141
142      final String evalId = allocatedEvaluator.getId();
143      LOG.log(Level.FINEST, "Allocated evaluator: {0}", evalId);
144
145      if (!failedComputeEvalConfigs.isEmpty()) {
146        LOG.log(Level.FINE, "Failed Compute requests need to be satisfied for {0}", evalId);
147        final Configuration conf = failedComputeEvalConfigs.poll();
148        if (conf != null) {
149          LOG.log(Level.FINE, "Satisfying failed configuration for {0}", evalId);
150          allocatedEvaluator.submitContext(conf);
151          submittedComputeEvalConfigs.put(evalId, conf);
152          return;
153        }
154      }
155
156      if (!failedDataEvalConfigs.isEmpty()) {
157        LOG.log(Level.FINE, "Failed Data requests need to be satisfied for {0}", evalId);
158        final Pair<Configuration, Configuration> confPair = failedDataEvalConfigs.poll();
159        if (confPair != null) {
160          LOG.log(Level.FINE, "Satisfying failed configuration for {0}", evalId);
161          allocatedEvaluator.submitContextAndService(confPair.first, confPair.second);
162          submittedDataEvalConfigs.put(evalId, confPair);
163          return;
164        }
165      }
166
167      final int evaluatorsForComputeRequest = numComputeRequestsToSubmit.decrementAndGet();
168      LOG.log(Level.FINE, "Evaluators for compute request: {0}", evaluatorsForComputeRequest);
169
170      if (evaluatorsForComputeRequest >= 0) {
171        try {
172          final Configuration idConfiguration = ContextConfiguration.CONF
173              .set(ContextConfiguration.IDENTIFIER,
174                  dataLoadingService.getComputeContextIdPrefix() + evaluatorsForComputeRequest)
175              .build();
176          LOG.log(Level.FINE, "Submitting Compute Context to {0}", evalId);
177          allocatedEvaluator.submitContext(idConfiguration);
178          submittedComputeEvalConfigs.put(allocatedEvaluator.getId(), idConfiguration);
179          if (evaluatorsForComputeRequest == 0) {
180            LOG.log(Level.FINE, "All Compute requests satisfied. Releasing gate");
181            resourceRequestHandler.releaseResourceRequestGate();
182          }
183        } catch (final BindException e) {
184          throw new RuntimeException("Unable to bind context id for Compute request", e);
185        }
186
187      } else {
188
189        final Pair<Configuration, Configuration> confPair = new Pair<>(
190            dataLoadingService.getContextConfiguration(allocatedEvaluator),
191            dataLoadingService.getServiceConfiguration(allocatedEvaluator));
192
193        LOG.log(Level.FINE, "Submitting data loading context to {0}", evalId);
194        allocatedEvaluator.submitContextAndService(confPair.first, confPair.second);
195        submittedDataEvalConfigs.put(allocatedEvaluator.getId(), confPair);
196      }
197    }
198  }
199
200  public class EvaluatorFailedHandler implements EventHandler<FailedEvaluator> {
201    @Override
202    public void onNext(final FailedEvaluator failedEvaluator) {
203
204      final String evalId = failedEvaluator.getId();
205
206      final Configuration computeConfig = submittedComputeEvalConfigs.remove(evalId);
207      if (computeConfig != null) {
208
209        LOG.log(Level.INFO, "Received failed compute evaluator: {0}", evalId);
210        failedComputeEvalConfigs.add(computeConfig);
211
212        requestor.submit(EvaluatorRequest.newBuilder()
213            .setMemory(computeEvalMemoryMB).setNumber(1).setNumberOfCores(computeEvalCore).build());
214
215      } else {
216
217        final Pair<Configuration, Configuration> confPair = submittedDataEvalConfigs.remove(evalId);
218        if (confPair != null) {
219
220          LOG.log(Level.INFO, "Received failed data evaluator: {0}", evalId);
221          failedDataEvalConfigs.add(confPair);
222
223          requestor.submit(EvaluatorRequest.newBuilder()
224              .setMemory(dataEvalMemoryMB).setNumber(1).setNumberOfCores(dataEvalCore).build());
225
226        } else {
227
228          LOG.log(Level.SEVERE, "Received unknown failed evaluator " + evalId,
229              failedEvaluator.getEvaluatorException());
230
231          throw new RuntimeException("Received failed evaluator that I did not submit: " + evalId);
232        }
233      }
234    }
235  }
236}