001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.common.driver.task; 020 021import org.apache.reef.annotations.audience.DriverSide; 022import org.apache.reef.annotations.audience.Private; 023import org.apache.reef.driver.context.ActiveContext; 024import org.apache.reef.driver.task.FailedTask; 025import org.apache.reef.driver.task.RunningTask; 026import org.apache.reef.proto.ReefServiceProtos; 027import org.apache.reef.runtime.common.driver.context.EvaluatorContext; 028import org.apache.reef.runtime.common.driver.evaluator.EvaluatorManager; 029import org.apache.reef.runtime.common.driver.evaluator.EvaluatorMessageDispatcher; 030import org.apache.reef.runtime.common.utils.ExceptionCodec; 031import org.apache.reef.util.Optional; 032 033import java.util.logging.Level; 034import java.util.logging.Logger; 035 036/** 037 * Represents a Task on the Driver. 038 */ 039@DriverSide 040@Private 041public final class TaskRepresenter { 042 043 private static final Logger LOG = Logger.getLogger(TaskRepresenter.class.getName()); 044 045 private final EvaluatorContext context; 046 private final EvaluatorMessageDispatcher messageDispatcher; 047 private final EvaluatorManager evaluatorManager; 048 private final ExceptionCodec exceptionCodec; 049 private final String taskId; 050 051 // Mutable state 052 private ReefServiceProtos.State state = ReefServiceProtos.State.INIT; 053 054 public TaskRepresenter(final String taskId, 055 final EvaluatorContext context, 056 final EvaluatorMessageDispatcher messageDispatcher, 057 final EvaluatorManager evaluatorManager, 058 final ExceptionCodec exceptionCodec) { 059 this.taskId = taskId; 060 this.context = context; 061 this.messageDispatcher = messageDispatcher; 062 this.evaluatorManager = evaluatorManager; 063 this.exceptionCodec = exceptionCodec; 064 } 065 066 private static byte[] getResult(final ReefServiceProtos.TaskStatusProto taskStatusProto) { 067 return taskStatusProto.hasResult() ? taskStatusProto.getResult().toByteArray() : null; 068 } 069 070 public void onTaskStatusMessage(final ReefServiceProtos.TaskStatusProto taskStatusProto) { 071 072 LOG.log(Level.FINE, "Received task {0} status {1}", 073 new Object[]{taskStatusProto.getTaskId(), taskStatusProto.getState()}); 074 075 // Make sure that the message is indeed for us. 076 if (!taskStatusProto.getContextId().equals(this.context.getId())) { 077 throw new RuntimeException( 078 "Received a message for a task running on Context " + taskStatusProto.getContextId() + 079 " while the Driver believes this Task to be run on Context " + this.context.getId()); 080 } 081 082 if (!taskStatusProto.getTaskId().equals(this.taskId)) { 083 throw new RuntimeException("Received a message for task " + taskStatusProto.getTaskId() + 084 " in the TaskRepresenter for Task " + this.taskId); 085 } 086 if (taskStatusProto.getRecovery()) { 087 // when a recovered heartbeat is received, we will take its word for it 088 LOG.log(Level.INFO, "Received task status {0} for RECOVERED task {1}.", 089 new Object[]{taskStatusProto.getState(), this.taskId}); 090 this.setState(taskStatusProto.getState()); 091 } 092 // Dispatch the message to the right method. 093 switch (taskStatusProto.getState()) { 094 case INIT: 095 this.onTaskInit(taskStatusProto); 096 break; 097 case RUNNING: 098 this.onTaskRunning(taskStatusProto); 099 break; 100 case SUSPEND: 101 this.onTaskSuspend(taskStatusProto); 102 break; 103 case DONE: 104 this.onTaskDone(taskStatusProto); 105 break; 106 case FAILED: 107 this.onTaskFailed(taskStatusProto); 108 break; 109 default: 110 throw new IllegalStateException("Unknown task state: " + taskStatusProto.getState()); 111 } 112 } 113 114 private void onTaskInit(final ReefServiceProtos.TaskStatusProto taskStatusProto) { 115 assert ((ReefServiceProtos.State.INIT == taskStatusProto.getState())); 116 if (this.isKnown()) { 117 LOG.log(Level.WARNING, "Received a INIT message for task with id {0}" + 118 " which we have seen before. Ignoring the second message", this.taskId); 119 } else { 120 final RunningTask runningTask = new RunningTaskImpl( 121 this.evaluatorManager, this.taskId, this.context, this); 122 this.messageDispatcher.onTaskRunning(runningTask); 123 this.setState(ReefServiceProtos.State.RUNNING); 124 } 125 } 126 127 private void onTaskRunning(final ReefServiceProtos.TaskStatusProto taskStatusProto) { 128 129 assert (taskStatusProto.getState() == ReefServiceProtos.State.RUNNING); 130 131 if (this.isNotRunning()) { 132 throw new IllegalStateException("Received a task status message from task " + this.taskId + 133 " that is believed to be RUNNING on the Evaluator, but the Driver thinks it is in state " + this.state); 134 } 135 136 // fire driver restart task running handler if this is a recovery heartbeat 137 if (taskStatusProto.getRecovery()) { 138 final RunningTask runningTask = new RunningTaskImpl( 139 this.evaluatorManager, this.taskId, this.context, this); 140 this.messageDispatcher.onDriverRestartTaskRunning(runningTask); 141 } 142 143 for (final ReefServiceProtos.TaskStatusProto.TaskMessageProto taskMessageProto : taskStatusProto.getTaskMessageList()) { 144 this.messageDispatcher.onTaskMessage( 145 new TaskMessageImpl(taskMessageProto.getMessage().toByteArray(), 146 this.taskId, this.context.getId(), taskMessageProto.getSourceId())); 147 } 148 } 149 150 private void onTaskSuspend(final ReefServiceProtos.TaskStatusProto taskStatusProto) { 151 assert (ReefServiceProtos.State.SUSPEND == taskStatusProto.getState()); 152 assert (this.isKnown()); 153 this.messageDispatcher.onTaskSuspended( 154 new SuspendedTaskImpl(this.context, getResult(taskStatusProto), this.taskId)); 155 this.setState(ReefServiceProtos.State.SUSPEND); 156 } 157 158 private void onTaskDone(final ReefServiceProtos.TaskStatusProto taskStatusProto) { 159 assert (ReefServiceProtos.State.DONE == taskStatusProto.getState()); 160 assert (this.isKnown()); 161 this.messageDispatcher.onTaskCompleted( 162 new CompletedTaskImpl(this.context, getResult(taskStatusProto), this.taskId)); 163 this.setState(ReefServiceProtos.State.DONE); 164 } 165 166 private void onTaskFailed(final ReefServiceProtos.TaskStatusProto taskStatusProto) { 167 assert (ReefServiceProtos.State.FAILED == taskStatusProto.getState()); 168 final Optional<ActiveContext> evaluatorContext = Optional.<ActiveContext>of(this.context); 169 final Optional<byte[]> bytes = Optional.ofNullable(getResult(taskStatusProto)); 170 final Optional<Throwable> exception = this.exceptionCodec.fromBytes(bytes); 171 final String message = exception.isPresent() ? exception.get().getMessage() : "No message given"; 172 final Optional<String> description = Optional.empty(); 173 final FailedTask failedTask = new FailedTask( 174 this.taskId, message, description, exception, bytes, evaluatorContext); 175 this.messageDispatcher.onTaskFailed(failedTask); 176 this.setState(ReefServiceProtos.State.FAILED); 177 } 178 179 public String getId() { 180 return this.taskId; 181 } 182 183 /** 184 * @return true, if we had at least one message from the task. 185 */ 186 private boolean isKnown() { 187 return this.state != ReefServiceProtos.State.INIT; 188 } 189 190 /** 191 * @return true, if this task is in any other state but RUNNING. 192 */ 193 public boolean isNotRunning() { 194 return this.state != ReefServiceProtos.State.RUNNING; 195 } 196 197 private void setState(final ReefServiceProtos.State newState) { 198 LOG.log(Level.FINE, "Task [{0}] state transition from [{1}] to [{2}]", 199 new Object[]{this.taskId, this.state, newState}); 200 this.state = newState; 201 } 202}