/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.reef.examples.suspend;

import org.apache.reef.driver.client.JobMessageObserver;
import org.apache.reef.driver.context.ActiveContext;
import org.apache.reef.driver.context.ContextConfiguration;
import org.apache.reef.driver.evaluator.AllocatedEvaluator;
import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
import org.apache.reef.driver.evaluator.EvaluatorRequestor;
import org.apache.reef.driver.task.*;
import org.apache.reef.io.checkpoint.fs.FSCheckPointServiceConfiguration;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.JavaConfigurationBuilder;
import org.apache.reef.tang.Tang;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.tang.annotations.Unit;
import org.apache.reef.tang.exceptions.BindException;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
import org.apache.reef.wake.time.event.StartTime;
import org.apache.reef.wake.time.event.StopTime;

import javax.inject.Inject;
import javax.xml.bind.DatatypeConverter;
import java.util.Collections;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Suspend/resume example job driver. Execute a simple task in all evaluators,
 * and send EvaluatorControlMessage suspend/resume events properly.
 */
@Unit
public class SuspendDriver {

  /**
   * Standard Java logger.
   */
  private static final Logger LOG = Logger.getLogger(SuspendDriver.class.getName());

  /**
   * Number of evaluators to request.
   */
  private static final int NUM_EVALUATORS = 2;

  /**
   * String codec is used to encode the results driver sends to the client.
   */
  private static final ObjectSerializableCodec<String> CODEC_STR = new ObjectSerializableCodec<>();

  /**
   * Integer codec is used to decode the results driver gets from the tasks.
   */
  private static final ObjectSerializableCodec<Integer> CODEC_INT = new ObjectSerializableCodec<>();

  /**
   * Job observer on the client.
   * We use it to send results from the driver back to the client.
   */
  private final JobMessageObserver jobMessageObserver;

  /**
   * Job driver uses EvaluatorRequestor to request Evaluators that will run the Tasks.
   */
  private final EvaluatorRequestor evaluatorRequestor;

  /**
   * TANG configuration merged into every context submitted by this driver:
   * the task parameters (number of cycles, delay) plus the checkpoint service.
   */
  private final Configuration contextConfig;

  /**
   * Map from task ID (a string) to the RunningTask instance (that can be suspended).
   */
  private final Map<String, RunningTask> runningTasks =
      Collections.synchronizedMap(new HashMap<String, RunningTask>());

  /**
   * Map from task ID (a string) to the SuspendedTask instance (that can be resumed).
   * Every access in this class is made while holding the monitor of this map.
   */
  private final Map<String, SuspendedTask> suspendedTasks = new HashMap<>();

  /**
   * Job driver constructor.
   * All parameters are injected from TANG automatically.
   *
   * @param jobMessageObserver is used to send messages from the driver back to the client.
   * @param evaluatorRequestor is used to request Evaluators.
   * @param isLocal            value passed to the checkpoint service as its IS_LOCAL setting.
   * @param numCycles          number of cycles to run in the task.
   * @param delay              delay in seconds between cycles in the task.
   * @throws RuntimeException wrapping the BindException if the configuration cannot be built.
   */
  @Inject
  SuspendDriver(
      final JobMessageObserver jobMessageObserver,
      final EvaluatorRequestor evaluatorRequestor,
      @Parameter(Launch.Local.class) final boolean isLocal,
      @Parameter(Launch.NumCycles.class) final int numCycles,
      @Parameter(Launch.Delay.class) final int delay) {

    this.jobMessageObserver = jobMessageObserver;
    this.evaluatorRequestor = evaluatorRequestor;

    try {

      final Configuration checkpointServiceConfig = FSCheckPointServiceConfiguration.CONF
          .set(FSCheckPointServiceConfiguration.IS_LOCAL, Boolean.toString(isLocal))
          .set(FSCheckPointServiceConfiguration.PATH, "/tmp")
          .set(FSCheckPointServiceConfiguration.PREFIX, "reef-checkpoint-")
          .set(FSCheckPointServiceConfiguration.REPLICATION_FACTOR, "3")
          .build();

      final JavaConfigurationBuilder cb = Tang.Factory.getTang().newConfigurationBuilder()
          .bindNamedParameter(Launch.NumCycles.class, Integer.toString(numCycles))
          .bindNamedParameter(Launch.Delay.class, Integer.toString(delay));

      cb.addConfiguration(checkpointServiceConfig);
      this.contextConfig = cb.build();

    } catch (final BindException ex) {
      throw new RuntimeException(ex);
    }
  }

