/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.reef.javabridge.generic;

import org.apache.reef.driver.client.JobMessageObserver;
import org.apache.reef.driver.context.ActiveContext;
import org.apache.reef.driver.context.ClosedContext;
import org.apache.reef.driver.context.ContextMessage;
import org.apache.reef.driver.context.FailedContext;
import org.apache.reef.driver.evaluator.*;
import org.apache.reef.driver.restart.DriverRestarted;
import org.apache.reef.driver.task.*;
import org.apache.reef.io.network.naming.NameServer;
import org.apache.reef.javabridge.*;
import org.apache.reef.driver.restart.DriverRestartCompleted;
import org.apache.reef.runtime.common.driver.DriverStatusManager;
import org.apache.reef.runtime.common.driver.parameters.DefinedRuntimes;
import org.apache.reef.runtime.common.files.REEFFileNames;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.tang.annotations.Unit;
import org.apache.reef.util.Optional;
import org.apache.reef.util.logging.CLRBufferedLogHandler;
import org.apache.reef.util.logging.LoggingScope;
import org.apache.reef.util.logging.LoggingScopeFactory;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.remote.address.LocalAddressProvider;
import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
import org.apache.reef.wake.time.Clock;
import org.apache.reef.wake.time.event.Alarm;
import org.apache.reef.wake.time.event.StartTime;
import org.apache.reef.wake.time.event.StopTime;
import org.apache.reef.webserver.*;

import javax.inject.Inject;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletResponse;
import java.io.*;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Generic job driver for CLRBridge.
 * <p>
 * Every driver event is received by one of the nested handler classes and, when the CLR side
 * has registered a handler for it (non-zero handle in the {@link BridgeHandlerManager}),
 * forwarded to the CLR through {@link NativeInterop}.
 * <p>
 * The mutable maps {@code contexts} and {@code allocatedEvaluatorBridges} are guarded by the
 * monitor of the {@code JobDriver} instance.
 */
@Unit
public final class JobDriver {

  private static final Logger LOG = Logger.getLogger(JobDriver.class.getName());
  /**
   * String codec is used to encode the results
   * before passing them back to the client.
   */
  private static final ObjectSerializableCodec<String> JVM_CODEC = new ObjectSerializableCodec<>();
  private final InteropLogger interopLogger = new InteropLogger();
  private final NameServer nameServer;
  private final String nameServerInfo;
  private final HttpServer httpServer;
  private final ActiveContextBridgeFactory activeContextBridgeFactory;
  private final AllocatedEvaluatorBridgeFactory allocatedEvaluatorBridgeFactory;

  /**
   * Wake clock is used to schedule periodical job check-ups.
   */
  private final Clock clock;
  /**
   * Job observer on the client.
   * We use it to send results from the driver back to the client.
   */
  private final JobMessageObserver jobMessageObserver;
  /**
   * Job driver uses EvaluatorRequestor
   * to request Evaluators that will run the Tasks.
   */
  private final EvaluatorRequestor evaluatorRequestor;

  /**
   * Driver status manager to monitor driver status.
   */
  private final DriverStatusManager driverStatusManager;

  /**
   * Factory to setup new CLR process configurations.
   */
  private final CLRProcessFactory clrProcessFactory;

  /**
   * Shell execution results from each Evaluator.
   */
  private final List<String> results = new ArrayList<>();
  /**
   * Map from context ID to running evaluator context.
   * Guarded by the monitor of this JobDriver instance.
   */
  private final Map<String, ActiveContext> contexts = new HashMap<>();

  private final REEFFileNames reefFileNames;
  private final LocalAddressProvider localAddressProvider;
  /**
   * Logging scope factory that provides LoggingScope.
   */
  private final LoggingScopeFactory loggingScopeFactory;
  private final Set<String> definedRuntimes;

  /**
   * Holder of the CLR handler handles; null until {@link #setupBridge()} has run.
   * Volatile because it is assigned under the JobDriver monitor but polled without it
   * from Clock alarm handlers and from the progress provider.
   */
  private volatile BridgeHandlerManager handlerManager = null;
  /**
   * Set to true by the restart handler; volatile for the same reason as {@code handlerManager}.
   */
  private volatile boolean isRestarted = false;
  // We are holding on to following on bridge side.
  // Need to add references here so that GC does not collect them.
  // Guarded by the monitor of this JobDriver instance.
  private final HashMap<String, AllocatedEvaluatorBridge> allocatedEvaluatorBridges =
      new HashMap<>();
  private EvaluatorRequestorBridge evaluatorRequestorBridge;


