001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.examples.suspend; 020 021import org.apache.reef.driver.client.JobMessageObserver; 022import org.apache.reef.driver.context.ActiveContext; 023import org.apache.reef.driver.context.ContextConfiguration; 024import org.apache.reef.driver.evaluator.AllocatedEvaluator; 025import org.apache.reef.driver.evaluator.EvaluatorDescriptor; 026import org.apache.reef.driver.evaluator.EvaluatorRequest; 027import org.apache.reef.driver.evaluator.EvaluatorRequestor; 028import org.apache.reef.driver.task.*; 029import org.apache.reef.io.checkpoint.fs.FSCheckPointServiceConfiguration; 030import org.apache.reef.tang.Configuration; 031import org.apache.reef.tang.JavaConfigurationBuilder; 032import org.apache.reef.tang.Tang; 033import org.apache.reef.tang.annotations.Parameter; 034import org.apache.reef.tang.annotations.Unit; 035import org.apache.reef.tang.exceptions.BindException; 036import org.apache.reef.wake.EventHandler; 037import org.apache.reef.wake.remote.impl.ObjectSerializableCodec; 038import org.apache.reef.wake.time.event.StartTime; 039import org.apache.reef.wake.time.event.StopTime; 040 041import javax.inject.Inject; 042import javax.xml.bind.DatatypeConverter; 043import java.util.Collections; 044import java.util.HashMap; 045import java.util.Map; 046import java.util.logging.Level; 047import java.util.logging.Logger; 048 049/** 050 * Suspend/resume example job driver. Execute a simple task in all evaluators, 051 * and send EvaluatorControlMessage suspend/resume events properly. 052 */ 053@Unit 054public class SuspendDriver { 055 056 /** 057 * Standard Java logger. 058 */ 059 private static final Logger LOG = Logger.getLogger(SuspendDriver.class.getName()); 060 061 /** 062 * Number of evaluators to request. 063 */ 064 private static final int NUM_EVALUATORS = 2; 065 066 /** 067 * String codec is used to encode the results driver sends to the client. 068 */ 069 private static final ObjectSerializableCodec<String> CODEC_STR = new ObjectSerializableCodec<>(); 070 071 /** 072 * Integer codec is used to decode the results driver gets from the tasks. 073 */ 074 private static final ObjectSerializableCodec<Integer> CODEC_INT = new ObjectSerializableCodec<>(); 075 076 /** 077 * Job observer on the client. 078 * We use it to send results from the driver back to the client. 079 */ 080 private final JobMessageObserver jobMessageObserver; 081 082 /** 083 * Job driver uses EvaluatorRequestor to request Evaluators that will run the Tasks. 084 */ 085 private final EvaluatorRequestor evaluatorRequestor; 086 087 /** 088 * TANG Configuration of the Task. 089 */ 090 private final Configuration contextConfig; 091 092 /** 093 * Map from task ID (a string) to the TaskRuntime instance (that can be suspended). 094 */ 095 private final Map<String, RunningTask> runningTasks = 096 Collections.synchronizedMap(new HashMap<String, RunningTask>()); 097 098 /** 099 * Map from task ID (a string) to the SuspendedTask instance (that can be resumed). 100 */ 101 private final Map<String, SuspendedTask> suspendedTasks = new HashMap<>(); 102 103 /** 104 * Job driver constructor. 105 * All parameters are injected from TANG automatically. 106 * 107 * @param evaluatorRequestor is used to request Evaluators. 108 * @param numCycles number of cycles to run in the task. 109 * @param delay delay in seconds between cycles in the task. 110 */ 111 @Inject 112 SuspendDriver( 113 final JobMessageObserver jobMessageObserver, 114 final EvaluatorRequestor evaluatorRequestor, 115 @Parameter(Launch.Local.class) final boolean isLocal, 116 @Parameter(Launch.NumCycles.class) final int numCycles, 117 @Parameter(Launch.Delay.class) final int delay) { 118 119 this.jobMessageObserver = jobMessageObserver; 120 this.evaluatorRequestor = evaluatorRequestor; 121 122 try { 123 124 final Configuration checkpointServiceConfig = FSCheckPointServiceConfiguration.CONF 125 .set(FSCheckPointServiceConfiguration.IS_LOCAL, Boolean.toString(isLocal)) 126 .set(FSCheckPointServiceConfiguration.PATH, "/tmp") 127 .set(FSCheckPointServiceConfiguration.PREFIX, "reef-checkpoint-") 128 .set(FSCheckPointServiceConfiguration.REPLICATION_FACTOR, "3") 129 .build(); 130 131 final JavaConfigurationBuilder cb = Tang.Factory.getTang().newConfigurationBuilder() 132 .bindNamedParameter(Launch.NumCycles.class, Integer.toString(numCycles)) 133 .bindNamedParameter(Launch.Delay.class, Integer.toString(delay)); 134 135 cb.addConfiguration(checkpointServiceConfig); 136 this.contextConfig = cb.build(); 137 138 } catch (final BindException ex) { 139 throw new RuntimeException(ex); 140 } 141 } 142 143 /** 144 * Receive notification that the Task is ready to run. 145 */ 146 final class RunningTaskHandler implements EventHandler<RunningTask> { 147 @Override 148 public void onNext(final RunningTask task) { 149 LOG.log(Level.INFO, "Running task: {0}", task.getId()); 150 runningTasks.put(task.getId(), task); 151 jobMessageObserver.sendMessageToClient(CODEC_STR.encode("start task: " + task.getId())); 152 } 153 } 154 155 /** 156 * Receive notification that the Task has completed successfully. 157 */ 158 final class CompletedTaskHandler implements EventHandler<CompletedTask> { 159 @Override 160 public void onNext(final CompletedTask task) { 161 162 final EvaluatorDescriptor e = task.getActiveContext().getEvaluatorDescriptor(); 163 final String msg = "Task completed " + task.getId() + " on node " + e; 164 LOG.info(msg); 165 166 jobMessageObserver.sendMessageToClient(CODEC_STR.encode(msg)); 167 runningTasks.remove(task.getId()); 168 task.getActiveContext().close(); 169 170 final boolean noTasks; 171 172 synchronized (suspendedTasks) { 173 LOG.log(Level.INFO, "Tasks running: {0} suspended: {1}", new Object[]{ 174 runningTasks.size(), suspendedTasks.size()}); 175 noTasks = runningTasks.isEmpty() && suspendedTasks.isEmpty(); 176 } 177 178 if (noTasks) { 179 LOG.info("All tasks completed; shutting down."); 180 } 181 } 182 } 183 184 /** 185 * Receive notification that the Task has been suspended. 186 */ 187 final class SuspendedTaskHandler implements EventHandler<SuspendedTask> { 188 @Override 189 public void onNext(final SuspendedTask task) { 190 191 final String msg = "Task suspended: " + task.getId(); 192 LOG.info(msg); 193 194 synchronized (suspendedTasks) { 195 suspendedTasks.put(task.getId(), task); 196 runningTasks.remove(task.getId()); 197 } 198 199 jobMessageObserver.sendMessageToClient(CODEC_STR.encode(msg)); 200 } 201 } 202 203 /** 204 * Receive message from the Task. 205 */ 206 final class TaskMessageHandler implements EventHandler<TaskMessage> { 207 @Override 208 public void onNext(final TaskMessage message) { 209 final int result = CODEC_INT.decode(message.get()); 210 final String msg = "Task message " + message.getId() + ": " + result; 211 LOG.info(msg); 212 jobMessageObserver.sendMessageToClient(CODEC_STR.encode(msg)); 213 } 214 } 215 216 /** 217 * Receive notification that an Evaluator had been allocated, 218 * and submitTask a new Task in that Evaluator. 219 */ 220 final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> { 221 @Override 222 public void onNext(final AllocatedEvaluator eval) { 223 try { 224 225 LOG.log(Level.INFO, "Allocated Evaluator: {0}", eval.getId()); 226 227 final Configuration thisContextConfiguration = ContextConfiguration.CONF.set( 228 ContextConfiguration.IDENTIFIER, eval.getId() + "_context").build(); 229 230 eval.submitContext(Tang.Factory.getTang() 231 .newConfigurationBuilder(thisContextConfiguration, contextConfig).build()); 232 233 } catch (final BindException ex) { 234 throw new RuntimeException(ex); 235 } 236 } 237 } 238 239 /** 240 * Receive notification that a new Context is available. 241 * Submit a new Task to that Context. 242 */ 243 final class ActiveContextHandler implements EventHandler<ActiveContext> { 244 @Override 245 public synchronized void onNext(final ActiveContext context) { 246 LOG.log(Level.INFO, "Active Context: {0}", context.getId()); 247 try { 248 context.submitTask(TaskConfiguration.CONF 249 .set(TaskConfiguration.IDENTIFIER, context.getId() + "_task") 250 .set(TaskConfiguration.TASK, SuspendTestTask.class) 251 .set(TaskConfiguration.ON_SUSPEND, SuspendTestTask.SuspendHandler.class) 252 .build()); 253 } catch (final BindException ex) { 254 LOG.log(Level.SEVERE, "Bad Task configuration for context: " + context.getId(), ex); 255 throw new RuntimeException(ex); 256 } 257 } 258 } 259 260 /** 261 * Handle notifications from the client. 262 */ 263 final class ClientMessageHandler implements EventHandler<byte[]> { 264 @Override 265 public void onNext(final byte[] message) { 266 267 final String commandStr = CODEC_STR.decode(message); 268 LOG.log(Level.INFO, "Client message: {0}", commandStr); 269 270 final String[] split = commandStr.split("\\s+", 2); 271 if (split.length != 2) { 272 throw new IllegalArgumentException("Bad command: " + commandStr); 273 } else { 274 275 final String command = split[0].toLowerCase().intern(); 276 final String taskId = split[1]; 277 278 switch (command) { 279 280 case "suspend": { 281 final RunningTask task = runningTasks.get(taskId); 282 if (task != null) { 283 task.suspend(); 284 } else { 285 throw new IllegalArgumentException("Suspend: Task not found: " + taskId); 286 } 287 break; 288 } 289 290 case "resume": { 291 final SuspendedTask suspendedTask; 292 synchronized (suspendedTasks) { 293 suspendedTask = suspendedTasks.remove(taskId); 294 } 295 if (suspendedTask != null) { 296 try { 297 suspendedTask.getActiveContext().submitTask(TaskConfiguration.CONF 298 .set(TaskConfiguration.IDENTIFIER, taskId) 299 .set(TaskConfiguration.MEMENTO, 300 DatatypeConverter.printBase64Binary(suspendedTask.get())) 301 .build()); 302 } catch (final BindException e) { 303 throw new RuntimeException(e); 304 } 305 } else { 306 throw new IllegalArgumentException("Resume: Task not found: " + taskId); 307 } 308 break; 309 } 310 311 default: 312 throw new IllegalArgumentException("Bad command: " + command); 313 } 314 } 315 } 316 } 317 318 /** 319 * Job Driver is ready and the clock is set up: request the evaluators. 320 */ 321 final class StartHandler implements EventHandler<StartTime> { 322 @Override 323 public void onNext(final StartTime time) { 324 LOG.log(Level.INFO, "StartTime: {0}", time); 325 evaluatorRequestor.submit(EvaluatorRequest.newBuilder() 326 .setMemory(128).setNumberOfCores(1).setNumber(NUM_EVALUATORS).build()); 327 } 328 } 329 330 /** 331 * Shutting down the job driver: close the evaluators. 332 */ 333 final class StopHandler implements EventHandler<StopTime> { 334 @Override 335 public void onNext(final StopTime time) { 336 LOG.log(Level.INFO, "StopTime: {0}", time); 337 jobMessageObserver.sendMessageToClient(CODEC_STR.encode("got StopTime")); 338 } 339 } 340}