This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.vortex.evaluator;
020
021import org.apache.reef.annotations.Unstable;
022import org.apache.reef.annotations.audience.TaskSide;
023import org.apache.reef.tang.annotations.Parameter;
024import org.apache.reef.tang.annotations.Unit;
025import org.apache.reef.task.HeartBeatTriggerManager;
026import org.apache.reef.task.Task;
027import org.apache.reef.task.TaskMessage;
028import org.apache.reef.task.TaskMessageSource;
029import org.apache.reef.task.events.CloseEvent;
030import org.apache.reef.task.events.DriverMessage;
031import org.apache.reef.util.Optional;
032import org.apache.reef.vortex.common.*;
033import org.apache.reef.vortex.common.AggregateFunctionRepository;
034import org.apache.reef.vortex.driver.VortexWorkerConf;
035import org.apache.reef.wake.EventHandler;
036
037import javax.inject.Inject;
038import java.util.ArrayList;
039import java.util.List;
040import java.util.concurrent.*;
041import java.util.logging.Level;
042import java.util.logging.Logger;
043
044/**
045 * Receives commands from VortexMaster, executes them, and returns the results.
046 * TODO[REEF-503]: Basic Vortex profiling.
047 */
048@Unstable
049@Unit
050@TaskSide
051public final class VortexWorker implements Task, TaskMessageSource {
052  private static final Logger LOG = Logger.getLogger(VortexWorker.class.getName());
053  private static final String MESSAGE_SOURCE_ID = ""; // empty string as there is no use for it
054
055  private final BlockingDeque<byte[]> pendingRequests = new LinkedBlockingDeque<>();
056  private final BlockingDeque<byte[]> workerReports = new LinkedBlockingDeque<>();
057  private final ConcurrentMap<Integer, AggregateContainer> aggregates = new ConcurrentHashMap<>();
058
059  private final AggregateFunctionRepository aggregateFunctionRepository;
060  private final VortexAvroUtils vortexAvroUtils;
061  private final HeartBeatTriggerManager heartBeatTriggerManager;
062  private final int numOfThreads;
063  private final CountDownLatch terminated = new CountDownLatch(1);
064
065  @Inject
066  private VortexWorker(final HeartBeatTriggerManager heartBeatTriggerManager,
067                       final AggregateFunctionRepository aggregateFunctionRepository,
068                       final VortexAvroUtils vortexAvroUtils,
069                       @Parameter(VortexWorkerConf.NumOfThreads.class) final int numOfThreads) {
070    this.heartBeatTriggerManager = heartBeatTriggerManager;
071    this.aggregateFunctionRepository = aggregateFunctionRepository;
072    this.vortexAvroUtils = vortexAvroUtils;
073    this.numOfThreads = numOfThreads;
074  }
075
076  /**
077   * Starts the scheduler and executor and waits until termination.
078   */
079  @Override
080  public byte[] call(final byte[] memento) throws Exception {
081    final ExecutorService schedulerThread = Executors.newSingleThreadExecutor();
082    final ExecutorService commandExecutor = Executors.newFixedThreadPool(numOfThreads);
083    final ConcurrentMap<Integer, Future> futures = new ConcurrentHashMap<>();
084
085    // Scheduling thread starts
086    schedulerThread.execute(new Runnable() {
087      @SuppressWarnings("InfiniteLoopStatement") // Scheduler is supposed to run forever.
088      @Override
089      public void run() {
090        while (true) {
091          // Scheduler Thread: Pick a command to execute (For now, simple FIFO order)
092          final byte[] message;
093          try {
094            message = pendingRequests.takeFirst();
095          } catch (InterruptedException e) {
096            throw new RuntimeException(e);
097          }
098
099          // Command Executor: Deserialize the command
100          final VortexRequest vortexRequest = vortexAvroUtils.toVortexRequest(message);
101
102          switch (vortexRequest.getType()) {
103            case AggregateTasklets:
104              final TaskletAggregationRequest taskletAggregationRequest = (TaskletAggregationRequest) vortexRequest;
105              aggregates.put(taskletAggregationRequest.getAggregateFunctionId(),
106                  new AggregateContainer(heartBeatTriggerManager, vortexAvroUtils, workerReports,
107                      taskletAggregationRequest));
108
109              // VortexFunctions need to be put into the repository such that VortexAvroUtils will know how to
110              // convert inputs and functions into a VortexRequest on subsequent messages requesting to
111              // execute the aggregateable tasklets.
112              aggregateFunctionRepository.put(taskletAggregationRequest.getAggregateFunctionId(),
113                  taskletAggregationRequest.getAggregateFunction(), taskletAggregationRequest.getFunction(),
114                  taskletAggregationRequest.getPolicy());
115              break;
116            case ExecuteAggregateTasklet:
117              executeAggregateTasklet(commandExecutor, vortexRequest);
118              break;
119            case ExecuteTasklet:
120              executeTasklet(commandExecutor, futures, vortexRequest);
121              break;
122            case CancelTasklet:
123              final TaskletCancellationRequest cancellationRequest = (TaskletCancellationRequest) vortexRequest;
124              LOG.log(Level.FINE, "Cancelling Tasklet with ID {0}.", cancellationRequest.getTaskletId());
125              final Future future = futures.get(cancellationRequest.getTaskletId());
126              if (future != null) {
127                future.cancel(true);
128              }
129              break;
130            default:
131              throw new RuntimeException("Unknown Command");
132          }
133        }
134      }
135    });
136
137    terminated.await();
138    return null;
139  }
140
141  /**
142   * Executes an tasklet request from the {@link org.apache.reef.vortex.driver.VortexDriver}.
143   */
144  private void executeTasklet(final ExecutorService commandExecutor,
145                              final ConcurrentMap<Integer, Future> futures,
146                              final VortexRequest vortexRequest) {
147    final CountDownLatch latch = new CountDownLatch(1);
148    final TaskletExecutionRequest taskletExecutionRequest = (TaskletExecutionRequest) vortexRequest;
149
150    // Scheduler Thread: Pass the command to the worker thread pool to be executed
151    // Record future to support cancellation.
152    futures.put(
153        taskletExecutionRequest.getTaskletId(),
154        commandExecutor.submit(new Runnable() {
155          @Override
156          public void run() {
157            final WorkerReport workerReport;
158            final List<TaskletReport> taskletReports = new ArrayList<>();
159
160            try {
161              // Command Executor: Execute the command
162              final TaskletReport taskletReport =
163                  new TaskletResultReport(taskletExecutionRequest.getTaskletId(),
164                      taskletExecutionRequest.execute());
165              taskletReports.add(taskletReport);
166            } catch (final InterruptedException ex) {
167              // Assumes that user's thread follows convention that cancelled Futures
168              // should throw InterruptedException.
169              final TaskletReport taskletReport =
170                  new TaskletCancelledReport(taskletExecutionRequest.getTaskletId());
171              LOG.log(Level.WARNING, "Tasklet with ID {0} has been cancelled",
172                  taskletExecutionRequest.getTaskletId());
173              taskletReports.add(taskletReport);
174            } catch (Exception e) {
175              // Command Executor: Tasklet throws an exception
176              final TaskletReport taskletReport =
177                  new TaskletFailureReport(taskletExecutionRequest.getTaskletId(), e);
178              taskletReports.add(taskletReport);
179            }
180
181            workerReport = new WorkerReport(taskletReports);
182            workerReports.addLast(vortexAvroUtils.toBytes(workerReport));
183            try {
184              latch.await();
185            } catch (final InterruptedException e) {
186              LOG.log(Level.SEVERE, "Cannot wait for Future to be put.");
187              throw new RuntimeException(e);
188            }
189            futures.remove(taskletExecutionRequest.getTaskletId());
190            heartBeatTriggerManager.triggerHeartBeat();
191          }
192        }));
193
194    // Signal that future is put.
195    latch.countDown();
196  }
197
198  /**
199   * Executes an aggregation request from the {@link org.apache.reef.vortex.driver.VortexDriver}.
200   */
201  private void executeAggregateTasklet(final ExecutorService commandExecutor,
202                                       final VortexRequest vortexRequest) {
203    final TaskletAggregateExecutionRequest taskletAggregateExecutionRequest =
204        (TaskletAggregateExecutionRequest) vortexRequest;
205
206    assert aggregates.containsKey(taskletAggregateExecutionRequest.getAggregateFunctionId());
207
208    final AggregateContainer aggregateContainer = aggregates.get(
209        taskletAggregateExecutionRequest.getAggregateFunctionId());
210    final TaskletAggregationRequest aggregationRequest = aggregateContainer.getTaskletAggregationRequest();
211
212    commandExecutor.submit(new Runnable() {
213      @Override
214      public void run() {
215        try {
216          aggregateContainer.scheduleTasklet(taskletAggregateExecutionRequest.getTaskletId());
217          final Object result = aggregationRequest.executeFunction(taskletAggregateExecutionRequest.getInput());
218          aggregateContainer.taskletComplete(taskletAggregateExecutionRequest.getTaskletId(), result);
219        } catch (final Exception e) {
220          aggregateContainer.taskletFailed(taskletAggregateExecutionRequest.getTaskletId(), e);
221        }
222      }
223    });
224  }
225
226  /**
227   * @return the workerReport the worker wishes to send.
228   */
229  @Override
230  public Optional<TaskMessage> getMessage() {
231    final byte[] msg = workerReports.pollFirst();
232    if (msg != null) {
233      return Optional.of(TaskMessage.from(MESSAGE_SOURCE_ID, msg));
234    } else {
235      return Optional.empty();
236    }
237  }
238
239  /**
240   * Handle requests from Vortex Master.
241   */
242  public final class DriverMessageHandler implements EventHandler<DriverMessage> {
243    @Override
244    public void onNext(final DriverMessage message) {
245      if (message.get().isPresent()) {
246        pendingRequests.addLast(message.get().get());
247      }
248    }
249  }
250
251  /**
252   * Shut down this worker.
253   */
254  public final class TaskCloseHandler implements EventHandler<CloseEvent> {
255    @Override
256    public void onNext(final CloseEvent closeEvent) {
257      terminated.countDown();
258    }
259  }
260}