This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.common.driver;
020
021import com.google.protobuf.ByteString;
022import org.apache.reef.proto.ReefServiceProtos;
023import org.apache.reef.runtime.common.driver.client.ClientConnection;
024import org.apache.reef.runtime.common.driver.parameters.JobIdentifier;
025import org.apache.reef.runtime.common.utils.ExceptionCodec;
026import org.apache.reef.tang.annotations.Parameter;
027import org.apache.reef.util.Optional;
028import org.apache.reef.wake.time.Clock;
029
030import javax.inject.Inject;
031import java.util.logging.Level;
032import java.util.logging.Logger;
033
034/**
035 * Manages the Driver's status.
036 */
037public final class DriverStatusManager {
038  private static final Logger LOG = Logger.getLogger(DriverStatusManager.class.getName());
039  private final Clock clock;
040  private final ClientConnection clientConnection;
041  private final String jobIdentifier;
042  private final ExceptionCodec exceptionCodec;
043  private DriverStatus driverStatus = DriverStatus.PRE_INIT;
044  private Optional<Throwable> shutdownCause = Optional.empty();
045  private boolean driverTerminationHasBeenCommunicatedToClient = false;
046
047
048  /**
049   * @param clock
050   * @param clientConnection
051   * @param jobIdentifier
052   * @param exceptionCodec
053   */
054  @Inject
055  DriverStatusManager(final Clock clock,
056                      final ClientConnection clientConnection,
057                      @Parameter(JobIdentifier.class) final String jobIdentifier,
058                      final ExceptionCodec exceptionCodec) {
059    LOG.entering(DriverStatusManager.class.getCanonicalName(), "<init>");
060    this.clock = clock;
061    this.clientConnection = clientConnection;
062    this.jobIdentifier = jobIdentifier;
063    this.exceptionCodec = exceptionCodec;
064    LOG.log(Level.FINE, "Instantiated 'DriverStatusManager'");
065    LOG.exiting(DriverStatusManager.class.getCanonicalName(), "<init>");
066  }
067
068  /**
069   * Check whether a state transition 'from->to' is legal.
070   *
071   * @param from
072   * @param to
073   * @return
074   */
075  private static boolean isLegalTransition(final DriverStatus from, final DriverStatus to) {
076    switch (from) {
077    case PRE_INIT:
078      switch (to) {
079      case INIT:
080        return true;
081      default:
082        return false;
083      }
084    case INIT:
085      switch (to) {
086      case RUNNING:
087        return true;
088      default:
089        return false;
090      }
091    case RUNNING:
092      switch (to) {
093      case SHUTTING_DOWN:
094      case FAILING:
095        return true;
096      default:
097        return false;
098      }
099    case FAILING:
100    case SHUTTING_DOWN:
101      return false;
102    default:
103      throw new IllegalStateException("Unknown input state: " + from);
104    }
105  }
106
107  /**
108   * Changes the driver status to INIT and sends message to the client about the transition.
109   */
110  public synchronized void onInit() {
111    LOG.entering(DriverStatusManager.class.getCanonicalName(), "onInit");
112    this.clientConnection.send(this.getInitMessage());
113    this.setStatus(DriverStatus.INIT);
114    LOG.exiting(DriverStatusManager.class.getCanonicalName(), "onInit");
115  }
116
117  /**
118   * Changes the driver status to RUNNING and sends message to the client about the transition.
119   * If the driver is in status 'PRE_INIT', this first calls onInit();
120   */
121  public synchronized void onRunning() {
122    LOG.entering(DriverStatusManager.class.getCanonicalName(), "onRunning");
123    if (this.driverStatus.equals(DriverStatus.PRE_INIT)) {
124      this.onInit();
125    }
126    this.clientConnection.send(this.getRunningMessage());
127    this.setStatus(DriverStatus.RUNNING);
128    LOG.exiting(DriverStatusManager.class.getCanonicalName(), "onRunning");
129  }
130
131  /**
132   * End the Driver with an exception.
133   *
134   * @param exception
135   */
136  public synchronized void onError(final Throwable exception) {
137    LOG.entering(DriverStatusManager.class.getCanonicalName(), "onError", new Object[]{exception});
138    if (this.isShuttingDownOrFailing()) {
139      LOG.log(Level.WARNING, "Received an exception while already in shutdown.", exception);
140    } else {
141      LOG.log(Level.WARNING, "Shutting down the Driver with an exception: ", exception);
142      this.shutdownCause = Optional.of(exception);
143      this.clock.stop(exception);
144      this.setStatus(DriverStatus.FAILING);
145    }
146    LOG.exiting(DriverStatusManager.class.getCanonicalName(), "onError", new Object[]{exception});
147  }
148
149  /**
150   * Perform a clean shutdown of the Driver.
151   */
152  public synchronized void onComplete() {
153    LOG.entering(DriverStatusManager.class.getCanonicalName(), "onComplete");
154    if (this.isShuttingDownOrFailing()) {
155      LOG.log(Level.WARNING, "Ignoring second call to onComplete()");
156    } else {
157      LOG.log(Level.INFO, "Clean shutdown of the Driver.");
158      if (LOG.isLoggable(Level.FINEST)) {
159        LOG.log(Level.FINEST, "Callstack: ", new Exception());
160      }
161      this.clock.close();
162      this.setStatus(DriverStatus.SHUTTING_DOWN);
163    }
164    LOG.exiting(DriverStatusManager.class.getCanonicalName(), "onComplete");
165
166  }
167
168  /**
169   * Sends the final message to the Driver. This is used by DriverRuntimeStopHandler.onNext().
170   *
171   * @param exception
172   */
173  public synchronized void sendJobEndingMessageToClient(final Optional<Throwable> exception) {
174    if (this.isNotShuttingDownOrFailing()) {
175      LOG.log(Level.SEVERE, "Sending message in a state different that SHUTTING_DOWN or FAILING. " +
176          "This is likely a illegal call to clock.close() at play. Current state: " + this.driverStatus);
177    }
178    if (this.driverTerminationHasBeenCommunicatedToClient) {
179      LOG.log(Level.SEVERE, ".sendJobEndingMessageToClient() called twice. Ignoring the second call");
180    } else {
181      // Log the shutdown situation
182      if (this.shutdownCause.isPresent()) {
183        LOG.log(Level.WARNING, "Sending message about an unclean driver shutdown.", this.shutdownCause.get());
184      }
185      if (exception.isPresent()) {
186        LOG.log(Level.WARNING, "There was an exception during clock.close().", exception.get());
187      }
188      if (this.shutdownCause.isPresent() && exception.isPresent()) {
189        LOG.log(Level.WARNING, "The driver is shutdown because of an exception (see above) and there was " +
190            "an exception during clock.close(). Only the first exception will be sent to the client");
191      }
192
193      if (this.shutdownCause.isPresent()) {
194        // Send the earlier exception, if there was one
195        this.clientConnection.send(getJobEndingMessage(this.shutdownCause));
196      } else {
197        // Send the exception passed, if there was one.
198        this.clientConnection.send(getJobEndingMessage(exception));
199      }
200      this.driverTerminationHasBeenCommunicatedToClient = true;
201    }
202  }
203
204  public synchronized boolean isShuttingDownOrFailing() {
205    return DriverStatus.SHUTTING_DOWN.equals(this.driverStatus)
206        || DriverStatus.FAILING.equals(this.driverStatus);
207  }
208
209  private synchronized boolean isNotShuttingDownOrFailing() {
210    return !isShuttingDownOrFailing();
211  }
212
213  /**
214   * Helper method to set the status. This also checks whether the transition from the current status to the new one is
215   * legal.
216   *
217   * @param newStatus
218   */
219  private synchronized void setStatus(final DriverStatus newStatus) {
220    if (isLegalTransition(this.driverStatus, newStatus)) {
221      this.driverStatus = newStatus;
222    } else {
223      LOG.log(Level.WARNING, "Illegal state transiton: '" + this.driverStatus + "'->'" + newStatus + "'");
224    }
225  }
226
227  /**
228   * @param exception the exception that ended the Driver, if any.
229   * @return message to be sent to the client at the end of the job.
230   */
231  private synchronized ReefServiceProtos.JobStatusProto getJobEndingMessage(final Optional<Throwable> exception) {
232    final ReefServiceProtos.JobStatusProto message;
233    if (exception.isPresent()) {
234      message = ReefServiceProtos.JobStatusProto.newBuilder()
235          .setIdentifier(this.jobIdentifier)
236          .setState(ReefServiceProtos.State.FAILED)
237          .setException(ByteString.copyFrom(this.exceptionCodec.toBytes(exception.get())))
238          .build();
239    } else {
240      message = ReefServiceProtos.JobStatusProto.newBuilder()
241          .setIdentifier(this.jobIdentifier)
242          .setState(ReefServiceProtos.State.DONE)
243          .build();
244    }
245    return message;
246  }
247
248  /**
249   * @return The message to be sent through the ClientConnection when in state INIT.
250   */
251  private synchronized ReefServiceProtos.JobStatusProto getInitMessage() {
252    return ReefServiceProtos.JobStatusProto.newBuilder()
253        .setIdentifier(this.jobIdentifier)
254        .setState(ReefServiceProtos.State.INIT)
255        .build();
256  }
257
258  /**
259   * @return The message to be sent through the ClientConnection when in state RUNNING.
260   */
261  private synchronized ReefServiceProtos.JobStatusProto getRunningMessage() {
262    return ReefServiceProtos.JobStatusProto.newBuilder()
263        .setIdentifier(this.jobIdentifier)
264        .setState(ReefServiceProtos.State.RUNNING)
265        .build();
266  }
267}