001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.driver.restart;
020
021import org.apache.reef.annotations.Unstable;
022import org.apache.reef.annotations.audience.DriverSide;
023import org.apache.reef.annotations.audience.Private;
024import org.apache.reef.driver.parameters.DriverRestartCompletedHandlers;
025import org.apache.reef.driver.parameters.DriverRestartEvaluatorRecoverySeconds;
026import org.apache.reef.driver.parameters.ServiceDriverRestartCompletedHandlers;
027import org.apache.reef.exception.DriverFatalRuntimeException;
028import org.apache.reef.runtime.common.driver.idle.DriverIdlenessSource;
029import org.apache.reef.runtime.common.driver.idle.IdleMessage;
030import org.apache.reef.tang.annotations.Parameter;
031import org.apache.reef.wake.EventHandler;
032import org.apache.reef.runtime.common.driver.resourcemanager.ResourceRecoverEvent;
033import org.apache.reef.wake.time.event.StartTime;
034
035import javax.inject.Inject;
036import java.util.*;
037import java.util.logging.Level;
038import java.util.logging.Logger;
039
040/**
041 * The manager that handles aspects of driver restart such as determining whether the driver is in
042 * restart mode, what to do on restart, whether restart is completed, and others.
043 */
044@DriverSide
045@Private
046@Unstable
047public final class DriverRestartManager implements DriverIdlenessSource, AutoCloseable {
048
049  private static final String CLASS_NAME = DriverRestartManager.class.getName();
050  private static final Logger LOG = Logger.getLogger(CLASS_NAME);
051
052  private final Timer restartCompletedTimer = new Timer(this.getClass().getSimpleName() + ":Timer");
053
054  private final DriverRuntimeRestartManager driverRuntimeRestartManager;
055  private final Set<EventHandler<DriverRestartCompleted>> driverRestartCompletedHandlers;
056  private final Set<EventHandler<DriverRestartCompleted>> serviceDriverRestartCompletedHandlers;
057  private final int driverRestartEvaluatorRecoverySeconds;
058
059  private RestartEvaluators restartEvaluators;
060  private DriverRestartState state = DriverRestartState.NOT_RESTARTED;
061  private int resubmissionAttempts = 0;
062
063  @Inject
064  private DriverRestartManager(final DriverRuntimeRestartManager driverRuntimeRestartManager,
065                               @Parameter(DriverRestartEvaluatorRecoverySeconds.class)
066                               final int driverRestartEvaluatorRecoverySeconds,
067                               @Parameter(DriverRestartCompletedHandlers.class)
068                               final Set<EventHandler<DriverRestartCompleted>> driverRestartCompletedHandlers,
069                               @Parameter(ServiceDriverRestartCompletedHandlers.class)
070                               final Set<EventHandler<DriverRestartCompleted>> serviceDriverRestartCompletedHandlers) {
071    this.driverRuntimeRestartManager = driverRuntimeRestartManager;
072    this.driverRestartCompletedHandlers = driverRestartCompletedHandlers;
073    this.serviceDriverRestartCompletedHandlers = serviceDriverRestartCompletedHandlers;
074    if (driverRestartEvaluatorRecoverySeconds < 0) {
075      throw new IllegalArgumentException("driverRestartEvaluatorRecoverySeconds must be greater than 0.");
076    }
077
078    this.driverRestartEvaluatorRecoverySeconds = driverRestartEvaluatorRecoverySeconds;
079  }
080
081  /**
082   * Triggers the state machine if the application is a restart instance. Returns true
083   * @return true if the application is a restart instance.
084   * Can be already done with restart or in the process of restart.
085   */
086  public synchronized boolean detectRestart() {
087    if (this.state.hasNotRestarted()) {
088      resubmissionAttempts = driverRuntimeRestartManager.getResubmissionAttempts();
089
090      if (resubmissionAttempts > 0) {
091        // set the state machine in motion.
092        this.state = DriverRestartState.BEGAN;
093      }
094    }
095
096    return this.state.hasRestarted();
097  }
098
099  /**
100   * @return true if the driver is undergoing the process of restart.
101   */
102  public synchronized boolean isRestarting() {
103    return this.state.isRestarting();
104  }
105
106  /**
107   * Recovers the list of alive and failed evaluators and inform the driver restart handlers and inform the
108   * evaluator failure handlers based on the specific runtime. Also sets the expected amount of evaluators to report
109   * back as alive to the job driver.
110   */
111  public synchronized void onRestart(final StartTime startTime,
112                                     final List<EventHandler<DriverRestarted>> orderedHandlers) {
113    if (this.state == DriverRestartState.BEGAN) {
114      restartEvaluators = driverRuntimeRestartManager.getPreviousEvaluators();
115      final DriverRestarted restartedInfo = new DriverRestartedImpl(resubmissionAttempts, startTime, restartEvaluators);
116
117      for (final EventHandler<DriverRestarted> handler : orderedHandlers) {
118        handler.onNext(restartedInfo);
119      }
120
121      this.state = DriverRestartState.IN_PROGRESS;
122    } else {
123      final String errMsg = "Should not be setting the set of expected alive evaluators more than once.";
124      LOG.log(Level.SEVERE, errMsg);
125      throw new DriverFatalRuntimeException(errMsg);
126    }
127
128    driverRuntimeRestartManager.informAboutEvaluatorFailures(getFailedEvaluators());
129
130    if (driverRestartEvaluatorRecoverySeconds != Integer.MAX_VALUE) {
131      // Don't use Clock here because if there is an event scheduled, the driver will not be idle, even if
132      // driver restart has already completed, and we cannot cancel the event.
133      restartCompletedTimer.schedule(new TimerTask() {
134        @Override
135        public void run() {
136          onDriverRestartCompleted(true);
137        }
138      }, driverRestartEvaluatorRecoverySeconds * 1000L);
139    }
140  }
141
142  /**
143   * @return The restart state of the specified evaluator. Returns {@link EvaluatorRestartState#NOT_EXPECTED}
144   * if the {@link DriverRestartManager} does not believe that it's an evaluator to be recovered.
145   */
146  public synchronized EvaluatorRestartState getEvaluatorRestartState(final String evaluatorId) {
147    if (this.state.hasNotRestarted()) {
148      return EvaluatorRestartState.NOT_EXPECTED;
149    }
150
151    return getStateOfPreviousEvaluator(evaluatorId);
152  }
153
154  /**
155   * @return The ResourceRecoverEvent of the specified evaluator. Throws a {@link DriverFatalRuntimeException} if
156   * the evaluator does not exist in the set of known evaluators.
157   */
158  public synchronized ResourceRecoverEvent getResourceRecoverEvent(final String evaluatorId) {
159    if (!this.restartEvaluators.contains(evaluatorId)) {
160      throw new DriverFatalRuntimeException("Unexpected evaluator [" + evaluatorId + "], should " +
161          "not have been recorded.");
162    }
163
164    return this.restartEvaluators.get(evaluatorId).getResourceRecoverEvent();
165  }
166
167  /**
168   * Indicate that this Driver has re-established the connection with one more Evaluator of a previous run.
169   * @return true if the evaluator has been newly recovered.
170   */
171  public synchronized boolean onRecoverEvaluator(final String evaluatorId) {
172    if (getStateOfPreviousEvaluator(evaluatorId).isFailedOrNotExpected()) {
173      final String errMsg = "Evaluator with evaluator ID " + evaluatorId + " not expected to be alive.";
174      LOG.log(Level.SEVERE, errMsg);
175      throw new DriverFatalRuntimeException(errMsg);
176    }
177
178    if (getStateOfPreviousEvaluator(evaluatorId) != EvaluatorRestartState.EXPECTED) {
179      LOG.log(Level.WARNING, "Evaluator with evaluator ID " + evaluatorId + " added to the set" +
180          " of recovered evaluators more than once. Ignoring second add...");
181      return false;
182    }
183
184    // set the status for this evaluator ID to be reported.
185    setEvaluatorReported(evaluatorId);
186
187    if (haveAllExpectedEvaluatorsReported()) {
188      onDriverRestartCompleted(false);
189    }
190
191    return true;
192  }
193
194  /**
195   * Records the evaluators when it is allocated. The implementation depends on the runtime.
196   * @param id The evaluator ID of the allocated evaluator.
197   */
198  public synchronized void recordAllocatedEvaluator(final String id) {
199    driverRuntimeRestartManager.recordAllocatedEvaluator(id);
200  }
201
202  /**
203   * Records a removed evaluator into the evaluator log. The implementation depends on the runtime.
204   * @param id The evaluator ID of the removed evaluator.
205   */
206  public synchronized void recordRemovedEvaluator(final String id) {
207    driverRuntimeRestartManager.recordRemovedEvaluator(id);
208  }
209
210  /**
211   * Signals to the {@link DriverRestartManager} that an evaluator has reported back after restart.
212   */
213  public synchronized void setEvaluatorReported(final String evaluatorId) {
214    setStateOfPreviousEvaluator(evaluatorId, EvaluatorRestartState.REPORTED);
215  }
216
217  /**
218   * Signals to the {@link DriverRestartManager} that an evaluator has had its recovery heartbeat processed.
219   */
220  public synchronized void setEvaluatorReregistered(final String evaluatorId) {
221    setStateOfPreviousEvaluator(evaluatorId, EvaluatorRestartState.REREGISTERED);
222  }
223
224  /**
225   * Signals to the {@link DriverRestartManager} that an evaluator has had its running task or active context processed.
226   */
227  public synchronized void setEvaluatorProcessed(final String evaluatorId) {
228    setStateOfPreviousEvaluator(evaluatorId, EvaluatorRestartState.PROCESSED);
229  }
230
231  /**
232   * Signals to the {@link DriverRestartManager} that an expected evaluator has been expired.
233   */
234  public synchronized void setEvaluatorExpired(final String evaluatorId) {
235    setStateOfPreviousEvaluator(evaluatorId, EvaluatorRestartState.EXPIRED);
236  }
237
238  private synchronized EvaluatorRestartState getStateOfPreviousEvaluator(final String evaluatorId) {
239    if (!this.restartEvaluators.contains(evaluatorId)) {
240      return EvaluatorRestartState.NOT_EXPECTED;
241    }
242
243    return this.restartEvaluators.get(evaluatorId).getEvaluatorRestartState();
244  }
245
246  private synchronized void setStateOfPreviousEvaluator(final String evaluatorId,
247                                                        final EvaluatorRestartState to) {
248    if (!restartEvaluators.contains(evaluatorId) ||
249        !restartEvaluators.get(evaluatorId).setEvaluatorRestartState(to)) {
250      throw evaluatorTransitionFailed(evaluatorId, to);
251    }
252  }
253
254  private synchronized DriverFatalRuntimeException evaluatorTransitionFailed(final String evaluatorId,
255                                                                             final EvaluatorRestartState to) {
256    if (!restartEvaluators.contains(evaluatorId)) {
257      return new DriverFatalRuntimeException("Evaluator " + evaluatorId + " is not expected.");
258    }
259
260    return new DriverFatalRuntimeException("Evaluator " + evaluatorId + " wants to transition to state " +
261        "[" + to + "], but is in the illegal state [" +
262        restartEvaluators.get(evaluatorId).getEvaluatorRestartState() + "].");
263  }
264
265  private synchronized boolean haveAllExpectedEvaluatorsReported() {
266    for (final String previousEvaluatorId : this.restartEvaluators.getEvaluatorIds()) {
267      final EvaluatorRestartState restartState = getStateOfPreviousEvaluator(previousEvaluatorId);
268      if (restartState == EvaluatorRestartState.EXPECTED) {
269        return false;
270      }
271    }
272
273    return true;
274  }
275
276  /**
277   * Sets the driver restart status to be completed if not yet set and notifies the restart completed event handlers.
278   */
279  private synchronized void onDriverRestartCompleted(final boolean isTimedOut) {
280    if (this.state != DriverRestartState.COMPLETED) {
281      final Set<String> outstandingEvaluatorIds = getOutstandingEvaluatorsAndMarkExpired();
282      driverRuntimeRestartManager.informAboutEvaluatorFailures(outstandingEvaluatorIds);
283
284      this.state = DriverRestartState.COMPLETED;
285      final DriverRestartCompleted driverRestartCompleted = new DriverRestartCompletedImpl(
286          System.currentTimeMillis(), isTimedOut);
287
288      for (final EventHandler<DriverRestartCompleted> serviceRestartCompletedHandler
289          : this.serviceDriverRestartCompletedHandlers) {
290        serviceRestartCompletedHandler.onNext(driverRestartCompleted);
291      }
292
293      for (final EventHandler<DriverRestartCompleted> restartCompletedHandler : this.driverRestartCompletedHandlers) {
294        restartCompletedHandler.onNext(driverRestartCompleted);
295      }
296
297      LOG.log(Level.FINE, "Restart completed. Evaluators that have not reported back are: " + outstandingEvaluatorIds);
298    }
299
300    restartCompletedTimer.cancel();
301  }
302
303  /**
304   * Gets the outstanding evaluators that have not yet reported back and mark them as expired.
305   */
306  private Set<String> getOutstandingEvaluatorsAndMarkExpired() {
307    final Set<String> outstanding = new HashSet<>();
308    for (final String previousEvaluatorId : restartEvaluators.getEvaluatorIds()) {
309      if (getStateOfPreviousEvaluator(previousEvaluatorId) == EvaluatorRestartState.EXPECTED) {
310        outstanding.add(previousEvaluatorId);
311        setEvaluatorExpired(previousEvaluatorId);
312      }
313    }
314
315    return outstanding;
316  }
317
318  private Set<String> getFailedEvaluators() {
319    final Set<String> failed = new HashSet<>();
320    for (final String previousEvaluatorId : this.restartEvaluators.getEvaluatorIds()) {
321      if (getStateOfPreviousEvaluator(previousEvaluatorId) == EvaluatorRestartState.FAILED) {
322        failed.add(previousEvaluatorId);
323      }
324    }
325
326    return failed;
327  }
328
329  /**
330   * {@inheritDoc}
331   * @return True if not in process of restart. False otherwise.
332   */
333  @Override
334  public IdleMessage getIdleStatus() {
335    boolean idleState = !this.state.isRestarting();
336    final String idleMessage = idleState ? CLASS_NAME + " currently not in the process of restart." :
337        CLASS_NAME + " currently in the process of restart.";
338    return new IdleMessage(CLASS_NAME, idleMessage, idleState);
339  }
340
341  /**
342   * Close the restart timer.
343   */
344  @Override
345  public void close() {
346    LOG.log(Level.FINER, "Closing restart timer. Final state: {0}", this.state);
347    this.restartCompletedTimer.cancel();
348  }
349}