001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.common.driver; 020 021import com.google.protobuf.ByteString; 022import org.apache.reef.proto.ReefServiceProtos; 023import org.apache.reef.runtime.common.driver.api.AbstractDriverRuntimeConfiguration; 024import org.apache.reef.runtime.common.driver.client.ClientConnection; 025import org.apache.reef.runtime.common.utils.ExceptionCodec; 026import org.apache.reef.tang.annotations.Parameter; 027import org.apache.reef.util.Optional; 028import org.apache.reef.wake.time.Clock; 029 030import javax.inject.Inject; 031import java.util.logging.Level; 032import java.util.logging.Logger; 033 034/** 035 * Manages the Driver's status. 036 */ 037public final class DriverStatusManager { 038 private static final Logger LOG = Logger.getLogger(DriverStatusManager.class.getName()); 039 private final Clock clock; 040 private final ClientConnection clientConnection; 041 private final String jobIdentifier; 042 private final ExceptionCodec exceptionCodec; 043 private DriverStatus driverStatus = DriverStatus.PRE_INIT; 044 private Optional<Throwable> shutdownCause = Optional.empty(); 045 private boolean driverTerminationHasBeenCommunicatedToClient = false; 046 private boolean restartCompleted = false; 047 private int numPreviousContainers = -1; 048 private int numRecoveredContainers = 0; 049 050 051 /** 052 * @param clock 053 * @param clientConnection 054 * @param jobIdentifier 055 * @param exceptionCodec 056 */ 057 @Inject 058 DriverStatusManager(final Clock clock, 059 final ClientConnection clientConnection, 060 final @Parameter(AbstractDriverRuntimeConfiguration.JobIdentifier.class) String jobIdentifier, 061 final ExceptionCodec exceptionCodec) { 062 LOG.entering(DriverStatusManager.class.getCanonicalName(), "<init>"); 063 this.clock = clock; 064 this.clientConnection = clientConnection; 065 this.jobIdentifier = jobIdentifier; 066 this.exceptionCodec = exceptionCodec; 067 LOG.log(Level.FINE, "Instantiated 'DriverStatusManager'"); 068 LOG.exiting(DriverStatusManager.class.getCanonicalName(), "<init>"); 069 } 070 071 /** 072 * Check whether a state transition 'from->to' is legal. 073 * 074 * @param from 075 * @param to 076 * @return 077 */ 078 private static boolean isLegalTransition(final DriverStatus from, final DriverStatus to) { 079 switch (from) { 080 case PRE_INIT: 081 switch (to) { 082 case INIT: 083 return true; 084 default: 085 return false; 086 } 087 case INIT: 088 switch (to) { 089 case RUNNING: 090 return true; 091 default: 092 return false; 093 } 094 case RUNNING: 095 switch (to) { 096 case SHUTTING_DOWN: 097 case FAILING: 098 return true; 099 default: 100 return false; 101 } 102 case FAILING: 103 case SHUTTING_DOWN: 104 return false; 105 default: 106 throw new IllegalStateException("Unknown input state: " + from); 107 } 108 } 109 110 /** 111 * Changes the driver status to INIT and sends message to the client about the transition. 112 */ 113 public synchronized void onInit() { 114 LOG.entering(DriverStatusManager.class.getCanonicalName(), "onInit"); 115 this.clientConnection.send(this.getInitMessage()); 116 this.setStatus(DriverStatus.INIT); 117 LOG.exiting(DriverStatusManager.class.getCanonicalName(), "onInit"); 118 } 119 120 /** 121 * Changes the driver status to RUNNING and sends message to the client about the transition. 122 * If the driver is in status 'PRE_INIT', this first calls onInit(); 123 */ 124 public synchronized void onRunning() { 125 LOG.entering(DriverStatusManager.class.getCanonicalName(), "onRunning"); 126 if (this.driverStatus.equals(DriverStatus.PRE_INIT)) { 127 this.onInit(); 128 } 129 this.clientConnection.send(this.getRunningMessage()); 130 this.setStatus(DriverStatus.RUNNING); 131 LOG.exiting(DriverStatusManager.class.getCanonicalName(), "onRunning"); 132 } 133 134 /** 135 * End the Driver with an exception. 136 * 137 * @param exception 138 */ 139 public synchronized void onError(final Throwable exception) { 140 LOG.entering(DriverStatusManager.class.getCanonicalName(), "onError", new Object[]{exception}); 141 if (this.isShuttingDownOrFailing()) { 142 LOG.log(Level.WARNING, "Received an exception while already in shutdown.", exception); 143 } else { 144 LOG.log(Level.WARNING, "Shutting down the Driver with an exception: ", exception); 145 this.shutdownCause = Optional.of(exception); 146 this.clock.stop(); 147 this.setStatus(DriverStatus.FAILING); 148 } 149 LOG.exiting(DriverStatusManager.class.getCanonicalName(), "onError", new Object[]{exception}); 150 } 151 152 /** 153 * Perform a clean shutdown of the Driver. 154 */ 155 public synchronized void onComplete() { 156 LOG.entering(DriverStatusManager.class.getCanonicalName(), "onComplete"); 157 if (this.isShuttingDownOrFailing()) { 158 LOG.log(Level.WARNING, "Ignoring second call to onComplete()"); 159 } else { 160 LOG.log(Level.INFO, "Clean shutdown of the Driver."); 161 if (LOG.isLoggable(Level.FINEST)) { 162 LOG.log(Level.FINEST, "Callstack: ", new Exception()); 163 } 164 this.clock.close(); 165 this.setStatus(DriverStatus.SHUTTING_DOWN); 166 } 167 LOG.exiting(DriverStatusManager.class.getCanonicalName(), "onComplete"); 168 169 } 170 171 /** 172 * Sends the final message to the Driver. This is used by DriverRuntimeStopHandler.onNext(). 173 * 174 * @param exception 175 */ 176 public synchronized void sendJobEndingMessageToClient(final Optional<Throwable> exception) { 177 if (this.isNotShuttingDownOrFailing()) { 178 LOG.log(Level.SEVERE, "Sending message in a state different that SHUTTING_DOWN or FAILING. This is likely a illegal call to clock.close() at play. Current state: " + this.driverStatus); 179 } 180 if (this.driverTerminationHasBeenCommunicatedToClient) { 181 LOG.log(Level.SEVERE, ".sendJobEndingMessageToClient() called twice. Ignoring the second call"); 182 } else { 183 { // Log the shutdown situation 184 if (this.shutdownCause.isPresent()) { 185 LOG.log(Level.WARNING, "Sending message about an unclean driver shutdown.", this.shutdownCause.get()); 186 } 187 if (exception.isPresent()) { 188 LOG.log(Level.WARNING, "There was an exception during clock.close().", exception.get()); 189 } 190 if (this.shutdownCause.isPresent() && exception.isPresent()) { 191 LOG.log(Level.WARNING, "The driver is shutdown because of an exception (see above) and there was an exception during clock.close(). Only the first exception will be sent to the client"); 192 } 193 } 194 if (this.shutdownCause.isPresent()) { 195 // Send the earlier exception, if there was one 196 this.clientConnection.send(getJobEndingMessage(this.shutdownCause)); 197 } else { 198 // Send the exception passed, if there was one. 199 this.clientConnection.send(getJobEndingMessage(exception)); 200 } 201 this.driverTerminationHasBeenCommunicatedToClient = true; 202 } 203 } 204 205 /** 206 * Indicate that the Driver restart is complete. It is meant to be called exactly once during a restart and never 207 * during the ininital launch of a Driver. 208 */ 209 public synchronized void setRestartCompleted() { 210 if (!this.isDriverRestart()) { 211 throw new IllegalStateException("setRestartCompleted() called in a Driver that is not, in fact, restarted."); 212 } else if (this.restartCompleted) { 213 LOG.log(Level.WARNING, "Calling setRestartCompleted more than once."); 214 } else { 215 this.restartCompleted = true; 216 } 217 } 218 219 /** 220 * @return the number of Evaluators expected to check in from a previous run. 221 */ 222 public synchronized int getNumPreviousContainers() { 223 return this.numPreviousContainers; 224 } 225 226 /** 227 * Set the number of containers to expect still active from a previous execution of the Driver in a restart situation. 228 * To be called exactly once during a driver restart. 229 * 230 * @param num 231 */ 232 public synchronized void setNumPreviousContainers(final int num) { 233 if (this.numPreviousContainers >= 0) { 234 throw new IllegalStateException("Attempting to set the number of expected containers left from a previous container more than once."); 235 } else { 236 this.numPreviousContainers = num; 237 } 238 } 239 240 /** 241 * @return the number of Evaluators from a previous Driver that have checked in with the Driver in a restart situation. 242 */ 243 public synchronized int getNumRecoveredContainers() { 244 return this.numRecoveredContainers; 245 } 246 247 /** 248 * Indicate that this Driver has re-established the connection with one more Evaluator of a previous run. 249 */ 250 public synchronized void oneContainerRecovered() { 251 this.numRecoveredContainers += 1; 252 if (this.numRecoveredContainers > this.numPreviousContainers) { 253 throw new IllegalStateException("Reconnected to" + 254 this.numRecoveredContainers + 255 "Evaluators while only expecting " + 256 this.numPreviousContainers); 257 } 258 } 259 260 /** 261 * @return true if the Driver is a restarted driver of an earlier attempt. 262 */ 263 private synchronized boolean isDriverRestart() { 264 return this.getNumPreviousContainers() > 0; 265 } 266 267 public synchronized boolean isShuttingDownOrFailing() { 268 return DriverStatus.SHUTTING_DOWN.equals(this.driverStatus) 269 || DriverStatus.FAILING.equals(this.driverStatus); 270 } 271 272 private synchronized boolean isNotShuttingDownOrFailing() { 273 return !isShuttingDownOrFailing(); 274 } 275 276 /** 277 * Helper method to set the status. This also checks whether the transition from the current status to the new one is 278 * legal. 279 * 280 * @param newStatus 281 */ 282 private synchronized void setStatus(final DriverStatus newStatus) { 283 if (isLegalTransition(this.driverStatus, newStatus)) { 284 this.driverStatus = newStatus; 285 } else { 286 LOG.log(Level.WARNING, "Illegal state transiton: '" + this.driverStatus + "'->'" + newStatus + "'"); 287 } 288 } 289 290 /** 291 * @param exception the exception that ended the Driver, if any. 292 * @return message to be sent to the client at the end of the job. 293 */ 294 private synchronized ReefServiceProtos.JobStatusProto getJobEndingMessage(final Optional<Throwable> exception) { 295 final ReefServiceProtos.JobStatusProto message; 296 if (exception.isPresent()) { 297 message = ReefServiceProtos.JobStatusProto.newBuilder() 298 .setIdentifier(this.jobIdentifier) 299 .setState(ReefServiceProtos.State.FAILED) 300 .setException(ByteString.copyFrom(this.exceptionCodec.toBytes(exception.get()))) 301 .build(); 302 } else { 303 message = ReefServiceProtos.JobStatusProto.newBuilder() 304 .setIdentifier(this.jobIdentifier) 305 .setState(ReefServiceProtos.State.DONE) 306 .build(); 307 } 308 return message; 309 } 310 311 /** 312 * @return The message to be sent through the ClientConnection when in state INIT. 313 */ 314 private synchronized ReefServiceProtos.JobStatusProto getInitMessage() { 315 return ReefServiceProtos.JobStatusProto.newBuilder() 316 .setIdentifier(this.jobIdentifier) 317 .setState(ReefServiceProtos.State.INIT) 318 .build(); 319 } 320 321 /** 322 * @return The message to be sent through the ClientConnection when in state RUNNING. 323 */ 324 private synchronized ReefServiceProtos.JobStatusProto getRunningMessage() { 325 return ReefServiceProtos.JobStatusProto.newBuilder() 326 .setIdentifier(this.jobIdentifier) 327 .setState(ReefServiceProtos.State.RUNNING) 328 .build(); 329 } 330}