/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.reef.javabridge.generic;

import org.apache.reef.driver.client.JobMessageObserver;
import org.apache.reef.driver.context.ActiveContext;
import org.apache.reef.driver.context.ClosedContext;
import org.apache.reef.driver.context.ContextMessage;
import org.apache.reef.driver.context.FailedContext;
import org.apache.reef.driver.evaluator.*;
import org.apache.reef.driver.restart.DriverRestarted;
import org.apache.reef.driver.task.*;
import org.apache.reef.io.network.naming.NameServer;
import org.apache.reef.javabridge.*;
import org.apache.reef.driver.restart.DriverRestartCompleted;
import org.apache.reef.runtime.common.driver.DriverStatusManager;
import org.apache.reef.runtime.common.files.REEFFileNames;
import org.apache.reef.tang.annotations.Unit;
import org.apache.reef.util.Optional;
import org.apache.reef.util.logging.CLRBufferedLogHandler;
import org.apache.reef.util.logging.LoggingScope;
import org.apache.reef.util.logging.LoggingScopeFactory;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.remote.address.LocalAddressProvider;
import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
import org.apache.reef.wake.time.Clock;
import org.apache.reef.wake.time.event.Alarm;
import org.apache.reef.wake.time.event.StartTime;
import org.apache.reef.wake.time.event.StopTime;
import org.apache.reef.webserver.*;

import javax.inject.Inject;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletResponse;
import java.io.*;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Generic job driver for CLRBridge.
 * <p>
 * The nested handler classes receive driver events on the Java side and forward them
 * to the CLR through {@link NativeInterop}, using the handles held by the
 * {@link BridgeHandlerManager} that is created in {@code setupBridge}.
 * Mutable driver state ({@code contexts}, {@code allocatedEvaluatorBridges}) is
 * guarded by the {@code JobDriver} instance monitor.
 */
@Unit
public final class JobDriver {

  private static final Logger LOG = Logger.getLogger(JobDriver.class.getName());
  /**
   * String codec is used to encode the results
   * before passing them back to the client.
   */
  private static final ObjectSerializableCodec<String> JVM_CODEC = new ObjectSerializableCodec<>();
  private final InteropLogger interopLogger = new InteropLogger();
  private final NameServer nameServer;
  private final String nameServerInfo;
  private final HttpServer httpServer;
  private final ActiveContextBridgeFactory activeContextBridgeFactory;
  private final AllocatedEvaluatorBridgeFactory allocatedEvaluatorBridgeFactory;

  /**
   * Wake clock is used to schedule periodical job check-ups.
   */
  private final Clock clock;
  /**
   * Job observer on the client.
   * We use it to send results from the driver back to the client.
   */
  private final JobMessageObserver jobMessageObserver;
  /**
   * Job driver uses EvaluatorRequestor
   * to request Evaluators that will run the Tasks.
   */
  private final EvaluatorRequestor evaluatorRequestor;

  /**
   * Driver status manager to monitor driver status.
   */
  private final DriverStatusManager driverStatusManager;

  /**
   * Factory to setup new CLR process configurations.
   */
  private final CLRProcessFactory clrProcessFactory;

  /**
   * Shell execution results from each Evaluator.
   */
  private final List<String> results = new ArrayList<>();
  /**
   * Map from context ID to running evaluator context.
   * Guarded by the JobDriver instance monitor.
   */
  private final Map<String, ActiveContext> contexts = new HashMap<>();

  private final REEFFileNames reefFileNames;
  private final LocalAddressProvider localAddressProvider;
  /**
   * Logging scope factory that provides LoggingScope.
   */
  private final LoggingScopeFactory loggingScopeFactory;

  /**
   * CLR handler handles; stays null until {@code setupBridge} has run.
   * Volatile because clock alarms poll this field without holding the JobDriver monitor.
   */
  private volatile BridgeHandlerManager handlerManager = null;
  private boolean isRestarted = false;
  // We are holding on to following on bridge side.
  // Need to add references here so that GC does not collect them.
  // Guarded by the JobDriver instance monitor.
  private final Map<String, AllocatedEvaluatorBridge> allocatedEvaluatorBridges =
      new HashMap<>();
  private EvaluatorRequestorBridge evaluatorRequestorBridge;


