/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.reef.examples.retained_evalCLR;

import org.apache.reef.driver.catalog.ResourceCatalog;
import org.apache.reef.driver.client.JobMessageObserver;
import org.apache.reef.driver.context.ActiveContext;
import org.apache.reef.driver.context.ClosedContext;
import org.apache.reef.driver.context.ContextConfiguration;
import org.apache.reef.driver.context.FailedContext;
import org.apache.reef.driver.evaluator.*;
import org.apache.reef.driver.task.CompletedTask;
import org.apache.reef.driver.task.TaskConfiguration;
import org.apache.reef.examples.library.Command;
import org.apache.reef.examples.library.ShellTask;
import org.apache.reef.tang.*;
import org.apache.reef.tang.annotations.Unit;
import org.apache.reef.tang.exceptions.BindException;
import org.apache.reef.tang.implementation.protobuf.ProtocolBufferClassHierarchy;
import org.apache.reef.tang.proto.ClassHierarchyProto;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
import org.apache.reef.wake.time.Clock;
import org.apache.reef.wake.time.event.Alarm;
import org.apache.reef.wake.time.event.StartTime;
import org.apache.reef.wake.time.event.StopTime;

import javax.inject.Inject;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Retained Evaluator example job driver. Execute shell command on all evaluators,
 * capture stdout, and return concatenated results back to the client.
 * <p>
 * All mutable driver state ({@code state}, {@code cmd}, {@code expectCount},
 * {@code contexts}, {@code results} and the evaluator counters) is guarded by the
 * monitor of the {@code JobDriver} instance; the event handlers below take that
 * monitor before touching any of it.
 */
@Unit
public final class JobDriver {
  public static final String SHELL_TASK_CLASS_HIERARCHY_FILENAME = "ShellTask.bin";
  /**
   * Standard Java logger.
   */
  private static final Logger LOG = Logger.getLogger(JobDriver.class.getName());
  /**
   * Duration of one clock interval.
   */
  private static final int CHECK_UP_INTERVAL = 1000; // 1 sec.
  private static final String JVM_CONTEXT_SUFFIX = "_JVMContext";
  private static final String CLR_CONTEXT_SUFFIX = "_CLRContext";
  /**
   * String codec is used to encode the results
   * before passing them back to the client.
   */
  private static final ObjectSerializableCodec<String> JVM_CODEC = new ObjectSerializableCodec<>();
  public static int totalEvaluators = 2;
  /**
   * Wake clock is used to schedule periodical job check-ups.
   */
  private final Clock clock;
  /**
   * Job observer on the client.
   * We use it to send results from the driver back to the client.
   */
  private final JobMessageObserver jobMessageObserver;
  /**
   * Job driver uses EvaluatorRequestor
   * to request Evaluators that will run the Tasks.
   */
  private final EvaluatorRequestor evaluatorRequestor;
  /**
   * Static catalog of REEF resources.
   * We use it to schedule Task on every available node.
   */
  private final ResourceCatalog catalog;
  /**
   * Shell execution results from each Evaluator.
   */
  private final List<String> results = new ArrayList<>();
  /**
   * Map from context ID to running evaluator context.
   */
  private final Map<String, ActiveContext> contexts = new HashMap<>();
  private int nCLREvaluator = 1;                                // guarded by this
  private int nJVMEvaluator = totalEvaluators - nCLREvaluator;  // guarded by this
  /**
   * Job driver state.
   */
  private State state = State.INIT;
  /**
   * First command to execute. Sometimes client can send us the first command
   * before Evaluators are available; we need to store this command here.
   */
  private String cmd;
  /**
   * Number of evaluators/tasks to complete.
   */
  private int expectCount = 0;

  /**
   * Job driver constructor.
   * All parameters are injected from TANG automatically.
   *
   * @param clock              Wake clock to schedule and check up running jobs.
   * @param jobMessageObserver is used to send messages back to the client.
   * @param evaluatorRequestor is used to request Evaluators.
   * @param catalog            catalog of REEF resources; stored but not otherwise
   *                           read by the code in this class.
   */
  @Inject
  JobDriver(final Clock clock,
            final JobMessageObserver jobMessageObserver,
            final EvaluatorRequestor evaluatorRequestor,
            final ResourceCatalog catalog) {
    this.clock = clock;
    this.jobMessageObserver = jobMessageObserver;
    this.evaluatorRequestor = evaluatorRequestor;
    this.catalog = catalog;
  }

