This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.bridge.client;
020
021import org.apache.commons.lang.Validate;
022import org.apache.hadoop.fs.FSDataInputStream;
023import org.apache.hadoop.fs.FileSystem;
024import org.apache.hadoop.fs.Path;
025import org.apache.hadoop.io.Text;
026import org.apache.hadoop.security.UserGroupInformation;
027import org.apache.hadoop.security.token.Token;
028import org.apache.hadoop.yarn.api.records.LocalResource;
029import org.apache.hadoop.yarn.api.records.LocalResourceType;
030import org.apache.hadoop.yarn.conf.YarnConfiguration;
031import org.apache.hadoop.yarn.exceptions.YarnException;
032import org.apache.reef.runtime.common.files.ClasspathProvider;
033import org.apache.reef.runtime.common.files.REEFFileNames;
034import org.apache.reef.runtime.common.files.RuntimeClasspathProvider;
035import org.apache.reef.runtime.common.launch.parameters.DriverLaunchCommandPrefix;
036import org.apache.reef.runtime.yarn.YarnClasspathProvider;
037import org.apache.reef.runtime.yarn.client.SecurityTokenProvider;
038import org.apache.reef.runtime.yarn.client.YarnSubmissionHelper;
039import org.apache.reef.runtime.yarn.client.uploader.JobFolder;
040import org.apache.reef.runtime.yarn.client.uploader.JobUploader;
041import org.apache.reef.runtime.yarn.driver.parameters.JobSubmissionDirectoryPrefix;
042import org.apache.reef.runtime.yarn.util.YarnConfigurationConstructor;
043import org.apache.reef.tang.Configuration;
044import org.apache.reef.tang.Tang;
045import org.apache.reef.tang.annotations.Parameter;
046import org.apache.reef.tang.exceptions.InjectionException;
047import org.apache.reef.util.JARFileMaker;
048
049import javax.inject.Inject;
050import java.io.*;
051import java.nio.charset.StandardCharsets;
052import java.nio.file.Files;
053import java.nio.file.Paths;
054import java.util.ArrayList;
055import java.util.List;
056import java.util.logging.Level;
057import java.util.logging.Logger;
058
059/**
060 * The Java-side of the C# YARN Job Submission API.
061 */
062@SuppressWarnings("checkstyle:hideutilityclassconstructor")
063public final class YarnJobSubmissionClient {
064
065  private static final Logger LOG = Logger.getLogger(YarnJobSubmissionClient.class.getName());
066  private final JobUploader uploader;
067  private final REEFFileNames fileNames;
068  private final YarnConfiguration yarnConfiguration;
069  private final ClasspathProvider classpath;
070  private final SecurityTokenProvider tokenProvider;
071  private final List<String> commandPrefixList;
072  private final YarnSubmissionParametersFileGenerator jobSubmissionParametersGenerator;
073
074  @Inject
075  YarnJobSubmissionClient(final JobUploader uploader,
076                          final YarnConfiguration yarnConfiguration,
077                          final REEFFileNames fileNames,
078                          final ClasspathProvider classpath,
079                          @Parameter(DriverLaunchCommandPrefix.class)
080                          final List<String> commandPrefixList,
081                          final SecurityTokenProvider tokenProvider,
082                          final YarnSubmissionParametersFileGenerator jobSubmissionParametersGenerator) {
083    this.uploader = uploader;
084    this.fileNames = fileNames;
085    this.yarnConfiguration = yarnConfiguration;
086    this.classpath = classpath;
087    this.tokenProvider = tokenProvider;
088    this.commandPrefixList = commandPrefixList;
089    this.jobSubmissionParametersGenerator = jobSubmissionParametersGenerator;
090  }
091
092  /**
093   * @param driverFolder the folder containing the `reef` folder. Only that `reef` folder will be in the JAR.
094   * @return the jar file
095   * @throws IOException
096   */
097  private File makeJar(final File driverFolder) throws IOException {
098    Validate.isTrue(driverFolder.exists());
099    final File jarFile = new File(driverFolder.getParentFile(), driverFolder.getName() + ".jar");
100    final File reefFolder = new File(driverFolder, fileNames.getREEFFolderName());
101    if (!reefFolder.isDirectory()) {
102      throw new FileNotFoundException(reefFolder.getAbsolutePath());
103    }
104
105    new JARFileMaker(jarFile).addChildren(reefFolder).close();
106    return jarFile;
107  }
108
109  private void launch(final YarnClusterSubmissionFromCS yarnSubmission) throws IOException, YarnException {
110    // ------------------------------------------------------------------------
111    // Get an application ID
112    try (final YarnSubmissionHelper submissionHelper =
113             new YarnSubmissionHelper(yarnConfiguration, fileNames, classpath, tokenProvider, commandPrefixList)) {
114
115      // ------------------------------------------------------------------------
116      // Prepare the JAR
117      final JobFolder jobFolderOnDFS = this.uploader.createJobFolder(submissionHelper.getApplicationId());
118      this.jobSubmissionParametersGenerator.writeConfiguration(yarnSubmission, jobFolderOnDFS);
119      final File jarFile = makeJar(yarnSubmission.getDriverFolder());
120      LOG.log(Level.INFO, "Created job submission jar file: {0}", jarFile);
121
122      // ------------------------------------------------------------------------
123      // Upload the JAR
124      LOG.info("Uploading job submission JAR");
125      final LocalResource jarFileOnDFS = jobFolderOnDFS.uploadAsLocalResource(jarFile, LocalResourceType.ARCHIVE);
126      LOG.info("Uploaded job submission JAR");
127
128      // ------------------------------------------------------------------------
129      // Upload the job file
130      final LocalResource jobFileOnDFS = jobFolderOnDFS.uploadAsLocalResource(
131          new File(yarnSubmission.getDriverFolder(), fileNames.getYarnBootstrapJobParamFilePath()),
132          LocalResourceType.FILE);
133
134      final List<String> confFiles = new ArrayList<>();
135      confFiles.add(fileNames.getYarnBootstrapJobParamFilePath());
136      confFiles.add(fileNames.getYarnBootstrapAppParamFilePath());
137
138      // ------------------------------------------------------------------------
139      // Submit
140      submissionHelper
141          .addLocalResource(this.fileNames.getREEFFolderName(), jarFileOnDFS)
142          .addLocalResource(fileNames.getYarnBootstrapJobParamFilePath(), jobFileOnDFS)
143          .setApplicationName(yarnSubmission.getJobId())
144          .setDriverMemory(yarnSubmission.getDriverMemory())
145          .setPriority(yarnSubmission.getPriority())
146          .setQueue(yarnSubmission.getQueue())
147          .setMaxApplicationAttempts(yarnSubmission.getMaxApplicationSubmissions())
148          .setPreserveEvaluators(yarnSubmission.getDriverRecoveryTimeout() > 0)
149          .setLauncherClass(YarnBootstrapREEFLauncher.class)
150          .setConfigurationFilePaths(confFiles)
151          .setDriverStdoutPath(yarnSubmission.getYarnDriverStdoutFilePath())
152          .setDriverStderrPath(yarnSubmission.getYarnDriverStderrFilePath())
153          .submit();
154      writeDriverHttpEndPoint(yarnSubmission.getDriverFolder(),
155          submissionHelper.getStringApplicationId(), jobFolderOnDFS.getPath());
156    }
157  }
158
159  private static void writeSecurityTokenToUserCredential(
160      final YarnClusterSubmissionFromCS yarnSubmission) throws IOException {
161    final UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
162    final REEFFileNames fileNames = new REEFFileNames();
163    final String securityTokenIdentifierFile = fileNames.getSecurityTokenIdentifierFile();
164    final String securityTokenPasswordFile = fileNames.getSecurityTokenPasswordFile();
165    final Text tokenKind = new Text(yarnSubmission.getTokenKind());
166    final Text tokenService = new Text(yarnSubmission.getTokenService());
167    byte[] identifier = Files.readAllBytes(Paths.get(securityTokenIdentifierFile));
168    byte[] password = Files.readAllBytes(Paths.get(securityTokenPasswordFile));
169    Token token = new Token(identifier, password, tokenKind, tokenService);
170    currentUser.addToken(token);
171  }
172
173  /**
174   * We leave a file behind in job submission directory so that clr client can figure out
175   * the applicationId and yarn rest endpoint.
176   * @param driverFolder
177   * @param applicationId
178   * @throws IOException
179   */
180  private void writeDriverHttpEndPoint(final File driverFolder,
181                                       final String applicationId,
182                                       final Path dfsPath) throws  IOException {
183    final FileSystem fs = FileSystem.get(yarnConfiguration);
184    final Path httpEndpointPath = new Path(dfsPath, fileNames.getDriverHttpEndpoint());
185
186    String trackingUri = null;
187    LOG.log(Level.INFO, "Attempt to reading " + httpEndpointPath.toString());
188    for (int i = 0; i < 60; i++) {
189      try {
190        LOG.log(Level.FINE, "Attempt " + i + " reading " + httpEndpointPath.toString());
191        if (fs.exists(httpEndpointPath)) {
192          final FSDataInputStream input = fs.open(httpEndpointPath);
193          final BufferedReader reader = new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8));
194          trackingUri = reader.readLine();
195          reader.close();
196          break;
197        }
198      } catch (Exception ignored) {
199        // readLine might throw IOException although httpEndpointPath file exists.
200        // the for-loop waits until the actual content of file is written completely
201      }
202      try{
203        Thread.sleep(1000);
204      } catch(InterruptedException ex2) {
205        break;
206      }
207    }
208
209    if (null == trackingUri) {
210      trackingUri = "";
211      LOG.log(Level.WARNING, "Failed reading " + httpEndpointPath.toString());
212    } else {
213      LOG.log(Level.INFO, "Completed reading trackingUri :" + trackingUri);
214    }
215
216    final File driverHttpEndpointFile = new File(driverFolder, fileNames.getDriverHttpEndpoint());
217    BufferedWriter out = new BufferedWriter(new OutputStreamWriter(
218                   new FileOutputStream(driverHttpEndpointFile), StandardCharsets.UTF_8));
219    out.write(applicationId + "\n");
220    out.write(trackingUri + "\n");
221    String addr = yarnConfiguration.get("yarn.resourcemanager.webapp.address");
222    if (null == addr || addr.startsWith("0.0.0.0")) {
223      String str2 = yarnConfiguration.get("yarn.resourcemanager.ha.rm-ids");
224      if (null != str2) {
225        for (String rm : str2.split(",")) {
226          out.write(yarnConfiguration.get("yarn.resourcemanager.webapp.address." + rm) +"\n");
227        }
228      }
229    } else {
230      out.write(addr +"\n");
231    }
232    out.close();
233  }
234
235  /**
236   * .NET client calls into this main method for job submission.
237   * For arguments detail:
238   * @see YarnClusterSubmissionFromCS#fromJobSubmissionParametersFile(File, File)
239   */
240  public static void main(final String[] args) throws InjectionException, IOException, YarnException {
241    final File jobSubmissionParametersFile = new File(args[0]);
242    final File appSubmissionParametersFile = new File(args[1]);
243
244    if (!(appSubmissionParametersFile.exists() && appSubmissionParametersFile.canRead())) {
245      throw new IOException("Unable to open and read " + appSubmissionParametersFile.getAbsolutePath());
246    }
247
248    if (!(jobSubmissionParametersFile.exists() && jobSubmissionParametersFile.canRead())) {
249      throw new IOException("Unable to open and read " + jobSubmissionParametersFile.getAbsolutePath());
250    }
251
252    final YarnClusterSubmissionFromCS yarnSubmission =
253        YarnClusterSubmissionFromCS.fromJobSubmissionParametersFile(
254            appSubmissionParametersFile, jobSubmissionParametersFile);
255
256    LOG.log(Level.INFO, "YARN job submission received from C#: {0}", yarnSubmission);
257    if (!yarnSubmission.getTokenKind().equalsIgnoreCase("NULL")) {
258      // We have to write security token to user credential before YarnJobSubmissionClient is created
259      // as that will need initialization of FileSystem which could need the token.
260      LOG.log(Level.INFO, "Writing security token to user credential");
261      writeSecurityTokenToUserCredential(yarnSubmission);
262    } else{
263      LOG.log(Level.FINE, "Did not find security token");
264    }
265
266    final List<String> launchCommandPrefix = new ArrayList<String>() {{
267          add(new REEFFileNames().getDriverLauncherExeFile().toString());
268      }};
269
270    final Configuration yarnJobSubmissionClientConfig = Tang.Factory.getTang().newConfigurationBuilder()
271        .bindImplementation(RuntimeClasspathProvider.class, YarnClasspathProvider.class)
272        .bindConstructor(org.apache.hadoop.yarn.conf.YarnConfiguration.class, YarnConfigurationConstructor.class)
273        .bindNamedParameter(JobSubmissionDirectoryPrefix.class, yarnSubmission.getJobSubmissionDirectoryPrefix())
274        .bindList(DriverLaunchCommandPrefix.class, launchCommandPrefix)
275        .build();
276    final YarnJobSubmissionClient client = Tang.Factory.getTang().newInjector(yarnJobSubmissionClientConfig)
277        .getInstance(YarnJobSubmissionClient.class);
278
279    client.launch(yarnSubmission);
280
281    LOG.log(Level.INFO, "Returned from launch in Java YarnJobSubmissionClient");
282    System.exit(0);
283    LOG.log(Level.INFO, "End of main in Java YarnJobSubmissionClient");
284  }
285}