  /**
   * Job driver constructor.
   * All parameters are injected from TANG automatically.
   *
   * @param clock                           Wake clock to schedule and check up running jobs.
   * @param httpServer                      HTTP server that CLR-provided URI handlers are added to.
   * @param nameServer                      name server whose port is advertised to evaluators.
   * @param jobMessageObserver              is used to send messages back to the client.
   * @param evaluatorRequestor              is used to request Evaluators.
   * @param driverStatusManager             driver status manager to monitor driver status.
   * @param loggingScopeFactory             factory that provides LoggingScope instances.
   * @param localAddressProvider            provides the local address used in advertised endpoints.
   * @param activeContextBridgeFactory      factory for the bridge objects wrapping active contexts.
   * @param reefFileNames                   provides the path of the driver HTTP endpoint file.
   * @param allocatedEvaluatorBridgeFactory factory for the bridge objects wrapping allocated evaluators.
   * @param clrProcessFactory               factory to setup new CLR process configurations.
   * @param definedRuntimes                 names of the runtimes defined for this driver.
   */
  @Inject
  JobDriver(final Clock clock,
            final HttpServer httpServer,
            final NameServer nameServer,
            final JobMessageObserver jobMessageObserver,
            final EvaluatorRequestor evaluatorRequestor,
            final DriverStatusManager driverStatusManager,
            final LoggingScopeFactory loggingScopeFactory,
            final LocalAddressProvider localAddressProvider,
            final ActiveContextBridgeFactory activeContextBridgeFactory,
            final REEFFileNames reefFileNames,
            final AllocatedEvaluatorBridgeFactory allocatedEvaluatorBridgeFactory,
            final CLRProcessFactory clrProcessFactory,
            @Parameter(DefinedRuntimes.class) final Set<String> definedRuntimes) {
    this.clock = clock;
    this.httpServer = httpServer;
    this.jobMessageObserver = jobMessageObserver;
    this.evaluatorRequestor = evaluatorRequestor;
    this.nameServer = nameServer;
    this.driverStatusManager = driverStatusManager;
    this.activeContextBridgeFactory = activeContextBridgeFactory;
    this.allocatedEvaluatorBridgeFactory = allocatedEvaluatorBridgeFactory;
    this.nameServerInfo = localAddressProvider.getLocalAddress() + ":" + this.nameServer.getPort();
    this.loggingScopeFactory = loggingScopeFactory;
    this.reefFileNames = reefFileNames;
    this.localAddressProvider = localAddressProvider;
    this.clrProcessFactory = clrProcessFactory;
    this.definedRuntimes = definedRuntimes;
  }

  /**
   * Sets up the bridge to the CLR: releases the buffered CLR log handler, publishes the
   * driver HTTP endpoint, creates the handler manager and registers the HTTP handlers
   * for the URI specifications reported by the CLR.
   * Callers invoke this while holding the JobDriver monitor.
   */
  private void setupBridge() {
    // Signal to the clr buffered log handler that the driver has started and that
    // we can begin logging
    LOG.log(Level.INFO, "Initializing CLRBufferedLogHandler...");
    try (final LoggingScope lb = this.loggingScopeFactory.setupBridge()) {
      final CLRBufferedLogHandler handler = getCLRBufferedLogHandler();
      if (handler == null) {
        LOG.log(Level.WARNING, "CLRBufferedLogHandler could not be initialized");
      } else {
        handler.setDriverInitialized();
        LOG.log(Level.INFO, "CLRBufferedLogHandler init complete.");
      }

      final String portNumber = httpServer == null ? null : Integer.toString(httpServer.getPort());
      if (portNumber != null) {
        writeDriverHttpEndpoint(portNumber);
      }

      this.evaluatorRequestorBridge =
          new EvaluatorRequestorBridge(JobDriver.this.evaluatorRequestor, false, loggingScopeFactory,
              JobDriver.this.definedRuntimes);
      JobDriver.this.handlerManager = new BridgeHandlerManager();
      NativeInterop.clrSystemSetupBridgeHandlerManager(portNumber,
          JobDriver.this.handlerManager, evaluatorRequestorBridge);

      try (final LoggingScope lp =
               this.loggingScopeFactory.getNewLoggingScope("setupBridge::clrSystemHttpServerHandlerOnNext")) {
        // Ask the CLR which URI specifications it wants to serve.
        final HttpServerEventBridge httpServerEventBridge = new HttpServerEventBridge("SPEC");
        NativeInterop.clrSystemHttpServerHandlerOnNext(JobDriver.this.handlerManager.getHttpServerEventHandler(),
            httpServerEventBridge, this.interopLogger);
        final String specList = httpServerEventBridge.getUriSpecification();
        LOG.log(Level.INFO, "Starting http server, getUriSpecification: {0}", specList);
        if (specList != null) {
          // The specifications arrive as a single colon-separated string.
          final String[] specs = specList.split(":");
          for (final String s : specs) {
            final HttpHandler h = new HttpServerBridgeEventHandler();
            h.setUriSpecification(s);
            this.httpServer.addHttpHandler(h);
          }
        }
      }
    }
    LOG.log(Level.INFO, "CLR Bridge setup.");
  }

  /**
   * Writes "address:port" of the driver HTTP server to the driver HTTP endpoint file.
   * The writer is closed even when the write fails.
   *
   * @param portNumber port of the driver HTTP server.
   * @throws RuntimeException wrapping the IOException if the file cannot be written.
   */
  private void writeDriverHttpEndpoint(final String portNumber) {
    final File outputFileName = new File(reefFileNames.getDriverHttpEndpoint());
    try (final BufferedWriter out = new BufferedWriter(
        new OutputStreamWriter(new FileOutputStream(outputFileName), StandardCharsets.UTF_8))) {
      out.write(localAddressProvider.getLocalAddress() + ":" + portNumber + "\n");
    } catch (final IOException ex) {
      throw new RuntimeException(ex);
    }
  }

