/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.reef.javabridge.generic;

import org.apache.reef.driver.client.JobMessageObserver;
import org.apache.reef.driver.context.ActiveContext;
import org.apache.reef.driver.context.ClosedContext;
import org.apache.reef.driver.context.ContextMessage;
import org.apache.reef.driver.context.FailedContext;
import org.apache.reef.driver.evaluator.*;
import org.apache.reef.driver.restart.DriverRestarted;
import org.apache.reef.driver.task.*;
import org.apache.reef.io.network.naming.NameServer;
import org.apache.reef.javabridge.*;
import org.apache.reef.driver.restart.DriverRestartCompleted;
import org.apache.reef.runtime.common.driver.DriverStatusManager;
import org.apache.reef.runtime.common.driver.parameters.DefinedRuntimes;
import org.apache.reef.runtime.common.files.REEFFileNames;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.tang.annotations.Unit;
import org.apache.reef.util.Optional;
import org.apache.reef.util.logging.CLRBufferedLogHandler;
import org.apache.reef.util.logging.LoggingScope;
import org.apache.reef.util.logging.LoggingScopeFactory;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.remote.address.LocalAddressProvider;
import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
import org.apache.reef.wake.time.Clock;
import org.apache.reef.wake.time.event.Alarm;
import org.apache.reef.wake.time.event.StartTime;
import org.apache.reef.wake.time.event.StopTime;
import org.apache.reef.webserver.*;

import javax.inject.Inject;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletResponse;
import java.io.*;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Generic job driver for CLRBridge.
 * <p>
 * Each nested handler class receives one kind of driver event and, when the CLR side
 * has registered a non-zero handle for it in the {@link BridgeHandlerManager}, forwards
 * the event to the CLR through {@link NativeInterop}.
 * <p>
 * Thread-safety: the mutable maps {@code contexts} and {@code allocatedEvaluatorBridges}
 * are only accessed while holding the monitor of this {@code JobDriver} instance.
 * {@code handlerManager} and {@code isRestarted} are volatile because they are read
 * from clock alarm callbacks that do not hold that monitor.
 */
@Unit
public final class JobDriver {

  private static final Logger LOG = Logger.getLogger(JobDriver.class.getName());
  /**
   * String codec is used to encode the results
   * before passing them back to the client.
   */
  private static final ObjectSerializableCodec<String> JVM_CODEC = new ObjectSerializableCodec<>();
  private final InteropLogger interopLogger = new InteropLogger();
  private final NameServer nameServer;
  private final String nameServerInfo;
  private final HttpServer httpServer;
  private final ActiveContextBridgeFactory activeContextBridgeFactory;
  private final AllocatedEvaluatorBridgeFactory allocatedEvaluatorBridgeFactory;

  /**
   * Wake clock is used to schedule periodical job check-ups.
   */
  private final Clock clock;
  /**
   * Job observer on the client.
   * We use it to send results from the driver back to the client.
   */
  private final JobMessageObserver jobMessageObserver;
  /**
   * Job driver uses EvaluatorRequestor
   * to request Evaluators that will run the Tasks.
   */
  private final EvaluatorRequestor evaluatorRequestor;

  /**
   * Driver status manager to monitor driver status.
   */
  private final DriverStatusManager driverStatusManager;

  /**
   * Factory to setup new CLR process configurations.
   */
  private final CLRProcessFactory clrProcessFactory;

  /**
   * Shell execution results from each Evaluator.
   */
  private final List<String> results = new ArrayList<>();
  /**
   * Map from context ID to running evaluator context.
   * Guarded by the monitor of this JobDriver instance.
   */
  private final Map<String, ActiveContext> contexts = new HashMap<>();

  private final REEFFileNames reefFileNames;
  private final LocalAddressProvider localAddressProvider;
  /**
   * Logging scope factory that provides LoggingScope.
   */
  private final LoggingScopeFactory loggingScopeFactory;
  private final Set<String> definedRuntimes;

  /**
   * CLR handler handles; {@code null} until {@link #setupBridge} has run.
   * Volatile because alarm callbacks poll it without holding the JobDriver monitor.
   */
  private volatile BridgeHandlerManager handlerManager = null;
  /**
   * Set to true by {@link RestartHandler}; volatile because it may be read from an alarm callback.
   */
  private volatile boolean isRestarted = false;
  // We are holding on to following on bridge side.
  // Need to add references here so that GC does not collect them.
  // Guarded by the monitor of this JobDriver instance.
  private final Map<String, AllocatedEvaluatorBridge> allocatedEvaluatorBridges =
      new HashMap<>();
  private EvaluatorRequestorBridge evaluatorRequestorBridge;


