This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.driver.restart;
020
021import org.apache.reef.annotations.Unstable;
022import org.apache.reef.annotations.audience.DriverSide;
023import org.apache.reef.annotations.audience.Private;
024import org.apache.reef.driver.parameters.DriverRestartCompletedHandlers;
025import org.apache.reef.driver.parameters.DriverRestartEvaluatorRecoverySeconds;
026import org.apache.reef.driver.parameters.ServiceDriverRestartCompletedHandlers;
027import org.apache.reef.exception.DriverFatalRuntimeException;
028import org.apache.reef.runtime.common.driver.idle.DriverIdlenessSource;
029import org.apache.reef.runtime.common.driver.idle.IdleMessage;
030import org.apache.reef.tang.annotations.Parameter;
031import org.apache.reef.wake.EventHandler;
032import org.apache.reef.runtime.common.driver.resourcemanager.ResourceRecoverEvent;
033import org.apache.reef.wake.time.event.StartTime;
034
035import javax.inject.Inject;
036import java.util.*;
037import java.util.logging.Level;
038import java.util.logging.Logger;
039
040/**
041 * The manager that handles aspects of driver restart such as determining whether the driver is in
042 * restart mode, what to do on restart, whether restart is completed, and others.
043 */
044@DriverSide
045@Private
046@Unstable
047public final class DriverRestartManager implements DriverIdlenessSource {
048  private static final String CLASS_NAME = DriverRestartManager.class.getName();
049  private static final Logger LOG = Logger.getLogger(CLASS_NAME);
050
051  private final DriverRuntimeRestartManager driverRuntimeRestartManager;
052  private final Set<EventHandler<DriverRestartCompleted>> driverRestartCompletedHandlers;
053  private final Set<EventHandler<DriverRestartCompleted>> serviceDriverRestartCompletedHandlers;
054  private final int driverRestartEvaluatorRecoverySeconds;
055  private final Timer restartCompletedTimer = new Timer();
056
057  private RestartEvaluators restartEvaluators;
058  private DriverRestartState state = DriverRestartState.NOT_RESTARTED;
059  private int resubmissionAttempts = 0;
060
061  @Inject
062  private DriverRestartManager(final DriverRuntimeRestartManager driverRuntimeRestartManager,
063                               @Parameter(DriverRestartEvaluatorRecoverySeconds.class)
064                               final int driverRestartEvaluatorRecoverySeconds,
065                               @Parameter(DriverRestartCompletedHandlers.class)
066                               final Set<EventHandler<DriverRestartCompleted>> driverRestartCompletedHandlers,
067                               @Parameter(ServiceDriverRestartCompletedHandlers.class)
068                               final Set<EventHandler<DriverRestartCompleted>> serviceDriverRestartCompletedHandlers) {
069    this.driverRuntimeRestartManager = driverRuntimeRestartManager;
070    this.driverRestartCompletedHandlers = driverRestartCompletedHandlers;
071    this.serviceDriverRestartCompletedHandlers = serviceDriverRestartCompletedHandlers;
072    if (driverRestartEvaluatorRecoverySeconds < 0) {
073      throw new IllegalArgumentException("driverRestartEvaluatorRecoverySeconds must be greater than 0.");
074    }
075
076    this.driverRestartEvaluatorRecoverySeconds = driverRestartEvaluatorRecoverySeconds;
077  }
078
079  /**
080   * Triggers the state machine if the application is a restart instance. Returns true
081   * @return true if the application is a restart instance.
082   * Can be already done with restart or in the process of restart.
083   */
084  public synchronized boolean detectRestart() {
085    if (this.state.hasNotRestarted()) {
086      resubmissionAttempts = driverRuntimeRestartManager.getResubmissionAttempts();
087
088      if (resubmissionAttempts > 0) {
089        // set the state machine in motion.
090        this.state = DriverRestartState.BEGAN;
091      }
092    }
093
094    return this.state.hasRestarted();
095  }
096
097  /**
098   * @return true if the driver is undergoing the process of restart.
099   */
100  public synchronized boolean isRestarting() {
101    return this.state.isRestarting();
102  }
103
104  /**
105   * Recovers the list of alive and failed evaluators and inform the driver restart handlers and inform the
106   * evaluator failure handlers based on the specific runtime. Also sets the expected amount of evaluators to report
107   * back as alive to the job driver.
108   */
109  public synchronized void onRestart(final StartTime startTime,
110                                     final List<EventHandler<DriverRestarted>> orderedHandlers) {
111    if (this.state == DriverRestartState.BEGAN) {
112      restartEvaluators = driverRuntimeRestartManager.getPreviousEvaluators();
113      final DriverRestarted restartedInfo = new DriverRestartedImpl(resubmissionAttempts, startTime, restartEvaluators);
114
115      for (final EventHandler<DriverRestarted> handler : orderedHandlers) {
116        handler.onNext(restartedInfo);
117      }
118
119      this.state = DriverRestartState.IN_PROGRESS;
120    } else {
121      final String errMsg = "Should not be setting the set of expected alive evaluators more than once.";
122      LOG.log(Level.SEVERE, errMsg);
123      throw new DriverFatalRuntimeException(errMsg);
124    }
125
126    driverRuntimeRestartManager.informAboutEvaluatorFailures(getFailedEvaluators());
127
128    if (driverRestartEvaluatorRecoverySeconds != Integer.MAX_VALUE) {
129      // Don't use Clock here because if there is an event scheduled, the driver will not be idle, even if
130      // driver restart has already completed, and we cannot cancel the event.
131      restartCompletedTimer.schedule(new TimerTask() {
132        @Override
133        public void run() {
134          onDriverRestartCompleted(true);
135        }
136      }, driverRestartEvaluatorRecoverySeconds * 1000L);
137    }
138  }
139
140  /**
141   * @return The restart state of the specified evaluator. Returns {@link EvaluatorRestartState#NOT_EXPECTED}
142   * if the {@link DriverRestartManager} does not believe that it's an evaluator to be recovered.
143   */
144  public synchronized EvaluatorRestartState getEvaluatorRestartState(final String evaluatorId) {
145    if (this.state.hasNotRestarted()) {
146      return EvaluatorRestartState.NOT_EXPECTED;
147    }
148
149    return getStateOfPreviousEvaluator(evaluatorId);
150  }
151
152  /**
153   * @return The ResourceRecoverEvent of the specified evaluator. Throws a {@link DriverFatalRuntimeException} if
154   * the evaluator does not exist in the set of known evaluators.
155   */
156  public synchronized ResourceRecoverEvent getResourceRecoverEvent(final String evaluatorId) {
157    if (!this.restartEvaluators.contains(evaluatorId)) {
158      throw new DriverFatalRuntimeException("Unexpected evaluator [" + evaluatorId + "], should " +
159          "not have been recorded.");
160    }
161
162    return this.restartEvaluators.get(evaluatorId).getResourceRecoverEvent();
163  }
164
165  /**
166   * Indicate that this Driver has re-established the connection with one more Evaluator of a previous run.
167   * @return true if the evaluator has been newly recovered.
168   */
169  public synchronized boolean onRecoverEvaluator(final String evaluatorId) {
170    if (getStateOfPreviousEvaluator(evaluatorId).isFailedOrNotExpected()) {
171      final String errMsg = "Evaluator with evaluator ID " + evaluatorId + " not expected to be alive.";
172      LOG.log(Level.SEVERE, errMsg);
173      throw new DriverFatalRuntimeException(errMsg);
174    }
175
176    if (getStateOfPreviousEvaluator(evaluatorId) != EvaluatorRestartState.EXPECTED) {
177      LOG.log(Level.WARNING, "Evaluator with evaluator ID " + evaluatorId + " added to the set" +
178          " of recovered evaluators more than once. Ignoring second add...");
179      return false;
180    }
181
182    // set the status for this evaluator ID to be reported.
183    setEvaluatorReported(evaluatorId);
184
185    if (haveAllExpectedEvaluatorsReported()) {
186      onDriverRestartCompleted(false);
187    }
188
189    return true;
190  }
191
192  /**
193   * Records the evaluators when it is allocated. The implementation depends on the runtime.
194   * @param id The evaluator ID of the allocated evaluator.
195   */
196  public synchronized void recordAllocatedEvaluator(final String id) {
197    driverRuntimeRestartManager.recordAllocatedEvaluator(id);
198  }
199
200  /**
201   * Records a removed evaluator into the evaluator log. The implementation depends on the runtime.
202   * @param id The evaluator ID of the removed evaluator.
203   */
204  public synchronized void recordRemovedEvaluator(final String id) {
205    driverRuntimeRestartManager.recordRemovedEvaluator(id);
206  }
207
208  /**
209   * Signals to the {@link DriverRestartManager} that an evaluator has reported back after restart.
210   */
211  public synchronized void setEvaluatorReported(final String evaluatorId) {
212    setStateOfPreviousEvaluator(evaluatorId, EvaluatorRestartState.REPORTED);
213  }
214
215  /**
216   * Signals to the {@link DriverRestartManager} that an evaluator has had its recovery heartbeat processed.
217   */
218  public synchronized void setEvaluatorReregistered(final String evaluatorId) {
219    setStateOfPreviousEvaluator(evaluatorId, EvaluatorRestartState.REREGISTERED);
220  }
221
222  /**
223   * Signals to the {@link DriverRestartManager} that an evaluator has had its running task or active context processed.
224   */
225  public synchronized void setEvaluatorProcessed(final String evaluatorId) {
226    setStateOfPreviousEvaluator(evaluatorId, EvaluatorRestartState.PROCESSED);
227  }
228
229  /**
230   * Signals to the {@link DriverRestartManager} that an expected evaluator has been expired.
231   */
232  public synchronized void setEvaluatorExpired(final String evaluatorId) {
233    setStateOfPreviousEvaluator(evaluatorId, EvaluatorRestartState.EXPIRED);
234  }
235
236  private synchronized EvaluatorRestartState getStateOfPreviousEvaluator(final String evaluatorId) {
237    if (!this.restartEvaluators.contains(evaluatorId)) {
238      return EvaluatorRestartState.NOT_EXPECTED;
239    }
240
241    return this.restartEvaluators.get(evaluatorId).getEvaluatorRestartState();
242  }
243
244  private synchronized void setStateOfPreviousEvaluator(final String evaluatorId,
245                                                        final EvaluatorRestartState to) {
246    if (!restartEvaluators.contains(evaluatorId) ||
247        !restartEvaluators.get(evaluatorId).setEvaluatorRestartState(to)) {
248      throw evaluatorTransitionFailed(evaluatorId, to);
249    }
250  }
251
252  private synchronized DriverFatalRuntimeException evaluatorTransitionFailed(final String evaluatorId,
253                                                                             final EvaluatorRestartState to) {
254    if (!restartEvaluators.contains(evaluatorId)) {
255      return new DriverFatalRuntimeException("Evaluator " + evaluatorId + " is not expected.");
256    }
257
258    return new DriverFatalRuntimeException("Evaluator " + evaluatorId + " wants to transition to state " +
259        "[" + to + "], but is in the illegal state [" +
260        restartEvaluators.get(evaluatorId).getEvaluatorRestartState() + "].");
261  }
262
263  private synchronized boolean haveAllExpectedEvaluatorsReported() {
264    for (final String previousEvaluatorId : this.restartEvaluators.getEvaluatorIds()) {
265      final EvaluatorRestartState restartState = getStateOfPreviousEvaluator(previousEvaluatorId);
266      if (restartState == EvaluatorRestartState.EXPECTED) {
267        return false;
268      }
269    }
270
271    return true;
272  }
273
274  /**
275   * Sets the driver restart status to be completed if not yet set and notifies the restart completed event handlers.
276   */
277  private synchronized void onDriverRestartCompleted(final boolean isTimedOut) {
278    if (this.state != DriverRestartState.COMPLETED) {
279      final Set<String> outstandingEvaluatorIds = getOutstandingEvaluatorsAndMarkExpired();
280      driverRuntimeRestartManager.informAboutEvaluatorFailures(outstandingEvaluatorIds);
281
282      this.state = DriverRestartState.COMPLETED;
283      final DriverRestartCompleted driverRestartCompleted = new DriverRestartCompletedImpl(
284          System.currentTimeMillis(), isTimedOut);
285
286      for (final EventHandler<DriverRestartCompleted> serviceRestartCompletedHandler
287          : this.serviceDriverRestartCompletedHandlers) {
288        serviceRestartCompletedHandler.onNext(driverRestartCompleted);
289      }
290
291      for (final EventHandler<DriverRestartCompleted> restartCompletedHandler : this.driverRestartCompletedHandlers) {
292        restartCompletedHandler.onNext(driverRestartCompleted);
293      }
294
295      LOG.log(Level.FINE, "Restart completed. Evaluators that have not reported back are: " + outstandingEvaluatorIds);
296    }
297
298    restartCompletedTimer.cancel();
299  }
300
301  /**
302   * Gets the outstanding evaluators that have not yet reported back and mark them as expired.
303   */
304  private Set<String> getOutstandingEvaluatorsAndMarkExpired() {
305    final Set<String> outstanding = new HashSet<>();
306    for (final String previousEvaluatorId : restartEvaluators.getEvaluatorIds()) {
307      if (getStateOfPreviousEvaluator(previousEvaluatorId) == EvaluatorRestartState.EXPECTED) {
308        outstanding.add(previousEvaluatorId);
309        setEvaluatorExpired(previousEvaluatorId);
310      }
311    }
312
313    return outstanding;
314  }
315
316  private Set<String> getFailedEvaluators() {
317    final Set<String> failed = new HashSet<>();
318    for (final String previousEvaluatorId : this.restartEvaluators.getEvaluatorIds()) {
319      if (getStateOfPreviousEvaluator(previousEvaluatorId) == EvaluatorRestartState.FAILED) {
320        failed.add(previousEvaluatorId);
321      }
322    }
323
324    return failed;
325  }
326
327  /**
328   * {@inheritDoc}
329   * @return True if not in process of restart. False otherwise.
330   */
331  @Override
332  public IdleMessage getIdleStatus() {
333    boolean idleState = !this.state.isRestarting();
334    final String idleMessage = idleState ? CLASS_NAME + " currently not in the process of restart." :
335        CLASS_NAME + " currently in the process of restart.";
336    return new IdleMessage(CLASS_NAME, idleMessage, idleState);
337  }
338}