  /**
   * Job driver constructor.
   * All parameters are injected from TANG automatically.
   *
   * @param clock                           Wake clock to schedule and check up running jobs.
   * @param httpServer                      HTTP server that CLR-provided handlers are registered with.
   * @param nameServer                      name server whose port is published to allocated evaluators.
   * @param jobMessageObserver              is used to send messages back to the client.
   * @param evaluatorRequestor              is used to request Evaluators.
   * @param driverStatusManager             driver status manager to monitor driver status.
   * @param loggingScopeFactory             factory for the logging scopes opened around each event.
   * @param localAddressProvider            provides the local address used in the published endpoints.
   * @param activeContextBridgeFactory      factory for the bridges that wrap active contexts.
   * @param reefFileNames                   provides the path of the driver HTTP endpoint file.
   * @param allocatedEvaluatorBridgeFactory factory for the bridges that wrap allocated evaluators.
   * @param clrProcessFactory               factory to setup new CLR process configurations.
   */
  @Inject
  JobDriver(final Clock clock,
            final HttpServer httpServer,
            final NameServer nameServer,
            final JobMessageObserver jobMessageObserver,
            final EvaluatorRequestor evaluatorRequestor,
            final DriverStatusManager driverStatusManager,
            final LoggingScopeFactory loggingScopeFactory,
            final LocalAddressProvider localAddressProvider,
            final ActiveContextBridgeFactory activeContextBridgeFactory,
            final REEFFileNames reefFileNames,
            final AllocatedEvaluatorBridgeFactory allocatedEvaluatorBridgeFactory,
            final CLRProcessFactory clrProcessFactory) {
    this.clock = clock;
    this.httpServer = httpServer;
    this.jobMessageObserver = jobMessageObserver;
    this.evaluatorRequestor = evaluatorRequestor;
    this.nameServer = nameServer;
    this.driverStatusManager = driverStatusManager;
    this.activeContextBridgeFactory = activeContextBridgeFactory;
    this.allocatedEvaluatorBridgeFactory = allocatedEvaluatorBridgeFactory;
    this.nameServerInfo = localAddressProvider.getLocalAddress() + ":" + this.nameServer.getPort();
    this.loggingScopeFactory = loggingScopeFactory;
    this.reefFileNames = reefFileNames;
    this.localAddressProvider = localAddressProvider;
    this.clrProcessFactory = clrProcessFactory;
  }

  /**
   * Sets up the bridge to the CLR: signals the CLR buffered log handler, writes the driver
   * HTTP endpoint file, obtains the CLR handlers from the given initializer and registers
   * one HTTP handler per URI specification reported by the CLR.
   *
   * @param initializer provides the CLR handlers for either a driver start or a driver restart.
   * @throws RuntimeException if the HTTP endpoint file cannot be written.
   */
  private void setupBridge(final ClrHandlersInitializer initializer) {
    // Signal to the clr buffered log handler that the driver has started and that
    // we can begin logging
    LOG.log(Level.INFO, "Initializing CLRBufferedLogHandler...");
    try (final LoggingScope lb = this.loggingScopeFactory.setupBridge()) {
      final CLRBufferedLogHandler handler = getCLRBufferedLogHandler();
      if (handler == null) {
        LOG.log(Level.WARNING, "CLRBufferedLogHandler could not be initialized");
      } else {
        handler.setDriverInitialized();
        LOG.log(Level.INFO, "CLRBufferedLogHandler init complete.");
      }

      final String portNumber = httpServer == null ? null : Integer.toString(httpServer.getPort());
      if (portNumber != null) {
        final File outputFileName = new File(reefFileNames.getDriverHttpEndpoint());
        // try-with-resources closes the file even when write() fails.
        try (final BufferedWriter out = new BufferedWriter(
            new OutputStreamWriter(new FileOutputStream(outputFileName), StandardCharsets.UTF_8))) {
          out.write(localAddressProvider.getLocalAddress() + ":" + portNumber + "\n");
        } catch (final IOException ex) {
          throw new RuntimeException(ex);
        }
      }

      this.evaluatorRequestorBridge =
          new EvaluatorRequestorBridge(JobDriver.this.evaluatorRequestor, false, loggingScopeFactory);
      JobDriver.this.handlerManager = initializer.getClrHandlers(portNumber, evaluatorRequestorBridge);

      try (final LoggingScope lp =
               this.loggingScopeFactory.getNewLoggingScope("setupBridge::clrSystemHttpServerHandlerOnNext")) {
        final HttpServerEventBridge httpServerEventBridge = new HttpServerEventBridge("SPEC");
        NativeInterop.clrSystemHttpServerHandlerOnNext(JobDriver.this.handlerManager.getHttpServerEventHandler(),
            httpServerEventBridge, this.interopLogger);
        final String specList = httpServerEventBridge.getUriSpecification();
        LOG.log(Level.INFO, "Starting http server, getUriSpecification: {0}", specList);
        if (specList != null) {
          // The specification list is colon-separated; register one handler per entry.
          for (final String spec : specList.split(":")) {
            final HttpHandler h = new HttpServerBridgeEventHandler();
            h.setUriSpecification(spec);
            this.httpServer.addHttpHandler(h);
          }
        }
      }
    }
    LOG.log(Level.INFO, "CLR Bridge setup.");
  }

  /**
   * Looks for a {@link CLRBufferedLogHandler} among the handlers of the root logger.
   *
   * @return the first such handler, or null if none is registered.
   */
  private CLRBufferedLogHandler getCLRBufferedLogHandler() {
    for (final Handler handler : Logger.getLogger("").getHandlers()) {
      if (handler instanceof CLRBufferedLogHandler) {
        return (CLRBufferedLogHandler) handler;
      }
    }
    return null;
  }

