001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.common.driver; 020 021import com.google.protobuf.ByteString; 022import org.apache.reef.proto.ReefServiceProtos; 023import org.apache.reef.runtime.common.driver.client.ClientConnection; 024import org.apache.reef.runtime.common.driver.parameters.JobIdentifier; 025import org.apache.reef.runtime.common.utils.ExceptionCodec; 026import org.apache.reef.tang.annotations.Parameter; 027import org.apache.reef.util.Optional; 028import org.apache.reef.wake.time.Clock; 029 030import javax.inject.Inject; 031import java.util.logging.Level; 032import java.util.logging.Logger; 033 034/** 035 * Manages the Driver's status. 036 * Communicates status changes to the client and shuts down the runtime clock on shutdown. 037 */ 038public final class DriverStatusManager { 039 040 private static final Logger LOG = Logger.getLogger(DriverStatusManager.class.getName()); 041 private static final String CLASS_NAME = DriverStatusManager.class.getCanonicalName(); 042 043 private final Clock clock; 044 private final ClientConnection clientConnection; 045 private final String jobIdentifier; 046 private final ExceptionCodec exceptionCodec; 047 048 private DriverStatus driverStatus = DriverStatus.PRE_INIT; 049 private Optional<Throwable> shutdownCause = Optional.empty(); 050 private boolean driverTerminationHasBeenCommunicatedToClient = false; 051 052 /** 053 * Build a new status manager. This is done automatically by Tang. 054 * @param clock runtime event loop to shut down on completion or error. 055 * @param clientConnection Connection to the job client. Send init, running, and job ending messages. 056 * @param jobIdentifier String job ID. 057 * @param exceptionCodec codec to serialize the exception when sending job ending message to the client. 058 */ 059 @Inject 060 private DriverStatusManager( 061 @Parameter(JobIdentifier.class) final String jobIdentifier, 062 final Clock clock, 063 final ClientConnection clientConnection, 064 final ExceptionCodec exceptionCodec) { 065 066 LOG.entering(CLASS_NAME, "<init>"); 067 068 this.clock = clock; 069 this.clientConnection = clientConnection; 070 this.jobIdentifier = jobIdentifier; 071 this.exceptionCodec = exceptionCodec; 072 073 LOG.log(Level.FINE, "Instantiated 'DriverStatusManager'"); 074 075 LOG.exiting(CLASS_NAME, "<init>"); 076 } 077 078 /** 079 * Changes the driver status to INIT and sends message to the client about the transition. 080 */ 081 public synchronized void onInit() { 082 083 LOG.entering(CLASS_NAME, "onInit"); 084 085 this.clientConnection.send(this.getInitMessage()); 086 this.setStatus(DriverStatus.INIT); 087 088 LOG.exiting(CLASS_NAME, "onInit"); 089 } 090 091 /** 092 * Changes the driver status to RUNNING and sends message to the client about the transition. 093 * If the driver is in status 'PRE_INIT', this first calls onInit(); 094 */ 095 public synchronized void onRunning() { 096 097 LOG.entering(CLASS_NAME, "onRunning"); 098 099 if (this.driverStatus == DriverStatus.PRE_INIT) { 100 this.onInit(); 101 } 102 103 this.clientConnection.send(this.getRunningMessage()); 104 this.setStatus(DriverStatus.RUNNING); 105 106 LOG.exiting(CLASS_NAME, "onRunning"); 107 } 108 109 /** 110 * End the Driver with an exception. 111 * @param exception Exception that causes the driver shutdown. 112 */ 113 public synchronized void onError(final Throwable exception) { 114 115 LOG.entering(CLASS_NAME, "onError", exception); 116 117 if (this.isClosing()) { 118 LOG.log(Level.WARNING, "Received an exception while already in shutdown.", exception); 119 } else { 120 LOG.log(Level.WARNING, "Shutting down the Driver with an exception: ", exception); 121 this.shutdownCause = Optional.of(exception); 122 this.clock.stop(exception); 123 this.setStatus(DriverStatus.FAILING); 124 } 125 126 LOG.exiting(CLASS_NAME, "onError", exception); 127 } 128 129 /** 130 * Perform a clean shutdown of the Driver. 131 */ 132 public synchronized void onComplete() { 133 134 LOG.entering(CLASS_NAME, "onComplete"); 135 136 if (this.isClosing()) { 137 LOG.log(Level.WARNING, "Ignoring second call to onComplete()", 138 new Exception("Dummy exception to get the call stack")); 139 } else { 140 141 LOG.log(Level.INFO, "Clean shutdown of the Driver."); 142 143 if (LOG.isLoggable(Level.FINEST)) { 144 LOG.log(Level.FINEST, "Call stack: ", 145 new Exception("Dummy exception to get the call stack")); 146 } 147 148 this.clock.close(); 149 this.setStatus(DriverStatus.SHUTTING_DOWN); 150 } 151 152 LOG.exiting(CLASS_NAME, "onComplete"); 153 } 154 155 /** 156 * Sends the final message to the client. This is used by DriverRuntimeStopHandler.onNext(). 157 * @param exception Exception that caused the job to end (optional). 158 */ 159 public synchronized void onRuntimeStop(final Optional<Throwable> exception) { 160 this.sendJobEndingMessageToClient(exception); 161 } 162 163 /** 164 * Sends the final message to the Driver. This is used by DriverRuntimeStopHandler.onNext(). 165 * @param exception Exception that caused the job to end (can be absent). 166 * @deprecated TODO[JIRA REEF-1548] Do not use DriverStatusManager as a proxy to the job client. 167 * After release 0.16, make this method private and use it inside onRuntimeStop() method instead. 168 */ 169 @Deprecated 170 public synchronized void sendJobEndingMessageToClient(final Optional<Throwable> exception) { 171 172 if (!this.isClosing()) { 173 LOG.log(Level.SEVERE, "Sending message in a state different that SHUTTING_DOWN or FAILING. " + 174 "This is likely a illegal call to clock.close() at play. Current state: {0}", this.driverStatus); 175 } 176 177 if (this.driverTerminationHasBeenCommunicatedToClient) { 178 LOG.log(Level.SEVERE, ".sendJobEndingMessageToClient() called twice. Ignoring the second call"); 179 return; 180 } 181 182 // Log the shutdown situation 183 if (this.shutdownCause.isPresent()) { 184 LOG.log(Level.WARNING, "Sending message about an unclean driver shutdown.", this.shutdownCause.get()); 185 } 186 187 if (exception.isPresent()) { 188 LOG.log(Level.WARNING, "There was an exception during clock.close().", exception.get()); 189 } 190 191 if (this.shutdownCause.isPresent() && exception.isPresent()) { 192 LOG.log(Level.WARNING, "The driver is shutdown because of an exception (see above) and there was " + 193 "an exception during clock.close(). Only the first exception will be sent to the client"); 194 } 195 196 // Send the earlier exception, if there was one. Otherwise, send the exception passed. 197 this.clientConnection.send(getJobEndingMessage( 198 this.shutdownCause.isPresent() ? this.shutdownCause : exception)); 199 200 this.driverTerminationHasBeenCommunicatedToClient = true; 201 } 202 203 /** 204 * Check if the driver is in process of shutting down (either gracefully or due to an error). 205 * @return true if the driver is shutting down (gracefully or otherwise). 206 * @deprecated TODO[JIRA REEF-1560] Use isClosing() method instead. Remove after version 0.16 207 */ 208 @Deprecated 209 public synchronized boolean isShuttingDownOrFailing() { 210 return this.isClosing(); 211 } 212 213 /** 214 * Check if the driver is in process of shutting down (either gracefully or due to an error). 215 * @return true if the driver is shutting down (gracefully or otherwise). 216 */ 217 public synchronized boolean isClosing() { 218 return this.driverStatus.isClosing(); 219 } 220 221 /** 222 * Helper method to set the status. 223 * This also checks whether the transition from the current status to the new one is legal. 224 * @param toStatus Driver status to transition to. 225 */ 226 private synchronized void setStatus(final DriverStatus toStatus) { 227 if (this.driverStatus.isLegalTransition(toStatus)) { 228 this.driverStatus = toStatus; 229 } else { 230 LOG.log(Level.WARNING, "Illegal state transition: {0} -> {1}", new Object[] {this.driverStatus, toStatus}); 231 } 232 } 233 234 /** 235 * @param exception the exception that ended the Driver, if any. 236 * @return message to be sent to the client at the end of the job. 237 */ 238 private ReefServiceProtos.JobStatusProto getJobEndingMessage(final Optional<Throwable> exception) { 239 if (exception.isPresent()) { 240 return ReefServiceProtos.JobStatusProto.newBuilder() 241 .setIdentifier(this.jobIdentifier) 242 .setState(ReefServiceProtos.State.FAILED) 243 .setException(ByteString.copyFrom(this.exceptionCodec.toBytes(exception.get()))) 244 .build(); 245 } else { 246 return ReefServiceProtos.JobStatusProto.newBuilder() 247 .setIdentifier(this.jobIdentifier) 248 .setState(ReefServiceProtos.State.DONE) 249 .build(); 250 } 251 } 252 253 /** 254 * @return The message to be sent through the ClientConnection when in state INIT. 255 */ 256 private ReefServiceProtos.JobStatusProto getInitMessage() { 257 return ReefServiceProtos.JobStatusProto.newBuilder() 258 .setIdentifier(this.jobIdentifier) 259 .setState(ReefServiceProtos.State.INIT) 260 .build(); 261 } 262 263 /** 264 * @return The message to be sent through the ClientConnection when in state RUNNING. 265 */ 266 private ReefServiceProtos.JobStatusProto getRunningMessage() { 267 return ReefServiceProtos.JobStatusProto.newBuilder() 268 .setIdentifier(this.jobIdentifier) 269 .setState(ReefServiceProtos.State.RUNNING) 270 .build(); 271 } 272}