  /**
   * Makes a task configuration for the CLR ShellTask.
   * The configuration is built against the class hierarchy loaded from
   * {@link #SHELL_TASK_CLASS_HIERARCHY_FILENAME}.
   *
   * @param taskId  identifier bound to the CLR task.
   * @param command shell command bound to the CLR ShellTask.
   * @return task configuration for the CLR Task.
   * @throws BindException    if any of the bindings is rejected.
   * @throws RuntimeException if the class hierarchy file cannot be read.
   */
  private static Configuration getCLRTaskConfiguration(
      final String taskId, final String command) throws BindException {

    final ConfigurationBuilder cb = Tang.Factory.getTang()
        .newConfigurationBuilder(loadShellTaskClassHierarchy(SHELL_TASK_CLASS_HIERARCHY_FILENAME));

    cb.bind("Microsoft.Reef.Tasks.ITask, Microsoft.Reef.Tasks.ITask, Version=1.0.0.0, Culture=neutral, PublicKeyToken=69c3241e6f0468ca", "Microsoft.Reef.Tasks.ShellTask, Microsoft.Reef.Tasks.ShellTask, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null");
    cb.bind("Microsoft.Reef.Tasks.TaskConfigurationOptions+Identifier, Microsoft.Reef.Tasks.ITask, Version=1.0.0.0, Culture=neutral, PublicKeyToken=69c3241e6f0468ca", taskId);
    cb.bind("Microsoft.Reef.Tasks.ShellTask+Command, Microsoft.Reef.Tasks.ShellTask, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null", command);

    return cb.build();
  }

  /**
   * Makes a task configuration for the JVM ShellTask.
   *
   * @param taskId  identifier of the task.
   * @param command shell command bound to the {@link Command} named parameter.
   * @return task configuration for the JVM Task.
   * @throws BindException if the configuration cannot be built.
   */
  private static Configuration getJVMTaskConfiguration(
      final String taskId, final String command) throws BindException {

    final JavaConfigurationBuilder cb = Tang.Factory.getTang().newConfigurationBuilder();
    cb.addConfiguration(
        TaskConfiguration.CONF
            .set(TaskConfiguration.IDENTIFIER, taskId)
            .set(TaskConfiguration.TASK, ShellTask.class)
            .build()
    );
    cb.bindNamedParameter(Command.class, command);
    return cb.build();
  }

  /**
   * Loads a serialized class hierarchy from a file.
   *
   * @param binFile path of the file holding the protocol-buffer encoded hierarchy.
   * @return the class hierarchy parsed from {@code binFile}.
   * @throws RuntimeException if the file cannot be opened or read; the
   *                          underlying {@link IOException} is kept as the cause.
   */
  private static ClassHierarchy loadShellTaskClassHierarchy(final String binFile) {
    try (final InputStream chin = new FileInputStream(binFile)) {
      final ClassHierarchyProto.Node root = ClassHierarchyProto.Node.parseFrom(chin);
      return new ProtocolBufferClassHierarchy(root);
    } catch (final IOException e) {
      final String message = "Unable to load class hierarchy " + binFile;
      LOG.log(Level.SEVERE, message, e);
      throw new RuntimeException(message, e);
    }
  }