  /**
   * Assigns the process to the evaluator, wraps the evaluator in a bridge that is retained
   * in {@code allocatedEvaluatorBridges}, and hands the bridge to the CLR handler.
   *
   * @param eval    the allocated evaluator.
   * @param process the process to set on the evaluator.
   * @throws RuntimeException if the CLR did not provide an allocated evaluator handler.
   */
  private void submitEvaluator(final AllocatedEvaluator eval, final EvaluatorProcess process) {
    synchronized (JobDriver.this) {
      eval.setProcess(process);
      LOG.log(Level.INFO, "Allocated Evaluator: {0}, total running running {1}",
          new Object[]{eval.getId(), JobDriver.this.contexts.size()});
      if (JobDriver.this.handlerManager.getAllocatedEvaluatorHandler() == 0) {
        throw new RuntimeException("Allocated Evaluator Handler not initialized by CLR.");
      }
      final AllocatedEvaluatorBridge allocatedEvaluatorBridge =
          this.allocatedEvaluatorBridgeFactory.getAllocatedEvaluatorBridge(eval, this.nameServerInfo);
      allocatedEvaluatorBridges.put(allocatedEvaluatorBridge.getId(), allocatedEvaluatorBridge);
      NativeInterop.clrSystemAllocatedEvaluatorHandlerOnNext(
          JobDriver.this.handlerManager.getAllocatedEvaluatorHandler(), allocatedEvaluatorBridge, this.interopLogger);
    }
  }

  /**
   * Handles a failed evaluator: drops its failed contexts from {@code contexts}, notifies the
   * client, and dispatches the failure to the CLR (waiting for the bridge if it is not set up yet).
   *
   * @param eval            the failed evaluator.
   * @param isRestartFailed true if the failure was reported through the driver-restart handler.
   */
  private void handleFailedEvaluator(final FailedEvaluator eval, final boolean isRestartFailed) {
    try (final LoggingScope ls = loggingScopeFactory.evaluatorFailed(eval.getId())) {
      synchronized (JobDriver.this) {
        LOG.log(Level.SEVERE, "FailedEvaluator: {0}", eval);
        for (final FailedContext failedContext : eval.getFailedContextList()) {
          final String failedContextId = failedContext.getId();
          LOG.log(Level.INFO, "removing context " + failedContextId + " from job driver contexts.");
          JobDriver.this.contexts.remove(failedContextId);
        }
        final String message = "Evaluator " + eval.getId() + " failed with message: "
            + eval.getEvaluatorException().getMessage();
        JobDriver.this.jobMessageObserver.sendMessageToClient(message.getBytes(StandardCharsets.UTF_8));

        // The handle lookup is null-safe so that a failure arriving before the bridge is
        // set up reaches the waiting path instead of failing with a NullPointerException.
        evaluatorFailedHandlerWaitForCLRBridgeSetup(getFailedEvaluatorHandle(isRestartFailed), eval, isRestartFailed);
      }
    }
  }

  /**
   * Resolves the CLR handle that should receive a failed evaluator.
   *
   * @param isRestartFailed selects the driver-restart handler instead of the regular one.
   * @return the handle, or 0 if the handler manager has not been created yet.
   */
  private long getFailedEvaluatorHandle(final boolean isRestartFailed) {
    final BridgeHandlerManager manager = JobDriver.this.handlerManager;
    if (manager == null) {
      return 0;
    }
    return isRestartFailed
        ? manager.getDriverRestartFailedEvaluatorHandler()
        : manager.getFailedEvaluatorHandler();
  }

  /**
   * Dispatches a failed evaluator to the CLR handler identified by {@code handle}.
   * If the handle is 0 and the handler manager exists, no CLR handler was bound: a warning is
   * logged and sent to the client. If the handle is 0 and the handler manager does not exist
   * yet, an alarm polls every 5000 ms until it does and then dispatches with the freshly
   * resolved handle.
   *
   * @param handle          CLR handler handle, 0 if none is known.
   * @param eval            the failed evaluator.
   * @param isRestartFailed true if the failure was reported through the driver-restart handler.
   */
  private void evaluatorFailedHandlerWaitForCLRBridgeSetup(final long handle,
                                                           final FailedEvaluator eval,
                                                           final boolean isRestartFailed) {
    if (handle != 0) {
      handleFailedEvaluatorInCLR(eval, isRestartFailed);
      return;
    }

    if (JobDriver.this.handlerManager != null) {
      final String message = "No CLR FailedEvaluator handler was set, exiting now";
      LOG.log(Level.WARNING, message);
      JobDriver.this.jobMessageObserver.sendMessageToClient(message.getBytes(StandardCharsets.UTF_8));
      return;
    }

    clock.scheduleAlarm(0, new EventHandler<Alarm>() {
      @Override
      public void onNext(final Alarm time) {
        if (JobDriver.this.handlerManager != null) {
          // Re-resolve the handle now that the bridge exists, so a CLR without a
          // FailedEvaluator handler is reported instead of being invoked with handle 0.
          evaluatorFailedHandlerWaitForCLRBridgeSetup(
              getFailedEvaluatorHandle(isRestartFailed), eval, isRestartFailed);
        } else {
          LOG.log(Level.INFO, "Waiting for CLR bridge to be set up");
          clock.scheduleAlarm(5000, this);
        }
      }
    });
  }

