This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.bridge.client;
020
021import org.apache.commons.lang.Validate;
022import org.apache.hadoop.fs.FSDataInputStream;
023import org.apache.hadoop.fs.FileSystem;
024import org.apache.hadoop.fs.Path;
025import org.apache.hadoop.io.Text;
026import org.apache.hadoop.security.UserGroupInformation;
027import org.apache.hadoop.security.token.Token;
028import org.apache.hadoop.yarn.api.records.LocalResource;
029import org.apache.hadoop.yarn.api.records.LocalResourceType;
030import org.apache.hadoop.yarn.conf.YarnConfiguration;
031import org.apache.hadoop.yarn.exceptions.YarnException;
032import org.apache.reef.runtime.common.files.ClasspathProvider;
033import org.apache.reef.runtime.common.files.REEFFileNames;
034import org.apache.reef.runtime.common.files.RuntimeClasspathProvider;
035import org.apache.reef.runtime.common.launch.parameters.DriverLaunchCommandPrefix;
036import org.apache.reef.runtime.yarn.YarnClasspathProvider;
037import org.apache.reef.runtime.yarn.client.SecurityTokenProvider;
038import org.apache.reef.runtime.yarn.client.YarnSubmissionHelper;
039import org.apache.reef.runtime.yarn.client.uploader.JobFolder;
040import org.apache.reef.runtime.yarn.client.uploader.JobUploader;
041import org.apache.reef.runtime.yarn.driver.parameters.JobSubmissionDirectoryPrefix;
042import org.apache.reef.runtime.yarn.util.YarnConfigurationConstructor;
043import org.apache.reef.tang.Configuration;
044import org.apache.reef.tang.Tang;
045import org.apache.reef.tang.annotations.Parameter;
046import org.apache.reef.tang.exceptions.InjectionException;
047import org.apache.reef.util.JARFileMaker;
048
049import javax.inject.Inject;
050import java.io.*;
051import java.nio.charset.StandardCharsets;
052import java.nio.file.Files;
053import java.nio.file.Paths;
054import java.util.ArrayList;
055import java.util.List;
056import java.util.logging.Level;
057import java.util.logging.Logger;
058
059/**
060 * The Java-side of the C# YARN Job Submission API.
061 */
062@SuppressWarnings("checkstyle:hideutilityclassconstructor")
063public final class YarnJobSubmissionClient {
064
065  private static final Logger LOG = Logger.getLogger(YarnJobSubmissionClient.class.getName());
066  private final JobUploader uploader;
067  private final REEFFileNames fileNames;
068  private final YarnConfiguration yarnConfiguration;
069  private final ClasspathProvider classpath;
070  private final SecurityTokenProvider tokenProvider;
071  private final List<String> commandPrefixList;
072  private final YarnSubmissionParametersFileGenerator jobSubmissionParametersGenerator;
073
074  @Inject
075  YarnJobSubmissionClient(final JobUploader uploader,
076                          final YarnConfiguration yarnConfiguration,
077                          final REEFFileNames fileNames,
078                          final ClasspathProvider classpath,
079                          @Parameter(DriverLaunchCommandPrefix.class)
080                          final List<String> commandPrefixList,
081                          final SecurityTokenProvider tokenProvider,
082                          final YarnSubmissionParametersFileGenerator jobSubmissionParametersGenerator) {
083    this.uploader = uploader;
084    this.fileNames = fileNames;
085    this.yarnConfiguration = yarnConfiguration;
086    this.classpath = classpath;
087    this.tokenProvider = tokenProvider;
088    this.commandPrefixList = commandPrefixList;
089    this.jobSubmissionParametersGenerator = jobSubmissionParametersGenerator;
090  }
091
092  /**
093   * @param driverFolder the folder containing the `reef` folder. Only that `reef` folder will be in the JAR.
094   * @return the jar file
095   * @throws IOException
096   */
097  private File makeJar(final File driverFolder) throws IOException {
098    Validate.isTrue(driverFolder.exists());
099    final File jarFile = new File(driverFolder.getParentFile(), driverFolder.getName() + ".jar");
100    final File reefFolder = new File(driverFolder, fileNames.getREEFFolderName());
101    if (!reefFolder.isDirectory()) {
102      throw new FileNotFoundException(reefFolder.getAbsolutePath());
103    }
104
105    new JARFileMaker(jarFile).addChildren(reefFolder).close();
106    return jarFile;
107  }
108
109  private void launch(final YarnClusterSubmissionFromCS yarnSubmission) throws IOException, YarnException {
110    // ------------------------------------------------------------------------
111    // Get an application ID
112    try (final YarnSubmissionHelper submissionHelper =
113             new YarnSubmissionHelper(yarnConfiguration, fileNames, classpath, tokenProvider, commandPrefixList)) {
114
115      // ------------------------------------------------------------------------
116      // Prepare the JAR
117      final JobFolder jobFolderOnDFS = this.uploader.createJobFolder(submissionHelper.getApplicationId());
118      this.jobSubmissionParametersGenerator.writeConfiguration(yarnSubmission, jobFolderOnDFS);
119      final File jarFile = makeJar(yarnSubmission.getDriverFolder());
120      LOG.log(Level.INFO, "Created job submission jar file: {0}", jarFile);
121
122      // ------------------------------------------------------------------------
123      // Upload the JAR
124      LOG.info("Uploading job submission JAR");
125      final LocalResource jarFileOnDFS = jobFolderOnDFS.uploadAsLocalResource(jarFile, LocalResourceType.ARCHIVE);
126      LOG.info("Uploaded job submission JAR");
127
128      // ------------------------------------------------------------------------
129      // Upload the job file
130      final LocalResource jobFileOnDFS = jobFolderOnDFS.uploadAsLocalResource(
131          new File(yarnSubmission.getDriverFolder(), fileNames.getYarnBootstrapJobParamFilePath()),
132          LocalResourceType.FILE);
133
134      final List<String> confFiles = new ArrayList<>();
135      confFiles.add(fileNames.getYarnBootstrapJobParamFilePath());
136      confFiles.add(fileNames.getYarnBootstrapAppParamFilePath());
137
138      // ------------------------------------------------------------------------
139      // Submit
140      submissionHelper
141          .addLocalResource(this.fileNames.getREEFFolderName(), jarFileOnDFS)
142          .addLocalResource(fileNames.getYarnBootstrapJobParamFilePath(), jobFileOnDFS)
143          .setApplicationName(yarnSubmission.getJobId())
144          .setDriverMemory(yarnSubmission.getDriverMemory())
145          .setPriority(yarnSubmission.getPriority())
146          .setQueue(yarnSubmission.getQueue())
147          .setMaxApplicationAttempts(yarnSubmission.getMaxApplicationSubmissions())
148          .setPreserveEvaluators(yarnSubmission.getDriverRecoveryTimeout() > 0)
149          .setLauncherClass(YarnBootstrapREEFLauncher.class)
150          .setConfigurationFilePaths(confFiles)
151          .submit();
152      writeDriverHttpEndPoint(yarnSubmission.getDriverFolder(),
153          submissionHelper.getStringApplicationId(), jobFolderOnDFS.getPath());
154    }
155  }
156
157  private static void writeSecurityTokenToUserCredential(
158      final YarnClusterSubmissionFromCS yarnSubmission) throws IOException {
159    final UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
160    final REEFFileNames fileNames = new REEFFileNames();
161    final String securityTokenIdentifierFile = fileNames.getSecurityTokenIdentifierFile();
162    final String securityTokenPasswordFile = fileNames.getSecurityTokenPasswordFile();
163    final Text tokenKind = new Text(yarnSubmission.getTokenKind());
164    final Text tokenService = new Text(yarnSubmission.getTokenService());
165    byte[] identifier = Files.readAllBytes(Paths.get(securityTokenIdentifierFile));
166    byte[] password = Files.readAllBytes(Paths.get(securityTokenPasswordFile));
167    Token token = new Token(identifier, password, tokenKind, tokenService);
168    currentUser.addToken(token);
169  }
170
171  /**
172   * We leave a file behind in job submission directory so that clr client can figure out
173   * the applicationId and yarn rest endpoint.
174   * @param driverFolder
175   * @param applicationId
176   * @throws IOException
177   */
178  private void writeDriverHttpEndPoint(final File driverFolder,
179                                       final String applicationId,
180                                       final Path dfsPath) throws  IOException {
181    final FileSystem fs = FileSystem.get(yarnConfiguration);
182    final Path httpEndpointPath = new Path(dfsPath, fileNames.getDriverHttpEndpoint());
183
184    String trackingUri = null;
185    LOG.log(Level.INFO, "Attempt to reading " + httpEndpointPath.toString());
186    for (int i = 0; i < 60; i++) {
187      try {
188        LOG.log(Level.FINE, "Attempt " + i + " reading " + httpEndpointPath.toString());
189        if (fs.exists(httpEndpointPath)) {
190          final FSDataInputStream input = fs.open(httpEndpointPath);
191          final BufferedReader reader = new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8));
192          trackingUri = reader.readLine();
193          reader.close();
194          break;
195        }
196      } catch (Exception ignored) {
197        // readLine might throw IOException although httpEndpointPath file exists.
198        // the for-loop waits until the actual content of file is written completely
199      }
200      try{
201        Thread.sleep(1000);
202      } catch(InterruptedException ex2) {
203        break;
204      }
205    }
206
207    if (null == trackingUri) {
208      trackingUri = "";
209      LOG.log(Level.WARNING, "Failed reading " + httpEndpointPath.toString());
210    } else {
211      LOG.log(Level.INFO, "Completed reading trackingUri :" + trackingUri);
212    }
213
214    final File driverHttpEndpointFile = new File(driverFolder, fileNames.getDriverHttpEndpoint());
215    BufferedWriter out = new BufferedWriter(new OutputStreamWriter(
216                   new FileOutputStream(driverHttpEndpointFile), StandardCharsets.UTF_8));
217    out.write(applicationId + "\n");
218    out.write(trackingUri + "\n");
219    String addr = yarnConfiguration.get("yarn.resourcemanager.webapp.address");
220    if (null == addr || addr.startsWith("0.0.0.0")) {
221      String str2 = yarnConfiguration.get("yarn.resourcemanager.ha.rm-ids");
222      if (null != str2) {
223        for (String rm : str2.split(",")) {
224          out.write(yarnConfiguration.get("yarn.resourcemanager.webapp.address." + rm) +"\n");
225        }
226      }
227    } else {
228      out.write(addr +"\n");
229    }
230    out.close();
231  }
232
233  /**
234   * .NET client calls into this main method for job submission.
235   * For arguments detail:
236   * @see YarnClusterSubmissionFromCS#fromJobSubmissionParametersFile(File, File)
237   */
238  public static void main(final String[] args) throws InjectionException, IOException, YarnException {
239    final File jobSubmissionParametersFile = new File(args[0]);
240    final File appSubmissionParametersFile = new File(args[1]);
241
242    if (!(appSubmissionParametersFile.exists() && appSubmissionParametersFile.canRead())) {
243      throw new IOException("Unable to open and read " + appSubmissionParametersFile.getAbsolutePath());
244    }
245
246    if (!(jobSubmissionParametersFile.exists() && jobSubmissionParametersFile.canRead())) {
247      throw new IOException("Unable to open and read " + jobSubmissionParametersFile.getAbsolutePath());
248    }
249
250    final YarnClusterSubmissionFromCS yarnSubmission =
251        YarnClusterSubmissionFromCS.fromJobSubmissionParametersFile(
252            appSubmissionParametersFile, jobSubmissionParametersFile);
253
254    LOG.log(Level.INFO, "YARN job submission received from C#: {0}", yarnSubmission);
255    if (!yarnSubmission.getTokenKind().equalsIgnoreCase("NULL")) {
256      // We have to write security token to user credential before YarnJobSubmissionClient is created
257      // as that will need initialization of FileSystem which could need the token.
258      LOG.log(Level.INFO, "Writing security token to user credential");
259      writeSecurityTokenToUserCredential(yarnSubmission);
260    } else{
261      LOG.log(Level.FINE, "Did not find security token");
262    }
263
264    final List<String> launchCommandPrefix = new ArrayList<String>() {{
265          add(new REEFFileNames().getDriverLauncherExeFile().toString());
266      }};
267
268    final Configuration yarnJobSubmissionClientConfig = Tang.Factory.getTang().newConfigurationBuilder()
269        .bindImplementation(RuntimeClasspathProvider.class, YarnClasspathProvider.class)
270        .bindConstructor(org.apache.hadoop.yarn.conf.YarnConfiguration.class, YarnConfigurationConstructor.class)
271        .bindNamedParameter(JobSubmissionDirectoryPrefix.class, yarnSubmission.getJobSubmissionDirectoryPrefix())
272        .bindList(DriverLaunchCommandPrefix.class, launchCommandPrefix)
273        .build();
274    final YarnJobSubmissionClient client = Tang.Factory.getTang().newInjector(yarnJobSubmissionClientConfig)
275        .getInstance(YarnJobSubmissionClient.class);
276
277    client.launch(yarnSubmission);
278
279    LOG.log(Level.INFO, "Returned from launch in Java YarnJobSubmissionClient");
280    System.exit(0);
281    LOG.log(Level.INFO, "End of main in Java YarnJobSubmissionClient");
282  }
283}