  /**
   * Looks up the CLRBufferedLogHandler among the handlers of the root logger.
   *
   * @return the handler, or null if none is registered.
   */
  private CLRBufferedLogHandler getCLRBufferedLogHandler() {
    for (final Handler handler : Logger.getLogger("").getHandlers()) {
      if (handler instanceof CLRBufferedLogHandler) {
        return (CLRBufferedLogHandler) handler;
      }
    }
    return null;
  }

  /**
   * Assigns the process to the evaluator and hands the evaluator over to the CLR
   * allocated-evaluator handler.
   *
   * @param eval    the newly allocated evaluator.
   * @param process the process configuration to set on the evaluator.
   * @throws RuntimeException if the CLR has not registered an allocated-evaluator handler.
   */
  private void submitEvaluator(final AllocatedEvaluator eval, final EvaluatorProcess process) {
    synchronized (JobDriver.this) {
      eval.setProcess(process);
      LOG.log(Level.INFO, "Allocated Evaluator: {0}, total running running {1}",
          new Object[]{eval.getId(), JobDriver.this.contexts.size()});
      if (JobDriver.this.handlerManager.getAllocatedEvaluatorHandler() == 0) {
        throw new RuntimeException("Allocated Evaluator Handler not initialized by CLR.");
      }
      final AllocatedEvaluatorBridge allocatedEvaluatorBridge =
          this.allocatedEvaluatorBridgeFactory.getAllocatedEvaluatorBridge(eval, this.nameServerInfo);
      // Keep a reference so that the bridge object is not garbage collected while the CLR uses it.
      allocatedEvaluatorBridges.put(allocatedEvaluatorBridge.getId(), allocatedEvaluatorBridge);
      NativeInterop.clrSystemAllocatedEvaluatorHandlerOnNext(
          JobDriver.this.handlerManager.getAllocatedEvaluatorHandler(), allocatedEvaluatorBridge, this.interopLogger);
    }
  }

  /**
   * Handles a failed evaluator: forgets its failed contexts, notifies the client and
   * forwards the failure to the CLR handler (waiting for the bridge if it is not set up yet).
   *
   * @param eval            the failed evaluator.
   * @param isRestartFailed true if the failure was reported as part of a driver restart.
   */
  private void handleFailedEvaluator(final FailedEvaluator eval, final boolean isRestartFailed) {
    try (final LoggingScope ls = loggingScopeFactory.evaluatorFailed(eval.getId())) {
      synchronized (JobDriver.this) {
        LOG.log(Level.SEVERE, "FailedEvaluator: {0}", eval);
        for (final FailedContext failedContext : eval.getFailedContextList()) {
          final String failedContextId = failedContext.getId();
          LOG.log(Level.INFO, "removing context " + failedContextId + " from job driver contexts.");
          JobDriver.this.contexts.remove(failedContextId);
        }
        final String message = "Evaluator " + eval.getId() + " failed with message: "
            + eval.getEvaluatorException().getMessage();
        JobDriver.this.jobMessageObserver.sendMessageToClient(message.getBytes(StandardCharsets.UTF_8));

        // The bridge may not be set up yet. Resolve the handle null-safely so that the
        // "wait for CLR bridge" path below is reachable instead of failing with an NPE here.
        final BridgeHandlerManager manager = JobDriver.this.handlerManager;
        final long handle;
        if (manager == null) {
          handle = 0;
        } else if (isRestartFailed) {
          handle = manager.getDriverRestartFailedEvaluatorHandler();
        } else {
          handle = manager.getFailedEvaluatorHandler();
        }
        evaluatorFailedHandlerWaitForCLRBridgeSetup(handle, eval, isRestartFailed);
      }
    }
  }

  /**
   * Dispatches the failed evaluator to the CLR. With a non-zero handle the CLR handler is
   * invoked right away. With a zero handle and an existing handler manager the failure is only
   * reported to the client. With no handler manager at all an alarm is scheduled that re-checks
   * every 5000 ms until the bridge has been set up.
   *
   * @param handle          handle of the CLR handler, 0 if none is known.
   * @param eval            the failed evaluator.
   * @param isRestartFailed true if the failure was reported as part of a driver restart.
   */
  private void evaluatorFailedHandlerWaitForCLRBridgeSetup(final long handle,
                                                           final FailedEvaluator eval,
                                                           final boolean isRestartFailed) {
    if (handle == 0) {
      if (JobDriver.this.handlerManager != null) {
        final String message = "No CLR FailedEvaluator handler was set, exiting now";
        LOG.log(Level.WARNING, message);
        JobDriver.this.jobMessageObserver.sendMessageToClient(message.getBytes(StandardCharsets.UTF_8));
      } else {
        clock.scheduleAlarm(0, new EventHandler<Alarm>() {
          @Override
          public void onNext(final Alarm time) {
            if (JobDriver.this.handlerManager != null) {
              handleFailedEvaluatorInCLR(eval, isRestartFailed);
            } else {
              LOG.log(Level.INFO, "Waiting for CLR bridge to be set up");
              clock.scheduleAlarm(5000, this);
            }
          }
        });
      }
    } else {
      handleFailedEvaluatorInCLR(eval, isRestartFailed);
    }
  }