  /**
   * Job driver constructor.
   * All parameters are injected from TANG automatically.
   *
   * @param clock                           Wake clock to schedule and check up running jobs.
   * @param httpServer                      HTTP server that CLR-specified handlers are registered with.
   * @param nameServer                      name server whose port is published to allocated evaluators.
   * @param jobMessageObserver              is used to send messages back to the client.
   * @param evaluatorRequestor              is used to request Evaluators.
   * @param driverStatusManager             driver status manager to monitor driver status.
   * @param loggingScopeFactory             factory for the logging scopes wrapped around each handler.
   * @param localAddressProvider            supplies the local address used for the name server and HTTP endpoint.
   * @param activeContextBridgeFactory      creates the bridges that expose ActiveContexts to the CLR.
   * @param reefFileNames                   supplies the path of the driver HTTP endpoint file.
   * @param allocatedEvaluatorBridgeFactory creates the bridges that expose AllocatedEvaluators to the CLR.
   * @param clrProcessFactory               factory to setup new CLR process configurations.
   * @param definedRuntimes                 runtime names passed on to the evaluator requestor bridges.
   */
  @Inject
  JobDriver(final Clock clock,
            final HttpServer httpServer,
            final NameServer nameServer,
            final JobMessageObserver jobMessageObserver,
            final EvaluatorRequestor evaluatorRequestor,
            final DriverStatusManager driverStatusManager,
            final LoggingScopeFactory loggingScopeFactory,
            final LocalAddressProvider localAddressProvider,
            final ActiveContextBridgeFactory activeContextBridgeFactory,
            final REEFFileNames reefFileNames,
            final AllocatedEvaluatorBridgeFactory allocatedEvaluatorBridgeFactory,
            final CLRProcessFactory clrProcessFactory,
            @Parameter(DefinedRuntimes.class) final Set<String> definedRuntimes) {
    this.clock = clock;
    this.httpServer = httpServer;
    this.jobMessageObserver = jobMessageObserver;
    this.evaluatorRequestor = evaluatorRequestor;
    this.nameServer = nameServer;
    this.driverStatusManager = driverStatusManager;
    this.activeContextBridgeFactory = activeContextBridgeFactory;
    this.allocatedEvaluatorBridgeFactory = allocatedEvaluatorBridgeFactory;
    this.nameServerInfo = localAddressProvider.getLocalAddress() + ":" + this.nameServer.getPort();
    this.loggingScopeFactory = loggingScopeFactory;
    this.reefFileNames = reefFileNames;
    this.localAddressProvider = localAddressProvider;
    this.clrProcessFactory = clrProcessFactory;
    this.definedRuntimes = definedRuntimes;
  }

  /**
   * Sets up the Java/CLR bridge: marks the CLR log handler as initialized, writes the
   * driver HTTP endpoint file, obtains the CLR handler handles from the initializer and
   * registers one HTTP handler per URI specification reported by the CLR.
   *
   * @param initializer produces the {@link BridgeHandlerManager} holding the CLR handler handles.
   * @throws RuntimeException if the HTTP endpoint file cannot be written.
   */
  private void setupBridge(final ClrHandlersInitializer initializer) {
    // Signal to the clr buffered log handler that the driver has started and that
    // we can begin logging
    LOG.log(Level.INFO, "Initializing CLRBufferedLogHandler...");
    try (final LoggingScope lb = this.loggingScopeFactory.setupBridge()) {
      final CLRBufferedLogHandler handler = getCLRBufferedLogHandler();
      if (handler == null) {
        LOG.log(Level.WARNING, "CLRBufferedLogHandler could not be initialized");
      } else {
        handler.setDriverInitialized();
        LOG.log(Level.INFO, "CLRBufferedLogHandler init complete.");
      }

      final String portNumber = httpServer == null ? null : Integer.toString(httpServer.getPort());
      if (portNumber != null) {
        final File outputFileName = new File(reefFileNames.getDriverHttpEndpoint());
        // try-with-resources so the file handle is released even when write() fails.
        try (final BufferedWriter out = new BufferedWriter(
            new OutputStreamWriter(new FileOutputStream(outputFileName), StandardCharsets.UTF_8))) {
          out.write(localAddressProvider.getLocalAddress() + ":" + portNumber + "\n");
        } catch (final IOException ex) {
          throw new RuntimeException(ex);
        }
      }

      this.evaluatorRequestorBridge =
          new EvaluatorRequestorBridge(JobDriver.this.evaluatorRequestor, false, loggingScopeFactory,
              JobDriver.this.definedRuntimes);
      JobDriver.this.handlerManager = initializer.getClrHandlers(portNumber, evaluatorRequestorBridge);

      try (final LoggingScope lp =
               this.loggingScopeFactory.getNewLoggingScope("setupBridge::clrSystemHttpServerHandlerOnNext")) {
        final HttpServerEventBridge httpServerEventBridge = new HttpServerEventBridge("SPEC");
        NativeInterop.clrSystemHttpServerHandlerOnNext(JobDriver.this.handlerManager.getHttpServerEventHandler(),
            httpServerEventBridge, this.interopLogger);
        final String specList = httpServerEventBridge.getUriSpecification();
        LOG.log(Level.INFO, "Starting http server, getUriSpecification: {0}", specList);
        if (specList != null) {
          // The CLR reports its URI specifications as one ':'-separated string.
          final String[] specs = specList.split(":");
          for (final String s : specs) {
            final HttpHandler h = new HttpServerBridgeEventHandler();
            h.setUriSpecification(s);
            this.httpServer.addHttpHandler(h);
          }
        }
      }
    }
    LOG.log(Level.INFO, "CLR Bridge setup.");
  }

