001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.common.evaluator.task; 020 021import com.google.protobuf.ByteString; 022import org.apache.reef.driver.task.TaskConfigurationOptions; 023import org.apache.reef.evaluator.context.parameters.ContextIdentifier; 024import org.apache.reef.proto.ReefServiceProtos; 025import org.apache.reef.runtime.common.evaluator.HeartBeatManager; 026import org.apache.reef.runtime.common.utils.ExceptionCodec; 027import org.apache.reef.tang.annotations.Parameter; 028import org.apache.reef.task.TaskMessage; 029import org.apache.reef.task.TaskMessageSource; 030import org.apache.reef.util.Optional; 031 032import javax.inject.Inject; 033import java.util.ArrayList; 034import java.util.Collection; 035import java.util.List; 036import java.util.Set; 037import java.util.logging.Level; 038import java.util.logging.Logger; 039 040/** 041 * Represents the various states a Task could be in. 042 */ 043public final class TaskStatus { 044 private static final Logger LOG = Logger.getLogger(TaskStatus.class.getName()); 045 046 private final String taskId; 047 private final String contextId; 048 private final HeartBeatManager heartBeatManager; 049 private final Set<TaskMessageSource> evaluatorMessageSources; 050 private final ExceptionCodec exceptionCodec; 051 private Optional<Throwable> lastException = Optional.empty(); 052 private Optional<byte[]> result = Optional.empty(); 053 private State state = State.PRE_INIT; 054 055 056 @Inject 057 TaskStatus(@Parameter(TaskConfigurationOptions.Identifier.class) final String taskId, 058 @Parameter(ContextIdentifier.class) final String contextId, 059 @Parameter(TaskConfigurationOptions.TaskMessageSources.class) 060 final Set<TaskMessageSource> evaluatorMessageSources, 061 final HeartBeatManager heartBeatManager, 062 final ExceptionCodec exceptionCodec) { 063 this.taskId = taskId; 064 this.contextId = contextId; 065 this.heartBeatManager = heartBeatManager; 066 this.evaluatorMessageSources = evaluatorMessageSources; 067 this.exceptionCodec = exceptionCodec; 068 } 069 070 /** 071 * @param from 072 * @param to 073 * @return true, if the state transition from state 'from' to state 'to' is legal. 074 */ 075 private static boolean isLegal(final State from, final State to) { 076 if (from == null) { 077 return to == State.INIT; 078 } 079 switch (from) { 080 case PRE_INIT: 081 switch (to) { 082 case INIT: 083 return true; 084 default: 085 return false; 086 } 087 case INIT: 088 switch (to) { 089 case RUNNING: 090 case FAILED: 091 case KILLED: 092 case DONE: 093 return true; 094 default: 095 return false; 096 } 097 case RUNNING: 098 switch (to) { 099 case CLOSE_REQUESTED: 100 case SUSPEND_REQUESTED: 101 case FAILED: 102 case KILLED: 103 case DONE: 104 return true; 105 default: 106 return false; 107 } 108 case CLOSE_REQUESTED: 109 switch (to) { 110 case FAILED: 111 case KILLED: 112 case DONE: 113 return true; 114 default: 115 return false; 116 } 117 case SUSPEND_REQUESTED: 118 switch (to) { 119 case FAILED: 120 case KILLED: 121 case SUSPENDED: 122 return true; 123 default: 124 return false; 125 } 126 127 case FAILED: 128 case DONE: 129 case KILLED: 130 return false; 131 default: 132 return false; 133 } 134 } 135 136 public String getTaskId() { 137 return this.taskId; 138 } 139 140 ReefServiceProtos.TaskStatusProto toProto() { 141 this.check(); 142 final ReefServiceProtos.TaskStatusProto.Builder resultBuilder = ReefServiceProtos.TaskStatusProto.newBuilder() 143 .setContextId(this.contextId) 144 .setTaskId(this.taskId) 145 .setState(this.getProtoState()); 146 147 if (this.result.isPresent()) { 148 resultBuilder.setResult(ByteString.copyFrom(this.result.get())); 149 } else if (this.lastException.isPresent()) { 150 final byte[] error = this.exceptionCodec.toBytes(this.lastException.get()); 151 resultBuilder.setResult(ByteString.copyFrom(error)); 152 } else if (this.state == State.RUNNING) { 153 for (final TaskMessage taskMessage : this.getMessages()) { 154 resultBuilder.addTaskMessage(ReefServiceProtos.TaskStatusProto.TaskMessageProto.newBuilder() 155 .setSourceId(taskMessage.getMessageSourceID()) 156 .setMessage(ByteString.copyFrom(taskMessage.get())) 157 .build()); 158 } 159 } 160 161 return resultBuilder.build(); 162 } 163 164 private void check() { 165 if (this.result.isPresent() && this.lastException.isPresent()) { 166 throw new RuntimeException("Found both an exception and a result. This is unsupported."); 167 } 168 } 169 170 private ReefServiceProtos.State getProtoState() { 171 switch (this.state) { 172 case INIT: 173 return ReefServiceProtos.State.INIT; 174 case CLOSE_REQUESTED: 175 case SUSPEND_REQUESTED: 176 case RUNNING: 177 return ReefServiceProtos.State.RUNNING; 178 case DONE: 179 return ReefServiceProtos.State.DONE; 180 case SUSPENDED: 181 return ReefServiceProtos.State.SUSPEND; 182 case FAILED: 183 return ReefServiceProtos.State.FAILED; 184 case KILLED: 185 return ReefServiceProtos.State.KILLED; 186 default: 187 throw new RuntimeException("Unknown state: " + this.state); 188 } 189 } 190 191 void setException(final Throwable throwable) { 192 synchronized (this.heartBeatManager) { 193 this.lastException = Optional.of(throwable); 194 this.state = State.FAILED; 195 this.check(); 196 this.heartbeat(); 197 } 198 } 199 200 void setResult(final byte[] result) { 201 synchronized (this.heartBeatManager) { 202 this.result = Optional.ofNullable(result); 203 if (this.state == State.RUNNING) { 204 this.setState(State.DONE); 205 } else if (this.state == State.SUSPEND_REQUESTED) { 206 this.setState(State.SUSPENDED); 207 } else if (this.state == State.CLOSE_REQUESTED) { 208 this.setState(State.DONE); 209 } 210 this.check(); 211 this.heartbeat(); 212 } 213 } 214 215 private void heartbeat() { 216 this.heartBeatManager.sendTaskStatus(this.toProto()); 217 } 218 219 /** 220 * Sets the state to INIT and informs the driver about it. 221 */ 222 void setInit() { 223 LOG.log(Level.FINEST, "Sending Task INIT heartbeat to the Driver."); 224 this.setState(State.INIT); 225 this.heartbeat(); 226 } 227 228 /** 229 * Sets the state to RUNNING after the handlers for TaskStart have been called. 230 */ 231 void setRunning() { 232 this.setState(State.RUNNING); 233 this.heartbeat(); 234 } 235 236 void setCloseRequested() { 237 this.setState(State.CLOSE_REQUESTED); 238 } 239 240 void setSuspendRequested() { 241 this.setState(State.SUSPEND_REQUESTED); 242 } 243 244 void setKilled() { 245 this.setState(State.KILLED); 246 this.heartbeat(); 247 } 248 249 boolean isRunning() { 250 return this.state == State.RUNNING; 251 } 252 253 boolean isNotRunning() { 254 return this.state != State.RUNNING; 255 } 256 257 boolean hasEnded() { 258 switch (this.state) { 259 case DONE: 260 case SUSPENDED: 261 case FAILED: 262 case KILLED: 263 return true; 264 default: 265 return false; 266 } 267 } 268 269 State getState() { 270 return this.state; 271 } 272 273 private void setState(final State state) { 274 if (isLegal(this.state, state)) { 275 this.state = state; 276 } else { 277 final String msg = "Illegal state transition from [" + this.state + "] to [" + state + "]"; 278 LOG.log(Level.SEVERE, msg); 279 throw new RuntimeException(msg); 280 } 281 } 282 283 String getContextId() { 284 return this.contextId; 285 } 286 287 /** 288 * @return the messages to be sent on the Task's behalf in the next heartbeat. 289 */ 290 private Collection<TaskMessage> getMessages() { 291 final List<TaskMessage> messageList = new ArrayList<>(this.evaluatorMessageSources.size()); 292 for (final TaskMessageSource messageSource : this.evaluatorMessageSources) { 293 final Optional<TaskMessage> taskMessageOptional = messageSource.getMessage(); 294 if (taskMessageOptional.isPresent()) { 295 messageList.add(taskMessageOptional.get()); 296 } 297 } 298 return messageList; 299 } 300 301 302 enum State { 303 PRE_INIT, 304 INIT, 305 RUNNING, 306 CLOSE_REQUESTED, 307 SUSPEND_REQUESTED, 308 SUSPENDED, 309 FAILED, 310 DONE, 311 KILLED 312 } 313}