  /**
   * Passes the failed evaluator to the CLR handler, logs the number of evaluators the CLR
   * requested in response, and notifies the client.
   *
   * @param eval            the failed evaluator.
   * @param isRestartFailed selects the driver-restart handler instead of the regular one.
   */
  private void handleFailedEvaluatorInCLR(final FailedEvaluator eval, final boolean isRestartFailed) {
    final String message = "CLR FailedEvaluator handler set, handling things with CLR handler.";
    LOG.log(Level.INFO, message);
    final FailedEvaluatorBridge failedEvaluatorBridge =
        new FailedEvaluatorBridge(eval, JobDriver.this.evaluatorRequestor,
            JobDriver.this.isRestarted, loggingScopeFactory);
    if (isRestartFailed) {
      NativeInterop.clrSystemDriverRestartFailedEvaluatorHandlerOnNext(
          JobDriver.this.handlerManager.getDriverRestartFailedEvaluatorHandler(),
          failedEvaluatorBridge, JobDriver.this.interopLogger);
    } else {
      NativeInterop.clrSystemFailedEvaluatorHandlerOnNext(
          JobDriver.this.handlerManager.getFailedEvaluatorHandler(),
          failedEvaluatorBridge,
          JobDriver.this.interopLogger);
    }

    final int additionalRequestedEvaluatorNumber = failedEvaluatorBridge.getNewlyRequestedEvaluatorNumber();
    if (additionalRequestedEvaluatorNumber > 0) {
      LOG.log(Level.INFO, "number of additional evaluators requested after evaluator failure: " +
          additionalRequestedEvaluatorNumber);
    }

    JobDriver.this.jobMessageObserver.sendMessageToClient(message.getBytes(StandardCharsets.UTF_8));
  }

  /**
   * Submit a Task to a single Evaluator.
   * The context is handed to the CLR active context handler; on any failure the context is
   * closed and the failure is rethrown wrapped in a RuntimeException.
   *
   * @param context the active context to pass to the CLR.
   */
  private void submit(final ActiveContext context) {
    try {
      LOG.log(Level.INFO, "Send task to context: {0}", new Object[]{context});
      if (JobDriver.this.handlerManager.getActiveContextHandler() == 0) {
        throw new RuntimeException("Active Context Handler not initialized by CLR.");
      }
      final ActiveContextBridge activeContextBridge = activeContextBridgeFactory.getActiveContextBridge(context);
      NativeInterop.clrSystemActiveContextHandlerOnNext(JobDriver.this.handlerManager.getActiveContextHandler(),
          activeContextBridge, JobDriver.this.interopLogger);
    } catch (final Exception ex) {
      LOG.log(Level.SEVERE, "Fail to submit task to active context", ex);
      context.close();
      throw new RuntimeException(ex);
    }
  }

  /**
   * Handles AllocatedEvaluator: Submit an empty context.
   */
  public final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> {
    @Override
    public void onNext(final AllocatedEvaluator allocatedEvaluator) {
      try (final LoggingScope ls = loggingScopeFactory.evaluatorAllocated(allocatedEvaluator.getId())) {
        synchronized (JobDriver.this) {
          LOG.log(Level.INFO, "AllocatedEvaluatorHandler.OnNext");
          JobDriver.this.submitEvaluator(allocatedEvaluator, clrProcessFactory.newEvaluatorProcess());
        }
      }
    }
  }

  /**
   * Receive notification that a new Context is available.
   */
  public final class ActiveContextHandler implements EventHandler<ActiveContext> {
    @Override
    public void onNext(final ActiveContext context) {
      try (final LoggingScope ls = loggingScopeFactory.activeContextReceived(context.getId())) {
        synchronized (JobDriver.this) {
          LOG.log(Level.INFO, "ActiveContextHandler: Context available: {0}",
              new Object[]{context.getId()});
          JobDriver.this.contexts.put(context.getId(), context);
          JobDriver.this.submit(context);
        }
      }
    }
  }

