This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.common.driver.evaluator;
020
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.annotations.audience.Private;
import org.apache.reef.driver.context.ActiveContext;
import org.apache.reef.driver.context.FailedContext;
import org.apache.reef.driver.evaluator.AllocatedEvaluator;
import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
import org.apache.reef.driver.evaluator.EvaluatorProcess;
import org.apache.reef.driver.evaluator.FailedEvaluator;
import org.apache.reef.driver.parameters.EvaluatorConfigurationProviders;
import org.apache.reef.driver.restart.DriverRestartManager;
import org.apache.reef.driver.restart.EvaluatorRestartState;
import org.apache.reef.driver.task.FailedTask;
import org.apache.reef.exception.EvaluatorException;
import org.apache.reef.exception.EvaluatorKilledByResourceManagerException;
import org.apache.reef.exception.NonSerializableException;
import org.apache.reef.io.naming.Identifiable;
import org.apache.reef.proto.EvaluatorRuntimeProtocol;
import org.apache.reef.proto.ReefServiceProtos;
import org.apache.reef.runtime.common.driver.api.ResourceLaunchEvent;
import org.apache.reef.runtime.common.driver.api.ResourceLaunchHandler;
import org.apache.reef.runtime.common.driver.api.ResourceReleaseEventImpl;
import org.apache.reef.runtime.common.driver.api.ResourceReleaseHandler;
import org.apache.reef.runtime.common.driver.context.ContextControlHandler;
import org.apache.reef.runtime.common.driver.context.ContextRepresenters;
import org.apache.reef.runtime.common.driver.evaluator.pojos.ContextStatusPOJO;
import org.apache.reef.runtime.common.driver.evaluator.pojos.EvaluatorStatusPOJO;
import org.apache.reef.runtime.common.driver.evaluator.pojos.State;
import org.apache.reef.runtime.common.driver.evaluator.pojos.TaskStatusPOJO;
import org.apache.reef.runtime.common.driver.idle.EventHandlerIdlenessSource;
import org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEvent;
import org.apache.reef.runtime.common.driver.task.TaskRepresenter;
import org.apache.reef.runtime.common.utils.ExceptionCodec;
import org.apache.reef.runtime.common.utils.RemoteManager;
import org.apache.reef.tang.ConfigurationProvider;
import org.apache.reef.tang.annotations.Name;
import org.apache.reef.tang.annotations.NamedParameter;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.tang.formats.ConfigurationSerializer;
import org.apache.reef.util.Optional;
import org.apache.reef.util.logging.LoggingScopeFactory;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.remote.RemoteMessage;
import org.apache.reef.wake.time.Clock;
import org.apache.reef.wake.time.event.Alarm;

