This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.vortex.evaluator;
020
021import org.apache.reef.annotations.Unstable;
022import org.apache.reef.annotations.audience.TaskSide;
023import org.apache.reef.tang.annotations.Parameter;
024import org.apache.reef.tang.annotations.Unit;
025import org.apache.reef.task.HeartBeatTriggerManager;
026import org.apache.reef.task.Task;
027import org.apache.reef.task.TaskMessage;
028import org.apache.reef.task.TaskMessageSource;
029import org.apache.reef.task.events.CloseEvent;
030import org.apache.reef.task.events.DriverMessage;
031import org.apache.reef.util.Optional;
032import org.apache.reef.vortex.common.KryoUtils;
033import org.apache.reef.vortex.protocol.mastertoworker.*;
034import org.apache.reef.vortex.protocol.workertomaster.*;
035import org.apache.reef.vortex.driver.VortexWorkerConf;
036import org.apache.reef.wake.EventHandler;
037
038import javax.inject.Inject;
039import java.util.ArrayList;
040import java.util.List;
041import java.util.concurrent.*;
042import java.util.logging.Level;
043import java.util.logging.Logger;
044
045/**
046 * Receives commands from VortexMaster, executes them, and returns the results.
047 * TODO[REEF-503]: Basic Vortex profiling.
048 */
049@Unstable
050@Unit
051@TaskSide
052public final class VortexWorker implements Task, TaskMessageSource {
053  private static final Logger LOG = Logger.getLogger(VortexWorker.class.getName());
054  private static final String MESSAGE_SOURCE_ID = ""; // empty string as there is no use for it
055
056  private final BlockingDeque<byte[]> pendingRequests = new LinkedBlockingDeque<>();
057  private final BlockingDeque<byte[]> workerReports = new LinkedBlockingDeque<>();
058  private final ConcurrentMap<Integer, AggregateContainer> aggregates = new ConcurrentHashMap<>();
059
060  private final KryoUtils kryoUtils;
061  private final HeartBeatTriggerManager heartBeatTriggerManager;
062  private final int numOfThreads;
063  private final CountDownLatch terminated = new CountDownLatch(1);
064
065  @Inject
066  private VortexWorker(final HeartBeatTriggerManager heartBeatTriggerManager,
067                       final KryoUtils kryoUtils,
068                       @Parameter(VortexWorkerConf.NumOfThreads.class) final int numOfThreads) {
069    this.heartBeatTriggerManager = heartBeatTriggerManager;
070    this.kryoUtils = kryoUtils;
071    this.numOfThreads = numOfThreads;
072  }
073
074  /**
075   * Starts the scheduler and executor and waits until termination.
076   */
077  @Override
078  public byte[] call(final byte[] memento) throws Exception {
079    final ExecutorService schedulerThread = Executors.newSingleThreadExecutor();
080    final ExecutorService commandExecutor = Executors.newFixedThreadPool(numOfThreads);
081    final ConcurrentMap<Integer, Future> futures = new ConcurrentHashMap<>();
082
083    // Scheduling thread starts
084    schedulerThread.execute(new Runnable() {
085      @SuppressWarnings("InfiniteLoopStatement") // Scheduler is supposed to run forever.
086      @Override
087      public void run() {
088        while (true) {
089          // Scheduler Thread: Pick a command to execute (For now, simple FIFO order)
090          final byte[] message;
091          try {
092            message = pendingRequests.takeFirst();
093          } catch (InterruptedException e) {
094            throw new RuntimeException(e);
095          }
096
097          // Command Executor: Deserialize the command
098          final MasterToWorkerRequest masterToWorkerRequest = (MasterToWorkerRequest)kryoUtils.deserialize(message);
099
100          switch (masterToWorkerRequest.getType()) {
101            case AggregateTasklets:
102              final TaskletAggregationRequest taskletAggregationRequest =
103                  (TaskletAggregationRequest) masterToWorkerRequest;
104              aggregates.put(taskletAggregationRequest.getAggregateFunctionId(),
105                  new AggregateContainer(heartBeatTriggerManager, kryoUtils, workerReports,
106                      taskletAggregationRequest));
107              break;
108            case ExecuteAggregateTasklet:
109              executeAggregateTasklet(commandExecutor, masterToWorkerRequest);
110              break;
111            case ExecuteTasklet:
112              executeTasklet(commandExecutor, futures, masterToWorkerRequest);
113              break;
114            case CancelTasklet:
115              final TaskletCancellationRequest cancellationRequest = (TaskletCancellationRequest) masterToWorkerRequest;
116              LOG.log(Level.FINE, "Cancelling Tasklet with ID {0}.", cancellationRequest.getTaskletId());
117              final Future future = futures.get(cancellationRequest.getTaskletId());
118              if (future != null) {
119                future.cancel(true);
120              }
121              break;
122            default:
123              throw new RuntimeException("Unknown Command");
124          }
125        }
126      }
127    });
128
129    terminated.await();
130    return null;
131  }
132
133  /**
134   * Executes an tasklet request from the {@link org.apache.reef.vortex.driver.VortexDriver}.
135   */
136  private void executeTasklet(final ExecutorService commandExecutor,
137                              final ConcurrentMap<Integer, Future> futures,
138                              final MasterToWorkerRequest masterToWorkerRequest) {
139    final CountDownLatch latch = new CountDownLatch(1);
140    final TaskletExecutionRequest taskletExecutionRequest = (TaskletExecutionRequest) masterToWorkerRequest;
141
142    // Scheduler Thread: Pass the command to the worker thread pool to be executed
143    // Record future to support cancellation.
144    futures.put(
145        taskletExecutionRequest.getTaskletId(),
146        commandExecutor.submit(new Runnable() {
147          @Override
148          public void run() {
149            final WorkerToMasterReports reports;
150            final List<WorkerToMasterReport> holder = new ArrayList<>();
151
152            try {
153              // Command Executor: Execute the command
154              final WorkerToMasterReport workerToMasterReport =
155                  new TaskletResultReport(taskletExecutionRequest.getTaskletId(), taskletExecutionRequest.execute());
156              holder.add(workerToMasterReport);
157            } catch (final InterruptedException ex) {
158              // Assumes that user's thread follows convention that cancelled Futures
159              // should throw InterruptedException.
160              final WorkerToMasterReport workerToMasterReport =
161                  new TaskletCancelledReport(taskletExecutionRequest.getTaskletId());
162              LOG.log(Level.WARNING, "Tasklet with ID {0} has been cancelled", taskletExecutionRequest.getTaskletId());
163              holder.add(workerToMasterReport);
164            } catch (Exception e) {
165              // Command Executor: Tasklet throws an exception
166              final WorkerToMasterReport workerToMasterReport =
167                  new TaskletFailureReport(taskletExecutionRequest.getTaskletId(), e);
168              holder.add(workerToMasterReport);
169            }
170
171            reports = new WorkerToMasterReports(holder);
172            workerReports.addLast(kryoUtils.serialize(reports));
173            try {
174              latch.await();
175            } catch (final InterruptedException e) {
176              LOG.log(Level.SEVERE, "Cannot wait for Future to be put.");
177              throw new RuntimeException(e);
178            }
179            futures.remove(taskletExecutionRequest.getTaskletId());
180            heartBeatTriggerManager.triggerHeartBeat();
181          }
182        }));
183
184    // Signal that future is put.
185    latch.countDown();
186  }
187
188  /**
189   * Executes an aggregation request from the {@link org.apache.reef.vortex.driver.VortexDriver}.
190   */
191  private void executeAggregateTasklet(final ExecutorService commandExecutor,
192                                       final MasterToWorkerRequest masterToWorkerRequest) {
193    final TaskletAggregateExecutionRequest taskletAggregateExecutionRequest =
194        (TaskletAggregateExecutionRequest) masterToWorkerRequest;
195
196    assert aggregates.containsKey(taskletAggregateExecutionRequest.getAggregateFunctionId());
197
198    final AggregateContainer aggregateContainer =
199        aggregates.get(taskletAggregateExecutionRequest.getAggregateFunctionId());
200    final TaskletAggregationRequest aggregationRequest = aggregateContainer.getTaskletAggregationRequest();
201
202    commandExecutor.submit(new Runnable() {
203      @Override
204      public void run() {
205        try {
206          aggregateContainer.scheduleTasklet(taskletAggregateExecutionRequest.getTaskletId());
207          final Object result = aggregationRequest.executeFunction(taskletAggregateExecutionRequest.getInput());
208          aggregateContainer.taskletComplete(taskletAggregateExecutionRequest.getTaskletId(), result);
209        } catch (final Exception e) {
210          aggregateContainer.taskletFailed(taskletAggregateExecutionRequest.getTaskletId(), e);
211        }
212      }
213    });
214  }
215
216  /**
217   * @return the workerReport the worker wishes to send.
218   */
219  @Override
220  public Optional<TaskMessage> getMessage() {
221    final byte[] msg = workerReports.pollFirst();
222    if (msg != null) {
223      return Optional.of(TaskMessage.from(MESSAGE_SOURCE_ID, msg));
224    } else {
225      return Optional.empty();
226    }
227  }
228
229  /**
230   * Handle requests from Vortex Master.
231   */
232  public final class DriverMessageHandler implements EventHandler<DriverMessage> {
233    @Override
234    public void onNext(final DriverMessage message) {
235      if (message.get().isPresent()) {
236        pendingRequests.addLast(message.get().get());
237      }
238    }
239  }
240
241  /**
242   * Shut down this worker.
243   */
244  public final class TaskCloseHandler implements EventHandler<CloseEvent> {
245    @Override
246    public void onNext(final CloseEvent closeEvent) {
247      terminated.countDown();
248    }
249  }
250}