This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.common.evaluator.task;
020
021import com.google.protobuf.ByteString;
022import org.apache.reef.driver.task.TaskConfigurationOptions;
023import org.apache.reef.evaluator.context.parameters.ContextIdentifier;
024import org.apache.reef.proto.ReefServiceProtos;
025import org.apache.reef.runtime.common.evaluator.HeartBeatManager;
026import org.apache.reef.runtime.common.utils.ExceptionCodec;
027import org.apache.reef.tang.annotations.Parameter;
028import org.apache.reef.task.TaskMessage;
029import org.apache.reef.task.TaskMessageSource;
030import org.apache.reef.util.Optional;
031
032import javax.inject.Inject;
033import java.util.ArrayList;
034import java.util.Collection;
035import java.util.List;
036import java.util.Set;
037import java.util.logging.Level;
038import java.util.logging.Logger;
039
040/**
041 * Represents the various states a Task could be in.
042 */
043public final class TaskStatus {
044  private static final Logger LOG = Logger.getLogger(TaskStatus.class.getName());
045
046  private final String taskId;
047  private final String contextId;
048  private final HeartBeatManager heartBeatManager;
049  private final Set<TaskMessageSource> evaluatorMessageSources;
050  private final ExceptionCodec exceptionCodec;
051  private Optional<Throwable> lastException = Optional.empty();
052  private Optional<byte[]> result = Optional.empty();
053  private State state = State.PRE_INIT;
054
055
056  @Inject
057  TaskStatus(@Parameter(TaskConfigurationOptions.Identifier.class) final String taskId,
058             @Parameter(ContextIdentifier.class) final String contextId,
059             @Parameter(TaskConfigurationOptions.TaskMessageSources.class)
060             final Set<TaskMessageSource> evaluatorMessageSources,
061             final HeartBeatManager heartBeatManager,
062             final ExceptionCodec exceptionCodec) {
063    this.taskId = taskId;
064    this.contextId = contextId;
065    this.heartBeatManager = heartBeatManager;
066    this.evaluatorMessageSources = evaluatorMessageSources;
067    this.exceptionCodec = exceptionCodec;
068  }
069
070  /**
071   * @param from
072   * @param to
073   * @return true, if the state transition from state 'from' to state 'to' is legal.
074   */
075  private static boolean isLegal(final State from, final State to) {
076    if (from == null) {
077      return to == State.INIT;
078    }
079    switch (from) {
080    case PRE_INIT:
081      switch (to) {
082      case INIT:
083        return true;
084      default:
085        return false;
086      }
087    case INIT:
088      switch (to) {
089      case RUNNING:
090      case FAILED:
091      case KILLED:
092      case DONE:
093        return true;
094      default:
095        return false;
096      }
097    case RUNNING:
098      switch (to) {
099      case CLOSE_REQUESTED:
100      case SUSPEND_REQUESTED:
101      case FAILED:
102      case KILLED:
103      case DONE:
104        return true;
105      default:
106        return false;
107      }
108    case CLOSE_REQUESTED:
109      switch (to) {
110      case FAILED:
111      case KILLED:
112      case DONE:
113        return true;
114      default:
115        return false;
116      }
117    case SUSPEND_REQUESTED:
118      switch (to) {
119      case FAILED:
120      case KILLED:
121      case SUSPENDED:
122        return true;
123      default:
124        return false;
125      }
126
127    case FAILED:
128    case DONE:
129    case KILLED:
130      return false;
131    default:
132      return false;
133    }
134  }
135
136  public String getTaskId() {
137    return this.taskId;
138  }
139
140  ReefServiceProtos.TaskStatusProto toProto() {
141    this.check();
142    final ReefServiceProtos.TaskStatusProto.Builder resultBuilder = ReefServiceProtos.TaskStatusProto.newBuilder()
143        .setContextId(this.contextId)
144        .setTaskId(this.taskId)
145        .setState(this.getProtoState());
146
147    if (this.result.isPresent()) {
148      resultBuilder.setResult(ByteString.copyFrom(this.result.get()));
149    } else if (this.lastException.isPresent()) {
150      final byte[] error = this.exceptionCodec.toBytes(this.lastException.get());
151      resultBuilder.setResult(ByteString.copyFrom(error));
152    } else if (this.state == State.RUNNING) {
153      for (final TaskMessage taskMessage : this.getMessages()) {
154        resultBuilder.addTaskMessage(ReefServiceProtos.TaskStatusProto.TaskMessageProto.newBuilder()
155            .setSourceId(taskMessage.getMessageSourceID())
156            .setMessage(ByteString.copyFrom(taskMessage.get()))
157            .build());
158      }
159    }
160
161    return resultBuilder.build();
162  }
163
164  private void check() {
165    if (this.result.isPresent() && this.lastException.isPresent()) {
166      throw new RuntimeException("Found both an exception and a result. This is unsupported.");
167    }
168  }
169
170  private ReefServiceProtos.State getProtoState() {
171    switch (this.state) {
172    case INIT:
173      return ReefServiceProtos.State.INIT;
174    case CLOSE_REQUESTED:
175    case SUSPEND_REQUESTED:
176    case RUNNING:
177      return ReefServiceProtos.State.RUNNING;
178    case DONE:
179      return ReefServiceProtos.State.DONE;
180    case SUSPENDED:
181      return ReefServiceProtos.State.SUSPEND;
182    case FAILED:
183      return ReefServiceProtos.State.FAILED;
184    case KILLED:
185      return ReefServiceProtos.State.KILLED;
186    default:
187      throw new RuntimeException("Unknown state: " + this.state);
188    }
189  }
190
191  void setException(final Throwable throwable) {
192    synchronized (this.heartBeatManager) {
193      this.lastException = Optional.of(throwable);
194      this.state = State.FAILED;
195      this.check();
196      this.heartbeat();
197    }
198  }
199
200  void setResult(final byte[] result) {
201    synchronized (this.heartBeatManager) {
202      this.result = Optional.ofNullable(result);
203      if (this.state == State.RUNNING) {
204        this.setState(State.DONE);
205      } else if (this.state == State.SUSPEND_REQUESTED) {
206        this.setState(State.SUSPENDED);
207      } else if (this.state == State.CLOSE_REQUESTED) {
208        this.setState(State.DONE);
209      }
210      this.check();
211      this.heartbeat();
212    }
213  }
214
215  private void heartbeat() {
216    this.heartBeatManager.sendTaskStatus(this.toProto());
217  }
218
219  /**
220   * Sets the state to INIT and informs the driver about it.
221   */
222  void setInit() {
223    LOG.log(Level.FINEST, "Sending Task INIT heartbeat to the Driver.");
224    this.setState(State.INIT);
225    this.heartbeat();
226  }
227
228  /**
229   * Sets the state to RUNNING after the handlers for TaskStart have been called.
230   */
231  void setRunning() {
232    this.setState(State.RUNNING);
233    this.heartbeat();
234  }
235
236  void setCloseRequested() {
237    this.setState(State.CLOSE_REQUESTED);
238  }
239
240  void setSuspendRequested() {
241    this.setState(State.SUSPEND_REQUESTED);
242  }
243
244  void setKilled() {
245    this.setState(State.KILLED);
246    this.heartbeat();
247  }
248
249  boolean isRunning() {
250    return this.state == State.RUNNING;
251  }
252
253  boolean isNotRunning() {
254    return this.state != State.RUNNING;
255  }
256
257  boolean hasEnded() {
258    switch (this.state) {
259    case DONE:
260    case SUSPENDED:
261    case FAILED:
262    case KILLED:
263      return true;
264    default:
265      return false;
266    }
267  }
268
269  State getState() {
270    return this.state;
271  }
272
273  private void setState(final State state) {
274    if (isLegal(this.state, state)) {
275      this.state = state;
276    } else {
277      final String msg = "Illegal state transition from [" + this.state + "] to [" + state + "]";
278      LOG.log(Level.SEVERE, msg);
279      throw new RuntimeException(msg);
280    }
281  }
282
283  String getContextId() {
284    return this.contextId;
285  }
286
287  /**
288   * @return the messages to be sent on the Task's behalf in the next heartbeat.
289   */
290  private Collection<TaskMessage> getMessages() {
291    final List<TaskMessage> messageList = new ArrayList<>(this.evaluatorMessageSources.size());
292    for (final TaskMessageSource messageSource : this.evaluatorMessageSources) {
293      final Optional<TaskMessage> taskMessageOptional = messageSource.getMessage();
294      if (taskMessageOptional.isPresent()) {
295        messageList.add(taskMessageOptional.get());
296      }
297    }
298    return messageList;
299  }
300
301
302  enum State {
303    PRE_INIT,
304    INIT,
305    RUNNING,
306    CLOSE_REQUESTED,
307    SUSPEND_REQUESTED,
308    SUSPENDED,
309    FAILED,
310    DONE,
311    KILLED
312  }
313}