/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.reef.runtime.yarn.client;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.reef.runtime.common.REEFLauncher;
import org.apache.reef.runtime.common.files.ClasspathProvider;
import org.apache.reef.runtime.common.files.REEFFileNames;
import org.apache.reef.runtime.common.launch.JavaLaunchCommandBuilder;
import org.apache.reef.runtime.yarn.util.YarnTypes;

import java.io.Closeable;
import java.io.IOException;
import java.util.*;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Helper code that wraps the YARN Client API for our purposes.
 * <p>
 * Constructing an instance starts a {@link YarnClient} and requests a new application from it.
 * The fluent setters then configure the submission, {@link #submit()} sends it to YARN, and
 * {@link #close()} stops the client.
 */
public final class YarnSubmissionHelper implements Closeable {

  private static final Logger LOG = Logger.getLogger(YarnSubmissionHelper.class.getName());

  private final YarnClient yarnClient;
  private final YarnClientApplication yarnClientApplication;
  private final GetNewApplicationResponse applicationResponse;
  private final ApplicationSubmissionContext applicationSubmissionContext;
  private final ApplicationId applicationId;
  private final Map<String, LocalResource> resources = new HashMap<>();
  private final REEFFileNames fileNames;
  private final ClasspathProvider classpath;
  private final SecurityTokenProvider tokenProvider;
  private final List<String> commandPrefixList;
  private String driverStdoutFilePath;
  private String driverStderrFilePath;
  private Class launcherClazz;
  private List<String> configurationFilePaths;

  /**
   * Starts a YARN client and requests a new application from it.
   *
   * @param yarnConfiguration the configuration used to initialize the YARN client.
   * @param fileNames provides the driver stdout/stderr file names and the driver configuration path.
   * @param classpath provides the classpath used in the driver launch command.
   * @param tokenProvider provides the security tokens attached to the driver container.
   * @param commandPrefixList handed to {@link JavaLaunchCommandBuilder} when the launch command is built;
   *                          the four-argument constructor passes {@code null} here.
   * @throws IOException if requesting the new application fails.
   * @throws YarnException if requesting the new application fails.
   */
  public YarnSubmissionHelper(final YarnConfiguration yarnConfiguration,
                              final REEFFileNames fileNames,
                              final ClasspathProvider classpath,
                              final SecurityTokenProvider tokenProvider,
                              final List<String> commandPrefixList) throws IOException, YarnException {
    // Initialize everything that does not need YARN first, so that a failure here
    // cannot leave a started client behind.
    this.fileNames = fileNames;
    this.classpath = classpath;
    this.tokenProvider = tokenProvider;
    this.commandPrefixList = commandPrefixList;
    this.launcherClazz = REEFLauncher.class;

    this.driverStdoutFilePath =
        ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + this.fileNames.getDriverStdoutFileName();

    this.driverStderrFilePath =
        ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + this.fileNames.getDriverStderrFileName();

    this.configurationFilePaths = Collections.singletonList(this.fileNames.getDriverConfigurationPath());

    LOG.log(Level.FINE, "Initializing YARN Client");
    this.yarnClient = YarnClient.createYarnClient();
    this.yarnClient.init(yarnConfiguration);
    this.yarnClient.start();
    LOG.log(Level.FINE, "Initialized YARN Client");

    LOG.log(Level.FINE, "Requesting Application ID from YARN.");
    final YarnClientApplication newApplication;
    try {
      newApplication = this.yarnClient.createApplication();
    } catch (final IOException | YarnException | RuntimeException e) {
      // The constructor is about to fail, so no caller will ever be able to close() this
      // instance. Stop the already started client here instead of leaking it.
      try {
        this.yarnClient.stop();
      } catch (final RuntimeException stopFailure) {
        e.addSuppressed(stopFailure);
      }
      throw e;
    }

    this.yarnClientApplication = newApplication;
    this.applicationResponse = newApplication.getNewApplicationResponse();
    this.applicationSubmissionContext = newApplication.getApplicationSubmissionContext();
    this.applicationId = this.applicationSubmissionContext.getApplicationId();
    LOG.log(Level.FINEST, "YARN Application ID: {0}", applicationId);
  }

  /**
   * Same as the five-argument constructor, with a {@code null} command prefix list.
   *
   * @param yarnConfiguration the configuration used to initialize the YARN client.
   * @param fileNames provides the driver stdout/stderr file names and the driver configuration path.
   * @param classpath provides the classpath used in the driver launch command.
   * @param tokenProvider provides the security tokens attached to the driver container.
   * @throws IOException if requesting the new application fails.
   * @throws YarnException if requesting the new application fails.
   */
  public YarnSubmissionHelper(final YarnConfiguration yarnConfiguration,
                              final REEFFileNames fileNames,
                              final ClasspathProvider classpath,
                              final SecurityTokenProvider tokenProvider) throws IOException, YarnException {
    this(yarnConfiguration, fileNames, classpath, tokenProvider, null);
  }

  /**
   * @return the integer id of the application ID assigned by YARN.
   */
  public int getApplicationId() {
    return this.applicationId.getId();
  }

  /**
   * @return the application ID string representation assigned by YARN.
   */
  public String getStringApplicationId() {
    return this.applicationId.toString();
  }

  /**
   * Set the name of the application to be submitted.
   *
   * @param applicationName the name to set on the submission context.
   * @return this, for call chaining.
   */
  public YarnSubmissionHelper setApplicationName(final String applicationName) {
    applicationSubmissionContext.setApplicationName(applicationName);
    return this;
  }

  /**
   * Set the amount of memory to be allocated to the Driver.
   * Requests above the maximum reported by YARN are capped to that maximum.
   *
   * @param megabytes the requested driver memory.
   * @return this, for call chaining.
   */
  public YarnSubmissionHelper setDriverMemory(final int megabytes) {
    applicationSubmissionContext.setResource(Resource.newInstance(getMemory(megabytes), 1));
    return this;
  }

  /**
   * Add a file to be localized on the driver.
   *
   * @param resourceName the key under which the resource is registered; an existing entry
   *                     with the same name is replaced.
   * @param resource the resource to localize.
   * @return this, for call chaining.
   */
  public YarnSubmissionHelper addLocalResource(final String resourceName, final LocalResource resource) {
    resources.put(resourceName, resource);
    return this;
  }

  /**
   * Set the priority of the job.
   *
   * @param priority the priority value to set on the submission context.
   * @return this, for call chaining.
   */
  public YarnSubmissionHelper setPriority(final int priority) {
    this.applicationSubmissionContext.setPriority(Priority.newInstance(priority));
    return this;
  }

  /**
   * Set whether or not the resource manager should preserve evaluators across driver restarts.
   * The flag is only turned on when it is requested and the Hadoop version supports it.
   *
   * @param preserveEvaluators whether evaluators should be kept across driver restarts.
   * @return this, for call chaining.
   */
  public YarnSubmissionHelper setPreserveEvaluators(final boolean preserveEvaluators) {
    // When supported, KeepContainersAcrossApplicationAttempts is set to true so that when the
    // driver (AM) crashes, evaluators will still be running and we can recover later.
    // The version check is only performed when preservation was actually requested.
    final boolean keepContainers = preserveEvaluators
        && YarnTypes.isAtOrAfterVersion(YarnTypes.MIN_VERSION_KEEP_CONTAINERS_AVAILABLE);

    if (keepContainers) {
      LOG.log(
          Level.FINE,
          "Hadoop version is {0} or after with KeepContainersAcrossApplicationAttempts supported," +
              " will set it to true.",
          YarnTypes.MIN_VERSION_KEEP_CONTAINERS_AVAILABLE);
    } else if (preserveEvaluators) {
      LOG.log(Level.WARNING,
          "Hadoop version does not yet support KeepContainersAcrossApplicationAttempts. Driver restarts " +
              "will not support recovering evaluators.");
    }

    applicationSubmissionContext.setKeepContainersAcrossApplicationAttempts(keepContainers);
    return this;
  }

  /**
   * Sets the maximum application attempts for the application.
   *
   * @param maxApplicationAttempts the maximum number of attempts to set on the submission context.
   * @return this, for call chaining.
   */
  public YarnSubmissionHelper setMaxApplicationAttempts(final int maxApplicationAttempts) {
    applicationSubmissionContext.setMaxAppAttempts(maxApplicationAttempts);
    return this;
  }

  /**
   * Assign this job submission to a queue.
   *
   * @param queueName the name of the queue.
   * @return this, for call chaining.
   */
  public YarnSubmissionHelper setQueue(final String queueName) {
    this.applicationSubmissionContext.setQueue(queueName);
    return this;
  }

  /**
   * Sets the launcher class for the job. Defaults to {@link REEFLauncher}.
   *
   * @param launcherClass the class handed to the launch command builder.
   * @return this, for call chaining.
   */
  public YarnSubmissionHelper setLauncherClass(final Class launcherClass) {
    this.launcherClazz = launcherClass;
    return this;
  }

  /**
   * Sets the configuration file for the job.
   * Note that this does not have to be Driver TANG configuration. In the bootstrap
   * launch case, this can be the set of the Avro files that supports the generation of a driver
   * configuration file natively at the Launcher.
   *
   * @param configurationFilePaths the configuration file paths handed to the launch command builder.
   *                               The list is stored as given, not copied.
   * @return this, for call chaining.
   */
  public YarnSubmissionHelper setConfigurationFilePaths(final List<String> configurationFilePaths) {
    this.configurationFilePaths = configurationFilePaths;
    return this;
  }

  /**
   * Sets the Driver stdout file path.
   *
   * @param driverStdoutPath the path the driver's standard output is directed to.
   * @return this, for call chaining.
   */
  public YarnSubmissionHelper setDriverStdoutPath(final String driverStdoutPath) {
    this.driverStdoutFilePath = driverStdoutPath;
    return this;
  }

  /**
   * Sets the Driver stderr file path.
   *
   * @param driverStderrPath the path the driver's standard error is directed to.
   * @return this, for call chaining.
   */
  public YarnSubmissionHelper setDriverStderrPath(final String driverStderrPath) {
    this.driverStderrFilePath = driverStderrPath;
    return this;
  }

  /**
   * Builds the driver launch command, attaches it together with the local resources and
   * security tokens to the submission context, and submits the application to YARN.
   *
   * @throws IllegalStateException if no driver memory was configured via
   *                               {@link #setDriverMemory(int)} before calling this method.
   * @throws IOException if obtaining the tokens or submitting the application fails.
   * @throws YarnException if submitting the application fails.
   */
  public void submit() throws IOException, YarnException {

    // The launch command needs the driver memory. This class only sets the resource in
    // setDriverMemory(), so fail with a clear message instead of a bare NullPointerException.
    final Resource driverResource = this.applicationSubmissionContext.getResource();
    if (driverResource == null) {
      throw new IllegalStateException("Driver memory has not been set; call setDriverMemory() before submit().");
    }

    // SET EXEC COMMAND
    final List<String> launchCommand = new JavaLaunchCommandBuilder(launcherClazz, commandPrefixList)
        .setConfigurationFilePaths(configurationFilePaths)
        .setClassPath(this.classpath.getDriverClasspath())
        .setMemory(driverResource.getMemory())
        .setStandardOut(driverStdoutFilePath)
        .setStandardErr(driverStderrFilePath)
        .build();

    if (this.applicationSubmissionContext.getKeepContainersAcrossApplicationAttempts() &&
        this.applicationSubmissionContext.getMaxAppAttempts() == 1) {
      LOG.log(Level.WARNING, "Application will not be restarted even though preserve evaluators is set to true" +
          " since the max application submissions is 1. Proceeding to submit application...");
    }

    final ContainerLaunchContext containerLaunchContext = YarnTypes.getContainerLaunchContext(
        launchCommand, this.resources, tokenProvider.getTokens());
    this.applicationSubmissionContext.setAMContainerSpec(containerLaunchContext);

    LOG.log(Level.INFO, "Submitting REEF Application to YARN. ID: {0}", this.applicationId);

    if (LOG.isLoggable(Level.FINEST)) {
      LOG.log(Level.FINEST, "REEF app command: {0}", StringUtils.join(launchCommand, ' '));
    }

    this.yarnClient.submitApplication(applicationSubmissionContext);
  }

  /**
   * Caps the requested driver memory at the maximum resource capability reported by YARN
   * in the new-application response.
   *
   * @param requestedMemory the requested driver memory in megabytes.
   * @return {@code requestedMemory} if it does not exceed the maximum, the maximum otherwise.
   */
  private int getMemory(final int requestedMemory) {
    final int maxMemory = applicationResponse.getMaximumResourceCapability().getMemory();
    if (requestedMemory <= maxMemory) {
      return requestedMemory;
    }

    LOG.log(Level.WARNING,
        "Requested {0}MB of memory for the driver. " +
            "The max on this YARN installation is {1}. " +
            "Using {1} as the memory for the driver.",
        new Object[]{requestedMemory, maxMemory});
    return maxMemory;
  }

  /**
   * Stops the underlying YARN client.
   */
  @Override
  public void close() throws IOException {
    this.yarnClient.stop();
  }
}