001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.bridge.client; 020 021import org.apache.commons.lang.Validate; 022import org.apache.hadoop.fs.FSDataInputStream; 023import org.apache.hadoop.fs.FileSystem; 024import org.apache.hadoop.fs.Path; 025import org.apache.hadoop.io.Text; 026import org.apache.hadoop.security.UserGroupInformation; 027import org.apache.hadoop.security.token.Token; 028import org.apache.hadoop.yarn.api.records.LocalResource; 029import org.apache.hadoop.yarn.api.records.LocalResourceType; 030import org.apache.hadoop.yarn.conf.YarnConfiguration; 031import org.apache.hadoop.yarn.exceptions.YarnException; 032import org.apache.reef.driver.parameters.DriverIsUnmanaged; 033import org.apache.reef.runtime.common.files.ClasspathProvider; 034import org.apache.reef.runtime.common.files.REEFFileNames; 035import org.apache.reef.runtime.common.files.RuntimeClasspathProvider; 036import org.apache.reef.runtime.common.launch.parameters.DriverLaunchCommandPrefix; 037import org.apache.reef.runtime.yarn.YarnClasspathProvider; 038import org.apache.reef.runtime.yarn.client.SecurityTokenProvider; 039import org.apache.reef.runtime.yarn.client.YarnSubmissionHelper; 040import org.apache.reef.runtime.yarn.client.unmanaged.YarnProxyUser; 041import org.apache.reef.runtime.yarn.client.uploader.JobFolder; 042import org.apache.reef.runtime.yarn.client.uploader.JobUploader; 043import org.apache.reef.runtime.yarn.driver.parameters.FileSystemUrl; 044import org.apache.reef.runtime.yarn.driver.parameters.JobSubmissionDirectoryPrefix; 045import org.apache.reef.runtime.yarn.util.YarnConfigurationConstructor; 046import org.apache.reef.tang.Configuration; 047import org.apache.reef.tang.Tang; 048import org.apache.reef.tang.annotations.Parameter; 049import org.apache.reef.tang.exceptions.InjectionException; 050import org.apache.reef.util.JARFileMaker; 051 052import javax.inject.Inject; 053import java.io.*; 054import java.nio.charset.StandardCharsets; 055import java.nio.file.Files; 056import java.nio.file.Paths; 057import java.util.ArrayList; 058import java.util.List; 059import java.util.logging.Level; 060import java.util.logging.Logger; 061 062/** 063 * The Java-side of the C# YARN Job Submission API. 064 */ 065@SuppressWarnings("checkstyle:hideutilityclassconstructor") 066public final class YarnJobSubmissionClient { 067 068 private static final Logger LOG = Logger.getLogger(YarnJobSubmissionClient.class.getName()); 069 070 private final boolean isUnmanaged; 071 private final List<String> commandPrefixList; 072 private final JobUploader uploader; 073 private final REEFFileNames fileNames; 074 private final YarnConfiguration yarnConfiguration; 075 private final ClasspathProvider classpath; 076 private final YarnProxyUser yarnProxyUser; 077 private final SecurityTokenProvider tokenProvider; 078 private final YarnSubmissionParametersFileGenerator jobSubmissionParametersGenerator; 079 080 @Inject 081 YarnJobSubmissionClient(@Parameter(DriverIsUnmanaged.class) final boolean isUnmanaged, 082 @Parameter(DriverLaunchCommandPrefix.class) final List<String> commandPrefixList, 083 final JobUploader uploader, 084 final YarnConfiguration yarnConfiguration, 085 final REEFFileNames fileNames, 086 final ClasspathProvider classpath, 087 final YarnProxyUser yarnProxyUser, 088 final SecurityTokenProvider tokenProvider, 089 final YarnSubmissionParametersFileGenerator jobSubmissionParametersGenerator) { 090 091 this.isUnmanaged = isUnmanaged; 092 this.commandPrefixList = commandPrefixList; 093 this.uploader = uploader; 094 this.fileNames = fileNames; 095 this.yarnConfiguration = yarnConfiguration; 096 this.classpath = classpath; 097 this.yarnProxyUser = yarnProxyUser; 098 this.tokenProvider = tokenProvider; 099 this.jobSubmissionParametersGenerator = jobSubmissionParametersGenerator; 100 } 101 102 /** 103 * @param driverFolder the folder containing the `reef` folder. Only that `reef` folder will be in the JAR. 104 * @return the jar file 105 * @throws IOException 106 */ 107 private File makeJar(final File driverFolder) throws IOException { 108 Validate.isTrue(driverFolder.exists()); 109 final File jarFile = new File(driverFolder.getParentFile(), driverFolder.getName() + ".jar"); 110 final File reefFolder = new File(driverFolder, fileNames.getREEFFolderName()); 111 if (!reefFolder.isDirectory()) { 112 throw new FileNotFoundException(reefFolder.getAbsolutePath()); 113 } 114 115 new JARFileMaker(jarFile).addChildren(reefFolder).close(); 116 return jarFile; 117 } 118 119 private void launch(final YarnClusterSubmissionFromCS yarnSubmission) throws IOException, YarnException { 120 // ------------------------------------------------------------------------ 121 // Get an application ID 122 try (final YarnSubmissionHelper submissionHelper = new YarnSubmissionHelper( 123 this.yarnConfiguration, this.fileNames, this.classpath, this.yarnProxyUser, 124 this.tokenProvider, this.isUnmanaged, this.commandPrefixList)) { 125 126 // ------------------------------------------------------------------------ 127 // Prepare the JAR 128 final JobFolder jobFolderOnDFS = this.uploader.createJobFolder(submissionHelper.getApplicationId()); 129 this.jobSubmissionParametersGenerator.writeConfiguration(yarnSubmission, jobFolderOnDFS); 130 final File jarFile = makeJar(yarnSubmission.getDriverFolder()); 131 LOG.log(Level.INFO, "Created job submission jar file: {0}", jarFile); 132 133 // ------------------------------------------------------------------------ 134 // Upload the JAR 135 LOG.info("Uploading job submission JAR"); 136 final LocalResource jarFileOnDFS = jobFolderOnDFS.uploadAsLocalResource(jarFile, LocalResourceType.ARCHIVE); 137 LOG.info("Uploaded job submission JAR"); 138 139 // ------------------------------------------------------------------------ 140 // Upload the job file 141 final LocalResource jobFileOnDFS = jobFolderOnDFS.uploadAsLocalResource( 142 new File(yarnSubmission.getDriverFolder(), fileNames.getYarnBootstrapJobParamFilePath()), 143 LocalResourceType.FILE); 144 145 final List<String> confFiles = new ArrayList<>(); 146 confFiles.add(fileNames.getYarnBootstrapJobParamFilePath()); 147 confFiles.add(fileNames.getYarnBootstrapAppParamFilePath()); 148 149 // ------------------------------------------------------------------------ 150 // Submit 151 submissionHelper 152 .addLocalResource(this.fileNames.getREEFFolderName(), jarFileOnDFS) 153 .addLocalResource(fileNames.getYarnBootstrapJobParamFilePath(), jobFileOnDFS) 154 .setApplicationName(yarnSubmission.getJobId()) 155 .setDriverMemory(yarnSubmission.getDriverMemory()) 156 .setPriority(yarnSubmission.getPriority()) 157 .setQueue(yarnSubmission.getQueue()) 158 .setMaxApplicationAttempts(yarnSubmission.getMaxApplicationSubmissions()) 159 .setPreserveEvaluators(yarnSubmission.getDriverRecoveryTimeout() > 0) 160 .setLauncherClass(YarnBootstrapREEFLauncher.class) 161 .setConfigurationFilePaths(confFiles) 162 .setDriverStdoutPath(yarnSubmission.getYarnDriverStdoutFilePath()) 163 .setDriverStderrPath(yarnSubmission.getYarnDriverStderrFilePath()) 164 .submit(); 165 writeDriverHttpEndPoint(yarnSubmission.getDriverFolder(), 166 submissionHelper.getStringApplicationId(), jobFolderOnDFS.getPath()); 167 } 168 } 169 170 private static void writeSecurityTokenToUserCredential( 171 final YarnClusterSubmissionFromCS yarnSubmission) throws IOException { 172 final UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); 173 final REEFFileNames fileNames = new REEFFileNames(); 174 final String securityTokenIdentifierFile = fileNames.getSecurityTokenIdentifierFile(); 175 final String securityTokenPasswordFile = fileNames.getSecurityTokenPasswordFile(); 176 final Text tokenKind = new Text(yarnSubmission.getTokenKind()); 177 final Text tokenService = new Text(yarnSubmission.getTokenService()); 178 byte[] identifier = Files.readAllBytes(Paths.get(securityTokenIdentifierFile)); 179 byte[] password = Files.readAllBytes(Paths.get(securityTokenPasswordFile)); 180 Token token = new Token(identifier, password, tokenKind, tokenService); 181 currentUser.addToken(token); 182 } 183 184 /** 185 * We leave a file behind in job submission directory so that clr client can figure out 186 * the applicationId and yarn rest endpoint. 187 * @param driverFolder 188 * @param applicationId 189 * @throws IOException 190 */ 191 private void writeDriverHttpEndPoint(final File driverFolder, 192 final String applicationId, 193 final Path dfsPath) throws IOException { 194 final FileSystem fs = FileSystem.get(yarnConfiguration); 195 final Path httpEndpointPath = new Path(dfsPath, fileNames.getDriverHttpEndpoint()); 196 197 String trackingUri = null; 198 LOG.log(Level.INFO, "Attempt to reading " + httpEndpointPath.toString()); 199 for (int i = 0; i < 60; i++) { 200 try { 201 LOG.log(Level.FINE, "Attempt " + i + " reading " + httpEndpointPath.toString()); 202 if (fs.exists(httpEndpointPath)) { 203 final FSDataInputStream input = fs.open(httpEndpointPath); 204 final BufferedReader reader = new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8)); 205 trackingUri = reader.readLine(); 206 reader.close(); 207 break; 208 } 209 } catch (Exception ignored) { 210 // readLine might throw IOException although httpEndpointPath file exists. 211 // the for-loop waits until the actual content of file is written completely 212 } 213 try{ 214 Thread.sleep(1000); 215 } catch(InterruptedException ex2) { 216 break; 217 } 218 } 219 220 if (null == trackingUri) { 221 trackingUri = ""; 222 LOG.log(Level.WARNING, "Failed reading " + httpEndpointPath.toString()); 223 } else { 224 LOG.log(Level.INFO, "Completed reading trackingUri :" + trackingUri); 225 } 226 227 final File driverHttpEndpointFile = new File(driverFolder, fileNames.getDriverHttpEndpoint()); 228 BufferedWriter out = new BufferedWriter(new OutputStreamWriter( 229 new FileOutputStream(driverHttpEndpointFile), StandardCharsets.UTF_8)); 230 out.write(applicationId + "\n"); 231 out.write(trackingUri + "\n"); 232 String addr = yarnConfiguration.get("yarn.resourcemanager.webapp.address"); 233 if (null == addr || addr.startsWith("0.0.0.0")) { 234 String str2 = yarnConfiguration.get("yarn.resourcemanager.ha.rm-ids"); 235 if (null != str2) { 236 for (String rm : str2.split(",")) { 237 out.write(yarnConfiguration.get("yarn.resourcemanager.webapp.address." + rm) +"\n"); 238 } 239 } 240 } else { 241 out.write(addr +"\n"); 242 } 243 out.close(); 244 } 245 246 /** 247 * .NET client calls into this main method for job submission. 248 * For arguments detail: 249 * @see YarnClusterSubmissionFromCS#fromJobSubmissionParametersFile(File, File) 250 */ 251 public static void main(final String[] args) throws InjectionException, IOException, YarnException { 252 final File jobSubmissionParametersFile = new File(args[0]); 253 final File appSubmissionParametersFile = new File(args[1]); 254 255 if (!(appSubmissionParametersFile.exists() && appSubmissionParametersFile.canRead())) { 256 throw new IOException("Unable to open and read " + appSubmissionParametersFile.getAbsolutePath()); 257 } 258 259 if (!(jobSubmissionParametersFile.exists() && jobSubmissionParametersFile.canRead())) { 260 throw new IOException("Unable to open and read " + jobSubmissionParametersFile.getAbsolutePath()); 261 } 262 263 final YarnClusterSubmissionFromCS yarnSubmission = 264 YarnClusterSubmissionFromCS.fromJobSubmissionParametersFile( 265 appSubmissionParametersFile, jobSubmissionParametersFile); 266 267 LOG.log(Level.INFO, "YARN job submission received from C#: {0}", yarnSubmission); 268 if (!yarnSubmission.getTokenKind().equalsIgnoreCase("NULL")) { 269 // We have to write security token to user credential before YarnJobSubmissionClient is created 270 // as that will need initialization of FileSystem which could need the token. 271 LOG.log(Level.INFO, "Writing security token to user credential"); 272 writeSecurityTokenToUserCredential(yarnSubmission); 273 } else{ 274 LOG.log(Level.FINE, "Did not find security token"); 275 } 276 277 if (!yarnSubmission.getFileSystemUrl().equalsIgnoreCase(FileSystemUrl.DEFAULT_VALUE)) { 278 LOG.log(Level.INFO, "getFileSystemUrl: {0}", yarnSubmission.getFileSystemUrl()); 279 } else { 280 LOG.log(Level.INFO, "FileSystemUrl is not set, use default from the environment."); 281 } 282 283 final List<String> launchCommandPrefix = new ArrayList<String>() {{ 284 add(new REEFFileNames().getDriverLauncherExeFile().toString()); 285 }}; 286 287 final Configuration yarnJobSubmissionClientConfig = Tang.Factory.getTang().newConfigurationBuilder() 288 .bindImplementation(RuntimeClasspathProvider.class, YarnClasspathProvider.class) 289 .bindConstructor(org.apache.hadoop.yarn.conf.YarnConfiguration.class, YarnConfigurationConstructor.class) 290 .bindNamedParameter(JobSubmissionDirectoryPrefix.class, yarnSubmission.getJobSubmissionDirectoryPrefix()) 291 .bindNamedParameter(FileSystemUrl.class, yarnSubmission.getFileSystemUrl()) 292 .bindList(DriverLaunchCommandPrefix.class, launchCommandPrefix) 293 .build(); 294 final YarnJobSubmissionClient client = Tang.Factory.getTang().newInjector(yarnJobSubmissionClientConfig) 295 .getInstance(YarnJobSubmissionClient.class); 296 297 client.launch(yarnSubmission); 298 299 LOG.log(Level.INFO, "Returned from launch in Java YarnJobSubmissionClient"); 300 System.exit(0); 301 LOG.log(Level.INFO, "End of main in Java YarnJobSubmissionClient"); 302 } 303}