  /**
   * Invokes the CLR failed-evaluator handler (the driver-restart variant when requested)
   * and reports to the client that the CLR handler took over.
   *
   * @param eval            the failed evaluator.
   * @param isRestartFailed true to use the driver-restart failed-evaluator handler.
   */
  private void handleFailedEvaluatorInCLR(final FailedEvaluator eval, final boolean isRestartFailed) {
    final String message = "CLR FailedEvaluator handler set, handling things with CLR handler.";
    LOG.log(Level.INFO, message);
    final FailedEvaluatorBridge failedEvaluatorBridge =
        new FailedEvaluatorBridge(eval, JobDriver.this.evaluatorRequestor,
            JobDriver.this.isRestarted, loggingScopeFactory, activeContextBridgeFactory, JobDriver.this.definedRuntimes);
    if (isRestartFailed) {
      NativeInterop.clrSystemDriverRestartFailedEvaluatorHandlerOnNext(
          JobDriver.this.handlerManager.getDriverRestartFailedEvaluatorHandler(),
          failedEvaluatorBridge, JobDriver.this.interopLogger);
    } else {
      NativeInterop.clrSystemFailedEvaluatorHandlerOnNext(
          JobDriver.this.handlerManager.getFailedEvaluatorHandler(),
          failedEvaluatorBridge,
          JobDriver.this.interopLogger);
    }

    final int additionalRequestedEvaluatorNumber = failedEvaluatorBridge.getNewlyRequestedEvaluatorNumber();
    if (additionalRequestedEvaluatorNumber > 0) {
      LOG.log(Level.INFO, "number of additional evaluators requested after evaluator failure: " +
          additionalRequestedEvaluatorNumber);
    }

    JobDriver.this.jobMessageObserver.sendMessageToClient(message.getBytes(StandardCharsets.UTF_8));
  }

  /**
   * Submit a Task to a single Evaluator.
   * On any failure the context is closed and the failure is rethrown.
   *
   * @param context the active context handed over to the CLR active-context handler.
   * @throws RuntimeException if the CLR handler is missing or its invocation fails.
   */
  private void submit(final ActiveContext context) {
    try {
      LOG.log(Level.INFO, "Send task to context: {0}", new Object[]{context});
      if (JobDriver.this.handlerManager.getActiveContextHandler() == 0) {
        throw new RuntimeException("Active Context Handler not initialized by CLR.");
      }
      final ActiveContextBridge activeContextBridge = activeContextBridgeFactory.getActiveContextBridge(context);
      NativeInterop.clrSystemActiveContextHandlerOnNext(JobDriver.this.handlerManager.getActiveContextHandler(),
          activeContextBridge, JobDriver.this.interopLogger);
    } catch (final Exception ex) {
      LOG.log(Level.SEVERE, "Fail to submit task to active context", ex);
      context.close();
      throw new RuntimeException(ex);
    }
  }

  /**
   * Handles AllocatedEvaluator: Submit an empty context.
   */
  public final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> {
    @Override
    public void onNext(final AllocatedEvaluator allocatedEvaluator) {
      try (final LoggingScope ls = loggingScopeFactory.evaluatorAllocated(allocatedEvaluator.getId())) {
        synchronized (JobDriver.this) {
          LOG.log(Level.INFO, "AllocatedEvaluatorHandler.OnNext");
          JobDriver.this.submitEvaluator(allocatedEvaluator, clrProcessFactory.newEvaluatorProcess());
        }
      }
    }
  }

  /**
   * Receive notification that a new Context is available.
   */
  public final class ActiveContextHandler implements EventHandler<ActiveContext> {
    @Override
    public void onNext(final ActiveContext context) {
      try (final LoggingScope ls = loggingScopeFactory.activeContextReceived(context.getId())) {
        synchronized (JobDriver.this) {
          LOG.log(Level.INFO, "ActiveContextHandler: Context available: {0}",
              new Object[]{context.getId()});
          JobDriver.this.contexts.put(context.getId(), context);
          JobDriver.this.submit(context);
        }
      }
    }
  }