  /**
   * Receive notification that the Task has completed successfully.
   */
  public final class CompletedTaskHandler implements EventHandler<CompletedTask> {
    @Override
    public void onNext(final CompletedTask task) {
      LOG.log(Level.INFO, "Completed task: {0}", task.getId());
      try (final LoggingScope ls = loggingScopeFactory.taskCompleted(task.getId())) {
        // Take the message returned by the task and add it to the running result.
        String result = "default result";
        try {
          result = new String(task.get(), StandardCharsets.UTF_8);
        } catch (final Exception e) {
          // Best effort: keep the default result, but record why decoding failed.
          LOG.log(Level.WARNING, "failed to decode task outcome", e);
        }
        LOG.log(Level.INFO, "Return results to the client:\n{0}", result);
        JobDriver.this.jobMessageObserver.sendMessageToClient(JVM_CODEC.encode(result));
        if (JobDriver.this.handlerManager.getCompletedTaskHandler() == 0) {
          LOG.log(Level.INFO, "No CLR handler bound to handle completed task.");
        } else {
          LOG.log(Level.INFO, "CLR CompletedTaskHandler handler set, handling things with CLR handler.");
          final CompletedTaskBridge completedTaskBridge = new CompletedTaskBridge(task, activeContextBridgeFactory);
          NativeInterop.clrSystemCompletedTaskHandlerOnNext(JobDriver.this.handlerManager.getCompletedTaskHandler(),
              completedTaskBridge, JobDriver.this.interopLogger);
        }
      }
    }
  }

  /**
   * Receive notification that the entire Evaluator had failed.
   */
  public final class FailedEvaluatorHandler implements EventHandler<FailedEvaluator> {
    @Override
    public void onNext(final FailedEvaluator eval) {
      JobDriver.this.handleFailedEvaluator(eval, false);
      // Same monitor as the put() in submitEvaluator.
      synchronized (JobDriver.this) {
        allocatedEvaluatorBridges.remove(eval.getId());
      }
    }
  }

  /**
   * Receive notification that the entire Evaluator had failed on Driver Restart.
   */
  public final class DriverRestartFailedEvaluatorHandler implements EventHandler<FailedEvaluator> {
    @Override
    public void onNext(final FailedEvaluator eval) {
      JobDriver.this.handleFailedEvaluator(eval, true);
    }
  }

  /**
   * HTTP handler that serializes each request and forwards it to the CLR HTTP server handler,
   * then writes the CLR's response data back to the servlet response.
   */
  final class HttpServerBridgeEventHandler implements HttpHandler {
    private String uriSpecification;

    /**
     * returns URI specification for the handler.
     */
    @Override
    public String getUriSpecification() {
      return uriSpecification;
    }

    public void setUriSpecification(final String s) {
      uriSpecification = s;
    }

    /**
     * process http request.
     *
     * @throws RuntimeException if invoking the CLR handler or writing the response fails.
     */
    @Override
    public void onHttpRequest(final ParsedHttpRequest parsedHttpRequest, final HttpServletResponse response)
        throws IOException, ServletException {
      LOG.log(Level.INFO, "HttpServerBridgeEventHandler onHttpRequest: {0}", parsedHttpRequest.getRequestUri());
      try (final LoggingScope ls = loggingScopeFactory.httpRequest(parsedHttpRequest.getRequestUri())) {
        final AvroHttpSerializer httpSerializer = new AvroHttpSerializer();
        final AvroHttpRequest avroHttpRequest = httpSerializer.toAvro(parsedHttpRequest);

        final String requestString = httpSerializer.toString(avroHttpRequest);
        final byte[] requestBytes = requestString.getBytes(Charset.forName(AvroHttpSerializer.JSON_CHARSET));

        try {
          final HttpServerEventBridge httpServerEventBridge = new HttpServerEventBridge(requestBytes);
          NativeInterop.clrSystemHttpServerHandlerOnNext(JobDriver.this.handlerManager.getHttpServerEventHandler(),
              httpServerEventBridge, JobDriver.this.interopLogger);
          final String responseBody =
              new String(httpServerEventBridge.getQueryResponseData(), StandardCharsets.UTF_8);
          response.getWriter().println(responseBody);
          LOG.log(Level.INFO, "HttpServerBridgeEventHandler onHttpRequest received response: {0}", responseBody);
        } catch (final Exception ex) {
          LOG.log(Level.SEVERE, "Fail to invoke CLR Http Server handler", ex);
          throw new RuntimeException(ex);
        }
      }
    }
  }

  /**
   * Handle failed task.
   */
  public final class FailedTaskHandler implements EventHandler<FailedTask> {
    @Override
    public void onNext(final FailedTask task) throws RuntimeException {
      LOG.log(Level.SEVERE, "FailedTask received, will be handle in CLR handler, if set.");
      if (JobDriver.this.handlerManager.getFailedTaskHandler() == 0) {
        LOG.log(Level.SEVERE, "Failed Task Handler not initialized by CLR, fail for real.");
        throw new RuntimeException("Failed Task Handler not initialized by CLR.");
      }
      try {
        final FailedTaskBridge failedTaskBridge = new FailedTaskBridge(task, activeContextBridgeFactory);
        NativeInterop.clrSystemFailedTaskHandlerOnNext(JobDriver.this.handlerManager.getFailedTaskHandler(),
            failedTaskBridge, JobDriver.this.interopLogger);
      } catch (final Exception ex) {
        LOG.log(Level.SEVERE, "Fail to invoke CLR failed task handler", ex);
        throw new RuntimeException(ex);
      }
    }
  }