import javax.inject.Inject;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
073
074/**
075 * Manages a single Evaluator instance including all lifecycle instances:
076 * (AllocatedEvaluator, CompletedEvaluator, FailedEvaluator).
077 * <p>
078 * A (periodic) heartbeat channel is established from EvaluatorRuntime to EvaluatorManager.
079 * The EvaluatorRuntime will (periodically) send (status) messages to the EvaluatorManager using this
080 * heartbeat channel.
081 * <p>
082 * A (push-based) EventHandler channel is established from EvaluatorManager to EvaluatorRuntime.
083 * The EvaluatorManager uses this to forward Driver messages, launch Tasks, and initiate
084 * control information (e.g., shutdown, suspend).
085 */
086@Private
087@DriverSide
088public final class EvaluatorManager implements Identifiable, AutoCloseable {
089
090  private static final Logger LOG = Logger.getLogger(EvaluatorManager.class.getName());
091
092  private final EvaluatorHeartBeatSanityChecker sanityChecker = new EvaluatorHeartBeatSanityChecker();
093  private final Clock clock;
094  private final ResourceReleaseHandler resourceReleaseHandler;
095  private final ResourceLaunchHandler resourceLaunchHandler;
096  private final String evaluatorId;
097  private final EvaluatorDescriptorImpl evaluatorDescriptor;
098  private final ContextRepresenters contextRepresenters;
099  private final EvaluatorMessageDispatcher messageDispatcher;
100  private final EvaluatorControlHandler evaluatorControlHandler;
101  private final ContextControlHandler contextControlHandler;
102  private final EvaluatorStatusManager stateManager;
103  private final ExceptionCodec exceptionCodec;
104  private final EventHandlerIdlenessSource idlenessSource;
105  private final RemoteManager remoteManager;
106  private final ConfigurationSerializer configurationSerializer;
107  private final LoggingScopeFactory loggingScopeFactory;
108  private final Set<ConfigurationProvider> evaluatorConfigurationProviders;
109  private final DriverRestartManager driverRestartManager;
110  private final EvaluatorIdlenessThreadPool idlenessThreadPool;
111
112  // Mutable fields
113  private Optional<TaskRepresenter> task = Optional.empty();
114  private boolean isResourceReleased = false;
115  private boolean allocationFired = false;
116
117  @Inject
118  private EvaluatorManager(
119      final Clock clock,
120      final RemoteManager remoteManager,
121      final ResourceReleaseHandler resourceReleaseHandler,
122      final ResourceLaunchHandler resourceLaunchHandler,
123      @Parameter(EvaluatorIdentifier.class) final String evaluatorId,
124      @Parameter(EvaluatorDescriptorName.class) final EvaluatorDescriptorImpl evaluatorDescriptor,
125      final ContextRepresenters contextRepresenters,
126      final ConfigurationSerializer configurationSerializer,
127      final EvaluatorMessageDispatcher messageDispatcher,
128      final EvaluatorControlHandler evaluatorControlHandler,
129      final ContextControlHandler contextControlHandler,
130      final EvaluatorStatusManager stateManager,
131      final ExceptionCodec exceptionCodec,
132      final EventHandlerIdlenessSource idlenessSource,
133      final LoggingScopeFactory loggingScopeFactory,
134      @Parameter(EvaluatorConfigurationProviders.class)
135      final Set<ConfigurationProvider> evaluatorConfigurationProviders,
136      final DriverRestartManager driverRestartManager,
137      final EvaluatorIdlenessThreadPool idlenessThreadPool) {
138    this.contextRepresenters = contextRepresenters;
139    this.idlenessSource = idlenessSource;
140    LOG.log(Level.FINEST, "Instantiating 'EvaluatorManager' for evaluator: {0}", evaluatorId);
141    this.clock = clock;
142    this.resourceReleaseHandler = resourceReleaseHandler;
143    this.resourceLaunchHandler = resourceLaunchHandler;
144    this.evaluatorId = evaluatorId;
145    this.evaluatorDescriptor = evaluatorDescriptor;
146
147    this.messageDispatcher = messageDispatcher;
148    this.evaluatorControlHandler = evaluatorControlHandler;
149    this.contextControlHandler = contextControlHandler;
150    this.stateManager = stateManager;
151    this.exceptionCodec = exceptionCodec;
152
153    this.remoteManager = remoteManager;
154    this.configurationSerializer = configurationSerializer;
155    this.loggingScopeFactory = loggingScopeFactory;
156    this.evaluatorConfigurationProviders = evaluatorConfigurationProviders;
157    this.driverRestartManager = driverRestartManager;
158    this.idlenessThreadPool = idlenessThreadPool;
159
160    LOG.log(Level.FINEST, "Instantiated 'EvaluatorManager' for evaluator: [{0}]", this.getId());
161  }
162
163  /**
164   * Get the id of current job/application.
165   */
166  public static String getJobIdentifier() {
167    // TODO[JIRA REEF-818]: currently we obtain the job id directly by parsing execution (container) directory path
168    // #845 is open to get the id from RM properly
169    for (File directory = new File(System.getProperty("user.dir"));
170         directory != null; directory = directory.getParentFile()) {
171      final String currentDirectoryName = directory.getName();
172      if (currentDirectoryName.toLowerCase().contains("application_")) {
173        return currentDirectoryName;
174      }
175    }
176    // cannot find a directory that contains application_, presumably we are on local runtime
177    // again, this is a hack for now, we need #845 as a proper solution
178    return "REEF_LOCAL_RUNTIME";
179  }
180
181  /**
182   * Fires the EvaluatorAllocatedEvent to the handlers. Can only be done once.
183   */
184  public synchronized void fireEvaluatorAllocatedEvent() {
185    if (!allocationFired && stateManager.isAllocated()) {
186      final AllocatedEvaluator allocatedEvaluator =
187          new AllocatedEvaluatorImpl(this,
188              remoteManager.getMyIdentifier(),
189              configurationSerializer,
190              getJobIdentifier(),
191              loggingScopeFactory,
192              evaluatorConfigurationProviders);
193      LOG.log(Level.FINEST, "Firing AllocatedEvaluator event for Evaluator with ID [{0}]", evaluatorId);
194      messageDispatcher.onEvaluatorAllocated(allocatedEvaluator);
195      allocationFired = true;
196    } else {
197      LOG.log(Level.WARNING, "Evaluator allocated event fired more than once.");
198    }
199  }
200
201  private static boolean isDoneOrFailedOrKilled(final ResourceStatusEvent resourceStatusEvent) {
202    return resourceStatusEvent.getState() == State.DONE ||
203        resourceStatusEvent.getState() == State.FAILED ||
204        resourceStatusEvent.getState() == State.KILLED;
205  }
206
207  @Override
208  public String getId() {
209    return this.evaluatorId;
210  }
211
212  public void setProcess(final EvaluatorProcess process) {
213    this.evaluatorDescriptor.setProcess(process);
214  }
215
216  public EvaluatorDescriptor getEvaluatorDescriptor() {
217    return this.evaluatorDescriptor;
218  }
219
220  @Override
221  public void close() {
222    synchronized (this.evaluatorDescriptor) {
223      if (this.stateManager.isAllocatedOrSubmittedOrRunning()) {
224        LOG.log(Level.WARNING, "Dirty shutdown of running evaluator id[{0}]", getId());
225        try {
226          if (this.stateManager.isRunning()){
227            // Killing the evaluator means that it doesn't need to send a confirmation; it just dies.
228            final EvaluatorRuntimeProtocol.EvaluatorControlProto evaluatorControlProto =
229                EvaluatorRuntimeProtocol.EvaluatorControlProto.newBuilder()
230                    .setTimestamp(System.currentTimeMillis())
231                    .setIdentifier(getId())
232                    .setKillEvaluator(EvaluatorRuntimeProtocol.KillEvaluatorProto.newBuilder().build())
233                    .build();
234            sendEvaluatorControlMessage(evaluatorControlProto);
235          }
236        } finally {
237          this.stateManager.setKilled();
238        }
239      }
240
241
242      if (!this.isResourceReleased) {
243        this.isResourceReleased = true;
244        try {
245        /* We need to wait awhile before returning the container to the RM in order to
246         * give the EvaluatorRuntime (and Launcher) time to cleanly exit. */
247          this.clock.scheduleAlarm(100, new EventHandler<Alarm>() {
248            @Override
249            public void onNext(final Alarm alarm) {
250              EvaluatorManager.this.resourceReleaseHandler.onNext(
251                      ResourceReleaseEventImpl.newBuilder()
252                              .setIdentifier(EvaluatorManager.this.evaluatorId)
253                              .setRuntimeName(EvaluatorManager.this.getEvaluatorDescriptor().getRuntimeName())
254                              .build()
255              );
256            }
257          });
258        } catch (final IllegalStateException e) {
259          LOG.log(Level.WARNING, "Force resource release because the client closed the clock.", e);
260          EvaluatorManager.this.resourceReleaseHandler.onNext(
261                  ResourceReleaseEventImpl.newBuilder()
262                          .setIdentifier(EvaluatorManager.this.evaluatorId)
263                          .setRuntimeName(EvaluatorManager.this.getEvaluatorDescriptor().getRuntimeName())
264                          .build()
265          );
266        }
267      }
268    }
269
270    idlenessThreadPool.runCheckAsync(this);
271  }
272
273  /**
274   * Return true if the state is DONE, FAILED, or KILLED,
275   * <em>and</em> there are no messages queued or in processing.
276   */
277  public boolean isClosed() {
278    return this.messageDispatcher.isEmpty() &&
279           this.stateManager.isDoneOrFailedOrKilled();
280  }
281
282  /**
283   * Triggers a call to check the idleness of the Evaluator.
284   */
285  void checkIdlenessSource() {
286    this.idlenessSource.check();
287  }
288
289  /**
290   * EvaluatorException will trigger is FailedEvaluator and state transition to FAILED.
291   *
292   * @param exception on the EvaluatorRuntime
293   */
294  public void onEvaluatorException(final EvaluatorException exception) {
295    synchronized (this.evaluatorDescriptor) {
296      if (this.stateManager.isDoneOrFailedOrKilled()) {
297        LOG.log(Level.FINE, "Ignoring an exception received for Evaluator {0} which is already in state {1}.",
298            new Object[]{this.getId(), this.stateManager});
299        return;
300      }
301
302      LOG.log(Level.WARNING, "Failed evaluator: " + getId(), exception);
303
304      try {
305
306        final List<FailedContext> failedContextList = this.contextRepresenters.getFailedContextsForEvaluatorFailure();
307
308        final Optional<FailedTask> failedTaskOptional;
309        if (this.task.isPresent()) {
310          final String taskId = this.task.get().getId();
311          final Optional<ActiveContext> evaluatorContext = Optional.empty();
312          final Optional<byte[]> bytes = Optional.empty();
313          final Optional<Throwable> taskException = Optional.<Throwable>of(new Exception("Evaluator crash"));
314          final String message = "Evaluator crash";
315          final Optional<String> description = Optional.empty();
316          final FailedTask failedTask =
317              new FailedTask(taskId, message, description, taskException, bytes, evaluatorContext);
318          failedTaskOptional = Optional.of(failedTask);
319        } else {
320          failedTaskOptional = Optional.empty();
321        }
322
323        final FailedEvaluator failedEvaluator = new FailedEvaluatorImpl(exception, failedContextList,
324            failedTaskOptional, this.evaluatorId);
325
326        if (driverRestartManager.getEvaluatorRestartState(evaluatorId).isFailedOrExpired()) {
327          this.messageDispatcher.onDriverRestartEvaluatorFailed(failedEvaluator);
328        } else {
329          this.messageDispatcher.onEvaluatorFailed(failedEvaluator);
330        }
331      } catch (final Exception e) {
332        LOG.log(Level.SEVERE, "Exception while handling FailedEvaluator", e);
333      } finally {
334        this.stateManager.setFailed();
335        close();
336      }
337    }
338  }
339
340  /**
341   * Process an evaluator heartbeat message.
342   */
343  public void onEvaluatorHeartbeatMessage(
344      final RemoteMessage<EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto> evaluatorHeartbeatProtoRemoteMessage) {
345
346    final EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto evaluatorHeartbeatProto =
347        evaluatorHeartbeatProtoRemoteMessage.getMessage();
348    LOG.log(Level.FINEST, "Evaluator heartbeat: {0}", evaluatorHeartbeatProto);
349
350    synchronized (this.evaluatorDescriptor) {
351      if (this.stateManager.isDoneOrFailedOrKilled()) {
352        LOG.log(Level.FINE, "Ignoring a heartbeat received for Evaluator {0} which is already in state {1}.",
353            new Object[]{this.getId(), this.stateManager});
354        return;
355      }
356
357      this.sanityChecker.check(evaluatorId, evaluatorHeartbeatProto.getTimestamp());
358      final String evaluatorRID = evaluatorHeartbeatProtoRemoteMessage.getIdentifier().toString();
359
360      final EvaluatorRestartState evaluatorRestartState = driverRestartManager.getEvaluatorRestartState(evaluatorId);
361
362      /*
363       * First message from a running evaluator. The evaluator can be a new evaluator or be a previous evaluator
364       * from a separate application attempt. In the case of a previous evaluator, if the restart period has not
365       * yet expired, we should register it and trigger context active and task events. If the restart period has
366       * expired, we should return immediately after setting its remote ID in order to close it.
367       */
368      if (this.stateManager.isSubmitted() ||
369          evaluatorRestartState == EvaluatorRestartState.REPORTED ||
370          evaluatorRestartState == EvaluatorRestartState.EXPIRED) {
371
372        this.evaluatorControlHandler.setRemoteID(evaluatorRID);
373
374        if (evaluatorRestartState == EvaluatorRestartState.EXPIRED) {
375          // Don't do anything if evaluator has expired. Close it immediately upon exit of this method.
376          return;
377        }
378
379        this.stateManager.setRunning();
380        LOG.log(Level.FINEST, "Evaluator {0} is running", this.evaluatorId);
381
382        if (evaluatorRestartState == EvaluatorRestartState.REPORTED) {
383          driverRestartManager.setEvaluatorReregistered(evaluatorId);
384        }
385      }
386
387      // All messages from a heartbeat receive the heartbeat timestamp as a sequence number. See REEF-806.
388      final long messageSequenceNumber = evaluatorHeartbeatProto.getTimestamp();
389
390      // Process the Evaluator status message
391      if (evaluatorHeartbeatProto.hasEvaluatorStatus()) {
392        EvaluatorStatusPOJO evaluatorStatus = new EvaluatorStatusPOJO(evaluatorHeartbeatProto.getEvaluatorStatus());
393        this.onEvaluatorStatusMessage(evaluatorStatus);
394      }
395
396      // Process the Context status message(s)
397      final boolean informClientOfNewContexts = !evaluatorHeartbeatProto.hasTaskStatus();
398      final List<ContextStatusPOJO> contextStatusList = new ArrayList<>();
399      for (ReefServiceProtos.ContextStatusProto proto : evaluatorHeartbeatProto.getContextStatusList()) {
400        contextStatusList.add(new ContextStatusPOJO(proto, messageSequenceNumber));
401      }
402
403      this.contextRepresenters.onContextStatusMessages(contextStatusList,
404          informClientOfNewContexts);
405
406      // Process the Task status message
407      if (evaluatorHeartbeatProto.hasTaskStatus()) {
408        TaskStatusPOJO taskStatus = new TaskStatusPOJO(evaluatorHeartbeatProto.getTaskStatus(), messageSequenceNumber);
409        this.onTaskStatusMessage(taskStatus);
410      }
411      LOG.log(Level.FINE, "DONE with evaluator heartbeat from Evaluator {0}", this.getId());
412    }
413  }
414
415  /**
416   * Process a evaluator status message.
417   *
418   * @param message
419   */
420  private synchronized void onEvaluatorStatusMessage(final EvaluatorStatusPOJO message) {
421
422    switch (message.getState()) {
423    case DONE:
424      this.onEvaluatorDone(message);
425      break;
426    case FAILED:
427      this.onEvaluatorFailed(message);
428      break;
429    case INIT:
430    case KILLED:
431    case RUNNING:
432    case SUSPEND:
433      break;
434    default:
435      throw new RuntimeException("Unknown state: " + message.getState());
436    }
437  }
438
439  /**
440   * Process an evaluator message that indicates that the evaluator shut down cleanly.
441   *
442   * @param message
443   */
444  private synchronized void onEvaluatorDone(final EvaluatorStatusPOJO message) {
445    assert message.getState() == State.DONE;
446    LOG.log(Level.FINEST, "Evaluator {0} done.", getId());
447
448    // Send an ACK to the Evaluator.
449    sendEvaluatorControlMessage(
450        EvaluatorRuntimeProtocol.EvaluatorControlProto.newBuilder()
451            .setTimestamp(System.currentTimeMillis())
452            .setIdentifier(getId())
453            .setDoneEvaluator(EvaluatorRuntimeProtocol.DoneEvaluatorProto.newBuilder().build())
454            .build());
455
456    this.stateManager.setDone();
457    this.messageDispatcher.onEvaluatorCompleted(new CompletedEvaluatorImpl(this.evaluatorId));
458    close();
459  }
460
461  /**
462   * Process an evaluator message that indicates a crash.
463   *
464   * @param evaluatorStatus
465   */
466  private synchronized void onEvaluatorFailed(final EvaluatorStatusPOJO evaluatorStatus) {
467    assert evaluatorStatus.getState()
468            == State.FAILED;
469    final EvaluatorException evaluatorException;
470    if (evaluatorStatus.hasError()) {
471      final Optional<Throwable> exception =
472          this.exceptionCodec.fromBytes(evaluatorStatus.getError());
473      if (exception.isPresent()) {
474        evaluatorException = new EvaluatorException(getId(), exception.get());
475      } else {
476        evaluatorException = new EvaluatorException(getId(),
477            new NonSerializableException("Exception sent, but can't be deserialized", evaluatorStatus.getError()));
478      }
479    } else {
480      evaluatorException = new EvaluatorException(getId(), new Exception("No exception sent"));
481    }
482    onEvaluatorException(evaluatorException);
483  }
484
485  public void onResourceLaunch(final ResourceLaunchEvent resourceLaunchEvent) {
486    synchronized (this.evaluatorDescriptor) {
487      if (this.stateManager.isAllocated()) {
488        this.stateManager.setSubmitted();
489        this.resourceLaunchHandler.onNext(resourceLaunchEvent);
490      } else {
491        throw new RuntimeException("Evaluator manager expected " + EvaluatorState.ALLOCATED +
492            " state but instead is in state " + this.stateManager);
493      }
494    }
495  }
496
497  /**
498   * Packages the ContextControlProto in an EvaluatorControlProto and forward it to the EvaluatorRuntime.
499   *
500   * @param contextControlProto message contains context control info.
501   */
502  public void sendContextControlMessage(final EvaluatorRuntimeProtocol.ContextControlProto contextControlProto) {
503    synchronized (this.evaluatorDescriptor) {
504      LOG.log(Level.FINEST, "Context control message to {0}", this.evaluatorId);
505      this.contextControlHandler.send(contextControlProto);
506    }
507  }
508
509  /**
510   * Forward the EvaluatorControlProto to the EvaluatorRuntime.
511   *
512   * @param evaluatorControlProto message contains evaluator control information.
513   */
514  void sendEvaluatorControlMessage(final EvaluatorRuntimeProtocol.EvaluatorControlProto evaluatorControlProto) {
515    synchronized (this.evaluatorDescriptor) {
516      this.evaluatorControlHandler.send(evaluatorControlProto);
517    }
518  }
519
520  /**
521   * Handle task status messages.
522   *
523   * @param taskStatus message contains the current task status.
524   */
525  private void onTaskStatusMessage(final TaskStatusPOJO taskStatus) {
526
527    if (!(this.task.isPresent() && this.task.get().getId().equals(taskStatus.getTaskId()))) {
528      if (taskStatus.getState() == State.INIT ||
529          taskStatus.getState() == State.FAILED ||
530          taskStatus.getState() == State.RUNNING ||
531          driverRestartManager.getEvaluatorRestartState(evaluatorId) == EvaluatorRestartState.REREGISTERED) {
532
533        // [REEF-308] exposes a bug where the .NET evaluator does not send its states in the right order
534        // [REEF-289] is a related item which may fix the issue
535        if (taskStatus.getState() == State.RUNNING) {
536          LOG.log(Level.WARNING,
537                  "Received a message of state " + ReefServiceProtos.State.RUNNING +
538                  " for Task " + taskStatus.getTaskId() +
539                  " before receiving its " + ReefServiceProtos.State.INIT + " state");
540        }
541
542        // FAILED is a legal first state of a Task as it could have failed during construction.
543        this.task = Optional.of(
544            new TaskRepresenter(taskStatus.getTaskId(),
545                this.contextRepresenters.getContext(taskStatus.getContextId()),
546                this.messageDispatcher,
547                this,
548                this.exceptionCodec,
549                this.driverRestartManager));
550      } else {
551        throw new RuntimeException("Received a message of state " + taskStatus.getState() +
552            ", not INIT, RUNNING, or FAILED for Task " + taskStatus.getTaskId() +
553            " which we haven't heard from before.");
554      }
555    }
556    this.task.get().onTaskStatusMessage(taskStatus);
557
558    if (this.task.get().isNotRunning()) {
559      LOG.log(Level.FINEST, "Task no longer running. De-registering it.");
560      this.task = Optional.empty();
561    }
562  }
563
564  /**
565   * Resource status information from the (actual) resource manager.
566   */
567  public void onResourceStatusMessage(final ResourceStatusEvent resourceStatusEvent) {
568    synchronized (this.evaluatorDescriptor) {
569      LOG.log(Level.FINEST, "Resource manager state update: {0}", resourceStatusEvent.getState());
570      if (this.stateManager.isDoneOrFailedOrKilled()) {
571        LOG.log(Level.FINE, "Ignoring resource status update for Evaluator {0} which is already in state {1}.",
572            new Object[]{this.getId(), this.stateManager});
573      } else if (isDoneOrFailedOrKilled(resourceStatusEvent) && this.stateManager.isAllocatedOrSubmittedOrRunning()) {
574        // something is wrong. The resource manager reports that the Evaluator is done or failed, but the Driver assumes
575        // it to be alive.
576        final StringBuilder messageBuilder = new StringBuilder("Evaluator [")
577            .append(this.evaluatorId)
578            .append("] is assumed to be in state [")
579            .append(this.stateManager.toString())
580            .append("]. But the resource manager reports it to be in state [")
581            .append(resourceStatusEvent.getState())
582            .append("].");
583
584        if (this.stateManager.isSubmitted()) {
585          messageBuilder
586              .append(" This most likely means that the Evaluator suffered a failure before establishing " +
587                  "a communications link to the driver.");
588        } else if (this.stateManager.isAllocated()) {
589          messageBuilder.append(" This most likely means that the Evaluator suffered a failure before being used.");
590        } else if (this.stateManager.isRunning()) {
591          messageBuilder.append(" This means that the Evaluator failed but wasn't able to send an error message " +
592              "back to the driver.");
593        }
594        if (this.task.isPresent()) {
595          messageBuilder.append(" Task [")
596              .append(this.task.get().getId())
597              .append("] was running when the Evaluator crashed.");
598        }
599
600        if (resourceStatusEvent.getState() == State.KILLED) {
601          this.onEvaluatorException(new EvaluatorKilledByResourceManagerException(this.evaluatorId,
602              messageBuilder.toString()));
603        } else {
604          this.onEvaluatorException(new EvaluatorException(this.evaluatorId, messageBuilder.toString()));
605        }
606      }
607    }
608  }
609
610  @Override
611  public String toString() {
612    return "EvaluatorManager:"
613        + " id=" + this.evaluatorId
614        + " state=" + this.stateManager
615        + " task=" + this.task;
616  }
617
618  // Dynamic Parameters
619
620  /**
621   * The Evaluator Identifier.
622   */
623  @NamedParameter(doc = "The Evaluator Identifier.")
624  public static final class EvaluatorIdentifier implements Name<String> {
625  }
626
627  /**
628   * The Evaluator Host.
629   */
630  @NamedParameter(doc = "The Evaluator Host.")
631  public static final class EvaluatorDescriptorName implements Name<EvaluatorDescriptorImpl> {
632  }
633}