001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * <p> 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * <p> 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.yarn.client; 020 021import org.apache.commons.lang.StringUtils; 022import org.apache.hadoop.yarn.api.ApplicationConstants; 023import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; 024import org.apache.hadoop.yarn.api.records.*; 025import org.apache.hadoop.yarn.client.api.YarnClient; 026import org.apache.hadoop.yarn.client.api.YarnClientApplication; 027import org.apache.hadoop.yarn.conf.YarnConfiguration; 028import org.apache.hadoop.yarn.exceptions.YarnException; 029import org.apache.reef.runtime.common.REEFLauncher; 030import org.apache.reef.runtime.common.files.ClasspathProvider; 031import org.apache.reef.runtime.common.files.REEFFileNames; 032import org.apache.reef.runtime.common.launch.JavaLaunchCommandBuilder; 033import org.apache.reef.runtime.yarn.util.YarnTypes; 034 035import java.io.Closeable; 036import java.io.IOException; 037import java.util.*; 038import java.util.logging.Level; 039import java.util.logging.Logger; 040 041/** 042 * Helper code that wraps the YARN Client API for our purposes. 043 */ 044public final class YarnSubmissionHelper implements Closeable{ 045 private static final Logger LOG = Logger.getLogger(YarnSubmissionHelper.class.getName()); 046 047 private final YarnClient yarnClient; 048 private final YarnClientApplication yarnClientApplication; 049 private final GetNewApplicationResponse applicationResponse; 050 private final ApplicationSubmissionContext applicationSubmissionContext; 051 private final ApplicationId applicationId; 052 private final Map<String, LocalResource> resources = new HashMap<>(); 053 private final REEFFileNames fileNames; 054 private final ClasspathProvider classpath; 055 private final SecurityTokenProvider tokenProvider; 056 private final List<String> commandPrefixList; 057 private Class launcherClazz; 058 private List<String> configurationFilePaths; 059 060 public YarnSubmissionHelper(final YarnConfiguration yarnConfiguration, 061 final REEFFileNames fileNames, 062 final ClasspathProvider classpath, 063 final SecurityTokenProvider tokenProvider, 064 final List<String> commandPrefixList) throws IOException, YarnException { 065 this.fileNames = fileNames; 066 this.classpath = classpath; 067 068 LOG.log(Level.FINE, "Initializing YARN Client"); 069 this.yarnClient = YarnClient.createYarnClient(); 070 this.yarnClient.init(yarnConfiguration); 071 this.yarnClient.start(); 072 LOG.log(Level.FINE, "Initialized YARN Client"); 073 074 LOG.log(Level.FINE, "Requesting Application ID from YARN."); 075 this.yarnClientApplication = this.yarnClient.createApplication(); 076 this.applicationResponse = yarnClientApplication.getNewApplicationResponse(); 077 this.applicationSubmissionContext = yarnClientApplication.getApplicationSubmissionContext(); 078 this.applicationId = applicationSubmissionContext.getApplicationId(); 079 this.tokenProvider = tokenProvider; 080 this.commandPrefixList = commandPrefixList; 081 this.launcherClazz = REEFLauncher.class; 082 this.configurationFilePaths = Collections.singletonList(this.fileNames.getDriverConfigurationPath()); 083 LOG.log(Level.FINEST, "YARN Application ID: {0}", applicationId); 084 } 085 086 public YarnSubmissionHelper(final YarnConfiguration yarnConfiguration, 087 final REEFFileNames fileNames, 088 final ClasspathProvider classpath, 089 final SecurityTokenProvider tokenProvider) throws IOException, YarnException { 090 this(yarnConfiguration, fileNames, classpath, tokenProvider, null); 091 } 092 093 /** 094 * 095 * @return the application ID assigned by YARN. 096 */ 097 public int getApplicationId() { 098 return this.applicationId.getId(); 099 } 100 101 /** 102 * 103 * @return the application ID string representation assigned by YARN. 104 */ 105 public String getStringApplicationId() { 106 return this.applicationId.toString(); 107 } 108 109 /** 110 * Set the name of the application to be submitted. 111 * @param applicationName 112 * @return 113 */ 114 public YarnSubmissionHelper setApplicationName(final String applicationName) { 115 applicationSubmissionContext.setApplicationName(applicationName); 116 return this; 117 } 118 119 /** 120 * Set the amount of memory to be allocated to the Driver. 121 * @param megabytes 122 * @return 123 */ 124 public YarnSubmissionHelper setDriverMemory(final int megabytes) { 125 applicationSubmissionContext.setResource(Resource.newInstance(getMemory(megabytes), 1)); 126 return this; 127 } 128 129 /** 130 * Add a file to be localized on the driver. 131 * @param resourceName 132 * @param resource 133 * @return 134 */ 135 public YarnSubmissionHelper addLocalResource(final String resourceName, final LocalResource resource) { 136 resources.put(resourceName, resource); 137 return this; 138 } 139 140 /** 141 * Set the priority of the job. 142 * @param priority 143 * @return 144 */ 145 public YarnSubmissionHelper setPriority(final int priority) { 146 this.applicationSubmissionContext.setPriority(Priority.newInstance(priority)); 147 return this; 148 } 149 150 /** 151 * Set whether or not the resource manager should preserve evaluators across driver restarts. 152 * @param preserveEvaluators 153 * @return 154 */ 155 public YarnSubmissionHelper setPreserveEvaluators(final boolean preserveEvaluators) { 156 if (preserveEvaluators) { 157 // when supported, set KeepContainersAcrossApplicationAttempts to be true 158 // so that when driver (AM) crashes, evaluators will still be running and we can recover later. 159 if (YarnTypes.isAtOrAfterVersion(YarnTypes.MIN_VERSION_KEEP_CONTAINERS_AVAILABLE)) { 160 LOG.log( 161 Level.FINE, 162 "Hadoop version is {0} or after with KeepContainersAcrossApplicationAttempts supported," + 163 " will set it to true.", 164 YarnTypes.MIN_VERSION_KEEP_CONTAINERS_AVAILABLE); 165 166 applicationSubmissionContext.setKeepContainersAcrossApplicationAttempts(true); 167 } else { 168 LOG.log(Level.WARNING, 169 "Hadoop version does not yet support KeepContainersAcrossApplicationAttempts. Driver restarts " + 170 "will not support recovering evaluators."); 171 172 applicationSubmissionContext.setKeepContainersAcrossApplicationAttempts(false); 173 } 174 } else { 175 applicationSubmissionContext.setKeepContainersAcrossApplicationAttempts(false); 176 } 177 178 return this; 179 } 180 181 /** 182 * Sets the maximum application attempts for the application. 183 * @param maxApplicationAttempts 184 * @return 185 */ 186 public YarnSubmissionHelper setMaxApplicationAttempts(final int maxApplicationAttempts) { 187 applicationSubmissionContext.setMaxAppAttempts(maxApplicationAttempts); 188 return this; 189 } 190 191 /** 192 * Assign this job submission to a queue. 193 * @param queueName 194 * @return 195 */ 196 public YarnSubmissionHelper setQueue(final String queueName) { 197 this.applicationSubmissionContext.setQueue(queueName); 198 return this; 199 } 200 201 /** 202 * Sets the launcher class for the job. 203 * @param launcherClass 204 * @return 205 */ 206 public YarnSubmissionHelper setLauncherClass(final Class launcherClass) { 207 this.launcherClazz = launcherClass; 208 return this; 209 } 210 211 /** 212 * Sets the configuration file for the job. 213 * Note that this does not have to be Driver TANG configuration. In the bootstrap 214 * launch case, this can be the set of the Avro files that supports the generation of a driver 215 * configuration file natively at the Launcher. 216 * @param configurationFilePaths 217 * @return 218 */ 219 public YarnSubmissionHelper setConfigurationFilePaths(final List<String> configurationFilePaths) { 220 this.configurationFilePaths = configurationFilePaths; 221 return this; 222 } 223 224 public void submit() throws IOException, YarnException { 225 226 // SET EXEC COMMAND 227 final List<String> launchCommand = new JavaLaunchCommandBuilder(launcherClazz, commandPrefixList) 228 .setConfigurationFilePaths(configurationFilePaths) 229 .setClassPath(this.classpath.getDriverClasspath()) 230 .setMemory(this.applicationSubmissionContext.getResource().getMemory()) 231 .setStandardOut(ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + this.fileNames.getDriverStdoutFileName()) 232 .setStandardErr(ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + this.fileNames.getDriverStderrFileName()) 233 .build(); 234 235 if (this.applicationSubmissionContext.getKeepContainersAcrossApplicationAttempts() && 236 this.applicationSubmissionContext.getMaxAppAttempts() == 1) { 237 LOG.log(Level.WARNING, "Application will not be restarted even though preserve evaluators is set to true" + 238 " since the max application submissions is 1. Proceeding to submit application..."); 239 } 240 241 final ContainerLaunchContext containerLaunchContext = YarnTypes.getContainerLaunchContext( 242 launchCommand, this.resources, tokenProvider.getTokens()); 243 this.applicationSubmissionContext.setAMContainerSpec(containerLaunchContext); 244 245 LOG.log(Level.INFO, "Submitting REEF Application to YARN. ID: {0}", this.applicationId); 246 247 if (LOG.isLoggable(Level.FINEST)) { 248 LOG.log(Level.FINEST, "REEF app command: {0}", StringUtils.join(launchCommand, ' ')); 249 } 250 251 this.yarnClient.submitApplication(applicationSubmissionContext); 252 } 253 254 /** 255 * Extract the desired driver memory from jobSubmissionProto. 256 * <p> 257 * returns maxMemory if that desired amount is more than maxMemory 258 */ 259 private int getMemory(final int requestedMemory) { 260 final int maxMemory = applicationResponse.getMaximumResourceCapability().getMemory(); 261 final int amMemory; 262 263 if (requestedMemory <= maxMemory) { 264 amMemory = requestedMemory; 265 } else { 266 LOG.log(Level.WARNING, 267 "Requested {0}MB of memory for the driver. " + 268 "The max on this YARN installation is {1}. " + 269 "Using {1} as the memory for the driver.", 270 new Object[]{requestedMemory, maxMemory}); 271 amMemory = maxMemory; 272 } 273 return amMemory; 274 } 275 276 @Override 277 public void close() throws IOException { 278 this.yarnClient.stop(); 279 } 280}