/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.reef.driver.restart;

import org.apache.reef.annotations.Unstable;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.annotations.audience.Private;
import org.apache.reef.driver.parameters.DriverRestartCompletedHandlers;
import org.apache.reef.driver.parameters.DriverRestartEvaluatorRecoverySeconds;
import org.apache.reef.driver.parameters.ServiceDriverRestartCompletedHandlers;
import org.apache.reef.exception.DriverFatalRuntimeException;
import org.apache.reef.runtime.common.driver.idle.DriverIdlenessSource;
import org.apache.reef.runtime.common.driver.idle.IdleMessage;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.runtime.common.driver.resourcemanager.ResourceRecoverEvent;
import org.apache.reef.wake.time.event.StartTime;

import javax.inject.Inject;
import java.util.*;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * The manager that handles aspects of driver restart such as determining whether the driver is in
 * restart mode, what to do on restart, whether restart is completed, and others.
 *
 * All state transitions happen inside {@code synchronized} methods of this object; the restart timeout
 * is delivered on the thread of an internal {@link Timer}.
 */
@DriverSide
@Private
@Unstable
public final class DriverRestartManager implements DriverIdlenessSource, AutoCloseable {

  private static final String CLASS_NAME = DriverRestartManager.class.getName();
  private static final Logger LOG = Logger.getLogger(CLASS_NAME);

  private final Timer restartCompletedTimer = new Timer(this.getClass().getSimpleName() + ":Timer");

  private final DriverRuntimeRestartManager driverRuntimeRestartManager;
  private final Set<EventHandler<DriverRestartCompleted>> driverRestartCompletedHandlers;
  private final Set<EventHandler<DriverRestartCompleted>> serviceDriverRestartCompletedHandlers;
  private final int driverRestartEvaluatorRecoverySeconds;

  private RestartEvaluators restartEvaluators;

  // Written only inside synchronized methods (including from the timer thread), but read without the
  // monitor by getIdleStatus() and close(); volatile makes those unsynchronized reads see the latest write.
  private volatile DriverRestartState state = DriverRestartState.NOT_RESTARTED;
  private int resubmissionAttempts = 0;

  /**
   * @param driverRuntimeRestartManager the runtime-specific restart manager this class delegates to.
   * @param driverRestartEvaluatorRecoverySeconds the number of seconds to wait for evaluators to report back
   *        before restart is declared completed. Must not be negative; with {@link Integer#MAX_VALUE}
   *        no timeout is scheduled by {@link #onRestart(StartTime, List)}.
   * @param driverRestartCompletedHandlers the handlers notified when restart completes.
   * @param serviceDriverRestartCompletedHandlers the service handlers notified when restart completes;
   *        these are invoked before {@code driverRestartCompletedHandlers}.
   * @throws IllegalArgumentException if {@code driverRestartEvaluatorRecoverySeconds} is negative.
   */
  @Inject
  private DriverRestartManager(final DriverRuntimeRestartManager driverRuntimeRestartManager,
                               @Parameter(DriverRestartEvaluatorRecoverySeconds.class)
                               final int driverRestartEvaluatorRecoverySeconds,
                               @Parameter(DriverRestartCompletedHandlers.class)
                               final Set<EventHandler<DriverRestartCompleted>> driverRestartCompletedHandlers,
                               @Parameter(ServiceDriverRestartCompletedHandlers.class)
                               final Set<EventHandler<DriverRestartCompleted>> serviceDriverRestartCompletedHandlers) {
    this.driverRuntimeRestartManager = driverRuntimeRestartManager;
    this.driverRestartCompletedHandlers = driverRestartCompletedHandlers;
    this.serviceDriverRestartCompletedHandlers = serviceDriverRestartCompletedHandlers;
    if (driverRestartEvaluatorRecoverySeconds < 0) {
      // Zero is accepted by this check, so the message must not claim "greater than 0".
      throw new IllegalArgumentException("driverRestartEvaluatorRecoverySeconds must not be negative, but was " +
          driverRestartEvaluatorRecoverySeconds + ".");
    }

    this.driverRestartEvaluatorRecoverySeconds = driverRestartEvaluatorRecoverySeconds;
  }