  /**
   * Receive notification that the Task is running.
   */
  public final class RunningTaskHandler implements EventHandler<RunningTask> {
    @Override
    public void onNext(final RunningTask task) {
      try (final LoggingScope ls = loggingScopeFactory.taskRunning(task.getId())) {
        if (JobDriver.this.handlerManager.getRunningTaskHandler() == 0) {
          LOG.log(Level.INFO, "RunningTask event received but no CLR handler was bound. Exiting handler.");
        } else {
          LOG.log(Level.INFO, "RunningTask will be handled by CLR handler. Task Id: {0}", task.getId());
          try {
            final RunningTaskBridge runningTaskBridge = new RunningTaskBridge(task, activeContextBridgeFactory);
            NativeInterop.clrSystemRunningTaskHandlerOnNext(JobDriver.this.handlerManager.getRunningTaskHandler(),
                runningTaskBridge, JobDriver.this.interopLogger);
          } catch (final Exception ex) {
            LOG.log(Level.WARNING, "Fail to invoke CLR running task handler", ex);
            throw new RuntimeException(ex);
          }
        }
      }
    }
  }

  /**
   * Receive notification that the Task is running when driver restarted.
   * The event is delivered to the CLR from a clock alarm that re-schedules itself
   * every 2000 ms until the handler manager has been created.
   */
  public final class DriverRestartRunningTaskHandler implements EventHandler<RunningTask> {
    @Override
    public void onNext(final RunningTask task) {
      try (final LoggingScope ls = loggingScopeFactory.driverRestartRunningTask(task.getId())) {
        clock.scheduleAlarm(0, new EventHandler<Alarm>() {
          @Override
          public void onNext(final Alarm time) {
            if (JobDriver.this.handlerManager != null) {
              if (JobDriver.this.handlerManager.getDriverRestartRunningTaskHandler() != 0) {
                LOG.log(Level.INFO, "CLR driver restart RunningTask handler implemented, now handle it in CLR.");
                NativeInterop.clrSystemDriverRestartRunningTaskHandlerOnNext(
                    JobDriver.this.handlerManager.getDriverRestartRunningTaskHandler(),
                    new RunningTaskBridge(task, activeContextBridgeFactory));
              } else {
                LOG.log(Level.WARNING, "No CLR driver restart RunningTask handler implemented, " +
                    "done with DriverRestartRunningTaskHandler.");
              }
            } else {
              LOG.log(Level.INFO, "Waiting for driver to complete restart process " +
                  "before checking out CLR driver restart RunningTaskHandler...");
              clock.scheduleAlarm(2000, this);
            }
          }
        });
      }
    }
  }

  /**
   * Receive notification that an context is active on Evaluator when the driver restarted.
   * The context is recorded immediately; delivery to the CLR happens from a clock alarm that
   * re-schedules itself every 2000 ms until the handler manager has been created.
   */
  public final class DriverRestartActiveContextHandler implements EventHandler<ActiveContext> {
    @Override
    public void onNext(final ActiveContext context) {
      try (final LoggingScope ls = loggingScopeFactory.driverRestartActiveContextReceived(context.getId())) {
        // Same monitor as every other access to the contexts map.
        synchronized (JobDriver.this) {
          JobDriver.this.contexts.put(context.getId(), context);
        }
        LOG.log(Level.INFO, "DriverRestartActiveContextHandler event received: " + context.getId());
        clock.scheduleAlarm(0, new EventHandler<Alarm>() {
          @Override
          public void onNext(final Alarm time) {
            if (JobDriver.this.handlerManager != null) {
              if (JobDriver.this.handlerManager.getDriverRestartActiveContextHandler() != 0) {
                LOG.log(Level.INFO, "CLR driver restart ActiveContext handler implemented, now handle it in CLR.");
                NativeInterop.clrSystemDriverRestartActiveContextHandlerOnNext(
                    JobDriver.this.handlerManager.getDriverRestartActiveContextHandler(),
                    activeContextBridgeFactory.getActiveContextBridge(context));
              } else {
                LOG.log(Level.WARNING, "No CLR driver restart ActiveContext handler implemented, " +
                    "done with DriverRestartActiveContextHandler.");
              }
            } else {
              LOG.log(Level.INFO, "Waiting for driver to complete restart process " +
                  "before checking out CLR driver restart DriverRestartActiveContextHandler...");
              clock.scheduleAlarm(2000, this);
            }
          }
        });
      }
    }
  }

