This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.common.evaluator.task;
020
021import com.google.protobuf.ByteString;
022import org.apache.reef.driver.task.TaskConfigurationOptions;
023import org.apache.reef.evaluator.context.parameters.ContextIdentifier;
024import org.apache.reef.proto.ReefServiceProtos;
025import org.apache.reef.runtime.common.evaluator.HeartBeatManager;
026import org.apache.reef.runtime.common.utils.ExceptionCodec;
027import org.apache.reef.tang.annotations.Parameter;
028import org.apache.reef.task.TaskMessage;
029import org.apache.reef.task.TaskMessageSource;
030import org.apache.reef.util.Optional;
031
032import javax.inject.Inject;
033import java.util.ArrayList;
034import java.util.Collection;
035import java.util.List;
036import java.util.Set;
037import java.util.logging.Level;
038import java.util.logging.Logger;
039
040/**
041 * Represents the various states a Task could be in.
042 */
043public final class TaskStatus {
044  private static final Logger LOG = Logger.getLogger(TaskStatus.class.getName());
045
046  private final String taskId;
047  private final String contextId;
048  private final HeartBeatManager heartBeatManager;
049  private final Set<TaskMessageSource> evaluatorMessageSources;
050  private final ExceptionCodec exceptionCodec;
051  private Optional<Throwable> lastException = Optional.empty();
052  private Optional<byte[]> result = Optional.empty();
053  private State state = State.PRE_INIT;
054
055
056  @Inject
057  TaskStatus(final @Parameter(TaskConfigurationOptions.Identifier.class) String taskId,
058             final @Parameter(ContextIdentifier.class) String contextId,
059             final @Parameter(TaskConfigurationOptions.TaskMessageSources.class) Set<TaskMessageSource> evaluatorMessageSources,
060             final HeartBeatManager heartBeatManager,
061             final ExceptionCodec exceptionCodec) {
062    this.taskId = taskId;
063    this.contextId = contextId;
064    this.heartBeatManager = heartBeatManager;
065    this.evaluatorMessageSources = evaluatorMessageSources;
066    this.exceptionCodec = exceptionCodec;
067  }
068
069  /**
070   * @param from
071   * @param to
072   * @return true, if the state transition from state 'from' to state 'to' is legal.
073   */
074  private static boolean isLegal(final State from, final State to) {
075    if (from == null) {
076      return to == State.INIT;
077    }
078    switch (from) {
079      case PRE_INIT:
080        switch (to) {
081          case INIT:
082            return true;
083          default:
084            return false;
085        }
086      case INIT:
087        switch (to) {
088          case RUNNING:
089          case FAILED:
090          case KILLED:
091          case DONE:
092            return true;
093          default:
094            return false;
095        }
096      case RUNNING:
097        switch (to) {
098          case CLOSE_REQUESTED:
099          case SUSPEND_REQUESTED:
100          case FAILED:
101          case KILLED:
102          case DONE:
103            return true;
104          default:
105            return false;
106        }
107      case CLOSE_REQUESTED:
108        switch (to) {
109          case FAILED:
110          case KILLED:
111          case DONE:
112            return true;
113          default:
114            return false;
115        }
116      case SUSPEND_REQUESTED:
117        switch (to) {
118          case FAILED:
119          case KILLED:
120          case SUSPENDED:
121            return true;
122          default:
123            return false;
124        }
125
126      case FAILED:
127      case DONE:
128      case KILLED:
129        return false;
130      default:
131        return false;
132    }
133  }
134
135  public final String getTaskId() {
136    return this.taskId;
137  }
138
139  ReefServiceProtos.TaskStatusProto toProto() {
140    this.check();
141    final ReefServiceProtos.TaskStatusProto.Builder result = ReefServiceProtos.TaskStatusProto.newBuilder()
142        .setContextId(this.contextId)
143        .setTaskId(this.taskId)
144        .setState(this.getProtoState());
145
146    if (this.result.isPresent()) {
147      result.setResult(ByteString.copyFrom(this.result.get()));
148    } else if (this.lastException.isPresent()) {
149      final byte[] error = this.exceptionCodec.toBytes(this.lastException.get());
150      result.setResult(ByteString.copyFrom(error));
151    } else if (this.state == State.RUNNING) {
152      for (final TaskMessage taskMessage : this.getMessages()) {
153        result.addTaskMessage(ReefServiceProtos.TaskStatusProto.TaskMessageProto.newBuilder()
154            .setSourceId(taskMessage.getMessageSourceID())
155            .setMessage(ByteString.copyFrom(taskMessage.get()))
156            .build());
157      }
158    }
159
160    return result.build();
161  }
162
163  private void check() {
164    if (this.result.isPresent() && this.lastException.isPresent()) {
165      throw new RuntimeException("Found both an exception and a result. This is unsupported.");
166    }
167  }
168
169  private ReefServiceProtos.State getProtoState() {
170    switch (this.state) {
171      case INIT:
172        return ReefServiceProtos.State.INIT;
173      case CLOSE_REQUESTED:
174      case SUSPEND_REQUESTED:
175      case RUNNING:
176        return ReefServiceProtos.State.RUNNING;
177      case DONE:
178        return ReefServiceProtos.State.DONE;
179      case SUSPENDED:
180        return ReefServiceProtos.State.SUSPEND;
181      case FAILED:
182        return ReefServiceProtos.State.FAILED;
183      case KILLED:
184        return ReefServiceProtos.State.KILLED;
185    }
186    throw new RuntimeException("Unknown state: " + this.state);
187  }
188
189  void setException(final Throwable throwable) {
190    synchronized (this.heartBeatManager) {
191      this.lastException = Optional.of(throwable);
192      this.state = State.FAILED;
193      this.check();
194      this.heartbeat();
195    }
196  }
197
198  void setResult(final byte[] result) {
199    synchronized (this.heartBeatManager) {
200      this.result = Optional.ofNullable(result);
201      if (this.state == State.RUNNING) {
202        this.setState(State.DONE);
203      } else if (this.state == State.SUSPEND_REQUESTED) {
204        this.setState(State.SUSPENDED);
205      } else if (this.state == State.CLOSE_REQUESTED) {
206        this.setState(State.DONE);
207      }
208      this.check();
209      this.heartbeat();
210    }
211  }
212
213  private void heartbeat() {
214    this.heartBeatManager.sendTaskStatus(this.toProto());
215  }
216
217  /**
218   * Sets the state to INIT and informs the driver about it.
219   */
220  void setInit() {
221    LOG.log(Level.FINEST, "Sending Task INIT heartbeat to the Driver.");
222    this.setState(State.INIT);
223    this.heartbeat();
224  }
225
226  /**
227   * Sets the state to RUNNING after the handlers for TaskStart have been called.
228   */
229  void setRunning() {
230    this.setState(State.RUNNING);
231  }
232
233  void setCloseRequested() {
234    this.setState(State.CLOSE_REQUESTED);
235  }
236
237  void setSuspendRequested() {
238    this.setState(State.SUSPEND_REQUESTED);
239  }
240
241  void setKilled() {
242    this.setState(State.KILLED);
243    this.heartbeat();
244  }
245
246  boolean isRunning() {
247    return this.state == State.RUNNING;
248  }
249
250  boolean isNotRunning() {
251    return this.state != State.RUNNING;
252  }
253
254  boolean hasEnded() {
255    switch (this.state) {
256      case DONE:
257      case SUSPENDED:
258      case FAILED:
259      case KILLED:
260        return true;
261      default:
262        return false;
263    }
264  }
265
266  State getState() {
267    return this.state;
268  }
269
270  private void setState(final State state) {
271    if (isLegal(this.state, state)) {
272      this.state = state;
273    } else {
274      final String msg = "Illegal state transition from [" + this.state + "] to [" + state + "]";
275      LOG.log(Level.SEVERE, msg);
276      throw new RuntimeException(msg);
277    }
278  }
279
280  String getContextId() {
281    return this.contextId;
282  }
283
284  /**
285   * @return the messages to be sent on the Task's behalf in the next heartbeat.
286   */
287  private final Collection<TaskMessage> getMessages() {
288    final List<TaskMessage> result = new ArrayList<>(this.evaluatorMessageSources.size());
289    for (final TaskMessageSource messageSource : this.evaluatorMessageSources) {
290      final Optional<TaskMessage> taskMessageOptional = messageSource.getMessage();
291      if (taskMessageOptional.isPresent()) {
292        result.add(taskMessageOptional.get());
293      }
294    }
295    return result;
296  }
297
298
299  enum State {
300    PRE_INIT,
301    INIT,
302    RUNNING,
303    CLOSE_REQUESTED,
304    SUSPEND_REQUESTED,
305    SUSPENDED,
306    FAILED,
307    DONE,
308    KILLED
309  }
310}