This project has retired. For details please refer to its Attic page.
Source code
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.reef.runtime.common.driver.evaluator;

import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.annotations.audience.Private;
import org.apache.reef.driver.evaluator.FailedEvaluator;
import org.apache.reef.driver.parameters.EvaluatorConfigurationProviders;
import org.apache.reef.driver.restart.DriverRestartManager;
import org.apache.reef.driver.restart.EvaluatorRestartState;
import org.apache.reef.runtime.common.driver.evaluator.pojos.ContextStatusPOJO;
import org.apache.reef.runtime.common.driver.evaluator.pojos.EvaluatorStatusPOJO;
import org.apache.reef.runtime.common.driver.evaluator.pojos.State;
import org.apache.reef.runtime.common.driver.evaluator.pojos.TaskStatusPOJO;
import org.apache.reef.tang.ConfigurationProvider;
import org.apache.reef.driver.context.ActiveContext;
import org.apache.reef.driver.context.FailedContext;
import org.apache.reef.driver.evaluator.AllocatedEvaluator;
import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
import org.apache.reef.driver.task.FailedTask;
import org.apache.reef.exception.EvaluatorException;
import org.apache.reef.exception.EvaluatorKilledByResourceManagerException;
import org.apache.reef.io.naming.Identifiable;
import org.apache.reef.proto.EvaluatorRuntimeProtocol;
import org.apache.reef.proto.ReefServiceProtos;
import org.apache.reef.driver.evaluator.EvaluatorProcess;
import org.apache.reef.runtime.common.driver.api.ResourceLaunchEvent;
import org.apache.reef.runtime.common.driver.api.ResourceReleaseEventImpl;
import org.apache.reef.runtime.common.driver.api.ResourceLaunchHandler;
import org.apache.reef.runtime.common.driver.api.ResourceReleaseHandler;
import org.apache.reef.runtime.common.driver.context.ContextControlHandler;
import org.apache.reef.runtime.common.driver.context.ContextRepresenters;
import org.apache.reef.runtime.common.driver.idle.EventHandlerIdlenessSource;
import org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEvent;
import org.apache.reef.runtime.common.driver.task.TaskRepresenter;
import org.apache.reef.runtime.common.utils.ExceptionCodec;
import org.apache.reef.runtime.common.utils.RemoteManager;
import org.apache.reef.tang.annotations.Name;
import org.apache.reef.tang.annotations.NamedParameter;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.tang.formats.ConfigurationSerializer;
import org.apache.reef.util.Optional;
import org.apache.reef.util.logging.LoggingScopeFactory;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.remote.RemoteMessage;
import org.apache.reef.wake.time.Clock;
import org.apache.reef.wake.time.event.Alarm;

import javax.inject.Inject;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;