  /**
   * Tags the evaluator with the given type and submits an empty context to it.
   * The context ID is the evaluator ID plus a type-specific suffix; that suffix
   * is later used to pick the JVM or CLR task configuration.
   *
   * @param eval newly allocated evaluator.
   * @param type runtime the evaluator should host.
   * @throws RuntimeException if the context configuration cannot be built.
   */
  private void submitEvaluator(final AllocatedEvaluator eval, final EvaluatorType type) {
    synchronized (JobDriver.this) {

      final String contextIdSuffix = type.equals(EvaluatorType.JVM) ? JVM_CONTEXT_SUFFIX : CLR_CONTEXT_SUFFIX;
      final String contextId = eval.getId() + contextIdSuffix;

      eval.setType(type);

      LOG.log(Level.INFO, "Allocated Evaluator: {0} expect {1} running {2}",
          new Object[]{eval.getId(), JobDriver.this.expectCount, JobDriver.this.contexts.size()});
      assert (JobDriver.this.state == State.WAIT_EVALUATORS);
      try {
        eval.submitContext(ContextConfiguration.CONF.set(ContextConfiguration.IDENTIFIER, contextId).build());
      } catch (final BindException ex) {
        LOG.log(Level.SEVERE, "Failed to submit context " + contextId, ex);
        throw new RuntimeException(ex);
      }
    }
  }

  /**
   * Submit command to all available evaluators.
   * Every caller in this class invokes it while holding the driver monitor.
   *
   * @param command shell command to execute.
   */
  private void submit(final String command) {
    LOG.log(Level.INFO, "Submit command {0} to {1} evaluators. state: {2}",
        new Object[]{command, this.contexts.size(), this.state});
    assert (this.state == State.READY);
    this.expectCount = this.contexts.size();
    this.state = State.WAIT_TASKS;
    this.cmd = null;
    for (final ActiveContext context : this.contexts.values()) {
      this.submit(context, command);
    }
  }

  /**
   * Submit a Task that executes the command to a single Evaluator.
   * This method is called from {@code submit(command)}.
   *
   * @param context context to run the task in; an ID ending with the JVM suffix
   *                selects the JVM task configuration, anything else the CLR one.
   * @param command shell command to execute.
   * @throws RuntimeException if the task configuration cannot be built.
   */
  private void submit(final ActiveContext context, final String command) {
    try {
      LOG.log(Level.INFO, "Sending command {0} to context: {1}", new Object[]{command, context});
      final String taskId = context.getId() + "_task";
      final Configuration taskConfiguration;
      if (context.getId().endsWith(JVM_CONTEXT_SUFFIX)) {
        taskConfiguration = getJVMTaskConfiguration(taskId, command);
      } else {
        taskConfiguration = getCLRTaskConfiguration(taskId, command);
      }
      context.submitTask(taskConfiguration);
    } catch (final BindException ex) {
      LOG.log(Level.SEVERE, "Bad Task configuration for context: " + context.getId(), ex);
      throw new RuntimeException(ex);
    }
  }

  /**
   * Construct the final result and forward it to the Client.
   * The accumulated results are cleared afterwards.
   */
  private void returnResults() {
    final StringBuilder sb = new StringBuilder();
    for (final String result : this.results) {
      sb.append(result);
    }
    this.results.clear();
    LOG.log(Level.INFO, "Return results to the client:\n{0}", sb);
    this.jobMessageObserver.sendMessageToClient(JVM_CODEC.encode(sb.toString()));
  }

  /**
   * Request {@link #totalEvaluators} evaluators.
   * If that number is not positive, schedule another attempt in CHECK_UP_INTERVAL.
   * TODO: Ask for specific nodes. (This is not working in YARN... need to check again at some point.)
   *
   * @throws RuntimeException if any of the requests fails.
   */
  private synchronized void requestEvaluators() {
    assert (this.state == State.INIT);
    final int numNodes = totalEvaluators;
    if (numNodes > 0) {
      LOG.log(Level.INFO, "Schedule on {0} nodes.", numNodes);
      this.evaluatorRequestor.submit(
          EvaluatorRequest.newBuilder()
              .setMemory(128)
              .setNumberOfCores(1)
              .setNumber(numNodes).build()
      );
      this.state = State.WAIT_EVALUATORS;
      this.expectCount = numNodes;
    } else {
      // No nodes available yet - wait and ask again.
      this.clock.scheduleAlarm(CHECK_UP_INTERVAL, new EventHandler<Alarm>() {
        @Override
        public void onNext(final Alarm time) {
          synchronized (JobDriver.this) {
            LOG.log(Level.INFO, "{0} Alarm: {1}", new Object[]{JobDriver.this.state, time});
            if (JobDriver.this.state == State.INIT) {
              JobDriver.this.requestEvaluators();
            }
          }
        }
      });
    }
  }

