This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.common.driver.task;
020
021import org.apache.reef.annotations.audience.DriverSide;
022import org.apache.reef.annotations.audience.Private;
023import org.apache.reef.driver.context.ActiveContext;
024import org.apache.reef.driver.task.FailedTask;
025import org.apache.reef.driver.task.RunningTask;
026import org.apache.reef.proto.ReefServiceProtos;
027import org.apache.reef.runtime.common.driver.context.EvaluatorContext;
028import org.apache.reef.runtime.common.driver.evaluator.EvaluatorManager;
029import org.apache.reef.runtime.common.driver.evaluator.EvaluatorMessageDispatcher;
030import org.apache.reef.runtime.common.utils.ExceptionCodec;
031import org.apache.reef.util.Optional;
032
033import java.util.logging.Level;
034import java.util.logging.Logger;
035
036/**
037 * Represents a Task on the Driver.
038 */
039@DriverSide
040@Private
041public final class TaskRepresenter {
042
043  private static final Logger LOG = Logger.getLogger(TaskRepresenter.class.getName());
044
045  private final EvaluatorContext context;
046  private final EvaluatorMessageDispatcher messageDispatcher;
047  private final EvaluatorManager evaluatorManager;
048  private final ExceptionCodec exceptionCodec;
049  private final String taskId;
050
051  // Mutable state
052  private ReefServiceProtos.State state = ReefServiceProtos.State.INIT;
053
054  public TaskRepresenter(final String taskId,
055                         final EvaluatorContext context,
056                         final EvaluatorMessageDispatcher messageDispatcher,
057                         final EvaluatorManager evaluatorManager,
058                         final ExceptionCodec exceptionCodec) {
059    this.taskId = taskId;
060    this.context = context;
061    this.messageDispatcher = messageDispatcher;
062    this.evaluatorManager = evaluatorManager;
063    this.exceptionCodec = exceptionCodec;
064  }
065
066  private static byte[] getResult(final ReefServiceProtos.TaskStatusProto taskStatusProto) {
067    return taskStatusProto.hasResult() ? taskStatusProto.getResult().toByteArray() : null;
068  }
069
070  public void onTaskStatusMessage(final ReefServiceProtos.TaskStatusProto taskStatusProto) {
071
072    LOG.log(Level.FINE, "Received task {0} status {1}",
073        new Object[]{taskStatusProto.getTaskId(), taskStatusProto.getState()});
074
075    // Make sure that the message is indeed for us.
076    if (!taskStatusProto.getContextId().equals(this.context.getId())) {
077      throw new RuntimeException(
078          "Received a message for a task running on Context " + taskStatusProto.getContextId() +
079              " while the Driver believes this Task to be run on Context " + this.context.getId());
080    }
081
082    if (!taskStatusProto.getTaskId().equals(this.taskId)) {
083      throw new RuntimeException("Received a message for task " + taskStatusProto.getTaskId() +
084          " in the TaskRepresenter for Task " + this.taskId);
085    }
086    if (taskStatusProto.getRecovery()) {
087      // when a recovered heartbeat is received, we will take its word for it
088      LOG.log(Level.INFO, "Received task status {0} for RECOVERED task {1}.",
089          new Object[]{taskStatusProto.getState(), this.taskId});
090      this.setState(taskStatusProto.getState());
091    }
092    // Dispatch the message to the right method.
093    switch (taskStatusProto.getState()) {
094      case INIT:
095        this.onTaskInit(taskStatusProto);
096        break;
097      case RUNNING:
098        this.onTaskRunning(taskStatusProto);
099        break;
100      case SUSPEND:
101        this.onTaskSuspend(taskStatusProto);
102        break;
103      case DONE:
104        this.onTaskDone(taskStatusProto);
105        break;
106      case FAILED:
107        this.onTaskFailed(taskStatusProto);
108        break;
109      default:
110        throw new IllegalStateException("Unknown task state: " + taskStatusProto.getState());
111    }
112  }
113
114  private void onTaskInit(final ReefServiceProtos.TaskStatusProto taskStatusProto) {
115    assert ((ReefServiceProtos.State.INIT == taskStatusProto.getState()));
116    if (this.isKnown()) {
117      LOG.log(Level.WARNING, "Received a INIT message for task with id {0}" +
118          " which we have seen before. Ignoring the second message", this.taskId);
119    } else {
120      final RunningTask runningTask = new RunningTaskImpl(
121          this.evaluatorManager, this.taskId, this.context, this);
122      this.messageDispatcher.onTaskRunning(runningTask);
123      this.setState(ReefServiceProtos.State.RUNNING);
124    }
125  }
126
127  private void onTaskRunning(final ReefServiceProtos.TaskStatusProto taskStatusProto) {
128
129    assert (taskStatusProto.getState() == ReefServiceProtos.State.RUNNING);
130
131    if (this.isNotRunning()) {
132      throw new IllegalStateException("Received a task status message from task " + this.taskId +
133          " that is believed to be RUNNING on the Evaluator, but the Driver thinks it is in state " + this.state);
134    }
135
136    // fire driver restart task running handler if this is a recovery heartbeat
137    if (taskStatusProto.getRecovery()) {
138      final RunningTask runningTask = new RunningTaskImpl(
139          this.evaluatorManager, this.taskId, this.context, this);
140      this.messageDispatcher.onDriverRestartTaskRunning(runningTask);
141    }
142
143    for (final ReefServiceProtos.TaskStatusProto.TaskMessageProto taskMessageProto : taskStatusProto.getTaskMessageList()) {
144      this.messageDispatcher.onTaskMessage(
145          new TaskMessageImpl(taskMessageProto.getMessage().toByteArray(),
146              this.taskId, this.context.getId(), taskMessageProto.getSourceId()));
147    }
148  }
149
150  private void onTaskSuspend(final ReefServiceProtos.TaskStatusProto taskStatusProto) {
151    assert (ReefServiceProtos.State.SUSPEND == taskStatusProto.getState());
152    assert (this.isKnown());
153    this.messageDispatcher.onTaskSuspended(
154        new SuspendedTaskImpl(this.context, getResult(taskStatusProto), this.taskId));
155    this.setState(ReefServiceProtos.State.SUSPEND);
156  }
157
158  private void onTaskDone(final ReefServiceProtos.TaskStatusProto taskStatusProto) {
159    assert (ReefServiceProtos.State.DONE == taskStatusProto.getState());
160    assert (this.isKnown());
161    this.messageDispatcher.onTaskCompleted(
162        new CompletedTaskImpl(this.context, getResult(taskStatusProto), this.taskId));
163    this.setState(ReefServiceProtos.State.DONE);
164  }
165
166  private void onTaskFailed(final ReefServiceProtos.TaskStatusProto taskStatusProto) {
167    assert (ReefServiceProtos.State.FAILED == taskStatusProto.getState());
168    final Optional<ActiveContext> evaluatorContext = Optional.<ActiveContext>of(this.context);
169    final Optional<byte[]> bytes = Optional.ofNullable(getResult(taskStatusProto));
170    final Optional<Throwable> exception = this.exceptionCodec.fromBytes(bytes);
171    final String message = exception.isPresent() ? exception.get().getMessage() : "No message given";
172    final Optional<String> description = Optional.empty();
173    final FailedTask failedTask = new FailedTask(
174        this.taskId, message, description, exception, bytes, evaluatorContext);
175    this.messageDispatcher.onTaskFailed(failedTask);
176    this.setState(ReefServiceProtos.State.FAILED);
177  }
178
179  public String getId() {
180    return this.taskId;
181  }
182
183  /**
184   * @return true, if we had at least one message from the task.
185   */
186  private boolean isKnown() {
187    return this.state != ReefServiceProtos.State.INIT;
188  }
189
190  /**
191   * @return true, if this task is in any other state but RUNNING.
192   */
193  public boolean isNotRunning() {
194    return this.state != ReefServiceProtos.State.RUNNING;
195  }
196
197  private void setState(final ReefServiceProtos.State newState) {
198    LOG.log(Level.FINE, "Task [{0}] state transition from [{1}] to [{2}]",
199        new Object[]{this.taskId, this.state, newState});
200    this.state = newState;
201  }
202}