001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.common.driver.evaluator;
020
021import org.apache.reef.annotations.audience.DriverSide;
022import org.apache.reef.annotations.audience.Private;
023import org.apache.reef.driver.evaluator.FailedEvaluator;
024import org.apache.reef.driver.parameters.EvaluatorConfigurationProviders;
025import org.apache.reef.driver.restart.DriverRestartManager;
026import org.apache.reef.driver.restart.EvaluatorRestartState;
027import org.apache.reef.exception.NonSerializableException;
028import org.apache.reef.runtime.common.driver.api.*;
029import org.apache.reef.runtime.common.driver.evaluator.pojos.ContextStatusPOJO;
030import org.apache.reef.runtime.common.driver.evaluator.pojos.EvaluatorStatusPOJO;
031import org.apache.reef.runtime.common.driver.evaluator.pojos.State;
032import org.apache.reef.runtime.common.driver.evaluator.pojos.TaskStatusPOJO;
033import org.apache.reef.tang.ConfigurationProvider;
034import org.apache.reef.driver.context.ActiveContext;
035import org.apache.reef.driver.context.FailedContext;
036import org.apache.reef.driver.evaluator.AllocatedEvaluator;
037import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
038import org.apache.reef.driver.task.FailedTask;
039import org.apache.reef.exception.EvaluatorException;
040import org.apache.reef.exception.EvaluatorKilledByResourceManagerException;
041import org.apache.reef.io.naming.Identifiable;
042import org.apache.reef.proto.EvaluatorRuntimeProtocol;
043import org.apache.reef.proto.ReefServiceProtos;
044import org.apache.reef.driver.evaluator.EvaluatorProcess;
045import org.apache.reef.runtime.common.driver.context.ContextControlHandler;
046import org.apache.reef.runtime.common.driver.context.ContextRepresenters;
047import org.apache.reef.runtime.common.driver.idle.EventHandlerIdlenessSource;
048import org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEvent;
049import org.apache.reef.runtime.common.driver.task.TaskRepresenter;
050import org.apache.reef.runtime.common.utils.ExceptionCodec;
051import org.apache.reef.runtime.common.utils.RemoteManager;
052import org.apache.reef.tang.annotations.Name;
053import org.apache.reef.tang.annotations.NamedParameter;
054import org.apache.reef.tang.annotations.Parameter;
055import org.apache.reef.tang.formats.ConfigurationSerializer;
056import org.apache.reef.util.Optional;
057import org.apache.reef.util.logging.LoggingScopeFactory;
058import org.apache.reef.wake.EventHandler;
059import org.apache.reef.wake.remote.RemoteMessage;
060import org.apache.reef.wake.time.Clock;
061import org.apache.reef.wake.time.event.Alarm;
062
063import javax.inject.Inject;
064import java.io.File;
065import java.util.ArrayList;
066import java.util.List;
067import java.util.Set;
068import java.util.logging.Level;
069import java.util.logging.Logger;
070
071/**
072 * Manages a single Evaluator instance including all lifecycle instances:
073 * (AllocatedEvaluator, CompletedEvaluator, FailedEvaluator).
074 * <p>
075 * A (periodic) heartbeat channel is established from EvaluatorRuntime to EvaluatorManager.
076 * The EvaluatorRuntime will (periodically) send (status) messages to the EvaluatorManager using this
077 * heartbeat channel.
078 * <p>
079 * A (push-based) EventHandler channel is established from EvaluatorManager to EvaluatorRuntime.
080 * The EvaluatorManager uses this to forward Driver messages, launch Tasks, and initiate
081 * control information (e.g., shutdown, suspend).
082 */
083@Private
084@DriverSide
085public final class EvaluatorManager implements Identifiable, AutoCloseable {
086
087  private static final Logger LOG = Logger.getLogger(EvaluatorManager.class.getName());
088
089  private final EvaluatorHeartBeatSanityChecker sanityChecker = new EvaluatorHeartBeatSanityChecker();
090  private final Clock clock;
091  private final ResourceReleaseHandler resourceReleaseHandler;
092  private final ResourceLaunchHandler resourceLaunchHandler;
093  private final String evaluatorId;
094  private final EvaluatorDescriptorImpl evaluatorDescriptor;
095  private final ContextRepresenters contextRepresenters;
096  private final EvaluatorMessageDispatcher messageDispatcher;
097  private final EvaluatorControlHandler evaluatorControlHandler;
098  private final ContextControlHandler contextControlHandler;
099  private final EvaluatorStatusManager stateManager;
100  private final ExceptionCodec exceptionCodec;
101  private final EventHandlerIdlenessSource idlenessSource;
102  private final RemoteManager remoteManager;
103  private final ConfigurationSerializer configurationSerializer;
104  private final LoggingScopeFactory loggingScopeFactory;
105  private final Set<ConfigurationProvider> evaluatorConfigurationProviders;
106  private final DriverRestartManager driverRestartManager;
107  private final EvaluatorIdlenessThreadPool idlenessThreadPool;
108
109  // Mutable fields
110  private Optional<TaskRepresenter> task = Optional.empty();
111  private boolean resourceNotReleased = true;
112  private boolean allocationNotFired = true;
113
114  @Inject
115  private EvaluatorManager(
116      @Parameter(EvaluatorIdentifier.class) final String evaluatorId,
117      @Parameter(EvaluatorDescriptorName.class) final EvaluatorDescriptorImpl evaluatorDescriptor,
118      @Parameter(EvaluatorConfigurationProviders.class)
119        final Set<ConfigurationProvider> evaluatorConfigurationProviders,
120      final Clock clock,
121      final RemoteManager remoteManager,
122      final ResourceReleaseHandler resourceReleaseHandler,
123      final ResourceLaunchHandler resourceLaunchHandler,
124      final ContextRepresenters contextRepresenters,
125      final ConfigurationSerializer configurationSerializer,
126      final EvaluatorMessageDispatcher messageDispatcher,
127      final EvaluatorControlHandler evaluatorControlHandler,
128      final ContextControlHandler contextControlHandler,
129      final EvaluatorStatusManager stateManager,
130      final ExceptionCodec exceptionCodec,
131      final EventHandlerIdlenessSource idlenessSource,
132      final LoggingScopeFactory loggingScopeFactory,
133      final DriverRestartManager driverRestartManager,
134      final EvaluatorIdlenessThreadPool idlenessThreadPool) {
135
136    LOG.log(Level.FINEST, "Instantiating 'EvaluatorManager' for evaluator: {0}", evaluatorId);
137
138    this.evaluatorId = evaluatorId;
139    this.evaluatorDescriptor = evaluatorDescriptor;
140    this.evaluatorConfigurationProviders = evaluatorConfigurationProviders;
141
142    this.clock = clock;
143    this.contextRepresenters = contextRepresenters;
144    this.idlenessSource = idlenessSource;
145    this.resourceReleaseHandler = resourceReleaseHandler;
146    this.resourceLaunchHandler = resourceLaunchHandler;
147
148    this.messageDispatcher = messageDispatcher;
149    this.evaluatorControlHandler = evaluatorControlHandler;
150    this.contextControlHandler = contextControlHandler;
151    this.stateManager = stateManager;
152    this.exceptionCodec = exceptionCodec;
153
154    this.remoteManager = remoteManager;
155    this.configurationSerializer = configurationSerializer;
156    this.loggingScopeFactory = loggingScopeFactory;
157    this.driverRestartManager = driverRestartManager;
158    this.idlenessThreadPool = idlenessThreadPool;
159
160    LOG.log(Level.FINEST, "Instantiated 'EvaluatorManager' for evaluator: [{0}]", this.getId());
161  }
162
163  /**
164   * Get the id of current job/application.
165   */
166  public static String getJobIdentifier() {
167    // TODO[JIRA REEF-818]: currently we obtain the job id directly by parsing execution (container) directory path
168    // #845 is open to get the id from RM properly
169    for (File directory = new File(System.getProperty("user.dir"));
170         directory != null; directory = directory.getParentFile()) {
171      final String currentDirectoryName = directory.getName();
172      if (currentDirectoryName.toLowerCase().contains("application_")) {
173        return currentDirectoryName;
174      }
175    }
176    // cannot find a directory that contains application_, presumably we are on local runtime
177    // again, this is a hack for now, we need #845 as a proper solution
178    return "REEF_LOCAL_RUNTIME";
179  }
180
181  /**
182   * Fires the EvaluatorAllocatedEvent to the handlers. Can only be done once.
183   */
184  public synchronized void fireEvaluatorAllocatedEvent() {
185
186    if (this.stateManager.isAllocated() && this.allocationNotFired) {
187
188      final AllocatedEvaluator allocatedEvaluator =
189          new AllocatedEvaluatorImpl(this,
190              this.remoteManager.getMyIdentifier(),
191              this.configurationSerializer,
192              getJobIdentifier(),
193              this.loggingScopeFactory,
194              this.evaluatorConfigurationProviders);
195
196      LOG.log(Level.FINEST, "Firing AllocatedEvaluator event for Evaluator with ID [{0}]", this.evaluatorId);
197
198      this.messageDispatcher.onEvaluatorAllocated(allocatedEvaluator);
199      this.allocationNotFired = false;
200
201    } else {
202      LOG.log(Level.WARNING, "AllocatedEvaluator event fired more than once.");
203    }
204  }
205
206  @Override
207  public String getId() {
208    return this.evaluatorId;
209  }
210
211  public void setProcess(final EvaluatorProcess process) {
212    this.evaluatorDescriptor.setProcess(process);
213  }
214
215  public EvaluatorDescriptor getEvaluatorDescriptor() {
216    return this.evaluatorDescriptor;
217  }
218
219  @Override
220  public void close() {
221
222    LOG.log(Level.FINER, "Close EvaluatorManager {0} - begin", this.evaluatorId);
223
224    synchronized (this.evaluatorDescriptor) {
225
226      if (this.stateManager.isAvailable()) {
227
228        LOG.log(Level.WARNING, "Dirty shutdown of running evaluator id[{0}]", getId());
229
230        try {
231
232          if (this.stateManager.isRunning()) {
233
234            // Killing the evaluator means that it doesn't need to send a confirmation; it just dies.
235            this.sendEvaluatorControlMessage(
236                EvaluatorRuntimeProtocol.EvaluatorControlProto.newBuilder()
237                    .setTimestamp(System.currentTimeMillis())
238                    .setIdentifier(getId())
239                    .setKillEvaluator(EvaluatorRuntimeProtocol.KillEvaluatorProto.newBuilder().build())
240                    .build());
241
242            this.stateManager.setClosing();
243
244          } else {
245            this.stateManager.setKilled();
246          }
247
248        } catch (Exception e) {
249          LOG.log(Level.WARNING, "Exception occurred when manager sends killing message to task.", e);
250          this.stateManager.setKilled();
251        }
252      }
253
254      if (this.resourceNotReleased) {
255
256        this.resourceNotReleased = false;
257
258        final ResourceReleaseEvent releaseEvent = ResourceReleaseEventImpl.newBuilder()
259            .setIdentifier(this.evaluatorId)
260            .setRuntimeName(this.getEvaluatorDescriptor().getRuntimeName())
261            .build();
262
263        try {
264          // We need to wait awhile before returning the container to the RM
265          // in order to give the EvaluatorRuntime (and Launcher) time to cleanly exit.
266          this.clock.scheduleAlarm(200, new EventHandler<Alarm>() {
267            @Override
268            public void onNext(final Alarm alarm) {
269              LOG.log(Level.FINER, "Close EvaluatorManager {0} - release to RM", evaluatorId);
270              resourceReleaseHandler.onNext(releaseEvent);
271              shutdown();
272            }
273          });
274        } catch (final IllegalStateException e) {
275          LOG.log(Level.WARNING, "Force resource release because the client closed the clock.", e);
276          this.resourceReleaseHandler.onNext(releaseEvent);
277          this.shutdown();
278        }
279      }
280    }
281
282    this.idlenessThreadPool.runCheckAsync(this);
283
284    LOG.log(Level.FINER, "Close EvaluatorManager {0} - end", this.evaluatorId);
285  }
286
287  /**
288   * Close message dispatcher for the evaluator.
289   */
290  public void shutdown() {
291    LOG.log(Level.FINEST, "Shutdown EvaluatorManager: {0}", this.evaluatorId);
292    this.messageDispatcher.close();
293  }
294
295  /**
296   * Return true if the state is DONE, FAILED, or KILLED,
297   * <em>and</em> there are no messages queued or in processing.
298   */
299  public boolean isClosed() {
300    return this.messageDispatcher.isEmpty() && this.stateManager.isCompleted();
301  }
302
303  /**
304   * Return true if the state is CLOSING.
305   */
306  public boolean isClosing() {
307    return this.stateManager.isClosing();
308  }
309
310  /**
311   * Return true if the state is DONE, FAILED, KILLED, or CLOSING.
312   */
313  public boolean isClosedOrClosing() {
314    return isClosed() || isClosing();
315  }
316
317  /**
318   * Triggers a call to check the idleness of the Evaluator.
319   */
320  void checkIdlenessSource() {
321    this.idlenessSource.check();
322  }
323
324  /**
325   * EvaluatorException will trigger is FailedEvaluator and state transition to FAILED.
326   *
327   * @param exception on the EvaluatorRuntime
328   */
329  public void onEvaluatorException(final EvaluatorException exception) {
330    synchronized (this.evaluatorDescriptor) {
331
332      if (this.stateManager.isCompleted()) {
333        LOG.log(Level.FINE,
334            "Ignoring an exception received for Evaluator {0} which is already in state {1}.",
335            new Object[] {this.getId(), this.stateManager});
336        return;
337      }
338
339      LOG.log(Level.WARNING, "Failed evaluator: " + getId(), exception);
340
341      try {
342
343        final List<FailedContext> failedContextList = this.contextRepresenters.getFailedContextsForEvaluatorFailure();
344
345        final Optional<FailedTask> failedTaskOptional;
346        if (this.task.isPresent()) {
347
348          final String taskId = this.task.get().getId();
349          final Optional<ActiveContext> evaluatorContext = Optional.empty();
350          final Optional<byte[]> bytes = Optional.empty();
351          final Optional<Throwable> taskException = Optional.<Throwable>of(new Exception("Evaluator crash"));
352          final String message = "Evaluator crash";
353          final Optional<String> description = Optional.empty();
354          final FailedTask failedTask =
355              new FailedTask(taskId, message, description, taskException, bytes, evaluatorContext);
356
357          failedTaskOptional = Optional.of(failedTask);
358
359        } else {
360          failedTaskOptional = Optional.empty();
361        }
362
363        final FailedEvaluator failedEvaluator = new FailedEvaluatorImpl(
364            exception, failedContextList, failedTaskOptional, this.evaluatorId);
365
366        if (driverRestartManager.getEvaluatorRestartState(evaluatorId).isFailedOrExpired()) {
367          this.messageDispatcher.onDriverRestartEvaluatorFailed(failedEvaluator);
368        } else {
369          this.messageDispatcher.onEvaluatorFailed(failedEvaluator);
370        }
371      } catch (final Exception e) {
372        LOG.log(Level.SEVERE, "Exception while handling FailedEvaluator", e);
373      } finally {
374        this.stateManager.setFailed();
375        this.close();
376      }
377    }
378  }
379
380  /**
381   * Process an evaluator heartbeat message.
382   */
383  public void onEvaluatorHeartbeatMessage(
384      final RemoteMessage<EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto> evaluatorHeartbeatProtoRemoteMessage) {
385
386    final EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto evaluatorHeartbeatProto =
387        evaluatorHeartbeatProtoRemoteMessage.getMessage();
388
389    LOG.log(Level.FINEST, "Evaluator heartbeat: {0}", evaluatorHeartbeatProto);
390
391    synchronized (this.evaluatorDescriptor) {
392
393      if (this.stateManager.isCompleted()) {
394
395        LOG.log(Level.FINE,
396            "Ignoring a heartbeat received for Evaluator {0} which is already in state {1}.",
397            new Object[] {this.getId(), this.stateManager});
398
399        return;
400
401      } else if (this.stateManager.isAvailable()) {
402
403        this.sanityChecker.check(this.evaluatorId, evaluatorHeartbeatProto.getTimestamp());
404        final String evaluatorRID = evaluatorHeartbeatProtoRemoteMessage.getIdentifier().toString();
405
406        final EvaluatorRestartState evaluatorRestartState =
407            this.driverRestartManager.getEvaluatorRestartState(this.evaluatorId);
408
409        /*
410         * First message from a running evaluator. The evaluator can be a new evaluator or be a previous evaluator
411         * from a separate application attempt. In the case of a previous evaluator, if the restart period has not
412         * yet expired, we should register it and trigger context active and task events. If the restart period has
413         * expired, we should return immediately after setting its remote ID in order to close it.
414         */
415        if (this.stateManager.isSubmitted() ||
416            evaluatorRestartState == EvaluatorRestartState.REPORTED ||
417            evaluatorRestartState == EvaluatorRestartState.EXPIRED) {
418
419          this.evaluatorControlHandler.setRemoteID(evaluatorRID);
420
421          if (evaluatorRestartState == EvaluatorRestartState.EXPIRED) {
422            // Don't do anything if evaluator has expired. Close it immediately upon exit of this method.
423            return;
424          }
425
426          this.stateManager.setRunning();
427          LOG.log(Level.FINEST, "Evaluator {0} is running", this.evaluatorId);
428
429          if (evaluatorRestartState == EvaluatorRestartState.REPORTED) {
430            this.driverRestartManager.setEvaluatorReregistered(this.evaluatorId);
431          }
432        }
433      }
434
435      // All messages from a heartbeat receive the heartbeat timestamp as a sequence number. See REEF-806.
436      final long messageSequenceNumber = evaluatorHeartbeatProto.getTimestamp();
437
438      // Process the Evaluator status message
439      if (evaluatorHeartbeatProto.hasEvaluatorStatus()) {
440        this.onEvaluatorStatusMessage(new EvaluatorStatusPOJO(evaluatorHeartbeatProto.getEvaluatorStatus()));
441      }
442
443      // Process the Context status message(s)
444      final boolean informClientOfNewContexts = !evaluatorHeartbeatProto.hasTaskStatus();
445      final List<ContextStatusPOJO> contextStatusList = new ArrayList<>();
446      for (ReefServiceProtos.ContextStatusProto proto : evaluatorHeartbeatProto.getContextStatusList()) {
447        contextStatusList.add(new ContextStatusPOJO(proto, messageSequenceNumber));
448      }
449
450      this.contextRepresenters.onContextStatusMessages(contextStatusList, informClientOfNewContexts);
451
452      // Process the Task status message
453      if (evaluatorHeartbeatProto.hasTaskStatus()) {
454        this.onTaskStatusMessage(new TaskStatusPOJO(evaluatorHeartbeatProto.getTaskStatus(), messageSequenceNumber));
455      }
456
457      LOG.log(Level.FINE, "DONE with evaluator heartbeat from Evaluator {0}", this.getId());
458    }
459  }
460
461  /**
462   * Process a evaluator status message.
463   *
464   * @param message
465   */
466  private synchronized void onEvaluatorStatusMessage(final EvaluatorStatusPOJO message) {
467
468    switch (message.getState()) {
469    case DONE:
470      this.onEvaluatorDone(message);
471      break;
472    case FAILED:
473      this.onEvaluatorFailed(message);
474      break;
475    case KILLED:
476      this.onEvaluatorKilled(message);
477      break;
478    case INIT:
479    case RUNNING:
480    case SUSPEND:
481      break;
482    default:
483      throw new RuntimeException("Unknown state: " + message.getState());
484    }
485  }
486
487  /**
488   * Process an evaluator message that indicates that the evaluator shut down cleanly.
489   *
490   * @param message
491   */
492  private synchronized void onEvaluatorDone(final EvaluatorStatusPOJO message) {
493
494    assert message.getState() == State.DONE;
495
496    LOG.log(Level.FINEST, "Evaluator {0} done.", getId());
497
498    // Send an ACK to the Evaluator.
499    sendEvaluatorControlMessage(
500        EvaluatorRuntimeProtocol.EvaluatorControlProto.newBuilder()
501            .setTimestamp(System.currentTimeMillis())
502            .setIdentifier(getId())
503            .setDoneEvaluator(EvaluatorRuntimeProtocol.DoneEvaluatorProto.newBuilder().build())
504            .build());
505
506    this.stateManager.setDone();
507    this.messageDispatcher.onEvaluatorCompleted(new CompletedEvaluatorImpl(this.evaluatorId));
508
509    this.close();
510  }
511
512  /**
513   * Process an evaluator message that indicates a crash.
514   *
515   * @param evaluatorStatus
516   */
517  private synchronized void onEvaluatorFailed(final EvaluatorStatusPOJO evaluatorStatus) {
518
519    assert evaluatorStatus.getState() == State.FAILED;
520
521    final EvaluatorException evaluatorException;
522
523    if (evaluatorStatus.hasError()) {
524
525      final Optional<Throwable> exception =
526          this.exceptionCodec.fromBytes(evaluatorStatus.getError());
527
528      evaluatorException = new EvaluatorException(getId(), exception.isPresent() ? exception.get() :
529          new NonSerializableException("Exception sent, but can't be deserialized", evaluatorStatus.getError()));
530
531    } else {
532      evaluatorException = new EvaluatorException(getId(), new Exception("No exception sent"));
533    }
534
535    this.onEvaluatorException(evaluatorException);
536  }
537
538  /**
539   * Process an evaluator message that indicates that the evaluator completed the unclean shut down request.
540   *
541   * @param message
542   */
543  private synchronized void onEvaluatorKilled(final EvaluatorStatusPOJO message) {
544
545    assert message.getState() == State.KILLED;
546    assert this.stateManager.isClosing();
547
548    LOG.log(Level.WARNING, "Evaluator {0} killed completely.", getId());
549
550    this.stateManager.setKilled();
551  }
552
553  public void onResourceLaunch(final ResourceLaunchEvent resourceLaunchEvent) {
554    synchronized (this.evaluatorDescriptor) {
555      if (this.stateManager.isAllocated()) {
556        this.stateManager.setSubmitted();
557        this.resourceLaunchHandler.onNext(resourceLaunchEvent);
558      } else if (this.stateManager.isCompletedAbnormally()) {
559        LOG.log(Level.WARNING, "Evaluator manager expected {0} state but instead is in state {1}",
560            new Object[] {EvaluatorState.ALLOCATED, this.stateManager});
561      } else {
562        throw new RuntimeException("Evaluator manager expected " + EvaluatorState.ALLOCATED +
563            " state but instead is in state " + this.stateManager);
564      }
565    }
566  }
567
568  /**
569   * Packages the ContextControlProto in an EvaluatorControlProto and forward it to the EvaluatorRuntime.
570   *
571   * @param contextControlProto message contains context control info.
572   */
573  public void sendContextControlMessage(final EvaluatorRuntimeProtocol.ContextControlProto contextControlProto) {
574    synchronized (this.evaluatorDescriptor) {
575      LOG.log(Level.FINEST, "Context control message to {0}", this.evaluatorId);
576      this.contextControlHandler.send(contextControlProto);
577    }
578  }
579
580  /**
581   * Forward the EvaluatorControlProto to the EvaluatorRuntime.
582   *
583   * @param evaluatorControlProto message contains evaluator control information.
584   */
585  void sendEvaluatorControlMessage(final EvaluatorRuntimeProtocol.EvaluatorControlProto evaluatorControlProto) {
586    synchronized (this.evaluatorDescriptor) {
587      this.evaluatorControlHandler.send(evaluatorControlProto);
588    }
589  }
590
591  /**
592   * Handle task status messages.
593   *
594   * @param taskStatus message contains the current task status.
595   */
596  private void onTaskStatusMessage(final TaskStatusPOJO taskStatus) {
597
598    if (!(this.task.isPresent() && this.task.get().getId().equals(taskStatus.getTaskId()))) {
599
600      final State state = taskStatus.getState();
601      if (state.isRestartable() ||
602          this.driverRestartManager.getEvaluatorRestartState(this.evaluatorId).isReregistered()) {
603
604        // [REEF-308] exposes a bug where the .NET evaluator does not send its states in the right order
605        // [REEF-289] is a related item which may fix the issue
606        if (state.isRunning()) {
607          LOG.log(Level.WARNING,
608              "Received a message of state {0} for Task {1} before receiving its {2} state",
609              new Object[] {State.RUNNING, taskStatus.getTaskId(), State.INIT});
610        }
611
612        // FAILED is a legal first state of a Task as it could have failed during construction.
613        this.task = Optional.of(
614            new TaskRepresenter(taskStatus.getTaskId(),
615                this.contextRepresenters.getContext(taskStatus.getContextId()),
616                this.messageDispatcher,
617                this,
618                this.exceptionCodec,
619                this.driverRestartManager));
620      } else {
621        throw new RuntimeException("Received a message of state " + state +
622            ", not INIT, RUNNING, or FAILED for Task " + taskStatus.getTaskId() +
623            " which we haven't heard from before.");
624      }
625    }
626
627    this.task.get().onTaskStatusMessage(taskStatus);
628
629    if (this.task.get().isNotRunning()) {
630      LOG.log(Level.FINEST, "Task no longer running. De-registering it.");
631      this.task = Optional.empty();
632    }
633  }
634
635  /**
636   * Resource status information from the (actual) resource manager.
637   */
638  public void onResourceStatusMessage(final ResourceStatusEvent resourceStatusEvent) {
639
640    synchronized (this.evaluatorDescriptor) {
641
642      final State state = resourceStatusEvent.getState();
643      LOG.log(Level.FINEST, "Resource manager state update: {0}", state);
644
645      if (!this.stateManager.isAvailable()) {
646
647        LOG.log(Level.FINE,
648            "Ignoring resource status update for Evaluator {0} which is already in state {1}.",
649            new Object[] {this.getId(), this.stateManager});
650
651      } else if (state.isCompleted() && this.stateManager.isAvailable()) {
652
653        // Something is wrong. The resource manager reports that the Evaluator is done or failed,
654        // but the Driver assumes it to be alive.
655        final StringBuilder messageBuilder = new StringBuilder("Evaluator [")
656            .append(this.evaluatorId)
657            .append("] is assumed to be in state [")
658            .append(this.stateManager.toString())
659            .append("]. But the resource manager reports it to be in state [")
660            .append(state)
661            .append("].");
662
663        if (this.stateManager.isSubmitted()) {
664          messageBuilder
665              .append(" This most likely means that the Evaluator suffered a failure before establishing " +
666                  "a communications link to the driver.");
667        } else if (this.stateManager.isAllocated()) {
668          messageBuilder.append(" This most likely means that the Evaluator suffered a failure before being used.");
669        } else if (this.stateManager.isRunning()) {
670          messageBuilder.append(" This means that the Evaluator failed but wasn't able to send an error message " +
671              "back to the driver.");
672        }
673
674        if (this.task.isPresent()) {
675          messageBuilder.append(" Task [")
676              .append(this.task.get().getId())
677              .append("] was running when the Evaluator crashed.");
678        }
679
680        if (resourceStatusEvent.getState() == State.KILLED) {
681          this.onEvaluatorException(
682              new EvaluatorKilledByResourceManagerException(this.evaluatorId, messageBuilder.toString()));
683        } else {
684          this.onEvaluatorException(new EvaluatorException(this.evaluatorId, messageBuilder.toString()));
685        }
686      }
687    }
688  }
689
690  @Override
691  public String toString() {
692    return "EvaluatorManager:"
693        + " id=" + this.evaluatorId
694        + " state=" + this.stateManager
695        + " task=" + this.task;
696  }
697
698  // Dynamic Parameters
699
700  /**
701   * The Evaluator Identifier.
702   */
703  @NamedParameter(doc = "The Evaluator Identifier.")
704  public static final class EvaluatorIdentifier implements Name<String> {
705  }
706
707  /**
708   * The Evaluator Host.
709   */
710  @NamedParameter(doc = "The Evaluator Host.")
711  public static final class EvaluatorDescriptorName implements Name<EvaluatorDescriptorImpl> {
712  }
713}