  /**
   * Job Driver is ready and the clock is set up: request the evaluators.
   */
  public final class StartHandler implements EventHandler<StartTime> {
    @Override
    public void onNext(final StartTime startTime) {
      try (final LoggingScope ls = loggingScopeFactory.driverStart(startTime)) {
        synchronized (JobDriver.this) {

          setupBridge(new DriverStartClrHandlersInitializer(startTime));
          LOG.log(Level.INFO, "Driver Started");
        }
      }
    }
  }


  /**
   * Job driver is restarted after previous crash.
   */
  public final class RestartHandler implements EventHandler<DriverRestarted> {
    @Override
    public void onNext(final DriverRestarted driverRestarted) {
      try (final LoggingScope ls = loggingScopeFactory.driverRestart(driverRestarted.getStartTime())) {
        synchronized (JobDriver.this) {

          JobDriver.this.isRestarted = true;
          setupBridge(new DriverRestartClrHandlersInitializer(driverRestarted));

          LOG.log(Level.INFO, "Driver Restarted and CLR bridge set up.");
        }
      }
    }
  }

  /**
   * Receive notification that driver restart has completed.
   */
  public final class DriverRestartCompletedHandler implements EventHandler<DriverRestartCompleted> {
    @Override
    public void onNext(final DriverRestartCompleted driverRestartCompleted) {
      LOG.log(Level.INFO, "Java DriverRestartCompleted event received at time [{0}]. ",
          driverRestartCompleted.getCompletedTime());
      try (final LoggingScope ls = loggingScopeFactory.driverRestartCompleted(
          driverRestartCompleted.getCompletedTime().getTimeStamp())) {
        if (JobDriver.this.handlerManager.getDriverRestartCompletedHandler() != 0) {
          LOG.log(Level.INFO, "CLR driver restart handler implemented, now handle it in CLR.");

          NativeInterop.clrSystemDriverRestartCompletedHandlerOnNext(
              JobDriver.this.handlerManager.getDriverRestartCompletedHandler(),
              new DriverRestartCompletedBridge(driverRestartCompleted));
        } else {
          LOG.log(Level.WARNING, "No CLR driver restart handler implemented, done with DriverRestartCompletedHandler.");
        }
      }
    }
  }

  /**
   * Shutting down the job driver: close the evaluators.
   */
  final class StopHandler implements EventHandler<StopTime> {
    @Override
    public void onNext(final StopTime time) {
      LOG.log(Level.INFO, " StopTime: {0}", new Object[]{time});
      try (final LoggingScope ls = loggingScopeFactory.driverStop(time.getTimeStamp())) {
        // Snapshot under the monitor so that handlers removing contexts concurrently cannot
        // break the iteration; close outside the monitor to keep the critical section short.
        final List<ActiveContext> activeContexts;
        synchronized (JobDriver.this) {
          activeContexts = new ArrayList<>(contexts.values());
        }
        for (final ActiveContext context : activeContexts) {
          context.close();
        }
      }
    }
  }

  /**
   * Handler for message received from the Task.
   */
  public final class TaskMessageHandler implements EventHandler<TaskMessage> {
    @Override
    public void onNext(final TaskMessage taskMessage) {
      final String msg = new String(taskMessage.get(), StandardCharsets.UTF_8);
      LOG.log(Level.INFO, "Received TaskMessage: {0} from CLR", msg);
      if (JobDriver.this.handlerManager.getTaskMessageHandler() != 0) {
        final TaskMessageBridge taskMessageBridge = new TaskMessageBridge(taskMessage);
        // if CLR implements the task message handler, handle the bytes in CLR handler
        NativeInterop.clrSystemTaskMessageHandlerOnNext(JobDriver.this.handlerManager.getTaskMessageHandler(),
            taskMessage.get(), taskMessageBridge, JobDriver.this.interopLogger);
      }
    }
  }

  /**
   * Receive notification that the Task has been suspended.
   */
  public final class SuspendedTaskHandler implements EventHandler<SuspendedTask> {
    @Override
    public void onNext(final SuspendedTask task) {
      final String message = "Received notification that task [" + task.getId() + "] has been suspended.";
      LOG.log(Level.INFO, message);
      try (final LoggingScope ls = loggingScopeFactory.taskSuspended(task.getId())) {
        if (JobDriver.this.handlerManager.getSuspendedTaskHandler() != 0) {
          final SuspendedTaskBridge suspendedTaskBridge = new SuspendedTaskBridge(task, activeContextBridgeFactory);
          // if CLR implements the suspended task handler, handle it in CLR
          LOG.log(Level.INFO, "Handling the event of suspended task in CLR bridge.");
          NativeInterop.clrSystemSuspendedTaskHandlerOnNext(JobDriver.this.handlerManager.getSuspendedTaskHandler(),
              suspendedTaskBridge);
        }
        JobDriver.this.jobMessageObserver.sendMessageToClient(JVM_CODEC.encode(message));
      }
    }
  }