  /**
   * Triggers the state machine if the application is a restart instance.
   * While the driver is still in the not-restarted state, the number of resubmission attempts is read
   * from the runtime restart manager; a positive count moves the state to {@link DriverRestartState#BEGAN}.
   * @return true if the application is a restart instance.
   * Can be already done with restart or in the process of restart.
   */
  public synchronized boolean detectRestart() {
    if (this.state.hasNotRestarted()) {
      resubmissionAttempts = driverRuntimeRestartManager.getResubmissionAttempts();

      if (resubmissionAttempts > 0) {
        // set the state machine in motion.
        this.state = DriverRestartState.BEGAN;
      }
    }

    return this.state.hasRestarted();
  }

  /**
   * @return true if the driver is undergoing the process of restart.
   */
  public synchronized boolean isRestarting() {
    return this.state.isRestarting();
  }

  /**
   * Recovers the list of alive and failed evaluators and inform the driver restart handlers and inform the
   * evaluator failure handlers based on the specific runtime. Also sets the expected amount of evaluators to report
   * back as alive to the job driver.
   * Unless the recovery timeout is {@link Integer#MAX_VALUE}, a timer task is scheduled that completes
   * the restart (flagged as timed out) once the timeout elapses.
   * @param startTime the start time event passed on to the {@link DriverRestarted} event.
   * @param orderedHandlers the handlers to notify, invoked in iteration order.
   * @throws DriverFatalRuntimeException if the state is not {@link DriverRestartState#BEGAN},
   *         e.g. when this method is invoked more than once.
   */
  public synchronized void onRestart(final StartTime startTime,
                                     final List<EventHandler<DriverRestarted>> orderedHandlers) {
    if (this.state == DriverRestartState.BEGAN) {
      restartEvaluators = driverRuntimeRestartManager.getPreviousEvaluators();
      final DriverRestarted restartedInfo = new DriverRestartedImpl(resubmissionAttempts, startTime, restartEvaluators);

      for (final EventHandler<DriverRestarted> handler : orderedHandlers) {
        handler.onNext(restartedInfo);
      }

      this.state = DriverRestartState.IN_PROGRESS;
    } else {
      final String errMsg = "Should not be setting the set of expected alive evaluators more than once.";
      LOG.log(Level.SEVERE, errMsg);
      throw new DriverFatalRuntimeException(errMsg);
    }

    driverRuntimeRestartManager.informAboutEvaluatorFailures(getFailedEvaluators());

    if (driverRestartEvaluatorRecoverySeconds != Integer.MAX_VALUE) {
      // Don't use Clock here because if there is an event scheduled, the driver will not be idle, even if
      // driver restart has already completed, and we cannot cancel the event.
      restartCompletedTimer.schedule(new TimerTask() {
        @Override
        public void run() {
          onDriverRestartCompleted(true);
        }
      }, driverRestartEvaluatorRecoverySeconds * 1000L); // long arithmetic: seconds to milliseconds
    }
  }

  /**
   * @param evaluatorId the ID of the evaluator to look up.
   * @return The restart state of the specified evaluator. Returns {@link EvaluatorRestartState#NOT_EXPECTED}
   * if the {@link DriverRestartManager} does not believe that it's an evaluator to be recovered.
   */
  public synchronized EvaluatorRestartState getEvaluatorRestartState(final String evaluatorId) {
    if (this.state.hasNotRestarted()) {
      return EvaluatorRestartState.NOT_EXPECTED;
    }

    return getStateOfPreviousEvaluator(evaluatorId);
  }

  /**
   * @param evaluatorId the ID of the evaluator to look up.
   * @return The ResourceRecoverEvent of the specified evaluator. Throws a {@link DriverFatalRuntimeException} if
   * the evaluator does not exist in the set of known evaluators.
   */
  public synchronized ResourceRecoverEvent getResourceRecoverEvent(final String evaluatorId) {
    if (!this.restartEvaluators.contains(evaluatorId)) {
      throw new DriverFatalRuntimeException("Unexpected evaluator [" + evaluatorId + "], should " +
          "not have been recorded.");
    }

    return this.restartEvaluators.get(evaluatorId).getResourceRecoverEvent();
  }

