This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.javabridge.generic;
020
021import org.apache.reef.driver.client.JobMessageObserver;
022import org.apache.reef.driver.context.ActiveContext;
023import org.apache.reef.driver.context.ClosedContext;
024import org.apache.reef.driver.context.ContextMessage;
025import org.apache.reef.driver.context.FailedContext;
026import org.apache.reef.driver.evaluator.*;
027import org.apache.reef.driver.restart.DriverRestarted;
028import org.apache.reef.driver.task.*;
029import org.apache.reef.io.network.naming.NameServer;
030import org.apache.reef.javabridge.*;
031import org.apache.reef.driver.restart.DriverRestartCompleted;
032import org.apache.reef.runtime.common.driver.DriverStatusManager;
033import org.apache.reef.runtime.common.driver.parameters.DefinedRuntimes;
034import org.apache.reef.runtime.common.files.REEFFileNames;
035import org.apache.reef.tang.annotations.Parameter;
036import org.apache.reef.tang.annotations.Unit;
037import org.apache.reef.util.Optional;
038import org.apache.reef.util.logging.CLRBufferedLogHandler;
039import org.apache.reef.util.logging.LoggingScope;
040import org.apache.reef.util.logging.LoggingScopeFactory;
041import org.apache.reef.wake.EventHandler;
042import org.apache.reef.wake.remote.address.LocalAddressProvider;
043import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
044import org.apache.reef.wake.time.Clock;
045import org.apache.reef.wake.time.event.Alarm;
046import org.apache.reef.wake.time.event.StartTime;
047import org.apache.reef.wake.time.event.StopTime;
048import org.apache.reef.webserver.*;
049
050import javax.inject.Inject;
051import javax.servlet.ServletException;
052import javax.servlet.http.HttpServletResponse;
053import java.io.*;
054import java.nio.charset.Charset;
055import java.nio.charset.StandardCharsets;
056import java.util.*;
057import java.util.logging.Handler;
058import java.util.logging.Level;
059import java.util.logging.Logger;
060
061/**
062 * Generic job driver for CLRBridge.
063 */
064@Unit
065public final class JobDriver {
066
  /** Logger shared by the driver and all of its nested event handlers. */
  private static final Logger LOG = Logger.getLogger(JobDriver.class.getName());
  /**
   * String codec is used to encode the results
   * before passing them back to the client.
   */
  private static final ObjectSerializableCodec<String> JVM_CODEC = new ObjectSerializableCodec<>();
  /** Logger object handed to the NativeInterop calls made by this driver. */
  private final InteropLogger interopLogger = new InteropLogger();
  /** Name server; only its port is read here, to build {@link #nameServerInfo}. */
  private final NameServer nameServer;
  /** "address:port" of the name server, built in the constructor and passed to allocated evaluator bridges. */
  private final String nameServerInfo;
  /** HTTP server whose port is published by setupBridge and to which the CLR HTTP handlers are added. */
  private final HttpServer httpServer;
  /** Factory for the bridges that wrap ActiveContext objects passed to the CLR side. */
  private final ActiveContextBridgeFactory activeContextBridgeFactory;
  /** Factory for the bridges that wrap AllocatedEvaluator objects passed to the CLR side. */
  private final AllocatedEvaluatorBridgeFactory allocatedEvaluatorBridgeFactory;

  /**
   * Wake clock. In this class it schedules the alarms that poll until the
   * CLR bridge (the handler manager) has been set up.
   */
  private final Clock clock;
  /**
   * Job observer on the client.
   * We use it to send results from the driver back to the client.
   */
  private final JobMessageObserver jobMessageObserver;
  /**
   * Job driver uses EvaluatorRequestor
   * to request Evaluators that will run the Tasks.
   */
  private final EvaluatorRequestor evaluatorRequestor;

  /**
   * Driver status manager to monitor driver status.
   * NOTE(review): stored by the constructor; no other use is visible in this part of the file.
   */
  private final DriverStatusManager driverStatusManager;

  /**
   * Factory that creates the CLR evaluator process assigned to each allocated evaluator.
   */
  private final CLRProcessFactory clrProcessFactory;

  /**
   * Shell execution results from each Evaluator.
   * NOTE(review): not referenced in the visible part of the file - confirm it is still needed.
   */
  private final List<String> results = new ArrayList<>();
  /**
   * Map from context ID to running evaluator context.
   * Entries are added by the active-context handlers and removed when an evaluator fails.
   */
  private final Map<String, ActiveContext> contexts = new HashMap<>();

  /** Supplies the path of the file to which the driver HTTP endpoint is written. */
  private final REEFFileNames reefFileNames;
  /** Supplies the local address used in the name server info and the HTTP endpoint file. */
  private final LocalAddressProvider localAddressProvider;
  /**
   * Logging scope factory that provides LoggingScope.
   */
  private final LoggingScopeFactory loggingScopeFactory;
  /** Injected set of defined runtime names; forwarded to the evaluator requestor and failed evaluator bridges. */
  private final Set<String> definedRuntimes;

  // Handles of the CLR-side handlers. Stays null until setupBridge() has run, which is why
  // several handlers below check for null and poll with clock alarms.
  private BridgeHandlerManager handlerManager = null;
  // Set to true by RestartHandler; passed on to FailedEvaluatorBridge.
  private boolean isRestarted = false;
  // The bridge side holds on to the following objects.
  // We keep references to them here so that the GC does not collect them.
  private final HashMap<String, AllocatedEvaluatorBridge> allocatedEvaluatorBridges =
      new HashMap<>();
  // Created in setupBridge() and handed to the CLR handlers initializer.
  private EvaluatorRequestorBridge evaluatorRequestorBridge;
129
130
131  /**
132   * Job driver constructor.
133   * All parameters are injected from TANG automatically.
134   *
135   * @param clock                      Wake clock to schedule and check up running jobs.
136   * @param jobMessageObserver         is used to send messages back to the client.
137   * @param evaluatorRequestor         is used to request Evaluators.
138   * @param activeContextBridgeFactory
139   */
140  @Inject
141  JobDriver(final Clock clock,
142            final HttpServer httpServer,
143            final NameServer nameServer,
144            final JobMessageObserver jobMessageObserver,
145            final EvaluatorRequestor evaluatorRequestor,
146            final DriverStatusManager driverStatusManager,
147            final LoggingScopeFactory loggingScopeFactory,
148            final LocalAddressProvider localAddressProvider,
149            final ActiveContextBridgeFactory activeContextBridgeFactory,
150            final REEFFileNames reefFileNames,
151            final AllocatedEvaluatorBridgeFactory allocatedEvaluatorBridgeFactory,
152            final CLRProcessFactory clrProcessFactory,
153            @Parameter(DefinedRuntimes.class) final Set<String> definedRuntimes) {
154    this.clock = clock;
155    this.httpServer = httpServer;
156    this.jobMessageObserver = jobMessageObserver;
157    this.evaluatorRequestor = evaluatorRequestor;
158    this.nameServer = nameServer;
159    this.driverStatusManager = driverStatusManager;
160    this.activeContextBridgeFactory = activeContextBridgeFactory;
161    this.allocatedEvaluatorBridgeFactory = allocatedEvaluatorBridgeFactory;
162    this.nameServerInfo = localAddressProvider.getLocalAddress() + ":" + this.nameServer.getPort();
163    this.loggingScopeFactory = loggingScopeFactory;
164    this.reefFileNames = reefFileNames;
165    this.localAddressProvider = localAddressProvider;
166    this.clrProcessFactory = clrProcessFactory;
167    this.definedRuntimes = definedRuntimes;
168  }
169
170  private void setupBridge(final ClrHandlersInitializer initializer) {
171    // Signal to the clr buffered log handler that the driver has started and that
172    // we can begin logging
173    LOG.log(Level.INFO, "Initializing CLRBufferedLogHandler...");
174    try (final LoggingScope lb = this.loggingScopeFactory.setupBridge()) {
175      final CLRBufferedLogHandler handler = getCLRBufferedLogHandler();
176      if (handler == null) {
177        LOG.log(Level.WARNING, "CLRBufferedLogHandler could not be initialized");
178      } else {
179        handler.setDriverInitialized();
180        LOG.log(Level.INFO, "CLRBufferedLogHandler init complete.");
181      }
182
183      final String portNumber = httpServer == null ? null : Integer.toString(httpServer.getPort());
184      if (portNumber != null) {
185        try {
186          final File outputFileName = new File(reefFileNames.getDriverHttpEndpoint());
187          BufferedWriter out = new BufferedWriter(
188              new OutputStreamWriter(new FileOutputStream(outputFileName), StandardCharsets.UTF_8));
189          out.write(localAddressProvider.getLocalAddress() + ":" + portNumber + "\n");
190          out.close();
191        } catch (IOException ex) {
192          throw new RuntimeException(ex);
193        }
194      }
195
196      this.evaluatorRequestorBridge =
197          new EvaluatorRequestorBridge(JobDriver.this.evaluatorRequestor, false, loggingScopeFactory,
198                  JobDriver.this.definedRuntimes);
199      JobDriver.this.handlerManager = initializer.getClrHandlers(portNumber, evaluatorRequestorBridge);
200
201      try (final LoggingScope lp =
202               this.loggingScopeFactory.getNewLoggingScope("setupBridge::clrSystemHttpServerHandlerOnNext")) {
203        final HttpServerEventBridge httpServerEventBridge = new HttpServerEventBridge("SPEC");
204        NativeInterop.clrSystemHttpServerHandlerOnNext(JobDriver.this.handlerManager.getHttpServerEventHandler(),
205            httpServerEventBridge, this.interopLogger);
206        final String specList = httpServerEventBridge.getUriSpecification();
207        LOG.log(Level.INFO, "Starting http server, getUriSpecification: {0}", specList);
208        if (specList != null) {
209          final String[] specs = specList.split(":");
210          for (final String s : specs) {
211            final HttpHandler h = new HttpServerBridgeEventHandler();
212            h.setUriSpecification(s);
213            this.httpServer.addHttpHandler(h);
214          }
215        }
216      }
217    }
218    LOG.log(Level.INFO, "CLR Bridge setup.");
219  }
220
221  private CLRBufferedLogHandler getCLRBufferedLogHandler() {
222    for (final Handler handler : Logger.getLogger("").getHandlers()) {
223      if (handler instanceof CLRBufferedLogHandler) {
224        return (CLRBufferedLogHandler) handler;
225      }
226    }
227    return null;
228  }
229
230  private void submitEvaluator(final AllocatedEvaluator eval, final EvaluatorProcess process) {
231    synchronized (JobDriver.this) {
232      eval.setProcess(process);
233      LOG.log(Level.INFO, "Allocated Evaluator: {0}, total running running {1}",
234          new Object[]{eval.getId(), JobDriver.this.contexts.size()});
235      if (JobDriver.this.handlerManager.getAllocatedEvaluatorHandler() == 0) {
236        throw new RuntimeException("Allocated Evaluator Handler not initialized by CLR.");
237      }
238      final AllocatedEvaluatorBridge allocatedEvaluatorBridge =
239          this.allocatedEvaluatorBridgeFactory.getAllocatedEvaluatorBridge(eval, this.nameServerInfo);
240      allocatedEvaluatorBridges.put(allocatedEvaluatorBridge.getId(), allocatedEvaluatorBridge);
241      NativeInterop.clrSystemAllocatedEvaluatorHandlerOnNext(
242          JobDriver.this.handlerManager.getAllocatedEvaluatorHandler(), allocatedEvaluatorBridge, this.interopLogger);
243    }
244  }
245
246  private void handleFailedEvaluator(final FailedEvaluator eval, final boolean isRestartFailed) {
247    try (final LoggingScope ls = loggingScopeFactory.evaluatorFailed(eval.getId())) {
248      synchronized (JobDriver.this) {
249        LOG.log(Level.SEVERE, "FailedEvaluator", eval);
250        for (final FailedContext failedContext : eval.getFailedContextList()) {
251          final String failedContextId = failedContext.getId();
252          LOG.log(Level.INFO, "removing context " + failedContextId + " from job driver contexts.");
253          JobDriver.this.contexts.remove(failedContextId);
254        }
255        String message = "Evaluator " + eval.getId() + " failed with message: "
256            + eval.getEvaluatorException().getMessage();
257        JobDriver.this.jobMessageObserver.sendMessageToClient(message.getBytes(StandardCharsets.UTF_8));
258
259        if (isRestartFailed) {
260          evaluatorFailedHandlerWaitForCLRBridgeSetup(
261              JobDriver.this.handlerManager.getDriverRestartFailedEvaluatorHandler(), eval, isRestartFailed);
262        } else {
263          evaluatorFailedHandlerWaitForCLRBridgeSetup(JobDriver.this.handlerManager.getFailedEvaluatorHandler(),
264              eval, isRestartFailed);
265        }
266      }
267    }
268  }
269
270  private void evaluatorFailedHandlerWaitForCLRBridgeSetup(final long handle,
271                                                           final FailedEvaluator eval,
272                                                           final boolean isRestartFailed) {
273    if (handle == 0) {
274      if (JobDriver.this.handlerManager != null) {
275        final String message = "No CLR FailedEvaluator handler was set, exiting now";
276        LOG.log(Level.WARNING, message);
277        JobDriver.this.jobMessageObserver.sendMessageToClient(message.getBytes(StandardCharsets.UTF_8));
278      } else {
279        clock.scheduleAlarm(0, new EventHandler<Alarm>() {
280          @Override
281          public void onNext(final Alarm time) {
282            if (JobDriver.this.handlerManager != null) {
283              handleFailedEvaluatorInCLR(eval, isRestartFailed);
284            } else {
285              LOG.log(Level.INFO, "Waiting for CLR bridge to be set up");
286              clock.scheduleAlarm(5000, this);
287            }
288          }
289        });
290      }
291    } else{
292      handleFailedEvaluatorInCLR(eval, isRestartFailed);
293    }
294  }
295
296  private void handleFailedEvaluatorInCLR(final FailedEvaluator eval, final boolean isRestartFailed) {
297    final String message = "CLR FailedEvaluator handler set, handling things with CLR handler.";
298    LOG.log(Level.INFO, message);
299    final FailedEvaluatorBridge failedEvaluatorBridge =
300        new FailedEvaluatorBridge(eval, JobDriver.this.evaluatorRequestor,
301        JobDriver.this.isRestarted, loggingScopeFactory, activeContextBridgeFactory, JobDriver.this.definedRuntimes);
302    if (isRestartFailed) {
303      NativeInterop.clrSystemDriverRestartFailedEvaluatorHandlerOnNext(
304          JobDriver.this.handlerManager.getDriverRestartFailedEvaluatorHandler(),
305          failedEvaluatorBridge, JobDriver.this.interopLogger);
306    } else {
307      NativeInterop.clrSystemFailedEvaluatorHandlerOnNext(
308          JobDriver.this.handlerManager.getFailedEvaluatorHandler(),
309          failedEvaluatorBridge,
310          JobDriver.this.interopLogger);
311    }
312
313    final int additionalRequestedEvaluatorNumber = failedEvaluatorBridge.getNewlyRequestedEvaluatorNumber();
314    if (additionalRequestedEvaluatorNumber > 0) {
315      LOG.log(Level.INFO, "number of additional evaluators requested after evaluator failure: " +
316          additionalRequestedEvaluatorNumber);
317    }
318
319    JobDriver.this.jobMessageObserver.sendMessageToClient(message.getBytes(StandardCharsets.UTF_8));
320  }
321
322  /**
323   * Submit a Task to a single Evaluator.
324   */
325  private void submit(final ActiveContext context) {
326    try {
327      LOG.log(Level.INFO, "Send task to context: {0}", new Object[]{context});
328      if (JobDriver.this.handlerManager.getActiveContextHandler() == 0) {
329        throw new RuntimeException("Active Context Handler not initialized by CLR.");
330      }
331      final ActiveContextBridge activeContextBridge = activeContextBridgeFactory.getActiveContextBridge(context);
332      NativeInterop.clrSystemActiveContextHandlerOnNext(JobDriver.this.handlerManager.getActiveContextHandler(),
333          activeContextBridge, JobDriver.this.interopLogger);
334    } catch (final Exception ex) {
335      LOG.log(Level.SEVERE, "Fail to submit task to active context");
336      context.close();
337      throw new RuntimeException(ex);
338    }
339  }
340
341  /**
342   * Handles AllocatedEvaluator: Submit an empty context.
343   */
344  public final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> {
345    @Override
346    public void onNext(final AllocatedEvaluator allocatedEvaluator) {
347      try (final LoggingScope ls = loggingScopeFactory.evaluatorAllocated(allocatedEvaluator.getId())) {
348        synchronized (JobDriver.this) {
349          LOG.log(Level.INFO, "AllocatedEvaluatorHandler.OnNext");
350          JobDriver.this.submitEvaluator(allocatedEvaluator, clrProcessFactory.newEvaluatorProcess());
351        }
352      }
353    }
354  }
355
356  /**
357   * Receive notification that a new Context is available.
358   */
359  public final class ActiveContextHandler implements EventHandler<ActiveContext> {
360    @Override
361    public void onNext(final ActiveContext context) {
362      try (final LoggingScope ls = loggingScopeFactory.activeContextReceived(context.getId())) {
363        synchronized (JobDriver.this) {
364          LOG.log(Level.INFO, "ActiveContextHandler: Context available: {0}",
365              new Object[]{context.getId()});
366          JobDriver.this.contexts.put(context.getId(), context);
367          JobDriver.this.submit(context);
368        }
369      }
370    }
371  }
372
373  /**
374   * Receive notification that the Task has completed successfully.
375   */
376  public final class CompletedTaskHandler implements EventHandler<CompletedTask> {
377    @Override
378    public void onNext(final CompletedTask task) {
379      LOG.log(Level.INFO, "Completed task: {0}", task.getId());
380      try (final LoggingScope ls = loggingScopeFactory.taskCompleted(task.getId())) {
381        // Take the message returned by the task and add it to the running result.
382        String result = "default result";
383        try {
384          result = new String(task.get(), StandardCharsets.UTF_8);
385        } catch (final Exception e) {
386          LOG.log(Level.WARNING, "failed to decode task outcome");
387        }
388        LOG.log(Level.INFO, "Return results to the client:\n{0}", result);
389        JobDriver.this.jobMessageObserver.sendMessageToClient(JVM_CODEC.encode(result));
390        if (JobDriver.this.handlerManager.getCompletedTaskHandler() == 0) {
391          LOG.log(Level.INFO, "No CLR handler bound to handle completed task.");
392        } else {
393          LOG.log(Level.INFO, "CLR CompletedTaskHandler handler set, handling things with CLR handler.");
394          final CompletedTaskBridge completedTaskBridge = new CompletedTaskBridge(task, activeContextBridgeFactory);
395          NativeInterop.clrSystemCompletedTaskHandlerOnNext(JobDriver.this.handlerManager.getCompletedTaskHandler(),
396              completedTaskBridge, JobDriver.this.interopLogger);
397        }
398      }
399    }
400  }
401
402  /**
403   * Receive notification that the entire Evaluator had failed.
404   */
405  public final class FailedEvaluatorHandler implements EventHandler<FailedEvaluator> {
406    @Override
407    public void onNext(final FailedEvaluator eval) {
408      JobDriver.this.handleFailedEvaluator(eval, false);
409      allocatedEvaluatorBridges.remove(eval.getId());
410    }
411  }
412
413  /**
414   * Receive notification that the entire Evaluator had failed on Driver Restart.
415   */
416  public final class DriverRestartFailedEvaluatorHandler implements EventHandler<FailedEvaluator> {
417    @Override
418    public void onNext(final FailedEvaluator eval) {
419      JobDriver.this.handleFailedEvaluator(eval, true);
420    }
421  }
422
423  final class HttpServerBridgeEventHandler implements HttpHandler {
424    private String uriSpecification;
425
426    /**
427     * returns URI specification for the handler.
428     */
429    @Override
430    public String getUriSpecification() {
431      return uriSpecification;
432    }
433
434    public void setUriSpecification(final String s) {
435      uriSpecification = s;
436    }
437
438    /**
439     * process http request.
440     */
441    @Override
442    public void onHttpRequest(final ParsedHttpRequest parsedHttpRequest, final HttpServletResponse response)
443        throws IOException, ServletException {
444      LOG.log(Level.INFO, "HttpServerBridgeEventHandler onHttpRequest: {0}", parsedHttpRequest.getRequestUri());
445      try (final LoggingScope ls = loggingScopeFactory.httpRequest(parsedHttpRequest.getRequestUri())) {
446        final AvroHttpSerializer httpSerializer = new AvroHttpSerializer();
447        final AvroHttpRequest avroHttpRequest = httpSerializer.toAvro(parsedHttpRequest);
448
449        final String requestString = httpSerializer.toString(avroHttpRequest);
450        final byte[] requestBytes = requestString.getBytes(Charset.forName(AvroHttpSerializer.JSON_CHARSET));
451
452        try {
453          final HttpServerEventBridge httpServerEventBridge = new HttpServerEventBridge(requestBytes);
454          NativeInterop.clrSystemHttpServerHandlerOnNext(JobDriver.this.handlerManager.getHttpServerEventHandler(),
455              httpServerEventBridge, JobDriver.this.interopLogger);
456          final String responseBody = new String(httpServerEventBridge.getQueryResponseData(), "UTF-8");
457          response.getWriter().println(responseBody);
458          LOG.log(Level.INFO, "HttpServerBridgeEventHandler onHttpRequest received response: {0}", responseBody);
459        } catch (final Exception ex) {
460          LOG.log(Level.SEVERE, "Fail to invoke CLR Http Server handler", ex);
461          throw new RuntimeException(ex);
462        }
463      }
464    }
465  }
466
467  /**
468   * Handle failed task.
469   */
470  public final class FailedTaskHandler implements EventHandler<FailedTask> {
471    @Override
472    public void onNext(final FailedTask task) throws RuntimeException {
473      LOG.log(Level.SEVERE, "FailedTask received, will be handle in CLR handler, if set.");
474      if (JobDriver.this.handlerManager.getFailedTaskHandler() == 0) {
475        LOG.log(Level.SEVERE, "Failed Task Handler not initialized by CLR, fail for real.");
476        throw new RuntimeException("Failed Task Handler not initialized by CLR.");
477      }
478      try {
479        final FailedTaskBridge failedTaskBridge = new FailedTaskBridge(task, activeContextBridgeFactory);
480        NativeInterop.clrSystemFailedTaskHandlerOnNext(JobDriver.this.handlerManager.getFailedTaskHandler(),
481            failedTaskBridge, JobDriver.this.interopLogger);
482      } catch (final Exception ex) {
483        LOG.log(Level.SEVERE, "Fail to invoke CLR failed task handler");
484        throw new RuntimeException(ex);
485      }
486    }
487  }
488
489  /**
490   * Receive notification that the Task is running.
491   */
492  public final class RunningTaskHandler implements EventHandler<RunningTask> {
493    @Override
494    public void onNext(final RunningTask task) {
495      try (final LoggingScope ls = loggingScopeFactory.taskRunning(task.getId())) {
496        if (JobDriver.this.handlerManager.getRunningTaskHandler() == 0) {
497          LOG.log(Level.INFO, "RunningTask event received but no CLR handler was bound. Exiting handler.");
498        } else {
499          LOG.log(Level.INFO, "RunningTask will be handled by CLR handler. Task Id: {0}", task.getId());
500          try {
501            final RunningTaskBridge runningTaskBridge = new RunningTaskBridge(task, activeContextBridgeFactory);
502            NativeInterop.clrSystemRunningTaskHandlerOnNext(JobDriver.this.handlerManager.getRunningTaskHandler(),
503                runningTaskBridge, JobDriver.this.interopLogger);
504          } catch (final Exception ex) {
505            LOG.log(Level.WARNING, "Fail to invoke CLR running task handler");
506            throw new RuntimeException(ex);
507          }
508        }
509      }
510    }
511  }
512
513  /**
514   * Receive notification that the Task is running when driver restarted.
515   */
516  public final class DriverRestartRunningTaskHandler implements EventHandler<RunningTask> {
517    @Override
518    public void onNext(final RunningTask task) {
519      try (final LoggingScope ls = loggingScopeFactory.driverRestartRunningTask(task.getId())) {
520        clock.scheduleAlarm(0, new EventHandler<Alarm>() {
521          @Override
522          public void onNext(final Alarm time) {
523            if (JobDriver.this.handlerManager != null) {
524              if (JobDriver.this.handlerManager.getDriverRestartRunningTaskHandler() != 0) {
525                LOG.log(Level.INFO, "CLR driver restart RunningTask handler implemented, now handle it in CLR.");
526                NativeInterop.clrSystemDriverRestartRunningTaskHandlerOnNext(
527                    JobDriver.this.handlerManager.getDriverRestartRunningTaskHandler(),
528                    new RunningTaskBridge(task, activeContextBridgeFactory));
529              } else {
530                LOG.log(Level.WARNING, "No CLR driver restart RunningTask handler implemented, " +
531                    "done with DriverRestartRunningTaskHandler.");
532              }
533            } else {
534              LOG.log(Level.INFO, "Waiting for driver to complete restart process " +
535                  "before checking out CLR driver restart RunningTaskHandler...");
536              clock.scheduleAlarm(2000, this);
537            }
538          }
539        });
540      }
541    }
542  }
543
544  /**
545   * Receive notification that an context is active on Evaluator when the driver restarted.
546   */
547  public final class DriverRestartActiveContextHandler implements EventHandler<ActiveContext> {
548    @Override
549    public void onNext(final ActiveContext context) {
550      try (final LoggingScope ls = loggingScopeFactory.driverRestartActiveContextReceived(context.getId())) {
551        JobDriver.this.contexts.put(context.getId(), context);
552        LOG.log(Level.INFO, "DriverRestartActiveContextHandler event received: " + context.getId());
553        clock.scheduleAlarm(0, new EventHandler<Alarm>() {
554          @Override
555          public void onNext(final Alarm time) {
556            if (JobDriver.this.handlerManager != null) {
557              if (JobDriver.this.handlerManager.getDriverRestartActiveContextHandler() != 0) {
558                LOG.log(Level.INFO, "CLR driver restart ActiveContext handler implemented, now handle it in CLR.");
559                NativeInterop.clrSystemDriverRestartActiveContextHandlerOnNext(
560                    JobDriver.this.handlerManager.getDriverRestartActiveContextHandler(),
561                    activeContextBridgeFactory.getActiveContextBridge(context));
562              } else {
563                LOG.log(Level.WARNING, "No CLR driver restart ActiveContext handler implemented, " +
564                    "done with DriverRestartActiveContextHandler.");
565              }
566            } else {
567              LOG.log(Level.INFO, "Waiting for driver to complete restart process " +
568                  "before checking out CLR driver restart DriverRestartActiveContextHandler...");
569              clock.scheduleAlarm(2000, this);
570            }
571          }
572        });
573      }
574    }
575  }
576
577  /**
578   * Job Driver is ready and the clock is set up: request the evaluators.
579   */
580  public final class StartHandler implements EventHandler<StartTime> {
581    @Override
582    public void onNext(final StartTime startTime) {
583      try (final LoggingScope ls = loggingScopeFactory.driverStart(startTime)) {
584        synchronized (JobDriver.this) {
585
586          setupBridge(new DriverStartClrHandlersInitializer(startTime));
587          LOG.log(Level.INFO, "Driver Started");
588        }
589      }
590    }
591  }
592
593
594  /**
595   * Job driver is restarted after previous crash.
596   */
597  public final class RestartHandler implements EventHandler<DriverRestarted> {
598    @Override
599    public void onNext(final DriverRestarted driverRestarted) {
600      try (final LoggingScope ls = loggingScopeFactory.driverRestart(driverRestarted.getStartTime())) {
601        synchronized (JobDriver.this) {
602
603          JobDriver.this.isRestarted = true;
604          setupBridge(new DriverRestartClrHandlersInitializer(driverRestarted));
605
606          LOG.log(Level.INFO, "Driver Restarted and CLR bridge set up.");
607        }
608      }
609    }
610  }
611
612  /**
613   * Receive notification that driver restart has completed.
614   */
615  public final class DriverRestartCompletedHandler implements EventHandler<DriverRestartCompleted> {
616    @Override
617    public void onNext(final DriverRestartCompleted driverRestartCompleted) {
618      LOG.log(Level.INFO, "Java DriverRestartCompleted event received at time [{0}]. ",
619          driverRestartCompleted.getCompletedTime());
620      try (final LoggingScope ls = loggingScopeFactory.driverRestartCompleted(
621          driverRestartCompleted.getCompletedTime().getTimeStamp())) {
622        if (JobDriver.this.handlerManager.getDriverRestartCompletedHandler() != 0) {
623          LOG.log(Level.INFO, "CLR driver restart handler implemented, now handle it in CLR.");
624
625          NativeInterop.clrSystemDriverRestartCompletedHandlerOnNext(
626              JobDriver.this.handlerManager.getDriverRestartCompletedHandler(),
627              new DriverRestartCompletedBridge(driverRestartCompleted));
628        } else {
629          LOG.log(Level.WARNING, "No CLR driver restart handler implemented, done with DriverRestartCompletedHandler.");
630        }
631      }
632    }
633  }
634
635  /**
636   * Shutting down the job driver: close the evaluators.
637   */
638  final class StopHandler implements EventHandler<StopTime> {
639    @Override
640    public void onNext(final StopTime time) {
641      LOG.log(Level.INFO, " StopTime: {0}", new Object[]{time});
642      try (final LoggingScope ls = loggingScopeFactory.driverStop(time.getTimeStamp())) {
643        for (final ActiveContext context : contexts.values()) {
644          context.close();
645        }
646      }
647    }
648  }
649
650  /**
651   * Handler for message received from the Task.
652   */
653  public final class TaskMessageHandler implements EventHandler<TaskMessage> {
654    @Override
655    public void onNext(final TaskMessage taskMessage) {
656      final String msg = new String(taskMessage.get(), StandardCharsets.UTF_8);
657      LOG.log(Level.INFO, "Received TaskMessage: {0} from CLR", msg);
658      //try (LoggingScope ls = loggingScopeFactory.taskMessageReceived(new String(msg))) {
659      if (JobDriver.this.handlerManager.getTaskMessageHandler() != 0) {
660        final TaskMessageBridge taskMessageBridge = new TaskMessageBridge(taskMessage);
661        // if CLR implements the task message handler, handle the bytes in CLR handler
662        NativeInterop.clrSystemTaskMessageHandlerOnNext(JobDriver.this.handlerManager.getTaskMessageHandler(),
663            taskMessage.get(), taskMessageBridge, JobDriver.this.interopLogger);
664      }
665      //}
666    }
667  }
668
669  /**
670   * Receive notification that the Task has been suspended.
671   */
672  public final class SuspendedTaskHandler implements EventHandler<SuspendedTask> {
673    @Override
674    public void onNext(final SuspendedTask task) {
675      final String message = "Received notification that task [" + task.getId() + "] has been suspended.";
676      LOG.log(Level.INFO, message);
677      try (final LoggingScope ls = loggingScopeFactory.taskSuspended(task.getId())) {
678        if (JobDriver.this.handlerManager.getSuspendedTaskHandler() != 0) {
679          final SuspendedTaskBridge suspendedTaskBridge = new SuspendedTaskBridge(task, activeContextBridgeFactory);
680          // if CLR implements the suspended task handler, handle it in CLR
681          LOG.log(Level.INFO, "Handling the event of suspended task in CLR bridge.");
682          NativeInterop.clrSystemSuspendedTaskHandlerOnNext(JobDriver.this.handlerManager.getSuspendedTaskHandler(),
683              suspendedTaskBridge);
684        }
685        JobDriver.this.jobMessageObserver.sendMessageToClient(JVM_CODEC.encode(message));
686      }
687    }
688  }
689
690  /**
691   * Receive notification that the Evaluator has been shut down.
692   */
693  public final class CompletedEvaluatorHandler implements EventHandler<CompletedEvaluator> {
694    @Override
695    public void onNext(final CompletedEvaluator evaluator) {
696      LOG.log(Level.INFO, " Completed Evaluator {0}", evaluator.getId());
697      try (final LoggingScope ls = loggingScopeFactory.evaluatorCompleted(evaluator.getId())) {
698        if (JobDriver.this.handlerManager.getCompletedEvaluatorHandler() != 0) {
699          final CompletedEvaluatorBridge completedEvaluatorBridge = new CompletedEvaluatorBridge(evaluator);
700          // if CLR implements the completed evaluator handler, handle it in CLR
701          LOG.log(Level.INFO, "Handling the event of completed evaluator in CLR bridge.");
702          NativeInterop.clrSystemCompletedEvaluatorHandlerOnNext(
703              JobDriver.this.handlerManager.getCompletedEvaluatorHandler(), completedEvaluatorBridge);
704          allocatedEvaluatorBridges.remove(completedEvaluatorBridge.getId());
705        }
706      }
707    }
708  }
709
710
711  /**
712   * Receive notification that the Context had completed.
713   * Remove context from the list of active context.
714   */
715  public final class ClosedContextHandler implements EventHandler<ClosedContext> {
716    @Override
717    public void onNext(final ClosedContext context) {
718      LOG.log(Level.INFO, "Completed Context: {0}", context.getId());
719      try (final LoggingScope ls = loggingScopeFactory.closedContext(context.getId())) {
720        if (JobDriver.this.handlerManager.getClosedContextHandler() != 0) {
721          final ClosedContextBridge closedContextBridge = new ClosedContextBridge(context, activeContextBridgeFactory);
722          // if CLR implements the closed context handler, handle it in CLR
723          LOG.log(Level.INFO, "Handling the event of closed context in CLR bridge.");
724          NativeInterop.clrSystemClosedContextHandlerOnNext(JobDriver.this.handlerManager.getClosedContextHandler(),
725              closedContextBridge);
726        }
727        synchronized (JobDriver.this) {
728          JobDriver.this.contexts.remove(context.getId());
729        }
730      }
731    }
732  }
733
734
735  /**
736   * Receive notification that the Context had failed.
737   * Remove context from the list of active context and notify the client.
738   */
739  public final class FailedContextHandler implements EventHandler<FailedContext> {
740    @Override
741    public void onNext(final FailedContext context) {
742      LOG.log(Level.SEVERE, "FailedContext", context);
743      try (final LoggingScope ls = loggingScopeFactory.evaluatorFailed(context.getId())) {
744        if (JobDriver.this.handlerManager.getFailedContextHandler() != 0) {
745          final FailedContextBridge failedContextBridge = new FailedContextBridge(context, activeContextBridgeFactory);
746          // if CLR implements the failed context handler, handle it in CLR
747          LOG.log(Level.INFO, "Handling the event of failed context in CLR bridge.");
748          NativeInterop.clrSystemFailedContextHandlerOnNext(JobDriver.this.handlerManager.getFailedContextHandler(),
749              failedContextBridge);
750        }
751        synchronized (JobDriver.this) {
752          JobDriver.this.contexts.remove(context.getId());
753        }
754        final Optional<byte[]> err = context.getData();
755        if (err.isPresent()) {
756          JobDriver.this.jobMessageObserver.sendMessageToClient(err.get());
757        }
758      }
759    }
760  }
761
762  /**
763   * Receive notification that a ContextMessage has been received.
764   */
765  public final class ContextMessageHandler implements EventHandler<ContextMessage> {
766    @Override
767    public void onNext(final ContextMessage message) {
768      LOG.log(Level.SEVERE, "Received ContextMessage:", message.get());
769      try (final LoggingScope ls =
770               loggingScopeFactory.contextMessageReceived(new String(message.get(), StandardCharsets.UTF_8))) {
771        if (JobDriver.this.handlerManager.getContextMessageHandler() != 0) {
772          final ContextMessageBridge contextMessageBridge = new ContextMessageBridge(message);
773          // if CLR implements the context message handler, handle it in CLR
774          LOG.log(Level.INFO, "Handling the event of context message in CLR bridge.");
775          NativeInterop.clrSystemContextMessageHandlerOnNext(JobDriver.this.handlerManager.getContextMessageHandler(),
776              contextMessageBridge);
777        }
778      }
779    }
780  }
781
782  /**
783   * Gets the progress of the application from .NET side.
784   */
785  public final class ProgressProvider implements org.apache.reef.driver.ProgressProvider {
786    @Override
787    public float getProgress() {
788      if (JobDriver.this.handlerManager != null && JobDriver.this.handlerManager.getProgressProvider() != 0) {
789        return NativeInterop.clrSystemProgressProviderGetProgress(JobDriver.this.handlerManager.getProgressProvider());
790      }
791
792      return 0f;
793    }
794  }
795}