001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.yarn.driver; 020 021import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; 022import org.apache.hadoop.yarn.api.records.Container; 023import org.apache.reef.annotations.Unstable; 024import org.apache.reef.annotations.audience.DriverSide; 025import org.apache.reef.annotations.audience.Private; 026import org.apache.reef.annotations.audience.RuntimeAuthor; 027import org.apache.reef.driver.restart.DriverRuntimeRestartManager; 028import org.apache.reef.driver.restart.EvaluatorRestartInfo; 029import org.apache.reef.driver.restart.RestartEvaluators; 030import org.apache.reef.runtime.common.driver.EvaluatorPreserver; 031import org.apache.reef.runtime.common.driver.evaluator.pojos.State; 032import org.apache.reef.runtime.common.driver.resourcemanager.ResourceEventImpl; 033import org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEventImpl; 034import org.apache.reef.runtime.yarn.driver.parameters.YarnEvaluatorPreserver; 035import org.apache.reef.runtime.yarn.util.YarnUtilities; 036import org.apache.reef.tang.annotations.Parameter; 037 038import javax.inject.Inject; 039import java.util.*; 040import java.util.logging.Level; 041import java.util.logging.Logger; 042 043/** 044 * The implementation of restart manager for YARN. Handles evaluator preservation as well 045 * as evaluator recovery on YARN. 046 */ 047@DriverSide 048@RuntimeAuthor 049@Private 050@Unstable 051public final class YarnDriverRuntimeRestartManager implements DriverRuntimeRestartManager { 052 053 private static final Logger LOG = Logger.getLogger(YarnDriverRuntimeRestartManager.class.getName()); 054 055 /** 056 * The default resubmission attempts number returned if: 057 * 1) we are not able to determine the number of application attempts based on the environment provided by YARN. 058 * 2) we are able to receive a list of previous containers from the Resource Manager. 059 */ 060 private static final int DEFAULT_RESTART_RESUBMISSION_ATTEMPTS = 1; 061 062 private final EvaluatorPreserver evaluatorPreserver; 063 private final ApplicationMasterRegistration registration; 064 private final REEFEventHandlers reefEventHandlers; 065 private final YarnContainerManager yarnContainerManager; 066 private final RackNameFormatter rackNameFormatter; 067 068 private Set<Container> previousContainers = null; 069 070 @Inject 071 private YarnDriverRuntimeRestartManager(@Parameter(YarnEvaluatorPreserver.class) 072 final EvaluatorPreserver evaluatorPreserver, 073 final REEFEventHandlers reefEventHandlers, 074 final ApplicationMasterRegistration registration, 075 final YarnContainerManager yarnContainerManager, 076 final RackNameFormatter rackNameFormatter) { 077 this.registration = registration; 078 this.evaluatorPreserver = evaluatorPreserver; 079 this.reefEventHandlers = reefEventHandlers; 080 this.yarnContainerManager = yarnContainerManager; 081 this.rackNameFormatter = rackNameFormatter; 082 } 083 084 /** 085 * Determines the number of times the Driver has been submitted based on the container ID environment 086 * variable provided by YARN. If that fails, determine whether the application master is a restart 087 * based on the number of previous containers reported by YARN. In the failure scenario, returns 1 if restart, 0 088 * otherwise. 089 * @return positive value if the application master is a restarted instance, 0 otherwise. 090 */ 091 @Override 092 public int getResubmissionAttempts() { 093 final String containerIdString = YarnUtilities.getContainerIdString(); 094 final ApplicationAttemptId appAttemptID = YarnUtilities.getAppAttemptId(containerIdString); 095 096 if (containerIdString == null || appAttemptID == null) { 097 LOG.log(Level.WARNING, "Was not able to fetch application attempt, container ID is [" + containerIdString + 098 "] and application attempt is [" + appAttemptID + "]. Determining restart based on previous containers."); 099 100 if (this.isRestartByPreviousContainers()) { 101 LOG.log(Level.WARNING, "Driver is a restarted instance based on the number of previous containers. " + 102 "As returned by the Resource Manager. Returning default resubmission attempts " + 103 DEFAULT_RESTART_RESUBMISSION_ATTEMPTS + "."); 104 return DEFAULT_RESTART_RESUBMISSION_ATTEMPTS; 105 } 106 107 return 0; 108 } 109 110 int appAttempt = appAttemptID.getAttemptId(); 111 112 LOG.log(Level.FINE, "Application attempt: " + appAttempt); 113 assert appAttempt > 0; 114 return appAttempt - 1; 115 } 116 117 /** 118 * Initializes the list of previous containers and determine whether or not this is an instance of restart 119 * based on information reported by the RM. 120 * @return true if previous containers is not empty. 121 */ 122 private boolean isRestartByPreviousContainers() { 123 this.initializeListOfPreviousContainers(); 124 return !this.previousContainers.isEmpty(); 125 } 126 127 /** 128 * Initializes the list of previous containers as reported by YARN. 129 */ 130 private synchronized void initializeListOfPreviousContainers() { 131 if (this.previousContainers == null) { 132 final List<Container> yarnPrevContainers = 133 this.registration.getRegistration().getContainersFromPreviousAttempts(); 134 135 // If it's still null, create an empty list to indicate that it's not a restart. 136 if (yarnPrevContainers == null) { 137 this.previousContainers = Collections.unmodifiableSet(new HashSet<Container>()); 138 } else { 139 this.previousContainers = Collections.unmodifiableSet(new HashSet<>(yarnPrevContainers)); 140 } 141 142 yarnContainerManager.onContainersRecovered(this.previousContainers); 143 } 144 } 145 146 @Override 147 public void recordAllocatedEvaluator(final String id) { 148 this.evaluatorPreserver.recordAllocatedEvaluator(id); 149 } 150 151 @Override 152 public void recordRemovedEvaluator(final String id) { 153 this.evaluatorPreserver.recordRemovedEvaluator(id); 154 } 155 156 /** 157 * Used by {@link org.apache.reef.driver.restart.DriverRestartManager}. 158 * Gets the list of previous containers from the resource manager, 159 * compares that list to the YarnDriverRuntimeRestartManager's own list based on the evalutor preserver, 160 * and determine which evaluators are alive and which have failed during restart. 161 * @return a map of Evaluator ID to {@link EvaluatorRestartInfo} for evaluators that have either failed or survived 162 * driver restart. 163 */ 164 @Override 165 public RestartEvaluators getPreviousEvaluators() { 166 final RestartEvaluators.Builder restartEvaluatorsBuilder = RestartEvaluators.newBuilder(); 167 168 this.initializeListOfPreviousContainers(); 169 170 if (this.previousContainers != null && !this.previousContainers.isEmpty()) { 171 LOG.log(Level.INFO, "Driver restarted, with {0} previous containers", this.previousContainers.size()); 172 final Set<String> expectedContainers = this.evaluatorPreserver.recoverEvaluators(); 173 174 final int numExpectedContainers = expectedContainers.size(); 175 final int numPreviousContainers = this.previousContainers.size(); 176 if (numExpectedContainers > numPreviousContainers) { 177 // we expected more containers to be alive, some containers must have died during driver restart 178 LOG.log(Level.WARNING, "Expected {0} containers while only {1} are still alive", 179 new Object[]{numExpectedContainers, numPreviousContainers}); 180 final Set<String> previousContainersIds = new HashSet<>(); 181 for (final Container container : this.previousContainers) { 182 previousContainersIds.add(container.getId().toString()); 183 } 184 for (final String expectedContainerId : expectedContainers) { 185 if (!previousContainersIds.contains(expectedContainerId)) { 186 LOG.log(Level.WARNING, "Expected container [{0}] not alive, must have failed during driver restart.", 187 expectedContainerId); 188 restartEvaluatorsBuilder.addRestartEvaluator( 189 EvaluatorRestartInfo.createFailedEvaluatorInfo(expectedContainerId)); 190 } 191 } 192 } 193 if (numExpectedContainers < numPreviousContainers) { 194 // somehow we have more alive evaluators, this should not happen 195 throw new RuntimeException("Expected only [" + numExpectedContainers + "] containers " + 196 "but resource manager believe that [" + numPreviousContainers + "] are outstanding for driver."); 197 } 198 199 // numExpectedContainers == numPreviousContainers 200 for (final Container container : this.previousContainers) { 201 LOG.log(Level.FINE, "Previous container: [{0}]", container.toString()); 202 if (!expectedContainers.contains(container.getId().toString())) { 203 throw new RuntimeException("Not expecting container " + container.getId().toString()); 204 } 205 206 restartEvaluatorsBuilder.addRestartEvaluator(EvaluatorRestartInfo.createExpectedEvaluatorInfo( 207 ResourceEventImpl.newRecoveryBuilder().setIdentifier(container.getId().toString()) 208 .setNodeId(container.getNodeId().toString()).setRackName(rackNameFormatter.getRackName(container)) 209 .setResourceMemory(container.getResource().getMemory()) 210 .setVirtualCores(container.getResource().getVirtualCores()) 211 .setRuntimeName(RuntimeIdentifier.RUNTIME_NAME).build())); 212 } 213 } 214 215 return restartEvaluatorsBuilder.build(); 216 } 217 218 /** 219 * Calls the appropriate handler via REEFEventHandlers, which is a runtime specific implementation 220 * of the YARN runtime. 221 * @param evaluatorIds the set of evaluator IDs of failed evaluators during restart. 222 */ 223 @Override 224 public void informAboutEvaluatorFailures(final Set<String> evaluatorIds) { 225 for (String evaluatorId : evaluatorIds) { 226 LOG.log(Level.WARNING, "Container [" + evaluatorId + 227 "] has failed during driver restart process, FailedEvaluatorHandler will be triggered, but " + 228 "no additional evaluator can be requested due to YARN-2433."); 229 // trigger a failed evaluator event 230 this.reefEventHandlers.onResourceStatus(ResourceStatusEventImpl.newBuilder() 231 .setIdentifier(evaluatorId) 232 .setState(State.FAILED) 233 .setExitCode(1) 234 .setDiagnostics("Container [" + evaluatorId + "] failed during driver restart process.") 235 .build()); 236 } 237 } 238}