  /**
   * Indicate that this Driver has re-established the connection with one more Evaluator of a previous run.
   * Once no previous evaluator remains in the {@link EvaluatorRestartState#EXPECTED} state, the restart
   * is completed (not flagged as timed out).
   * @param evaluatorId the ID of the evaluator that reported back.
   * @return true if the evaluator has been newly recovered.
   * @throws DriverFatalRuntimeException if the evaluator is in a failed or not-expected state.
   */
  public synchronized boolean onRecoverEvaluator(final String evaluatorId) {
    if (getStateOfPreviousEvaluator(evaluatorId).isFailedOrNotExpected()) {
      final String errMsg = "Evaluator with evaluator ID " + evaluatorId + " not expected to be alive.";
      LOG.log(Level.SEVERE, errMsg);
      throw new DriverFatalRuntimeException(errMsg);
    }

    if (getStateOfPreviousEvaluator(evaluatorId) != EvaluatorRestartState.EXPECTED) {
      LOG.log(Level.WARNING, "Evaluator with evaluator ID " + evaluatorId + " added to the set" +
          " of recovered evaluators more than once. Ignoring second add...");
      return false;
    }

    // set the status for this evaluator ID to be reported.
    setEvaluatorReported(evaluatorId);

    if (haveAllExpectedEvaluatorsReported()) {
      onDriverRestartCompleted(false);
    }

    return true;
  }

  /**
   * Records the evaluators when it is allocated. The implementation depends on the runtime.
   * @param id The evaluator ID of the allocated evaluator.
   */
  public synchronized void recordAllocatedEvaluator(final String id) {
    driverRuntimeRestartManager.recordAllocatedEvaluator(id);
  }

  /**
   * Records a removed evaluator into the evaluator log. The implementation depends on the runtime.
   * @param id The evaluator ID of the removed evaluator.
   */
  public synchronized void recordRemovedEvaluator(final String id) {
    driverRuntimeRestartManager.recordRemovedEvaluator(id);
  }

  /**
   * Signals to the {@link DriverRestartManager} that an evaluator has reported back after restart.
   * @param evaluatorId the ID of the evaluator.
   * @throws DriverFatalRuntimeException if the evaluator is unknown or the transition is rejected.
   */
  public synchronized void setEvaluatorReported(final String evaluatorId) {
    setStateOfPreviousEvaluator(evaluatorId, EvaluatorRestartState.REPORTED);
  }

  /**
   * Signals to the {@link DriverRestartManager} that an evaluator has had its recovery heartbeat processed.
   * @param evaluatorId the ID of the evaluator.
   * @throws DriverFatalRuntimeException if the evaluator is unknown or the transition is rejected.
   */
  public synchronized void setEvaluatorReregistered(final String evaluatorId) {
    setStateOfPreviousEvaluator(evaluatorId, EvaluatorRestartState.REREGISTERED);
  }

  /**
   * Signals to the {@link DriverRestartManager} that an evaluator has had its running task or active context processed.
   * @param evaluatorId the ID of the evaluator.
   * @throws DriverFatalRuntimeException if the evaluator is unknown or the transition is rejected.
   */
  public synchronized void setEvaluatorProcessed(final String evaluatorId) {
    setStateOfPreviousEvaluator(evaluatorId, EvaluatorRestartState.PROCESSED);
  }

  /**
   * Signals to the {@link DriverRestartManager} that an expected evaluator has been expired.
   * @param evaluatorId the ID of the evaluator.
   * @throws DriverFatalRuntimeException if the evaluator is unknown or the transition is rejected.
   */
  public synchronized void setEvaluatorExpired(final String evaluatorId) {
    setStateOfPreviousEvaluator(evaluatorId, EvaluatorRestartState.EXPIRED);
  }

  /**
   * @return the recorded restart state of the given evaluator, or
   * {@link EvaluatorRestartState#NOT_EXPECTED} if it is not among the previous evaluators.
   */
  private synchronized EvaluatorRestartState getStateOfPreviousEvaluator(final String evaluatorId) {
    if (!this.restartEvaluators.contains(evaluatorId)) {
      return EvaluatorRestartState.NOT_EXPECTED;
    }

    return this.restartEvaluators.get(evaluatorId).getEvaluatorRestartState();
  }

  /**
   * Moves the given previous evaluator to the state {@code to}.
   * @throws DriverFatalRuntimeException if the evaluator is not among the previous evaluators, or if
   *         setting the new state is reported as unsuccessful.
   */
  private synchronized void setStateOfPreviousEvaluator(final String evaluatorId,
                                                        final EvaluatorRestartState to) {
    if (!restartEvaluators.contains(evaluatorId) ||
        !restartEvaluators.get(evaluatorId).setEvaluatorRestartState(to)) {
      throw evaluatorTransitionFailed(evaluatorId, to);
    }
  }