  /**
   * Receive notification that the Task has completed successfully.
   */
  public final class CompletedTaskHandler implements EventHandler<CompletedTask> {
    @Override
    public void onNext(final CompletedTask task) {
      LOG.log(Level.INFO, "Completed task: {0}", task.getId());
      try (final LoggingScope ls = loggingScopeFactory.taskCompleted(task.getId())) {
        // Take the message returned by the task and add it to the running result.
        String result = "default result";
        try {
          result = new String(task.get(), StandardCharsets.UTF_8);
        } catch (final Exception e) {
          // Best effort: fall back to the default result, but keep the cause in the log.
          LOG.log(Level.WARNING, "failed to decode task outcome", e);
        }
        LOG.log(Level.INFO, "Return results to the client:\n{0}", result);
        JobDriver.this.jobMessageObserver.sendMessageToClient(JVM_CODEC.encode(result));
        if (JobDriver.this.handlerManager.getCompletedTaskHandler() == 0) {
          LOG.log(Level.INFO, "No CLR handler bound to handle completed task.");
        } else {
          LOG.log(Level.INFO, "CLR CompletedTaskHandler handler set, handling things with CLR handler.");
          final CompletedTaskBridge completedTaskBridge = new CompletedTaskBridge(task, activeContextBridgeFactory);
          NativeInterop.clrSystemCompletedTaskHandlerOnNext(JobDriver.this.handlerManager.getCompletedTaskHandler(),
              completedTaskBridge, JobDriver.this.interopLogger);
        }
      }
    }
  }

  /**
   * Receive notification that the entire Evaluator had failed.
   */
  public final class FailedEvaluatorHandler implements EventHandler<FailedEvaluator> {
    @Override
    public void onNext(final FailedEvaluator eval) {
      JobDriver.this.handleFailedEvaluator(eval, false);
      // The bridge map is a plain HashMap that is populated under the JobDriver monitor.
      synchronized (JobDriver.this) {
        allocatedEvaluatorBridges.remove(eval.getId());
      }
    }
  }

  /**
   * Receive notification that the entire Evaluator had failed on Driver Restart.
   */
  public final class DriverRestartFailedEvaluatorHandler implements EventHandler<FailedEvaluator> {
    @Override
    public void onNext(final FailedEvaluator eval) {
      JobDriver.this.handleFailedEvaluator(eval, true);
    }
  }

  /**
   * HTTP handler that forwards requests for one URI specification to the CLR
   * HTTP server handler and writes the CLR's answer to the response.
   */
  final class HttpServerBridgeEventHandler implements HttpHandler {
    private String uriSpecification;

    /**
     * returns URI specification for the handler.
     */
    @Override
    public String getUriSpecification() {
      return uriSpecification;
    }

    /**
     * Sets the URI specification served by this handler.
     *
     * @param s the URI specification.
     */
    public void setUriSpecification(final String s) {
      uriSpecification = s;
    }

    /**
     * process http request.
     * The request is serialized through AvroHttpSerializer, passed to the CLR handler,
     * and the CLR's response data is written to the servlet response as UTF-8 text.
     *
     * @throws RuntimeException if invoking the CLR handler or writing the response fails.
     */
    @Override
    public void onHttpRequest(final ParsedHttpRequest parsedHttpRequest, final HttpServletResponse response)
        throws IOException, ServletException {
      LOG.log(Level.INFO, "HttpServerBridgeEventHandler onHttpRequest: {0}", parsedHttpRequest.getRequestUri());
      try (final LoggingScope ls = loggingScopeFactory.httpRequest(parsedHttpRequest.getRequestUri())) {
        final AvroHttpSerializer httpSerializer = new AvroHttpSerializer();
        final AvroHttpRequest avroHttpRequest = httpSerializer.toAvro(parsedHttpRequest);

        final String requestString = httpSerializer.toString(avroHttpRequest);
        final byte[] requestBytes = requestString.getBytes(Charset.forName(AvroHttpSerializer.JSON_CHARSET));

        try {
          final HttpServerEventBridge httpServerEventBridge = new HttpServerEventBridge(requestBytes);
          NativeInterop.clrSystemHttpServerHandlerOnNext(JobDriver.this.handlerManager.getHttpServerEventHandler(),
              httpServerEventBridge, JobDriver.this.interopLogger);
          final String responseBody =
              new String(httpServerEventBridge.getQueryResponseData(), StandardCharsets.UTF_8);
          response.getWriter().println(responseBody);
          LOG.log(Level.INFO, "HttpServerBridgeEventHandler onHttpRequest received response: {0}", responseBody);
        } catch (final Exception ex) {
          LOG.log(Level.SEVERE, "Fail to invoke CLR Http Server handler", ex);
          throw new RuntimeException(ex);
        }
      }
    }
  }

  /**
   * Handle failed task.
   */
  public final class FailedTaskHandler implements EventHandler<FailedTask> {
    @Override
    public void onNext(final FailedTask task) throws RuntimeException {
      LOG.log(Level.SEVERE, "FailedTask received, will be handle in CLR handler, if set.");
      if (JobDriver.this.handlerManager.getFailedTaskHandler() == 0) {
        LOG.log(Level.SEVERE, "Failed Task Handler not initialized by CLR, fail for real.");
        throw new RuntimeException("Failed Task Handler not initialized by CLR.");
      }
      try {
        final FailedTaskBridge failedTaskBridge = new FailedTaskBridge(task, activeContextBridgeFactory);
        NativeInterop.clrSystemFailedTaskHandlerOnNext(JobDriver.this.handlerManager.getFailedTaskHandler(),
            failedTaskBridge, JobDriver.this.interopLogger);
      } catch (final Exception ex) {
        LOG.log(Level.SEVERE, "Fail to invoke CLR failed task handler", ex);
        throw new RuntimeException(ex);
      }
    }
  }

