This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.common.driver.evaluator;
020
021import org.apache.reef.annotations.audience.DriverSide;
022import org.apache.reef.annotations.audience.Private;
023import org.apache.reef.driver.context.ActiveContext;
024import org.apache.reef.driver.context.FailedContext;
025import org.apache.reef.driver.evaluator.AllocatedEvaluator;
026import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
027import org.apache.reef.driver.evaluator.EvaluatorType;
028import org.apache.reef.driver.task.FailedTask;
029import org.apache.reef.exception.EvaluatorException;
030import org.apache.reef.exception.EvaluatorKilledByResourceManagerException;
031import org.apache.reef.io.naming.Identifiable;
032import org.apache.reef.proto.DriverRuntimeProtocol;
033import org.apache.reef.proto.EvaluatorRuntimeProtocol;
034import org.apache.reef.proto.ReefServiceProtos;
035import org.apache.reef.runtime.common.DriverRestartCompleted;
036import org.apache.reef.runtime.common.driver.DriverStatusManager;
037import org.apache.reef.runtime.common.driver.api.ResourceLaunchHandler;
038import org.apache.reef.runtime.common.driver.api.ResourceReleaseHandler;
039import org.apache.reef.runtime.common.driver.context.ContextControlHandler;
040import org.apache.reef.runtime.common.driver.context.ContextRepresenters;
041import org.apache.reef.runtime.common.driver.idle.EventHandlerIdlenessSource;
042import org.apache.reef.runtime.common.driver.task.TaskRepresenter;
043import org.apache.reef.runtime.common.utils.ExceptionCodec;
044import org.apache.reef.runtime.common.utils.RemoteManager;
045import org.apache.reef.tang.annotations.Name;
046import org.apache.reef.tang.annotations.NamedParameter;
047import org.apache.reef.tang.annotations.Parameter;
048import org.apache.reef.tang.formats.ConfigurationSerializer;
049import org.apache.reef.util.Optional;
050import org.apache.reef.util.logging.LoggingScopeFactory;
051import org.apache.reef.wake.EventHandler;
052import org.apache.reef.wake.remote.RemoteMessage;
053import org.apache.reef.wake.time.Clock;
054import org.apache.reef.wake.time.event.Alarm;
055
056import javax.inject.Inject;
057import java.io.File;
058import java.util.List;
059import java.util.logging.Level;
060import java.util.logging.Logger;
061
062/**
063 * Manages a single Evaluator instance including all lifecycle instances:
064 * (AllocatedEvaluator, CompletedEvaluator, FailedEvaluator).
065 * <p/>
066 * A (periodic) heartbeat channel is established EvaluatorRuntime -> EvaluatorManager.
067 * The EvaluatorRuntime will (periodically) send (status) messages to the EvaluatorManager using this
068 * heartbeat channel.
069 * <p/>
070 * A (push-based) EventHandler channel is established EvaluatorManager -> EvaluatorRuntime.
071 * The EvaluatorManager uses this to forward Driver messages, launch Tasks, and initiate
072 * control information (e.g., shutdown, suspend).
073 */
074@Private
075@DriverSide
076public final class EvaluatorManager implements Identifiable, AutoCloseable {
077
078  private final static Logger LOG = Logger.getLogger(EvaluatorManager.class.getName());
079
080  private final EvaluatorHeartBeatSanityChecker sanityChecker = new EvaluatorHeartBeatSanityChecker();
081  private final Clock clock;
082  private final ResourceReleaseHandler resourceReleaseHandler;
083  private final ResourceLaunchHandler resourceLaunchHandler;
084  private final String evaluatorId;
085  private final EvaluatorDescriptorImpl evaluatorDescriptor;
086  private final ContextRepresenters contextRepresenters;
087  private final EvaluatorMessageDispatcher messageDispatcher;
088  private final EvaluatorControlHandler evaluatorControlHandler;
089  private final ContextControlHandler contextControlHandler;
090  private final EvaluatorStatusManager stateManager;
091  private final ExceptionCodec exceptionCodec;
092  private final DriverStatusManager driverStatusManager;
093  private final EventHandlerIdlenessSource idlenessSource;
094  private final LoggingScopeFactory loggingScopeFactory;
095
096
097  // Mutable fields
098  private Optional<TaskRepresenter> task = Optional.empty();
099  private boolean isResourceReleased = false;
100
101  @Inject
102  private EvaluatorManager(
103      final Clock clock,
104      final RemoteManager remoteManager,
105      final ResourceReleaseHandler resourceReleaseHandler,
106      final ResourceLaunchHandler resourceLaunchHandler,
107      final @Parameter(EvaluatorIdentifier.class) String evaluatorId,
108      final @Parameter(EvaluatorDescriptorName.class) EvaluatorDescriptorImpl evaluatorDescriptor,
109      final ContextRepresenters contextRepresenters,
110      final ConfigurationSerializer configurationSerializer,
111      final EvaluatorMessageDispatcher messageDispatcher,
112      final EvaluatorControlHandler evaluatorControlHandler,
113      final ContextControlHandler contextControlHandler,
114      final EvaluatorStatusManager stateManager,
115      final DriverStatusManager driverStatusManager,
116      final ExceptionCodec exceptionCodec,
117      final EventHandlerIdlenessSource idlenessSource,
118      final LoggingScopeFactory loggingScopeFactory) {
119    this.contextRepresenters = contextRepresenters;
120    this.idlenessSource = idlenessSource;
121    LOG.log(Level.FINEST, "Instantiating 'EvaluatorManager' for evaluator: {0}", evaluatorId);
122    this.clock = clock;
123    this.resourceReleaseHandler = resourceReleaseHandler;
124    this.resourceLaunchHandler = resourceLaunchHandler;
125    this.evaluatorId = evaluatorId;
126    this.evaluatorDescriptor = evaluatorDescriptor;
127
128    this.messageDispatcher = messageDispatcher;
129    this.evaluatorControlHandler = evaluatorControlHandler;
130    this.contextControlHandler = contextControlHandler;
131    this.stateManager = stateManager;
132    this.driverStatusManager = driverStatusManager;
133    this.exceptionCodec = exceptionCodec;
134    this.loggingScopeFactory = loggingScopeFactory;
135
136    final AllocatedEvaluator allocatedEvaluator =
137        new AllocatedEvaluatorImpl(this, remoteManager.getMyIdentifier(), configurationSerializer, getJobIdentifier(), loggingScopeFactory);
138    LOG.log(Level.FINEST, "Firing AllocatedEvaluator event for Evaluator with ID [{0}]", evaluatorId);
139    this.messageDispatcher.onEvaluatorAllocated(allocatedEvaluator);
140    LOG.log(Level.FINEST, "Instantiated 'EvaluatorManager' for evaluator: [{0}]", this.getId());
141  }
142
143  /**
144   * Get the id of current job/application
145   */
146  public static String getJobIdentifier() {
147    // TODO: currently we obtain the job id directly by parsing execution (container) directory path
148    // #845 is open to get the id from RM properly
149    for (File directory = new File(System.getProperty("user.dir"));
150         directory != null; directory = directory.getParentFile()) {
151      final String currentDirectoryName = directory.getName();
152      if (currentDirectoryName.toLowerCase().contains("application_")) {
153        return currentDirectoryName;
154      }
155    }
156    // cannot find a directory that contains application_, presumably we are on local runtime
157    // again, this is a hack for now, we need #845 as a proper solution
158    return "REEF_LOCAL_RUNTIME";
159  }
160
161  private static boolean isDoneOrFailedOrKilled(final DriverRuntimeProtocol.ResourceStatusProto resourceStatusProto) {
162    return resourceStatusProto.getState() == ReefServiceProtos.State.DONE ||
163        resourceStatusProto.getState() == ReefServiceProtos.State.FAILED ||
164        resourceStatusProto.getState() == ReefServiceProtos.State.KILLED;
165  }
166
167  @Override
168  public String getId() {
169    return this.evaluatorId;
170  }
171
172  public void setType(final EvaluatorType type) {
173    this.evaluatorDescriptor.setType(type);
174  }
175
176  public EvaluatorDescriptor getEvaluatorDescriptor() {
177    return this.evaluatorDescriptor;
178  }
179
180  @Override
181  public void close() {
182    synchronized (this.evaluatorDescriptor) {
183      if (this.stateManager.isRunning()) {
184        LOG.log(Level.WARNING, "Dirty shutdown of running evaluator id[{0}]", getId());
185        try {
186          // Killing the evaluator means that it doesn't need to send a confirmation; it just dies.
187          final EvaluatorRuntimeProtocol.EvaluatorControlProto evaluatorControlProto =
188              EvaluatorRuntimeProtocol.EvaluatorControlProto.newBuilder()
189                  .setTimestamp(System.currentTimeMillis())
190                  .setIdentifier(getId())
191                  .setKillEvaluator(EvaluatorRuntimeProtocol.KillEvaluatorProto.newBuilder().build())
192                  .build();
193          sendEvaluatorControlMessage(evaluatorControlProto);
194        } finally {
195          this.stateManager.setKilled();
196        }
197      }
198
199
200      if (!this.isResourceReleased) {
201        this.isResourceReleased = true;
202        try {
203        /* We need to wait awhile before returning the container to the RM in order to
204         * give the EvaluatorRuntime (and Launcher) time to cleanly exit. */
205          this.clock.scheduleAlarm(100, new EventHandler<Alarm>() {
206            @Override
207            public void onNext(final Alarm alarm) {
208              EvaluatorManager.this.resourceReleaseHandler.onNext(
209                  DriverRuntimeProtocol.ResourceReleaseProto.newBuilder()
210                      .setIdentifier(EvaluatorManager.this.evaluatorId).build()
211              );
212            }
213          });
214        } catch (final IllegalStateException e) {
215          LOG.log(Level.WARNING, "Force resource release because the client closed the clock.", e);
216          EvaluatorManager.this.resourceReleaseHandler.onNext(
217              DriverRuntimeProtocol.ResourceReleaseProto.newBuilder()
218                  .setIdentifier(EvaluatorManager.this.evaluatorId).build()
219          );
220        }
221      }
222    }
223    this.idlenessSource.check();
224  }
225
226  /**
227   * Return true if the state is DONE, FAILED, or KILLED,
228   * <em>and</em> there are no messages queued or in processing.
229   */
230  public boolean isClosed() {
231    return this.messageDispatcher.isEmpty() &&
232        (this.stateManager.isDoneOrFailedOrKilled());
233  }
234
235  /**
236   * EvaluatorException will trigger is FailedEvaluator and state transition to FAILED
237   *
238   * @param exception on the EvaluatorRuntime
239   */
240  public void onEvaluatorException(final EvaluatorException exception) {
241    synchronized (this.evaluatorDescriptor) {
242      if (this.stateManager.isDoneOrFailedOrKilled()) {
243        LOG.log(Level.FINE, "Ignoring an exception receivedfor Evaluator {0} which is already in state {1}.",
244            new Object[]{this.getId(), this.stateManager});
245        return;
246      }
247
248      LOG.log(Level.WARNING, "Failed evaluator: " + getId(), exception);
249
250      try {
251
252        final List<FailedContext> failedContextList = this.contextRepresenters.getFailedContextsForEvaluatorFailure();
253
254        final Optional<FailedTask> failedTaskOptional;
255        if (this.task.isPresent()) {
256          final String taskId = this.task.get().getId();
257          final Optional<ActiveContext> evaluatorContext = Optional.empty();
258          final Optional<byte[]> bytes = Optional.empty();
259          final Optional<Throwable> taskException = Optional.<Throwable>of(new Exception("Evaluator crash"));
260          final String message = "Evaluator crash";
261          final Optional<String> description = Optional.empty();
262          final FailedTask failedTask = new FailedTask(taskId, message, description, taskException, bytes, evaluatorContext);
263          failedTaskOptional = Optional.of(failedTask);
264        } else {
265          failedTaskOptional = Optional.empty();
266        }
267
268
269        this.messageDispatcher.onEvaluatorFailed(new FailedEvaluatorImpl(exception, failedContextList, failedTaskOptional, this.evaluatorId));
270
271      } catch (final Exception e) {
272        LOG.log(Level.SEVERE, "Exception while handling FailedEvaluator", e);
273      } finally {
274        this.stateManager.setFailed();
275        close();
276      }
277    }
278  }
279
280  public synchronized void onEvaluatorHeartbeatMessage(
281      final RemoteMessage<EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto> evaluatorHeartbeatProtoRemoteMessage) {
282
283    final EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto evaluatorHeartbeatProto =
284        evaluatorHeartbeatProtoRemoteMessage.getMessage();
285    LOG.log(Level.FINEST, "Evaluator heartbeat: {0}", evaluatorHeartbeatProto);
286
287    if (this.stateManager.isDoneOrFailedOrKilled()) {
288      LOG.log(Level.FINE, "Ignoring an heartbeat received for Evaluator {0} which is already in state {1}.",
289          new Object[]{this.getId(), this.stateManager});
290      return;
291    }
292
293    this.sanityChecker.check(evaluatorId, evaluatorHeartbeatProto.getTimestamp());
294    final String evaluatorRID = evaluatorHeartbeatProtoRemoteMessage.getIdentifier().toString();
295
296    // first message from a running evaluator trying to re-establish communications
297    if (evaluatorHeartbeatProto.getRecovery()) {
298      this.evaluatorControlHandler.setRemoteID(evaluatorRID);
299      this.stateManager.setRunning();
300
301      this.driverStatusManager.oneContainerRecovered();
302      final int numRecoveredContainers = this.driverStatusManager.getNumRecoveredContainers();
303
304      LOG.log(Level.FINE, "Received recovery heartbeat from evaluator {0}.", this.evaluatorId);
305      final int expectedEvaluatorsNumber = this.driverStatusManager.getNumPreviousContainers();
306
307      if (numRecoveredContainers > expectedEvaluatorsNumber) {
308        LOG.log(Level.SEVERE, "expecting only [{0}] recovered evaluators, but [{1}] evaluators have checked in.",
309            new Object[]{expectedEvaluatorsNumber, numRecoveredContainers});
310        throw new RuntimeException("More then expected number of evaluators are checking in during recovery.");
311      } else if (numRecoveredContainers == expectedEvaluatorsNumber) {
312        LOG.log(Level.INFO, "All [{0}] expected evaluators have checked in. Recovery completed.", expectedEvaluatorsNumber);
313        this.driverStatusManager.setRestartCompleted();
314        this.messageDispatcher.OnDriverRestartCompleted(new DriverRestartCompleted(System.currentTimeMillis()));
315      } else {
316        LOG.log(Level.INFO, "expecting [{0}] recovered evaluators, [{1}] evaluators have checked in.",
317            new Object[]{expectedEvaluatorsNumber, numRecoveredContainers});
318      }
319    }
320
321    // If this is the first message from this Evaluator, register it.
322    if (this.stateManager.isSubmitted()) {
323      this.evaluatorControlHandler.setRemoteID(evaluatorRID);
324      this.stateManager.setRunning();
325      LOG.log(Level.FINEST, "Evaluator {0} is running", this.evaluatorId);
326    }
327
328    // Process the Evaluator status message
329    if (evaluatorHeartbeatProto.hasEvaluatorStatus()) {
330      this.onEvaluatorStatusMessage(evaluatorHeartbeatProto.getEvaluatorStatus());
331    }
332
333    // Process the Context status message(s)
334    final boolean informClientOfNewContexts = !evaluatorHeartbeatProto.hasTaskStatus();
335    this.contextRepresenters.onContextStatusMessages(evaluatorHeartbeatProto.getContextStatusList(),
336        informClientOfNewContexts);
337
338    // Process the Task status message
339    if (evaluatorHeartbeatProto.hasTaskStatus()) {
340      this.onTaskStatusMessage(evaluatorHeartbeatProto.getTaskStatus());
341    }
342    LOG.log(Level.FINE, "DONE with evaluator heartbeat from Evaluator {0}", this.getId());
343  }
344
345  /**
346   * Process a evaluator status message.
347   *
348   * @param message
349   */
350  private synchronized void onEvaluatorStatusMessage(final ReefServiceProtos.EvaluatorStatusProto message) {
351
352    switch (message.getState()) {
353      case DONE:
354        this.onEvaluatorDone(message);
355        break;
356      case FAILED:
357        this.onEvaluatorFailed(message);
358        break;
359      case INIT:
360      case KILLED:
361      case RUNNING:
362      case SUSPEND:
363        break;
364    }
365  }
366
367  /**
368   * Process an evaluator message that indicates that the evaluator shut down cleanly.
369   *
370   * @param message
371   */
372  private synchronized void onEvaluatorDone(final ReefServiceProtos.EvaluatorStatusProto message) {
373    assert (message.getState() == ReefServiceProtos.State.DONE);
374    LOG.log(Level.FINEST, "Evaluator {0} done.", getId());
375    this.stateManager.setDone();
376    this.messageDispatcher.onEvaluatorCompleted(new CompletedEvaluatorImpl(this.evaluatorId));
377    close();
378  }
379
380  /**
381   * Process an evaluator message that indicates a crash.
382   *
383   * @param evaluatorStatusProto
384   */
385  private synchronized void onEvaluatorFailed(final ReefServiceProtos.EvaluatorStatusProto evaluatorStatusProto) {
386    assert (evaluatorStatusProto.getState() == ReefServiceProtos.State.FAILED);
387    final EvaluatorException evaluatorException;
388    if (evaluatorStatusProto.hasError()) {
389      final Optional<Throwable> exception = this.exceptionCodec.fromBytes(evaluatorStatusProto.getError().toByteArray());
390      if (exception.isPresent()) {
391        evaluatorException = new EvaluatorException(getId(), exception.get());
392      } else {
393        evaluatorException = new EvaluatorException(getId(), new Exception("Exception sent, but can't be deserialized"));
394      }
395    } else {
396      evaluatorException = new EvaluatorException(getId(), new Exception("No exception sent"));
397    }
398    onEvaluatorException(evaluatorException);
399  }
400
401  public void onResourceLaunch(final DriverRuntimeProtocol.ResourceLaunchProto resourceLaunchProto) {
402    synchronized (this.evaluatorDescriptor) {
403      if (this.stateManager.isAllocated()) {
404        this.stateManager.setSubmitted();
405        this.resourceLaunchHandler.onNext(resourceLaunchProto);
406      } else {
407        throw new RuntimeException("Evaluator manager expected " + EvaluatorState.ALLOCATED +
408            " state but instead is in state " + this.stateManager);
409      }
410    }
411  }
412
413  /**
414   * Packages the ContextControlProto in an EvaluatorControlProto and forward it to the EvaluatorRuntime
415   *
416   * @param contextControlProto message contains context control info.
417   */
418  public void sendContextControlMessage(final EvaluatorRuntimeProtocol.ContextControlProto contextControlProto) {
419    synchronized (this.evaluatorDescriptor) {
420      LOG.log(Level.FINEST, "Context control message to {0}", this.evaluatorId);
421      this.contextControlHandler.send(contextControlProto);
422    }
423  }
424
425  /**
426   * Forward the EvaluatorControlProto to the EvaluatorRuntime
427   *
428   * @param evaluatorControlProto message contains evaluator control information.
429   */
430  void sendEvaluatorControlMessage(final EvaluatorRuntimeProtocol.EvaluatorControlProto evaluatorControlProto) {
431    synchronized (this.evaluatorDescriptor) {
432      this.evaluatorControlHandler.send(evaluatorControlProto);
433    }
434  }
435
436  /**
437   * Handle task status messages.
438   *
439   * @param taskStatusProto message contains the current task status.
440   */
441  private void onTaskStatusMessage(final ReefServiceProtos.TaskStatusProto taskStatusProto) {
442
443    if (!(this.task.isPresent() && this.task.get().getId().equals(taskStatusProto.getTaskId()))) {
444      if (taskStatusProto.getState() == ReefServiceProtos.State.INIT ||
445          taskStatusProto.getState() == ReefServiceProtos.State.FAILED ||
446          taskStatusProto.getRecovery() // for task from recovered evaluators
447          ) {
448
449        // FAILED is a legal first state of a Task as it could have failed during construction.
450        this.task = Optional.of(
451            new TaskRepresenter(taskStatusProto.getTaskId(),
452                this.contextRepresenters.getContext(taskStatusProto.getContextId()),
453                this.messageDispatcher,
454                this,
455                this.exceptionCodec));
456      } else {
457        throw new RuntimeException("Received an message of state " + taskStatusProto.getState() +
458            ", not INIT or FAILED for Task " + taskStatusProto.getTaskId() + " which we haven't heard from before.");
459      }
460    }
461    this.task.get().onTaskStatusMessage(taskStatusProto);
462
463    if (this.task.get().isNotRunning()) {
464      LOG.log(Level.FINEST, "Task no longer running. De-registering it.");
465      this.task = Optional.empty();
466    }
467  }
468
469  /**
470   * Resource status information from the (actual) resource manager.
471   */
472  public void onResourceStatusMessage(final DriverRuntimeProtocol.ResourceStatusProto resourceStatusProto) {
473    synchronized (this.evaluatorDescriptor) {
474      LOG.log(Level.FINEST, "Resource manager state update: {0}", resourceStatusProto.getState());
475      if (this.stateManager.isDoneOrFailedOrKilled()) {
476        LOG.log(Level.FINE, "Ignoring resource status update for Evaluator {0} which is already in state {1}.",
477            new Object[]{this.getId(), this.stateManager});
478      } else if (isDoneOrFailedOrKilled(resourceStatusProto) && this.stateManager.isAllocatedOrSubmittedOrRunning()) {
479        // something is wrong. The resource manager reports that the Evaluator is done or failed, but the Driver assumes
480        // it to be alive.
481        final StringBuilder messageBuilder = new StringBuilder("Evaluator [")
482            .append(this.evaluatorId)
483            .append("] is assumed to be in state [")
484            .append(this.stateManager.toString())
485            .append("]. But the resource manager reports it to be in state [")
486            .append(resourceStatusProto.getState())
487            .append("].");
488
489        if (this.stateManager.isSubmitted()) {
490          messageBuilder
491              .append(" This most likely means that the Evaluator suffered a failure before establishing a communications link to the driver.");
492        } else if (this.stateManager.isAllocated()) {
493          messageBuilder.append(" This most likely means that the Evaluator suffered a failure before being used.");
494        } else if (this.stateManager.isRunning()) {
495          messageBuilder.append(" This means that the Evaluator failed but wasn't able to send an error message back to the driver.");
496        }
497        if (this.task.isPresent()) {
498          messageBuilder.append(" Task [")
499              .append(this.task.get().getId())
500              .append("] was running when the Evaluator crashed.");
501        }
502        this.isResourceReleased = true;
503
504        if (resourceStatusProto.getState() == ReefServiceProtos.State.KILLED) {
505          this.onEvaluatorException(new EvaluatorKilledByResourceManagerException(this.evaluatorId, messageBuilder.toString()));
506        } else {
507          this.onEvaluatorException(new EvaluatorException(this.evaluatorId, messageBuilder.toString()));
508        }
509      }
510    }
511  }
512
513  @Override
514  public String toString() {
515    return "EvaluatorManager:"
516        + " id=" + this.evaluatorId
517        + " state=" + this.stateManager
518        + " task=" + this.task;
519  }
520
521  // Dynamic Parameters
522  @NamedParameter(doc = "The Evaluator Identifier.")
523  public final static class EvaluatorIdentifier implements Name<String> {
524  }
525
526  @NamedParameter(doc = "The Evaluator Host.")
527  public final static class EvaluatorDescriptorName implements Name<EvaluatorDescriptorImpl> {
528  }
529}