This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.common.evaluator.task;
020
021import org.apache.reef.annotations.audience.EvaluatorSide;
022import org.apache.reef.annotations.audience.Private;
023import org.apache.reef.driver.task.TaskConfigurationOptions;
024import org.apache.reef.proto.ReefServiceProtos;
025import org.apache.reef.runtime.common.evaluator.HeartBeatManager;
026import org.apache.reef.runtime.common.evaluator.task.exceptions.*;
027import org.apache.reef.tang.InjectionFuture;
028import org.apache.reef.tang.annotations.Parameter;
029import org.apache.reef.task.Task;
030import org.apache.reef.task.events.CloseEvent;
031import org.apache.reef.task.events.DriverMessage;
032import org.apache.reef.task.events.SuspendEvent;
033import org.apache.reef.util.Optional;
034import org.apache.reef.wake.EventHandler;
035
036import javax.inject.Inject;
037import javax.xml.bind.DatatypeConverter;
038import java.util.logging.Level;
039import java.util.logging.Logger;
040
041/**
042 * The execution environment for a Task.
043 */
044@Private
045@EvaluatorSide
046public final class TaskRuntime implements Runnable {
047
048  private final static Logger LOG = Logger.getLogger(TaskRuntime.class.getName());
049
050  /**
051   * User supplied Task code.
052   */
053  private final Task task;
054
055  private final InjectionFuture<EventHandler<CloseEvent>> f_closeHandler;
056  private final InjectionFuture<EventHandler<SuspendEvent>> f_suspendHandler;
057  private final InjectionFuture<EventHandler<DriverMessage>> f_messageHandler;
058  private final TaskLifeCycleHandlers taskLifeCycleHandlers;
059
060  /**
061   * The memento given by the task configuration.
062   */
063  private final Optional<byte[]> memento;
064
065  /**
066   * Heart beat manager to trigger on heartbeats.
067   */
068  private final HeartBeatManager heartBeatManager;
069
070  private final TaskStatus currentStatus;
071
072  // TODO: Document
073  @Inject
074  private TaskRuntime(
075      final HeartBeatManager heartBeatManager,
076      final Task task,
077      final TaskStatus currentStatus,
078      final @Parameter(TaskConfigurationOptions.CloseHandler.class) InjectionFuture<EventHandler<CloseEvent>> f_closeHandler,
079      final @Parameter(TaskConfigurationOptions.SuspendHandler.class) InjectionFuture<EventHandler<SuspendEvent>> f_suspendHandler,
080      final @Parameter(TaskConfigurationOptions.MessageHandler.class) InjectionFuture<EventHandler<DriverMessage>> f_messageHandler,
081      final TaskLifeCycleHandlers taskLifeCycleHandlers) {
082    this(heartBeatManager, task, currentStatus, f_closeHandler, f_suspendHandler, f_messageHandler, null, taskLifeCycleHandlers);
083  }
084
085  // TODO: Document
086  @Inject
087  private TaskRuntime(
088      final HeartBeatManager heartBeatManager,
089      final Task task,
090      final TaskStatus currentStatus,
091      final @Parameter(TaskConfigurationOptions.CloseHandler.class) InjectionFuture<EventHandler<CloseEvent>> f_closeHandler,
092      final @Parameter(TaskConfigurationOptions.SuspendHandler.class) InjectionFuture<EventHandler<SuspendEvent>> f_suspendHandler,
093      final @Parameter(TaskConfigurationOptions.MessageHandler.class) InjectionFuture<EventHandler<DriverMessage>> f_messageHandler,
094      final @Parameter(TaskConfigurationOptions.Memento.class) String memento,
095      final TaskLifeCycleHandlers taskLifeCycleHandlers) {
096
097    this.heartBeatManager = heartBeatManager;
098    this.task = task;
099    this.taskLifeCycleHandlers = taskLifeCycleHandlers;
100
101    this.memento = null == memento ? Optional.<byte[]>empty() :
102        Optional.of(DatatypeConverter.parseBase64Binary(memento));
103
104    this.f_closeHandler = f_closeHandler;
105    this.f_suspendHandler = f_suspendHandler;
106    this.f_messageHandler = f_messageHandler;
107
108    this.currentStatus = currentStatus;
109  }
110
111  /**
112   * This method needs to be called before a Task can be run().
113   * It informs the Driver that the Task is initializing.
114   */
115  public void initialize() {
116    this.currentStatus.setInit();
117  }
118
119  /**
120   * Run the task: Fire TaskStart, call Task.call(), fire TaskStop.
121   */
122  @Override
123  public void run() {
124    try {
125      // Change state and inform the Driver
126      this.taskLifeCycleHandlers.beforeTaskStart();
127
128      LOG.log(Level.FINEST, "Informing registered EventHandler<TaskStart>.");
129      this.currentStatus.setRunning();
130
131      // Call Task.call()
132      final byte[] result = this.runTask();
133
134      // Inform the Driver about it
135      this.currentStatus.setResult(result);
136
137      LOG.log(Level.FINEST, "Informing registered EventHandler<TaskStop>.");
138      this.taskLifeCycleHandlers.afterTaskExit();
139
140    } catch (final TaskStartHandlerFailure taskStartHandlerFailure) {
141      LOG.log(Level.WARNING, "Caught an exception during TaskStart handler execution.", taskStartHandlerFailure);
142      this.currentStatus.setException(taskStartHandlerFailure.getCause());
143    } catch (final TaskStopHandlerFailure taskStopHandlerFailure) {
144      LOG.log(Level.WARNING, "Caught an exception during TaskStop handler execution.", taskStopHandlerFailure);
145      this.currentStatus.setException(taskStopHandlerFailure.getCause());
146    } catch (final TaskCallFailure e) {
147      LOG.log(Level.WARNING, "Caught an exception during Task.call().", e.getCause());
148      this.currentStatus.setException(e);
149    }
150  }
151
152  /**
153   * Called by heartbeat manager
154   *
155   * @return current TaskStatusProto
156   */
157  public ReefServiceProtos.TaskStatusProto getStatusProto() {
158    return this.currentStatus.toProto();
159  }
160
161  /**
162   * @return true, if the Task is no longer running, either because it is crashed or exited cleanly
163   */
164  public boolean hasEnded() {
165    return this.currentStatus.hasEnded();
166  }
167
168  /**
169   * @return the ID of the task.
170   */
171  public String getTaskId() {
172    return this.currentStatus.getTaskId();
173  }
174
175  public String getId() {
176    return "TASK:" + this.task.getClass().getSimpleName() + ':' + this.currentStatus.getTaskId();
177  }
178
179  /**
180   * Close the Task. This calls the configured close handler.
181   *
182   * @param message the optional message for the close handler or null if there none.
183   */
184  public final void close(final byte[] message) {
185    LOG.log(Level.FINEST, "Triggering Task close.");
186    synchronized (this.heartBeatManager) {
187      if (this.currentStatus.isNotRunning()) {
188        LOG.log(Level.WARNING, "Trying to close a task that is in state: {0}. Ignoring.",
189            this.currentStatus.getState());
190      } else {
191        try {
192          this.closeTask(message);
193          this.currentStatus.setCloseRequested();
194        } catch (final TaskCloseHandlerFailure taskCloseHandlerFailure) {
195          LOG.log(Level.WARNING, "Exception while executing task close handler.",
196              taskCloseHandlerFailure.getCause());
197          this.currentStatus.setException(taskCloseHandlerFailure.getCause());
198        }
199      }
200    }
201  }
202
203  /**
204   * Suspend the Task.  This calls the configured suspend handler.
205   *
206   * @param message the optional message for the suspend handler or null if there none.
207   */
208  public void suspend(final byte[] message) {
209    synchronized (this.heartBeatManager) {
210      if (this.currentStatus.isNotRunning()) {
211        LOG.log(Level.WARNING, "Trying to suspend a task that is in state: {0}. Ignoring.",
212            this.currentStatus.getState());
213      } else {
214        try {
215          this.suspendTask(message);
216          this.currentStatus.setSuspendRequested();
217        } catch (final TaskSuspendHandlerFailure taskSuspendHandlerFailure) {
218          LOG.log(Level.WARNING, "Exception while executing task suspend handler.",
219              taskSuspendHandlerFailure.getCause());
220          this.currentStatus.setException(taskSuspendHandlerFailure.getCause());
221        }
222      }
223    }
224  }
225
226  /**
227   * Deliver a message to the Task. This calls into the user supplied message handler.
228   *
229   * @param message the message to be delivered.
230   */
231  public void deliver(final byte[] message) {
232    synchronized (this.heartBeatManager) {
233      if (this.currentStatus.isNotRunning()) {
234        LOG.log(Level.WARNING,
235            "Trying to send a message to a task that is in state: {0}. Ignoring.",
236            this.currentStatus.getState());
237      } else {
238        try {
239          this.deliverMessageToTask(message);
240        } catch (final TaskMessageHandlerFailure taskMessageHandlerFailure) {
241          LOG.log(Level.WARNING, "Exception while executing task close handler.",
242              taskMessageHandlerFailure.getCause());
243          this.currentStatus.setException(taskMessageHandlerFailure.getCause());
244        }
245      }
246    }
247  }
248
249  /**
250   * @return the ID of the Context this task is executing in.
251   */
252  private String getContextID() {
253    return this.currentStatus.getContextId();
254  }
255
256  /**
257   * Calls the Task.call() method and catches exceptions it may throw.
258   *
259   * @return the return value of Task.call()
260   * @throws TaskCallFailure if any Throwable was caught from the Task.call() method.
261   *                         That throwable would be the cause of the TaskCallFailure.
262   */
263  private byte[] runTask() throws TaskCallFailure {
264    try {
265      final byte[] result;
266      if (this.memento.isPresent()) {
267        LOG.log(Level.FINEST, "Calling Task.call() with a memento");
268        result = this.task.call(this.memento.get());
269      } else {
270        LOG.log(Level.FINEST, "Calling Task.call() without a memento");
271        result = this.task.call(null);
272      }
273      LOG.log(Level.FINEST, "Task.call() exited cleanly.");
274      return result;
275    } catch (final Throwable throwable) {
276      throw new TaskCallFailure(throwable);
277    }
278  }
279
280  /**
281   * Calls the configured Task close handler and catches exceptions it may throw.
282   */
283  private void closeTask(final byte[] message) throws TaskCloseHandlerFailure {
284    LOG.log(Level.FINEST, "Invoking close handler.");
285    try {
286      this.f_closeHandler.get().onNext(new CloseEventImpl(message));
287    } catch (final Throwable throwable) {
288      throw new TaskCloseHandlerFailure(throwable);
289    }
290  }
291
292  /**
293   * Calls the configured Task message handler and catches exceptions it may throw.
294   */
295  private void deliverMessageToTask(final byte[] message) throws TaskMessageHandlerFailure {
296    try {
297      this.f_messageHandler.get().onNext(new DriverMessageImpl(message));
298    } catch (final Throwable throwable) {
299      throw new TaskMessageHandlerFailure(throwable);
300    }
301  }
302
303  /**
304   * Calls the configured Task suspend handler and catches exceptions it may throw.
305   */
306  private void suspendTask(final byte[] message) throws TaskSuspendHandlerFailure {
307    try {
308      this.f_suspendHandler.get().onNext(new SuspendEventImpl(message));
309    } catch (final Throwable throwable) {
310      throw new TaskSuspendHandlerFailure(throwable);
311    }
312  }
313}