  /**
   * Receive notification that the Task is running.
   */
  public final class RunningTaskHandler implements EventHandler<RunningTask> {
    @Override
    public void onNext(final RunningTask task) {
      try (final LoggingScope ls = loggingScopeFactory.taskRunning(task.getId())) {
        if (JobDriver.this.handlerManager.getRunningTaskHandler() == 0) {
          LOG.log(Level.INFO, "RunningTask event received but no CLR handler was bound. Exiting handler.");
        } else {
          LOG.log(Level.INFO, "RunningTask will be handled by CLR handler. Task Id: {0}", task.getId());
          try {
            final RunningTaskBridge runningTaskBridge = new RunningTaskBridge(task, activeContextBridgeFactory);
            NativeInterop.clrSystemRunningTaskHandlerOnNext(JobDriver.this.handlerManager.getRunningTaskHandler(),
                runningTaskBridge, JobDriver.this.interopLogger);
          } catch (final Exception ex) {
            LOG.log(Level.WARNING, "Fail to invoke CLR running task handler", ex);
            throw new RuntimeException(ex);
          }
        }
      }
    }
  }

  /**
   * Receive notification that the Task is running when driver restarted.
   * The CLR handler is invoked from a Clock alarm that re-schedules itself every
   * 2000 ms until the handler manager exists.
   */
  public final class DriverRestartRunningTaskHandler implements EventHandler<RunningTask> {
    @Override
    public void onNext(final RunningTask task) {
      try (final LoggingScope ls = loggingScopeFactory.driverRestartRunningTask(task.getId())) {
        clock.scheduleAlarm(0, new EventHandler<Alarm>() {
          @Override
          public void onNext(final Alarm time) {
            if (JobDriver.this.handlerManager != null) {
              if (JobDriver.this.handlerManager.getDriverRestartRunningTaskHandler() != 0) {
                LOG.log(Level.INFO, "CLR driver restart RunningTask handler implemented, now handle it in CLR.");
                NativeInterop.clrSystemDriverRestartRunningTaskHandlerOnNext(
                    JobDriver.this.handlerManager.getDriverRestartRunningTaskHandler(),
                    new RunningTaskBridge(task, activeContextBridgeFactory));
              } else {
                LOG.log(Level.WARNING, "No CLR driver restart RunningTask handler implemented, " +
                    "done with DriverRestartRunningTaskHandler.");
              }
            } else {
              LOG.log(Level.INFO, "Waiting for driver to complete restart process " +
                  "before checking out CLR driver restart RunningTaskHandler...");
              clock.scheduleAlarm(2000, this);
            }
          }
        });
      }
    }
  }

  /**
   * Receive notification that an context is active on Evaluator when the driver restarted.
   * The CLR handler is invoked from a Clock alarm that re-schedules itself every
   * 2000 ms until the handler manager exists.
   */
  public final class DriverRestartActiveContextHandler implements EventHandler<ActiveContext> {
    @Override
    public void onNext(final ActiveContext context) {
      try (final LoggingScope ls = loggingScopeFactory.driverRestartActiveContextReceived(context.getId())) {
        // The contexts map is a plain HashMap; every other writer holds the JobDriver monitor.
        synchronized (JobDriver.this) {
          JobDriver.this.contexts.put(context.getId(), context);
        }
        LOG.log(Level.INFO, "DriverRestartActiveContextHandler event received: " + context.getId());
        clock.scheduleAlarm(0, new EventHandler<Alarm>() {
          @Override
          public void onNext(final Alarm time) {
            if (JobDriver.this.handlerManager != null) {
              if (JobDriver.this.handlerManager.getDriverRestartActiveContextHandler() != 0) {
                LOG.log(Level.INFO, "CLR driver restart ActiveContext handler implemented, now handle it in CLR.");
                NativeInterop.clrSystemDriverRestartActiveContextHandlerOnNext(
                    JobDriver.this.handlerManager.getDriverRestartActiveContextHandler(),
                    activeContextBridgeFactory.getActiveContextBridge(context));
              } else {
                LOG.log(Level.WARNING, "No CLR driver restart ActiveContext handler implemented, " +
                    "done with DriverRestartActiveContextHandler.");
              }
            } else {
              LOG.log(Level.INFO, "Waiting for driver to complete restart process " +
                  "before checking out CLR driver restart DriverRestartActiveContextHandler...");
              clock.scheduleAlarm(2000, this);
            }
          }
        });
      }
    }
  }

  /**
   * Job Driver is ready and the clock is set up: request the evaluators.
   */
  public final class StartHandler implements EventHandler<StartTime> {
    @Override
    public void onNext(final StartTime startTime) {
      try (final LoggingScope ls = loggingScopeFactory.driverStart(startTime)) {
        // CLR bridge setup must be done before other event handlers try to access the CLR bridge
        // thus we grab a lock on this instance
        synchronized (JobDriver.this) {
          setupBridge();
          LOG.log(Level.INFO, "Finished CLR bridge setup for {0}", startTime);
        }

        NativeInterop.callClrSystemOnStartHandler();
        LOG.log(Level.INFO, "Driver Started");
      }
    }
  }


