This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.common.evaluator.task;
020
021import org.apache.reef.annotations.audience.EvaluatorSide;
022import org.apache.reef.annotations.audience.Private;
023import org.apache.reef.driver.task.TaskConfigurationOptions;
024import org.apache.reef.proto.ReefServiceProtos;
025import org.apache.reef.runtime.common.evaluator.HeartBeatManager;
026import org.apache.reef.runtime.common.evaluator.task.exceptions.*;
027import org.apache.reef.tang.InjectionFuture;
028import org.apache.reef.tang.annotations.Parameter;
029import org.apache.reef.task.Task;
030import org.apache.reef.task.events.CloseEvent;
031import org.apache.reef.task.events.DriverMessage;
032import org.apache.reef.task.events.SuspendEvent;
033import org.apache.reef.util.Optional;
034import org.apache.reef.wake.EventHandler;
035
036import javax.inject.Inject;
037import javax.xml.bind.DatatypeConverter;
038import java.util.logging.Level;
039import java.util.logging.Logger;
040
041/**
042 * The execution environment for a Task.
043 */
044@Private
045@EvaluatorSide
046public final class TaskRuntime implements Runnable {
047
048  private static final Logger LOG = Logger.getLogger(TaskRuntime.class.getName());
049
050  /**
051   * User supplied Task code.
052   */
053  private final Task task;
054
055  private final InjectionFuture<EventHandler<CloseEvent>> fCloseHandler;
056  private final InjectionFuture<EventHandler<SuspendEvent>> fSuspendHandler;
057  private final InjectionFuture<EventHandler<DriverMessage>> fMessageHandler;
058  private final TaskLifeCycleHandlers taskLifeCycleHandlers;
059
060  /**
061   * The memento given by the task configuration.
062   */
063  private final Optional<byte[]> memento;
064
065  /**
066   * Heart beat manager to trigger on heartbeats.
067   */
068  private final HeartBeatManager heartBeatManager;
069
070  private final TaskStatus currentStatus;
071
072  @Inject
073  private TaskRuntime(
074      final HeartBeatManager heartBeatManager,
075      final Task task,
076      final TaskStatus currentStatus,
077      @Parameter(TaskConfigurationOptions.CloseHandler.class)
078      final InjectionFuture<EventHandler<CloseEvent>> fCloseHandler,
079      @Parameter(TaskConfigurationOptions.SuspendHandler.class)
080      final InjectionFuture<EventHandler<SuspendEvent>> fSuspendHandler,
081      @Parameter(TaskConfigurationOptions.MessageHandler.class)
082      final InjectionFuture<EventHandler<DriverMessage>> fMessageHandler,
083      final TaskLifeCycleHandlers taskLifeCycleHandlers) {
084    this(heartBeatManager, task, currentStatus, fCloseHandler, fSuspendHandler, fMessageHandler, null,
085        taskLifeCycleHandlers);
086  }
087
088  @Inject
089  private TaskRuntime(
090      final HeartBeatManager heartBeatManager,
091      final Task task,
092      final TaskStatus currentStatus,
093      @Parameter(TaskConfigurationOptions.CloseHandler.class)
094      final InjectionFuture<EventHandler<CloseEvent>> fCloseHandler,
095      @Parameter(TaskConfigurationOptions.SuspendHandler.class)
096      final InjectionFuture<EventHandler<SuspendEvent>> fSuspendHandler,
097      @Parameter(TaskConfigurationOptions.MessageHandler.class)
098      final InjectionFuture<EventHandler<DriverMessage>> fMessageHandler,
099      @Parameter(TaskConfigurationOptions.Memento.class) final String memento,
100      final TaskLifeCycleHandlers taskLifeCycleHandlers) {
101
102    this.heartBeatManager = heartBeatManager;
103    this.task = task;
104    this.taskLifeCycleHandlers = taskLifeCycleHandlers;
105
106    this.memento = null == memento ? Optional.<byte[]>empty() :
107        Optional.of(DatatypeConverter.parseBase64Binary(memento));
108
109    this.fCloseHandler = fCloseHandler;
110    this.fSuspendHandler = fSuspendHandler;
111    this.fMessageHandler = fMessageHandler;
112
113    this.currentStatus = currentStatus;
114  }
115
116  /**
117   * This method needs to be called before a Task can be run().
118   * It informs the Driver that the Task is initializing.
119   */
120  public void initialize() {
121    this.currentStatus.setInit();
122  }
123
124  /**
125   * Run the task: Fire TaskStart, call Task.call(), fire TaskStop.
126   */
127  @Override
128  public void run() {
129    try {
130      // Change state and inform the Driver
131      this.taskLifeCycleHandlers.beforeTaskStart();
132
133      LOG.log(Level.FINEST, "Informing registered EventHandler<TaskStart>.");
134      this.currentStatus.setRunning();
135
136      // Call Task.call()
137      final byte[] result = this.runTask();
138
139      // Inform the Driver about it
140      this.currentStatus.setResult(result);
141
142      LOG.log(Level.FINEST, "Informing registered EventHandler<TaskStop>.");
143      this.taskLifeCycleHandlers.afterTaskExit();
144
145    } catch (final TaskStartHandlerFailure taskStartHandlerFailure) {
146      LOG.log(Level.WARNING, "Caught an exception during TaskStart handler execution.", taskStartHandlerFailure);
147      this.currentStatus.setException(taskStartHandlerFailure.getCause());
148    } catch (final TaskStopHandlerFailure taskStopHandlerFailure) {
149      LOG.log(Level.WARNING, "Caught an exception during TaskStop handler execution.", taskStopHandlerFailure);
150      this.currentStatus.setException(taskStopHandlerFailure.getCause());
151    } catch (final TaskCallFailure e) {
152      LOG.log(Level.WARNING, "Caught an exception during Task.call().", e.getCause());
153      this.currentStatus.setException(e);
154    }
155  }
156
157  /**
158   * Called by heartbeat manager.
159   *
160   * @return current TaskStatusProto
161   */
162  public ReefServiceProtos.TaskStatusProto getStatusProto() {
163    return this.currentStatus.toProto();
164  }
165
166  /**
167   * @return true, if the Task is no longer running, either because it is crashed or exited cleanly
168   */
169  public boolean hasEnded() {
170    return this.currentStatus.hasEnded();
171  }
172
173  /**
174   * @return the ID of the task.
175   */
176  public String getTaskId() {
177    return this.currentStatus.getTaskId();
178  }
179
180  public String getId() {
181    return "TASK:" + this.task.getClass().getSimpleName() + ':' + this.currentStatus.getTaskId();
182  }
183
184  /**
185   * Close the Task. This calls the configured close handler.
186   *
187   * @param message the optional message for the close handler or null if there none.
188   */
189  public void close(final byte[] message) {
190    LOG.log(Level.FINEST, "Triggering Task close.");
191    synchronized (this.heartBeatManager) {
192      if (this.currentStatus.isNotRunning()) {
193        LOG.log(Level.WARNING, "Trying to close a task that is in state: {0}. Ignoring.",
194            this.currentStatus.getState());
195      } else {
196        try {
197          this.closeTask(message);
198          this.currentStatus.setCloseRequested();
199        } catch (final TaskCloseHandlerFailure taskCloseHandlerFailure) {
200          LOG.log(Level.WARNING, "Exception while executing task close handler.",
201              taskCloseHandlerFailure.getCause());
202          this.currentStatus.setException(taskCloseHandlerFailure.getCause());
203        }
204      }
205    }
206  }
207
208  /**
209   * Suspend the Task.  This calls the configured suspend handler.
210   *
211   * @param message the optional message for the suspend handler or null if there none.
212   */
213  public void suspend(final byte[] message) {
214    synchronized (this.heartBeatManager) {
215      if (this.currentStatus.isNotRunning()) {
216        LOG.log(Level.WARNING, "Trying to suspend a task that is in state: {0}. Ignoring.",
217            this.currentStatus.getState());
218      } else {
219        try {
220          this.suspendTask(message);
221          this.currentStatus.setSuspendRequested();
222        } catch (final TaskSuspendHandlerFailure taskSuspendHandlerFailure) {
223          LOG.log(Level.WARNING, "Exception while executing task suspend handler.",
224              taskSuspendHandlerFailure.getCause());
225          this.currentStatus.setException(taskSuspendHandlerFailure.getCause());
226        }
227      }
228    }
229  }
230
231  /**
232   * Deliver a message to the Task. This calls into the user supplied message handler.
233   *
234   * @param message the message to be delivered.
235   */
236  public void deliver(final byte[] message) {
237    synchronized (this.heartBeatManager) {
238      if (this.currentStatus.isNotRunning()) {
239        LOG.log(Level.WARNING,
240            "Trying to send a message to a task that is in state: {0}. Ignoring.",
241            this.currentStatus.getState());
242      } else {
243        try {
244          this.deliverMessageToTask(message);
245        } catch (final TaskMessageHandlerFailure taskMessageHandlerFailure) {
246          LOG.log(Level.WARNING, "Exception while executing task close handler.",
247              taskMessageHandlerFailure.getCause());
248          this.currentStatus.setException(taskMessageHandlerFailure.getCause());
249        }
250      }
251    }
252  }
253
254  /**
255   * @return the ID of the Context this task is executing in.
256   */
257  private String getContextID() {
258    return this.currentStatus.getContextId();
259  }
260
261  /**
262   * Calls the Task.call() method and catches exceptions it may throw.
263   *
264   * @return the return value of Task.call()
265   * @throws TaskCallFailure if any Throwable was caught from the Task.call() method.
266   *                         That throwable would be the cause of the TaskCallFailure.
267   */
268  @SuppressWarnings("checkstyle:illegalcatch")
269  private byte[] runTask() throws TaskCallFailure {
270    try {
271      final byte[] result;
272      if (this.memento.isPresent()) {
273        LOG.log(Level.FINEST, "Calling Task.call() with a memento");
274        result = this.task.call(this.memento.get());
275      } else {
276        LOG.log(Level.FINEST, "Calling Task.call() without a memento");
277        result = this.task.call(null);
278      }
279      LOG.log(Level.FINEST, "Task.call() exited cleanly.");
280      return result;
281    } catch (final Throwable throwable) {
282      throw new TaskCallFailure(throwable);
283    }
284  }
285
286  /**
287   * Calls the configured Task close handler and catches exceptions it may throw.
288   */
289  @SuppressWarnings("checkstyle:illegalcatch")
290  private void closeTask(final byte[] message) throws TaskCloseHandlerFailure {
291    LOG.log(Level.FINEST, "Invoking close handler.");
292    try {
293      this.fCloseHandler.get().onNext(new CloseEventImpl(message));
294    } catch (final Throwable throwable) {
295      throw new TaskCloseHandlerFailure(throwable);
296    }
297  }
298
299  /**
300   * Calls the configured Task message handler and catches exceptions it may throw.
301   */
302  @SuppressWarnings("checkstyle:illegalcatch")
303  private void deliverMessageToTask(final byte[] message) throws TaskMessageHandlerFailure {
304    try {
305      this.fMessageHandler.get().onNext(new DriverMessageImpl(message));
306    } catch (final Throwable throwable) {
307      throw new TaskMessageHandlerFailure(throwable);
308    }
309  }
310
311  /**
312   * Calls the configured Task suspend handler and catches exceptions it may throw.
313   */
314  @SuppressWarnings("checkstyle:illegalcatch")
315  private void suspendTask(final byte[] message) throws TaskSuspendHandlerFailure {
316    try {
317      this.fSuspendHandler.get().onNext(new SuspendEventImpl(message));
318    } catch (final Throwable throwable) {
319      throw new TaskSuspendHandlerFailure(throwable);
320    }
321  }
322}