This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.common.driver;
020
021import com.google.protobuf.ByteString;
022import org.apache.reef.proto.ReefServiceProtos;
023import org.apache.reef.runtime.common.driver.client.ClientConnection;
024import org.apache.reef.runtime.common.driver.parameters.JobIdentifier;
025import org.apache.reef.runtime.common.utils.ExceptionCodec;
026import org.apache.reef.tang.annotations.Parameter;
027import org.apache.reef.util.Optional;
028import org.apache.reef.wake.time.Clock;
029
030import javax.inject.Inject;
031import java.util.logging.Level;
032import java.util.logging.Logger;
033
034/**
035 * Manages the Driver's status.
036 */
037public final class DriverStatusManager {
038  private static final Logger LOG = Logger.getLogger(DriverStatusManager.class.getName());
039  private final Clock clock;
040  private final ClientConnection clientConnection;
041  private final String jobIdentifier;
042  private final ExceptionCodec exceptionCodec;
043  private DriverStatus driverStatus = DriverStatus.PRE_INIT;
044  private Optional<Throwable> shutdownCause = Optional.empty();
045  private boolean driverTerminationHasBeenCommunicatedToClient = false;
046
047
048  /**
049   * @param clock
050   * @param clientConnection
051   * @param jobIdentifier
052   * @param exceptionCodec
053   */
054  @Inject
055  DriverStatusManager(final Clock clock,
056                      final ClientConnection clientConnection,
057                      @Parameter(JobIdentifier.class) final String jobIdentifier,
058                      final ExceptionCodec exceptionCodec) {
059    LOG.entering(DriverStatusManager.class.getCanonicalName(), "<init>");
060    this.clock = clock;
061    this.clientConnection = clientConnection;
062    this.jobIdentifier = jobIdentifier;
063    this.exceptionCodec = exceptionCodec;
064    LOG.log(Level.FINE, "Instantiated 'DriverStatusManager'");
065    LOG.exiting(DriverStatusManager.class.getCanonicalName(), "<init>");
066  }
067
068  /**
069   * Check whether a state transition 'from->to' is legal.
070   *
071   * @param from
072   * @param to
073   * @return
074   */
075  private static boolean isLegalTransition(final DriverStatus from, final DriverStatus to) {
076    switch (from) {
077    case PRE_INIT:
078      switch (to) {
079      case INIT:
080        return true;
081      default:
082        return false;
083      }
084    case INIT:
085      switch (to) {
086      case RUNNING:
087        return true;
088      default:
089        return false;
090      }
091    case RUNNING:
092      switch (to) {
093      case SHUTTING_DOWN:
094      case FAILING:
095        return true;
096      default:
097        return false;
098      }
099    case FAILING:
100    case SHUTTING_DOWN:
101      return false;
102    default:
103      throw new IllegalStateException("Unknown input state: " + from);
104    }
105  }
106
107  /**
108   * Changes the driver status to INIT and sends message to the client about the transition.
109   */
110  public synchronized void onInit() {
111    LOG.entering(DriverStatusManager.class.getCanonicalName(), "onInit");
112    this.clientConnection.send(this.getInitMessage());
113    this.setStatus(DriverStatus.INIT);
114    LOG.exiting(DriverStatusManager.class.getCanonicalName(), "onInit");
115  }
116
117  /**
118   * Changes the driver status to RUNNING and sends message to the client about the transition.
119   * If the driver is in status 'PRE_INIT', this first calls onInit();
120   */
121  public synchronized void onRunning() {
122    LOG.entering(DriverStatusManager.class.getCanonicalName(), "onRunning");
123    if (this.driverStatus.equals(DriverStatus.PRE_INIT)) {
124      this.onInit();
125    }
126    this.clientConnection.send(this.getRunningMessage());
127    this.setStatus(DriverStatus.RUNNING);
128    LOG.exiting(DriverStatusManager.class.getCanonicalName(), "onRunning");
129  }
130
131  /**
132   * End the Driver with an exception.
133   *
134   * @param exception
135   */
136  public synchronized void onError(final Throwable exception) {
137    LOG.entering(DriverStatusManager.class.getCanonicalName(), "onError", new Object[]{exception});
138    if (this.isShuttingDownOrFailing()) {
139      LOG.log(Level.WARNING, "Received an exception while already in shutdown.", exception);
140    } else {
141      LOG.log(Level.WARNING, "Shutting down the Driver with an exception: ", exception);
142      this.shutdownCause = Optional.of(exception);
143      this.clock.stop(exception);
144      this.setStatus(DriverStatus.FAILING);
145    }
146    LOG.exiting(DriverStatusManager.class.getCanonicalName(), "onError", new Object[]{exception});
147  }
148
149  /**
150   * Perform a clean shutdown of the Driver.
151   */
152  @SuppressWarnings("checkstyle:constructorwithoutparams") // Exception() here captures the callstack
153  public synchronized void onComplete() {
154    LOG.entering(DriverStatusManager.class.getCanonicalName(), "onComplete");
155    if (this.isShuttingDownOrFailing()) {
156      LOG.log(Level.WARNING, "Ignoring second call to onComplete()");
157    } else {
158      LOG.log(Level.INFO, "Clean shutdown of the Driver.");
159      if (LOG.isLoggable(Level.FINEST)) {
160        LOG.log(Level.FINEST, "Callstack: ", new Exception());
161      }
162      this.clock.close();
163      this.setStatus(DriverStatus.SHUTTING_DOWN);
164    }
165    LOG.exiting(DriverStatusManager.class.getCanonicalName(), "onComplete");
166
167  }
168
169  /**
170   * Sends the final message to the Driver. This is used by DriverRuntimeStopHandler.onNext().
171   *
172   * @param exception
173   */
174  public synchronized void sendJobEndingMessageToClient(final Optional<Throwable> exception) {
175    if (this.isNotShuttingDownOrFailing()) {
176      LOG.log(Level.SEVERE, "Sending message in a state different that SHUTTING_DOWN or FAILING. " +
177          "This is likely a illegal call to clock.close() at play. Current state: " + this.driverStatus);
178    }
179    if (this.driverTerminationHasBeenCommunicatedToClient) {
180      LOG.log(Level.SEVERE, ".sendJobEndingMessageToClient() called twice. Ignoring the second call");
181    } else {
182      // Log the shutdown situation
183      if (this.shutdownCause.isPresent()) {
184        LOG.log(Level.WARNING, "Sending message about an unclean driver shutdown.", this.shutdownCause.get());
185      }
186      if (exception.isPresent()) {
187        LOG.log(Level.WARNING, "There was an exception during clock.close().", exception.get());
188      }
189      if (this.shutdownCause.isPresent() && exception.isPresent()) {
190        LOG.log(Level.WARNING, "The driver is shutdown because of an exception (see above) and there was " +
191            "an exception during clock.close(). Only the first exception will be sent to the client");
192      }
193
194      if (this.shutdownCause.isPresent()) {
195        // Send the earlier exception, if there was one
196        this.clientConnection.send(getJobEndingMessage(this.shutdownCause));
197      } else {
198        // Send the exception passed, if there was one.
199        this.clientConnection.send(getJobEndingMessage(exception));
200      }
201      this.driverTerminationHasBeenCommunicatedToClient = true;
202    }
203  }
204
205  public synchronized boolean isShuttingDownOrFailing() {
206    return DriverStatus.SHUTTING_DOWN.equals(this.driverStatus)
207        || DriverStatus.FAILING.equals(this.driverStatus);
208  }
209
210  private synchronized boolean isNotShuttingDownOrFailing() {
211    return !isShuttingDownOrFailing();
212  }
213
214  /**
215   * Helper method to set the status. This also checks whether the transition from the current status to the new one is
216   * legal.
217   *
218   * @param newStatus
219   */
220  private synchronized void setStatus(final DriverStatus newStatus) {
221    if (isLegalTransition(this.driverStatus, newStatus)) {
222      this.driverStatus = newStatus;
223    } else {
224      LOG.log(Level.WARNING, "Illegal state transiton: '" + this.driverStatus + "'->'" + newStatus + "'");
225    }
226  }
227
228  /**
229   * @param exception the exception that ended the Driver, if any.
230   * @return message to be sent to the client at the end of the job.
231   */
232  private synchronized ReefServiceProtos.JobStatusProto getJobEndingMessage(final Optional<Throwable> exception) {
233    final ReefServiceProtos.JobStatusProto message;
234    if (exception.isPresent()) {
235      message = ReefServiceProtos.JobStatusProto.newBuilder()
236          .setIdentifier(this.jobIdentifier)
237          .setState(ReefServiceProtos.State.FAILED)
238          .setException(ByteString.copyFrom(this.exceptionCodec.toBytes(exception.get())))
239          .build();
240    } else {
241      message = ReefServiceProtos.JobStatusProto.newBuilder()
242          .setIdentifier(this.jobIdentifier)
243          .setState(ReefServiceProtos.State.DONE)
244          .build();
245    }
246    return message;
247  }
248
249  /**
250   * @return The message to be sent through the ClientConnection when in state INIT.
251   */
252  private synchronized ReefServiceProtos.JobStatusProto getInitMessage() {
253    return ReefServiceProtos.JobStatusProto.newBuilder()
254        .setIdentifier(this.jobIdentifier)
255        .setState(ReefServiceProtos.State.INIT)
256        .build();
257  }
258
259  /**
260   * @return The message to be sent through the ClientConnection when in state RUNNING.
261   */
262  private synchronized ReefServiceProtos.JobStatusProto getRunningMessage() {
263    return ReefServiceProtos.JobStatusProto.newBuilder()
264        .setIdentifier(this.jobIdentifier)
265        .setState(ReefServiceProtos.State.RUNNING)
266        .build();
267  }
268}