  /**
   * Finds the CLRBufferedLogHandler among the handlers of the root logger.
   *
   * @return the first CLRBufferedLogHandler found, or null if none is registered.
   */
  private CLRBufferedLogHandler getCLRBufferedLogHandler() {
    for (final Handler handler : Logger.getLogger("").getHandlers()) {
      if (handler instanceof CLRBufferedLogHandler) {
        return (CLRBufferedLogHandler) handler;
      }
    }
    return null;
  }

  /**
   * Assigns the process to the allocated evaluator and hands the evaluator to the CLR handler.
   *
   * @param eval    the newly allocated evaluator.
   * @param process the process configuration to run on the evaluator.
   * @throws RuntimeException if the CLR did not register an AllocatedEvaluator handler.
   */
  private void submitEvaluator(final AllocatedEvaluator eval, final EvaluatorProcess process) {
    synchronized (JobDriver.this) {
      eval.setProcess(process);
      LOG.log(Level.INFO, "Allocated Evaluator: {0}, total running running {1}",
          new Object[]{eval.getId(), JobDriver.this.contexts.size()});
      if (JobDriver.this.handlerManager.getAllocatedEvaluatorHandler() == 0) {
        throw new RuntimeException("Allocated Evaluator Handler not initialized by CLR.");
      }
      final AllocatedEvaluatorBridge allocatedEvaluatorBridge =
          this.allocatedEvaluatorBridgeFactory.getAllocatedEvaluatorBridge(eval, this.nameServerInfo);
      // Keep a Java-side reference so the bridge is not garbage collected while the CLR uses it.
      allocatedEvaluatorBridges.put(allocatedEvaluatorBridge.getId(), allocatedEvaluatorBridge);
      NativeInterop.clrSystemAllocatedEvaluatorHandlerOnNext(
          JobDriver.this.handlerManager.getAllocatedEvaluatorHandler(), allocatedEvaluatorBridge, this.interopLogger);
    }
  }

  /**
   * Handles a failed evaluator: forgets its failed contexts, notifies the client and
   * dispatches the failure to the CLR (waiting for the bridge if it is not set up yet).
   *
   * @param eval            the failed evaluator.
   * @param isRestartFailed true if the failure was reported through the driver-restart handler.
   */
  private void handleFailedEvaluator(final FailedEvaluator eval, final boolean isRestartFailed) {
    try (final LoggingScope ls = loggingScopeFactory.evaluatorFailed(eval.getId())) {
      synchronized (JobDriver.this) {
        LOG.log(Level.SEVERE, "FailedEvaluator: {0}", eval);
        for (final FailedContext failedContext : eval.getFailedContextList()) {
          final String failedContextId = failedContext.getId();
          LOG.log(Level.INFO, "removing context " + failedContextId + " from job driver contexts.");
          JobDriver.this.contexts.remove(failedContextId);
        }
        String message = "Evaluator " + eval.getId() + " failed with message: "
            + eval.getEvaluatorException().getMessage();
        JobDriver.this.jobMessageObserver.sendMessageToClient(message.getBytes(StandardCharsets.UTF_8));

        // Resolve the handle null-safely: the bridge may not be set up yet, in which case
        // the callee schedules an alarm and waits for it instead of failing with an NPE here.
        evaluatorFailedHandlerWaitForCLRBridgeSetup(
            getFailedEvaluatorHandle(isRestartFailed), eval, isRestartFailed);
      }
    }
  }

  /**
   * Looks up the CLR handle responsible for failed evaluators.
   *
   * @param isRestartFailed true to look up the driver-restart variant of the handler.
   * @return the handle, or 0 if the bridge is not set up yet or the CLR registered no handler.
   */
  private long getFailedEvaluatorHandle(final boolean isRestartFailed) {
    final BridgeHandlerManager manager = JobDriver.this.handlerManager;
    if (manager == null) {
      return 0;
    }
    return isRestartFailed
        ? manager.getDriverRestartFailedEvaluatorHandler()
        : manager.getFailedEvaluatorHandler();
  }