  /**
   * Receive notification that the Task is ready to run.
   * Registers the task as running and reports its start to the client.
   */
  final class RunningTaskHandler implements EventHandler<RunningTask> {
    @Override
    public void onNext(final RunningTask task) {
      LOG.log(Level.INFO, "Running task: {0}", task.getId());
      runningTasks.put(task.getId(), task);
      jobMessageObserver.sendMessageToClient(CODEC_STR.encode("start task: " + task.getId()));
    }
  }

  /**
   * Receive notification that the Task has completed successfully.
   * Reports the completion to the client, forgets the task and closes its context.
   */
  final class CompletedTaskHandler implements EventHandler<CompletedTask> {
    @Override
    public void onNext(final CompletedTask task) {

      final EvaluatorDescriptor e = task.getActiveContext().getEvaluatorDescriptor();
      final String msg = "Task completed " + task.getId() + " on node " + e;
      LOG.info(msg);

      jobMessageObserver.sendMessageToClient(CODEC_STR.encode(msg));
      runningTasks.remove(task.getId());
      task.getActiveContext().close();

      final boolean noTasks;

      // Read both maps under the suspendedTasks monitor, the same lock
      // SuspendedTaskHandler holds while moving a task between the two maps.
      synchronized (suspendedTasks) {
        LOG.log(Level.INFO, "Tasks running: {0} suspended: {1}", new Object[]{
            runningTasks.size(), suspendedTasks.size()});
        noTasks = runningTasks.isEmpty() && suspendedTasks.isEmpty();
      }

      if (noTasks) {
        // NOTE(review): this branch only logs; no explicit shutdown call is made here.
        // Presumably the driver terminates on its own once all contexts are closed - confirm.
        LOG.info("All tasks completed; shutting down.");
      }
    }
  }

  /**
   * Receive notification that the Task has been suspended.
   * Moves the task from the running map to the suspended map and notifies the client.
   */
  final class SuspendedTaskHandler implements EventHandler<SuspendedTask> {
    @Override
    public void onNext(final SuspendedTask task) {

      final String msg = "Task suspended: " + task.getId();
      LOG.info(msg);

      synchronized (suspendedTasks) {
        suspendedTasks.put(task.getId(), task);
        runningTasks.remove(task.getId());
      }

      jobMessageObserver.sendMessageToClient(CODEC_STR.encode(msg));
    }
  }

  /**
   * Receive message from the Task.
   * Decodes the integer payload and forwards it to the client as text.
   */
  final class TaskMessageHandler implements EventHandler<TaskMessage> {
    @Override
    public void onNext(final TaskMessage message) {
      final int result = CODEC_INT.decode(message.get());
      final String msg = "Task message " + message.getId() + ": " + result;
      LOG.info(msg);
      jobMessageObserver.sendMessageToClient(CODEC_STR.encode(msg));
    }
  }

  /**
   * Receive notification that an Evaluator had been allocated,
   * and submit a new Context to that Evaluator.
   */
  final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> {
    @Override
    public void onNext(final AllocatedEvaluator eval) {
      try {

        LOG.log(Level.INFO, "Allocated Evaluator: {0}", eval.getId());

        final Configuration thisContextConfiguration = ContextConfiguration.CONF.set(
            ContextConfiguration.IDENTIFIER, eval.getId() + "_context").build();

        // Merge the per-evaluator context ID with the shared context configuration.
        eval.submitContext(Tang.Factory.getTang()
            .newConfigurationBuilder(thisContextConfiguration, contextConfig).build());

      } catch (final BindException ex) {
        throw new RuntimeException(ex);
      }
    }
  }