  /**
   * Possible states of the job driver. Can be one of:
   * <dl>
   * <dt><code>INIT</code></dt><dd>initial state, ready to request the evaluators.</dd>
   * <dt><code>WAIT_EVALUATORS</code></dt><dd>Wait for requested evaluators to initialize.</dd>
   * <dt><code>READY</code></dt><dd>Ready to submit a new task.</dd>
   * <dt><code>WAIT_TASKS</code></dt><dd>Wait for tasks to complete.</dd>
   * </dl>
   */
  private enum State {
    INIT, WAIT_EVALUATORS, READY, WAIT_TASKS
  }

  /**
   * Handles AllocatedEvaluator: Submit an empty context.
   * JVM slots are filled first, then CLR slots.
   */
  final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> {
    @Override
    public void onNext(final AllocatedEvaluator allocatedEvaluator) {
      synchronized (JobDriver.this) {
        if (JobDriver.this.nJVMEvaluator > 0) {
          LOG.log(Level.INFO, "===== adding JVM evaluator =====");
          JobDriver.this.submitEvaluator(allocatedEvaluator, EvaluatorType.JVM);
          JobDriver.this.nJVMEvaluator -= 1;
        } else if (JobDriver.this.nCLREvaluator > 0) {
          LOG.log(Level.INFO, "===== adding CLR evaluator =====");
          JobDriver.this.submitEvaluator(allocatedEvaluator, EvaluatorType.CLR);
          JobDriver.this.nCLREvaluator -= 1;
        } else {
          // Both quotas are used up: without this branch the evaluator would be
          // neither given a context nor released.
          LOG.log(Level.WARNING, "Surplus evaluator {0}: no JVM or CLR slot left, closing it.",
              allocatedEvaluator.getId());
          allocatedEvaluator.close();
        }
      }
    }
  }

  /**
   * Receive notification that a new Context is available.
   * Records the context; once the last expected context has arrived the driver
   * becomes READY and submits the stored command, if there is one.
   */
  final class ActiveContextHandler implements EventHandler<ActiveContext> {
    @Override
    public void onNext(final ActiveContext context) {
      synchronized (JobDriver.this) {
        LOG.log(Level.INFO, "Context available: {0} expect {1} state {2}",
            new Object[]{context.getId(), JobDriver.this.expectCount, JobDriver.this.state});
        assert (JobDriver.this.state == State.WAIT_EVALUATORS);
        JobDriver.this.contexts.put(context.getId(), context);
        if (--JobDriver.this.expectCount <= 0) {
          JobDriver.this.state = State.READY;
          if (JobDriver.this.cmd == null) {
            LOG.log(Level.INFO, "All evaluators ready; waiting for command. State: {0}",
                JobDriver.this.state);
          } else {
            JobDriver.this.submit(JobDriver.this.cmd);
          }
        }
      }
    }
  }

  /**
   * Receive notification that the Context had completed.
   * Remove context from the list of active context.
   */
  final class ClosedContextHandler implements EventHandler<ClosedContext> {
    @Override
    public void onNext(final ClosedContext context) {
      LOG.log(Level.INFO, "Completed Context: {0}", context.getId());
      synchronized (JobDriver.this) {
        JobDriver.this.contexts.remove(context.getId());
      }
    }
  }

  /**
   * Receive notification that the Context had failed.
   * Remove context from the list of active context and rethrow the failure.
   */
  final class FailedContextHandler implements EventHandler<FailedContext> {
    @Override
    public void onNext(final FailedContext context) {
      LOG.log(Level.SEVERE, "FailedContext", context);
      synchronized (JobDriver.this) {
        JobDriver.this.contexts.remove(context.getId());
      }
      throw new RuntimeException("Failed context: ", context.asError());
    }
  }