  /**
   * Dispatches a failed evaluator to the CLR, or waits for the CLR bridge to come up.
   * <ul>
   * <li>non-zero handle: the failure is handled in the CLR right away;</li>
   * <li>zero handle and bridge set up: the CLR registered no handler, a warning is sent to the client;</li>
   * <li>zero handle and no bridge yet: an alarm re-checks every 5000 ms until the bridge is set up.</li>
   * </ul>
   *
   * @param handle          CLR handle of the failed evaluator handler, 0 if unknown.
   * @param eval            the failed evaluator.
   * @param isRestartFailed true if the failure was reported through the driver-restart handler.
   */
  private void evaluatorFailedHandlerWaitForCLRBridgeSetup(final long handle,
                                                           final FailedEvaluator eval,
                                                           final boolean isRestartFailed) {
    if (handle == 0) {
      if (JobDriver.this.handlerManager != null) {
        final String message = "No CLR FailedEvaluator handler was set, exiting now";
        LOG.log(Level.WARNING, message);
        JobDriver.this.jobMessageObserver.sendMessageToClient(message.getBytes(StandardCharsets.UTF_8));
      } else {
        clock.scheduleAlarm(0, new EventHandler<Alarm>() {
          @Override
          public void onNext(final Alarm time) {
            if (JobDriver.this.handlerManager != null) {
              // Bridge is up now: re-resolve the handle so that a handler the CLR never
              // registered produces the warning above rather than a native call with handle 0.
              evaluatorFailedHandlerWaitForCLRBridgeSetup(
                  getFailedEvaluatorHandle(isRestartFailed), eval, isRestartFailed);
            } else {
              LOG.log(Level.INFO, "Waiting for CLR bridge to be set up");
              clock.scheduleAlarm(5000, this);
            }
          }
        });
      }
    } else {
      handleFailedEvaluatorInCLR(eval, isRestartFailed);
    }
  }

  /**
   * Passes the failed evaluator to the CLR handler and reports back to the client.
   * Requires the bridge to be set up (handlerManager non-null).
   *
   * @param eval            the failed evaluator.
   * @param isRestartFailed true to invoke the driver-restart variant of the CLR handler.
   */
  private void handleFailedEvaluatorInCLR(final FailedEvaluator eval, final boolean isRestartFailed) {
    final String message = "CLR FailedEvaluator handler set, handling things with CLR handler.";
    LOG.log(Level.INFO, message);
    final FailedEvaluatorBridge failedEvaluatorBridge =
        new FailedEvaluatorBridge(eval, JobDriver.this.evaluatorRequestor,
            JobDriver.this.isRestarted, loggingScopeFactory, activeContextBridgeFactory, JobDriver.this.definedRuntimes);
    if (isRestartFailed) {
      NativeInterop.clrSystemDriverRestartFailedEvaluatorHandlerOnNext(
          JobDriver.this.handlerManager.getDriverRestartFailedEvaluatorHandler(),
          failedEvaluatorBridge, JobDriver.this.interopLogger);
    } else {
      NativeInterop.clrSystemFailedEvaluatorHandlerOnNext(
          JobDriver.this.handlerManager.getFailedEvaluatorHandler(),
          failedEvaluatorBridge,
          JobDriver.this.interopLogger);
    }

    final int additionalRequestedEvaluatorNumber = failedEvaluatorBridge.getNewlyRequestedEvaluatorNumber();
    if (additionalRequestedEvaluatorNumber > 0) {
      LOG.log(Level.INFO, "number of additional evaluators requested after evaluator failure: " +
          additionalRequestedEvaluatorNumber);
    }

    JobDriver.this.jobMessageObserver.sendMessageToClient(message.getBytes(StandardCharsets.UTF_8));
  }

  /**
   * Submit a Task to a single Evaluator.
   * On any failure the context is closed and the failure is rethrown.
   *
   * @param context the active context handed to the CLR ActiveContext handler.
   * @throws RuntimeException if the CLR handler is missing or the native call fails.
   */
  private void submit(final ActiveContext context) {
    try {
      LOG.log(Level.INFO, "Send task to context: {0}", new Object[]{context});
      if (JobDriver.this.handlerManager.getActiveContextHandler() == 0) {
        throw new RuntimeException("Active Context Handler not initialized by CLR.");
      }
      final ActiveContextBridge activeContextBridge = activeContextBridgeFactory.getActiveContextBridge(context);
      NativeInterop.clrSystemActiveContextHandlerOnNext(JobDriver.this.handlerManager.getActiveContextHandler(),
          activeContextBridge, JobDriver.this.interopLogger);
    } catch (final Exception ex) {
      LOG.log(Level.SEVERE, "Fail to submit task to active context", ex);
      context.close();
      throw new RuntimeException(ex);
    }
  }

  /**
   * Handles AllocatedEvaluator: Submit an empty context.
   */
  public final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> {
    @Override
    public void onNext(final AllocatedEvaluator allocatedEvaluator) {
      try (final LoggingScope ls = loggingScopeFactory.evaluatorAllocated(allocatedEvaluator.getId())) {
        synchronized (JobDriver.this) {
          LOG.log(Level.INFO, "AllocatedEvaluatorHandler.OnNext");
          JobDriver.this.submitEvaluator(allocatedEvaluator, clrProcessFactory.newEvaluatorProcess());
        }
      }
    }
  }

  /**
   * Receive notification that a new Context is available.
   */
  public final class ActiveContextHandler implements EventHandler<ActiveContext> {
    @Override
    public void onNext(final ActiveContext context) {
      try (final LoggingScope ls = loggingScopeFactory.activeContextReceived(context.getId())) {
        synchronized (JobDriver.this) {
          LOG.log(Level.INFO, "ActiveContextHandler: Context available: {0}",
              new Object[]{context.getId()});
          JobDriver.this.contexts.put(context.getId(), context);
          JobDriver.this.submit(context);
        }
      }
    }
  }