  /**
   * Job driver is restarted after previous crash.
   */
  public final class RestartHandler implements EventHandler<DriverRestarted> {
    @Override
    public void onNext(final DriverRestarted driverRestarted) {
      try (final LoggingScope ls = loggingScopeFactory.driverRestart(driverRestarted.getStartTime())) {
        // CLR bridge setup must be done before other event handlers try to access the CLR bridge
        // thus we lock on this instance
        synchronized (JobDriver.this) {
          JobDriver.this.isRestarted = true;
          setupBridge();
          LOG.log(Level.INFO, "Finished CLR bridge setup for {0}", driverRestarted);
        }

        NativeInterop.callClrSystemOnRestartHandler(new DriverRestartedBridge(driverRestarted));
        LOG.log(Level.INFO, "Driver Restarted");
      }
    }
  }

  /**
   * Receive notification that driver restart has completed.
   */
  public final class DriverRestartCompletedHandler implements EventHandler<DriverRestartCompleted> {
    @Override
    public void onNext(final DriverRestartCompleted driverRestartCompleted) {
      LOG.log(Level.INFO, "Java DriverRestartCompleted event received at time [{0}]. ",
          driverRestartCompleted.getCompletedTime());
      try (final LoggingScope ls = loggingScopeFactory.driverRestartCompleted(
          driverRestartCompleted.getCompletedTime().getTimestamp())) {
        if (JobDriver.this.handlerManager.getDriverRestartCompletedHandler() != 0) {
          LOG.log(Level.INFO, "CLR driver restart handler implemented, now handle it in CLR.");

          NativeInterop.clrSystemDriverRestartCompletedHandlerOnNext(
              JobDriver.this.handlerManager.getDriverRestartCompletedHandler(),
              new DriverRestartCompletedBridge(driverRestartCompleted));
        } else {
          LOG.log(Level.WARNING, "No CLR driver restart handler implemented, done with DriverRestartCompletedHandler.");
        }
      }
    }
  }

  /**
   * Shutting down the job driver: close the evaluators.
   */
  final class StopHandler implements EventHandler<StopTime> {
    @Override
    public void onNext(final StopTime time) {
      LOG.log(Level.INFO, " StopTime: {0}", new Object[]{time});
      try (final LoggingScope ls = loggingScopeFactory.driverStop(time.getTimestamp())) {
        // Close from a snapshot taken under the lock: other handlers remove entries from
        // the map, which must not happen while it is being iterated.
        final List<ActiveContext> contextsToClose;
        synchronized (JobDriver.this) {
          contextsToClose = new ArrayList<>(contexts.values());
        }
        for (final ActiveContext context : contextsToClose) {
          context.close();
        }
      }
    }
  }

  /**
   * Handler for message received from the Task.
   */
  public final class TaskMessageHandler implements EventHandler<TaskMessage> {
    @Override
    public void onNext(final TaskMessage taskMessage) {
      final String msg = new String(taskMessage.get(), StandardCharsets.UTF_8);
      LOG.log(Level.INFO, "Received TaskMessage: {0} from CLR", msg);
      if (JobDriver.this.handlerManager.getTaskMessageHandler() != 0) {
        final TaskMessageBridge taskMessageBridge = new TaskMessageBridge(taskMessage);
        // if CLR implements the task message handler, handle the bytes in CLR handler
        NativeInterop.clrSystemTaskMessageHandlerOnNext(JobDriver.this.handlerManager.getTaskMessageHandler(),
            taskMessage.get(), taskMessageBridge, JobDriver.this.interopLogger);
      }
    }
  }

  /**
   * Receive notification that the Task has been suspended.
   */
  public final class SuspendedTaskHandler implements EventHandler<SuspendedTask> {
    @Override
    public void onNext(final SuspendedTask task) {
      final String message = "Received notification that task [" + task.getId() + "] has been suspended.";
      LOG.log(Level.INFO, message);
      try (final LoggingScope ls = loggingScopeFactory.taskSuspended(task.getId())) {
        if (JobDriver.this.handlerManager.getSuspendedTaskHandler() != 0) {
          final SuspendedTaskBridge suspendedTaskBridge = new SuspendedTaskBridge(task, activeContextBridgeFactory);
          // if CLR implements the suspended task handler, handle it in CLR
          LOG.log(Level.INFO, "Handling the event of suspended task in CLR bridge.");
          NativeInterop.clrSystemSuspendedTaskHandlerOnNext(JobDriver.this.handlerManager.getSuspendedTaskHandler(),
              suspendedTaskBridge);
        }
        JobDriver.this.jobMessageObserver.sendMessageToClient(JVM_CODEC.encode(message));
      }
    }
  }

