001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.vortex.evaluator; 020 021import org.apache.reef.annotations.Unstable; 022import org.apache.reef.annotations.audience.TaskSide; 023import org.apache.reef.tang.annotations.Parameter; 024import org.apache.reef.tang.annotations.Unit; 025import org.apache.reef.task.HeartBeatTriggerManager; 026import org.apache.reef.task.Task; 027import org.apache.reef.task.TaskMessage; 028import org.apache.reef.task.TaskMessageSource; 029import org.apache.reef.task.events.CloseEvent; 030import org.apache.reef.task.events.DriverMessage; 031import org.apache.reef.util.Optional; 032import org.apache.reef.vortex.common.KryoUtils; 033import org.apache.reef.vortex.protocol.mastertoworker.*; 034import org.apache.reef.vortex.protocol.workertomaster.*; 035import org.apache.reef.vortex.driver.VortexWorkerConf; 036import org.apache.reef.wake.EventHandler; 037 038import javax.inject.Inject; 039import java.util.ArrayList; 040import java.util.List; 041import java.util.concurrent.*; 042import java.util.logging.Level; 043import java.util.logging.Logger; 044 045/** 046 * Receives commands from VortexMaster, executes them, and returns the results. 047 * TODO[REEF-503]: Basic Vortex profiling. 048 */ 049@Unstable 050@Unit 051@TaskSide 052public final class VortexWorker implements Task, TaskMessageSource { 053 private static final Logger LOG = Logger.getLogger(VortexWorker.class.getName()); 054 private static final String MESSAGE_SOURCE_ID = ""; // empty string as there is no use for it 055 056 private final BlockingDeque<byte[]> pendingRequests = new LinkedBlockingDeque<>(); 057 private final BlockingDeque<byte[]> workerReports = new LinkedBlockingDeque<>(); 058 private final ConcurrentMap<Integer, AggregateContainer> aggregates = new ConcurrentHashMap<>(); 059 060 private final KryoUtils kryoUtils; 061 private final HeartBeatTriggerManager heartBeatTriggerManager; 062 private final int numOfThreads; 063 private final CountDownLatch terminated = new CountDownLatch(1); 064 065 @Inject 066 private VortexWorker(final HeartBeatTriggerManager heartBeatTriggerManager, 067 final KryoUtils kryoUtils, 068 @Parameter(VortexWorkerConf.NumOfThreads.class) final int numOfThreads) { 069 this.heartBeatTriggerManager = heartBeatTriggerManager; 070 this.kryoUtils = kryoUtils; 071 this.numOfThreads = numOfThreads; 072 } 073 074 /** 075 * Starts the scheduler and executor and waits until termination. 076 */ 077 @Override 078 public byte[] call(final byte[] memento) throws Exception { 079 final ExecutorService schedulerThread = Executors.newSingleThreadExecutor(); 080 final ExecutorService commandExecutor = Executors.newFixedThreadPool(numOfThreads); 081 final ConcurrentMap<Integer, Future> futures = new ConcurrentHashMap<>(); 082 083 // Scheduling thread starts 084 schedulerThread.execute(new Runnable() { 085 @SuppressWarnings("InfiniteLoopStatement") // Scheduler is supposed to run forever. 086 @Override 087 public void run() { 088 while (true) { 089 // Scheduler Thread: Pick a command to execute (For now, simple FIFO order) 090 final byte[] message; 091 try { 092 message = pendingRequests.takeFirst(); 093 } catch (InterruptedException e) { 094 throw new RuntimeException(e); 095 } 096 097 // Command Executor: Deserialize the command 098 final MasterToWorkerRequest masterToWorkerRequest = (MasterToWorkerRequest)kryoUtils.deserialize(message); 099 100 switch (masterToWorkerRequest.getType()) { 101 case AggregateTasklets: 102 final TaskletAggregationRequest taskletAggregationRequest = 103 (TaskletAggregationRequest) masterToWorkerRequest; 104 aggregates.put(taskletAggregationRequest.getAggregateFunctionId(), 105 new AggregateContainer(heartBeatTriggerManager, kryoUtils, workerReports, 106 taskletAggregationRequest)); 107 break; 108 case ExecuteAggregateTasklet: 109 executeAggregateTasklet(commandExecutor, masterToWorkerRequest); 110 break; 111 case ExecuteTasklet: 112 executeTasklet(commandExecutor, futures, masterToWorkerRequest); 113 break; 114 case CancelTasklet: 115 final TaskletCancellationRequest cancellationRequest = (TaskletCancellationRequest) masterToWorkerRequest; 116 LOG.log(Level.FINE, "Cancelling Tasklet with ID {0}.", cancellationRequest.getTaskletId()); 117 final Future future = futures.get(cancellationRequest.getTaskletId()); 118 if (future != null) { 119 future.cancel(true); 120 } 121 break; 122 default: 123 throw new RuntimeException("Unknown Command"); 124 } 125 } 126 } 127 }); 128 129 terminated.await(); 130 return null; 131 } 132 133 /** 134 * Executes an tasklet request from the {@link org.apache.reef.vortex.driver.VortexDriver}. 135 */ 136 private void executeTasklet(final ExecutorService commandExecutor, 137 final ConcurrentMap<Integer, Future> futures, 138 final MasterToWorkerRequest masterToWorkerRequest) { 139 final CountDownLatch latch = new CountDownLatch(1); 140 final TaskletExecutionRequest taskletExecutionRequest = (TaskletExecutionRequest) masterToWorkerRequest; 141 142 // Scheduler Thread: Pass the command to the worker thread pool to be executed 143 // Record future to support cancellation. 144 futures.put( 145 taskletExecutionRequest.getTaskletId(), 146 commandExecutor.submit(new Runnable() { 147 @Override 148 public void run() { 149 final WorkerToMasterReports reports; 150 final List<WorkerToMasterReport> holder = new ArrayList<>(); 151 152 try { 153 // Command Executor: Execute the command 154 final WorkerToMasterReport workerToMasterReport = 155 new TaskletResultReport(taskletExecutionRequest.getTaskletId(), taskletExecutionRequest.execute()); 156 holder.add(workerToMasterReport); 157 } catch (final InterruptedException ex) { 158 // Assumes that user's thread follows convention that cancelled Futures 159 // should throw InterruptedException. 160 final WorkerToMasterReport workerToMasterReport = 161 new TaskletCancelledReport(taskletExecutionRequest.getTaskletId()); 162 LOG.log(Level.WARNING, "Tasklet with ID {0} has been cancelled", taskletExecutionRequest.getTaskletId()); 163 holder.add(workerToMasterReport); 164 } catch (Exception e) { 165 // Command Executor: Tasklet throws an exception 166 final WorkerToMasterReport workerToMasterReport = 167 new TaskletFailureReport(taskletExecutionRequest.getTaskletId(), e); 168 holder.add(workerToMasterReport); 169 } 170 171 reports = new WorkerToMasterReports(holder); 172 workerReports.addLast(kryoUtils.serialize(reports)); 173 try { 174 latch.await(); 175 } catch (final InterruptedException e) { 176 LOG.log(Level.SEVERE, "Cannot wait for Future to be put."); 177 throw new RuntimeException(e); 178 } 179 futures.remove(taskletExecutionRequest.getTaskletId()); 180 heartBeatTriggerManager.triggerHeartBeat(); 181 } 182 })); 183 184 // Signal that future is put. 185 latch.countDown(); 186 } 187 188 /** 189 * Executes an aggregation request from the {@link org.apache.reef.vortex.driver.VortexDriver}. 190 */ 191 private void executeAggregateTasklet(final ExecutorService commandExecutor, 192 final MasterToWorkerRequest masterToWorkerRequest) { 193 final TaskletAggregateExecutionRequest taskletAggregateExecutionRequest = 194 (TaskletAggregateExecutionRequest) masterToWorkerRequest; 195 196 assert aggregates.containsKey(taskletAggregateExecutionRequest.getAggregateFunctionId()); 197 198 final AggregateContainer aggregateContainer = 199 aggregates.get(taskletAggregateExecutionRequest.getAggregateFunctionId()); 200 final TaskletAggregationRequest aggregationRequest = aggregateContainer.getTaskletAggregationRequest(); 201 202 commandExecutor.submit(new Runnable() { 203 @Override 204 public void run() { 205 try { 206 aggregateContainer.scheduleTasklet(taskletAggregateExecutionRequest.getTaskletId()); 207 final Object result = aggregationRequest.executeFunction(taskletAggregateExecutionRequest.getInput()); 208 aggregateContainer.taskletComplete(taskletAggregateExecutionRequest.getTaskletId(), result); 209 } catch (final Exception e) { 210 aggregateContainer.taskletFailed(taskletAggregateExecutionRequest.getTaskletId(), e); 211 } 212 } 213 }); 214 } 215 216 /** 217 * @return the workerReport the worker wishes to send. 218 */ 219 @Override 220 public Optional<TaskMessage> getMessage() { 221 final byte[] msg = workerReports.pollFirst(); 222 if (msg != null) { 223 return Optional.of(TaskMessage.from(MESSAGE_SOURCE_ID, msg)); 224 } else { 225 return Optional.empty(); 226 } 227 } 228 229 /** 230 * Handle requests from Vortex Master. 231 */ 232 public final class DriverMessageHandler implements EventHandler<DriverMessage> { 233 @Override 234 public void onNext(final DriverMessage message) { 235 if (message.get().isPresent()) { 236 pendingRequests.addLast(message.get().get()); 237 } 238 } 239 } 240 241 /** 242 * Shut down this worker. 243 */ 244 public final class TaskCloseHandler implements EventHandler<CloseEvent> { 245 @Override 246 public void onNext(final CloseEvent closeEvent) { 247 terminated.countDown(); 248 } 249 } 250}