  /**
   * Receive notification that the Task has completed successfully.
   */
  public final class CompletedTaskHandler implements EventHandler<CompletedTask> {
    @Override
    public void onNext(final CompletedTask task) {
      LOG.log(Level.INFO, "Completed task: {0}", task.getId());
      try (final LoggingScope ls = loggingScopeFactory.taskCompleted(task.getId())) {
        // Take the message returned by the task and add it to the running result.
        String result = "default result";
        try {
          result = new String(task.get(), StandardCharsets.UTF_8);
        } catch (final Exception e) {
          // Best effort: fall back to the default result text when the outcome cannot be decoded.
          LOG.log(Level.WARNING, "failed to decode task outcome");
        }
        LOG.log(Level.INFO, "Return results to the client:\n{0}", result);
        JobDriver.this.jobMessageObserver.sendMessageToClient(JVM_CODEC.encode(result));
        if (JobDriver.this.handlerManager.getCompletedTaskHandler() == 0) {
          LOG.log(Level.INFO, "No CLR handler bound to handle completed task.");
        } else {
          LOG.log(Level.INFO, "CLR CompletedTaskHandler handler set, handling things with CLR handler.");
          final CompletedTaskBridge completedTaskBridge = new CompletedTaskBridge(task, activeContextBridgeFactory);
          NativeInterop.clrSystemCompletedTaskHandlerOnNext(JobDriver.this.handlerManager.getCompletedTaskHandler(),
              completedTaskBridge, JobDriver.this.interopLogger);
        }
      }
    }
  }

  /**
   * Receive notification that the entire Evaluator had failed.
   */
  public final class FailedEvaluatorHandler implements EventHandler<FailedEvaluator> {
    @Override
    public void onNext(final FailedEvaluator eval) {
      JobDriver.this.handleFailedEvaluator(eval, false);
      // Same lock as the put() in submitEvaluator: the map is a plain HashMap.
      synchronized (JobDriver.this) {
        allocatedEvaluatorBridges.remove(eval.getId());
      }
    }
  }

  /**
   * Receive notification that the entire Evaluator had failed on Driver Restart.
   */
  public final class DriverRestartFailedEvaluatorHandler implements EventHandler<FailedEvaluator> {
    @Override
    public void onNext(final FailedEvaluator eval) {
      JobDriver.this.handleFailedEvaluator(eval, true);
    }
  }

  /**
   * HTTP handler that serializes each request and lets the CLR HTTP server handler answer it.
   */
  final class HttpServerBridgeEventHandler implements HttpHandler {
    private String uriSpecification;

    /**
     * returns URI specification for the handler.
     */
    @Override
    public String getUriSpecification() {
      return uriSpecification;
    }

    public void setUriSpecification(final String s) {
      uriSpecification = s;
    }

    /**
     * process http request.
     *
     * @param parsedHttpRequest the request, converted to Avro/JSON before it is passed to the CLR.
     * @param response          the servlet response the CLR-produced body is written to.
     * @throws RuntimeException if the CLR handler invocation or writing the response fails.
     */
    @Override
    public void onHttpRequest(final ParsedHttpRequest parsedHttpRequest, final HttpServletResponse response)
        throws IOException, ServletException {
      LOG.log(Level.INFO, "HttpServerBridgeEventHandler onHttpRequest: {0}", parsedHttpRequest.getRequestUri());
      try (final LoggingScope ls = loggingScopeFactory.httpRequest(parsedHttpRequest.getRequestUri())) {
        final AvroHttpSerializer httpSerializer = new AvroHttpSerializer();
        final AvroHttpRequest avroHttpRequest = httpSerializer.toAvro(parsedHttpRequest);

        final String requestString = httpSerializer.toString(avroHttpRequest);
        final byte[] requestBytes = requestString.getBytes(Charset.forName(AvroHttpSerializer.JSON_CHARSET));

        try {
          final HttpServerEventBridge httpServerEventBridge = new HttpServerEventBridge(requestBytes);
          NativeInterop.clrSystemHttpServerHandlerOnNext(JobDriver.this.handlerManager.getHttpServerEventHandler(),
              httpServerEventBridge, JobDriver.this.interopLogger);
          final String responseBody =
              new String(httpServerEventBridge.getQueryResponseData(), StandardCharsets.UTF_8);
          response.getWriter().println(responseBody);
          LOG.log(Level.INFO, "HttpServerBridgeEventHandler onHttpRequest received response: {0}", responseBody);
        } catch (final Exception ex) {
          LOG.log(Level.SEVERE, "Fail to invoke CLR Http Server handler", ex);
          throw new RuntimeException(ex);
        }
      }
    }
  }