073/**
074 * Manages a single Evaluator instance including all lifecycle instances:
075 * (AllocatedEvaluator, CompletedEvaluator, FailedEvaluator).
076 * <p>
077 * A (periodic) heartbeat channel is established from EvaluatorRuntime to EvaluatorManager.
078 * The EvaluatorRuntime will (periodically) send (status) messages to the EvaluatorManager using this
079 * heartbeat channel.
080 * <p>
081 * A (push-based) EventHandler channel is established from EvaluatorManager to EvaluatorRuntime.
082 * The EvaluatorManager uses this to forward Driver messages, launch Tasks, and initiate
083 * control information (e.g., shutdown, suspend).
084 */
085@Private
086@DriverSide
087public final class EvaluatorManager implements Identifiable, AutoCloseable {
088
089  private static final Logger LOG = Logger.getLogger(EvaluatorManager.class.getName());
090
091  private final EvaluatorHeartBeatSanityChecker sanityChecker = new EvaluatorHeartBeatSanityChecker();
092  private final Clock clock;
093  private final ResourceReleaseHandler resourceReleaseHandler;
094  private final ResourceLaunchHandler resourceLaunchHandler;
095  private final String evaluatorId;
096  private final EvaluatorDescriptorImpl evaluatorDescriptor;
097  private final ContextRepresenters contextRepresenters;
098  private final EvaluatorMessageDispatcher messageDispatcher;
099  private final EvaluatorControlHandler evaluatorControlHandler;
100  private final ContextControlHandler contextControlHandler;
101  private final EvaluatorStatusManager stateManager;
102  private final ExceptionCodec exceptionCodec;
103  private final EventHandlerIdlenessSource idlenessSource;
104  private final RemoteManager remoteManager;
105  private final ConfigurationSerializer configurationSerializer;
106  private final LoggingScopeFactory loggingScopeFactory;
107  private final Set<ConfigurationProvider> evaluatorConfigurationProviders;
108  private final DriverRestartManager driverRestartManager;
109
110  // Mutable fields
111  private Optional<TaskRepresenter> task = Optional.empty();
112  private boolean isResourceReleased = false;
113  private boolean allocationFired = false;
114
115  @Inject
116  private EvaluatorManager(
117      final Clock clock,
118      final RemoteManager remoteManager,
119      final ResourceReleaseHandler resourceReleaseHandler,
120      final ResourceLaunchHandler resourceLaunchHandler,
121      @Parameter(EvaluatorIdentifier.class) final String evaluatorId,
122      @Parameter(EvaluatorDescriptorName.class) final EvaluatorDescriptorImpl evaluatorDescriptor,
123      final ContextRepresenters contextRepresenters,
124      final ConfigurationSerializer configurationSerializer,
125      final EvaluatorMessageDispatcher messageDispatcher,
126      final EvaluatorControlHandler evaluatorControlHandler,
127      final ContextControlHandler contextControlHandler,
128      final EvaluatorStatusManager stateManager,
129      final ExceptionCodec exceptionCodec,
130      final EventHandlerIdlenessSource idlenessSource,
131      final LoggingScopeFactory loggingScopeFactory,
132      @Parameter(EvaluatorConfigurationProviders.class)
133      final Set<ConfigurationProvider> evaluatorConfigurationProviders,
134      final DriverRestartManager driverRestartManager) {
135    this.contextRepresenters = contextRepresenters;
136    this.idlenessSource = idlenessSource;
137    LOG.log(Level.FINEST, "Instantiating 'EvaluatorManager' for evaluator: {0}", evaluatorId);
138    this.clock = clock;
139    this.resourceReleaseHandler = resourceReleaseHandler;
140    this.resourceLaunchHandler = resourceLaunchHandler;
141    this.evaluatorId = evaluatorId;
142    this.evaluatorDescriptor = evaluatorDescriptor;
143
144    this.messageDispatcher = messageDispatcher;
145    this.evaluatorControlHandler = evaluatorControlHandler;
146    this.contextControlHandler = contextControlHandler;
147    this.stateManager = stateManager;
148    this.exceptionCodec = exceptionCodec;
149
150    this.remoteManager = remoteManager;
151    this.configurationSerializer = configurationSerializer;
152    this.loggingScopeFactory = loggingScopeFactory;
153    this.evaluatorConfigurationProviders = evaluatorConfigurationProviders;
154    this.driverRestartManager = driverRestartManager;
155
156    LOG.log(Level.FINEST, "Instantiated 'EvaluatorManager' for evaluator: [{0}]", this.getId());
157  }
158
159  /**
160   * Get the id of current job/application.
161   */
162  public static String getJobIdentifier() {
163    // TODO[JIRA REEF-818]: currently we obtain the job id directly by parsing execution (container) directory path
164    // #845 is open to get the id from RM properly
165    for (File directory = new File(System.getProperty("user.dir"));
166         directory != null; directory = directory.getParentFile()) {
167      final String currentDirectoryName = directory.getName();
168      if (currentDirectoryName.toLowerCase().contains("application_")) {
169        return currentDirectoryName;
170      }
171    }
172    // cannot find a directory that contains application_, presumably we are on local runtime
173    // again, this is a hack for now, we need #845 as a proper solution
174    return "REEF_LOCAL_RUNTIME";
175  }
176
177  /**
178   * Fires the EvaluatorAllocatedEvent to the handlers. Can only be done once.
179   */
180  public synchronized void fireEvaluatorAllocatedEvent() {
181    if (!allocationFired && stateManager.isAllocated()) {
182      final AllocatedEvaluator allocatedEvaluator =
183          new AllocatedEvaluatorImpl(this,
184              remoteManager.getMyIdentifier(),
185              configurationSerializer,
186              getJobIdentifier(),
187              loggingScopeFactory,
188              evaluatorConfigurationProviders);
189      LOG.log(Level.FINEST, "Firing AllocatedEvaluator event for Evaluator with ID [{0}]", evaluatorId);
190      messageDispatcher.onEvaluatorAllocated(allocatedEvaluator);
191      allocationFired = true;
192    } else {
193      LOG.log(Level.WARNING, "Evaluator allocated event fired more than once.");
194    }
195  }
196
197  private static boolean isDoneOrFailedOrKilled(final ResourceStatusEvent resourceStatusEvent) {
198    return resourceStatusEvent.getState() == State.DONE ||
199        resourceStatusEvent.getState() == State.FAILED ||
200        resourceStatusEvent.getState() == State.KILLED;
201  }
202
203  @Override
204  public String getId() {
205    return this.evaluatorId;
206  }
207
208  public void setProcess(final EvaluatorProcess process) {
209    this.evaluatorDescriptor.setProcess(process);
210  }
211
212  public EvaluatorDescriptor getEvaluatorDescriptor() {
213    return this.evaluatorDescriptor;
214  }
215
216  @Override
217  public void close() {
218    synchronized (this.evaluatorDescriptor) {
219      if (this.stateManager.isAllocatedOrSubmittedOrRunning()) {
220        LOG.log(Level.WARNING, "Dirty shutdown of running evaluator id[{0}]", getId());
221        try {
222          if (this.stateManager.isRunning()){
223            // Killing the evaluator means that it doesn't need to send a confirmation; it just dies.
224            final EvaluatorRuntimeProtocol.EvaluatorControlProto evaluatorControlProto =
225                EvaluatorRuntimeProtocol.EvaluatorControlProto.newBuilder()
226                    .setTimestamp(System.currentTimeMillis())
227                    .setIdentifier(getId())
228                    .setKillEvaluator(EvaluatorRuntimeProtocol.KillEvaluatorProto.newBuilder().build())
229                    .build();
230            sendEvaluatorControlMessage(evaluatorControlProto);
231          }
232        } finally {
233          this.stateManager.setKilled();
234        }
235      }
236
237
238      if (!this.isResourceReleased) {
239        this.isResourceReleased = true;
240        try {
241        /* We need to wait awhile before returning the container to the RM in order to
242         * give the EvaluatorRuntime (and Launcher) time to cleanly exit. */
243          this.clock.scheduleAlarm(100, new EventHandler<Alarm>() {
244            @Override
245            public void onNext(final Alarm alarm) {
246              EvaluatorManager.this.resourceReleaseHandler.onNext(
247                      ResourceReleaseEventImpl.newBuilder()
248                              .setIdentifier(EvaluatorManager.this.evaluatorId)
249                              .setRuntimeName(EvaluatorManager.this.getEvaluatorDescriptor().getRuntimeName())
250                              .build()
251              );
252            }
253          });
254        } catch (final IllegalStateException e) {
255          LOG.log(Level.WARNING, "Force resource release because the client closed the clock.", e);
256          EvaluatorManager.this.resourceReleaseHandler.onNext(
257                  ResourceReleaseEventImpl.newBuilder()
258                          .setIdentifier(EvaluatorManager.this.evaluatorId)
259                          .setRuntimeName(EvaluatorManager.this.getEvaluatorDescriptor().getRuntimeName())
260                          .build()
261          );
262        }
263      }
264    }
265
266    try {
267      this.messageDispatcher.close();
268    } catch (Exception e) {
269      LOG.log(Level.SEVERE, "Exception while closing EvaluatorManager", e);
270    }
271    this.idlenessSource.check();
272  }
273
274  /**
275   * Return true if the state is DONE, FAILED, or KILLED,
276   * <em>and</em> there are no messages queued or in processing.
277   */
278  public boolean isClosed() {
279    return this.messageDispatcher.isEmpty() &&
280           this.stateManager.isDoneOrFailedOrKilled();
281  }
282
283  /**
284   * EvaluatorException will trigger is FailedEvaluator and state transition to FAILED.
285   *
286   * @param exception on the EvaluatorRuntime
287   */
288  public void onEvaluatorException(final EvaluatorException exception) {
289    synchronized (this.evaluatorDescriptor) {
290      if (this.stateManager.isDoneOrFailedOrKilled()) {
291        LOG.log(Level.FINE, "Ignoring an exception received for Evaluator {0} which is already in state {1}.",
292            new Object[]{this.getId(), this.stateManager});
293        return;
294      }
295
296      LOG.log(Level.WARNING, "Failed evaluator: " + getId(), exception);
297
298      try {
299
300        final List<FailedContext> failedContextList = this.contextRepresenters.getFailedContextsForEvaluatorFailure();
301
302        final Optional<FailedTask> failedTaskOptional;
303        if (this.task.isPresent()) {
304          final String taskId = this.task.get().getId();
305          final Optional<ActiveContext> evaluatorContext = Optional.empty();
306          final Optional<byte[]> bytes = Optional.empty();
307          final Optional<Throwable> taskException = Optional.<Throwable>of(new Exception("Evaluator crash"));
308          final String message = "Evaluator crash";
309          final Optional<String> description = Optional.empty();
310          final FailedTask failedTask =
311              new FailedTask(taskId, message, description, taskException, bytes, evaluatorContext);
312          failedTaskOptional = Optional.of(failedTask);
313        } else {
314          failedTaskOptional = Optional.empty();
315        }
316
317        final FailedEvaluator failedEvaluator = new FailedEvaluatorImpl(exception, failedContextList,
318            failedTaskOptional, this.evaluatorId);
319
320        if (driverRestartManager.getEvaluatorRestartState(evaluatorId).isFailedOrExpired()) {
321          this.messageDispatcher.onDriverRestartEvaluatorFailed(failedEvaluator);
322        } else {
323          this.messageDispatcher.onEvaluatorFailed(failedEvaluator);
324        }
325      } catch (final Exception e) {
326        LOG.log(Level.SEVERE, "Exception while handling FailedEvaluator", e);
327      } finally {
328        this.stateManager.setFailed();
329        close();
330      }
331    }
332  }
333
334  /**
335   * Process an evaluator heartbeat message.
336   */
337  public void onEvaluatorHeartbeatMessage(
338      final RemoteMessage<EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto> evaluatorHeartbeatProtoRemoteMessage) {
339
340    final EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto evaluatorHeartbeatProto =
341        evaluatorHeartbeatProtoRemoteMessage.getMessage();
342    LOG.log(Level.FINEST, "Evaluator heartbeat: {0}", evaluatorHeartbeatProto);
343
344    synchronized (this.evaluatorDescriptor) {
345      if (this.stateManager.isDoneOrFailedOrKilled()) {
346        LOG.log(Level.FINE, "Ignoring an heartbeat received for Evaluator {0} which is already in state {1}.",
347            new Object[]{this.getId(), this.stateManager});
348        return;
349      }
350
351      this.sanityChecker.check(evaluatorId, evaluatorHeartbeatProto.getTimestamp());
352      final String evaluatorRID = evaluatorHeartbeatProtoRemoteMessage.getIdentifier().toString();
353
354      final EvaluatorRestartState evaluatorRestartState = driverRestartManager.getEvaluatorRestartState(evaluatorId);
355
356      /*
357       * First message from a running evaluator. The evaluator can be a new evaluator or be a previous evaluator
358       * from a separate application attempt. In the case of a previous evaluator, if the restart period has not
359       * yet expired, we should register it and trigger context active and task events. If the restart period has
360       * expired, we should return immediately after setting its remote ID in order to close it.
361       */
362      if (this.stateManager.isSubmitted() ||
363          evaluatorRestartState == EvaluatorRestartState.REPORTED ||
364          evaluatorRestartState == EvaluatorRestartState.EXPIRED) {
365
366        this.evaluatorControlHandler.setRemoteID(evaluatorRID);
367
368        if (evaluatorRestartState == EvaluatorRestartState.EXPIRED) {
369          // Don't do anything if evaluator has expired. Close it immediately upon exit of this method.
370          return;
371        }
372
373        this.stateManager.setRunning();
374        LOG.log(Level.FINEST, "Evaluator {0} is running", this.evaluatorId);
375
376        if (evaluatorRestartState == EvaluatorRestartState.REPORTED) {
377          driverRestartManager.setEvaluatorReregistered(evaluatorId);
378        }
379      }
380
381      // All messages from a heartbeat receive the heartbeat timestamp as a sequence number. See REEF-806.
382      final long messageSequenceNumber = evaluatorHeartbeatProto.getTimestamp();
383
384      // Process the Evaluator status message
385      if (evaluatorHeartbeatProto.hasEvaluatorStatus()) {
386        EvaluatorStatusPOJO evaluatorStatus = new EvaluatorStatusPOJO(evaluatorHeartbeatProto.getEvaluatorStatus());
387        this.onEvaluatorStatusMessage(evaluatorStatus);
388      }
389
390      // Process the Context status message(s)
391      final boolean informClientOfNewContexts = !evaluatorHeartbeatProto.hasTaskStatus();
392      final List<ContextStatusPOJO> contextStatusList = new ArrayList<>();
393      for (ReefServiceProtos.ContextStatusProto proto : evaluatorHeartbeatProto.getContextStatusList()) {
394        contextStatusList.add(new ContextStatusPOJO(proto, messageSequenceNumber));
395      }
396
397      this.contextRepresenters.onContextStatusMessages(contextStatusList,
398          informClientOfNewContexts);
399
400      // Process the Task status message
401      if (evaluatorHeartbeatProto.hasTaskStatus()) {
402        TaskStatusPOJO taskStatus = new TaskStatusPOJO(evaluatorHeartbeatProto.getTaskStatus(), messageSequenceNumber);
403        this.onTaskStatusMessage(taskStatus);
404      }
405      LOG.log(Level.FINE, "DONE with evaluator heartbeat from Evaluator {0}", this.getId());
406    }
407  }
408
409  /**
410   * Process a evaluator status message.
411   *
412   * @param message
413   */
414  private synchronized void onEvaluatorStatusMessage(final EvaluatorStatusPOJO message) {
415
416    switch (message.getState()) {
417    case DONE:
418      this.onEvaluatorDone(message);
419      break;
420    case FAILED:
421      this.onEvaluatorFailed(message);
422      break;
423    case INIT:
424    case KILLED:
425    case RUNNING:
426    case SUSPEND:
427      break;
428    default:
429      throw new RuntimeException("Unknown state: " + message.getState());
430    }
431  }
432
433  /**
434   * Process an evaluator message that indicates that the evaluator shut down cleanly.
435   *
436   * @param message
437   */
438  private synchronized void onEvaluatorDone(final EvaluatorStatusPOJO message) {
439    assert message.getState() == State.DONE;
440    LOG.log(Level.FINEST, "Evaluator {0} done.", getId());
441    this.stateManager.setDone();
442    this.messageDispatcher.onEvaluatorCompleted(new CompletedEvaluatorImpl(this.evaluatorId));
443    close();
444  }
445
446  /**
447   * Process an evaluator message that indicates a crash.
448   *
449   * @param evaluatorStatus
450   */
451  private synchronized void onEvaluatorFailed(final EvaluatorStatusPOJO evaluatorStatus) {
452    assert evaluatorStatus.getState()
453            == State.FAILED;
454    final EvaluatorException evaluatorException;
455    if (evaluatorStatus.hasError()) {
456      final Optional<Throwable> exception =
457          this.exceptionCodec.fromBytes(evaluatorStatus.getError());
458      if (exception.isPresent()) {
459        evaluatorException = new EvaluatorException(getId(), exception.get());
460      } else {
461        evaluatorException = new EvaluatorException(getId(),
462            new Exception("Exception sent, but can't be deserialized"));
463      }
464    } else {
465      evaluatorException = new EvaluatorException(getId(), new Exception("No exception sent"));
466    }
467    onEvaluatorException(evaluatorException);
468  }
469
470  public void onResourceLaunch(final ResourceLaunchEvent resourceLaunchEvent) {
471    synchronized (this.evaluatorDescriptor) {
472      if (this.stateManager.isAllocated()) {
473        this.stateManager.setSubmitted();
474        this.resourceLaunchHandler.onNext(resourceLaunchEvent);
475      } else {
476        throw new RuntimeException("Evaluator manager expected " + EvaluatorState.ALLOCATED +
477            " state but instead is in state " + this.stateManager);
478      }
479    }
480  }
481
482  /**
483   * Packages the ContextControlProto in an EvaluatorControlProto and forward it to the EvaluatorRuntime.
484   *
485   * @param contextControlProto message contains context control info.
486   */
487  public void sendContextControlMessage(final EvaluatorRuntimeProtocol.ContextControlProto contextControlProto) {
488    synchronized (this.evaluatorDescriptor) {
489      LOG.log(Level.FINEST, "Context control message to {0}", this.evaluatorId);
490      this.contextControlHandler.send(contextControlProto);
491    }
492  }
493
494  /**
495   * Forward the EvaluatorControlProto to the EvaluatorRuntime.
496   *
497   * @param evaluatorControlProto message contains evaluator control information.
498   */
499  void sendEvaluatorControlMessage(final EvaluatorRuntimeProtocol.EvaluatorControlProto evaluatorControlProto) {
500    synchronized (this.evaluatorDescriptor) {
501      this.evaluatorControlHandler.send(evaluatorControlProto);
502    }
503  }
504
505  /**
506   * Handle task status messages.
507   *
508   * @param taskStatus message contains the current task status.
509   */
510  private void onTaskStatusMessage(final TaskStatusPOJO taskStatus) {
511
512    if (!(this.task.isPresent() && this.task.get().getId().equals(taskStatus.getTaskId()))) {
513      if (taskStatus.getState() == State.INIT ||
514          taskStatus.getState() == State.FAILED ||
515          taskStatus.getState() == State.RUNNING ||
516          driverRestartManager.getEvaluatorRestartState(evaluatorId) == EvaluatorRestartState.REREGISTERED) {
517
518        // [REEF-308] exposes a bug where the .NET evaluator does not send its states in the right order
519        // [REEF-289] is a related item which may fix the issue
520        if (taskStatus.getState() == State.RUNNING) {
521          LOG.log(Level.WARNING,
522                  "Received a message of state " + ReefServiceProtos.State.RUNNING +
523                  " for Task " + taskStatus.getTaskId() +
524                  " before receiving its " + ReefServiceProtos.State.INIT + " state");
525        }
526
527        // FAILED is a legal first state of a Task as it could have failed during construction.
528        this.task = Optional.of(
529            new TaskRepresenter(taskStatus.getTaskId(),
530                this.contextRepresenters.getContext(taskStatus.getContextId()),
531                this.messageDispatcher,
532                this,
533                this.exceptionCodec,
534                this.driverRestartManager));
535      } else {
536        throw new RuntimeException("Received a message of state " + taskStatus.getState() +
537            ", not INIT, RUNNING, or FAILED for Task " + taskStatus.getTaskId() +
538            " which we haven't heard from before.");
539      }
540    }
541    this.task.get().onTaskStatusMessage(taskStatus);
542
543    if (this.task.get().isNotRunning()) {
544      LOG.log(Level.FINEST, "Task no longer running. De-registering it.");
545      this.task = Optional.empty();
546    }
547  }
548
549  /**
550   * Resource status information from the (actual) resource manager.
551   */
552  public void onResourceStatusMessage(final ResourceStatusEvent resourceStatusEvent) {
553    synchronized (this.evaluatorDescriptor) {
554      LOG.log(Level.FINEST, "Resource manager state update: {0}", resourceStatusEvent.getState());
555      if (this.stateManager.isDoneOrFailedOrKilled()) {
556        LOG.log(Level.FINE, "Ignoring resource status update for Evaluator {0} which is already in state {1}.",
557            new Object[]{this.getId(), this.stateManager});
558      } else if (isDoneOrFailedOrKilled(resourceStatusEvent) && this.stateManager.isAllocatedOrSubmittedOrRunning()) {
559        // something is wrong. The resource manager reports that the Evaluator is done or failed, but the Driver assumes
560        // it to be alive.
561        final StringBuilder messageBuilder = new StringBuilder("Evaluator [")
562            .append(this.evaluatorId)
563            .append("] is assumed to be in state [")
564            .append(this.stateManager.toString())
565            .append("]. But the resource manager reports it to be in state [")
566            .append(resourceStatusEvent.getState())
567            .append("].");
568
569        if (this.stateManager.isSubmitted()) {
570          messageBuilder
571              .append(" This most likely means that the Evaluator suffered a failure before establishing " +
572                  "a communications link to the driver.");
573        } else if (this.stateManager.isAllocated()) {
574          messageBuilder.append(" This most likely means that the Evaluator suffered a failure before being used.");
575        } else if (this.stateManager.isRunning()) {
576          messageBuilder.append(" This means that the Evaluator failed but wasn't able to send an error message " +
577              "back to the driver.");
578        }
579        if (this.task.isPresent()) {
580          messageBuilder.append(" Task [")
581              .append(this.task.get().getId())
582              .append("] was running when the Evaluator crashed.");
583        }
584
585        if (resourceStatusEvent.getState() == State.KILLED) {
586          this.onEvaluatorException(new EvaluatorKilledByResourceManagerException(this.evaluatorId,
587              messageBuilder.toString()));
588        } else {
589          this.onEvaluatorException(new EvaluatorException(this.evaluatorId, messageBuilder.toString()));
590        }
591      }
592    }
593  }
594
595  @Override
596  public String toString() {
597    return "EvaluatorManager:"
598        + " id=" + this.evaluatorId
599        + " state=" + this.stateManager
600        + " task=" + this.task;
601  }
602
603  // Dynamic Parameters
604
605  /**
606   * The Evaluator Identifier.
607   */
608  @NamedParameter(doc = "The Evaluator Identifier.")
609  public static final class EvaluatorIdentifier implements Name<String> {
610  }
611
612  /**
613   * The Evaluator Host.
614   */
615  @NamedParameter(doc = "The Evaluator Host.")
616  public static final class EvaluatorDescriptorName implements Name<EvaluatorDescriptorImpl> {
617  }
618}