This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.bridge.client;
020
021import org.apache.commons.lang.Validate;
022import org.apache.hadoop.fs.FSDataInputStream;
023import org.apache.hadoop.fs.FileSystem;
024import org.apache.hadoop.fs.Path;
025import org.apache.hadoop.io.Text;
026import org.apache.hadoop.security.UserGroupInformation;
027import org.apache.hadoop.security.token.Token;
028import org.apache.hadoop.yarn.api.records.LocalResource;
029import org.apache.hadoop.yarn.api.records.LocalResourceType;
030import org.apache.hadoop.yarn.conf.YarnConfiguration;
031import org.apache.hadoop.yarn.exceptions.YarnException;
032import org.apache.reef.driver.parameters.DriverIsUnmanaged;
033import org.apache.reef.runtime.common.files.ClasspathProvider;
034import org.apache.reef.runtime.common.files.REEFFileNames;
035import org.apache.reef.runtime.common.files.RuntimeClasspathProvider;
036import org.apache.reef.runtime.common.launch.parameters.DriverLaunchCommandPrefix;
037import org.apache.reef.runtime.yarn.YarnClasspathProvider;
038import org.apache.reef.runtime.yarn.client.SecurityTokenProvider;
039import org.apache.reef.runtime.yarn.client.YarnSubmissionHelper;
040import org.apache.reef.runtime.yarn.client.unmanaged.YarnProxyUser;
041import org.apache.reef.runtime.yarn.client.uploader.JobFolder;
042import org.apache.reef.runtime.yarn.client.uploader.JobUploader;
043import org.apache.reef.runtime.yarn.driver.parameters.FileSystemUrl;
044import org.apache.reef.runtime.yarn.driver.parameters.JobSubmissionDirectoryPrefix;
045import org.apache.reef.runtime.yarn.util.YarnConfigurationConstructor;
046import org.apache.reef.tang.Configuration;
047import org.apache.reef.tang.Tang;
048import org.apache.reef.tang.annotations.Parameter;
049import org.apache.reef.tang.exceptions.InjectionException;
050import org.apache.reef.util.JARFileMaker;
051
052import javax.inject.Inject;
053import java.io.*;
054import java.nio.charset.StandardCharsets;
055import java.nio.file.Files;
056import java.nio.file.Paths;
057import java.util.ArrayList;
058import java.util.List;
059import java.util.logging.Level;
060import java.util.logging.Logger;
061
062/**
063 * The Java-side of the C# YARN Job Submission API.
064 */
065@SuppressWarnings("checkstyle:hideutilityclassconstructor")
066public final class YarnJobSubmissionClient {
067
068  private static final Logger LOG = Logger.getLogger(YarnJobSubmissionClient.class.getName());
069
070  private final boolean isUnmanaged;
071  private final List<String> commandPrefixList;
072  private final JobUploader uploader;
073  private final REEFFileNames fileNames;
074  private final YarnConfiguration yarnConfiguration;
075  private final ClasspathProvider classpath;
076  private final YarnProxyUser yarnProxyUser;
077  private final SecurityTokenProvider tokenProvider;
078  private final YarnSubmissionParametersFileGenerator jobSubmissionParametersGenerator;
079
080  @Inject
081  YarnJobSubmissionClient(@Parameter(DriverIsUnmanaged.class) final boolean isUnmanaged,
082                          @Parameter(DriverLaunchCommandPrefix.class) final List<String> commandPrefixList,
083                          final JobUploader uploader,
084                          final YarnConfiguration yarnConfiguration,
085                          final REEFFileNames fileNames,
086                          final ClasspathProvider classpath,
087                          final YarnProxyUser yarnProxyUser,
088                          final SecurityTokenProvider tokenProvider,
089                          final YarnSubmissionParametersFileGenerator jobSubmissionParametersGenerator) {
090
091    this.isUnmanaged = isUnmanaged;
092    this.commandPrefixList = commandPrefixList;
093    this.uploader = uploader;
094    this.fileNames = fileNames;
095    this.yarnConfiguration = yarnConfiguration;
096    this.classpath = classpath;
097    this.yarnProxyUser = yarnProxyUser;
098    this.tokenProvider = tokenProvider;
099    this.jobSubmissionParametersGenerator = jobSubmissionParametersGenerator;
100  }
101
102  /**
103   * @param driverFolder the folder containing the `reef` folder. Only that `reef` folder will be in the JAR.
104   * @return the jar file
105   * @throws IOException
106   */
107  private File makeJar(final File driverFolder) throws IOException {
108    Validate.isTrue(driverFolder.exists());
109    final File jarFile = new File(driverFolder.getParentFile(), driverFolder.getName() + ".jar");
110    final File reefFolder = new File(driverFolder, fileNames.getREEFFolderName());
111    if (!reefFolder.isDirectory()) {
112      throw new FileNotFoundException(reefFolder.getAbsolutePath());
113    }
114
115    new JARFileMaker(jarFile).addChildren(reefFolder).close();
116    return jarFile;
117  }
118
119  private void launch(final YarnClusterSubmissionFromCS yarnSubmission) throws IOException, YarnException {
120    // ------------------------------------------------------------------------
121    // Get an application ID
122    try (final YarnSubmissionHelper submissionHelper = new YarnSubmissionHelper(
123        this.yarnConfiguration, this.fileNames, this.classpath, this.yarnProxyUser,
124        this.tokenProvider, this.isUnmanaged, this.commandPrefixList)) {
125
126      // ------------------------------------------------------------------------
127      // Prepare the JAR
128      final JobFolder jobFolderOnDFS = this.uploader.createJobFolder(submissionHelper.getApplicationId());
129      this.jobSubmissionParametersGenerator.writeConfiguration(yarnSubmission, jobFolderOnDFS);
130      final File jarFile = makeJar(yarnSubmission.getDriverFolder());
131      LOG.log(Level.INFO, "Created job submission jar file: {0}", jarFile);
132
133      // ------------------------------------------------------------------------
134      // Upload the JAR
135      LOG.info("Uploading job submission JAR");
136      final LocalResource jarFileOnDFS = jobFolderOnDFS.uploadAsLocalResource(jarFile, LocalResourceType.ARCHIVE);
137      LOG.info("Uploaded job submission JAR");
138
139      // ------------------------------------------------------------------------
140      // Upload the job file
141      final LocalResource jobFileOnDFS = jobFolderOnDFS.uploadAsLocalResource(
142          new File(yarnSubmission.getDriverFolder(), fileNames.getYarnBootstrapJobParamFilePath()),
143          LocalResourceType.FILE);
144
145      final List<String> confFiles = new ArrayList<>();
146      confFiles.add(fileNames.getYarnBootstrapJobParamFilePath());
147      confFiles.add(fileNames.getYarnBootstrapAppParamFilePath());
148
149      // ------------------------------------------------------------------------
150      // Submit
151      submissionHelper
152          .addLocalResource(this.fileNames.getREEFFolderName(), jarFileOnDFS)
153          .addLocalResource(fileNames.getYarnBootstrapJobParamFilePath(), jobFileOnDFS)
154          .setApplicationName(yarnSubmission.getJobId())
155          .setDriverMemory(yarnSubmission.getDriverMemory())
156          .setPriority(yarnSubmission.getPriority())
157          .setQueue(yarnSubmission.getQueue())
158          .setMaxApplicationAttempts(yarnSubmission.getMaxApplicationSubmissions())
159          .setPreserveEvaluators(yarnSubmission.getDriverRecoveryTimeout() > 0)
160          .setLauncherClass(YarnBootstrapREEFLauncher.class)
161          .setConfigurationFilePaths(confFiles)
162          .setDriverStdoutPath(yarnSubmission.getYarnDriverStdoutFilePath())
163          .setDriverStderrPath(yarnSubmission.getYarnDriverStderrFilePath())
164          .submit();
165      writeDriverHttpEndPoint(yarnSubmission.getDriverFolder(),
166          submissionHelper.getStringApplicationId(), jobFolderOnDFS.getPath());
167    }
168  }
169
170  private static void writeSecurityTokenToUserCredential(
171      final YarnClusterSubmissionFromCS yarnSubmission) throws IOException {
172    final UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
173    final REEFFileNames fileNames = new REEFFileNames();
174    final String securityTokenIdentifierFile = fileNames.getSecurityTokenIdentifierFile();
175    final String securityTokenPasswordFile = fileNames.getSecurityTokenPasswordFile();
176    final Text tokenKind = new Text(yarnSubmission.getTokenKind());
177    final Text tokenService = new Text(yarnSubmission.getTokenService());
178    byte[] identifier = Files.readAllBytes(Paths.get(securityTokenIdentifierFile));
179    byte[] password = Files.readAllBytes(Paths.get(securityTokenPasswordFile));
180    Token token = new Token(identifier, password, tokenKind, tokenService);
181    currentUser.addToken(token);
182  }
183
184  /**
185   * We leave a file behind in job submission directory so that clr client can figure out
186   * the applicationId and yarn rest endpoint.
187   * @param driverFolder
188   * @param applicationId
189   * @throws IOException
190   */
191  private void writeDriverHttpEndPoint(final File driverFolder,
192                                       final String applicationId,
193                                       final Path dfsPath) throws  IOException {
194    final FileSystem fs = FileSystem.get(yarnConfiguration);
195    final Path httpEndpointPath = new Path(dfsPath, fileNames.getDriverHttpEndpoint());
196
197    String trackingUri = null;
198    LOG.log(Level.INFO, "Attempt to reading " + httpEndpointPath.toString());
199    for (int i = 0; i < 60; i++) {
200      try {
201        LOG.log(Level.FINE, "Attempt " + i + " reading " + httpEndpointPath.toString());
202        if (fs.exists(httpEndpointPath)) {
203          final FSDataInputStream input = fs.open(httpEndpointPath);
204          final BufferedReader reader = new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8));
205          trackingUri = reader.readLine();
206          reader.close();
207          break;
208        }
209      } catch (Exception ignored) {
210        // readLine might throw IOException although httpEndpointPath file exists.
211        // the for-loop waits until the actual content of file is written completely
212      }
213      try{
214        Thread.sleep(1000);
215      } catch(InterruptedException ex2) {
216        break;
217      }
218    }
219
220    if (null == trackingUri) {
221      trackingUri = "";
222      LOG.log(Level.WARNING, "Failed reading " + httpEndpointPath.toString());
223    } else {
224      LOG.log(Level.INFO, "Completed reading trackingUri :" + trackingUri);
225    }
226
227    final File driverHttpEndpointFile = new File(driverFolder, fileNames.getDriverHttpEndpoint());
228    BufferedWriter out = new BufferedWriter(new OutputStreamWriter(
229                   new FileOutputStream(driverHttpEndpointFile), StandardCharsets.UTF_8));
230    out.write(applicationId + "\n");
231    out.write(trackingUri + "\n");
232    String addr = yarnConfiguration.get("yarn.resourcemanager.webapp.address");
233    if (null == addr || addr.startsWith("0.0.0.0")) {
234      String str2 = yarnConfiguration.get("yarn.resourcemanager.ha.rm-ids");
235      if (null != str2) {
236        for (String rm : str2.split(",")) {
237          out.write(yarnConfiguration.get("yarn.resourcemanager.webapp.address." + rm) +"\n");
238        }
239      }
240    } else {
241      out.write(addr +"\n");
242    }
243    out.close();
244  }
245
246  /**
247   * .NET client calls into this main method for job submission.
248   * For arguments detail:
249   * @see YarnClusterSubmissionFromCS#fromJobSubmissionParametersFile(File, File)
250   */
251  public static void main(final String[] args) throws InjectionException, IOException, YarnException {
252    final File jobSubmissionParametersFile = new File(args[0]);
253    final File appSubmissionParametersFile = new File(args[1]);
254
255    if (!(appSubmissionParametersFile.exists() && appSubmissionParametersFile.canRead())) {
256      throw new IOException("Unable to open and read " + appSubmissionParametersFile.getAbsolutePath());
257    }
258
259    if (!(jobSubmissionParametersFile.exists() && jobSubmissionParametersFile.canRead())) {
260      throw new IOException("Unable to open and read " + jobSubmissionParametersFile.getAbsolutePath());
261    }
262
263    final YarnClusterSubmissionFromCS yarnSubmission =
264        YarnClusterSubmissionFromCS.fromJobSubmissionParametersFile(
265            appSubmissionParametersFile, jobSubmissionParametersFile);
266
267    LOG.log(Level.INFO, "YARN job submission received from C#: {0}", yarnSubmission);
268    if (!yarnSubmission.getTokenKind().equalsIgnoreCase("NULL")) {
269      // We have to write security token to user credential before YarnJobSubmissionClient is created
270      // as that will need initialization of FileSystem which could need the token.
271      LOG.log(Level.INFO, "Writing security token to user credential");
272      writeSecurityTokenToUserCredential(yarnSubmission);
273    } else{
274      LOG.log(Level.FINE, "Did not find security token");
275    }
276
277    if (!yarnSubmission.getFileSystemUrl().equalsIgnoreCase(FileSystemUrl.DEFAULT_VALUE)) {
278      LOG.log(Level.INFO, "getFileSystemUrl: {0}", yarnSubmission.getFileSystemUrl());
279    } else {
280      LOG.log(Level.INFO, "FileSystemUrl is not set, use default from the environment.");
281    }
282
283    final List<String> launchCommandPrefix = new ArrayList<String>() {{
284          add(new REEFFileNames().getDriverLauncherExeFile().toString());
285      }};
286
287    final Configuration yarnJobSubmissionClientConfig = Tang.Factory.getTang().newConfigurationBuilder()
288        .bindImplementation(RuntimeClasspathProvider.class, YarnClasspathProvider.class)
289        .bindConstructor(org.apache.hadoop.yarn.conf.YarnConfiguration.class, YarnConfigurationConstructor.class)
290        .bindNamedParameter(JobSubmissionDirectoryPrefix.class, yarnSubmission.getJobSubmissionDirectoryPrefix())
291        .bindNamedParameter(FileSystemUrl.class, yarnSubmission.getFileSystemUrl())
292        .bindList(DriverLaunchCommandPrefix.class, launchCommandPrefix)
293        .build();
294    final YarnJobSubmissionClient client = Tang.Factory.getTang().newInjector(yarnJobSubmissionClientConfig)
295        .getInstance(YarnJobSubmissionClient.class);
296
297    client.launch(yarnSubmission);
298
299    LOG.log(Level.INFO, "Returned from launch in Java YarnJobSubmissionClient");
300    System.exit(0);
301    LOG.log(Level.INFO, "End of main in Java YarnJobSubmissionClient");
302  }
303}