  /**
   * Receive notification that the Evaluator has been shut down.
   */
  public final class CompletedEvaluatorHandler implements EventHandler<CompletedEvaluator> {
    @Override
    public void onNext(final CompletedEvaluator evaluator) {
      LOG.log(Level.INFO, " Completed Evaluator {0}", evaluator.getId());
      try (final LoggingScope ls = loggingScopeFactory.evaluatorCompleted(evaluator.getId())) {
        if (JobDriver.this.handlerManager.getCompletedEvaluatorHandler() != 0) {
          final CompletedEvaluatorBridge completedEvaluatorBridge = new CompletedEvaluatorBridge(evaluator);
          // if CLR implements the completed evaluator handler, handle it in CLR
          LOG.log(Level.INFO, "Handling the event of completed evaluator in CLR bridge.");
          NativeInterop.clrSystemCompletedEvaluatorHandlerOnNext(
              JobDriver.this.handlerManager.getCompletedEvaluatorHandler(), completedEvaluatorBridge);
        }
        // Release the retained bridge whether or not a CLR handler is bound; otherwise the
        // entry would be kept for the lifetime of the driver.
        synchronized (JobDriver.this) {
          allocatedEvaluatorBridges.remove(evaluator.getId());
        }
      }
    }
  }


  /**
   * Receive notification that the Context had completed.
   * Remove context from the list of active context.
   */
  public final class ClosedContextHandler implements EventHandler<ClosedContext> {
    @Override
    public void onNext(final ClosedContext context) {
      LOG.log(Level.INFO, "Completed Context: {0}", context.getId());
      try (final LoggingScope ls = loggingScopeFactory.closedContext(context.getId())) {
        if (JobDriver.this.handlerManager.getClosedContextHandler() != 0) {
          final ClosedContextBridge closedContextBridge = new ClosedContextBridge(context, activeContextBridgeFactory);
          // if CLR implements the closed context handler, handle it in CLR
          LOG.log(Level.INFO, "Handling the event of closed context in CLR bridge.");
          NativeInterop.clrSystemClosedContextHandlerOnNext(JobDriver.this.handlerManager.getClosedContextHandler(),
              closedContextBridge);
        }
        synchronized (JobDriver.this) {
          JobDriver.this.contexts.remove(context.getId());
        }
      }
    }
  }


  /**
   * Receive notification that the Context had failed.
   * Remove context from the list of active context and notify the client.
   */
  public final class FailedContextHandler implements EventHandler<FailedContext> {
    @Override
    public void onNext(final FailedContext context) {
      LOG.log(Level.SEVERE, "FailedContext: {0}", context);
      try (final LoggingScope ls = loggingScopeFactory.evaluatorFailed(context.getId())) {
        if (JobDriver.this.handlerManager.getFailedContextHandler() != 0) {
          final FailedContextBridge failedContextBridge = new FailedContextBridge(context, activeContextBridgeFactory);
          // if CLR implements the failed context handler, handle it in CLR
          LOG.log(Level.INFO, "Handling the event of failed context in CLR bridge.");
          NativeInterop.clrSystemFailedContextHandlerOnNext(JobDriver.this.handlerManager.getFailedContextHandler(),
              failedContextBridge);
        }
        synchronized (JobDriver.this) {
          JobDriver.this.contexts.remove(context.getId());
        }
        final Optional<byte[]> err = context.getData();
        if (err.isPresent()) {
          JobDriver.this.jobMessageObserver.sendMessageToClient(err.get());
        }
      }
    }
  }

  /**
   * Receive notification that a ContextMessage has been received.
   */
  public final class ContextMessageHandler implements EventHandler<ContextMessage> {
    @Override
    public void onNext(final ContextMessage message) {
      // Decode once; the text is used both for the log record and for the logging scope.
      final String msg = new String(message.get(), StandardCharsets.UTF_8);
      LOG.log(Level.INFO, "Received ContextMessage: {0}", msg);
      try (final LoggingScope ls = loggingScopeFactory.contextMessageReceived(msg)) {
        if (JobDriver.this.handlerManager.getContextMessageHandler() != 0) {
          final ContextMessageBridge contextMessageBridge = new ContextMessageBridge(message);
          // if CLR implements the context message handler, handle it in CLR
          LOG.log(Level.INFO, "Handling the event of context message in CLR bridge.");
          NativeInterop.clrSystemContextMessageHandlerOnNext(JobDriver.this.handlerManager.getContextMessageHandler(),
              contextMessageBridge);
        }
      }
    }
  }

  /**
   * Gets the progress of the application from .NET side.
   */
  public final class ProgressProvider implements org.apache.reef.driver.ProgressProvider {
    /**
     * @return the progress reported by the CLR progress provider, or 0 if the handler manager
     * has not been created yet or no CLR progress provider is bound.
     */
    @Override
    public float getProgress() {
      final BridgeHandlerManager manager = JobDriver.this.handlerManager;
      if (manager != null && manager.getProgressProvider() != 0) {
        return NativeInterop.clrSystemProgressProviderGetProgress(manager.getProgressProvider());
      }

      return 0f;
    }
  }
}