This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 * <p>
010 * http://www.apache.org/licenses/LICENSE-2.0
011 * <p>
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.yarn.client;
020
021import org.apache.commons.lang.StringUtils;
022import org.apache.hadoop.security.UserGroupInformation;
023import org.apache.hadoop.security.token.Token;
024import org.apache.hadoop.yarn.api.ApplicationConstants;
025import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
026import org.apache.hadoop.yarn.api.records.*;
027import org.apache.hadoop.yarn.client.api.YarnClient;
028import org.apache.hadoop.yarn.client.api.YarnClientApplication;
029import org.apache.hadoop.yarn.conf.YarnConfiguration;
030import org.apache.hadoop.yarn.exceptions.YarnException;
031import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
032import org.apache.reef.runtime.common.REEFLauncher;
033import org.apache.reef.runtime.common.files.ClasspathProvider;
034import org.apache.reef.runtime.common.files.REEFFileNames;
035import org.apache.reef.runtime.common.launch.JavaLaunchCommandBuilder;
036import org.apache.reef.runtime.yarn.client.unmanaged.YarnProxyUser;
037import org.apache.reef.runtime.yarn.util.YarnTypes;
038
039import java.io.IOException;
040import java.util.*;
041import java.util.logging.Level;
042import java.util.logging.Logger;
043
044/**
045 * Helper code that wraps the YARN Client API for our purposes.
046 */
047public final class YarnSubmissionHelper implements AutoCloseable {
048
049  private static final Logger LOG = Logger.getLogger(YarnSubmissionHelper.class.getName());
050
051  private final YarnClient yarnClient;
052  private final GetNewApplicationResponse applicationResponse;
053  private final ApplicationSubmissionContext applicationSubmissionContext;
054  private final ApplicationId applicationId;
055  private final Map<String, LocalResource> resources = new HashMap<>();
056  private final ClasspathProvider classpath;
057  private final YarnProxyUser yarnProxyUser;
058  private final SecurityTokenProvider tokenProvider;
059  private final boolean isUnmanaged;
060  private final List<String> commandPrefixList;
061
062  private String driverStdoutFilePath;
063  private String driverStderrFilePath;
064  private Class launcherClazz = REEFLauncher.class;
065  private List<String> configurationFilePaths;
066
067  public YarnSubmissionHelper(final YarnConfiguration yarnConfiguration,
068                              final REEFFileNames fileNames,
069                              final ClasspathProvider classpath,
070                              final YarnProxyUser yarnProxyUser,
071                              final SecurityTokenProvider tokenProvider,
072                              final boolean isUnmanaged,
073                              final List<String> commandPrefixList) throws IOException, YarnException {
074
075    this.classpath = classpath;
076    this.yarnProxyUser = yarnProxyUser;
077    this.isUnmanaged = isUnmanaged;
078
079    this.driverStdoutFilePath =
080        ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + fileNames.getDriverStdoutFileName();
081
082    this.driverStderrFilePath =
083        ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + fileNames.getDriverStderrFileName();
084
085    LOG.log(Level.FINE, "Initializing YARN Client");
086    this.yarnClient = YarnClient.createYarnClient();
087    this.yarnClient.init(yarnConfiguration);
088    this.yarnClient.start();
089    LOG.log(Level.FINE, "Initialized YARN Client");
090
091    LOG.log(Level.FINE, "Requesting Application ID from YARN.");
092    final YarnClientApplication yarnClientApplication = this.yarnClient.createApplication();
093    this.applicationResponse = yarnClientApplication.getNewApplicationResponse();
094    this.applicationSubmissionContext = yarnClientApplication.getApplicationSubmissionContext();
095    this.applicationSubmissionContext.setUnmanagedAM(isUnmanaged);
096    this.applicationId = this.applicationSubmissionContext.getApplicationId();
097    this.tokenProvider = tokenProvider;
098    this.commandPrefixList = commandPrefixList;
099    this.configurationFilePaths = Collections.singletonList(fileNames.getDriverConfigurationPath());
100    LOG.log(Level.INFO, "YARN Application ID: {0}", this.applicationId);
101  }
102
103  public YarnSubmissionHelper(final YarnConfiguration yarnConfiguration,
104                              final REEFFileNames fileNames,
105                              final ClasspathProvider classpath,
106                              final YarnProxyUser yarnProxyUser,
107                              final SecurityTokenProvider tokenProvider,
108                              final boolean isUnmanaged) throws IOException, YarnException {
109    this(yarnConfiguration, fileNames, classpath, yarnProxyUser, tokenProvider, isUnmanaged, null);
110  }
111
112  /**
113   *
114   * @return the application ID assigned by YARN.
115   */
116  public int getApplicationId() {
117    return this.applicationId.getId();
118  }
119
120  /**
121   *
122   * @return the application ID string representation assigned by YARN.
123   */
124  public String getStringApplicationId() {
125    return this.applicationId.toString();
126  }
127
128  /**
129   * Set the name of the application to be submitted.
130   * @param applicationName
131   * @return
132   */
133  public YarnSubmissionHelper setApplicationName(final String applicationName) {
134    applicationSubmissionContext.setApplicationName(applicationName);
135    return this;
136  }
137
138  /**
139   * Set the amount of memory to be allocated to the Driver.
140   * @param megabytes
141   * @return
142   */
143  public YarnSubmissionHelper setDriverMemory(final int megabytes) {
144    applicationSubmissionContext.setResource(Resource.newInstance(getMemory(megabytes), 1));
145    return this;
146  }
147
148  /**
149   * Add a file to be localized on the driver.
150   * @param resourceName
151   * @param resource
152   * @return
153   */
154  public YarnSubmissionHelper addLocalResource(final String resourceName, final LocalResource resource) {
155    resources.put(resourceName, resource);
156    return this;
157  }
158
159  /**
160   * Set the priority of the job.
161   * @param priority
162   * @return
163   */
164  public YarnSubmissionHelper setPriority(final int priority) {
165    this.applicationSubmissionContext.setPriority(Priority.newInstance(priority));
166    return this;
167  }
168
169  /**
170   * Set whether or not the resource manager should preserve evaluators across driver restarts.
171   * @param preserveEvaluators
172   * @return
173   */
174  public YarnSubmissionHelper setPreserveEvaluators(final boolean preserveEvaluators) {
175    if (preserveEvaluators) {
176      // when supported, set KeepContainersAcrossApplicationAttempts to be true
177      // so that when driver (AM) crashes, evaluators will still be running and we can recover later.
178      if (YarnTypes.isAtOrAfterVersion(YarnTypes.MIN_VERSION_KEEP_CONTAINERS_AVAILABLE)) {
179        LOG.log(
180            Level.FINE,
181            "Hadoop version is {0} or after with KeepContainersAcrossApplicationAttempts supported," +
182                " will set it to true.",
183            YarnTypes.MIN_VERSION_KEEP_CONTAINERS_AVAILABLE);
184
185        applicationSubmissionContext.setKeepContainersAcrossApplicationAttempts(true);
186      } else {
187        LOG.log(Level.WARNING,
188            "Hadoop version does not yet support KeepContainersAcrossApplicationAttempts. Driver restarts " +
189                "will not support recovering evaluators.");
190
191        applicationSubmissionContext.setKeepContainersAcrossApplicationAttempts(false);
192      }
193    } else {
194      applicationSubmissionContext.setKeepContainersAcrossApplicationAttempts(false);
195    }
196
197    return this;
198  }
199
200  /**
201   * Sets the maximum application attempts for the application.
202   * @param maxApplicationAttempts
203   * @return
204   */
205  public YarnSubmissionHelper setMaxApplicationAttempts(final int maxApplicationAttempts) {
206    applicationSubmissionContext.setMaxAppAttempts(maxApplicationAttempts);
207    return this;
208  }
209
210  /**
211   * Assign this job submission to a queue.
212   * @param queueName
213   * @return
214   */
215  public YarnSubmissionHelper setQueue(final String queueName) {
216    this.applicationSubmissionContext.setQueue(queueName);
217    return this;
218  }
219
220  /**
221   * Sets the launcher class for the job.
222   * @param launcherClass
223   * @return
224   */
225  public YarnSubmissionHelper setLauncherClass(final Class launcherClass) {
226    this.launcherClazz = launcherClass;
227    return this;
228  }
229
230  /**
231   * Sets the configuration file for the job.
232   * Note that this does not have to be Driver TANG configuration. In the bootstrap
233   * launch case, this can be the set of  the Avro files that supports the generation of a driver
234   * configuration file natively at the Launcher.
235   * @param configurationFilePaths
236   * @return
237   */
238  public YarnSubmissionHelper setConfigurationFilePaths(final List<String> configurationFilePaths) {
239    this.configurationFilePaths = configurationFilePaths;
240    return this;
241  }
242
243  /**
244   * Sets the Driver stdout file path.
245   * @param driverStdoutPath
246   * @return
247   */
248  public YarnSubmissionHelper setDriverStdoutPath(final String driverStdoutPath) {
249    this.driverStdoutFilePath = driverStdoutPath;
250    return this;
251  }
252
253  /**
254   * Sets the Driver stderr file path.
255   * @param driverStderrPath
256   * @return
257   */
258  public YarnSubmissionHelper setDriverStderrPath(final String driverStderrPath) {
259    this.driverStderrFilePath = driverStderrPath;
260    return this;
261  }
262
263  public void submit() throws IOException, YarnException {
264
265    // SET EXEC COMMAND
266    final List<String> launchCommand = new JavaLaunchCommandBuilder(launcherClazz, commandPrefixList)
267        .setConfigurationFilePaths(configurationFilePaths)
268        .setClassPath(this.classpath.getDriverClasspath())
269        .setMemory(this.applicationSubmissionContext.getResource().getMemory())
270        .setStandardOut(driverStdoutFilePath)
271        .setStandardErr(driverStderrFilePath)
272        .build();
273
274    if (this.applicationSubmissionContext.getKeepContainersAcrossApplicationAttempts() &&
275        this.applicationSubmissionContext.getMaxAppAttempts() == 1) {
276      LOG.log(Level.WARNING, "Application will not be restarted even though preserve evaluators is set to true" +
277          " since the max application submissions is 1. Proceeding to submit application...");
278    }
279
280    final ContainerLaunchContext containerLaunchContext = YarnTypes.getContainerLaunchContext(
281        launchCommand, this.resources, tokenProvider.getTokens());
282    this.applicationSubmissionContext.setAMContainerSpec(containerLaunchContext);
283
284    LOG.log(Level.INFO, "Submitting REEF Application to YARN. ID: {0}", this.applicationId);
285
286    if (LOG.isLoggable(Level.INFO)) {
287      LOG.log(Level.INFO, "REEF app command: {0}", StringUtils.join(launchCommand, ' '));
288    }
289
290    this.yarnClient.submitApplication(applicationSubmissionContext);
291
292    if (this.isUnmanaged) {
293      // For Unmanaged AM mode, add a new app token to the
294      // current process so it can talk to the RM as an AM.
295      final Token<AMRMTokenIdentifier> token = this.yarnClient.getAMRMToken(this.applicationId);
296      this.yarnProxyUser.set("reef-proxy", UserGroupInformation.getCurrentUser(), token);
297      this.tokenProvider.addTokens(UserCredentialSecurityTokenProvider.serializeToken(token));
298    }
299  }
300
301  /**
302   * Extract the desired driver memory from jobSubmissionProto.
303   * <p>
304   * returns maxMemory if that desired amount is more than maxMemory
305   */
306  private int getMemory(final int requestedMemory) {
307    final int maxMemory = applicationResponse.getMaximumResourceCapability().getMemory();
308    final int amMemory;
309
310    if (requestedMemory <= maxMemory) {
311      amMemory = requestedMemory;
312    } else {
313      LOG.log(Level.WARNING,
314          "Requested {0}MB of memory for the driver. " +
315              "The max on this YARN installation is {1}. " +
316              "Using {1} as the memory for the driver.",
317          new Object[]{requestedMemory, maxMemory});
318      amMemory = maxMemory;
319    }
320    return amMemory;
321  }
322
323  @Override
324  public void close() {
325    LOG.log(Level.FINE, "Closing YARN application: {0}", this.applicationId);
326    this.yarnClient.stop(); // same as yarnClient.close()
327  }
328}