This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.hdinsight.client;
020
021import org.apache.commons.lang.StringUtils;
022import org.apache.hadoop.yarn.api.ApplicationConstants;
023import org.apache.reef.annotations.audience.ClientSide;
024import org.apache.reef.annotations.audience.Private;
025import org.apache.reef.proto.ClientRuntimeProtocol;
026import org.apache.reef.runtime.common.client.api.JobSubmissionHandler;
027import org.apache.reef.runtime.common.files.ClasspathProvider;
028import org.apache.reef.runtime.common.files.JobJarMaker;
029import org.apache.reef.runtime.common.files.REEFFileNames;
030import org.apache.reef.runtime.common.launch.JavaLaunchCommandBuilder;
031import org.apache.reef.runtime.common.parameters.JVMHeapSlack;
032import org.apache.reef.runtime.hdinsight.client.yarnrest.*;
033import org.apache.reef.tang.Configuration;
034import org.apache.reef.tang.Configurations;
035import org.apache.reef.tang.annotations.Parameter;
036import org.apache.reef.tang.formats.ConfigurationSerializer;
037
038import javax.inject.Inject;
039import java.io.File;
040import java.io.IOException;
041import java.util.List;
042import java.util.logging.Level;
043import java.util.logging.Logger;
044
045/**
046 * Handles job submission to a HDInsight instance.
047 */
048@ClientSide
049@Private
050public final class HDInsightJobSubmissionHandler implements JobSubmissionHandler {
051
052  private static final Logger LOG = Logger.getLogger(HDInsightJobSubmissionHandler.class.getName());
053
054  private final AzureUploader uploader;
055  private final JobJarMaker jobJarMaker;
056  private final HDInsightInstance hdInsightInstance;
057  private final ConfigurationSerializer configurationSerializer;
058  private final REEFFileNames filenames;
059  private final ClasspathProvider classpath;
060  private final double jvmHeapSlack;
061
062  @Inject
063  HDInsightJobSubmissionHandler(final AzureUploader uploader,
064                                final JobJarMaker jobJarMaker,
065                                final HDInsightInstance hdInsightInstance,
066                                final ConfigurationSerializer configurationSerializer,
067                                final REEFFileNames filenames,
068                                final ClasspathProvider classpath,
069                                final @Parameter(JVMHeapSlack.class) double jvmHeapSlack) {
070    this.uploader = uploader;
071    this.jobJarMaker = jobJarMaker;
072    this.hdInsightInstance = hdInsightInstance;
073    this.configurationSerializer = configurationSerializer;
074    this.filenames = filenames;
075    this.classpath = classpath;
076    this.jvmHeapSlack = jvmHeapSlack;
077  }
078
079  @Override
080  public void close() {
081    LOG.log(Level.WARNING, ".close() is inconsequential with the HDInsight runtime");
082  }
083
084  @Override
085  public void onNext(final ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto) {
086
087    try {
088
089      LOG.log(Level.FINE, "Requesting Application ID from HDInsight.");
090      final ApplicationID applicationID = this.hdInsightInstance.getApplicationID();
091
092      LOG.log(Level.INFO, "Submitting application {0} to YARN.", applicationID.getId());
093
094      LOG.log(Level.FINE, "Creating a job folder on Azure.");
095      final String jobFolderURL = this.uploader.createJobFolder(applicationID.getId());
096
097      LOG.log(Level.FINE, "Assembling Configuration for the Driver.");
098      final Configuration driverConfiguration =
099          makeDriverConfiguration(jobSubmissionProto, applicationID.getId(), jobFolderURL);
100
101      LOG.log(Level.FINE, "Making Job JAR.");
102      final File jobSubmissionJarFile =
103          this.jobJarMaker.createJobSubmissionJAR(jobSubmissionProto, driverConfiguration);
104
105      LOG.log(Level.FINE, "Uploading Job JAR to Azure.");
106      final FileResource uploadedFile = this.uploader.uploadFile(jobSubmissionJarFile);
107
108      LOG.log(Level.FINE, "Assembling application submission.");
109      final String command = getCommandString(jobSubmissionProto);
110
111      final ApplicationSubmission applicationSubmission = new ApplicationSubmission()
112          .setApplicationId(applicationID.getId())
113          .setApplicationName(jobSubmissionProto.getIdentifier())
114          .setResource(getResource(jobSubmissionProto))
115          .setContainerInfo(new ContainerInfo()
116              .addFileResource(this.filenames.getREEFFolderName(), uploadedFile)
117              .addCommand(command));
118
119      this.hdInsightInstance.submitApplication(applicationSubmission);
120      LOG.log(Level.INFO, "Submitted application to HDInsight. The application id is: {0}", applicationID.getId());
121
122    } catch (final IOException ex) {
123      LOG.log(Level.SEVERE, "Error submitting HDInsight request", ex);
124      throw new RuntimeException(ex);
125    }
126  }
127
128  /**
129   * Extracts the resource demands from the jobSubmissionProto.
130   */
131  private final Resource getResource(
132      final ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto) {
133
134    return new Resource()
135        .setMemory(String.valueOf(jobSubmissionProto.getDriverMemory()))
136        .setvCores("1");
137  }
138
139  /**
140   * Assembles the command to execute the Driver.
141   */
142  private String getCommandString(
143      final ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto) {
144    return StringUtils.join(getCommandList(jobSubmissionProto), ' ');
145  }
146
147  /**
148   * Assembles the command to execute the Driver in list form.
149   */
150  private List<String> getCommandList(
151      final ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto) {
152
153    return new JavaLaunchCommandBuilder()
154        .setJavaPath("%JAVA_HOME%/bin/java")
155        .setErrorHandlerRID(jobSubmissionProto.getRemoteId())
156        .setLaunchID(jobSubmissionProto.getIdentifier())
157        .setConfigurationFileName(this.filenames.getDriverConfigurationPath())
158        .setClassPath(this.classpath.getDriverClasspath())
159        .setMemory(jobSubmissionProto.getDriverMemory())
160        .setStandardErr(ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + this.filenames.getDriverStderrFileName())
161        .setStandardOut(ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + this.filenames.getDriverStdoutFileName())
162        .build();
163  }
164
165  private Configuration makeDriverConfiguration(
166      final ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto,
167      final String applicationId,
168      final String jobFolderURL) throws IOException {
169
170    final Configuration hdinsightDriverConfiguration = HDInsightDriverConfiguration.CONF
171        .set(HDInsightDriverConfiguration.JOB_IDENTIFIER, applicationId)
172        .set(HDInsightDriverConfiguration.JOB_SUBMISSION_DIRECTORY, jobFolderURL)
173        .set(HDInsightDriverConfiguration.JVM_HEAP_SLACK, this.jvmHeapSlack)
174        .build();
175
176    return Configurations.merge(
177        this.configurationSerializer.fromString(jobSubmissionProto.getConfiguration()),
178        hdinsightDriverConfiguration);
179  }
180}