001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.common.evaluator.task; 020 021import org.apache.reef.annotations.audience.EvaluatorSide; 022import org.apache.reef.annotations.audience.Private; 023import org.apache.reef.driver.task.TaskConfigurationOptions; 024import org.apache.reef.proto.ReefServiceProtos; 025import org.apache.reef.runtime.common.evaluator.HeartBeatManager; 026import org.apache.reef.runtime.common.evaluator.task.exceptions.*; 027import org.apache.reef.tang.InjectionFuture; 028import org.apache.reef.tang.annotations.Parameter; 029import org.apache.reef.task.Task; 030import org.apache.reef.task.events.CloseEvent; 031import org.apache.reef.task.events.DriverMessage; 032import org.apache.reef.task.events.SuspendEvent; 033import org.apache.reef.util.Optional; 034import org.apache.reef.wake.EventHandler; 035 036import javax.inject.Inject; 037import javax.xml.bind.DatatypeConverter; 038import java.util.logging.Level; 039import java.util.logging.Logger; 040 041/** 042 * The execution environment for a Task. 043 */ 044@Private 045@EvaluatorSide 046public final class TaskRuntime implements Runnable { 047 048 private final static Logger LOG = Logger.getLogger(TaskRuntime.class.getName()); 049 050 /** 051 * User supplied Task code. 052 */ 053 private final Task task; 054 055 private final InjectionFuture<EventHandler<CloseEvent>> f_closeHandler; 056 private final InjectionFuture<EventHandler<SuspendEvent>> f_suspendHandler; 057 private final InjectionFuture<EventHandler<DriverMessage>> f_messageHandler; 058 private final TaskLifeCycleHandlers taskLifeCycleHandlers; 059 060 /** 061 * The memento given by the task configuration. 062 */ 063 private final Optional<byte[]> memento; 064 065 /** 066 * Heart beat manager to trigger on heartbeats. 067 */ 068 private final HeartBeatManager heartBeatManager; 069 070 private final TaskStatus currentStatus; 071 072 // TODO: Document 073 @Inject 074 private TaskRuntime( 075 final HeartBeatManager heartBeatManager, 076 final Task task, 077 final TaskStatus currentStatus, 078 final @Parameter(TaskConfigurationOptions.CloseHandler.class) InjectionFuture<EventHandler<CloseEvent>> f_closeHandler, 079 final @Parameter(TaskConfigurationOptions.SuspendHandler.class) InjectionFuture<EventHandler<SuspendEvent>> f_suspendHandler, 080 final @Parameter(TaskConfigurationOptions.MessageHandler.class) InjectionFuture<EventHandler<DriverMessage>> f_messageHandler, 081 final TaskLifeCycleHandlers taskLifeCycleHandlers) { 082 this(heartBeatManager, task, currentStatus, f_closeHandler, f_suspendHandler, f_messageHandler, null, taskLifeCycleHandlers); 083 } 084 085 // TODO: Document 086 @Inject 087 private TaskRuntime( 088 final HeartBeatManager heartBeatManager, 089 final Task task, 090 final TaskStatus currentStatus, 091 final @Parameter(TaskConfigurationOptions.CloseHandler.class) InjectionFuture<EventHandler<CloseEvent>> f_closeHandler, 092 final @Parameter(TaskConfigurationOptions.SuspendHandler.class) InjectionFuture<EventHandler<SuspendEvent>> f_suspendHandler, 093 final @Parameter(TaskConfigurationOptions.MessageHandler.class) InjectionFuture<EventHandler<DriverMessage>> f_messageHandler, 094 final @Parameter(TaskConfigurationOptions.Memento.class) String memento, 095 final TaskLifeCycleHandlers taskLifeCycleHandlers) { 096 097 this.heartBeatManager = heartBeatManager; 098 this.task = task; 099 this.taskLifeCycleHandlers = taskLifeCycleHandlers; 100 101 this.memento = null == memento ? Optional.<byte[]>empty() : 102 Optional.of(DatatypeConverter.parseBase64Binary(memento)); 103 104 this.f_closeHandler = f_closeHandler; 105 this.f_suspendHandler = f_suspendHandler; 106 this.f_messageHandler = f_messageHandler; 107 108 this.currentStatus = currentStatus; 109 } 110 111 /** 112 * This method needs to be called before a Task can be run(). 113 * It informs the Driver that the Task is initializing. 114 */ 115 public void initialize() { 116 this.currentStatus.setInit(); 117 } 118 119 /** 120 * Run the task: Fire TaskStart, call Task.call(), fire TaskStop. 121 */ 122 @Override 123 public void run() { 124 try { 125 // Change state and inform the Driver 126 this.taskLifeCycleHandlers.beforeTaskStart(); 127 128 LOG.log(Level.FINEST, "Informing registered EventHandler<TaskStart>."); 129 this.currentStatus.setRunning(); 130 131 // Call Task.call() 132 final byte[] result = this.runTask(); 133 134 // Inform the Driver about it 135 this.currentStatus.setResult(result); 136 137 LOG.log(Level.FINEST, "Informing registered EventHandler<TaskStop>."); 138 this.taskLifeCycleHandlers.afterTaskExit(); 139 140 } catch (final TaskStartHandlerFailure taskStartHandlerFailure) { 141 LOG.log(Level.WARNING, "Caught an exception during TaskStart handler execution.", taskStartHandlerFailure); 142 this.currentStatus.setException(taskStartHandlerFailure.getCause()); 143 } catch (final TaskStopHandlerFailure taskStopHandlerFailure) { 144 LOG.log(Level.WARNING, "Caught an exception during TaskStop handler execution.", taskStopHandlerFailure); 145 this.currentStatus.setException(taskStopHandlerFailure.getCause()); 146 } catch (final TaskCallFailure e) { 147 LOG.log(Level.WARNING, "Caught an exception during Task.call().", e.getCause()); 148 this.currentStatus.setException(e); 149 } 150 } 151 152 /** 153 * Called by heartbeat manager 154 * 155 * @return current TaskStatusProto 156 */ 157 public ReefServiceProtos.TaskStatusProto getStatusProto() { 158 return this.currentStatus.toProto(); 159 } 160 161 /** 162 * @return true, if the Task is no longer running, either because it is crashed or exited cleanly 163 */ 164 public boolean hasEnded() { 165 return this.currentStatus.hasEnded(); 166 } 167 168 /** 169 * @return the ID of the task. 170 */ 171 public String getTaskId() { 172 return this.currentStatus.getTaskId(); 173 } 174 175 public String getId() { 176 return "TASK:" + this.task.getClass().getSimpleName() + ':' + this.currentStatus.getTaskId(); 177 } 178 179 /** 180 * Close the Task. This calls the configured close handler. 181 * 182 * @param message the optional message for the close handler or null if there none. 183 */ 184 public final void close(final byte[] message) { 185 LOG.log(Level.FINEST, "Triggering Task close."); 186 synchronized (this.heartBeatManager) { 187 if (this.currentStatus.isNotRunning()) { 188 LOG.log(Level.WARNING, "Trying to close a task that is in state: {0}. Ignoring.", 189 this.currentStatus.getState()); 190 } else { 191 try { 192 this.closeTask(message); 193 this.currentStatus.setCloseRequested(); 194 } catch (final TaskCloseHandlerFailure taskCloseHandlerFailure) { 195 LOG.log(Level.WARNING, "Exception while executing task close handler.", 196 taskCloseHandlerFailure.getCause()); 197 this.currentStatus.setException(taskCloseHandlerFailure.getCause()); 198 } 199 } 200 } 201 } 202 203 /** 204 * Suspend the Task. This calls the configured suspend handler. 205 * 206 * @param message the optional message for the suspend handler or null if there none. 207 */ 208 public void suspend(final byte[] message) { 209 synchronized (this.heartBeatManager) { 210 if (this.currentStatus.isNotRunning()) { 211 LOG.log(Level.WARNING, "Trying to suspend a task that is in state: {0}. Ignoring.", 212 this.currentStatus.getState()); 213 } else { 214 try { 215 this.suspendTask(message); 216 this.currentStatus.setSuspendRequested(); 217 } catch (final TaskSuspendHandlerFailure taskSuspendHandlerFailure) { 218 LOG.log(Level.WARNING, "Exception while executing task suspend handler.", 219 taskSuspendHandlerFailure.getCause()); 220 this.currentStatus.setException(taskSuspendHandlerFailure.getCause()); 221 } 222 } 223 } 224 } 225 226 /** 227 * Deliver a message to the Task. This calls into the user supplied message handler. 228 * 229 * @param message the message to be delivered. 230 */ 231 public void deliver(final byte[] message) { 232 synchronized (this.heartBeatManager) { 233 if (this.currentStatus.isNotRunning()) { 234 LOG.log(Level.WARNING, 235 "Trying to send a message to a task that is in state: {0}. Ignoring.", 236 this.currentStatus.getState()); 237 } else { 238 try { 239 this.deliverMessageToTask(message); 240 } catch (final TaskMessageHandlerFailure taskMessageHandlerFailure) { 241 LOG.log(Level.WARNING, "Exception while executing task close handler.", 242 taskMessageHandlerFailure.getCause()); 243 this.currentStatus.setException(taskMessageHandlerFailure.getCause()); 244 } 245 } 246 } 247 } 248 249 /** 250 * @return the ID of the Context this task is executing in. 251 */ 252 private String getContextID() { 253 return this.currentStatus.getContextId(); 254 } 255 256 /** 257 * Calls the Task.call() method and catches exceptions it may throw. 258 * 259 * @return the return value of Task.call() 260 * @throws TaskCallFailure if any Throwable was caught from the Task.call() method. 261 * That throwable would be the cause of the TaskCallFailure. 262 */ 263 private byte[] runTask() throws TaskCallFailure { 264 try { 265 final byte[] result; 266 if (this.memento.isPresent()) { 267 LOG.log(Level.FINEST, "Calling Task.call() with a memento"); 268 result = this.task.call(this.memento.get()); 269 } else { 270 LOG.log(Level.FINEST, "Calling Task.call() without a memento"); 271 result = this.task.call(null); 272 } 273 LOG.log(Level.FINEST, "Task.call() exited cleanly."); 274 return result; 275 } catch (final Throwable throwable) { 276 throw new TaskCallFailure(throwable); 277 } 278 } 279 280 /** 281 * Calls the configured Task close handler and catches exceptions it may throw. 282 */ 283 private void closeTask(final byte[] message) throws TaskCloseHandlerFailure { 284 LOG.log(Level.FINEST, "Invoking close handler."); 285 try { 286 this.f_closeHandler.get().onNext(new CloseEventImpl(message)); 287 } catch (final Throwable throwable) { 288 throw new TaskCloseHandlerFailure(throwable); 289 } 290 } 291 292 /** 293 * Calls the configured Task message handler and catches exceptions it may throw. 294 */ 295 private void deliverMessageToTask(final byte[] message) throws TaskMessageHandlerFailure { 296 try { 297 this.f_messageHandler.get().onNext(new DriverMessageImpl(message)); 298 } catch (final Throwable throwable) { 299 throw new TaskMessageHandlerFailure(throwable); 300 } 301 } 302 303 /** 304 * Calls the configured Task suspend handler and catches exceptions it may throw. 305 */ 306 private void suspendTask(final byte[] message) throws TaskSuspendHandlerFailure { 307 try { 308 this.f_suspendHandler.get().onNext(new SuspendEventImpl(message)); 309 } catch (final Throwable throwable) { 310 throw new TaskSuspendHandlerFailure(throwable); 311 } 312 } 313}