  /**
   * Builds (but does not throw) the exception describing why a transition to {@code to} failed:
   * either the evaluator is unknown, or its current state does not permit the transition.
   */
  private synchronized DriverFatalRuntimeException evaluatorTransitionFailed(final String evaluatorId,
                                                                             final EvaluatorRestartState to) {
    if (!restartEvaluators.contains(evaluatorId)) {
      return new DriverFatalRuntimeException("Evaluator " + evaluatorId + " is not expected.");
    }

    return new DriverFatalRuntimeException("Evaluator " + evaluatorId + " wants to transition to state " +
        "[" + to + "], but is in the illegal state [" +
        restartEvaluators.get(evaluatorId).getEvaluatorRestartState() + "].");
  }

  /**
   * @return true if no previous evaluator is still in the {@link EvaluatorRestartState#EXPECTED} state.
   */
  private synchronized boolean haveAllExpectedEvaluatorsReported() {
    for (final String previousEvaluatorId : this.restartEvaluators.getEvaluatorIds()) {
      final EvaluatorRestartState restartState = getStateOfPreviousEvaluator(previousEvaluatorId);
      if (restartState == EvaluatorRestartState.EXPECTED) {
        return false;
      }
    }

    return true;
  }

  /**
   * Sets the driver restart status to be completed if not yet set and notifies the restart completed event handlers.
   * Evaluators that are still expected are marked expired and reported as failures first. The restart
   * timer is cancelled on every invocation.
   * @param isTimedOut whether completion was triggered by the restart timer.
   */
  private synchronized void onDriverRestartCompleted(final boolean isTimedOut) {
    if (this.state != DriverRestartState.COMPLETED) {
      final Set<String> outstandingEvaluatorIds = getOutstandingEvaluatorsAndMarkExpired();
      driverRuntimeRestartManager.informAboutEvaluatorFailures(outstandingEvaluatorIds);

      this.state = DriverRestartState.COMPLETED;
      final DriverRestartCompleted driverRestartCompleted = new DriverRestartCompletedImpl(
          System.currentTimeMillis(), isTimedOut);

      for (final EventHandler<DriverRestartCompleted> serviceRestartCompletedHandler
          : this.serviceDriverRestartCompletedHandlers) {
        serviceRestartCompletedHandler.onNext(driverRestartCompleted);
      }

      for (final EventHandler<DriverRestartCompleted> restartCompletedHandler : this.driverRestartCompletedHandlers) {
        restartCompletedHandler.onNext(driverRestartCompleted);
      }

      // Parameterized so the message is only formatted when FINE logging is enabled.
      LOG.log(Level.FINE, "Restart completed. Evaluators that have not reported back are: {0}",
          outstandingEvaluatorIds);
    }

    restartCompletedTimer.cancel();
  }

  /**
   * Gets the outstanding evaluators that have not yet reported back and mark them as expired.
   * Only invoked from synchronized methods of this class.
   * @return the IDs of the evaluators that were still expected.
   */
  private Set<String> getOutstandingEvaluatorsAndMarkExpired() {
    final Set<String> outstanding = new HashSet<>();
    for (final String previousEvaluatorId : restartEvaluators.getEvaluatorIds()) {
      if (getStateOfPreviousEvaluator(previousEvaluatorId) == EvaluatorRestartState.EXPECTED) {
        outstanding.add(previousEvaluatorId);
        setEvaluatorExpired(previousEvaluatorId);
      }
    }

    return outstanding;
  }

  /**
   * Only invoked from synchronized methods of this class.
   * @return the IDs of the previous evaluators whose state is {@link EvaluatorRestartState#FAILED}.
   */
  private Set<String> getFailedEvaluators() {
    final Set<String> failed = new HashSet<>();
    for (final String previousEvaluatorId : this.restartEvaluators.getEvaluatorIds()) {
      if (getStateOfPreviousEvaluator(previousEvaluatorId) == EvaluatorRestartState.FAILED) {
        failed.add(previousEvaluatorId);
      }
    }

    return failed;
  }

  /**
   * {@inheritDoc}
   * @return An {@link IdleMessage} built from this class's name, a human-readable description and a flag
   * that is true if not in process of restart, false otherwise.
   */
  @Override
  public IdleMessage getIdleStatus() {
    final boolean idleState = !this.state.isRestarting();
    final String idleMessage = idleState ? CLASS_NAME + " currently not in the process of restart." :
        CLASS_NAME + " currently in the process of restart.";
    return new IdleMessage(CLASS_NAME, idleMessage, idleState);
  }

  /**
   * Close the restart timer.
   */
  @Override
  public void close() {
    LOG.log(Level.FINER, "Closing restart timer. Final state: {0}", this.state);
    this.restartCompletedTimer.cancel();
  }
}