This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.yarn.driver;
020
021import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
022import org.apache.hadoop.yarn.api.records.Container;
023import org.apache.reef.annotations.Unstable;
024import org.apache.reef.annotations.audience.DriverSide;
025import org.apache.reef.annotations.audience.Private;
026import org.apache.reef.annotations.audience.RuntimeAuthor;
027import org.apache.reef.driver.restart.DriverRuntimeRestartManager;
028import org.apache.reef.driver.restart.EvaluatorRestartInfo;
029import org.apache.reef.driver.restart.RestartEvaluators;
030import org.apache.reef.runtime.common.driver.EvaluatorPreserver;
031import org.apache.reef.runtime.common.driver.evaluator.pojos.State;
032import org.apache.reef.runtime.common.driver.resourcemanager.ResourceEventImpl;
033import org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEventImpl;
034import org.apache.reef.runtime.yarn.driver.parameters.YarnEvaluatorPreserver;
035import org.apache.reef.runtime.yarn.util.YarnUtilities;
036import org.apache.reef.tang.annotations.Parameter;
037
038import javax.inject.Inject;
039import java.util.*;
040import java.util.logging.Level;
041import java.util.logging.Logger;
042
043/**
044 * The implementation of restart manager for YARN. Handles evaluator preservation as well
045 * as evaluator recovery on YARN.
046 */
047@DriverSide
048@RuntimeAuthor
049@Private
050@Unstable
051public final class YarnDriverRuntimeRestartManager implements DriverRuntimeRestartManager {
052
053  private static final Logger LOG = Logger.getLogger(YarnDriverRuntimeRestartManager.class.getName());
054
055  /**
056   * The default resubmission attempts number returned if:
057   * 1) we are not able to determine the number of application attempts based on the environment provided by YARN.
058   * 2) we are able to receive a list of previous containers from the Resource Manager.
059   */
060  private static final int DEFAULT_RESTART_RESUBMISSION_ATTEMPTS = 1;
061
062  private final EvaluatorPreserver evaluatorPreserver;
063  private final ApplicationMasterRegistration registration;
064  private final REEFEventHandlers reefEventHandlers;
065  private final YarnContainerManager yarnContainerManager;
066  private final RackNameFormatter rackNameFormatter;
067
068  private Set<Container> previousContainers = null;
069
070  @Inject
071  private YarnDriverRuntimeRestartManager(@Parameter(YarnEvaluatorPreserver.class)
072                                          final EvaluatorPreserver evaluatorPreserver,
073                                          final REEFEventHandlers reefEventHandlers,
074                                          final ApplicationMasterRegistration registration,
075                                          final YarnContainerManager yarnContainerManager,
076                                          final RackNameFormatter rackNameFormatter) {
077    this.registration = registration;
078    this.evaluatorPreserver = evaluatorPreserver;
079    this.reefEventHandlers = reefEventHandlers;
080    this.yarnContainerManager = yarnContainerManager;
081    this.rackNameFormatter = rackNameFormatter;
082  }
083
084  /**
085   * Determines the number of times the Driver has been submitted based on the container ID environment
086   * variable provided by YARN. If that fails, determine whether the application master is a restart
087   * based on the number of previous containers reported by YARN. In the failure scenario, returns 1 if restart, 0
088   * otherwise.
089   * @return positive value if the application master is a restarted instance, 0 otherwise.
090   */
091  @Override
092  public int getResubmissionAttempts() {
093    final String containerIdString = YarnUtilities.getContainerIdString();
094    final ApplicationAttemptId appAttemptID = YarnUtilities.getAppAttemptId(containerIdString);
095
096    if (containerIdString == null || appAttemptID == null) {
097      LOG.log(Level.WARNING, "Was not able to fetch application attempt, container ID is [" + containerIdString +
098          "] and application attempt is [" + appAttemptID + "]. Determining restart based on previous containers.");
099
100      if (this.isRestartByPreviousContainers()) {
101        LOG.log(Level.WARNING, "Driver is a restarted instance based on the number of previous containers. " +
102            "As returned by the Resource Manager. Returning default resubmission attempts " +
103            DEFAULT_RESTART_RESUBMISSION_ATTEMPTS + ".");
104        return DEFAULT_RESTART_RESUBMISSION_ATTEMPTS;
105      }
106
107      return 0;
108    }
109
110    int appAttempt = appAttemptID.getAttemptId();
111
112    LOG.log(Level.FINE, "Application attempt: " + appAttempt);
113    assert appAttempt > 0;
114    return appAttempt - 1;
115  }
116
117  /**
118   * Initializes the list of previous containers and determine whether or not this is an instance of restart
119   * based on information reported by the RM.
120   * @return true if previous containers is not empty.
121   */
122  private boolean isRestartByPreviousContainers() {
123    this.initializeListOfPreviousContainers();
124    return !this.previousContainers.isEmpty();
125  }
126
127  /**
128   * Initializes the list of previous containers as reported by YARN.
129   */
130  private synchronized void initializeListOfPreviousContainers() {
131    if (this.previousContainers == null) {
132      final List<Container> yarnPrevContainers =
133          this.registration.getRegistration().getContainersFromPreviousAttempts();
134
135      // If it's still null, create an empty list to indicate that it's not a restart.
136      if (yarnPrevContainers == null) {
137        this.previousContainers = Collections.unmodifiableSet(new HashSet<Container>());
138      } else {
139        this.previousContainers = Collections.unmodifiableSet(new HashSet<>(yarnPrevContainers));
140      }
141
142      yarnContainerManager.onContainersRecovered(this.previousContainers);
143    }
144  }
145
146  @Override
147  public void recordAllocatedEvaluator(final String id) {
148    this.evaluatorPreserver.recordAllocatedEvaluator(id);
149  }
150
151  @Override
152  public void recordRemovedEvaluator(final String id) {
153    this.evaluatorPreserver.recordRemovedEvaluator(id);
154  }
155
156  /**
157   * Used by {@link org.apache.reef.driver.restart.DriverRestartManager}.
158   * Gets the list of previous containers from the resource manager,
159   * compares that list to the YarnDriverRuntimeRestartManager's own list based on the evalutor preserver,
160   * and determine which evaluators are alive and which have failed during restart.
161   * @return a map of Evaluator ID to {@link EvaluatorRestartInfo} for evaluators that have either failed or survived
162   * driver restart.
163   */
164  @Override
165  public RestartEvaluators getPreviousEvaluators() {
166    final RestartEvaluators.Builder restartEvaluatorsBuilder = RestartEvaluators.newBuilder();
167
168    this.initializeListOfPreviousContainers();
169
170    if (this.previousContainers != null && !this.previousContainers.isEmpty()) {
171      LOG.log(Level.INFO, "Driver restarted, with {0} previous containers", this.previousContainers.size());
172      final Set<String> expectedContainers = this.evaluatorPreserver.recoverEvaluators();
173
174      final int numExpectedContainers = expectedContainers.size();
175      final int numPreviousContainers = this.previousContainers.size();
176      if (numExpectedContainers > numPreviousContainers) {
177        // we expected more containers to be alive, some containers must have died during driver restart
178        LOG.log(Level.WARNING, "Expected {0} containers while only {1} are still alive",
179            new Object[]{numExpectedContainers, numPreviousContainers});
180        final Set<String> previousContainersIds = new HashSet<>();
181        for (final Container container : this.previousContainers) {
182          previousContainersIds.add(container.getId().toString());
183        }
184        for (final String expectedContainerId : expectedContainers) {
185          if (!previousContainersIds.contains(expectedContainerId)) {
186            LOG.log(Level.WARNING, "Expected container [{0}] not alive, must have failed during driver restart.",
187                expectedContainerId);
188            restartEvaluatorsBuilder.addRestartEvaluator(
189                EvaluatorRestartInfo.createFailedEvaluatorInfo(expectedContainerId));
190          }
191        }
192      }
193      if (numExpectedContainers < numPreviousContainers) {
194        // somehow we have more alive evaluators, this should not happen
195        throw new RuntimeException("Expected only [" + numExpectedContainers + "] containers " +
196            "but resource manager believe that [" + numPreviousContainers + "] are outstanding for driver.");
197      }
198
199      //  numExpectedContainers == numPreviousContainers
200      for (final Container container : this.previousContainers) {
201        LOG.log(Level.FINE, "Previous container: [{0}]", container.toString());
202        if (!expectedContainers.contains(container.getId().toString())) {
203          throw new RuntimeException("Not expecting container " + container.getId().toString());
204        }
205
206        restartEvaluatorsBuilder.addRestartEvaluator(EvaluatorRestartInfo.createExpectedEvaluatorInfo(
207            ResourceEventImpl.newRecoveryBuilder().setIdentifier(container.getId().toString())
208                .setNodeId(container.getNodeId().toString()).setRackName(rackNameFormatter.getRackName(container))
209                .setResourceMemory(container.getResource().getMemory())
210                .setVirtualCores(container.getResource().getVirtualCores())
211                .setRuntimeName(RuntimeIdentifier.RUNTIME_NAME).build()));
212      }
213    }
214
215    return restartEvaluatorsBuilder.build();
216  }
217
218  /**
219   * Calls the appropriate handler via REEFEventHandlers, which is a runtime specific implementation
220   * of the YARN runtime.
221   * @param evaluatorIds the set of evaluator IDs of failed evaluators during restart.
222   */
223  @Override
224  public void informAboutEvaluatorFailures(final Set<String> evaluatorIds) {
225    for (String evaluatorId : evaluatorIds) {
226      LOG.log(Level.WARNING, "Container [" + evaluatorId +
227          "] has failed during driver restart process, FailedEvaluatorHandler will be triggered, but " +
228          "no additional evaluator can be requested due to YARN-2433.");
229      // trigger a failed evaluator event
230      this.reefEventHandlers.onResourceStatus(ResourceStatusEventImpl.newBuilder()
231          .setIdentifier(evaluatorId)
232          .setState(State.FAILED)
233          .setExitCode(1)
234          .setDiagnostics("Container [" + evaluatorId + "] failed during driver restart process.")
235          .build());
236    }
237  }
238}