001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.common.evaluator.task; 020 021import org.apache.reef.annotations.audience.EvaluatorSide; 022import org.apache.reef.annotations.audience.Private; 023import org.apache.reef.driver.task.TaskConfigurationOptions; 024import org.apache.reef.proto.ReefServiceProtos; 025import org.apache.reef.runtime.common.evaluator.HeartBeatManager; 026import org.apache.reef.runtime.common.evaluator.task.exceptions.*; 027import org.apache.reef.tang.InjectionFuture; 028import org.apache.reef.tang.annotations.Parameter; 029import org.apache.reef.task.Task; 030import org.apache.reef.task.events.CloseEvent; 031import org.apache.reef.task.events.DriverMessage; 032import org.apache.reef.task.events.SuspendEvent; 033import org.apache.reef.util.Optional; 034import org.apache.reef.wake.EventHandler; 035 036import javax.inject.Inject; 037import javax.xml.bind.DatatypeConverter; 038import java.util.logging.Level; 039import java.util.logging.Logger; 040 041/** 042 * The execution environment for a Task. 043 */ 044@Private 045@EvaluatorSide 046public final class TaskRuntime implements Runnable { 047 048 private static final Logger LOG = Logger.getLogger(TaskRuntime.class.getName()); 049 050 /** 051 * User supplied Task code. 052 */ 053 private final Task task; 054 055 private final InjectionFuture<EventHandler<CloseEvent>> fCloseHandler; 056 private final InjectionFuture<EventHandler<SuspendEvent>> fSuspendHandler; 057 private final InjectionFuture<EventHandler<DriverMessage>> fMessageHandler; 058 private final TaskLifeCycleHandlers taskLifeCycleHandlers; 059 060 /** 061 * The memento given by the task configuration. 062 */ 063 private final Optional<byte[]> memento; 064 065 /** 066 * Heart beat manager to trigger on heartbeats. 067 */ 068 private final HeartBeatManager heartBeatManager; 069 070 private final TaskStatus currentStatus; 071 072 @Inject 073 private TaskRuntime( 074 final HeartBeatManager heartBeatManager, 075 final Task task, 076 final TaskStatus currentStatus, 077 @Parameter(TaskConfigurationOptions.CloseHandler.class) 078 final InjectionFuture<EventHandler<CloseEvent>> fCloseHandler, 079 @Parameter(TaskConfigurationOptions.SuspendHandler.class) 080 final InjectionFuture<EventHandler<SuspendEvent>> fSuspendHandler, 081 @Parameter(TaskConfigurationOptions.MessageHandler.class) 082 final InjectionFuture<EventHandler<DriverMessage>> fMessageHandler, 083 final TaskLifeCycleHandlers taskLifeCycleHandlers) { 084 this(heartBeatManager, task, currentStatus, fCloseHandler, fSuspendHandler, fMessageHandler, null, 085 taskLifeCycleHandlers); 086 } 087 088 @Inject 089 private TaskRuntime( 090 final HeartBeatManager heartBeatManager, 091 final Task task, 092 final TaskStatus currentStatus, 093 @Parameter(TaskConfigurationOptions.CloseHandler.class) 094 final InjectionFuture<EventHandler<CloseEvent>> fCloseHandler, 095 @Parameter(TaskConfigurationOptions.SuspendHandler.class) 096 final InjectionFuture<EventHandler<SuspendEvent>> fSuspendHandler, 097 @Parameter(TaskConfigurationOptions.MessageHandler.class) 098 final InjectionFuture<EventHandler<DriverMessage>> fMessageHandler, 099 @Parameter(TaskConfigurationOptions.Memento.class) final String memento, 100 final TaskLifeCycleHandlers taskLifeCycleHandlers) { 101 102 this.heartBeatManager = heartBeatManager; 103 this.task = task; 104 this.taskLifeCycleHandlers = taskLifeCycleHandlers; 105 106 this.memento = null == memento ? Optional.<byte[]>empty() : 107 Optional.of(DatatypeConverter.parseBase64Binary(memento)); 108 109 this.fCloseHandler = fCloseHandler; 110 this.fSuspendHandler = fSuspendHandler; 111 this.fMessageHandler = fMessageHandler; 112 113 this.currentStatus = currentStatus; 114 } 115 116 /** 117 * This method needs to be called before a Task can be run(). 118 * It informs the Driver that the Task is initializing. 119 */ 120 public void initialize() { 121 this.currentStatus.setInit(); 122 } 123 124 /** 125 * Run the task: Fire TaskStart, call Task.call(), fire TaskStop. 126 */ 127 @Override 128 public void run() { 129 try { 130 // Change state and inform the Driver 131 this.taskLifeCycleHandlers.beforeTaskStart(); 132 133 LOG.log(Level.FINEST, "Informing registered EventHandler<TaskStart>."); 134 this.currentStatus.setRunning(); 135 136 // Call Task.call() 137 final byte[] result = this.runTask(); 138 139 // Inform the Driver about it 140 this.currentStatus.setResult(result); 141 142 LOG.log(Level.FINEST, "Informing registered EventHandler<TaskStop>."); 143 this.taskLifeCycleHandlers.afterTaskExit(); 144 145 } catch (final TaskStartHandlerFailure taskStartHandlerFailure) { 146 LOG.log(Level.WARNING, "Caught an exception during TaskStart handler execution.", taskStartHandlerFailure); 147 this.currentStatus.setException(taskStartHandlerFailure.getCause()); 148 } catch (final TaskStopHandlerFailure taskStopHandlerFailure) { 149 LOG.log(Level.WARNING, "Caught an exception during TaskStop handler execution.", taskStopHandlerFailure); 150 this.currentStatus.setException(taskStopHandlerFailure.getCause()); 151 } catch (final TaskCallFailure e) { 152 LOG.log(Level.WARNING, "Caught an exception during Task.call().", e.getCause()); 153 this.currentStatus.setException(e); 154 } 155 } 156 157 /** 158 * Called by heartbeat manager. 159 * 160 * @return current TaskStatusProto 161 */ 162 public ReefServiceProtos.TaskStatusProto getStatusProto() { 163 return this.currentStatus.toProto(); 164 } 165 166 /** 167 * @return true, if the Task is no longer running, either because it is crashed or exited cleanly 168 */ 169 public boolean hasEnded() { 170 return this.currentStatus.hasEnded(); 171 } 172 173 /** 174 * @return the ID of the task. 175 */ 176 public String getTaskId() { 177 return this.currentStatus.getTaskId(); 178 } 179 180 public String getId() { 181 return "TASK:" + this.task.getClass().getSimpleName() + ':' + this.currentStatus.getTaskId(); 182 } 183 184 /** 185 * Close the Task. This calls the configured close handler. 186 * 187 * @param message the optional message for the close handler or null if there none. 188 */ 189 public void close(final byte[] message) { 190 LOG.log(Level.FINEST, "Triggering Task close."); 191 synchronized (this.heartBeatManager) { 192 if (this.currentStatus.isNotRunning()) { 193 LOG.log(Level.WARNING, "Trying to close a task that is in state: {0}. Ignoring.", 194 this.currentStatus.getState()); 195 } else { 196 try { 197 this.closeTask(message); 198 this.currentStatus.setCloseRequested(); 199 } catch (final TaskCloseHandlerFailure taskCloseHandlerFailure) { 200 LOG.log(Level.WARNING, "Exception while executing task close handler.", 201 taskCloseHandlerFailure.getCause()); 202 this.currentStatus.setException(taskCloseHandlerFailure.getCause()); 203 } 204 } 205 } 206 } 207 208 /** 209 * Suspend the Task. This calls the configured suspend handler. 210 * 211 * @param message the optional message for the suspend handler or null if there none. 212 */ 213 public void suspend(final byte[] message) { 214 synchronized (this.heartBeatManager) { 215 if (this.currentStatus.isNotRunning()) { 216 LOG.log(Level.WARNING, "Trying to suspend a task that is in state: {0}. Ignoring.", 217 this.currentStatus.getState()); 218 } else { 219 try { 220 this.suspendTask(message); 221 this.currentStatus.setSuspendRequested(); 222 } catch (final TaskSuspendHandlerFailure taskSuspendHandlerFailure) { 223 LOG.log(Level.WARNING, "Exception while executing task suspend handler.", 224 taskSuspendHandlerFailure.getCause()); 225 this.currentStatus.setException(taskSuspendHandlerFailure.getCause()); 226 } 227 } 228 } 229 } 230 231 /** 232 * Deliver a message to the Task. This calls into the user supplied message handler. 233 * 234 * @param message the message to be delivered. 235 */ 236 public void deliver(final byte[] message) { 237 synchronized (this.heartBeatManager) { 238 if (this.currentStatus.isNotRunning()) { 239 LOG.log(Level.WARNING, 240 "Trying to send a message to a task that is in state: {0}. Ignoring.", 241 this.currentStatus.getState()); 242 } else { 243 try { 244 this.deliverMessageToTask(message); 245 } catch (final TaskMessageHandlerFailure taskMessageHandlerFailure) { 246 LOG.log(Level.WARNING, "Exception while executing task close handler.", 247 taskMessageHandlerFailure.getCause()); 248 this.currentStatus.setException(taskMessageHandlerFailure.getCause()); 249 } 250 } 251 } 252 } 253 254 /** 255 * @return the ID of the Context this task is executing in. 256 */ 257 private String getContextID() { 258 return this.currentStatus.getContextId(); 259 } 260 261 /** 262 * Calls the Task.call() method and catches exceptions it may throw. 263 * 264 * @return the return value of Task.call() 265 * @throws TaskCallFailure if any Throwable was caught from the Task.call() method. 266 * That throwable would be the cause of the TaskCallFailure. 267 */ 268 @SuppressWarnings("checkstyle:illegalcatch") 269 private byte[] runTask() throws TaskCallFailure { 270 try { 271 final byte[] result; 272 if (this.memento.isPresent()) { 273 LOG.log(Level.FINEST, "Calling Task.call() with a memento"); 274 result = this.task.call(this.memento.get()); 275 } else { 276 LOG.log(Level.FINEST, "Calling Task.call() without a memento"); 277 result = this.task.call(null); 278 } 279 LOG.log(Level.FINEST, "Task.call() exited cleanly."); 280 return result; 281 } catch (final Throwable throwable) { 282 throw new TaskCallFailure(throwable); 283 } 284 } 285 286 /** 287 * Calls the configured Task close handler and catches exceptions it may throw. 288 */ 289 @SuppressWarnings("checkstyle:illegalcatch") 290 private void closeTask(final byte[] message) throws TaskCloseHandlerFailure { 291 LOG.log(Level.FINEST, "Invoking close handler."); 292 try { 293 this.fCloseHandler.get().onNext(new CloseEventImpl(message)); 294 } catch (final Throwable throwable) { 295 throw new TaskCloseHandlerFailure(throwable); 296 } 297 } 298 299 /** 300 * Calls the configured Task message handler and catches exceptions it may throw. 301 */ 302 @SuppressWarnings("checkstyle:illegalcatch") 303 private void deliverMessageToTask(final byte[] message) throws TaskMessageHandlerFailure { 304 try { 305 this.fMessageHandler.get().onNext(new DriverMessageImpl(message)); 306 } catch (final Throwable throwable) { 307 throw new TaskMessageHandlerFailure(throwable); 308 } 309 } 310 311 /** 312 * Calls the configured Task suspend handler and catches exceptions it may throw. 313 */ 314 @SuppressWarnings("checkstyle:illegalcatch") 315 private void suspendTask(final byte[] message) throws TaskSuspendHandlerFailure { 316 try { 317 this.fSuspendHandler.get().onNext(new SuspendEventImpl(message)); 318 } catch (final Throwable throwable) { 319 throw new TaskSuspendHandlerFailure(throwable); 320 } 321 } 322}