001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.common.driver;
020
021import com.google.protobuf.ByteString;
022import org.apache.reef.proto.ReefServiceProtos;
023import org.apache.reef.runtime.common.driver.client.ClientConnection;
024import org.apache.reef.runtime.common.driver.parameters.JobIdentifier;
025import org.apache.reef.runtime.common.utils.ExceptionCodec;
026import org.apache.reef.tang.annotations.Parameter;
027import org.apache.reef.util.Optional;
028import org.apache.reef.wake.time.Clock;
029
030import javax.inject.Inject;
031import java.util.logging.Level;
032import java.util.logging.Logger;
033
034/**
035 * Manages the Driver's status.
036 * Communicates status changes to the client and shuts down the runtime clock on shutdown.
037 */
038public final class DriverStatusManager {
039
040  private static final Logger LOG = Logger.getLogger(DriverStatusManager.class.getName());
041  private static final String CLASS_NAME = DriverStatusManager.class.getCanonicalName();
042
043  private final Clock clock;
044  private final ClientConnection clientConnection;
045  private final String jobIdentifier;
046  private final ExceptionCodec exceptionCodec;
047
048  private DriverStatus driverStatus = DriverStatus.PRE_INIT;
049  private Optional<Throwable> shutdownCause = Optional.empty();
050  private boolean driverTerminationHasBeenCommunicatedToClient = false;
051
052  /**
053   * Build a new status manager. This is done automatically by Tang.
054   * @param clock runtime event loop to shut down on completion or error.
055   * @param clientConnection Connection to the job client. Send init, running, and job ending messages.
056   * @param jobIdentifier String job ID.
057   * @param exceptionCodec codec to serialize the exception when sending job ending message to the client.
058   */
059  @Inject
060  private DriverStatusManager(
061      @Parameter(JobIdentifier.class) final String jobIdentifier,
062      final Clock clock,
063      final ClientConnection clientConnection,
064      final ExceptionCodec exceptionCodec) {
065
066    LOG.entering(CLASS_NAME, "<init>");
067
068    this.clock = clock;
069    this.clientConnection = clientConnection;
070    this.jobIdentifier = jobIdentifier;
071    this.exceptionCodec = exceptionCodec;
072
073    LOG.log(Level.FINE, "Instantiated 'DriverStatusManager'");
074
075    LOG.exiting(CLASS_NAME, "<init>");
076  }
077
078  /**
079   * Changes the driver status to INIT and sends message to the client about the transition.
080   */
081  public synchronized void onInit() {
082
083    LOG.entering(CLASS_NAME, "onInit");
084
085    this.clientConnection.send(this.getInitMessage());
086    this.setStatus(DriverStatus.INIT);
087
088    LOG.exiting(CLASS_NAME, "onInit");
089  }
090
091  /**
092   * Changes the driver status to RUNNING and sends message to the client about the transition.
093   * If the driver is in status 'PRE_INIT', this first calls onInit();
094   */
095  public synchronized void onRunning() {
096
097    LOG.entering(CLASS_NAME, "onRunning");
098
099    if (this.driverStatus == DriverStatus.PRE_INIT) {
100      this.onInit();
101    }
102
103    this.clientConnection.send(this.getRunningMessage());
104    this.setStatus(DriverStatus.RUNNING);
105
106    LOG.exiting(CLASS_NAME, "onRunning");
107  }
108
109  /**
110   * End the Driver with an exception.
111   * @param exception Exception that causes the driver shutdown.
112   */
113  public synchronized void onError(final Throwable exception) {
114
115    LOG.entering(CLASS_NAME, "onError", exception);
116
117    if (this.isClosing()) {
118      LOG.log(Level.WARNING, "Received an exception while already in shutdown.", exception);
119    } else {
120      LOG.log(Level.WARNING, "Shutting down the Driver with an exception: ", exception);
121      this.shutdownCause = Optional.of(exception);
122      this.clock.stop(exception);
123      this.setStatus(DriverStatus.FAILING);
124    }
125
126    LOG.exiting(CLASS_NAME, "onError", exception);
127  }
128
129  /**
130   * Perform a clean shutdown of the Driver.
131   */
132  public synchronized void onComplete() {
133
134    LOG.entering(CLASS_NAME, "onComplete");
135
136    if (this.isClosing()) {
137      LOG.log(Level.WARNING, "Ignoring second call to onComplete()",
138          new Exception("Dummy exception to get the call stack"));
139    } else {
140
141      LOG.log(Level.INFO, "Clean shutdown of the Driver.");
142
143      if (LOG.isLoggable(Level.FINEST)) {
144        LOG.log(Level.FINEST, "Call stack: ",
145            new Exception("Dummy exception to get the call stack"));
146      }
147
148      this.clock.close();
149      this.setStatus(DriverStatus.SHUTTING_DOWN);
150    }
151
152    LOG.exiting(CLASS_NAME, "onComplete");
153  }
154
155  /**
156   * Sends the final message to the client. This is used by DriverRuntimeStopHandler.onNext().
157   * @param exception Exception that caused the job to end (optional).
158   */
159  public synchronized void onRuntimeStop(final Optional<Throwable> exception) {
160    this.sendJobEndingMessageToClient(exception);
161  }
162
163  /**
164   * Sends the final message to the Driver. This is used by DriverRuntimeStopHandler.onNext().
165   * @param exception Exception that caused the job to end (can be absent).
166   * @deprecated TODO[JIRA REEF-1548] Do not use DriverStatusManager as a proxy to the job client.
167   * After release 0.16, make this method private and use it inside onRuntimeStop() method instead.
168   */
169  @Deprecated
170  public synchronized void sendJobEndingMessageToClient(final Optional<Throwable> exception) {
171
172    if (!this.isClosing()) {
173      LOG.log(Level.SEVERE, "Sending message in a state different that SHUTTING_DOWN or FAILING. " +
174          "This is likely a illegal call to clock.close() at play. Current state: {0}", this.driverStatus);
175    }
176
177    if (this.driverTerminationHasBeenCommunicatedToClient) {
178      LOG.log(Level.SEVERE, ".sendJobEndingMessageToClient() called twice. Ignoring the second call");
179      return;
180    }
181
182    // Log the shutdown situation
183    if (this.shutdownCause.isPresent()) {
184      LOG.log(Level.WARNING, "Sending message about an unclean driver shutdown.", this.shutdownCause.get());
185    }
186
187    if (exception.isPresent()) {
188      LOG.log(Level.WARNING, "There was an exception during clock.close().", exception.get());
189    }
190
191    if (this.shutdownCause.isPresent() && exception.isPresent()) {
192      LOG.log(Level.WARNING, "The driver is shutdown because of an exception (see above) and there was " +
193          "an exception during clock.close(). Only the first exception will be sent to the client");
194    }
195
196    // Send the earlier exception, if there was one. Otherwise, send the exception passed.
197    this.clientConnection.send(getJobEndingMessage(
198        this.shutdownCause.isPresent() ? this.shutdownCause : exception));
199
200    this.driverTerminationHasBeenCommunicatedToClient = true;
201  }
202
203  /**
204   * Check if the driver is in process of shutting down (either gracefully or due to an error).
205   * @return true if the driver is shutting down (gracefully or otherwise).
206   * @deprecated TODO[JIRA REEF-1560] Use isClosing() method instead. Remove after version 0.16
207   */
208  @Deprecated
209  public synchronized boolean isShuttingDownOrFailing() {
210    return this.isClosing();
211  }
212
213  /**
214   * Check if the driver is in process of shutting down (either gracefully or due to an error).
215   * @return true if the driver is shutting down (gracefully or otherwise).
216   */
217  public synchronized boolean isClosing() {
218    return this.driverStatus.isClosing();
219  }
220
221  /**
222   * Helper method to set the status.
223   * This also checks whether the transition from the current status to the new one is legal.
224   * @param toStatus Driver status to transition to.
225   */
226  private synchronized void setStatus(final DriverStatus toStatus) {
227    if (this.driverStatus.isLegalTransition(toStatus)) {
228      this.driverStatus = toStatus;
229    } else {
230      LOG.log(Level.WARNING, "Illegal state transition: {0} -> {1}", new Object[] {this.driverStatus, toStatus});
231    }
232  }
233
234  /**
235   * @param exception the exception that ended the Driver, if any.
236   * @return message to be sent to the client at the end of the job.
237   */
238  private ReefServiceProtos.JobStatusProto getJobEndingMessage(final Optional<Throwable> exception) {
239    if (exception.isPresent()) {
240      return ReefServiceProtos.JobStatusProto.newBuilder()
241          .setIdentifier(this.jobIdentifier)
242          .setState(ReefServiceProtos.State.FAILED)
243          .setException(ByteString.copyFrom(this.exceptionCodec.toBytes(exception.get())))
244          .build();
245    } else {
246      return ReefServiceProtos.JobStatusProto.newBuilder()
247          .setIdentifier(this.jobIdentifier)
248          .setState(ReefServiceProtos.State.DONE)
249          .build();
250    }
251  }
252
253  /**
254   * @return The message to be sent through the ClientConnection when in state INIT.
255   */
256  private ReefServiceProtos.JobStatusProto getInitMessage() {
257    return ReefServiceProtos.JobStatusProto.newBuilder()
258        .setIdentifier(this.jobIdentifier)
259        .setState(ReefServiceProtos.State.INIT)
260        .build();
261  }
262
263  /**
264   * @return The message to be sent through the ClientConnection when in state RUNNING.
265   */
266  private ReefServiceProtos.JobStatusProto getRunningMessage() {
267    return ReefServiceProtos.JobStatusProto.newBuilder()
268        .setIdentifier(this.jobIdentifier)
269        .setState(ReefServiceProtos.State.RUNNING)
270        .build();
271  }
272}