This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.common.driver.task;
020
021import org.apache.reef.annotations.audience.DriverSide;
022import org.apache.reef.annotations.audience.Private;
023import org.apache.reef.driver.context.ActiveContext;
024import org.apache.reef.driver.restart.DriverRestartManager;
025import org.apache.reef.driver.restart.EvaluatorRestartState;
026import org.apache.reef.driver.task.FailedTask;
027import org.apache.reef.driver.task.RunningTask;
028import org.apache.reef.runtime.common.driver.context.EvaluatorContext;
029import org.apache.reef.runtime.common.driver.evaluator.EvaluatorManager;
030import org.apache.reef.runtime.common.driver.evaluator.EvaluatorMessageDispatcher;
031import org.apache.reef.runtime.common.driver.evaluator.pojos.State;
032import org.apache.reef.runtime.common.driver.evaluator.pojos.TaskMessagePOJO;
033import org.apache.reef.runtime.common.driver.evaluator.pojos.TaskStatusPOJO;
034import org.apache.reef.runtime.common.utils.ExceptionCodec;
035import org.apache.reef.util.Optional;
036
037import java.util.logging.Level;
038import java.util.logging.Logger;
039
040/**
041 * Represents a Task on the Driver.
042 */
043@DriverSide
044@Private
045public final class TaskRepresenter {
046
047  private static final Logger LOG = Logger.getLogger(TaskRepresenter.class.getName());
048
049  private final EvaluatorContext context;
050  private final EvaluatorMessageDispatcher messageDispatcher;
051  private final EvaluatorManager evaluatorManager;
052  private final ExceptionCodec exceptionCodec;
053  private final String taskId;
054  private final DriverRestartManager driverRestartManager;
055
056  // Mutable state
057  private State state = State.INIT;
058  private boolean isFirstRunningMessage = true;
059
060  public TaskRepresenter(final String taskId,
061                         final EvaluatorContext context,
062                         final EvaluatorMessageDispatcher messageDispatcher,
063                         final EvaluatorManager evaluatorManager,
064                         final ExceptionCodec exceptionCodec,
065                         final DriverRestartManager driverRestartManager) {
066    this.taskId = taskId;
067    this.context = context;
068    this.messageDispatcher = messageDispatcher;
069    this.evaluatorManager = evaluatorManager;
070    this.exceptionCodec = exceptionCodec;
071    this.driverRestartManager = driverRestartManager;
072  }
073
074  private static byte[] getResult(final TaskStatusPOJO taskStatus) {
075    return taskStatus.hasResult() ? taskStatus.getResult() : null;
076  }
077
078  public void onTaskStatusMessage(final TaskStatusPOJO taskStatus) {
079
080    LOG.log(Level.FINE, "Received task {0} status {1}",
081            new Object[]{taskStatus.getTaskId(), taskStatus.getState()});
082
083    // Make sure that the message is indeed for us.
084    if (!taskStatus.getContextId().equals(this.context.getId())) {
085      throw new RuntimeException(
086          "Received a message for a task running on Context " + taskStatus.getContextId() +
087              " while the Driver believes this Task to be run on Context " + this.context.getId());
088    }
089
090    if (!taskStatus.getTaskId().equals(this.taskId)) {
091      throw new RuntimeException("Received a message for task " + taskStatus.getTaskId() +
092          " in the TaskRepresenter for Task " + this.taskId);
093    }
094
095    if (driverRestartManager.getEvaluatorRestartState(evaluatorManager.getId()) == EvaluatorRestartState.REREGISTERED) {
096      // when a recovered heartbeat is received, we will take its word for it
097      LOG.log(Level.INFO, "Received task status {0} for RECOVERED task {1}.",
098          new Object[]{taskStatus.getState(), this.taskId});
099      this.setState(taskStatus.getState());
100    }
101    // Dispatch the message to the right method.
102    switch (taskStatus.getState()) {
103    case INIT:
104      this.onTaskInit(taskStatus);
105      break;
106    case RUNNING:
107      this.onTaskRunning(taskStatus);
108      break;
109    case SUSPEND:
110      this.onTaskSuspend(taskStatus);
111      break;
112    case DONE:
113      this.onTaskDone(taskStatus);
114      break;
115    case FAILED:
116      this.onTaskFailed(taskStatus);
117      break;
118    default:
119      throw new IllegalStateException("Unknown task state: " + taskStatus.getState());
120    }
121  }
122
123  private void onTaskInit(final TaskStatusPOJO taskStatusPOJO) {
124    assert State.INIT == taskStatusPOJO.getState();
125    if (this.isKnown()) {
126      LOG.log(Level.WARNING, "Received a INIT message for task with id {0}" +
127          " which we have seen before. Ignoring the second message", this.taskId);
128    } else {
129      this.setState(State.RUNNING);
130    }
131  }
132
133  private void onTaskRunning(final TaskStatusPOJO taskStatus) {
134    assert taskStatus.getState() == State.RUNNING;
135
136    if (this.isNotRunning()) {
137      throw new IllegalStateException("Received a task status message from task " + this.taskId +
138          " that is believed to be RUNNING on the Evaluator, but the Driver thinks it is in state " + this.state);
139    }
140
141    if (isFirstRunningMessage) {
142      isFirstRunningMessage = false;
143      final RunningTask runningTask = new RunningTaskImpl(
144          this.evaluatorManager, this.taskId, this.context, this);
145      this.messageDispatcher.onTaskRunning(runningTask);
146    }
147
148    // fire driver restart task running handler if this is a recovery heartbeat
149    if (driverRestartManager.getEvaluatorRestartState(evaluatorManager.getId()) == EvaluatorRestartState.REREGISTERED) {
150      final RunningTask runningTask = new RunningTaskImpl(
151          this.evaluatorManager, this.taskId, this.context, this);
152      this.driverRestartManager.setEvaluatorProcessed(evaluatorManager.getId());
153      this.messageDispatcher.onDriverRestartTaskRunning(runningTask);
154    }
155
156    for (final TaskMessagePOJO
157             taskMessagePOJO : taskStatus.getTaskMessageList()) {
158      this.messageDispatcher.onTaskMessage(
159          new TaskMessageImpl(taskMessagePOJO.getMessage(),
160              this.taskId, this.context.getId(), taskMessagePOJO.getSourceId(), taskMessagePOJO.getSequenceNumber()));
161    }
162  }
163
164  private void onTaskSuspend(final TaskStatusPOJO taskStatus) {
165    assert State.SUSPEND == taskStatus.getState();
166    assert this.isKnown();
167    this.messageDispatcher.onTaskSuspended(
168        new SuspendedTaskImpl(this.context, getResult(taskStatus), this.taskId));
169    this.setState(State.SUSPEND);
170  }
171
172  private void onTaskDone(final TaskStatusPOJO taskStatus) {
173    assert State.DONE == taskStatus.getState();
174    assert this.isKnown();
175    this.messageDispatcher.onTaskCompleted(
176        new CompletedTaskImpl(this.context, getResult(taskStatus), this.taskId));
177    this.setState(State.DONE);
178  }
179
180  private void onTaskFailed(final TaskStatusPOJO taskStatus) {
181    assert State.FAILED == taskStatus.getState();
182    final Optional<ActiveContext> evaluatorContext = Optional.<ActiveContext>of(this.context);
183    final Optional<byte[]> bytes = Optional.ofNullable(getResult(taskStatus));
184    final Optional<Throwable> exception = this.exceptionCodec.fromBytes(bytes);
185    final String message = exception.isPresent() ? exception.get().getMessage() : "No message given";
186    final Optional<String> description = Optional.empty();
187    final FailedTask failedTask = new FailedTask(
188        this.taskId, message, description, exception, bytes, evaluatorContext);
189    this.messageDispatcher.onTaskFailed(failedTask);
190    this.setState(State.FAILED);
191  }
192
193  public String getId() {
194    return this.taskId;
195  }
196
197  /**
198   * @return true, if we had at least one message from the task.
199   */
200  private boolean isKnown() {
201    return this.state != State.INIT;
202  }
203
204  /**
205   * @return true, if this task is in any other state but RUNNING.
206   */
207  public boolean isNotRunning() {
208    return this.state != State.RUNNING;
209  }
210
211  /**
212   * @return true, if this task is in INIT or RUNNING status.
213   */
214  public boolean isClosable() {
215    return this.state == State.INIT || this.state == State.RUNNING;
216  }
217
218  private void setState(final State newState) {
219    LOG.log(Level.FINE, "Task [{0}] state transition from [{1}] to [{2}]",
220        new Object[]{this.taskId, this.state, newState});
221    this.state = newState;
222  }
223
224  /**
225   * Check whether this evaluator is in closing state.
226   * @return whether this evaluator is in closing state.
227   */
228  public boolean evaluatorIsClosing() {
229    return evaluatorManager.isClosing();
230  }
231
232  /**
233   * Check whether this evaluator is in closed state.
234   * @return whether this evaluator is in closed state.
235   */
236  public boolean evaluatorIsClosed() {
237    return evaluatorManager.isClosed();
238  }
239}