001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.common.driver; 020 021import com.google.protobuf.ByteString; 022import org.apache.reef.proto.ReefServiceProtos; 023import org.apache.reef.runtime.common.driver.client.ClientConnection; 024import org.apache.reef.runtime.common.driver.parameters.JobIdentifier; 025import org.apache.reef.runtime.common.utils.ExceptionCodec; 026import org.apache.reef.tang.annotations.Parameter; 027import org.apache.reef.util.Optional; 028import org.apache.reef.wake.time.Clock; 029 030import javax.inject.Inject; 031import java.util.logging.Level; 032import java.util.logging.Logger; 033 034/** 035 * Manages the Driver's status. 036 */ 037public final class DriverStatusManager { 038 private static final Logger LOG = Logger.getLogger(DriverStatusManager.class.getName()); 039 private final Clock clock; 040 private final ClientConnection clientConnection; 041 private final String jobIdentifier; 042 private final ExceptionCodec exceptionCodec; 043 private DriverStatus driverStatus = DriverStatus.PRE_INIT; 044 private Optional<Throwable> shutdownCause = Optional.empty(); 045 private boolean driverTerminationHasBeenCommunicatedToClient = false; 046 047 048 /** 049 * @param clock 050 * @param clientConnection 051 * @param jobIdentifier 052 * @param exceptionCodec 053 */ 054 @Inject 055 DriverStatusManager(final Clock clock, 056 final ClientConnection clientConnection, 057 @Parameter(JobIdentifier.class) final String jobIdentifier, 058 final ExceptionCodec exceptionCodec) { 059 LOG.entering(DriverStatusManager.class.getCanonicalName(), "<init>"); 060 this.clock = clock; 061 this.clientConnection = clientConnection; 062 this.jobIdentifier = jobIdentifier; 063 this.exceptionCodec = exceptionCodec; 064 LOG.log(Level.FINE, "Instantiated 'DriverStatusManager'"); 065 LOG.exiting(DriverStatusManager.class.getCanonicalName(), "<init>"); 066 } 067 068 /** 069 * Check whether a state transition 'from->to' is legal. 070 * 071 * @param from 072 * @param to 073 * @return 074 */ 075 private static boolean isLegalTransition(final DriverStatus from, final DriverStatus to) { 076 switch (from) { 077 case PRE_INIT: 078 switch (to) { 079 case INIT: 080 return true; 081 default: 082 return false; 083 } 084 case INIT: 085 switch (to) { 086 case RUNNING: 087 return true; 088 default: 089 return false; 090 } 091 case RUNNING: 092 switch (to) { 093 case SHUTTING_DOWN: 094 case FAILING: 095 return true; 096 default: 097 return false; 098 } 099 case FAILING: 100 case SHUTTING_DOWN: 101 return false; 102 default: 103 throw new IllegalStateException("Unknown input state: " + from); 104 } 105 } 106 107 /** 108 * Changes the driver status to INIT and sends message to the client about the transition. 109 */ 110 public synchronized void onInit() { 111 LOG.entering(DriverStatusManager.class.getCanonicalName(), "onInit"); 112 this.clientConnection.send(this.getInitMessage()); 113 this.setStatus(DriverStatus.INIT); 114 LOG.exiting(DriverStatusManager.class.getCanonicalName(), "onInit"); 115 } 116 117 /** 118 * Changes the driver status to RUNNING and sends message to the client about the transition. 119 * If the driver is in status 'PRE_INIT', this first calls onInit(); 120 */ 121 public synchronized void onRunning() { 122 LOG.entering(DriverStatusManager.class.getCanonicalName(), "onRunning"); 123 if (this.driverStatus.equals(DriverStatus.PRE_INIT)) { 124 this.onInit(); 125 } 126 this.clientConnection.send(this.getRunningMessage()); 127 this.setStatus(DriverStatus.RUNNING); 128 LOG.exiting(DriverStatusManager.class.getCanonicalName(), "onRunning"); 129 } 130 131 /** 132 * End the Driver with an exception. 133 * 134 * @param exception 135 */ 136 public synchronized void onError(final Throwable exception) { 137 LOG.entering(DriverStatusManager.class.getCanonicalName(), "onError", new Object[]{exception}); 138 if (this.isShuttingDownOrFailing()) { 139 LOG.log(Level.WARNING, "Received an exception while already in shutdown.", exception); 140 } else { 141 LOG.log(Level.WARNING, "Shutting down the Driver with an exception: ", exception); 142 this.shutdownCause = Optional.of(exception); 143 this.clock.stop(exception); 144 this.setStatus(DriverStatus.FAILING); 145 } 146 LOG.exiting(DriverStatusManager.class.getCanonicalName(), "onError", new Object[]{exception}); 147 } 148 149 /** 150 * Perform a clean shutdown of the Driver. 151 */ 152 @SuppressWarnings("checkstyle:constructorwithoutparams") // Exception() here captures the callstack 153 public synchronized void onComplete() { 154 LOG.entering(DriverStatusManager.class.getCanonicalName(), "onComplete"); 155 if (this.isShuttingDownOrFailing()) { 156 LOG.log(Level.WARNING, "Ignoring second call to onComplete()"); 157 } else { 158 LOG.log(Level.INFO, "Clean shutdown of the Driver."); 159 if (LOG.isLoggable(Level.FINEST)) { 160 LOG.log(Level.FINEST, "Callstack: ", new Exception()); 161 } 162 this.clock.close(); 163 this.setStatus(DriverStatus.SHUTTING_DOWN); 164 } 165 LOG.exiting(DriverStatusManager.class.getCanonicalName(), "onComplete"); 166 167 } 168 169 /** 170 * Sends the final message to the Driver. This is used by DriverRuntimeStopHandler.onNext(). 171 * 172 * @param exception 173 */ 174 public synchronized void sendJobEndingMessageToClient(final Optional<Throwable> exception) { 175 if (this.isNotShuttingDownOrFailing()) { 176 LOG.log(Level.SEVERE, "Sending message in a state different that SHUTTING_DOWN or FAILING. " + 177 "This is likely a illegal call to clock.close() at play. Current state: " + this.driverStatus); 178 } 179 if (this.driverTerminationHasBeenCommunicatedToClient) { 180 LOG.log(Level.SEVERE, ".sendJobEndingMessageToClient() called twice. Ignoring the second call"); 181 } else { 182 // Log the shutdown situation 183 if (this.shutdownCause.isPresent()) { 184 LOG.log(Level.WARNING, "Sending message about an unclean driver shutdown.", this.shutdownCause.get()); 185 } 186 if (exception.isPresent()) { 187 LOG.log(Level.WARNING, "There was an exception during clock.close().", exception.get()); 188 } 189 if (this.shutdownCause.isPresent() && exception.isPresent()) { 190 LOG.log(Level.WARNING, "The driver is shutdown because of an exception (see above) and there was " + 191 "an exception during clock.close(). Only the first exception will be sent to the client"); 192 } 193 194 if (this.shutdownCause.isPresent()) { 195 // Send the earlier exception, if there was one 196 this.clientConnection.send(getJobEndingMessage(this.shutdownCause)); 197 } else { 198 // Send the exception passed, if there was one. 199 this.clientConnection.send(getJobEndingMessage(exception)); 200 } 201 this.driverTerminationHasBeenCommunicatedToClient = true; 202 } 203 } 204 205 public synchronized boolean isShuttingDownOrFailing() { 206 return DriverStatus.SHUTTING_DOWN.equals(this.driverStatus) 207 || DriverStatus.FAILING.equals(this.driverStatus); 208 } 209 210 private synchronized boolean isNotShuttingDownOrFailing() { 211 return !isShuttingDownOrFailing(); 212 } 213 214 /** 215 * Helper method to set the status. This also checks whether the transition from the current status to the new one is 216 * legal. 217 * 218 * @param newStatus 219 */ 220 private synchronized void setStatus(final DriverStatus newStatus) { 221 if (isLegalTransition(this.driverStatus, newStatus)) { 222 this.driverStatus = newStatus; 223 } else { 224 LOG.log(Level.WARNING, "Illegal state transiton: '" + this.driverStatus + "'->'" + newStatus + "'"); 225 } 226 } 227 228 /** 229 * @param exception the exception that ended the Driver, if any. 230 * @return message to be sent to the client at the end of the job. 231 */ 232 private synchronized ReefServiceProtos.JobStatusProto getJobEndingMessage(final Optional<Throwable> exception) { 233 final ReefServiceProtos.JobStatusProto message; 234 if (exception.isPresent()) { 235 message = ReefServiceProtos.JobStatusProto.newBuilder() 236 .setIdentifier(this.jobIdentifier) 237 .setState(ReefServiceProtos.State.FAILED) 238 .setException(ByteString.copyFrom(this.exceptionCodec.toBytes(exception.get()))) 239 .build(); 240 } else { 241 message = ReefServiceProtos.JobStatusProto.newBuilder() 242 .setIdentifier(this.jobIdentifier) 243 .setState(ReefServiceProtos.State.DONE) 244 .build(); 245 } 246 return message; 247 } 248 249 /** 250 * @return The message to be sent through the ClientConnection when in state INIT. 251 */ 252 private synchronized ReefServiceProtos.JobStatusProto getInitMessage() { 253 return ReefServiceProtos.JobStatusProto.newBuilder() 254 .setIdentifier(this.jobIdentifier) 255 .setState(ReefServiceProtos.State.INIT) 256 .build(); 257 } 258 259 /** 260 * @return The message to be sent through the ClientConnection when in state RUNNING. 261 */ 262 private synchronized ReefServiceProtos.JobStatusProto getRunningMessage() { 263 return ReefServiceProtos.JobStatusProto.newBuilder() 264 .setIdentifier(this.jobIdentifier) 265 .setState(ReefServiceProtos.State.RUNNING) 266 .build(); 267 } 268}