  /**
   * Handle failed task.
   */
  public final class FailedTaskHandler implements EventHandler<FailedTask> {
    @Override
    public void onNext(final FailedTask task) throws RuntimeException {
      LOG.log(Level.SEVERE, "FailedTask received, will be handle in CLR handler, if set.");
      if (JobDriver.this.handlerManager.getFailedTaskHandler() == 0) {
        LOG.log(Level.SEVERE, "Failed Task Handler not initialized by CLR, fail for real.");
        throw new RuntimeException("Failed Task Handler not initialized by CLR.");
      }
      try {
        final FailedTaskBridge failedTaskBridge = new FailedTaskBridge(task, activeContextBridgeFactory);
        NativeInterop.clrSystemFailedTaskHandlerOnNext(JobDriver.this.handlerManager.getFailedTaskHandler(),
            failedTaskBridge, JobDriver.this.interopLogger);
      } catch (final Exception ex) {
        LOG.log(Level.SEVERE, "Fail to invoke CLR failed task handler", ex);
        throw new RuntimeException(ex);
      }
    }
  }

  /**
   * Receive notification that the Task is running.
   */
  public final class RunningTaskHandler implements EventHandler<RunningTask> {
    @Override
    public void onNext(final RunningTask task) {
      try (final LoggingScope ls = loggingScopeFactory.taskRunning(task.getId())) {
        if (JobDriver.this.handlerManager.getRunningTaskHandler() == 0) {
          LOG.log(Level.INFO, "RunningTask event received but no CLR handler was bound. Exiting handler.");
        } else {
          LOG.log(Level.INFO, "RunningTask will be handled by CLR handler. Task Id: {0}", task.getId());
          try {
            final RunningTaskBridge runningTaskBridge = new RunningTaskBridge(task, activeContextBridgeFactory);
            NativeInterop.clrSystemRunningTaskHandlerOnNext(JobDriver.this.handlerManager.getRunningTaskHandler(),
                runningTaskBridge, JobDriver.this.interopLogger);
          } catch (final Exception ex) {
            LOG.log(Level.WARNING, "Fail to invoke CLR running task handler", ex);
            throw new RuntimeException(ex);
          }
        }
      }
    }
  }

  /**
   * Receive notification that the Task is running when driver restarted.
   * The event is handed to the CLR from an alarm that re-checks every 2000 ms
   * until the bridge has been set up.
   */
  public final class DriverRestartRunningTaskHandler implements EventHandler<RunningTask> {
    @Override
    public void onNext(final RunningTask task) {
      try (final LoggingScope ls = loggingScopeFactory.driverRestartRunningTask(task.getId())) {
        clock.scheduleAlarm(0, new EventHandler<Alarm>() {
          @Override
          public void onNext(final Alarm time) {
            if (JobDriver.this.handlerManager != null) {
              if (JobDriver.this.handlerManager.getDriverRestartRunningTaskHandler() != 0) {
                LOG.log(Level.INFO, "CLR driver restart RunningTask handler implemented, now handle it in CLR.");
                NativeInterop.clrSystemDriverRestartRunningTaskHandlerOnNext(
                    JobDriver.this.handlerManager.getDriverRestartRunningTaskHandler(),
                    new RunningTaskBridge(task, activeContextBridgeFactory));
              } else {
                LOG.log(Level.WARNING, "No CLR driver restart RunningTask handler implemented, " +
                    "done with DriverRestartRunningTaskHandler.");
              }
            } else {
              LOG.log(Level.INFO, "Waiting for driver to complete restart process " +
                  "before checking out CLR driver restart RunningTaskHandler...");
              clock.scheduleAlarm(2000, this);
            }
          }
        });
      }
    }
  }

  /**
   * Receive notification that an context is active on Evaluator when the driver restarted.
   * The event is handed to the CLR from an alarm that re-checks every 2000 ms
   * until the bridge has been set up.
   */
  public final class DriverRestartActiveContextHandler implements EventHandler<ActiveContext> {
    @Override
    public void onNext(final ActiveContext context) {
      try (final LoggingScope ls = loggingScopeFactory.driverRestartActiveContextReceived(context.getId())) {
        // Same lock as every other access to the contexts map.
        synchronized (JobDriver.this) {
          JobDriver.this.contexts.put(context.getId(), context);
        }
        LOG.log(Level.INFO, "DriverRestartActiveContextHandler event received: " + context.getId());
        clock.scheduleAlarm(0, new EventHandler<Alarm>() {
          @Override
          public void onNext(final Alarm time) {
            if (JobDriver.this.handlerManager != null) {
              if (JobDriver.this.handlerManager.getDriverRestartActiveContextHandler() != 0) {
                LOG.log(Level.INFO, "CLR driver restart ActiveContext handler implemented, now handle it in CLR.");
                NativeInterop.clrSystemDriverRestartActiveContextHandlerOnNext(
                    JobDriver.this.handlerManager.getDriverRestartActiveContextHandler(),
                    activeContextBridgeFactory.getActiveContextBridge(context));
              } else {
                LOG.log(Level.WARNING, "No CLR driver restart ActiveContext handler implemented, " +
                    "done with DriverRestartActiveContextHandler.");
              }
            } else {
              LOG.log(Level.INFO, "Waiting for driver to complete restart process " +
                  "before checking out CLR driver restart DriverRestartActiveContextHandler...");
              clock.scheduleAlarm(2000, this);
            }
          }
        });
      }
    }
  }

