001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.bridge.client; 020 021import org.apache.commons.lang.Validate; 022import org.apache.hadoop.fs.FSDataInputStream; 023import org.apache.hadoop.fs.FileSystem; 024import org.apache.hadoop.fs.Path; 025import org.apache.hadoop.io.Text; 026import org.apache.hadoop.security.UserGroupInformation; 027import org.apache.hadoop.security.token.Token; 028import org.apache.hadoop.yarn.api.records.LocalResource; 029import org.apache.hadoop.yarn.api.records.LocalResourceType; 030import org.apache.hadoop.yarn.conf.YarnConfiguration; 031import org.apache.hadoop.yarn.exceptions.YarnException; 032import org.apache.reef.runtime.common.files.ClasspathProvider; 033import org.apache.reef.runtime.common.files.REEFFileNames; 034import org.apache.reef.runtime.common.files.RuntimeClasspathProvider; 035import org.apache.reef.runtime.common.launch.parameters.DriverLaunchCommandPrefix; 036import org.apache.reef.runtime.yarn.YarnClasspathProvider; 037import org.apache.reef.runtime.yarn.client.SecurityTokenProvider; 038import org.apache.reef.runtime.yarn.client.YarnSubmissionHelper; 039import org.apache.reef.runtime.yarn.client.uploader.JobFolder; 040import org.apache.reef.runtime.yarn.client.uploader.JobUploader; 041import org.apache.reef.runtime.yarn.driver.parameters.JobSubmissionDirectoryPrefix; 042import org.apache.reef.runtime.yarn.util.YarnConfigurationConstructor; 043import org.apache.reef.tang.Configuration; 044import org.apache.reef.tang.Tang; 045import org.apache.reef.tang.annotations.Parameter; 046import org.apache.reef.tang.exceptions.InjectionException; 047import org.apache.reef.util.JARFileMaker; 048 049import javax.inject.Inject; 050import java.io.*; 051import java.nio.charset.StandardCharsets; 052import java.nio.file.Files; 053import java.nio.file.Paths; 054import java.util.ArrayList; 055import java.util.List; 056import java.util.logging.Level; 057import java.util.logging.Logger; 058 059/** 060 * The Java-side of the C# YARN Job Submission API. 061 */ 062@SuppressWarnings("checkstyle:hideutilityclassconstructor") 063public final class YarnJobSubmissionClient { 064 065 private static final Logger LOG = Logger.getLogger(YarnJobSubmissionClient.class.getName()); 066 private final JobUploader uploader; 067 private final REEFFileNames fileNames; 068 private final YarnConfiguration yarnConfiguration; 069 private final ClasspathProvider classpath; 070 private final SecurityTokenProvider tokenProvider; 071 private final List<String> commandPrefixList; 072 private final YarnSubmissionParametersFileGenerator jobSubmissionParametersGenerator; 073 074 @Inject 075 YarnJobSubmissionClient(final JobUploader uploader, 076 final YarnConfiguration yarnConfiguration, 077 final REEFFileNames fileNames, 078 final ClasspathProvider classpath, 079 @Parameter(DriverLaunchCommandPrefix.class) 080 final List<String> commandPrefixList, 081 final SecurityTokenProvider tokenProvider, 082 final YarnSubmissionParametersFileGenerator jobSubmissionParametersGenerator) { 083 this.uploader = uploader; 084 this.fileNames = fileNames; 085 this.yarnConfiguration = yarnConfiguration; 086 this.classpath = classpath; 087 this.tokenProvider = tokenProvider; 088 this.commandPrefixList = commandPrefixList; 089 this.jobSubmissionParametersGenerator = jobSubmissionParametersGenerator; 090 } 091 092 /** 093 * @param driverFolder the folder containing the `reef` folder. Only that `reef` folder will be in the JAR. 094 * @return the jar file 095 * @throws IOException 096 */ 097 private File makeJar(final File driverFolder) throws IOException { 098 Validate.isTrue(driverFolder.exists()); 099 final File jarFile = new File(driverFolder.getParentFile(), driverFolder.getName() + ".jar"); 100 final File reefFolder = new File(driverFolder, fileNames.getREEFFolderName()); 101 if (!reefFolder.isDirectory()) { 102 throw new FileNotFoundException(reefFolder.getAbsolutePath()); 103 } 104 105 new JARFileMaker(jarFile).addChildren(reefFolder).close(); 106 return jarFile; 107 } 108 109 private void launch(final YarnClusterSubmissionFromCS yarnSubmission) throws IOException, YarnException { 110 // ------------------------------------------------------------------------ 111 // Get an application ID 112 try (final YarnSubmissionHelper submissionHelper = 113 new YarnSubmissionHelper(yarnConfiguration, fileNames, classpath, tokenProvider, commandPrefixList)) { 114 115 // ------------------------------------------------------------------------ 116 // Prepare the JAR 117 final JobFolder jobFolderOnDFS = this.uploader.createJobFolder(submissionHelper.getApplicationId()); 118 this.jobSubmissionParametersGenerator.writeConfiguration(yarnSubmission, jobFolderOnDFS); 119 final File jarFile = makeJar(yarnSubmission.getDriverFolder()); 120 LOG.log(Level.INFO, "Created job submission jar file: {0}", jarFile); 121 122 // ------------------------------------------------------------------------ 123 // Upload the JAR 124 LOG.info("Uploading job submission JAR"); 125 final LocalResource jarFileOnDFS = jobFolderOnDFS.uploadAsLocalResource(jarFile, LocalResourceType.ARCHIVE); 126 LOG.info("Uploaded job submission JAR"); 127 128 // ------------------------------------------------------------------------ 129 // Upload the job file 130 final LocalResource jobFileOnDFS = jobFolderOnDFS.uploadAsLocalResource( 131 new File(yarnSubmission.getDriverFolder(), fileNames.getYarnBootstrapJobParamFilePath()), 132 LocalResourceType.FILE); 133 134 final List<String> confFiles = new ArrayList<>(); 135 confFiles.add(fileNames.getYarnBootstrapJobParamFilePath()); 136 confFiles.add(fileNames.getYarnBootstrapAppParamFilePath()); 137 138 // ------------------------------------------------------------------------ 139 // Submit 140 submissionHelper 141 .addLocalResource(this.fileNames.getREEFFolderName(), jarFileOnDFS) 142 .addLocalResource(fileNames.getYarnBootstrapJobParamFilePath(), jobFileOnDFS) 143 .setApplicationName(yarnSubmission.getJobId()) 144 .setDriverMemory(yarnSubmission.getDriverMemory()) 145 .setPriority(yarnSubmission.getPriority()) 146 .setQueue(yarnSubmission.getQueue()) 147 .setMaxApplicationAttempts(yarnSubmission.getMaxApplicationSubmissions()) 148 .setPreserveEvaluators(yarnSubmission.getDriverRecoveryTimeout() > 0) 149 .setLauncherClass(YarnBootstrapREEFLauncher.class) 150 .setConfigurationFilePaths(confFiles) 151 .setDriverStdoutPath(yarnSubmission.getYarnDriverStdoutFilePath()) 152 .setDriverStderrPath(yarnSubmission.getYarnDriverStderrFilePath()) 153 .submit(); 154 writeDriverHttpEndPoint(yarnSubmission.getDriverFolder(), 155 submissionHelper.getStringApplicationId(), jobFolderOnDFS.getPath()); 156 } 157 } 158 159 private static void writeSecurityTokenToUserCredential( 160 final YarnClusterSubmissionFromCS yarnSubmission) throws IOException { 161 final UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); 162 final REEFFileNames fileNames = new REEFFileNames(); 163 final String securityTokenIdentifierFile = fileNames.getSecurityTokenIdentifierFile(); 164 final String securityTokenPasswordFile = fileNames.getSecurityTokenPasswordFile(); 165 final Text tokenKind = new Text(yarnSubmission.getTokenKind()); 166 final Text tokenService = new Text(yarnSubmission.getTokenService()); 167 byte[] identifier = Files.readAllBytes(Paths.get(securityTokenIdentifierFile)); 168 byte[] password = Files.readAllBytes(Paths.get(securityTokenPasswordFile)); 169 Token token = new Token(identifier, password, tokenKind, tokenService); 170 currentUser.addToken(token); 171 } 172 173 /** 174 * We leave a file behind in job submission directory so that clr client can figure out 175 * the applicationId and yarn rest endpoint. 176 * @param driverFolder 177 * @param applicationId 178 * @throws IOException 179 */ 180 private void writeDriverHttpEndPoint(final File driverFolder, 181 final String applicationId, 182 final Path dfsPath) throws IOException { 183 final FileSystem fs = FileSystem.get(yarnConfiguration); 184 final Path httpEndpointPath = new Path(dfsPath, fileNames.getDriverHttpEndpoint()); 185 186 String trackingUri = null; 187 LOG.log(Level.INFO, "Attempt to reading " + httpEndpointPath.toString()); 188 for (int i = 0; i < 60; i++) { 189 try { 190 LOG.log(Level.FINE, "Attempt " + i + " reading " + httpEndpointPath.toString()); 191 if (fs.exists(httpEndpointPath)) { 192 final FSDataInputStream input = fs.open(httpEndpointPath); 193 final BufferedReader reader = new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8)); 194 trackingUri = reader.readLine(); 195 reader.close(); 196 break; 197 } 198 } catch (Exception ignored) { 199 // readLine might throw IOException although httpEndpointPath file exists. 200 // the for-loop waits until the actual content of file is written completely 201 } 202 try{ 203 Thread.sleep(1000); 204 } catch(InterruptedException ex2) { 205 break; 206 } 207 } 208 209 if (null == trackingUri) { 210 trackingUri = ""; 211 LOG.log(Level.WARNING, "Failed reading " + httpEndpointPath.toString()); 212 } else { 213 LOG.log(Level.INFO, "Completed reading trackingUri :" + trackingUri); 214 } 215 216 final File driverHttpEndpointFile = new File(driverFolder, fileNames.getDriverHttpEndpoint()); 217 BufferedWriter out = new BufferedWriter(new OutputStreamWriter( 218 new FileOutputStream(driverHttpEndpointFile), StandardCharsets.UTF_8)); 219 out.write(applicationId + "\n"); 220 out.write(trackingUri + "\n"); 221 String addr = yarnConfiguration.get("yarn.resourcemanager.webapp.address"); 222 if (null == addr || addr.startsWith("0.0.0.0")) { 223 String str2 = yarnConfiguration.get("yarn.resourcemanager.ha.rm-ids"); 224 if (null != str2) { 225 for (String rm : str2.split(",")) { 226 out.write(yarnConfiguration.get("yarn.resourcemanager.webapp.address." + rm) +"\n"); 227 } 228 } 229 } else { 230 out.write(addr +"\n"); 231 } 232 out.close(); 233 } 234 235 /** 236 * .NET client calls into this main method for job submission. 237 * For arguments detail: 238 * @see YarnClusterSubmissionFromCS#fromJobSubmissionParametersFile(File, File) 239 */ 240 public static void main(final String[] args) throws InjectionException, IOException, YarnException { 241 final File jobSubmissionParametersFile = new File(args[0]); 242 final File appSubmissionParametersFile = new File(args[1]); 243 244 if (!(appSubmissionParametersFile.exists() && appSubmissionParametersFile.canRead())) { 245 throw new IOException("Unable to open and read " + appSubmissionParametersFile.getAbsolutePath()); 246 } 247 248 if (!(jobSubmissionParametersFile.exists() && jobSubmissionParametersFile.canRead())) { 249 throw new IOException("Unable to open and read " + jobSubmissionParametersFile.getAbsolutePath()); 250 } 251 252 final YarnClusterSubmissionFromCS yarnSubmission = 253 YarnClusterSubmissionFromCS.fromJobSubmissionParametersFile( 254 appSubmissionParametersFile, jobSubmissionParametersFile); 255 256 LOG.log(Level.INFO, "YARN job submission received from C#: {0}", yarnSubmission); 257 if (!yarnSubmission.getTokenKind().equalsIgnoreCase("NULL")) { 258 // We have to write security token to user credential before YarnJobSubmissionClient is created 259 // as that will need initialization of FileSystem which could need the token. 260 LOG.log(Level.INFO, "Writing security token to user credential"); 261 writeSecurityTokenToUserCredential(yarnSubmission); 262 } else{ 263 LOG.log(Level.FINE, "Did not find security token"); 264 } 265 266 final List<String> launchCommandPrefix = new ArrayList<String>() {{ 267 add(new REEFFileNames().getDriverLauncherExeFile().toString()); 268 }}; 269 270 final Configuration yarnJobSubmissionClientConfig = Tang.Factory.getTang().newConfigurationBuilder() 271 .bindImplementation(RuntimeClasspathProvider.class, YarnClasspathProvider.class) 272 .bindConstructor(org.apache.hadoop.yarn.conf.YarnConfiguration.class, YarnConfigurationConstructor.class) 273 .bindNamedParameter(JobSubmissionDirectoryPrefix.class, yarnSubmission.getJobSubmissionDirectoryPrefix()) 274 .bindList(DriverLaunchCommandPrefix.class, launchCommandPrefix) 275 .build(); 276 final YarnJobSubmissionClient client = Tang.Factory.getTang().newInjector(yarnJobSubmissionClientConfig) 277 .getInstance(YarnJobSubmissionClient.class); 278 279 client.launch(yarnSubmission); 280 281 LOG.log(Level.INFO, "Returned from launch in Java YarnJobSubmissionClient"); 282 System.exit(0); 283 LOG.log(Level.INFO, "End of main in Java YarnJobSubmissionClient"); 284 } 285}