/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.reef.runtime.common.driver.task;

import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.annotations.audience.Private;
import org.apache.reef.driver.context.ActiveContext;
import org.apache.reef.driver.restart.DriverRestartManager;
import org.apache.reef.driver.restart.EvaluatorRestartState;
import org.apache.reef.driver.task.FailedTask;
import org.apache.reef.driver.task.RunningTask;
import org.apache.reef.runtime.common.driver.context.EvaluatorContext;
import org.apache.reef.runtime.common.driver.evaluator.EvaluatorManager;
import org.apache.reef.runtime.common.driver.evaluator.EvaluatorMessageDispatcher;
import org.apache.reef.runtime.common.driver.evaluator.pojos.State;
import org.apache.reef.runtime.common.driver.evaluator.pojos.TaskMessagePOJO;
import org.apache.reef.runtime.common.driver.evaluator.pojos.TaskStatusPOJO;
import org.apache.reef.runtime.common.utils.ExceptionCodec;
import org.apache.reef.util.Optional;

import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Represents a Task on the Driver.
 * <p>
 * An instance keeps the Driver's view of one task's {@link State}. That view is updated from the
 * {@link TaskStatusPOJO} messages handed to {@link #onTaskStatusMessage(TaskStatusPOJO)}, and the
 * corresponding events are forwarded to the {@link EvaluatorMessageDispatcher}.
 * <p>
 * This class performs no synchronization of its own around its mutable state.
 */
@DriverSide
@Private
public final class TaskRepresenter {

  private static final Logger LOG = Logger.getLogger(TaskRepresenter.class.getName());

  private final EvaluatorContext context;
  private final EvaluatorMessageDispatcher messageDispatcher;
  private final EvaluatorManager evaluatorManager;
  private final ExceptionCodec exceptionCodec;
  private final String taskId;
  private final DriverRestartManager driverRestartManager;

  // Mutable state
  private State state = State.INIT;
  private boolean isFirstRunningMessage = true;

  /**
   * @param taskId the id of the represented task; status messages must carry this id.
   * @param context the context the task is believed to run on; status messages must carry its id.
   * @param messageDispatcher receives the task events derived from the status messages.
   * @param evaluatorManager its id is used to look up the restart state of the evaluator;
   *     it is also handed to the {@link RunningTask} instances created here.
   * @param exceptionCodec decodes the result bytes of a FAILED status into a {@link Throwable}.
   * @param driverRestartManager queried for the restart state of the evaluator and notified
   *     once a RUNNING status of a re-registered evaluator has been processed.
   */
  public TaskRepresenter(final String taskId,
                         final EvaluatorContext context,
                         final EvaluatorMessageDispatcher messageDispatcher,
                         final EvaluatorManager evaluatorManager,
                         final ExceptionCodec exceptionCodec,
                         final DriverRestartManager driverRestartManager) {
    this.taskId = taskId;
    this.context = context;
    this.messageDispatcher = messageDispatcher;
    this.evaluatorManager = evaluatorManager;
    this.exceptionCodec = exceptionCodec;
    this.driverRestartManager = driverRestartManager;
  }

  /**
   * @param taskStatus the status message to read the result from.
   * @return the result carried by the status message, or null if it carries none.
   */
  private static byte[] getResult(final TaskStatusPOJO taskStatus) {
    return taskStatus.hasResult() ? taskStatus.getResult() : null;
  }

  /**
   * Processes a status message of the represented task and dispatches it to the handler
   * matching the reported state.
   *
   * @param taskStatus the status message received for this task.
   * @throws RuntimeException if the message names a different context or a different task.
   * @throws IllegalStateException if the reported state has no handler, or if RUNNING is
   *     reported while the Driver-side state is not RUNNING.
   */
  public void onTaskStatusMessage(final TaskStatusPOJO taskStatus) {

    LOG.log(Level.FINE, "Received task {0} status {1}",
        new Object[]{taskStatus.getTaskId(), taskStatus.getState()});

    // Make sure that the message is indeed for us.
    if (!taskStatus.getContextId().equals(this.context.getId())) {
      throw new RuntimeException(
          "Received a message for a task running on Context " + taskStatus.getContextId() +
              " while the Driver believes this Task to be run on Context " + this.context.getId());
    }

    if (!taskStatus.getTaskId().equals(this.taskId)) {
      throw new RuntimeException("Received a message for task " + taskStatus.getTaskId() +
          " in the TaskRepresenter for Task " + this.taskId);
    }

    if (this.isEvaluatorReregistered()) {
      // when a recovered heartbeat is received, we will take its word for it
      LOG.log(Level.INFO, "Received task status {0} for RECOVERED task {1}.",
          new Object[]{taskStatus.getState(), this.taskId});
      this.setState(taskStatus.getState());
    }

    // Dispatch the message to the right method.
    switch (taskStatus.getState()) {
    case INIT:
      this.onTaskInit(taskStatus);
      break;
    case RUNNING:
      this.onTaskRunning(taskStatus);
      break;
    case SUSPEND:
      this.onTaskSuspend(taskStatus);
      break;
    case DONE:
      this.onTaskDone(taskStatus);
      break;
    case FAILED:
      this.onTaskFailed(taskStatus);
      break;
    default:
      throw new IllegalStateException("Unknown task state: " + taskStatus.getState());
    }
  }

  /**
   * Handles an INIT status: the first one moves the Driver-side state to RUNNING, any
   * further one is logged and ignored.
   */
  private void onTaskInit(final TaskStatusPOJO taskStatusPOJO) {
    assert State.INIT == taskStatusPOJO.getState();
    if (this.isKnown()) {
      LOG.log(Level.WARNING, "Received a INIT message for task with id {0}" +
          " which we have seen before. Ignoring the second message", this.taskId);
    } else {
      this.setState(State.RUNNING);
    }
  }

  /**
   * Handles a RUNNING status: announces the running task on the first such message, fires the
   * driver-restart handler if the evaluator is re-registered, and forwards all task messages
   * contained in the status.
   *
   * @throws IllegalStateException if the Driver-side state is not RUNNING.
   */
  private void onTaskRunning(final TaskStatusPOJO taskStatus) {
    assert taskStatus.getState() == State.RUNNING;

    if (this.isNotRunning()) {
      throw new IllegalStateException("Received a task status message from task " + this.taskId +
          " that is believed to be RUNNING on the Evaluator, but the Driver thinks it is in state " + this.state);
    }

    if (isFirstRunningMessage) {
      isFirstRunningMessage = false;
      final RunningTask runningTask = new RunningTaskImpl(
          this.evaluatorManager, this.taskId, this.context, this);
      this.messageDispatcher.onTaskRunning(runningTask);
    }

    // fire driver restart task running handler if this is a recovery heartbeat
    if (this.isEvaluatorReregistered()) {
      final RunningTask runningTask = new RunningTaskImpl(
          this.evaluatorManager, this.taskId, this.context, this);
      this.driverRestartManager.setEvaluatorProcessed(evaluatorManager.getId());
      this.messageDispatcher.onDriverRestartTaskRunning(runningTask);
    }

    for (final TaskMessagePOJO taskMessagePOJO : taskStatus.getTaskMessageList()) {
      this.messageDispatcher.onTaskMessage(
          new TaskMessageImpl(taskMessagePOJO.getMessage(),
              this.taskId, this.context.getId(), taskMessagePOJO.getSourceId(),
              taskMessagePOJO.getSequenceNumber()));
    }
  }

  /**
   * Handles a SUSPEND status: dispatches the suspended task, then records the new state.
   */
  private void onTaskSuspend(final TaskStatusPOJO taskStatus) {
    assert State.SUSPEND == taskStatus.getState();
    assert this.isKnown();
    this.messageDispatcher.onTaskSuspended(
        new SuspendedTaskImpl(this.context, getResult(taskStatus), this.taskId));
    this.setState(State.SUSPEND);
  }

  /**
   * Handles a DONE status: dispatches the completed task, then records the new state.
   */
  private void onTaskDone(final TaskStatusPOJO taskStatus) {
    assert State.DONE == taskStatus.getState();
    assert this.isKnown();
    this.messageDispatcher.onTaskCompleted(
        new CompletedTaskImpl(this.context, getResult(taskStatus), this.taskId));
    this.setState(State.DONE);
  }

  /**
   * Handles a FAILED status: decodes the result bytes into an exception via the
   * {@link ExceptionCodec}, dispatches the resulting {@link FailedTask}, then records the new state.
   */
  private void onTaskFailed(final TaskStatusPOJO taskStatus) {
    assert State.FAILED == taskStatus.getState();
    final Optional<ActiveContext> evaluatorContext = Optional.<ActiveContext>of(this.context);
    final Optional<byte[]> bytes = Optional.ofNullable(getResult(taskStatus));
    final Optional<Throwable> exception = this.exceptionCodec.fromBytes(bytes);
    // Throwable.getMessage() may return null. Use the same placeholder as for an absent
    // exception, so that the message handed to FailedTask is never null.
    final String exceptionMessage = exception.isPresent() ? exception.get().getMessage() : null;
    final String message = exceptionMessage != null ? exceptionMessage : "No message given";
    final Optional<String> description = Optional.empty();
    final FailedTask failedTask = new FailedTask(
        this.taskId, message, description, exception, bytes, evaluatorContext);
    this.messageDispatcher.onTaskFailed(failedTask);
    this.setState(State.FAILED);
  }

  /**
   * @return the id of the represented task.
   */
  public String getId() {
    return this.taskId;
  }

  /**
   * @return true, if we had at least one message from the task.
   */
  private boolean isKnown() {
    return this.state != State.INIT;
  }

  /**
   * @return true, if this task is in any other state but RUNNING.
   */
  public boolean isNotRunning() {
    return this.state != State.RUNNING;
  }

  /**
   * @return true, if this task is in INIT or RUNNING status.
   */
  public boolean isClosable() {
    return this.state == State.INIT || this.state == State.RUNNING;
  }

  /**
   * @return true, if the DriverRestartManager reports the evaluator of this task as REREGISTERED.
   */
  private boolean isEvaluatorReregistered() {
    return this.driverRestartManager.getEvaluatorRestartState(this.evaluatorManager.getId())
        == EvaluatorRestartState.REREGISTERED;
  }

  /**
   * Logs the transition and records the new Driver-side state.
   */
  private void setState(final State newState) {
    LOG.log(Level.FINE, "Task [{0}] state transition from [{1}] to [{2}]",
        new Object[]{this.taskId, this.state, newState});
    this.state = newState;
  }
}