001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * <p> 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * <p> 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.yarn.client; 020 021import org.apache.commons.lang.StringUtils; 022import org.apache.hadoop.security.UserGroupInformation; 023import org.apache.hadoop.security.token.Token; 024import org.apache.hadoop.yarn.api.ApplicationConstants; 025import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; 026import org.apache.hadoop.yarn.api.records.*; 027import org.apache.hadoop.yarn.client.api.YarnClient; 028import org.apache.hadoop.yarn.client.api.YarnClientApplication; 029import org.apache.hadoop.yarn.conf.YarnConfiguration; 030import org.apache.hadoop.yarn.exceptions.YarnException; 031import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; 032import org.apache.reef.runtime.common.REEFLauncher; 033import org.apache.reef.runtime.common.files.ClasspathProvider; 034import org.apache.reef.runtime.common.files.REEFFileNames; 035import org.apache.reef.runtime.common.launch.JavaLaunchCommandBuilder; 036import org.apache.reef.runtime.yarn.client.unmanaged.YarnProxyUser; 037import org.apache.reef.runtime.yarn.util.YarnTypes; 038 039import java.io.IOException; 040import java.util.*; 041import java.util.logging.Level; 042import java.util.logging.Logger; 043 044/** 045 * Helper code that wraps the YARN Client API for our purposes. 046 */ 047public final class YarnSubmissionHelper implements AutoCloseable { 048 049 private static final Logger LOG = Logger.getLogger(YarnSubmissionHelper.class.getName()); 050 051 private final YarnClient yarnClient; 052 private final GetNewApplicationResponse applicationResponse; 053 private final ApplicationSubmissionContext applicationSubmissionContext; 054 private final ApplicationId applicationId; 055 private final Map<String, LocalResource> resources = new HashMap<>(); 056 private final ClasspathProvider classpath; 057 private final YarnProxyUser yarnProxyUser; 058 private final SecurityTokenProvider tokenProvider; 059 private final boolean isUnmanaged; 060 private final List<String> commandPrefixList; 061 062 private String driverStdoutFilePath; 063 private String driverStderrFilePath; 064 private Class launcherClazz = REEFLauncher.class; 065 private List<String> configurationFilePaths; 066 067 public YarnSubmissionHelper(final YarnConfiguration yarnConfiguration, 068 final REEFFileNames fileNames, 069 final ClasspathProvider classpath, 070 final YarnProxyUser yarnProxyUser, 071 final SecurityTokenProvider tokenProvider, 072 final boolean isUnmanaged, 073 final List<String> commandPrefixList) throws IOException, YarnException { 074 075 this.classpath = classpath; 076 this.yarnProxyUser = yarnProxyUser; 077 this.isUnmanaged = isUnmanaged; 078 079 this.driverStdoutFilePath = 080 ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + fileNames.getDriverStdoutFileName(); 081 082 this.driverStderrFilePath = 083 ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + fileNames.getDriverStderrFileName(); 084 085 LOG.log(Level.FINE, "Initializing YARN Client"); 086 this.yarnClient = YarnClient.createYarnClient(); 087 this.yarnClient.init(yarnConfiguration); 088 this.yarnClient.start(); 089 LOG.log(Level.FINE, "Initialized YARN Client"); 090 091 LOG.log(Level.FINE, "Requesting Application ID from YARN."); 092 final YarnClientApplication yarnClientApplication = this.yarnClient.createApplication(); 093 this.applicationResponse = yarnClientApplication.getNewApplicationResponse(); 094 this.applicationSubmissionContext = yarnClientApplication.getApplicationSubmissionContext(); 095 this.applicationSubmissionContext.setUnmanagedAM(isUnmanaged); 096 this.applicationId = this.applicationSubmissionContext.getApplicationId(); 097 this.tokenProvider = tokenProvider; 098 this.commandPrefixList = commandPrefixList; 099 this.configurationFilePaths = Collections.singletonList(fileNames.getDriverConfigurationPath()); 100 LOG.log(Level.INFO, "YARN Application ID: {0}", this.applicationId); 101 } 102 103 public YarnSubmissionHelper(final YarnConfiguration yarnConfiguration, 104 final REEFFileNames fileNames, 105 final ClasspathProvider classpath, 106 final YarnProxyUser yarnProxyUser, 107 final SecurityTokenProvider tokenProvider, 108 final boolean isUnmanaged) throws IOException, YarnException { 109 this(yarnConfiguration, fileNames, classpath, yarnProxyUser, tokenProvider, isUnmanaged, null); 110 } 111 112 /** 113 * 114 * @return the application ID assigned by YARN. 115 */ 116 public int getApplicationId() { 117 return this.applicationId.getId(); 118 } 119 120 /** 121 * 122 * @return the application ID string representation assigned by YARN. 123 */ 124 public String getStringApplicationId() { 125 return this.applicationId.toString(); 126 } 127 128 /** 129 * Set the name of the application to be submitted. 130 * @param applicationName 131 * @return 132 */ 133 public YarnSubmissionHelper setApplicationName(final String applicationName) { 134 applicationSubmissionContext.setApplicationName(applicationName); 135 return this; 136 } 137 138 /** 139 * Set the amount of memory to be allocated to the Driver. 140 * @param megabytes 141 * @return 142 */ 143 public YarnSubmissionHelper setDriverMemory(final int megabytes) { 144 applicationSubmissionContext.setResource(Resource.newInstance(getMemory(megabytes), 1)); 145 return this; 146 } 147 148 /** 149 * Add a file to be localized on the driver. 150 * @param resourceName 151 * @param resource 152 * @return 153 */ 154 public YarnSubmissionHelper addLocalResource(final String resourceName, final LocalResource resource) { 155 resources.put(resourceName, resource); 156 return this; 157 } 158 159 /** 160 * Set the priority of the job. 161 * @param priority 162 * @return 163 */ 164 public YarnSubmissionHelper setPriority(final int priority) { 165 this.applicationSubmissionContext.setPriority(Priority.newInstance(priority)); 166 return this; 167 } 168 169 /** 170 * Set whether or not the resource manager should preserve evaluators across driver restarts. 171 * @param preserveEvaluators 172 * @return 173 */ 174 public YarnSubmissionHelper setPreserveEvaluators(final boolean preserveEvaluators) { 175 if (preserveEvaluators) { 176 // when supported, set KeepContainersAcrossApplicationAttempts to be true 177 // so that when driver (AM) crashes, evaluators will still be running and we can recover later. 178 if (YarnTypes.isAtOrAfterVersion(YarnTypes.MIN_VERSION_KEEP_CONTAINERS_AVAILABLE)) { 179 LOG.log( 180 Level.FINE, 181 "Hadoop version is {0} or after with KeepContainersAcrossApplicationAttempts supported," + 182 " will set it to true.", 183 YarnTypes.MIN_VERSION_KEEP_CONTAINERS_AVAILABLE); 184 185 applicationSubmissionContext.setKeepContainersAcrossApplicationAttempts(true); 186 } else { 187 LOG.log(Level.WARNING, 188 "Hadoop version does not yet support KeepContainersAcrossApplicationAttempts. Driver restarts " + 189 "will not support recovering evaluators."); 190 191 applicationSubmissionContext.setKeepContainersAcrossApplicationAttempts(false); 192 } 193 } else { 194 applicationSubmissionContext.setKeepContainersAcrossApplicationAttempts(false); 195 } 196 197 return this; 198 } 199 200 /** 201 * Sets the maximum application attempts for the application. 202 * @param maxApplicationAttempts 203 * @return 204 */ 205 public YarnSubmissionHelper setMaxApplicationAttempts(final int maxApplicationAttempts) { 206 applicationSubmissionContext.setMaxAppAttempts(maxApplicationAttempts); 207 return this; 208 } 209 210 /** 211 * Assign this job submission to a queue. 212 * @param queueName 213 * @return 214 */ 215 public YarnSubmissionHelper setQueue(final String queueName) { 216 this.applicationSubmissionContext.setQueue(queueName); 217 return this; 218 } 219 220 /** 221 * Sets the launcher class for the job. 222 * @param launcherClass 223 * @return 224 */ 225 public YarnSubmissionHelper setLauncherClass(final Class launcherClass) { 226 this.launcherClazz = launcherClass; 227 return this; 228 } 229 230 /** 231 * Sets the configuration file for the job. 232 * Note that this does not have to be Driver TANG configuration. In the bootstrap 233 * launch case, this can be the set of the Avro files that supports the generation of a driver 234 * configuration file natively at the Launcher. 235 * @param configurationFilePaths 236 * @return 237 */ 238 public YarnSubmissionHelper setConfigurationFilePaths(final List<String> configurationFilePaths) { 239 this.configurationFilePaths = configurationFilePaths; 240 return this; 241 } 242 243 /** 244 * Sets the Driver stdout file path. 245 * @param driverStdoutPath 246 * @return 247 */ 248 public YarnSubmissionHelper setDriverStdoutPath(final String driverStdoutPath) { 249 this.driverStdoutFilePath = driverStdoutPath; 250 return this; 251 } 252 253 /** 254 * Sets the Driver stderr file path. 255 * @param driverStderrPath 256 * @return 257 */ 258 public YarnSubmissionHelper setDriverStderrPath(final String driverStderrPath) { 259 this.driverStderrFilePath = driverStderrPath; 260 return this; 261 } 262 263 public void submit() throws IOException, YarnException { 264 265 // SET EXEC COMMAND 266 final List<String> launchCommand = new JavaLaunchCommandBuilder(launcherClazz, commandPrefixList) 267 .setConfigurationFilePaths(configurationFilePaths) 268 .setClassPath(this.classpath.getDriverClasspath()) 269 .setMemory(this.applicationSubmissionContext.getResource().getMemory()) 270 .setStandardOut(driverStdoutFilePath) 271 .setStandardErr(driverStderrFilePath) 272 .build(); 273 274 if (this.applicationSubmissionContext.getKeepContainersAcrossApplicationAttempts() && 275 this.applicationSubmissionContext.getMaxAppAttempts() == 1) { 276 LOG.log(Level.WARNING, "Application will not be restarted even though preserve evaluators is set to true" + 277 " since the max application submissions is 1. Proceeding to submit application..."); 278 } 279 280 final ContainerLaunchContext containerLaunchContext = YarnTypes.getContainerLaunchContext( 281 launchCommand, this.resources, tokenProvider.getTokens()); 282 this.applicationSubmissionContext.setAMContainerSpec(containerLaunchContext); 283 284 LOG.log(Level.INFO, "Submitting REEF Application to YARN. ID: {0}", this.applicationId); 285 286 if (LOG.isLoggable(Level.INFO)) { 287 LOG.log(Level.INFO, "REEF app command: {0}", StringUtils.join(launchCommand, ' ')); 288 } 289 290 this.yarnClient.submitApplication(applicationSubmissionContext); 291 292 if (this.isUnmanaged) { 293 // For Unmanaged AM mode, add a new app token to the 294 // current process so it can talk to the RM as an AM. 295 final Token<AMRMTokenIdentifier> token = this.yarnClient.getAMRMToken(this.applicationId); 296 this.yarnProxyUser.set("reef-proxy", UserGroupInformation.getCurrentUser(), token); 297 this.tokenProvider.addTokens(UserCredentialSecurityTokenProvider.serializeToken(token)); 298 } 299 } 300 301 /** 302 * Extract the desired driver memory from jobSubmissionProto. 303 * <p> 304 * returns maxMemory if that desired amount is more than maxMemory 305 */ 306 private int getMemory(final int requestedMemory) { 307 final int maxMemory = applicationResponse.getMaximumResourceCapability().getMemory(); 308 final int amMemory; 309 310 if (requestedMemory <= maxMemory) { 311 amMemory = requestedMemory; 312 } else { 313 LOG.log(Level.WARNING, 314 "Requested {0}MB of memory for the driver. " + 315 "The max on this YARN installation is {1}. " + 316 "Using {1} as the memory for the driver.", 317 new Object[]{requestedMemory, maxMemory}); 318 amMemory = maxMemory; 319 } 320 return amMemory; 321 } 322 323 @Override 324 public void close() { 325 LOG.log(Level.FINE, "Closing YARN application: {0}", this.applicationId); 326 this.yarnClient.stop(); // same as yarnClient.close() 327 } 328}