001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.bridge.client; 020 021import org.apache.commons.lang.Validate; 022import org.apache.hadoop.fs.FSDataInputStream; 023import org.apache.hadoop.fs.FileSystem; 024import org.apache.hadoop.fs.Path; 025import org.apache.hadoop.io.Text; 026import org.apache.hadoop.security.UserGroupInformation; 027import org.apache.hadoop.security.token.Token; 028import org.apache.hadoop.yarn.api.records.LocalResource; 029import org.apache.hadoop.yarn.api.records.LocalResourceType; 030import org.apache.hadoop.yarn.conf.YarnConfiguration; 031import org.apache.hadoop.yarn.exceptions.YarnException; 032import org.apache.reef.runtime.common.files.ClasspathProvider; 033import org.apache.reef.runtime.common.files.REEFFileNames; 034import org.apache.reef.runtime.common.files.RuntimeClasspathProvider; 035import org.apache.reef.runtime.common.launch.parameters.DriverLaunchCommandPrefix; 036import org.apache.reef.runtime.yarn.YarnClasspathProvider; 037import org.apache.reef.runtime.yarn.client.SecurityTokenProvider; 038import org.apache.reef.runtime.yarn.client.YarnSubmissionHelper; 039import org.apache.reef.runtime.yarn.client.uploader.JobFolder; 040import org.apache.reef.runtime.yarn.client.uploader.JobUploader; 041import org.apache.reef.runtime.yarn.driver.parameters.JobSubmissionDirectoryPrefix; 042import org.apache.reef.runtime.yarn.util.YarnConfigurationConstructor; 043import org.apache.reef.tang.Configuration; 044import org.apache.reef.tang.Tang; 045import org.apache.reef.tang.annotations.Parameter; 046import org.apache.reef.tang.exceptions.InjectionException; 047import org.apache.reef.util.JARFileMaker; 048 049import javax.inject.Inject; 050import java.io.*; 051import java.nio.charset.StandardCharsets; 052import java.nio.file.Files; 053import java.nio.file.Paths; 054import java.util.ArrayList; 055import java.util.List; 056import java.util.logging.Level; 057import java.util.logging.Logger; 058 059/** 060 * The Java-side of the C# YARN Job Submission API. 061 */ 062@SuppressWarnings("checkstyle:hideutilityclassconstructor") 063public final class YarnJobSubmissionClient { 064 065 private static final Logger LOG = Logger.getLogger(YarnJobSubmissionClient.class.getName()); 066 private final JobUploader uploader; 067 private final REEFFileNames fileNames; 068 private final YarnConfiguration yarnConfiguration; 069 private final ClasspathProvider classpath; 070 private final SecurityTokenProvider tokenProvider; 071 private final List<String> commandPrefixList; 072 private final YarnSubmissionParametersFileGenerator jobSubmissionParametersGenerator; 073 074 @Inject 075 YarnJobSubmissionClient(final JobUploader uploader, 076 final YarnConfiguration yarnConfiguration, 077 final REEFFileNames fileNames, 078 final ClasspathProvider classpath, 079 @Parameter(DriverLaunchCommandPrefix.class) 080 final List<String> commandPrefixList, 081 final SecurityTokenProvider tokenProvider, 082 final YarnSubmissionParametersFileGenerator jobSubmissionParametersGenerator) { 083 this.uploader = uploader; 084 this.fileNames = fileNames; 085 this.yarnConfiguration = yarnConfiguration; 086 this.classpath = classpath; 087 this.tokenProvider = tokenProvider; 088 this.commandPrefixList = commandPrefixList; 089 this.jobSubmissionParametersGenerator = jobSubmissionParametersGenerator; 090 } 091 092 /** 093 * @param driverFolder the folder containing the `reef` folder. Only that `reef` folder will be in the JAR. 094 * @return the jar file 095 * @throws IOException 096 */ 097 private File makeJar(final File driverFolder) throws IOException { 098 Validate.isTrue(driverFolder.exists()); 099 final File jarFile = new File(driverFolder.getParentFile(), driverFolder.getName() + ".jar"); 100 final File reefFolder = new File(driverFolder, fileNames.getREEFFolderName()); 101 if (!reefFolder.isDirectory()) { 102 throw new FileNotFoundException(reefFolder.getAbsolutePath()); 103 } 104 105 new JARFileMaker(jarFile).addChildren(reefFolder).close(); 106 return jarFile; 107 } 108 109 private void launch(final YarnClusterSubmissionFromCS yarnSubmission) throws IOException, YarnException { 110 // ------------------------------------------------------------------------ 111 // Get an application ID 112 try (final YarnSubmissionHelper submissionHelper = 113 new YarnSubmissionHelper(yarnConfiguration, fileNames, classpath, tokenProvider, commandPrefixList)) { 114 115 // ------------------------------------------------------------------------ 116 // Prepare the JAR 117 final JobFolder jobFolderOnDFS = this.uploader.createJobFolder(submissionHelper.getApplicationId()); 118 this.jobSubmissionParametersGenerator.writeConfiguration(yarnSubmission, jobFolderOnDFS); 119 final File jarFile = makeJar(yarnSubmission.getDriverFolder()); 120 LOG.log(Level.INFO, "Created job submission jar file: {0}", jarFile); 121 122 // ------------------------------------------------------------------------ 123 // Upload the JAR 124 LOG.info("Uploading job submission JAR"); 125 final LocalResource jarFileOnDFS = jobFolderOnDFS.uploadAsLocalResource(jarFile, LocalResourceType.ARCHIVE); 126 LOG.info("Uploaded job submission JAR"); 127 128 // ------------------------------------------------------------------------ 129 // Upload the job file 130 final LocalResource jobFileOnDFS = jobFolderOnDFS.uploadAsLocalResource( 131 new File(yarnSubmission.getDriverFolder(), fileNames.getYarnBootstrapJobParamFilePath()), 132 LocalResourceType.FILE); 133 134 final List<String> confFiles = new ArrayList<>(); 135 confFiles.add(fileNames.getYarnBootstrapJobParamFilePath()); 136 confFiles.add(fileNames.getYarnBootstrapAppParamFilePath()); 137 138 // ------------------------------------------------------------------------ 139 // Submit 140 submissionHelper 141 .addLocalResource(this.fileNames.getREEFFolderName(), jarFileOnDFS) 142 .addLocalResource(fileNames.getYarnBootstrapJobParamFilePath(), jobFileOnDFS) 143 .setApplicationName(yarnSubmission.getJobId()) 144 .setDriverMemory(yarnSubmission.getDriverMemory()) 145 .setPriority(yarnSubmission.getPriority()) 146 .setQueue(yarnSubmission.getQueue()) 147 .setMaxApplicationAttempts(yarnSubmission.getMaxApplicationSubmissions()) 148 .setPreserveEvaluators(yarnSubmission.getDriverRecoveryTimeout() > 0) 149 .setLauncherClass(YarnBootstrapREEFLauncher.class) 150 .setConfigurationFilePaths(confFiles) 151 .submit(); 152 writeDriverHttpEndPoint(yarnSubmission.getDriverFolder(), 153 submissionHelper.getStringApplicationId(), jobFolderOnDFS.getPath()); 154 } 155 } 156 157 private static void writeSecurityTokenToUserCredential( 158 final YarnClusterSubmissionFromCS yarnSubmission) throws IOException { 159 final UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); 160 final REEFFileNames fileNames = new REEFFileNames(); 161 final String securityTokenIdentifierFile = fileNames.getSecurityTokenIdentifierFile(); 162 final String securityTokenPasswordFile = fileNames.getSecurityTokenPasswordFile(); 163 final Text tokenKind = new Text(yarnSubmission.getTokenKind()); 164 final Text tokenService = new Text(yarnSubmission.getTokenService()); 165 byte[] identifier = Files.readAllBytes(Paths.get(securityTokenIdentifierFile)); 166 byte[] password = Files.readAllBytes(Paths.get(securityTokenPasswordFile)); 167 Token token = new Token(identifier, password, tokenKind, tokenService); 168 currentUser.addToken(token); 169 } 170 171 /** 172 * We leave a file behind in job submission directory so that clr client can figure out 173 * the applicationId and yarn rest endpoint. 174 * @param driverFolder 175 * @param applicationId 176 * @throws IOException 177 */ 178 private void writeDriverHttpEndPoint(final File driverFolder, 179 final String applicationId, 180 final Path dfsPath) throws IOException { 181 final FileSystem fs = FileSystem.get(yarnConfiguration); 182 final Path httpEndpointPath = new Path(dfsPath, fileNames.getDriverHttpEndpoint()); 183 184 String trackingUri = null; 185 LOG.log(Level.INFO, "Attempt to reading " + httpEndpointPath.toString()); 186 for (int i = 0; i < 60; i++) { 187 try { 188 LOG.log(Level.FINE, "Attempt " + i + " reading " + httpEndpointPath.toString()); 189 if (fs.exists(httpEndpointPath)) { 190 final FSDataInputStream input = fs.open(httpEndpointPath); 191 final BufferedReader reader = new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8)); 192 trackingUri = reader.readLine(); 193 reader.close(); 194 break; 195 } 196 } catch (Exception ignored) { 197 // readLine might throw IOException although httpEndpointPath file exists. 198 // the for-loop waits until the actual content of file is written completely 199 } 200 try{ 201 Thread.sleep(1000); 202 } catch(InterruptedException ex2) { 203 break; 204 } 205 } 206 207 if (null == trackingUri) { 208 trackingUri = ""; 209 LOG.log(Level.WARNING, "Failed reading " + httpEndpointPath.toString()); 210 } else { 211 LOG.log(Level.INFO, "Completed reading trackingUri :" + trackingUri); 212 } 213 214 final File driverHttpEndpointFile = new File(driverFolder, fileNames.getDriverHttpEndpoint()); 215 BufferedWriter out = new BufferedWriter(new OutputStreamWriter( 216 new FileOutputStream(driverHttpEndpointFile), StandardCharsets.UTF_8)); 217 out.write(applicationId + "\n"); 218 out.write(trackingUri + "\n"); 219 String addr = yarnConfiguration.get("yarn.resourcemanager.webapp.address"); 220 if (null == addr || addr.startsWith("0.0.0.0")) { 221 String str2 = yarnConfiguration.get("yarn.resourcemanager.ha.rm-ids"); 222 if (null != str2) { 223 for (String rm : str2.split(",")) { 224 out.write(yarnConfiguration.get("yarn.resourcemanager.webapp.address." + rm) +"\n"); 225 } 226 } 227 } else { 228 out.write(addr +"\n"); 229 } 230 out.close(); 231 } 232 233 /** 234 * .NET client calls into this main method for job submission. 235 * For arguments detail: 236 * @see YarnClusterSubmissionFromCS#fromJobSubmissionParametersFile(File, File) 237 */ 238 public static void main(final String[] args) throws InjectionException, IOException, YarnException { 239 final File jobSubmissionParametersFile = new File(args[0]); 240 final File appSubmissionParametersFile = new File(args[1]); 241 242 if (!(appSubmissionParametersFile.exists() && appSubmissionParametersFile.canRead())) { 243 throw new IOException("Unable to open and read " + appSubmissionParametersFile.getAbsolutePath()); 244 } 245 246 if (!(jobSubmissionParametersFile.exists() && jobSubmissionParametersFile.canRead())) { 247 throw new IOException("Unable to open and read " + jobSubmissionParametersFile.getAbsolutePath()); 248 } 249 250 final YarnClusterSubmissionFromCS yarnSubmission = 251 YarnClusterSubmissionFromCS.fromJobSubmissionParametersFile( 252 appSubmissionParametersFile, jobSubmissionParametersFile); 253 254 LOG.log(Level.INFO, "YARN job submission received from C#: {0}", yarnSubmission); 255 if (!yarnSubmission.getTokenKind().equalsIgnoreCase("NULL")) { 256 // We have to write security token to user credential before YarnJobSubmissionClient is created 257 // as that will need initialization of FileSystem which could need the token. 258 LOG.log(Level.INFO, "Writing security token to user credential"); 259 writeSecurityTokenToUserCredential(yarnSubmission); 260 } else{ 261 LOG.log(Level.FINE, "Did not find security token"); 262 } 263 264 final List<String> launchCommandPrefix = new ArrayList<String>() {{ 265 add(new REEFFileNames().getDriverLauncherExeFile().toString()); 266 }}; 267 268 final Configuration yarnJobSubmissionClientConfig = Tang.Factory.getTang().newConfigurationBuilder() 269 .bindImplementation(RuntimeClasspathProvider.class, YarnClasspathProvider.class) 270 .bindConstructor(org.apache.hadoop.yarn.conf.YarnConfiguration.class, YarnConfigurationConstructor.class) 271 .bindNamedParameter(JobSubmissionDirectoryPrefix.class, yarnSubmission.getJobSubmissionDirectoryPrefix()) 272 .bindList(DriverLaunchCommandPrefix.class, launchCommandPrefix) 273 .build(); 274 final YarnJobSubmissionClient client = Tang.Factory.getTang().newInjector(yarnJobSubmissionClientConfig) 275 .getInstance(YarnJobSubmissionClient.class); 276 277 client.launch(yarnSubmission); 278 279 LOG.log(Level.INFO, "Returned from launch in Java YarnJobSubmissionClient"); 280 System.exit(0); 281 LOG.log(Level.INFO, "End of main in Java YarnJobSubmissionClient"); 282 } 283}