  /**
   * Receive notification that the entire Evaluator had failed.
   * Forget all of its failed contexts and rethrow the evaluator's exception.
   */
  final class FailedEvaluatorHandler implements EventHandler<FailedEvaluator> {
    @Override
    public void onNext(final FailedEvaluator eval) {
      synchronized (JobDriver.this) {
        LOG.log(Level.SEVERE, "FailedEvaluator", eval);
        for (final FailedContext failedContext : eval.getFailedContextList()) {
          JobDriver.this.contexts.remove(failedContext.getId());
        }
        throw new RuntimeException("Failed Evaluator: ", eval.getEvaluatorException());
      }
    }
  }

  /**
   * Receive notification that the Task has completed successfully.
   * Collects the task output; when the last expected task has reported, the
   * combined results go to the client and a stored command, if any, is submitted.
   */
  final class CompletedTaskHandler implements EventHandler<CompletedTask> {
    @Override
    public void onNext(final CompletedTask task) {
      LOG.log(Level.INFO, "Completed task: {0}", task.getId());
      // Take the message returned by the task and add it to the running result.
      String result = "default result";
      try {
        if (task.getId().contains(CLR_CONTEXT_SUFFIX)) {
          // NOTE(review): decodes with the platform default charset; confirm which
          // encoding the CLR ShellTask uses for its output before pinning one here.
          result = new String(task.get());
        } else {
          result = JVM_CODEC.decode(task.get());
        }
      } catch (final Exception e) {
        // Best effort: keep the placeholder result, but record why decoding failed.
        LOG.log(Level.WARNING, "failed to decode task outcome", e);
      }
      synchronized (JobDriver.this) {
        JobDriver.this.results.add(task.getId() + " :: " + result);
        LOG.log(Level.INFO, "Task {0} result {1}: {2} state: {3}", new Object[]{
            task.getId(), JobDriver.this.results.size(), result, JobDriver.this.state});
        if (--JobDriver.this.expectCount <= 0) {
          JobDriver.this.returnResults();
          JobDriver.this.state = State.READY;
          if (JobDriver.this.cmd != null) {
            JobDriver.this.submit(JobDriver.this.cmd);
          }
        }
      }
    }
  }

  /**
   * Receive notification from the client.
   * The message is decoded into a shell command that is either submitted right
   * away (driver READY) or stored in {@code cmd} for later.
   */
  final class ClientMessageHandler implements EventHandler<byte[]> {
    @Override
    public void onNext(final byte[] message) {
      synchronized (JobDriver.this) {
        final String command = JVM_CODEC.decode(message);
        LOG.log(Level.INFO, "Client message: {0} state: {1}",
            new Object[]{command, JobDriver.this.state});
        assert (JobDriver.this.cmd == null);
        if (JobDriver.this.state == State.READY) {
          JobDriver.this.submit(command);
        } else {
          // not ready yet - save the command for better times.
          // NOTE(review): CompletedTaskHandler submits a stored command once tasks
          // finish, yet this assertion only admits WAIT_EVALUATORS - confirm
          // whether a command arriving in WAIT_TASKS is meant to be legal.
          assert (JobDriver.this.state == State.WAIT_EVALUATORS);
          JobDriver.this.cmd = command;
        }
      }
    }
  }

  /**
   * Job Driver is ready and the clock is set up: request the evaluators.
   */
  final class StartHandler implements EventHandler<StartTime> {
    @Override
    public void onNext(final StartTime startTime) {
      // state is guarded by the driver monitor, so read it under the lock;
      // requestEvaluators() re-enters the same monitor.
      synchronized (JobDriver.this) {
        LOG.log(Level.INFO, "{0} StartTime: {1}", new Object[]{state, startTime});
        assert (state == State.INIT);
        requestEvaluators();
      }
    }
  }

  /**
   * Shutting down the job driver: close the evaluators.
   */
  final class StopHandler implements EventHandler<StopTime> {
    @Override
    public void onNext(final StopTime time) {
      // Copy the contexts under the lock and close them outside of it:
      // ClosedContextHandler removes entries from the map, which must not happen
      // while the live view is being iterated.
      final List<ActiveContext> toClose;
      synchronized (JobDriver.this) {
        LOG.log(Level.INFO, "{0} StopTime: {1}", new Object[]{state, time});
        toClose = new ArrayList<>(contexts.values());
      }
      for (final ActiveContext context : toClose) {
        context.close();
      }
    }
  }
}