  /**
   * Job Driver is ready and the clock is set up: set up the CLR bridge.
   */
  public final class StartHandler implements EventHandler<StartTime> {
    @Override
    public void onNext(final StartTime startTime) {
      try (final LoggingScope ls = loggingScopeFactory.driverStart(startTime)) {
        synchronized (JobDriver.this) {

          setupBridge(new DriverStartClrHandlersInitializer(startTime));
          LOG.log(Level.INFO, "Driver Started");
        }
      }
    }
  }


  /**
   * Job driver is restarted after previous crash.
   */
  public final class RestartHandler implements EventHandler<DriverRestarted> {
    @Override
    public void onNext(final DriverRestarted driverRestarted) {
      try (final LoggingScope ls = loggingScopeFactory.driverRestart(driverRestarted.getStartTime())) {
        synchronized (JobDriver.this) {

          JobDriver.this.isRestarted = true;
          setupBridge(new DriverRestartClrHandlersInitializer(driverRestarted));

          LOG.log(Level.INFO, "Driver Restarted and CLR bridge set up.");
        }
      }
    }
  }

  /**
   * Receive notification that driver restart has completed.
   */
  public final class DriverRestartCompletedHandler implements EventHandler<DriverRestartCompleted> {
    @Override
    public void onNext(final DriverRestartCompleted driverRestartCompleted) {
      LOG.log(Level.INFO, "Java DriverRestartCompleted event received at time [{0}]. ",
          driverRestartCompleted.getCompletedTime());
      try (final LoggingScope ls = loggingScopeFactory.driverRestartCompleted(
          driverRestartCompleted.getCompletedTime().getTimeStamp())) {
        if (JobDriver.this.handlerManager.getDriverRestartCompletedHandler() != 0) {
          LOG.log(Level.INFO, "CLR driver restart handler implemented, now handle it in CLR.");

          NativeInterop.clrSystemDriverRestartCompletedHandlerOnNext(
              JobDriver.this.handlerManager.getDriverRestartCompletedHandler(),
              new DriverRestartCompletedBridge(driverRestartCompleted));
        } else {
          LOG.log(Level.WARNING, "No CLR driver restart handler implemented, done with DriverRestartCompletedHandler.");
        }
      }
    }
  }

  /**
   * Shutting down the job driver: close the active contexts.
   */
  final class StopHandler implements EventHandler<StopTime> {
    @Override
    public void onNext(final StopTime time) {
      LOG.log(Level.INFO, " StopTime: {0}", new Object[]{time});
      try (final LoggingScope ls = loggingScopeFactory.driverStop(time.getTimeStamp())) {
        // Snapshot under the lock, close outside of it: other handlers remove entries from
        // the map, which must neither race with this iteration nor run while we hold the lock.
        final List<ActiveContext> openContexts;
        synchronized (JobDriver.this) {
          openContexts = new ArrayList<>(contexts.values());
        }
        for (final ActiveContext context : openContexts) {
          context.close();
        }
      }
    }
  }

  /**
   * Handler for message received from the Task.
   */
  public final class TaskMessageHandler implements EventHandler<TaskMessage> {
    @Override
    public void onNext(final TaskMessage taskMessage) {
      final String msg = new String(taskMessage.get(), StandardCharsets.UTF_8);
      LOG.log(Level.INFO, "Received TaskMessage: {0} from CLR", msg);
      if (JobDriver.this.handlerManager.getTaskMessageHandler() != 0) {
        final TaskMessageBridge taskMessageBridge = new TaskMessageBridge(taskMessage);
        // if CLR implements the task message handler, handle the bytes in CLR handler
        NativeInterop.clrSystemTaskMessageHandlerOnNext(JobDriver.this.handlerManager.getTaskMessageHandler(),
            taskMessage.get(), taskMessageBridge, JobDriver.this.interopLogger);
      }
    }
  }

  /**
   * Receive notification that the Task has been suspended.
   */
  public final class SuspendedTaskHandler implements EventHandler<SuspendedTask> {
    @Override
    public void onNext(final SuspendedTask task) {
      final String message = "Received notification that task [" + task.getId() + "] has been suspended.";
      LOG.log(Level.INFO, message);
      try (final LoggingScope ls = loggingScopeFactory.taskSuspended(task.getId())) {
        if (JobDriver.this.handlerManager.getSuspendedTaskHandler() != 0) {
          final SuspendedTaskBridge suspendedTaskBridge = new SuspendedTaskBridge(task, activeContextBridgeFactory);
          // if CLR implements the suspended task handler, handle it in CLR
          LOG.log(Level.INFO, "Handling the event of suspended task in CLR bridge.");
          NativeInterop.clrSystemSuspendedTaskHandlerOnNext(JobDriver.this.handlerManager.getSuspendedTaskHandler(),
              suspendedTaskBridge);
        }
        JobDriver.this.jobMessageObserver.sendMessageToClient(JVM_CODEC.encode(message));
      }
    }
  }