  /**
   * Receive notification that a new Context is available.
   * Submit a new Task to that Context.
   */
  final class ActiveContextHandler implements EventHandler<ActiveContext> {
    @Override
    public synchronized void onNext(final ActiveContext context) {
      LOG.log(Level.INFO, "Active Context: {0}", context.getId());
      try {
        context.submitTask(TaskConfiguration.CONF
            .set(TaskConfiguration.IDENTIFIER, context.getId() + "_task")
            .set(TaskConfiguration.TASK, SuspendTestTask.class)
            .set(TaskConfiguration.ON_SUSPEND, SuspendTestTask.SuspendHandler.class)
            .set(TaskConfiguration.ON_SEND_MESSAGE, SuspendTestTask.class)
            .build());
      } catch (final BindException ex) {
        LOG.log(Level.SEVERE, "Bad Task configuration for context: " + context.getId(), ex);
        throw new RuntimeException(ex);
      }
    }
  }

  /**
   * Handle notifications from the client.
   * A message is a command word followed by whitespace and a task ID:
   * {@code suspend <taskId>} or {@code resume <taskId>}. The command word is case-insensitive.
   */
  final class ClientMessageHandler implements EventHandler<byte[]> {
    /**
     * Decode and execute one client command.
     *
     * @param message command string encoded with the driver's string codec.
     * @throws IllegalArgumentException if the message is not of the form
     *     {@code <command> <taskId>}, the command is unknown, or no task with
     *     the given ID is in the state the command requires.
     * @throws RuntimeException wrapping the BindException if the resumed task
     *     configuration cannot be built.
     */
    @Override
    public void onNext(final byte[] message) {

      final String commandStr = CODEC_STR.decode(message);
      LOG.log(Level.INFO, "Client message: {0}", commandStr);

      // Trim first: leading whitespace would otherwise produce an empty command token,
      // and trailing whitespace (e.g. a newline) would become part of the task ID.
      final String[] split = commandStr.trim().split("\\s+", 2);
      if (split.length != 2) {
        throw new IllegalArgumentException("Bad command: " + commandStr);
      }

      // Locale.ROOT keeps command matching independent of the JVM default locale.
      final String command = split[0].toLowerCase(Locale.ROOT);
      final String taskId = split[1];

      switch (command) {

      case "suspend": {
        final RunningTask task = runningTasks.get(taskId);
        if (task == null) {
          throw new IllegalArgumentException("Suspend: Task not found: " + taskId);
        }
        task.suspend();
        break;
      }

      case "resume": {
        final SuspendedTask suspendedTask;
        synchronized (suspendedTasks) {
          suspendedTask = suspendedTasks.remove(taskId);
        }
        if (suspendedTask == null) {
          throw new IllegalArgumentException("Resume: Task not found: " + taskId);
        }
        try {
          // Re-submit the task under the same ID; the state carried by the
          // suspended task is passed along Base64-encoded as the memento.
          suspendedTask.getActiveContext().submitTask(TaskConfiguration.CONF
              .set(TaskConfiguration.IDENTIFIER, taskId)
              .set(TaskConfiguration.TASK, SuspendTestTask.class)
              .set(TaskConfiguration.ON_SUSPEND, SuspendTestTask.SuspendHandler.class)
              .set(TaskConfiguration.ON_SEND_MESSAGE, SuspendTestTask.class)
              .set(TaskConfiguration.MEMENTO,
                  DatatypeConverter.printBase64Binary(suspendedTask.get()))
              .build());
        } catch (final BindException e) {
          throw new RuntimeException(e);
        }
        break;
      }

      default:
        throw new IllegalArgumentException("Bad command: " + command);
      }
    }
  }

  /**
   * Job Driver is ready and the clock is set up: request the evaluators.
   */
  final class StartHandler implements EventHandler<StartTime> {
    @Override
    public void onNext(final StartTime time) {
      LOG.log(Level.INFO, "StartTime: {0}", time);
      evaluatorRequestor.newRequest()
          .setMemory(128)
          .setNumberOfCores(1)
          .setNumber(NUM_EVALUATORS)
          .submit();
    }
  }

  /**
   * Shutting down the job driver: log the stop event and notify the client.
   */
  final class StopHandler implements EventHandler<StopTime> {
    @Override
    public void onNext(final StopTime time) {
      LOG.log(Level.INFO, "StopTime: {0}", time);
      jobMessageObserver.sendMessageToClient(CODEC_STR.encode("got StopTime"));
    }
  }
}