This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.javabridge.generic;
020
021import org.apache.reef.driver.client.JobMessageObserver;
022import org.apache.reef.driver.context.ActiveContext;
023import org.apache.reef.driver.context.ClosedContext;
024import org.apache.reef.driver.context.ContextMessage;
025import org.apache.reef.driver.context.FailedContext;
026import org.apache.reef.driver.evaluator.*;
027import org.apache.reef.driver.restart.DriverRestarted;
028import org.apache.reef.driver.task.*;
029import org.apache.reef.io.network.naming.NameServer;
030import org.apache.reef.javabridge.*;
031import org.apache.reef.driver.restart.DriverRestartCompleted;
032import org.apache.reef.runtime.common.driver.DriverStatusManager;
033import org.apache.reef.runtime.common.files.REEFFileNames;
034import org.apache.reef.tang.annotations.Unit;
035import org.apache.reef.util.Optional;
036import org.apache.reef.util.logging.CLRBufferedLogHandler;
037import org.apache.reef.util.logging.LoggingScope;
038import org.apache.reef.util.logging.LoggingScopeFactory;
039import org.apache.reef.wake.EventHandler;
040import org.apache.reef.wake.remote.address.LocalAddressProvider;
041import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
042import org.apache.reef.wake.time.Clock;
043import org.apache.reef.wake.time.event.Alarm;
044import org.apache.reef.wake.time.event.StartTime;
045import org.apache.reef.wake.time.event.StopTime;
046import org.apache.reef.webserver.*;
047
048import javax.inject.Inject;
049import javax.servlet.ServletException;
050import javax.servlet.http.HttpServletResponse;
051import java.io.*;
052import java.nio.charset.Charset;
053import java.nio.charset.StandardCharsets;
054import java.util.ArrayList;
055import java.util.HashMap;
056import java.util.List;
057import java.util.Map;
058import java.util.logging.Handler;
059import java.util.logging.Level;
060import java.util.logging.Logger;
061
062/**
063 * Generic job driver for CLRBridge.
064 */
065@Unit
066public final class JobDriver {
067
068  private static final Logger LOG = Logger.getLogger(JobDriver.class.getName());
069  /**
070   * String codec is used to encode the results
071   * before passing them back to the client.
072   */
073  private static final ObjectSerializableCodec<String> JVM_CODEC = new ObjectSerializableCodec<>();
074  private final InteropLogger interopLogger = new InteropLogger();
075  private final NameServer nameServer;
076  private final String nameServerInfo;
077  private final HttpServer httpServer;
078  private final ActiveContextBridgeFactory activeContextBridgeFactory;
079  private final AllocatedEvaluatorBridgeFactory allocatedEvaluatorBridgeFactory;
080
081  /**
082   * Wake clock is used to schedule periodical job check-ups.
083   */
084  private final Clock clock;
085  /**
086   * Job observer on the client.
087   * We use it to send results from the driver back to the client.
088   */
089  private final JobMessageObserver jobMessageObserver;
090  /**
091   * Job driver uses EvaluatorRequestor
092   * to request Evaluators that will run the Tasks.
093   */
094  private final EvaluatorRequestor evaluatorRequestor;
095
096  /**
097   * Driver status manager to monitor driver status.
098   */
099  private final DriverStatusManager driverStatusManager;
100
101  /**
102   * Factory to setup new CLR process configurations.
103   */
104  private final CLRProcessFactory clrProcessFactory;
105
106  /**
107   * Shell execution results from each Evaluator.
108   */
109  private final List<String> results = new ArrayList<>();
110  /**
111   * Map from context ID to running evaluator context.
112   */
113  private final Map<String, ActiveContext> contexts = new HashMap<>();
114
115  private final REEFFileNames reefFileNames;
116  private final LocalAddressProvider localAddressProvider;
117  /**
118   * Logging scope factory that provides LoggingScope.
119   */
120  private final LoggingScopeFactory loggingScopeFactory;
121
122  private BridgeHandlerManager handlerManager = null;
123  private boolean isRestarted = false;
124  // We are holding on to following on bridge side.
125  // Need to add references here so that GC does not collect them.
126  private final HashMap<String, AllocatedEvaluatorBridge> allocatedEvaluatorBridges =
127      new HashMap<>();
128  private EvaluatorRequestorBridge evaluatorRequestorBridge;
129
130
131  /**
132   * Job driver constructor.
133   * All parameters are injected from TANG automatically.
134   *
135   * @param clock                      Wake clock to schedule and check up running jobs.
136   * @param jobMessageObserver         is used to send messages back to the client.
137   * @param evaluatorRequestor         is used to request Evaluators.
138   * @param activeContextBridgeFactory
139   */
140  @Inject
141  JobDriver(final Clock clock,
142            final HttpServer httpServer,
143            final NameServer nameServer,
144            final JobMessageObserver jobMessageObserver,
145            final EvaluatorRequestor evaluatorRequestor,
146            final DriverStatusManager driverStatusManager,
147            final LoggingScopeFactory loggingScopeFactory,
148            final LocalAddressProvider localAddressProvider,
149            final ActiveContextBridgeFactory activeContextBridgeFactory,
150            final REEFFileNames reefFileNames,
151            final AllocatedEvaluatorBridgeFactory allocatedEvaluatorBridgeFactory,
152            final CLRProcessFactory clrProcessFactory) {
153    this.clock = clock;
154    this.httpServer = httpServer;
155    this.jobMessageObserver = jobMessageObserver;
156    this.evaluatorRequestor = evaluatorRequestor;
157    this.nameServer = nameServer;
158    this.driverStatusManager = driverStatusManager;
159    this.activeContextBridgeFactory = activeContextBridgeFactory;
160    this.allocatedEvaluatorBridgeFactory = allocatedEvaluatorBridgeFactory;
161    this.nameServerInfo = localAddressProvider.getLocalAddress() + ":" + this.nameServer.getPort();
162    this.loggingScopeFactory = loggingScopeFactory;
163    this.reefFileNames = reefFileNames;
164    this.localAddressProvider = localAddressProvider;
165    this.clrProcessFactory = clrProcessFactory;
166  }
167
168  private void setupBridge(final ClrHandlersInitializer initializer) {
169    // Signal to the clr buffered log handler that the driver has started and that
170    // we can begin logging
171    LOG.log(Level.INFO, "Initializing CLRBufferedLogHandler...");
172    try (final LoggingScope lb = this.loggingScopeFactory.setupBridge()) {
173      final CLRBufferedLogHandler handler = getCLRBufferedLogHandler();
174      if (handler == null) {
175        LOG.log(Level.WARNING, "CLRBufferedLogHandler could not be initialized");
176      } else {
177        handler.setDriverInitialized();
178        LOG.log(Level.INFO, "CLRBufferedLogHandler init complete.");
179      }
180
181      final String portNumber = httpServer == null ? null : Integer.toString(httpServer.getPort());
182      if (portNumber != null) {
183        try {
184          final File outputFileName = new File(reefFileNames.getDriverHttpEndpoint());
185          BufferedWriter out = new BufferedWriter(
186              new OutputStreamWriter(new FileOutputStream(outputFileName), StandardCharsets.UTF_8));
187          out.write(localAddressProvider.getLocalAddress() + ":" + portNumber + "\n");
188          out.close();
189        } catch (IOException ex) {
190          throw new RuntimeException(ex);
191        }
192      }
193
194      this.evaluatorRequestorBridge =
195          new EvaluatorRequestorBridge(JobDriver.this.evaluatorRequestor, false, loggingScopeFactory);
196      JobDriver.this.handlerManager = initializer.getClrHandlers(portNumber, evaluatorRequestorBridge);
197
198      try (final LoggingScope lp =
199               this.loggingScopeFactory.getNewLoggingScope("setupBridge::clrSystemHttpServerHandlerOnNext")) {
200        final HttpServerEventBridge httpServerEventBridge = new HttpServerEventBridge("SPEC");
201        NativeInterop.clrSystemHttpServerHandlerOnNext(JobDriver.this.handlerManager.getHttpServerEventHandler(),
202            httpServerEventBridge, this.interopLogger);
203        final String specList = httpServerEventBridge.getUriSpecification();
204        LOG.log(Level.INFO, "Starting http server, getUriSpecification: {0}", specList);
205        if (specList != null) {
206          final String[] specs = specList.split(":");
207          for (final String s : specs) {
208            final HttpHandler h = new HttpServerBridgeEventHandler();
209            h.setUriSpecification(s);
210            this.httpServer.addHttpHandler(h);
211          }
212        }
213      }
214    }
215    LOG.log(Level.INFO, "CLR Bridge setup.");
216  }
217
218  private CLRBufferedLogHandler getCLRBufferedLogHandler() {
219    for (final Handler handler : Logger.getLogger("").getHandlers()) {
220      if (handler instanceof CLRBufferedLogHandler) {
221        return (CLRBufferedLogHandler) handler;
222      }
223    }
224    return null;
225  }
226
227  private void submitEvaluator(final AllocatedEvaluator eval, final EvaluatorProcess process) {
228    synchronized (JobDriver.this) {
229      eval.setProcess(process);
230      LOG.log(Level.INFO, "Allocated Evaluator: {0}, total running running {1}",
231          new Object[]{eval.getId(), JobDriver.this.contexts.size()});
232      if (JobDriver.this.handlerManager.getAllocatedEvaluatorHandler() == 0) {
233        throw new RuntimeException("Allocated Evaluator Handler not initialized by CLR.");
234      }
235      final AllocatedEvaluatorBridge allocatedEvaluatorBridge =
236          this.allocatedEvaluatorBridgeFactory.getAllocatedEvaluatorBridge(eval, this.nameServerInfo);
237      allocatedEvaluatorBridges.put(allocatedEvaluatorBridge.getId(), allocatedEvaluatorBridge);
238      NativeInterop.clrSystemAllocatedEvaluatorHandlerOnNext(
239          JobDriver.this.handlerManager.getAllocatedEvaluatorHandler(), allocatedEvaluatorBridge, this.interopLogger);
240    }
241  }
242
243  private void handleFailedEvaluator(final FailedEvaluator eval, final boolean isRestartFailed) {
244    try (final LoggingScope ls = loggingScopeFactory.evaluatorFailed(eval.getId())) {
245      synchronized (JobDriver.this) {
246        LOG.log(Level.SEVERE, "FailedEvaluator", eval);
247        for (final FailedContext failedContext : eval.getFailedContextList()) {
248          final String failedContextId = failedContext.getId();
249          LOG.log(Level.INFO, "removing context " + failedContextId + " from job driver contexts.");
250          JobDriver.this.contexts.remove(failedContextId);
251        }
252        String message = "Evaluator " + eval.getId() + " failed with message: "
253            + eval.getEvaluatorException().getMessage();
254        JobDriver.this.jobMessageObserver.sendMessageToClient(message.getBytes(StandardCharsets.UTF_8));
255
256        if (isRestartFailed) {
257          evaluatorFailedHandlerWaitForCLRBridgeSetup(
258              JobDriver.this.handlerManager.getDriverRestartFailedEvaluatorHandler(), eval, isRestartFailed);
259        } else {
260          evaluatorFailedHandlerWaitForCLRBridgeSetup(JobDriver.this.handlerManager.getFailedEvaluatorHandler(),
261              eval, isRestartFailed);
262        }
263      }
264    }
265  }
266
267  private void evaluatorFailedHandlerWaitForCLRBridgeSetup(final long handle,
268                                                           final FailedEvaluator eval,
269                                                           final boolean isRestartFailed) {
270    if (handle == 0) {
271      if (JobDriver.this.handlerManager != null) {
272        final String message = "No CLR FailedEvaluator handler was set, exiting now";
273        LOG.log(Level.WARNING, message);
274        JobDriver.this.jobMessageObserver.sendMessageToClient(message.getBytes(StandardCharsets.UTF_8));
275      } else {
276        clock.scheduleAlarm(0, new EventHandler<Alarm>() {
277          @Override
278          public void onNext(final Alarm time) {
279            if (JobDriver.this.handlerManager != null) {
280              handleFailedEvaluatorInCLR(eval, isRestartFailed);
281            } else {
282              LOG.log(Level.INFO, "Waiting for CLR bridge to be set up");
283              clock.scheduleAlarm(5000, this);
284            }
285          }
286        });
287      }
288    } else{
289      handleFailedEvaluatorInCLR(eval, isRestartFailed);
290    }
291  }
292
293  private void handleFailedEvaluatorInCLR(final FailedEvaluator eval, final boolean isRestartFailed) {
294    final String message = "CLR FailedEvaluator handler set, handling things with CLR handler.";
295    LOG.log(Level.INFO, message);
296    final FailedEvaluatorBridge failedEvaluatorBridge =
297        new FailedEvaluatorBridge(eval, JobDriver.this.evaluatorRequestor,
298        JobDriver.this.isRestarted, loggingScopeFactory);
299    if (isRestartFailed) {
300      NativeInterop.clrSystemDriverRestartFailedEvaluatorHandlerOnNext(
301          JobDriver.this.handlerManager.getDriverRestartFailedEvaluatorHandler(),
302          failedEvaluatorBridge, JobDriver.this.interopLogger);
303    } else {
304      NativeInterop.clrSystemFailedEvaluatorHandlerOnNext(
305          JobDriver.this.handlerManager.getFailedEvaluatorHandler(),
306          failedEvaluatorBridge,
307          JobDriver.this.interopLogger);
308    }
309
310    final int additionalRequestedEvaluatorNumber = failedEvaluatorBridge.getNewlyRequestedEvaluatorNumber();
311    if (additionalRequestedEvaluatorNumber > 0) {
312      LOG.log(Level.INFO, "number of additional evaluators requested after evaluator failure: " +
313          additionalRequestedEvaluatorNumber);
314    }
315
316    JobDriver.this.jobMessageObserver.sendMessageToClient(message.getBytes(StandardCharsets.UTF_8));
317  }
318
319  /**
320   * Submit a Task to a single Evaluator.
321   */
322  private void submit(final ActiveContext context) {
323    try {
324      LOG.log(Level.INFO, "Send task to context: {0}", new Object[]{context});
325      if (JobDriver.this.handlerManager.getActiveContextHandler() == 0) {
326        throw new RuntimeException("Active Context Handler not initialized by CLR.");
327      }
328      final ActiveContextBridge activeContextBridge = activeContextBridgeFactory.getActiveContextBridge(context);
329      NativeInterop.clrSystemActiveContextHandlerOnNext(JobDriver.this.handlerManager.getActiveContextHandler(),
330          activeContextBridge, JobDriver.this.interopLogger);
331    } catch (final Exception ex) {
332      LOG.log(Level.SEVERE, "Fail to submit task to active context");
333      context.close();
334      throw new RuntimeException(ex);
335    }
336  }
337
338  /**
339   * Handles AllocatedEvaluator: Submit an empty context.
340   */
341  public final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> {
342    @Override
343    public void onNext(final AllocatedEvaluator allocatedEvaluator) {
344      try (final LoggingScope ls = loggingScopeFactory.evaluatorAllocated(allocatedEvaluator.getId())) {
345        synchronized (JobDriver.this) {
346          LOG.log(Level.INFO, "AllocatedEvaluatorHandler.OnNext");
347          JobDriver.this.submitEvaluator(allocatedEvaluator, clrProcessFactory.newEvaluatorProcess());
348        }
349      }
350    }
351  }
352
353  /**
354   * Receive notification that a new Context is available.
355   */
356  public final class ActiveContextHandler implements EventHandler<ActiveContext> {
357    @Override
358    public void onNext(final ActiveContext context) {
359      try (final LoggingScope ls = loggingScopeFactory.activeContextReceived(context.getId())) {
360        synchronized (JobDriver.this) {
361          LOG.log(Level.INFO, "ActiveContextHandler: Context available: {0}",
362              new Object[]{context.getId()});
363          JobDriver.this.contexts.put(context.getId(), context);
364          JobDriver.this.submit(context);
365        }
366      }
367    }
368  }
369
370  /**
371   * Receive notification that the Task has completed successfully.
372   */
373  public final class CompletedTaskHandler implements EventHandler<CompletedTask> {
374    @Override
375    public void onNext(final CompletedTask task) {
376      LOG.log(Level.INFO, "Completed task: {0}", task.getId());
377      try (final LoggingScope ls = loggingScopeFactory.taskCompleted(task.getId())) {
378        // Take the message returned by the task and add it to the running result.
379        String result = "default result";
380        try {
381          result = new String(task.get(), StandardCharsets.UTF_8);
382        } catch (final Exception e) {
383          LOG.log(Level.WARNING, "failed to decode task outcome");
384        }
385        LOG.log(Level.INFO, "Return results to the client:\n{0}", result);
386        JobDriver.this.jobMessageObserver.sendMessageToClient(JVM_CODEC.encode(result));
387        if (JobDriver.this.handlerManager.getCompletedTaskHandler() == 0) {
388          LOG.log(Level.INFO, "No CLR handler bound to handle completed task.");
389        } else {
390          LOG.log(Level.INFO, "CLR CompletedTaskHandler handler set, handling things with CLR handler.");
391          final CompletedTaskBridge completedTaskBridge = new CompletedTaskBridge(task, activeContextBridgeFactory);
392          NativeInterop.clrSystemCompletedTaskHandlerOnNext(JobDriver.this.handlerManager.getCompletedTaskHandler(),
393              completedTaskBridge, JobDriver.this.interopLogger);
394        }
395      }
396    }
397  }
398
399  /**
400   * Receive notification that the entire Evaluator had failed.
401   */
402  public final class FailedEvaluatorHandler implements EventHandler<FailedEvaluator> {
403    @Override
404    public void onNext(final FailedEvaluator eval) {
405      JobDriver.this.handleFailedEvaluator(eval, false);
406      allocatedEvaluatorBridges.remove(eval.getId());
407    }
408  }
409
410  /**
411   * Receive notification that the entire Evaluator had failed on Driver Restart.
412   */
413  public final class DriverRestartFailedEvaluatorHandler implements EventHandler<FailedEvaluator> {
414    @Override
415    public void onNext(final FailedEvaluator eval) {
416      JobDriver.this.handleFailedEvaluator(eval, true);
417    }
418  }
419
420  final class HttpServerBridgeEventHandler implements HttpHandler {
421    private String uriSpecification;
422
423    /**
424     * returns URI specification for the handler.
425     */
426    @Override
427    public String getUriSpecification() {
428      return uriSpecification;
429    }
430
431    public void setUriSpecification(final String s) {
432      uriSpecification = s;
433    }
434
435    /**
436     * process http request.
437     */
438    @Override
439    public void onHttpRequest(final ParsedHttpRequest parsedHttpRequest, final HttpServletResponse response)
440        throws IOException, ServletException {
441      LOG.log(Level.INFO, "HttpServerBridgeEventHandler onHttpRequest: {0}", parsedHttpRequest.getRequestUri());
442      try (final LoggingScope ls = loggingScopeFactory.httpRequest(parsedHttpRequest.getRequestUri())) {
443        final AvroHttpSerializer httpSerializer = new AvroHttpSerializer();
444        final AvroHttpRequest avroHttpRequest = httpSerializer.toAvro(parsedHttpRequest);
445
446        final String requestString = httpSerializer.toString(avroHttpRequest);
447        final byte[] requestBytes = requestString.getBytes(Charset.forName(AvroHttpSerializer.JSON_CHARSET));
448
449        try {
450          final HttpServerEventBridge httpServerEventBridge = new HttpServerEventBridge(requestBytes);
451          NativeInterop.clrSystemHttpServerHandlerOnNext(JobDriver.this.handlerManager.getHttpServerEventHandler(),
452              httpServerEventBridge, JobDriver.this.interopLogger);
453          final String responseBody = new String(httpServerEventBridge.getQueryResponseData(), "UTF-8");
454          response.getWriter().println(responseBody);
455          LOG.log(Level.INFO, "HttpServerBridgeEventHandler onHttpRequest received response: {0}", responseBody);
456        } catch (final Exception ex) {
457          LOG.log(Level.SEVERE, "Fail to invoke CLR Http Server handler", ex);
458          throw new RuntimeException(ex);
459        }
460      }
461    }
462  }
463
464  /**
465   * Handle failed task.
466   */
467  public final class FailedTaskHandler implements EventHandler<FailedTask> {
468    @Override
469    public void onNext(final FailedTask task) throws RuntimeException {
470      LOG.log(Level.SEVERE, "FailedTask received, will be handle in CLR handler, if set.");
471      if (JobDriver.this.handlerManager.getFailedTaskHandler() == 0) {
472        LOG.log(Level.SEVERE, "Failed Task Handler not initialized by CLR, fail for real.");
473        throw new RuntimeException("Failed Task Handler not initialized by CLR.");
474      }
475      try {
476        final FailedTaskBridge failedTaskBridge = new FailedTaskBridge(task, activeContextBridgeFactory);
477        NativeInterop.clrSystemFailedTaskHandlerOnNext(JobDriver.this.handlerManager.getFailedTaskHandler(),
478            failedTaskBridge, JobDriver.this.interopLogger);
479      } catch (final Exception ex) {
480        LOG.log(Level.SEVERE, "Fail to invoke CLR failed task handler");
481        throw new RuntimeException(ex);
482      }
483    }
484  }
485
486  /**
487   * Receive notification that the Task is running.
488   */
489  public final class RunningTaskHandler implements EventHandler<RunningTask> {
490    @Override
491    public void onNext(final RunningTask task) {
492      try (final LoggingScope ls = loggingScopeFactory.taskRunning(task.getId())) {
493        if (JobDriver.this.handlerManager.getRunningTaskHandler() == 0) {
494          LOG.log(Level.INFO, "RunningTask event received but no CLR handler was bound. Exiting handler.");
495        } else {
496          LOG.log(Level.INFO, "RunningTask will be handled by CLR handler. Task Id: {0}", task.getId());
497          try {
498            final RunningTaskBridge runningTaskBridge = new RunningTaskBridge(task, activeContextBridgeFactory);
499            NativeInterop.clrSystemRunningTaskHandlerOnNext(JobDriver.this.handlerManager.getRunningTaskHandler(),
500                runningTaskBridge, JobDriver.this.interopLogger);
501          } catch (final Exception ex) {
502            LOG.log(Level.WARNING, "Fail to invoke CLR running task handler");
503            throw new RuntimeException(ex);
504          }
505        }
506      }
507    }
508  }
509
510  /**
511   * Receive notification that the Task is running when driver restarted.
512   */
513  public final class DriverRestartRunningTaskHandler implements EventHandler<RunningTask> {
514    @Override
515    public void onNext(final RunningTask task) {
516      try (final LoggingScope ls = loggingScopeFactory.driverRestartRunningTask(task.getId())) {
517        clock.scheduleAlarm(0, new EventHandler<Alarm>() {
518          @Override
519          public void onNext(final Alarm time) {
520            if (JobDriver.this.handlerManager != null) {
521              if (JobDriver.this.handlerManager.getDriverRestartRunningTaskHandler() != 0) {
522                LOG.log(Level.INFO, "CLR driver restart RunningTask handler implemented, now handle it in CLR.");
523                NativeInterop.clrSystemDriverRestartRunningTaskHandlerOnNext(
524                    JobDriver.this.handlerManager.getDriverRestartRunningTaskHandler(),
525                    new RunningTaskBridge(task, activeContextBridgeFactory));
526              } else {
527                LOG.log(Level.WARNING, "No CLR driver restart RunningTask handler implemented, " +
528                    "done with DriverRestartRunningTaskHandler.");
529              }
530            } else {
531              LOG.log(Level.INFO, "Waiting for driver to complete restart process " +
532                  "before checking out CLR driver restart RunningTaskHandler...");
533              clock.scheduleAlarm(2000, this);
534            }
535          }
536        });
537      }
538    }
539  }
540
541  /**
542   * Receive notification that an context is active on Evaluator when the driver restarted.
543   */
544  public final class DriverRestartActiveContextHandler implements EventHandler<ActiveContext> {
545    @Override
546    public void onNext(final ActiveContext context) {
547      try (final LoggingScope ls = loggingScopeFactory.driverRestartActiveContextReceived(context.getId())) {
548        JobDriver.this.contexts.put(context.getId(), context);
549        LOG.log(Level.INFO, "DriverRestartActiveContextHandler event received: " + context.getId());
550        clock.scheduleAlarm(0, new EventHandler<Alarm>() {
551          @Override
552          public void onNext(final Alarm time) {
553            if (JobDriver.this.handlerManager != null) {
554              if (JobDriver.this.handlerManager.getDriverRestartActiveContextHandler() != 0) {
555                LOG.log(Level.INFO, "CLR driver restart ActiveContext handler implemented, now handle it in CLR.");
556                NativeInterop.clrSystemDriverRestartActiveContextHandlerOnNext(
557                    JobDriver.this.handlerManager.getDriverRestartActiveContextHandler(),
558                    activeContextBridgeFactory.getActiveContextBridge(context));
559              } else {
560                LOG.log(Level.WARNING, "No CLR driver restart ActiveContext handler implemented, " +
561                    "done with DriverRestartActiveContextHandler.");
562              }
563            } else {
564              LOG.log(Level.INFO, "Waiting for driver to complete restart process " +
565                  "before checking out CLR driver restart DriverRestartActiveContextHandler...");
566              clock.scheduleAlarm(2000, this);
567            }
568          }
569        });
570      }
571    }
572  }
573
574  /**
575   * Job Driver is ready and the clock is set up: request the evaluators.
576   */
577  public final class StartHandler implements EventHandler<StartTime> {
578    @Override
579    public void onNext(final StartTime startTime) {
580      try (final LoggingScope ls = loggingScopeFactory.driverStart(startTime)) {
581        synchronized (JobDriver.this) {
582
583          setupBridge(new DriverStartClrHandlersInitializer(startTime));
584          LOG.log(Level.INFO, "Driver Started");
585        }
586      }
587    }
588  }
589
590
591  /**
592   * Job driver is restarted after previous crash.
593   */
594  public final class RestartHandler implements EventHandler<DriverRestarted> {
595    @Override
596    public void onNext(final DriverRestarted driverRestarted) {
597      try (final LoggingScope ls = loggingScopeFactory.driverRestart(driverRestarted.getStartTime())) {
598        synchronized (JobDriver.this) {
599
600          JobDriver.this.isRestarted = true;
601          setupBridge(new DriverRestartClrHandlersInitializer(driverRestarted));
602
603          LOG.log(Level.INFO, "Driver Restarted and CLR bridge set up.");
604        }
605      }
606    }
607  }
608
609  /**
610   * Receive notification that driver restart has completed.
611   */
612  public final class DriverRestartCompletedHandler implements EventHandler<DriverRestartCompleted> {
613    @Override
614    public void onNext(final DriverRestartCompleted driverRestartCompleted) {
615      LOG.log(Level.INFO, "Java DriverRestartCompleted event received at time [{0}]. ",
616          driverRestartCompleted.getCompletedTime());
617      try (final LoggingScope ls = loggingScopeFactory.driverRestartCompleted(
618          driverRestartCompleted.getCompletedTime().getTimeStamp())) {
619        if (JobDriver.this.handlerManager.getDriverRestartCompletedHandler() != 0) {
620          LOG.log(Level.INFO, "CLR driver restart handler implemented, now handle it in CLR.");
621
622          NativeInterop.clrSystemDriverRestartCompletedHandlerOnNext(
623              JobDriver.this.handlerManager.getDriverRestartCompletedHandler(),
624              new DriverRestartCompletedBridge(driverRestartCompleted));
625        } else {
626          LOG.log(Level.WARNING, "No CLR driver restart handler implemented, done with DriverRestartCompletedHandler.");
627        }
628      }
629    }
630  }
631
632  /**
633   * Shutting down the job driver: close the evaluators.
634   */
635  final class StopHandler implements EventHandler<StopTime> {
636    @Override
637    public void onNext(final StopTime time) {
638      LOG.log(Level.INFO, " StopTime: {0}", new Object[]{time});
639      try (final LoggingScope ls = loggingScopeFactory.driverStop(time.getTimeStamp())) {
640        for (final ActiveContext context : contexts.values()) {
641          context.close();
642        }
643      }
644    }
645  }
646
647  /**
648   * Handler for message received from the Task.
649   */
650  public final class TaskMessageHandler implements EventHandler<TaskMessage> {
651    @Override
652    public void onNext(final TaskMessage taskMessage) {
653      final String msg = new String(taskMessage.get(), StandardCharsets.UTF_8);
654      LOG.log(Level.INFO, "Received TaskMessage: {0} from CLR", msg);
655      //try (LoggingScope ls = loggingScopeFactory.taskMessageReceived(new String(msg))) {
656      if (JobDriver.this.handlerManager.getTaskMessageHandler() != 0) {
657        final TaskMessageBridge taskMessageBridge = new TaskMessageBridge(taskMessage);
658        // if CLR implements the task message handler, handle the bytes in CLR handler
659        NativeInterop.clrSystemTaskMessageHandlerOnNext(JobDriver.this.handlerManager.getTaskMessageHandler(),
660            taskMessage.get(), taskMessageBridge, JobDriver.this.interopLogger);
661      }
662      //}
663    }
664  }
665
666  /**
667   * Receive notification that the Task has been suspended.
668   */
669  public final class SuspendedTaskHandler implements EventHandler<SuspendedTask> {
670    @Override
671    public void onNext(final SuspendedTask task) {
672      final String message = "Received notification that task [" + task.getId() + "] has been suspended.";
673      LOG.log(Level.INFO, message);
674      try (final LoggingScope ls = loggingScopeFactory.taskSuspended(task.getId())) {
675        if (JobDriver.this.handlerManager.getSuspendedTaskHandler() != 0) {
676          final SuspendedTaskBridge suspendedTaskBridge = new SuspendedTaskBridge(task, activeContextBridgeFactory);
677          // if CLR implements the suspended task handler, handle it in CLR
678          LOG.log(Level.INFO, "Handling the event of suspended task in CLR bridge.");
679          NativeInterop.clrSystemSuspendedTaskHandlerOnNext(JobDriver.this.handlerManager.getSuspendedTaskHandler(),
680              suspendedTaskBridge);
681        }
682        JobDriver.this.jobMessageObserver.sendMessageToClient(JVM_CODEC.encode(message));
683      }
684    }
685  }
686
687  /**
688   * Receive notification that the Evaluator has been shut down.
689   */
690  public final class CompletedEvaluatorHandler implements EventHandler<CompletedEvaluator> {
691    @Override
692    public void onNext(final CompletedEvaluator evaluator) {
693      LOG.log(Level.INFO, " Completed Evaluator {0}", evaluator.getId());
694      try (final LoggingScope ls = loggingScopeFactory.evaluatorCompleted(evaluator.getId())) {
695        if (JobDriver.this.handlerManager.getCompletedEvaluatorHandler() != 0) {
696          final CompletedEvaluatorBridge completedEvaluatorBridge = new CompletedEvaluatorBridge(evaluator);
697          // if CLR implements the completed evaluator handler, handle it in CLR
698          LOG.log(Level.INFO, "Handling the event of completed evaluator in CLR bridge.");
699          NativeInterop.clrSystemCompletedEvaluatorHandlerOnNext(
700              JobDriver.this.handlerManager.getCompletedEvaluatorHandler(), completedEvaluatorBridge);
701          allocatedEvaluatorBridges.remove(completedEvaluatorBridge.getId());
702        }
703      }
704    }
705  }
706
707
708  /**
   * Receive notification that the Context has completed.
   * Remove the context from the list of active contexts.
711   */
712  public final class ClosedContextHandler implements EventHandler<ClosedContext> {
713    @Override
714    public void onNext(final ClosedContext context) {
715      LOG.log(Level.INFO, "Completed Context: {0}", context.getId());
716      try (final LoggingScope ls = loggingScopeFactory.closedContext(context.getId())) {
717        if (JobDriver.this.handlerManager.getClosedContextHandler() != 0) {
718          final ClosedContextBridge closedContextBridge = new ClosedContextBridge(context, activeContextBridgeFactory);
719          // if CLR implements the closed context handler, handle it in CLR
720          LOG.log(Level.INFO, "Handling the event of closed context in CLR bridge.");
721          NativeInterop.clrSystemClosedContextHandlerOnNext(JobDriver.this.handlerManager.getClosedContextHandler(),
722              closedContextBridge);
723        }
724        synchronized (JobDriver.this) {
725          JobDriver.this.contexts.remove(context.getId());
726        }
727      }
728    }
729  }
730
731
732  /**
   * Receive notification that the Context has failed.
   * Remove the context from the list of active contexts and notify the client.
735   */
736  public final class FailedContextHandler implements EventHandler<FailedContext> {
737    @Override
738    public void onNext(final FailedContext context) {
739      LOG.log(Level.SEVERE, "FailedContext", context);
740      try (final LoggingScope ls = loggingScopeFactory.evaluatorFailed(context.getId())) {
741        if (JobDriver.this.handlerManager.getFailedContextHandler() != 0) {
742          final FailedContextBridge failedContextBridge = new FailedContextBridge(context, activeContextBridgeFactory);
743          // if CLR implements the failed context handler, handle it in CLR
744          LOG.log(Level.INFO, "Handling the event of failed context in CLR bridge.");
745          NativeInterop.clrSystemFailedContextHandlerOnNext(JobDriver.this.handlerManager.getFailedContextHandler(),
746              failedContextBridge);
747        }
748        synchronized (JobDriver.this) {
749          JobDriver.this.contexts.remove(context.getId());
750        }
751        final Optional<byte[]> err = context.getData();
752        if (err.isPresent()) {
753          JobDriver.this.jobMessageObserver.sendMessageToClient(err.get());
754        }
755      }
756    }
757  }
758
759  /**
760   * Receive notification that a ContextMessage has been received.
761   */
762  public final class ContextMessageHandler implements EventHandler<ContextMessage> {
763    @Override
764    public void onNext(final ContextMessage message) {
765      LOG.log(Level.SEVERE, "Received ContextMessage:", message.get());
766      try (final LoggingScope ls =
767               loggingScopeFactory.contextMessageReceived(new String(message.get(), StandardCharsets.UTF_8))) {
768        if (JobDriver.this.handlerManager.getContextMessageHandler() != 0) {
769          final ContextMessageBridge contextMessageBridge = new ContextMessageBridge(message);
770          // if CLR implements the context message handler, handle it in CLR
771          LOG.log(Level.INFO, "Handling the event of context message in CLR bridge.");
772          NativeInterop.clrSystemContextMessageHandlerOnNext(JobDriver.this.handlerManager.getContextMessageHandler(),
773              contextMessageBridge);
774        }
775      }
776    }
777  }
778
779  /**
780   * Gets the progress of the application from .NET side.
781   */
782  public final class ProgressProvider implements org.apache.reef.driver.ProgressProvider {
783    @Override
784    public float getProgress() {
785      if (JobDriver.this.handlerManager != null && JobDriver.this.handlerManager.getProgressProvider() != 0) {
786        return NativeInterop.clrSystemProgressProviderGetProgress(JobDriver.this.handlerManager.getProgressProvider());
787      }
788
789      return 0f;
790    }
791  }
792}