  /**
   * Receive notification that the Evaluator has been shut down.
   */
  public final class CompletedEvaluatorHandler implements EventHandler<CompletedEvaluator> {
    @Override
    public void onNext(final CompletedEvaluator evaluator) {
      LOG.log(Level.INFO, " Completed Evaluator {0}", evaluator.getId());
      try (final LoggingScope ls = loggingScopeFactory.evaluatorCompleted(evaluator.getId())) {
        if (JobDriver.this.handlerManager.getCompletedEvaluatorHandler() != 0) {
          final CompletedEvaluatorBridge completedEvaluatorBridge = new CompletedEvaluatorBridge(evaluator);
          // if CLR implements the completed evaluator handler, handle it in CLR
          LOG.log(Level.INFO, "Handling the event of completed evaluator in CLR bridge.");
          NativeInterop.clrSystemCompletedEvaluatorHandlerOnNext(
              JobDriver.this.handlerManager.getCompletedEvaluatorHandler(), completedEvaluatorBridge);
        }
        // Release the allocated-evaluator bridge whether or not a CLR handler is bound;
        // otherwise the reference would be retained for the lifetime of the driver.
        // NOTE(review): keyed by the evaluator id, as in FailedEvaluatorHandler - confirm that
        // CompletedEvaluatorBridge.getId() returns the same value.
        synchronized (JobDriver.this) {
          allocatedEvaluatorBridges.remove(evaluator.getId());
        }
      }
    }
  }


  /**
   * Receive notification that the Context had completed.
   * Remove context from the list of active context.
   */
  public final class ClosedContextHandler implements EventHandler<ClosedContext> {
    @Override
    public void onNext(final ClosedContext context) {
      LOG.log(Level.INFO, "Completed Context: {0}", context.getId());
      try (final LoggingScope ls = loggingScopeFactory.closedContext(context.getId())) {
        if (JobDriver.this.handlerManager.getClosedContextHandler() != 0) {
          final ClosedContextBridge closedContextBridge = new ClosedContextBridge(context, activeContextBridgeFactory);
          // if CLR implements the closed context handler, handle it in CLR
          LOG.log(Level.INFO, "Handling the event of closed context in CLR bridge.");
          NativeInterop.clrSystemClosedContextHandlerOnNext(JobDriver.this.handlerManager.getClosedContextHandler(),
              closedContextBridge);
        }
        synchronized (JobDriver.this) {
          JobDriver.this.contexts.remove(context.getId());
        }
      }
    }
  }


  /**
   * Receive notification that the Context had failed.
   * Remove context from the list of active context and notify the client.
   */
  public final class FailedContextHandler implements EventHandler<FailedContext> {
    @Override
    public void onNext(final FailedContext context) {
      LOG.log(Level.SEVERE, "FailedContext: {0}", context);
      // NOTE(review): this opens the evaluatorFailed scope for a failed context - confirm intended.
      try (final LoggingScope ls = loggingScopeFactory.evaluatorFailed(context.getId())) {
        if (JobDriver.this.handlerManager.getFailedContextHandler() != 0) {
          final FailedContextBridge failedContextBridge = new FailedContextBridge(context, activeContextBridgeFactory);
          // if CLR implements the failed context handler, handle it in CLR
          LOG.log(Level.INFO, "Handling the event of failed context in CLR bridge.");
          NativeInterop.clrSystemFailedContextHandlerOnNext(JobDriver.this.handlerManager.getFailedContextHandler(),
              failedContextBridge);
        }
        synchronized (JobDriver.this) {
          JobDriver.this.contexts.remove(context.getId());
        }
        final Optional<byte[]> err = context.getData();
        if (err.isPresent()) {
          JobDriver.this.jobMessageObserver.sendMessageToClient(err.get());
        }
      }
    }
  }

  /**
   * Receive notification that a ContextMessage has been received.
   */
  public final class ContextMessageHandler implements EventHandler<ContextMessage> {
    @Override
    public void onNext(final ContextMessage message) {
      // Decode once: used both for the log line and for the logging scope.
      final String messageText = new String(message.get(), StandardCharsets.UTF_8);
      LOG.log(Level.INFO, "Received ContextMessage: {0}", messageText);
      try (final LoggingScope ls = loggingScopeFactory.contextMessageReceived(messageText)) {
        if (JobDriver.this.handlerManager.getContextMessageHandler() != 0) {
          final ContextMessageBridge contextMessageBridge = new ContextMessageBridge(message);
          // if CLR implements the context message handler, handle it in CLR
          LOG.log(Level.INFO, "Handling the event of context message in CLR bridge.");
          NativeInterop.clrSystemContextMessageHandlerOnNext(JobDriver.this.handlerManager.getContextMessageHandler(),
              contextMessageBridge);
        }
      }
    }
  }

  /**
   * Gets the progress of the application from .NET side.
   */
  public final class ProgressProvider implements org.apache.reef.driver.ProgressProvider {
    /**
     * @return the progress reported by the CLR progress provider, or 0 when the bridge
     * is not set up or the CLR registered no progress provider.
     */
    @Override
    public float getProgress() {
      // Read the field once so that the null check and the calls see the same manager.
      final BridgeHandlerManager manager = JobDriver.this.handlerManager;
      if (manager != null && manager.getProgressProvider() != 0) {
        return NativeInterop.clrSystemProgressProviderGetProgress(manager.getProgressProvider());
      }

      return 0f;
    }
  }
}