This project has retired. For details, please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.javabridge.generic;
020
021import org.apache.reef.driver.client.JobMessageObserver;
022import org.apache.reef.driver.context.ActiveContext;
023import org.apache.reef.driver.context.ClosedContext;
024import org.apache.reef.driver.context.ContextMessage;
025import org.apache.reef.driver.context.FailedContext;
026import org.apache.reef.driver.evaluator.*;
027import org.apache.reef.driver.restart.DriverRestarted;
028import org.apache.reef.driver.task.*;
029import org.apache.reef.io.network.naming.NameServer;
030import org.apache.reef.javabridge.*;
031import org.apache.reef.driver.restart.DriverRestartCompleted;
032import org.apache.reef.runtime.common.driver.DriverStatusManager;
033import org.apache.reef.runtime.common.driver.parameters.DefinedRuntimes;
034import org.apache.reef.runtime.common.files.REEFFileNames;
035import org.apache.reef.tang.annotations.Parameter;
036import org.apache.reef.tang.annotations.Unit;
037import org.apache.reef.util.Optional;
038import org.apache.reef.util.logging.CLRBufferedLogHandler;
039import org.apache.reef.util.logging.LoggingScope;
040import org.apache.reef.util.logging.LoggingScopeFactory;
041import org.apache.reef.wake.EventHandler;
042import org.apache.reef.wake.remote.address.LocalAddressProvider;
043import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
044import org.apache.reef.wake.time.Clock;
045import org.apache.reef.wake.time.event.Alarm;
046import org.apache.reef.wake.time.event.StartTime;
047import org.apache.reef.wake.time.event.StopTime;
048import org.apache.reef.webserver.*;
049
050import javax.inject.Inject;
051import javax.servlet.ServletException;
052import javax.servlet.http.HttpServletResponse;
053import java.io.*;
054import java.nio.charset.Charset;
055import java.nio.charset.StandardCharsets;
056import java.util.*;
057import java.util.logging.Handler;
058import java.util.logging.Level;
059import java.util.logging.Logger;
060
061/**
062 * Generic job driver for CLRBridge.
063 */
064@Unit
065public final class JobDriver {
066
067  private static final Logger LOG = Logger.getLogger(JobDriver.class.getName());
068  /**
069   * String codec is used to encode the results
070   * before passing them back to the client.
071   */
072  private static final ObjectSerializableCodec<String> JVM_CODEC = new ObjectSerializableCodec<>();
073  private final InteropLogger interopLogger = new InteropLogger();
074  private final NameServer nameServer;
075  private final String nameServerInfo;
076  private final HttpServer httpServer;
077  private final ActiveContextBridgeFactory activeContextBridgeFactory;
078  private final AllocatedEvaluatorBridgeFactory allocatedEvaluatorBridgeFactory;
079
080  /**
081   * Wake clock is used to schedule periodical job check-ups.
082   */
083  private final Clock clock;
084  /**
085   * Job observer on the client.
086   * We use it to send results from the driver back to the client.
087   */
088  private final JobMessageObserver jobMessageObserver;
089  /**
090   * Job driver uses EvaluatorRequestor
091   * to request Evaluators that will run the Tasks.
092   */
093  private final EvaluatorRequestor evaluatorRequestor;
094
095  /**
096   * Driver status manager to monitor driver status.
097   */
098  private final DriverStatusManager driverStatusManager;
099
100  /**
101   * Factory to setup new CLR process configurations.
102   */
103  private final CLRProcessFactory clrProcessFactory;
104
105  /**
106   * Shell execution results from each Evaluator.
107   */
108  private final List<String> results = new ArrayList<>();
109  /**
110   * Map from context ID to running evaluator context.
111   */
112  private final Map<String, ActiveContext> contexts = new HashMap<>();
113
114  private final REEFFileNames reefFileNames;
115  private final LocalAddressProvider localAddressProvider;
116  /**
117   * Logging scope factory that provides LoggingScope.
118   */
119  private final LoggingScopeFactory loggingScopeFactory;
120  private final Set<String> definedRuntimes;
121
122  private BridgeHandlerManager handlerManager = null;
123  private boolean isRestarted = false;
124  // We are holding on to following on bridge side.
125  // Need to add references here so that GC does not collect them.
126  private final HashMap<String, AllocatedEvaluatorBridge> allocatedEvaluatorBridges =
127      new HashMap<>();
128  private EvaluatorRequestorBridge evaluatorRequestorBridge;
129
130
/**
 * Creates the job driver. Every argument is injected by Tang.
 *
 * @param clock                           Wake clock used to schedule alarms.
 * @param httpServer                      HTTP server on which CLR-declared handlers are registered.
 * @param nameServer                      name server whose port is advertised to Evaluators.
 * @param jobMessageObserver              sends messages back to the client.
 * @param evaluatorRequestor              requests Evaluators.
 * @param driverStatusManager             driver status manager.
 * @param loggingScopeFactory             provides the logging scopes wrapped around each event.
 * @param localAddressProvider            supplies the local address used in advertised endpoints.
 * @param activeContextBridgeFactory      creates ActiveContext bridges for the CLR.
 * @param reefFileNames                   locates the driver HTTP endpoint file.
 * @param allocatedEvaluatorBridgeFactory creates AllocatedEvaluator bridges for the CLR.
 * @param clrProcessFactory               creates the CLR process for each allocated Evaluator.
 * @param definedRuntimes                 runtimes defined for this job.
 */
@Inject
JobDriver(final Clock clock,
          final HttpServer httpServer,
          final NameServer nameServer,
          final JobMessageObserver jobMessageObserver,
          final EvaluatorRequestor evaluatorRequestor,
          final DriverStatusManager driverStatusManager,
          final LoggingScopeFactory loggingScopeFactory,
          final LocalAddressProvider localAddressProvider,
          final ActiveContextBridgeFactory activeContextBridgeFactory,
          final REEFFileNames reefFileNames,
          final AllocatedEvaluatorBridgeFactory allocatedEvaluatorBridgeFactory,
          final CLRProcessFactory clrProcessFactory,
          @Parameter(DefinedRuntimes.class) final Set<String> definedRuntimes) {
  // Infrastructure services.
  this.clock = clock;
  this.httpServer = httpServer;
  this.nameServer = nameServer;
  this.localAddressProvider = localAddressProvider;
  this.reefFileNames = reefFileNames;
  this.loggingScopeFactory = loggingScopeFactory;
  this.driverStatusManager = driverStatusManager;

  // Driver-to-client and driver-to-resource-manager channels.
  this.jobMessageObserver = jobMessageObserver;
  this.evaluatorRequestor = evaluatorRequestor;
  this.definedRuntimes = definedRuntimes;

  // Bridge factories used when handing objects to the CLR.
  this.activeContextBridgeFactory = activeContextBridgeFactory;
  this.allocatedEvaluatorBridgeFactory = allocatedEvaluatorBridgeFactory;
  this.clrProcessFactory = clrProcessFactory;

  // "address:port" of the name server, given to every allocated Evaluator bridge.
  this.nameServerInfo = localAddressProvider.getLocalAddress() + ":" + nameServer.getPort();
}
169
170  private void setupBridge() {
171    // Signal to the clr buffered log handler that the driver has started and that
172    // we can begin logging
173    LOG.log(Level.INFO, "Initializing CLRBufferedLogHandler...");
174    try (final LoggingScope lb = this.loggingScopeFactory.setupBridge()) {
175      final CLRBufferedLogHandler handler = getCLRBufferedLogHandler();
176      if (handler == null) {
177        LOG.log(Level.WARNING, "CLRBufferedLogHandler could not be initialized");
178      } else {
179        handler.setDriverInitialized();
180        LOG.log(Level.INFO, "CLRBufferedLogHandler init complete.");
181      }
182
183      final String portNumber = httpServer == null ? null : Integer.toString(httpServer.getPort());
184      if (portNumber != null) {
185        try {
186          final File outputFileName = new File(reefFileNames.getDriverHttpEndpoint());
187          BufferedWriter out = new BufferedWriter(
188              new OutputStreamWriter(new FileOutputStream(outputFileName), StandardCharsets.UTF_8));
189          out.write(localAddressProvider.getLocalAddress() + ":" + portNumber + "\n");
190          out.close();
191        } catch (IOException ex) {
192          throw new RuntimeException(ex);
193        }
194      }
195
196      this.evaluatorRequestorBridge =
197          new EvaluatorRequestorBridge(JobDriver.this.evaluatorRequestor, false, loggingScopeFactory,
198                  JobDriver.this.definedRuntimes);
199      JobDriver.this.handlerManager = new BridgeHandlerManager();
200      NativeInterop.clrSystemSetupBridgeHandlerManager(portNumber,
201          JobDriver.this.handlerManager, evaluatorRequestorBridge);
202
203      try (final LoggingScope lp =
204               this.loggingScopeFactory.getNewLoggingScope("setupBridge::clrSystemHttpServerHandlerOnNext")) {
205        final HttpServerEventBridge httpServerEventBridge = new HttpServerEventBridge("SPEC");
206        NativeInterop.clrSystemHttpServerHandlerOnNext(JobDriver.this.handlerManager.getHttpServerEventHandler(),
207            httpServerEventBridge, this.interopLogger);
208        final String specList = httpServerEventBridge.getUriSpecification();
209        LOG.log(Level.INFO, "Starting http server, getUriSpecification: {0}", specList);
210        if (specList != null) {
211          final String[] specs = specList.split(":");
212          for (final String s : specs) {
213            final HttpHandler h = new HttpServerBridgeEventHandler();
214            h.setUriSpecification(s);
215            this.httpServer.addHttpHandler(h);
216          }
217        }
218      }
219    }
220    LOG.log(Level.INFO, "CLR Bridge setup.");
221  }
222
223  private CLRBufferedLogHandler getCLRBufferedLogHandler() {
224    for (final Handler handler : Logger.getLogger("").getHandlers()) {
225      if (handler instanceof CLRBufferedLogHandler) {
226        return (CLRBufferedLogHandler) handler;
227      }
228    }
229    return null;
230  }
231
232  private void submitEvaluator(final AllocatedEvaluator eval, final EvaluatorProcess process) {
233    synchronized (JobDriver.this) {
234      eval.setProcess(process);
235      LOG.log(Level.INFO, "Allocated Evaluator: {0}, total running running {1}",
236          new Object[]{eval.getId(), JobDriver.this.contexts.size()});
237      if (JobDriver.this.handlerManager.getAllocatedEvaluatorHandler() == 0) {
238        throw new RuntimeException("Allocated Evaluator Handler not initialized by CLR.");
239      }
240      final AllocatedEvaluatorBridge allocatedEvaluatorBridge =
241          this.allocatedEvaluatorBridgeFactory.getAllocatedEvaluatorBridge(eval, this.nameServerInfo);
242      allocatedEvaluatorBridges.put(allocatedEvaluatorBridge.getId(), allocatedEvaluatorBridge);
243      NativeInterop.clrSystemAllocatedEvaluatorHandlerOnNext(
244          JobDriver.this.handlerManager.getAllocatedEvaluatorHandler(), allocatedEvaluatorBridge, this.interopLogger);
245    }
246  }
247
248  private void handleFailedEvaluator(final FailedEvaluator eval, final boolean isRestartFailed) {
249    try (final LoggingScope ls = loggingScopeFactory.evaluatorFailed(eval.getId())) {
250      synchronized (JobDriver.this) {
251        LOG.log(Level.SEVERE, "FailedEvaluator", eval);
252        for (final FailedContext failedContext : eval.getFailedContextList()) {
253          final String failedContextId = failedContext.getId();
254          LOG.log(Level.INFO, "removing context " + failedContextId + " from job driver contexts.");
255          JobDriver.this.contexts.remove(failedContextId);
256        }
257        String message = "Evaluator " + eval.getId() + " failed with message: "
258            + eval.getEvaluatorException().getMessage();
259        JobDriver.this.jobMessageObserver.sendMessageToClient(message.getBytes(StandardCharsets.UTF_8));
260
261        if (isRestartFailed) {
262          evaluatorFailedHandlerWaitForCLRBridgeSetup(
263              JobDriver.this.handlerManager.getDriverRestartFailedEvaluatorHandler(), eval, isRestartFailed);
264        } else {
265          evaluatorFailedHandlerWaitForCLRBridgeSetup(JobDriver.this.handlerManager.getFailedEvaluatorHandler(),
266              eval, isRestartFailed);
267        }
268      }
269    }
270  }
271
272  private void evaluatorFailedHandlerWaitForCLRBridgeSetup(final long handle,
273                                                           final FailedEvaluator eval,
274                                                           final boolean isRestartFailed) {
275    if (handle == 0) {
276      if (JobDriver.this.handlerManager != null) {
277        final String message = "No CLR FailedEvaluator handler was set, exiting now";
278        LOG.log(Level.WARNING, message);
279        JobDriver.this.jobMessageObserver.sendMessageToClient(message.getBytes(StandardCharsets.UTF_8));
280      } else {
281        clock.scheduleAlarm(0, new EventHandler<Alarm>() {
282          @Override
283          public void onNext(final Alarm time) {
284            if (JobDriver.this.handlerManager != null) {
285              handleFailedEvaluatorInCLR(eval, isRestartFailed);
286            } else {
287              LOG.log(Level.INFO, "Waiting for CLR bridge to be set up");
288              clock.scheduleAlarm(5000, this);
289            }
290          }
291        });
292      }
293    } else{
294      handleFailedEvaluatorInCLR(eval, isRestartFailed);
295    }
296  }
297
298  private void handleFailedEvaluatorInCLR(final FailedEvaluator eval, final boolean isRestartFailed) {
299    final String message = "CLR FailedEvaluator handler set, handling things with CLR handler.";
300    LOG.log(Level.INFO, message);
301    final FailedEvaluatorBridge failedEvaluatorBridge =
302        new FailedEvaluatorBridge(eval, JobDriver.this.evaluatorRequestor,
303        JobDriver.this.isRestarted, loggingScopeFactory, activeContextBridgeFactory, JobDriver.this.definedRuntimes);
304    if (isRestartFailed) {
305      NativeInterop.clrSystemDriverRestartFailedEvaluatorHandlerOnNext(
306          JobDriver.this.handlerManager.getDriverRestartFailedEvaluatorHandler(),
307          failedEvaluatorBridge, JobDriver.this.interopLogger);
308    } else {
309      NativeInterop.clrSystemFailedEvaluatorHandlerOnNext(
310          JobDriver.this.handlerManager.getFailedEvaluatorHandler(),
311          failedEvaluatorBridge,
312          JobDriver.this.interopLogger);
313    }
314
315    final int additionalRequestedEvaluatorNumber = failedEvaluatorBridge.getNewlyRequestedEvaluatorNumber();
316    if (additionalRequestedEvaluatorNumber > 0) {
317      LOG.log(Level.INFO, "number of additional evaluators requested after evaluator failure: " +
318          additionalRequestedEvaluatorNumber);
319    }
320
321    JobDriver.this.jobMessageObserver.sendMessageToClient(message.getBytes(StandardCharsets.UTF_8));
322  }
323
324  /**
325   * Submit a Task to a single Evaluator.
326   */
327  private void submit(final ActiveContext context) {
328    try {
329      LOG.log(Level.INFO, "Send task to context: {0}", new Object[]{context});
330      if (JobDriver.this.handlerManager.getActiveContextHandler() == 0) {
331        throw new RuntimeException("Active Context Handler not initialized by CLR.");
332      }
333      final ActiveContextBridge activeContextBridge = activeContextBridgeFactory.getActiveContextBridge(context);
334      NativeInterop.clrSystemActiveContextHandlerOnNext(JobDriver.this.handlerManager.getActiveContextHandler(),
335          activeContextBridge, JobDriver.this.interopLogger);
336    } catch (final Exception ex) {
337      LOG.log(Level.SEVERE, "Fail to submit task to active context");
338      context.close();
339      throw new RuntimeException(ex);
340    }
341  }
342
343  /**
344   * Handles AllocatedEvaluator: Submit an empty context.
345   */
346  public final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> {
347    @Override
348    public void onNext(final AllocatedEvaluator allocatedEvaluator) {
349      try (final LoggingScope ls = loggingScopeFactory.evaluatorAllocated(allocatedEvaluator.getId())) {
350        synchronized (JobDriver.this) {
351          LOG.log(Level.INFO, "AllocatedEvaluatorHandler.OnNext");
352          JobDriver.this.submitEvaluator(allocatedEvaluator, clrProcessFactory.newEvaluatorProcess());
353        }
354      }
355    }
356  }
357
358  /**
359   * Receive notification that a new Context is available.
360   */
361  public final class ActiveContextHandler implements EventHandler<ActiveContext> {
362    @Override
363    public void onNext(final ActiveContext context) {
364      try (final LoggingScope ls = loggingScopeFactory.activeContextReceived(context.getId())) {
365        synchronized (JobDriver.this) {
366          LOG.log(Level.INFO, "ActiveContextHandler: Context available: {0}",
367              new Object[]{context.getId()});
368          JobDriver.this.contexts.put(context.getId(), context);
369          JobDriver.this.submit(context);
370        }
371      }
372    }
373  }
374
375  /**
376   * Receive notification that the Task has completed successfully.
377   */
378  public final class CompletedTaskHandler implements EventHandler<CompletedTask> {
379    @Override
380    public void onNext(final CompletedTask task) {
381      LOG.log(Level.INFO, "Completed task: {0}", task.getId());
382      try (final LoggingScope ls = loggingScopeFactory.taskCompleted(task.getId())) {
383        // Take the message returned by the task and add it to the running result.
384        String result = "default result";
385        try {
386          result = new String(task.get(), StandardCharsets.UTF_8);
387        } catch (final Exception e) {
388          LOG.log(Level.WARNING, "failed to decode task outcome");
389        }
390        LOG.log(Level.INFO, "Return results to the client:\n{0}", result);
391        JobDriver.this.jobMessageObserver.sendMessageToClient(JVM_CODEC.encode(result));
392        if (JobDriver.this.handlerManager.getCompletedTaskHandler() == 0) {
393          LOG.log(Level.INFO, "No CLR handler bound to handle completed task.");
394        } else {
395          LOG.log(Level.INFO, "CLR CompletedTaskHandler handler set, handling things with CLR handler.");
396          final CompletedTaskBridge completedTaskBridge = new CompletedTaskBridge(task, activeContextBridgeFactory);
397          NativeInterop.clrSystemCompletedTaskHandlerOnNext(JobDriver.this.handlerManager.getCompletedTaskHandler(),
398              completedTaskBridge, JobDriver.this.interopLogger);
399        }
400      }
401    }
402  }
403
404  /**
405   * Receive notification that the entire Evaluator had failed.
406   */
407  public final class FailedEvaluatorHandler implements EventHandler<FailedEvaluator> {
408    @Override
409    public void onNext(final FailedEvaluator eval) {
410      JobDriver.this.handleFailedEvaluator(eval, false);
411      allocatedEvaluatorBridges.remove(eval.getId());
412    }
413  }
414
415  /**
416   * Receive notification that the entire Evaluator had failed on Driver Restart.
417   */
418  public final class DriverRestartFailedEvaluatorHandler implements EventHandler<FailedEvaluator> {
419    @Override
420    public void onNext(final FailedEvaluator eval) {
421      JobDriver.this.handleFailedEvaluator(eval, true);
422    }
423  }
424
425  final class HttpServerBridgeEventHandler implements HttpHandler {
426    private String uriSpecification;
427
428    /**
429     * returns URI specification for the handler.
430     */
431    @Override
432    public String getUriSpecification() {
433      return uriSpecification;
434    }
435
436    public void setUriSpecification(final String s) {
437      uriSpecification = s;
438    }
439
440    /**
441     * process http request.
442     */
443    @Override
444    public void onHttpRequest(final ParsedHttpRequest parsedHttpRequest, final HttpServletResponse response)
445        throws IOException, ServletException {
446      LOG.log(Level.INFO, "HttpServerBridgeEventHandler onHttpRequest: {0}", parsedHttpRequest.getRequestUri());
447      try (final LoggingScope ls = loggingScopeFactory.httpRequest(parsedHttpRequest.getRequestUri())) {
448        final AvroHttpSerializer httpSerializer = new AvroHttpSerializer();
449        final AvroHttpRequest avroHttpRequest = httpSerializer.toAvro(parsedHttpRequest);
450
451        final String requestString = httpSerializer.toString(avroHttpRequest);
452        final byte[] requestBytes = requestString.getBytes(Charset.forName(AvroHttpSerializer.JSON_CHARSET));
453
454        try {
455          final HttpServerEventBridge httpServerEventBridge = new HttpServerEventBridge(requestBytes);
456          NativeInterop.clrSystemHttpServerHandlerOnNext(JobDriver.this.handlerManager.getHttpServerEventHandler(),
457              httpServerEventBridge, JobDriver.this.interopLogger);
458          final String responseBody = new String(httpServerEventBridge.getQueryResponseData(), "UTF-8");
459          response.getWriter().println(responseBody);
460          LOG.log(Level.INFO, "HttpServerBridgeEventHandler onHttpRequest received response: {0}", responseBody);
461        } catch (final Exception ex) {
462          LOG.log(Level.SEVERE, "Fail to invoke CLR Http Server handler", ex);
463          throw new RuntimeException(ex);
464        }
465      }
466    }
467  }
468
469  /**
470   * Handle failed task.
471   */
472  public final class FailedTaskHandler implements EventHandler<FailedTask> {
473    @Override
474    public void onNext(final FailedTask task) throws RuntimeException {
475      LOG.log(Level.SEVERE, "FailedTask received, will be handle in CLR handler, if set.");
476      if (JobDriver.this.handlerManager.getFailedTaskHandler() == 0) {
477        LOG.log(Level.SEVERE, "Failed Task Handler not initialized by CLR, fail for real.");
478        throw new RuntimeException("Failed Task Handler not initialized by CLR.");
479      }
480      try {
481        final FailedTaskBridge failedTaskBridge = new FailedTaskBridge(task, activeContextBridgeFactory);
482        NativeInterop.clrSystemFailedTaskHandlerOnNext(JobDriver.this.handlerManager.getFailedTaskHandler(),
483            failedTaskBridge, JobDriver.this.interopLogger);
484      } catch (final Exception ex) {
485        LOG.log(Level.SEVERE, "Fail to invoke CLR failed task handler");
486        throw new RuntimeException(ex);
487      }
488    }
489  }
490
491  /**
492   * Receive notification that the Task is running.
493   */
494  public final class RunningTaskHandler implements EventHandler<RunningTask> {
495    @Override
496    public void onNext(final RunningTask task) {
497      try (final LoggingScope ls = loggingScopeFactory.taskRunning(task.getId())) {
498        if (JobDriver.this.handlerManager.getRunningTaskHandler() == 0) {
499          LOG.log(Level.INFO, "RunningTask event received but no CLR handler was bound. Exiting handler.");
500        } else {
501          LOG.log(Level.INFO, "RunningTask will be handled by CLR handler. Task Id: {0}", task.getId());
502          try {
503            final RunningTaskBridge runningTaskBridge = new RunningTaskBridge(task, activeContextBridgeFactory);
504            NativeInterop.clrSystemRunningTaskHandlerOnNext(JobDriver.this.handlerManager.getRunningTaskHandler(),
505                runningTaskBridge, JobDriver.this.interopLogger);
506          } catch (final Exception ex) {
507            LOG.log(Level.WARNING, "Fail to invoke CLR running task handler");
508            throw new RuntimeException(ex);
509          }
510        }
511      }
512    }
513  }
514
515  /**
516   * Receive notification that the Task is running when driver restarted.
517   */
518  public final class DriverRestartRunningTaskHandler implements EventHandler<RunningTask> {
519    @Override
520    public void onNext(final RunningTask task) {
521      try (final LoggingScope ls = loggingScopeFactory.driverRestartRunningTask(task.getId())) {
522        clock.scheduleAlarm(0, new EventHandler<Alarm>() {
523          @Override
524          public void onNext(final Alarm time) {
525            if (JobDriver.this.handlerManager != null) {
526              if (JobDriver.this.handlerManager.getDriverRestartRunningTaskHandler() != 0) {
527                LOG.log(Level.INFO, "CLR driver restart RunningTask handler implemented, now handle it in CLR.");
528                NativeInterop.clrSystemDriverRestartRunningTaskHandlerOnNext(
529                    JobDriver.this.handlerManager.getDriverRestartRunningTaskHandler(),
530                    new RunningTaskBridge(task, activeContextBridgeFactory));
531              } else {
532                LOG.log(Level.WARNING, "No CLR driver restart RunningTask handler implemented, " +
533                    "done with DriverRestartRunningTaskHandler.");
534              }
535            } else {
536              LOG.log(Level.INFO, "Waiting for driver to complete restart process " +
537                  "before checking out CLR driver restart RunningTaskHandler...");
538              clock.scheduleAlarm(2000, this);
539            }
540          }
541        });
542      }
543    }
544  }
545
546  /**
547   * Receive notification that an context is active on Evaluator when the driver restarted.
548   */
549  public final class DriverRestartActiveContextHandler implements EventHandler<ActiveContext> {
550    @Override
551    public void onNext(final ActiveContext context) {
552      try (final LoggingScope ls = loggingScopeFactory.driverRestartActiveContextReceived(context.getId())) {
553        JobDriver.this.contexts.put(context.getId(), context);
554        LOG.log(Level.INFO, "DriverRestartActiveContextHandler event received: " + context.getId());
555        clock.scheduleAlarm(0, new EventHandler<Alarm>() {
556          @Override
557          public void onNext(final Alarm time) {
558            if (JobDriver.this.handlerManager != null) {
559              if (JobDriver.this.handlerManager.getDriverRestartActiveContextHandler() != 0) {
560                LOG.log(Level.INFO, "CLR driver restart ActiveContext handler implemented, now handle it in CLR.");
561                NativeInterop.clrSystemDriverRestartActiveContextHandlerOnNext(
562                    JobDriver.this.handlerManager.getDriverRestartActiveContextHandler(),
563                    activeContextBridgeFactory.getActiveContextBridge(context));
564              } else {
565                LOG.log(Level.WARNING, "No CLR driver restart ActiveContext handler implemented, " +
566                    "done with DriverRestartActiveContextHandler.");
567              }
568            } else {
569              LOG.log(Level.INFO, "Waiting for driver to complete restart process " +
570                  "before checking out CLR driver restart DriverRestartActiveContextHandler...");
571              clock.scheduleAlarm(2000, this);
572            }
573          }
574        });
575      }
576    }
577  }
578
579  /**
580   * Job Driver is ready and the clock is set up: request the evaluators.
581   */
582  public final class StartHandler implements EventHandler<StartTime> {
583    @Override
584    public void onNext(final StartTime startTime) {
585      try (final LoggingScope ls = loggingScopeFactory.driverStart(startTime)) {
586        // CLR bridge setup must be done before other event handlers try to access the CLR bridge
587        // thus we grab a lock on this instance
588        synchronized (JobDriver.this) {
589          setupBridge();
590          LOG.log(Level.INFO, "Finished CLR bridge setup for {0}", startTime);
591        }
592
593        NativeInterop.callClrSystemOnStartHandler();
594        LOG.log(Level.INFO, "Driver Started");
595      }
596    }
597  }
598
599
600  /**
601   * Job driver is restarted after previous crash.
602   */
603  public final class RestartHandler implements EventHandler<DriverRestarted> {
604    @Override
605    public void onNext(final DriverRestarted driverRestarted) {
606      try (final LoggingScope ls = loggingScopeFactory.driverRestart(driverRestarted.getStartTime())) {
607        // CLR bridge setup must be done before other event handlers try to access the CLR bridge
608        // thus we lock on this instance
609        synchronized (JobDriver.this) {
610          JobDriver.this.isRestarted = true;
611          setupBridge();
612          LOG.log(Level.INFO, "Finished CLR bridge setup for {0}", driverRestarted);
613        }
614
615        NativeInterop.callClrSystemOnRestartHandler(new DriverRestartedBridge(driverRestarted));
616        LOG.log(Level.INFO, "Driver Restarted");
617      }
618    }
619  }
620
621  /**
622   * Receive notification that driver restart has completed.
623   */
624  public final class DriverRestartCompletedHandler implements EventHandler<DriverRestartCompleted> {
625    @Override
626    public void onNext(final DriverRestartCompleted driverRestartCompleted) {
627      LOG.log(Level.INFO, "Java DriverRestartCompleted event received at time [{0}]. ",
628          driverRestartCompleted.getCompletedTime());
629      try (final LoggingScope ls = loggingScopeFactory.driverRestartCompleted(
630          driverRestartCompleted.getCompletedTime().getTimestamp())) {
631        if (JobDriver.this.handlerManager.getDriverRestartCompletedHandler() != 0) {
632          LOG.log(Level.INFO, "CLR driver restart handler implemented, now handle it in CLR.");
633
634          NativeInterop.clrSystemDriverRestartCompletedHandlerOnNext(
635              JobDriver.this.handlerManager.getDriverRestartCompletedHandler(),
636              new DriverRestartCompletedBridge(driverRestartCompleted));
637        } else {
638          LOG.log(Level.WARNING, "No CLR driver restart handler implemented, done with DriverRestartCompletedHandler.");
639        }
640      }
641    }
642  }
643
644  /**
645   * Shutting down the job driver: close the evaluators.
646   */
647  final class StopHandler implements EventHandler<StopTime> {
648    @Override
649    public void onNext(final StopTime time) {
650      LOG.log(Level.INFO, " StopTime: {0}", new Object[]{time});
651      try (final LoggingScope ls = loggingScopeFactory.driverStop(time.getTimestamp())) {
652        for (final ActiveContext context : contexts.values()) {
653          context.close();
654        }
655      }
656    }
657  }
658
659  /**
660   * Handler for message received from the Task.
661   */
662  public final class TaskMessageHandler implements EventHandler<TaskMessage> {
663    @Override
664    public void onNext(final TaskMessage taskMessage) {
665      final String msg = new String(taskMessage.get(), StandardCharsets.UTF_8);
666      LOG.log(Level.INFO, "Received TaskMessage: {0} from CLR", msg);
667      //try (LoggingScope ls = loggingScopeFactory.taskMessageReceived(new String(msg))) {
668      if (JobDriver.this.handlerManager.getTaskMessageHandler() != 0) {
669        final TaskMessageBridge taskMessageBridge = new TaskMessageBridge(taskMessage);
670        // if CLR implements the task message handler, handle the bytes in CLR handler
671        NativeInterop.clrSystemTaskMessageHandlerOnNext(JobDriver.this.handlerManager.getTaskMessageHandler(),
672            taskMessage.get(), taskMessageBridge, JobDriver.this.interopLogger);
673      }
674      //}
675    }
676  }
677
678  /**
679   * Receive notification that the Task has been suspended.
680   */
681  public final class SuspendedTaskHandler implements EventHandler<SuspendedTask> {
682    @Override
683    public void onNext(final SuspendedTask task) {
684      final String message = "Received notification that task [" + task.getId() + "] has been suspended.";
685      LOG.log(Level.INFO, message);
686      try (final LoggingScope ls = loggingScopeFactory.taskSuspended(task.getId())) {
687        if (JobDriver.this.handlerManager.getSuspendedTaskHandler() != 0) {
688          final SuspendedTaskBridge suspendedTaskBridge = new SuspendedTaskBridge(task, activeContextBridgeFactory);
689          // if CLR implements the suspended task handler, handle it in CLR
690          LOG.log(Level.INFO, "Handling the event of suspended task in CLR bridge.");
691          NativeInterop.clrSystemSuspendedTaskHandlerOnNext(JobDriver.this.handlerManager.getSuspendedTaskHandler(),
692              suspendedTaskBridge);
693        }
694        JobDriver.this.jobMessageObserver.sendMessageToClient(JVM_CODEC.encode(message));
695      }
696    }
697  }
698
699  /**
700   * Receive notification that the Evaluator has been shut down.
701   */
702  public final class CompletedEvaluatorHandler implements EventHandler<CompletedEvaluator> {
703    @Override
704    public void onNext(final CompletedEvaluator evaluator) {
705      LOG.log(Level.INFO, " Completed Evaluator {0}", evaluator.getId());
706      try (final LoggingScope ls = loggingScopeFactory.evaluatorCompleted(evaluator.getId())) {
707        if (JobDriver.this.handlerManager.getCompletedEvaluatorHandler() != 0) {
708          final CompletedEvaluatorBridge completedEvaluatorBridge = new CompletedEvaluatorBridge(evaluator);
709          // if CLR implements the completed evaluator handler, handle it in CLR
710          LOG.log(Level.INFO, "Handling the event of completed evaluator in CLR bridge.");
711          NativeInterop.clrSystemCompletedEvaluatorHandlerOnNext(
712              JobDriver.this.handlerManager.getCompletedEvaluatorHandler(), completedEvaluatorBridge);
713          allocatedEvaluatorBridges.remove(completedEvaluatorBridge.getId());
714        }
715      }
716    }
717  }
718
719
720  /**
721   * Receive notification that the Context had completed.
722   * Remove context from the list of active context.
723   */
724  public final class ClosedContextHandler implements EventHandler<ClosedContext> {
725    @Override
726    public void onNext(final ClosedContext context) {
727      LOG.log(Level.INFO, "Completed Context: {0}", context.getId());
728      try (final LoggingScope ls = loggingScopeFactory.closedContext(context.getId())) {
729        if (JobDriver.this.handlerManager.getClosedContextHandler() != 0) {
730          final ClosedContextBridge closedContextBridge = new ClosedContextBridge(context, activeContextBridgeFactory);
731          // if CLR implements the closed context handler, handle it in CLR
732          LOG.log(Level.INFO, "Handling the event of closed context in CLR bridge.");
733          NativeInterop.clrSystemClosedContextHandlerOnNext(JobDriver.this.handlerManager.getClosedContextHandler(),
734              closedContextBridge);
735        }
736        synchronized (JobDriver.this) {
737          JobDriver.this.contexts.remove(context.getId());
738        }
739      }
740    }
741  }
742
743
744  /**
745   * Receive notification that the Context had failed.
746   * Remove context from the list of active context and notify the client.
747   */
748  public final class FailedContextHandler implements EventHandler<FailedContext> {
749    @Override
750    public void onNext(final FailedContext context) {
751      LOG.log(Level.SEVERE, "FailedContext", context);
752      try (final LoggingScope ls = loggingScopeFactory.evaluatorFailed(context.getId())) {
753        if (JobDriver.this.handlerManager.getFailedContextHandler() != 0) {
754          final FailedContextBridge failedContextBridge = new FailedContextBridge(context, activeContextBridgeFactory);
755          // if CLR implements the failed context handler, handle it in CLR
756          LOG.log(Level.INFO, "Handling the event of failed context in CLR bridge.");
757          NativeInterop.clrSystemFailedContextHandlerOnNext(JobDriver.this.handlerManager.getFailedContextHandler(),
758              failedContextBridge);
759        }
760        synchronized (JobDriver.this) {
761          JobDriver.this.contexts.remove(context.getId());
762        }
763        final Optional<byte[]> err = context.getData();
764        if (err.isPresent()) {
765          JobDriver.this.jobMessageObserver.sendMessageToClient(err.get());
766        }
767      }
768    }
769  }
770
771  /**
772   * Receive notification that a ContextMessage has been received.
773   */
774  public final class ContextMessageHandler implements EventHandler<ContextMessage> {
775    @Override
776    public void onNext(final ContextMessage message) {
777      LOG.log(Level.SEVERE, "Received ContextMessage:", message.get());
778      try (final LoggingScope ls =
779               loggingScopeFactory.contextMessageReceived(new String(message.get(), StandardCharsets.UTF_8))) {
780        if (JobDriver.this.handlerManager.getContextMessageHandler() != 0) {
781          final ContextMessageBridge contextMessageBridge = new ContextMessageBridge(message);
782          // if CLR implements the context message handler, handle it in CLR
783          LOG.log(Level.INFO, "Handling the event of context message in CLR bridge.");
784          NativeInterop.clrSystemContextMessageHandlerOnNext(JobDriver.this.handlerManager.getContextMessageHandler(),
785              contextMessageBridge);
786        }
787      }
788    }
789  }
790
791  /**
792   * Gets the progress of the application from .NET side.
793   */
794  public final class ProgressProvider implements org.apache.reef.driver.ProgressProvider {
795    @Override
796    public float getProgress() {
797      if (JobDriver.this.handlerManager != null && JobDriver.this.handlerManager.getProgressProvider() != 0) {
798        return NativeInterop.clrSystemProgressProviderGetProgress(JobDriver.this.handlerManager.getProgressProvider());
799      }
800
801      return 0f;
802    }
803  }
804}