001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.common.evaluator.task; 020 021import com.google.protobuf.ByteString; 022import org.apache.reef.driver.task.TaskConfigurationOptions; 023import org.apache.reef.evaluator.context.parameters.ContextIdentifier; 024import org.apache.reef.proto.ReefServiceProtos; 025import org.apache.reef.runtime.common.evaluator.HeartBeatManager; 026import org.apache.reef.runtime.common.utils.ExceptionCodec; 027import org.apache.reef.tang.annotations.Parameter; 028import org.apache.reef.task.TaskMessage; 029import org.apache.reef.task.TaskMessageSource; 030import org.apache.reef.util.Optional; 031 032import javax.inject.Inject; 033import java.util.ArrayList; 034import java.util.Collection; 035import java.util.List; 036import java.util.Set; 037import java.util.logging.Level; 038import java.util.logging.Logger; 039 040/** 041 * Represents the various states a Task could be in. 042 */ 043public final class TaskStatus { 044 private static final Logger LOG = Logger.getLogger(TaskStatus.class.getName()); 045 046 private final String taskId; 047 private final String contextId; 048 private final HeartBeatManager heartBeatManager; 049 private final Set<TaskMessageSource> evaluatorMessageSources; 050 private final ExceptionCodec exceptionCodec; 051 private Optional<Throwable> lastException = Optional.empty(); 052 private Optional<byte[]> result = Optional.empty(); 053 private State state = State.PRE_INIT; 054 055 056 @Inject 057 TaskStatus(final @Parameter(TaskConfigurationOptions.Identifier.class) String taskId, 058 final @Parameter(ContextIdentifier.class) String contextId, 059 final @Parameter(TaskConfigurationOptions.TaskMessageSources.class) Set<TaskMessageSource> evaluatorMessageSources, 060 final HeartBeatManager heartBeatManager, 061 final ExceptionCodec exceptionCodec) { 062 this.taskId = taskId; 063 this.contextId = contextId; 064 this.heartBeatManager = heartBeatManager; 065 this.evaluatorMessageSources = evaluatorMessageSources; 066 this.exceptionCodec = exceptionCodec; 067 } 068 069 /** 070 * @param from 071 * @param to 072 * @return true, if the state transition from state 'from' to state 'to' is legal. 073 */ 074 private static boolean isLegal(final State from, final State to) { 075 if (from == null) { 076 return to == State.INIT; 077 } 078 switch (from) { 079 case PRE_INIT: 080 switch (to) { 081 case INIT: 082 return true; 083 default: 084 return false; 085 } 086 case INIT: 087 switch (to) { 088 case RUNNING: 089 case FAILED: 090 case KILLED: 091 case DONE: 092 return true; 093 default: 094 return false; 095 } 096 case RUNNING: 097 switch (to) { 098 case CLOSE_REQUESTED: 099 case SUSPEND_REQUESTED: 100 case FAILED: 101 case KILLED: 102 case DONE: 103 return true; 104 default: 105 return false; 106 } 107 case CLOSE_REQUESTED: 108 switch (to) { 109 case FAILED: 110 case KILLED: 111 case DONE: 112 return true; 113 default: 114 return false; 115 } 116 case SUSPEND_REQUESTED: 117 switch (to) { 118 case FAILED: 119 case KILLED: 120 case SUSPENDED: 121 return true; 122 default: 123 return false; 124 } 125 126 case FAILED: 127 case DONE: 128 case KILLED: 129 return false; 130 default: 131 return false; 132 } 133 } 134 135 public final String getTaskId() { 136 return this.taskId; 137 } 138 139 ReefServiceProtos.TaskStatusProto toProto() { 140 this.check(); 141 final ReefServiceProtos.TaskStatusProto.Builder result = ReefServiceProtos.TaskStatusProto.newBuilder() 142 .setContextId(this.contextId) 143 .setTaskId(this.taskId) 144 .setState(this.getProtoState()); 145 146 if (this.result.isPresent()) { 147 result.setResult(ByteString.copyFrom(this.result.get())); 148 } else if (this.lastException.isPresent()) { 149 final byte[] error = this.exceptionCodec.toBytes(this.lastException.get()); 150 result.setResult(ByteString.copyFrom(error)); 151 } else if (this.state == State.RUNNING) { 152 for (final TaskMessage taskMessage : this.getMessages()) { 153 result.addTaskMessage(ReefServiceProtos.TaskStatusProto.TaskMessageProto.newBuilder() 154 .setSourceId(taskMessage.getMessageSourceID()) 155 .setMessage(ByteString.copyFrom(taskMessage.get())) 156 .build()); 157 } 158 } 159 160 return result.build(); 161 } 162 163 private void check() { 164 if (this.result.isPresent() && this.lastException.isPresent()) { 165 throw new RuntimeException("Found both an exception and a result. This is unsupported."); 166 } 167 } 168 169 private ReefServiceProtos.State getProtoState() { 170 switch (this.state) { 171 case INIT: 172 return ReefServiceProtos.State.INIT; 173 case CLOSE_REQUESTED: 174 case SUSPEND_REQUESTED: 175 case RUNNING: 176 return ReefServiceProtos.State.RUNNING; 177 case DONE: 178 return ReefServiceProtos.State.DONE; 179 case SUSPENDED: 180 return ReefServiceProtos.State.SUSPEND; 181 case FAILED: 182 return ReefServiceProtos.State.FAILED; 183 case KILLED: 184 return ReefServiceProtos.State.KILLED; 185 } 186 throw new RuntimeException("Unknown state: " + this.state); 187 } 188 189 void setException(final Throwable throwable) { 190 synchronized (this.heartBeatManager) { 191 this.lastException = Optional.of(throwable); 192 this.state = State.FAILED; 193 this.check(); 194 this.heartbeat(); 195 } 196 } 197 198 void setResult(final byte[] result) { 199 synchronized (this.heartBeatManager) { 200 this.result = Optional.ofNullable(result); 201 if (this.state == State.RUNNING) { 202 this.setState(State.DONE); 203 } else if (this.state == State.SUSPEND_REQUESTED) { 204 this.setState(State.SUSPENDED); 205 } else if (this.state == State.CLOSE_REQUESTED) { 206 this.setState(State.DONE); 207 } 208 this.check(); 209 this.heartbeat(); 210 } 211 } 212 213 private void heartbeat() { 214 this.heartBeatManager.sendTaskStatus(this.toProto()); 215 } 216 217 /** 218 * Sets the state to INIT and informs the driver about it. 219 */ 220 void setInit() { 221 LOG.log(Level.FINEST, "Sending Task INIT heartbeat to the Driver."); 222 this.setState(State.INIT); 223 this.heartbeat(); 224 } 225 226 /** 227 * Sets the state to RUNNING after the handlers for TaskStart have been called. 228 */ 229 void setRunning() { 230 this.setState(State.RUNNING); 231 } 232 233 void setCloseRequested() { 234 this.setState(State.CLOSE_REQUESTED); 235 } 236 237 void setSuspendRequested() { 238 this.setState(State.SUSPEND_REQUESTED); 239 } 240 241 void setKilled() { 242 this.setState(State.KILLED); 243 this.heartbeat(); 244 } 245 246 boolean isRunning() { 247 return this.state == State.RUNNING; 248 } 249 250 boolean isNotRunning() { 251 return this.state != State.RUNNING; 252 } 253 254 boolean hasEnded() { 255 switch (this.state) { 256 case DONE: 257 case SUSPENDED: 258 case FAILED: 259 case KILLED: 260 return true; 261 default: 262 return false; 263 } 264 } 265 266 State getState() { 267 return this.state; 268 } 269 270 private void setState(final State state) { 271 if (isLegal(this.state, state)) { 272 this.state = state; 273 } else { 274 final String msg = "Illegal state transition from [" + this.state + "] to [" + state + "]"; 275 LOG.log(Level.SEVERE, msg); 276 throw new RuntimeException(msg); 277 } 278 } 279 280 String getContextId() { 281 return this.contextId; 282 } 283 284 /** 285 * @return the messages to be sent on the Task's behalf in the next heartbeat. 286 */ 287 private final Collection<TaskMessage> getMessages() { 288 final List<TaskMessage> result = new ArrayList<>(this.evaluatorMessageSources.size()); 289 for (final TaskMessageSource messageSource : this.evaluatorMessageSources) { 290 final Optional<TaskMessage> taskMessageOptional = messageSource.getMessage(); 291 if (taskMessageOptional.isPresent()) { 292 result.add(taskMessageOptional.get()); 293 } 294 } 295 return result; 296 } 297 298 299 enum State { 300 PRE_INIT, 301 INIT, 302 RUNNING, 303 CLOSE_REQUESTED, 304 SUSPEND_REQUESTED, 305 SUSPENDED, 306 FAILED, 307 DONE, 308 KILLED 309 } 310}