This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.common.driver;
020
021import com.google.protobuf.ByteString;
022import org.apache.reef.proto.ReefServiceProtos;
023import org.apache.reef.runtime.common.driver.api.AbstractDriverRuntimeConfiguration;
024import org.apache.reef.runtime.common.driver.client.ClientConnection;
025import org.apache.reef.runtime.common.utils.ExceptionCodec;
026import org.apache.reef.tang.annotations.Parameter;
027import org.apache.reef.util.Optional;
028import org.apache.reef.wake.time.Clock;
029
030import javax.inject.Inject;
031import java.util.logging.Level;
032import java.util.logging.Logger;
033
034/**
035 * Manages the Driver's status.
036 */
037public final class DriverStatusManager {
038  private static final Logger LOG = Logger.getLogger(DriverStatusManager.class.getName());
039  private final Clock clock;
040  private final ClientConnection clientConnection;
041  private final String jobIdentifier;
042  private final ExceptionCodec exceptionCodec;
043  private DriverStatus driverStatus = DriverStatus.PRE_INIT;
044  private Optional<Throwable> shutdownCause = Optional.empty();
045  private boolean driverTerminationHasBeenCommunicatedToClient = false;
046  private boolean restartCompleted = false;
047  private int numPreviousContainers = -1;
048  private int numRecoveredContainers = 0;
049
050
051  /**
052   * @param clock
053   * @param clientConnection
054   * @param jobIdentifier
055   * @param exceptionCodec
056   */
057  @Inject
058  DriverStatusManager(final Clock clock,
059                      final ClientConnection clientConnection,
060                      final @Parameter(AbstractDriverRuntimeConfiguration.JobIdentifier.class) String jobIdentifier,
061                      final ExceptionCodec exceptionCodec) {
062    LOG.entering(DriverStatusManager.class.getCanonicalName(), "<init>");
063    this.clock = clock;
064    this.clientConnection = clientConnection;
065    this.jobIdentifier = jobIdentifier;
066    this.exceptionCodec = exceptionCodec;
067    LOG.log(Level.FINE, "Instantiated 'DriverStatusManager'");
068    LOG.exiting(DriverStatusManager.class.getCanonicalName(), "<init>");
069  }
070
071  /**
072   * Check whether a state transition 'from->to' is legal.
073   *
074   * @param from
075   * @param to
076   * @return
077   */
078  private static boolean isLegalTransition(final DriverStatus from, final DriverStatus to) {
079    switch (from) {
080      case PRE_INIT:
081        switch (to) {
082          case INIT:
083            return true;
084          default:
085            return false;
086        }
087      case INIT:
088        switch (to) {
089          case RUNNING:
090            return true;
091          default:
092            return false;
093        }
094      case RUNNING:
095        switch (to) {
096          case SHUTTING_DOWN:
097          case FAILING:
098            return true;
099          default:
100            return false;
101        }
102      case FAILING:
103      case SHUTTING_DOWN:
104        return false;
105      default:
106        throw new IllegalStateException("Unknown input state: " + from);
107    }
108  }
109
110  /**
111   * Changes the driver status to INIT and sends message to the client about the transition.
112   */
113  public synchronized void onInit() {
114    LOG.entering(DriverStatusManager.class.getCanonicalName(), "onInit");
115    this.clientConnection.send(this.getInitMessage());
116    this.setStatus(DriverStatus.INIT);
117    LOG.exiting(DriverStatusManager.class.getCanonicalName(), "onInit");
118  }
119
120  /**
121   * Changes the driver status to RUNNING and sends message to the client about the transition.
122   * If the driver is in status 'PRE_INIT', this first calls onInit();
123   */
124  public synchronized void onRunning() {
125    LOG.entering(DriverStatusManager.class.getCanonicalName(), "onRunning");
126    if (this.driverStatus.equals(DriverStatus.PRE_INIT)) {
127      this.onInit();
128    }
129    this.clientConnection.send(this.getRunningMessage());
130    this.setStatus(DriverStatus.RUNNING);
131    LOG.exiting(DriverStatusManager.class.getCanonicalName(), "onRunning");
132  }
133
134  /**
135   * End the Driver with an exception.
136   *
137   * @param exception
138   */
139  public synchronized void onError(final Throwable exception) {
140    LOG.entering(DriverStatusManager.class.getCanonicalName(), "onError", new Object[]{exception});
141    if (this.isShuttingDownOrFailing()) {
142      LOG.log(Level.WARNING, "Received an exception while already in shutdown.", exception);
143    } else {
144      LOG.log(Level.WARNING, "Shutting down the Driver with an exception: ", exception);
145      this.shutdownCause = Optional.of(exception);
146      this.clock.stop();
147      this.setStatus(DriverStatus.FAILING);
148    }
149    LOG.exiting(DriverStatusManager.class.getCanonicalName(), "onError", new Object[]{exception});
150  }
151
152  /**
153   * Perform a clean shutdown of the Driver.
154   */
155  public synchronized void onComplete() {
156    LOG.entering(DriverStatusManager.class.getCanonicalName(), "onComplete");
157    if (this.isShuttingDownOrFailing()) {
158      LOG.log(Level.WARNING, "Ignoring second call to onComplete()");
159    } else {
160      LOG.log(Level.INFO, "Clean shutdown of the Driver.");
161      if (LOG.isLoggable(Level.FINEST)) {
162        LOG.log(Level.FINEST, "Callstack: ", new Exception());
163      }
164      this.clock.close();
165      this.setStatus(DriverStatus.SHUTTING_DOWN);
166    }
167    LOG.exiting(DriverStatusManager.class.getCanonicalName(), "onComplete");
168
169  }
170
171  /**
172   * Sends the final message to the Driver. This is used by DriverRuntimeStopHandler.onNext().
173   *
174   * @param exception
175   */
176  public synchronized void sendJobEndingMessageToClient(final Optional<Throwable> exception) {
177    if (this.isNotShuttingDownOrFailing()) {
178      LOG.log(Level.SEVERE, "Sending message in a state different that SHUTTING_DOWN or FAILING. This is likely a illegal call to clock.close() at play. Current state: " + this.driverStatus);
179    }
180    if (this.driverTerminationHasBeenCommunicatedToClient) {
181      LOG.log(Level.SEVERE, ".sendJobEndingMessageToClient() called twice. Ignoring the second call");
182    } else {
183      { // Log the shutdown situation
184        if (this.shutdownCause.isPresent()) {
185          LOG.log(Level.WARNING, "Sending message about an unclean driver shutdown.", this.shutdownCause.get());
186        }
187        if (exception.isPresent()) {
188          LOG.log(Level.WARNING, "There was an exception during clock.close().", exception.get());
189        }
190        if (this.shutdownCause.isPresent() && exception.isPresent()) {
191          LOG.log(Level.WARNING, "The driver is shutdown because of an exception (see above) and there was an exception during clock.close(). Only the first exception will be sent to the client");
192        }
193      }
194      if (this.shutdownCause.isPresent()) {
195        // Send the earlier exception, if there was one
196        this.clientConnection.send(getJobEndingMessage(this.shutdownCause));
197      } else {
198        // Send the exception passed, if there was one.
199        this.clientConnection.send(getJobEndingMessage(exception));
200      }
201      this.driverTerminationHasBeenCommunicatedToClient = true;
202    }
203  }
204
205  /**
206   * Indicate that the Driver restart is complete. It is meant to be called exactly once during a restart and never
207   * during the ininital launch of a Driver.
208   */
209  public synchronized void setRestartCompleted() {
210    if (!this.isDriverRestart()) {
211      throw new IllegalStateException("setRestartCompleted() called in a Driver that is not, in fact, restarted.");
212    } else if (this.restartCompleted) {
213      LOG.log(Level.WARNING, "Calling setRestartCompleted more than once.");
214    } else {
215      this.restartCompleted = true;
216    }
217  }
218
219  /**
220   * @return the number of Evaluators expected to check in from a previous run.
221   */
222  public synchronized int getNumPreviousContainers() {
223    return this.numPreviousContainers;
224  }
225
226  /**
227   * Set the number of containers to expect still active from a previous execution of the Driver in a restart situation.
228   * To be called exactly once during a driver restart.
229   *
230   * @param num
231   */
232  public synchronized void setNumPreviousContainers(final int num) {
233    if (this.numPreviousContainers >= 0) {
234      throw new IllegalStateException("Attempting to set the number of expected containers left from a previous container more than once.");
235    } else {
236      this.numPreviousContainers = num;
237    }
238  }
239
240  /**
241   * @return the number of Evaluators from a previous Driver that have checked in with the Driver in a restart situation.
242   */
243  public synchronized int getNumRecoveredContainers() {
244    return this.numRecoveredContainers;
245  }
246
247  /**
248   * Indicate that this Driver has re-established the connection with one more Evaluator of a previous run.
249   */
250  public synchronized void oneContainerRecovered() {
251    this.numRecoveredContainers += 1;
252    if (this.numRecoveredContainers > this.numPreviousContainers) {
253      throw new IllegalStateException("Reconnected to" +
254          this.numRecoveredContainers +
255          "Evaluators while only expecting " +
256          this.numPreviousContainers);
257    }
258  }
259
260  /**
261   * @return true if the Driver is a restarted driver of an earlier attempt.
262   */
263  private synchronized boolean isDriverRestart() {
264    return this.getNumPreviousContainers() > 0;
265  }
266
267  public synchronized boolean isShuttingDownOrFailing() {
268    return DriverStatus.SHUTTING_DOWN.equals(this.driverStatus)
269        || DriverStatus.FAILING.equals(this.driverStatus);
270  }
271
272  private synchronized boolean isNotShuttingDownOrFailing() {
273    return !isShuttingDownOrFailing();
274  }
275
276  /**
277   * Helper method to set the status. This also checks whether the transition from the current status to the new one is
278   * legal.
279   *
280   * @param newStatus
281   */
282  private synchronized void setStatus(final DriverStatus newStatus) {
283    if (isLegalTransition(this.driverStatus, newStatus)) {
284      this.driverStatus = newStatus;
285    } else {
286      LOG.log(Level.WARNING, "Illegal state transiton: '" + this.driverStatus + "'->'" + newStatus + "'");
287    }
288  }
289
290  /**
291   * @param exception the exception that ended the Driver, if any.
292   * @return message to be sent to the client at the end of the job.
293   */
294  private synchronized ReefServiceProtos.JobStatusProto getJobEndingMessage(final Optional<Throwable> exception) {
295    final ReefServiceProtos.JobStatusProto message;
296    if (exception.isPresent()) {
297      message = ReefServiceProtos.JobStatusProto.newBuilder()
298          .setIdentifier(this.jobIdentifier)
299          .setState(ReefServiceProtos.State.FAILED)
300          .setException(ByteString.copyFrom(this.exceptionCodec.toBytes(exception.get())))
301          .build();
302    } else {
303      message = ReefServiceProtos.JobStatusProto.newBuilder()
304          .setIdentifier(this.jobIdentifier)
305          .setState(ReefServiceProtos.State.DONE)
306          .build();
307    }
308    return message;
309  }
310
311  /**
312   * @return The message to be sent through the ClientConnection when in state INIT.
313   */
314  private synchronized ReefServiceProtos.JobStatusProto getInitMessage() {
315    return ReefServiceProtos.JobStatusProto.newBuilder()
316        .setIdentifier(this.jobIdentifier)
317        .setState(ReefServiceProtos.State.INIT)
318        .build();
319  }
320
321  /**
322   * @return The message to be sent through the ClientConnection when in state RUNNING.
323   */
324  private synchronized ReefServiceProtos.JobStatusProto getRunningMessage() {
325    return ReefServiceProtos.JobStatusProto.newBuilder()
326        .setIdentifier(this.jobIdentifier)
327        .setState(ReefServiceProtos.State.RUNNING)
328        .build();
329  }
330}