/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.reef.driver.restart;

import org.apache.reef.annotations.Unstable;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.annotations.audience.Private;
import org.apache.reef.driver.parameters.DriverRestartCompletedHandlers;
import org.apache.reef.driver.parameters.DriverRestartEvaluatorRecoverySeconds;
import org.apache.reef.driver.parameters.ServiceDriverRestartCompletedHandlers;
import org.apache.reef.exception.DriverFatalRuntimeException;
import org.apache.reef.runtime.common.driver.idle.DriverIdlenessSource;
import org.apache.reef.runtime.common.driver.idle.IdleMessage;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.runtime.common.driver.resourcemanager.ResourceRecoverEvent;
import org.apache.reef.wake.time.event.StartTime;

import javax.inject.Inject;
import java.util.*;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * The manager that handles aspects of driver restart such as determining whether the driver is in
 * restart mode, what to do on restart, whether restart is completed, and others.
 *
 * All state-changing methods synchronize on this instance. The restart state itself is additionally
 * {@code volatile} because {@link #getIdleStatus()} reads it without taking the monitor.
 */
@DriverSide
@Private
@Unstable
public final class DriverRestartManager implements DriverIdlenessSource {
  private static final String CLASS_NAME = DriverRestartManager.class.getName();
  private static final Logger LOG = Logger.getLogger(CLASS_NAME);

  private final DriverRuntimeRestartManager driverRuntimeRestartManager;
  private final Set<EventHandler<DriverRestartCompleted>> driverRestartCompletedHandlers;
  private final Set<EventHandler<DriverRestartCompleted>> serviceDriverRestartCompletedHandlers;
  private final int driverRestartEvaluatorRecoverySeconds;

  // A Timer starts its worker thread on construction, and this one is only cancelled once restart
  // completes. It is created as a daemon so that a driver which never restarts (and therefore never
  // cancels the timer) does not keep a live non-daemon thread around.
  private final Timer restartCompletedTimer = new Timer(CLASS_NAME + ":Timer", true);

  // Stays null until onRestart() has obtained the previous evaluators from the runtime.
  private RestartEvaluators restartEvaluators;

  // Written under the monitor (also from the timer thread); volatile for the lock-free read in getIdleStatus().
  private volatile DriverRestartState state = DriverRestartState.NOT_RESTARTED;
  private int resubmissionAttempts = 0;

  /**
   * @param driverRuntimeRestartManager the runtime-specific restart manager that all runtime queries are delegated to.
   * @param driverRestartEvaluatorRecoverySeconds the delay, in seconds, after which restart is declared completed
   *        by timeout. Must not be negative. {@link Integer#MAX_VALUE} disables the timeout.
   * @param driverRestartCompletedHandlers the handlers notified when restart completes.
   * @param serviceDriverRestartCompletedHandlers the service handlers notified when restart completes. They are
   *        invoked before {@code driverRestartCompletedHandlers}.
   * @throws IllegalArgumentException if {@code driverRestartEvaluatorRecoverySeconds} is negative.
   */
  @Inject
  private DriverRestartManager(final DriverRuntimeRestartManager driverRuntimeRestartManager,
                               @Parameter(DriverRestartEvaluatorRecoverySeconds.class)
                               final int driverRestartEvaluatorRecoverySeconds,
                               @Parameter(DriverRestartCompletedHandlers.class)
                               final Set<EventHandler<DriverRestartCompleted>> driverRestartCompletedHandlers,
                               @Parameter(ServiceDriverRestartCompletedHandlers.class)
                               final Set<EventHandler<DriverRestartCompleted>> serviceDriverRestartCompletedHandlers) {
    this.driverRuntimeRestartManager = driverRuntimeRestartManager;
    this.driverRestartCompletedHandlers = driverRestartCompletedHandlers;
    this.serviceDriverRestartCompletedHandlers = serviceDriverRestartCompletedHandlers;

    // Zero is a legal value (the timeout then fires without delay); only negative values are rejected.
    if (driverRestartEvaluatorRecoverySeconds < 0) {
      throw new IllegalArgumentException("driverRestartEvaluatorRecoverySeconds must not be negative, but was " +
          driverRestartEvaluatorRecoverySeconds + ".");
    }

    this.driverRestartEvaluatorRecoverySeconds = driverRestartEvaluatorRecoverySeconds;
  }

  /**
   * Triggers the state machine if the application is a restart instance.
   * @return true if the application is a restart instance.
   * Can be already done with restart or in the process of restart.
   */
  public synchronized boolean detectRestart() {
    if (this.state.hasNotRestarted()) {
      resubmissionAttempts = driverRuntimeRestartManager.getResubmissionAttempts();

      if (resubmissionAttempts > 0) {
        // set the state machine in motion.
        this.state = DriverRestartState.BEGAN;
      }
    }

    return this.state.hasRestarted();
  }

  /**
   * @return true if the driver is undergoing the process of restart.
   */
  public synchronized boolean isRestarting() {
    return this.state.isRestarting();
  }

  /**
   * Recovers the list of alive and failed evaluators and inform the driver restart handlers and inform the
   * evaluator failure handlers based on the specific runtime. Also sets the expected amount of evaluators to report
   * back as alive to the job driver.
   * @param startTime the start time event passed on to the {@link DriverRestarted} event.
   * @param orderedHandlers the restart handlers, invoked in iteration order with the {@link DriverRestarted} event.
   * @throws DriverFatalRuntimeException if the restart state is not {@link DriverRestartState#BEGAN}, e.g. when
   *         this method is invoked more than once.
   */
  public synchronized void onRestart(final StartTime startTime,
                                     final List<EventHandler<DriverRestarted>> orderedHandlers) {
    if (this.state == DriverRestartState.BEGAN) {
      restartEvaluators = driverRuntimeRestartManager.getPreviousEvaluators();
      final DriverRestarted restartedInfo = new DriverRestartedImpl(resubmissionAttempts, startTime, restartEvaluators);

      for (final EventHandler<DriverRestarted> handler : orderedHandlers) {
        handler.onNext(restartedInfo);
      }

      this.state = DriverRestartState.IN_PROGRESS;
    } else {
      final String errMsg = "Should not be setting the set of expected alive evaluators more than once.";
      LOG.log(Level.SEVERE, errMsg);
      throw new DriverFatalRuntimeException(errMsg);
    }

    driverRuntimeRestartManager.informAboutEvaluatorFailures(getFailedEvaluators());

    if (driverRestartEvaluatorRecoverySeconds != Integer.MAX_VALUE) {
      // Don't use Clock here because if there is an event scheduled, the driver will not be idle, even if
      // driver restart has already completed, and we cannot cancel the event.
      restartCompletedTimer.schedule(new TimerTask() {
        @Override
        public void run() {
          onDriverRestartCompleted(true);
        }
      }, driverRestartEvaluatorRecoverySeconds * 1000L);
    }
  }

  /**
   * @param evaluatorId the ID of the evaluator to look up.
   * @return The restart state of the specified evaluator. Returns {@link EvaluatorRestartState#NOT_EXPECTED}
   * if the {@link DriverRestartManager} does not believe that it's an evaluator to be recovered.
   */
  public synchronized EvaluatorRestartState getEvaluatorRestartState(final String evaluatorId) {
    if (this.state.hasNotRestarted()) {
      return EvaluatorRestartState.NOT_EXPECTED;
    }

    return getStateOfPreviousEvaluator(evaluatorId);
  }

  /**
   * @param evaluatorId the ID of the evaluator to look up.
   * @return The ResourceRecoverEvent of the specified evaluator.
   * @throws DriverFatalRuntimeException if the evaluator does not exist in the set of known evaluators, which
   *         includes the case where no previous evaluators have been recovered yet.
   */
  public synchronized ResourceRecoverEvent getResourceRecoverEvent(final String evaluatorId) {
    // restartEvaluators is still null before onRestart(); report that through the documented exception type
    // instead of failing with a NullPointerException.
    if (this.restartEvaluators == null || !this.restartEvaluators.contains(evaluatorId)) {
      throw new DriverFatalRuntimeException("Unexpected evaluator [" + evaluatorId + "], should " +
          "not have been recorded.");
    }

    return this.restartEvaluators.get(evaluatorId).getResourceRecoverEvent();
  }

  /**
   * Indicate that this Driver has re-established the connection with one more Evaluator of a previous run.
   * Completes the driver restart once no previous evaluator remains in the
   * {@link EvaluatorRestartState#EXPECTED} state.
   * @param evaluatorId the ID of the evaluator that has reported back.
   * @return true if the evaluator has been newly recovered, false if it had already been recovered before.
   * @throws DriverFatalRuntimeException if the evaluator is in a failed or not-expected state.
   */
  public synchronized boolean onRecoverEvaluator(final String evaluatorId) {
    // The monitor is held for the whole method, so one lookup is sufficient for both checks below.
    final EvaluatorRestartState currentState = getStateOfPreviousEvaluator(evaluatorId);

    if (currentState.isFailedOrNotExpected()) {
      final String errMsg = "Evaluator with evaluator ID " + evaluatorId + " not expected to be alive.";
      LOG.log(Level.SEVERE, errMsg);
      throw new DriverFatalRuntimeException(errMsg);
    }

    if (currentState != EvaluatorRestartState.EXPECTED) {
      LOG.log(Level.WARNING, "Evaluator with evaluator ID {0} added to the set" +
          " of recovered evaluators more than once. Ignoring second add...", evaluatorId);
      return false;
    }

    // set the status for this evaluator ID to be reported.
    setEvaluatorReported(evaluatorId);

    if (haveAllExpectedEvaluatorsReported()) {
      onDriverRestartCompleted(false);
    }

    return true;
  }

  /**
   * Records the evaluators when it is allocated. The implementation depends on the runtime.
   * @param id The evaluator ID of the allocated evaluator.
   */
  public synchronized void recordAllocatedEvaluator(final String id) {
    driverRuntimeRestartManager.recordAllocatedEvaluator(id);
  }

  /**
   * Records a removed evaluator into the evaluator log. The implementation depends on the runtime.
   * @param id The evaluator ID of the removed evaluator.
   */
  public synchronized void recordRemovedEvaluator(final String id) {
    driverRuntimeRestartManager.recordRemovedEvaluator(id);
  }

  /**
   * Signals to the {@link DriverRestartManager} that an evaluator has reported back after restart.
   * @param evaluatorId the ID of the evaluator.
   * @throws DriverFatalRuntimeException if the evaluator is not expected or the transition is rejected.
   */
  public synchronized void setEvaluatorReported(final String evaluatorId) {
    setStateOfPreviousEvaluator(evaluatorId, EvaluatorRestartState.REPORTED);
  }

  /**
   * Signals to the {@link DriverRestartManager} that an evaluator has had its recovery heartbeat processed.
   * @param evaluatorId the ID of the evaluator.
   * @throws DriverFatalRuntimeException if the evaluator is not expected or the transition is rejected.
   */
  public synchronized void setEvaluatorReregistered(final String evaluatorId) {
    setStateOfPreviousEvaluator(evaluatorId, EvaluatorRestartState.REREGISTERED);
  }

  /**
   * Signals to the {@link DriverRestartManager} that an evaluator has had its running task or active context processed.
   * @param evaluatorId the ID of the evaluator.
   * @throws DriverFatalRuntimeException if the evaluator is not expected or the transition is rejected.
   */
  public synchronized void setEvaluatorProcessed(final String evaluatorId) {
    setStateOfPreviousEvaluator(evaluatorId, EvaluatorRestartState.PROCESSED);
  }

  /**
   * Signals to the {@link DriverRestartManager} that an expected evaluator has been expired.
   * @param evaluatorId the ID of the evaluator.
   * @throws DriverFatalRuntimeException if the evaluator is not expected or the transition is rejected.
   */
  public synchronized void setEvaluatorExpired(final String evaluatorId) {
    setStateOfPreviousEvaluator(evaluatorId, EvaluatorRestartState.EXPIRED);
  }

  /**
   * @return the recorded restart state of the given evaluator, or {@link EvaluatorRestartState#NOT_EXPECTED}
   * if the evaluator is not among the previous evaluators.
   */
  private synchronized EvaluatorRestartState getStateOfPreviousEvaluator(final String evaluatorId) {
    if (!this.restartEvaluators.contains(evaluatorId)) {
      return EvaluatorRestartState.NOT_EXPECTED;
    }

    return this.restartEvaluators.get(evaluatorId).getEvaluatorRestartState();
  }

  /**
   * Moves the given previous evaluator to the state {@code to}.
   * @throws DriverFatalRuntimeException if the evaluator is unknown or the transition is rejected.
   */
  private synchronized void setStateOfPreviousEvaluator(final String evaluatorId,
                                                        final EvaluatorRestartState to) {
    if (!restartEvaluators.contains(evaluatorId) ||
        !restartEvaluators.get(evaluatorId).setEvaluatorRestartState(to)) {
      throw evaluatorTransitionFailed(evaluatorId, to);
    }
  }

  /**
   * Builds (but does not throw) the exception describing why a transition to {@code to} failed.
   */
  private synchronized DriverFatalRuntimeException evaluatorTransitionFailed(final String evaluatorId,
                                                                             final EvaluatorRestartState to) {
    if (!restartEvaluators.contains(evaluatorId)) {
      return new DriverFatalRuntimeException("Evaluator " + evaluatorId + " is not expected.");
    }

    return new DriverFatalRuntimeException("Evaluator " + evaluatorId + " wants to transition to state " +
        "[" + to + "], but is in the illegal state [" +
        restartEvaluators.get(evaluatorId).getEvaluatorRestartState() + "].");
  }

  /**
   * @return true if no previous evaluator is still in the {@link EvaluatorRestartState#EXPECTED} state.
   */
  private synchronized boolean haveAllExpectedEvaluatorsReported() {
    for (final String previousEvaluatorId : this.restartEvaluators.getEvaluatorIds()) {
      final EvaluatorRestartState restartState = getStateOfPreviousEvaluator(previousEvaluatorId);
      if (restartState == EvaluatorRestartState.EXPECTED) {
        return false;
      }
    }

    return true;
  }

  /**
   * Sets the driver restart status to be completed if not yet set and notifies the restart completed event handlers.
   * Service handlers are notified before driver handlers. The timeout timer is cancelled in every case.
   * @param isTimedOut whether completion was triggered by the recovery timeout.
   */
  private synchronized void onDriverRestartCompleted(final boolean isTimedOut) {
    if (this.state != DriverRestartState.COMPLETED) {
      final Set<String> outstandingEvaluatorIds = getOutstandingEvaluatorsAndMarkExpired();
      driverRuntimeRestartManager.informAboutEvaluatorFailures(outstandingEvaluatorIds);

      this.state = DriverRestartState.COMPLETED;
      final DriverRestartCompleted driverRestartCompleted = new DriverRestartCompletedImpl(
          System.currentTimeMillis(), isTimedOut);

      for (final EventHandler<DriverRestartCompleted> serviceRestartCompletedHandler
          : this.serviceDriverRestartCompletedHandlers) {
        serviceRestartCompletedHandler.onNext(driverRestartCompleted);
      }

      for (final EventHandler<DriverRestartCompleted> restartCompletedHandler : this.driverRestartCompletedHandlers) {
        restartCompletedHandler.onNext(driverRestartCompleted);
      }

      // Parameterized so that the set is only rendered when FINE logging is enabled.
      LOG.log(Level.FINE, "Restart completed. Evaluators that have not reported back are: {0}",
          outstandingEvaluatorIds);
    }

    restartCompletedTimer.cancel();
  }

  /**
   * Gets the outstanding evaluators that have not yet reported back and mark them as expired.
   * Only invoked while the monitor of this instance is held.
   */
  private Set<String> getOutstandingEvaluatorsAndMarkExpired() {
    final Set<String> outstanding = new HashSet<>();
    for (final String previousEvaluatorId : restartEvaluators.getEvaluatorIds()) {
      if (getStateOfPreviousEvaluator(previousEvaluatorId) == EvaluatorRestartState.EXPECTED) {
        outstanding.add(previousEvaluatorId);
        setEvaluatorExpired(previousEvaluatorId);
      }
    }

    return outstanding;
  }

  /**
   * @return the IDs of all previous evaluators whose state is {@link EvaluatorRestartState#FAILED}.
   */
  private Set<String> getFailedEvaluators() {
    final Set<String> failed = new HashSet<>();
    for (final String previousEvaluatorId : this.restartEvaluators.getEvaluatorIds()) {
      if (getStateOfPreviousEvaluator(previousEvaluatorId) == EvaluatorRestartState.FAILED) {
        failed.add(previousEvaluatorId);
      }
    }

    return failed;
  }

  /**
   * {@inheritDoc}
   * @return an {@link IdleMessage} that reports idle if and only if the driver is not in the process of restart.
   */
  @Override
  public IdleMessage getIdleStatus() {
    // Deliberately not synchronized; relies on the volatile read of the restart state.
    final boolean idleState = !this.state.isRestarting();
    final String idleMessage = idleState ? CLASS_NAME + " currently not in the process of restart." :
        CLASS_NAME + " currently in the process of restart.";
    return new IdleMessage(CLASS_NAME, idleMessage, idleState);
  }
}