001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.common.driver; 020 021import com.google.protobuf.ByteString; 022import org.apache.reef.proto.ReefServiceProtos; 023import org.apache.reef.runtime.common.driver.client.ClientConnection; 024import org.apache.reef.runtime.common.driver.parameters.JobIdentifier; 025import org.apache.reef.runtime.common.utils.ExceptionCodec; 026import org.apache.reef.tang.annotations.Parameter; 027import org.apache.reef.util.Optional; 028import org.apache.reef.wake.time.Clock; 029 030import javax.inject.Inject; 031import java.util.logging.Level; 032import java.util.logging.Logger; 033 034/** 035 * Manages the Driver's status. 036 */ 037public final class DriverStatusManager { 038 private static final Logger LOG = Logger.getLogger(DriverStatusManager.class.getName()); 039 private final Clock clock; 040 private final ClientConnection clientConnection; 041 private final String jobIdentifier; 042 private final ExceptionCodec exceptionCodec; 043 private DriverStatus driverStatus = DriverStatus.PRE_INIT; 044 private Optional<Throwable> shutdownCause = Optional.empty(); 045 private boolean driverTerminationHasBeenCommunicatedToClient = false; 046 047 048 /** 049 * @param clock 050 * @param clientConnection 051 * @param jobIdentifier 052 * @param exceptionCodec 053 */ 054 @Inject 055 DriverStatusManager(final Clock clock, 056 final ClientConnection clientConnection, 057 @Parameter(JobIdentifier.class) final String jobIdentifier, 058 final ExceptionCodec exceptionCodec) { 059 LOG.entering(DriverStatusManager.class.getCanonicalName(), "<init>"); 060 this.clock = clock; 061 this.clientConnection = clientConnection; 062 this.jobIdentifier = jobIdentifier; 063 this.exceptionCodec = exceptionCodec; 064 LOG.log(Level.FINE, "Instantiated 'DriverStatusManager'"); 065 LOG.exiting(DriverStatusManager.class.getCanonicalName(), "<init>"); 066 } 067 068 /** 069 * Check whether a state transition 'from->to' is legal. 070 * 071 * @param from 072 * @param to 073 * @return 074 */ 075 private static boolean isLegalTransition(final DriverStatus from, final DriverStatus to) { 076 switch (from) { 077 case PRE_INIT: 078 switch (to) { 079 case INIT: 080 return true; 081 default: 082 return false; 083 } 084 case INIT: 085 switch (to) { 086 case RUNNING: 087 return true; 088 default: 089 return false; 090 } 091 case RUNNING: 092 switch (to) { 093 case SHUTTING_DOWN: 094 case FAILING: 095 return true; 096 default: 097 return false; 098 } 099 case FAILING: 100 case SHUTTING_DOWN: 101 return false; 102 default: 103 throw new IllegalStateException("Unknown input state: " + from); 104 } 105 } 106 107 /** 108 * Changes the driver status to INIT and sends message to the client about the transition. 109 */ 110 public synchronized void onInit() { 111 LOG.entering(DriverStatusManager.class.getCanonicalName(), "onInit"); 112 this.clientConnection.send(this.getInitMessage()); 113 this.setStatus(DriverStatus.INIT); 114 LOG.exiting(DriverStatusManager.class.getCanonicalName(), "onInit"); 115 } 116 117 /** 118 * Changes the driver status to RUNNING and sends message to the client about the transition. 119 * If the driver is in status 'PRE_INIT', this first calls onInit(); 120 */ 121 public synchronized void onRunning() { 122 LOG.entering(DriverStatusManager.class.getCanonicalName(), "onRunning"); 123 if (this.driverStatus.equals(DriverStatus.PRE_INIT)) { 124 this.onInit(); 125 } 126 this.clientConnection.send(this.getRunningMessage()); 127 this.setStatus(DriverStatus.RUNNING); 128 LOG.exiting(DriverStatusManager.class.getCanonicalName(), "onRunning"); 129 } 130 131 /** 132 * End the Driver with an exception. 133 * 134 * @param exception 135 */ 136 public synchronized void onError(final Throwable exception) { 137 LOG.entering(DriverStatusManager.class.getCanonicalName(), "onError", new Object[]{exception}); 138 if (this.isShuttingDownOrFailing()) { 139 LOG.log(Level.WARNING, "Received an exception while already in shutdown.", exception); 140 } else { 141 LOG.log(Level.WARNING, "Shutting down the Driver with an exception: ", exception); 142 this.shutdownCause = Optional.of(exception); 143 this.clock.stop(exception); 144 this.setStatus(DriverStatus.FAILING); 145 } 146 LOG.exiting(DriverStatusManager.class.getCanonicalName(), "onError", new Object[]{exception}); 147 } 148 149 /** 150 * Perform a clean shutdown of the Driver. 151 */ 152 public synchronized void onComplete() { 153 LOG.entering(DriverStatusManager.class.getCanonicalName(), "onComplete"); 154 if (this.isShuttingDownOrFailing()) { 155 LOG.log(Level.WARNING, "Ignoring second call to onComplete()"); 156 } else { 157 LOG.log(Level.INFO, "Clean shutdown of the Driver."); 158 if (LOG.isLoggable(Level.FINEST)) { 159 LOG.log(Level.FINEST, "Callstack: ", new Exception()); 160 } 161 this.clock.close(); 162 this.setStatus(DriverStatus.SHUTTING_DOWN); 163 } 164 LOG.exiting(DriverStatusManager.class.getCanonicalName(), "onComplete"); 165 166 } 167 168 /** 169 * Sends the final message to the Driver. This is used by DriverRuntimeStopHandler.onNext(). 170 * 171 * @param exception 172 */ 173 public synchronized void sendJobEndingMessageToClient(final Optional<Throwable> exception) { 174 if (this.isNotShuttingDownOrFailing()) { 175 LOG.log(Level.SEVERE, "Sending message in a state different that SHUTTING_DOWN or FAILING. " + 176 "This is likely a illegal call to clock.close() at play. Current state: " + this.driverStatus); 177 } 178 if (this.driverTerminationHasBeenCommunicatedToClient) { 179 LOG.log(Level.SEVERE, ".sendJobEndingMessageToClient() called twice. Ignoring the second call"); 180 } else { 181 // Log the shutdown situation 182 if (this.shutdownCause.isPresent()) { 183 LOG.log(Level.WARNING, "Sending message about an unclean driver shutdown.", this.shutdownCause.get()); 184 } 185 if (exception.isPresent()) { 186 LOG.log(Level.WARNING, "There was an exception during clock.close().", exception.get()); 187 } 188 if (this.shutdownCause.isPresent() && exception.isPresent()) { 189 LOG.log(Level.WARNING, "The driver is shutdown because of an exception (see above) and there was " + 190 "an exception during clock.close(). Only the first exception will be sent to the client"); 191 } 192 193 if (this.shutdownCause.isPresent()) { 194 // Send the earlier exception, if there was one 195 this.clientConnection.send(getJobEndingMessage(this.shutdownCause)); 196 } else { 197 // Send the exception passed, if there was one. 198 this.clientConnection.send(getJobEndingMessage(exception)); 199 } 200 this.driverTerminationHasBeenCommunicatedToClient = true; 201 } 202 } 203 204 public synchronized boolean isShuttingDownOrFailing() { 205 return DriverStatus.SHUTTING_DOWN.equals(this.driverStatus) 206 || DriverStatus.FAILING.equals(this.driverStatus); 207 } 208 209 private synchronized boolean isNotShuttingDownOrFailing() { 210 return !isShuttingDownOrFailing(); 211 } 212 213 /** 214 * Helper method to set the status. This also checks whether the transition from the current status to the new one is 215 * legal. 216 * 217 * @param newStatus 218 */ 219 private synchronized void setStatus(final DriverStatus newStatus) { 220 if (isLegalTransition(this.driverStatus, newStatus)) { 221 this.driverStatus = newStatus; 222 } else { 223 LOG.log(Level.WARNING, "Illegal state transiton: '" + this.driverStatus + "'->'" + newStatus + "'"); 224 } 225 } 226 227 /** 228 * @param exception the exception that ended the Driver, if any. 229 * @return message to be sent to the client at the end of the job. 230 */ 231 private synchronized ReefServiceProtos.JobStatusProto getJobEndingMessage(final Optional<Throwable> exception) { 232 final ReefServiceProtos.JobStatusProto message; 233 if (exception.isPresent()) { 234 message = ReefServiceProtos.JobStatusProto.newBuilder() 235 .setIdentifier(this.jobIdentifier) 236 .setState(ReefServiceProtos.State.FAILED) 237 .setException(ByteString.copyFrom(this.exceptionCodec.toBytes(exception.get()))) 238 .build(); 239 } else { 240 message = ReefServiceProtos.JobStatusProto.newBuilder() 241 .setIdentifier(this.jobIdentifier) 242 .setState(ReefServiceProtos.State.DONE) 243 .build(); 244 } 245 return message; 246 } 247 248 /** 249 * @return The message to be sent through the ClientConnection when in state INIT. 250 */ 251 private synchronized ReefServiceProtos.JobStatusProto getInitMessage() { 252 return ReefServiceProtos.JobStatusProto.newBuilder() 253 .setIdentifier(this.jobIdentifier) 254 .setState(ReefServiceProtos.State.INIT) 255 .build(); 256 } 257 258 /** 259 * @return The message to be sent through the ClientConnection when in state RUNNING. 260 */ 261 private synchronized ReefServiceProtos.JobStatusProto getRunningMessage() { 262 return ReefServiceProtos.JobStatusProto.newBuilder() 263 .setIdentifier(this.jobIdentifier) 264 .setState(ReefServiceProtos.State.RUNNING) 265 .build(); 266 } 267}