001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.vortex.evaluator; 020 021import org.apache.reef.annotations.Unstable; 022import org.apache.reef.annotations.audience.TaskSide; 023import org.apache.reef.tang.annotations.Parameter; 024import org.apache.reef.tang.annotations.Unit; 025import org.apache.reef.task.HeartBeatTriggerManager; 026import org.apache.reef.task.Task; 027import org.apache.reef.task.TaskMessage; 028import org.apache.reef.task.TaskMessageSource; 029import org.apache.reef.task.events.CloseEvent; 030import org.apache.reef.task.events.DriverMessage; 031import org.apache.reef.util.Optional; 032import org.apache.reef.vortex.common.*; 033import org.apache.reef.vortex.common.AggregateFunctionRepository; 034import org.apache.reef.vortex.driver.VortexWorkerConf; 035import org.apache.reef.wake.EventHandler; 036 037import javax.inject.Inject; 038import java.util.ArrayList; 039import java.util.List; 040import java.util.concurrent.*; 041import java.util.logging.Level; 042import java.util.logging.Logger; 043 044/** 045 * Receives commands from VortexMaster, executes them, and returns the results. 046 * TODO[REEF-503]: Basic Vortex profiling. 047 */ 048@Unstable 049@Unit 050@TaskSide 051public final class VortexWorker implements Task, TaskMessageSource { 052 private static final Logger LOG = Logger.getLogger(VortexWorker.class.getName()); 053 private static final String MESSAGE_SOURCE_ID = ""; // empty string as there is no use for it 054 055 private final BlockingDeque<byte[]> pendingRequests = new LinkedBlockingDeque<>(); 056 private final BlockingDeque<byte[]> workerReports = new LinkedBlockingDeque<>(); 057 private final ConcurrentMap<Integer, AggregateContainer> aggregates = new ConcurrentHashMap<>(); 058 059 private final AggregateFunctionRepository aggregateFunctionRepository; 060 private final VortexAvroUtils vortexAvroUtils; 061 private final HeartBeatTriggerManager heartBeatTriggerManager; 062 private final int numOfThreads; 063 private final CountDownLatch terminated = new CountDownLatch(1); 064 065 @Inject 066 private VortexWorker(final HeartBeatTriggerManager heartBeatTriggerManager, 067 final AggregateFunctionRepository aggregateFunctionRepository, 068 final VortexAvroUtils vortexAvroUtils, 069 @Parameter(VortexWorkerConf.NumOfThreads.class) final int numOfThreads) { 070 this.heartBeatTriggerManager = heartBeatTriggerManager; 071 this.aggregateFunctionRepository = aggregateFunctionRepository; 072 this.vortexAvroUtils = vortexAvroUtils; 073 this.numOfThreads = numOfThreads; 074 } 075 076 /** 077 * Starts the scheduler and executor and waits until termination. 078 */ 079 @Override 080 public byte[] call(final byte[] memento) throws Exception { 081 final ExecutorService schedulerThread = Executors.newSingleThreadExecutor(); 082 final ExecutorService commandExecutor = Executors.newFixedThreadPool(numOfThreads); 083 final ConcurrentMap<Integer, Future> futures = new ConcurrentHashMap<>(); 084 085 // Scheduling thread starts 086 schedulerThread.execute(new Runnable() { 087 @SuppressWarnings("InfiniteLoopStatement") // Scheduler is supposed to run forever. 088 @Override 089 public void run() { 090 while (true) { 091 // Scheduler Thread: Pick a command to execute (For now, simple FIFO order) 092 final byte[] message; 093 try { 094 message = pendingRequests.takeFirst(); 095 } catch (InterruptedException e) { 096 throw new RuntimeException(e); 097 } 098 099 // Command Executor: Deserialize the command 100 final VortexRequest vortexRequest = vortexAvroUtils.toVortexRequest(message); 101 102 switch (vortexRequest.getType()) { 103 case AggregateTasklets: 104 final TaskletAggregationRequest taskletAggregationRequest = (TaskletAggregationRequest) vortexRequest; 105 aggregates.put(taskletAggregationRequest.getAggregateFunctionId(), 106 new AggregateContainer(heartBeatTriggerManager, vortexAvroUtils, workerReports, 107 taskletAggregationRequest)); 108 109 // VortexFunctions need to be put into the repository such that VortexAvroUtils will know how to 110 // convert inputs and functions into a VortexRequest on subsequent messages requesting to 111 // execute the aggregateable tasklets. 112 aggregateFunctionRepository.put(taskletAggregationRequest.getAggregateFunctionId(), 113 taskletAggregationRequest.getAggregateFunction(), taskletAggregationRequest.getFunction(), 114 taskletAggregationRequest.getPolicy()); 115 break; 116 case ExecuteAggregateTasklet: 117 executeAggregateTasklet(commandExecutor, vortexRequest); 118 break; 119 case ExecuteTasklet: 120 executeTasklet(commandExecutor, futures, vortexRequest); 121 break; 122 case CancelTasklet: 123 final TaskletCancellationRequest cancellationRequest = (TaskletCancellationRequest) vortexRequest; 124 LOG.log(Level.FINE, "Cancelling Tasklet with ID {0}.", cancellationRequest.getTaskletId()); 125 final Future future = futures.get(cancellationRequest.getTaskletId()); 126 if (future != null) { 127 future.cancel(true); 128 } 129 break; 130 default: 131 throw new RuntimeException("Unknown Command"); 132 } 133 } 134 } 135 }); 136 137 terminated.await(); 138 return null; 139 } 140 141 /** 142 * Executes an tasklet request from the {@link org.apache.reef.vortex.driver.VortexDriver}. 143 */ 144 private void executeTasklet(final ExecutorService commandExecutor, 145 final ConcurrentMap<Integer, Future> futures, 146 final VortexRequest vortexRequest) { 147 final CountDownLatch latch = new CountDownLatch(1); 148 final TaskletExecutionRequest taskletExecutionRequest = (TaskletExecutionRequest) vortexRequest; 149 150 // Scheduler Thread: Pass the command to the worker thread pool to be executed 151 // Record future to support cancellation. 152 futures.put( 153 taskletExecutionRequest.getTaskletId(), 154 commandExecutor.submit(new Runnable() { 155 @Override 156 public void run() { 157 final WorkerReport workerReport; 158 final List<TaskletReport> taskletReports = new ArrayList<>(); 159 160 try { 161 // Command Executor: Execute the command 162 final TaskletReport taskletReport = 163 new TaskletResultReport(taskletExecutionRequest.getTaskletId(), 164 taskletExecutionRequest.execute()); 165 taskletReports.add(taskletReport); 166 } catch (final InterruptedException ex) { 167 // Assumes that user's thread follows convention that cancelled Futures 168 // should throw InterruptedException. 169 final TaskletReport taskletReport = 170 new TaskletCancelledReport(taskletExecutionRequest.getTaskletId()); 171 LOG.log(Level.WARNING, "Tasklet with ID {0} has been cancelled", 172 taskletExecutionRequest.getTaskletId()); 173 taskletReports.add(taskletReport); 174 } catch (Exception e) { 175 // Command Executor: Tasklet throws an exception 176 final TaskletReport taskletReport = 177 new TaskletFailureReport(taskletExecutionRequest.getTaskletId(), e); 178 taskletReports.add(taskletReport); 179 } 180 181 workerReport = new WorkerReport(taskletReports); 182 workerReports.addLast(vortexAvroUtils.toBytes(workerReport)); 183 try { 184 latch.await(); 185 } catch (final InterruptedException e) { 186 LOG.log(Level.SEVERE, "Cannot wait for Future to be put."); 187 throw new RuntimeException(e); 188 } 189 futures.remove(taskletExecutionRequest.getTaskletId()); 190 heartBeatTriggerManager.triggerHeartBeat(); 191 } 192 })); 193 194 // Signal that future is put. 195 latch.countDown(); 196 } 197 198 /** 199 * Executes an aggregation request from the {@link org.apache.reef.vortex.driver.VortexDriver}. 200 */ 201 private void executeAggregateTasklet(final ExecutorService commandExecutor, 202 final VortexRequest vortexRequest) { 203 final TaskletAggregateExecutionRequest taskletAggregateExecutionRequest = 204 (TaskletAggregateExecutionRequest) vortexRequest; 205 206 assert aggregates.containsKey(taskletAggregateExecutionRequest.getAggregateFunctionId()); 207 208 final AggregateContainer aggregateContainer = aggregates.get( 209 taskletAggregateExecutionRequest.getAggregateFunctionId()); 210 final TaskletAggregationRequest aggregationRequest = aggregateContainer.getTaskletAggregationRequest(); 211 212 commandExecutor.submit(new Runnable() { 213 @Override 214 public void run() { 215 try { 216 aggregateContainer.scheduleTasklet(taskletAggregateExecutionRequest.getTaskletId()); 217 final Object result = aggregationRequest.executeFunction(taskletAggregateExecutionRequest.getInput()); 218 aggregateContainer.taskletComplete(taskletAggregateExecutionRequest.getTaskletId(), result); 219 } catch (final Exception e) { 220 aggregateContainer.taskletFailed(taskletAggregateExecutionRequest.getTaskletId(), e); 221 } 222 } 223 }); 224 } 225 226 /** 227 * @return the workerReport the worker wishes to send. 228 */ 229 @Override 230 public Optional<TaskMessage> getMessage() { 231 final byte[] msg = workerReports.pollFirst(); 232 if (msg != null) { 233 return Optional.of(TaskMessage.from(MESSAGE_SOURCE_ID, msg)); 234 } else { 235 return Optional.empty(); 236 } 237 } 238 239 /** 240 * Handle requests from Vortex Master. 241 */ 242 public final class DriverMessageHandler implements EventHandler<DriverMessage> { 243 @Override 244 public void onNext(final DriverMessage message) { 245 if (message.get().isPresent()) { 246 pendingRequests.addLast(message.get().get()); 247 } 248 } 249 } 250 251 /** 252 * Shut down this worker. 253 */ 254 public final class TaskCloseHandler implements EventHandler<CloseEvent> { 255 @Override 256 public void onNext(final CloseEvent closeEvent) { 257 terminated.countDown(); 258 } 259 } 260}