  /**
   * Receive notification that the Evaluator has been shut down.
   */
  public final class CompletedEvaluatorHandler implements EventHandler<CompletedEvaluator> {
    @Override
    public void onNext(final CompletedEvaluator evaluator) {
      LOG.log(Level.INFO, " Completed Evaluator {0}", evaluator.getId());
      try (final LoggingScope ls = loggingScopeFactory.evaluatorCompleted(evaluator.getId())) {
        if (JobDriver.this.handlerManager.getCompletedEvaluatorHandler() != 0) {
          final CompletedEvaluatorBridge completedEvaluatorBridge = new CompletedEvaluatorBridge(evaluator);
          // if CLR implements the completed evaluator handler, handle it in CLR
          LOG.log(Level.INFO, "Handling the event of completed evaluator in CLR bridge.");
          NativeInterop.clrSystemCompletedEvaluatorHandlerOnNext(
              JobDriver.this.handlerManager.getCompletedEvaluatorHandler(), completedEvaluatorBridge);
        }
        // Release the bridge reference whether or not a CLR handler is bound; otherwise the
        // entry would stay in the map forever. Keyed by evaluator id, as in FailedEvaluatorHandler.
        synchronized (JobDriver.this) {
          allocatedEvaluatorBridges.remove(evaluator.getId());
        }
      }
    }
  }


  /**
   * Receive notification that the Context had completed.
   * Remove context from the list of active context.
   */
  public final class ClosedContextHandler implements EventHandler<ClosedContext> {
    @Override
    public void onNext(final ClosedContext context) {
      LOG.log(Level.INFO, "Completed Context: {0}", context.getId());
      try (final LoggingScope ls = loggingScopeFactory.closedContext(context.getId())) {
        if (JobDriver.this.handlerManager.getClosedContextHandler() != 0) {
          final ClosedContextBridge closedContextBridge = new ClosedContextBridge(context, activeContextBridgeFactory);
          // if CLR implements the closed context handler, handle it in CLR
          LOG.log(Level.INFO, "Handling the event of closed context in CLR bridge.");
          NativeInterop.clrSystemClosedContextHandlerOnNext(JobDriver.this.handlerManager.getClosedContextHandler(),
              closedContextBridge);
        }
        synchronized (JobDriver.this) {
          JobDriver.this.contexts.remove(context.getId());
        }
      }
    }
  }


  /**
   * Receive notification that the Context had failed.
   * Remove context from the list of active context and notify the client.
   */
  public final class FailedContextHandler implements EventHandler<FailedContext> {
    @Override
    public void onNext(final FailedContext context) {
      LOG.log(Level.SEVERE, "FailedContext: {0}", context);
      try (final LoggingScope ls = loggingScopeFactory.evaluatorFailed(context.getId())) {
        if (JobDriver.this.handlerManager.getFailedContextHandler() != 0) {
          final FailedContextBridge failedContextBridge = new FailedContextBridge(context, activeContextBridgeFactory);
          // if CLR implements the failed context handler, handle it in CLR
          LOG.log(Level.INFO, "Handling the event of failed context in CLR bridge.");
          NativeInterop.clrSystemFailedContextHandlerOnNext(JobDriver.this.handlerManager.getFailedContextHandler(),
              failedContextBridge);
        }
        synchronized (JobDriver.this) {
          JobDriver.this.contexts.remove(context.getId());
        }
        final Optional<byte[]> err = context.getData();
        if (err.isPresent()) {
          JobDriver.this.jobMessageObserver.sendMessageToClient(err.get());
        }
      }
    }
  }

  /**
   * Receive notification that a ContextMessage has been received.
   */
  public final class ContextMessageHandler implements EventHandler<ContextMessage> {
    @Override
    public void onNext(final ContextMessage message) {
      // Decode once; the text is used both for the log entry and for the logging scope.
      final String messageText = new String(message.get(), StandardCharsets.UTF_8);
      LOG.log(Level.INFO, "Received ContextMessage: {0}", messageText);
      try (final LoggingScope ls = loggingScopeFactory.contextMessageReceived(messageText)) {
        if (JobDriver.this.handlerManager.getContextMessageHandler() != 0) {
          final ContextMessageBridge contextMessageBridge = new ContextMessageBridge(message);
          // if CLR implements the context message handler, handle it in CLR
          LOG.log(Level.INFO, "Handling the event of context message in CLR bridge.");
          NativeInterop.clrSystemContextMessageHandlerOnNext(JobDriver.this.handlerManager.getContextMessageHandler(),
              contextMessageBridge);
        }
      }
    }
  }

  /**
   * Gets the progress of the application from .NET side.
   */
  public final class ProgressProvider implements org.apache.reef.driver.ProgressProvider {
    /**
     * @return the progress reported by the CLR progress provider, or 0 if the bridge
     * is not set up yet or the CLR registered no progress provider.
     */
    @Override
    public float getProgress() {
      // Read the volatile field once so the null check and the use see the same value.
      final BridgeHandlerManager manager = JobDriver.this.handlerManager;
      if (manager != null && manager.getProgressProvider() != 0) {
        return NativeInterop.clrSystemProgressProviderGetProgress(manager.getProgressProvider());
      }

      return 0f;
    }
  }
}