001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.hdinsight.client; 020 021import org.apache.commons.lang.StringUtils; 022import org.apache.hadoop.yarn.api.ApplicationConstants; 023import org.apache.reef.annotations.audience.ClientSide; 024import org.apache.reef.annotations.audience.Private; 025import org.apache.reef.proto.ClientRuntimeProtocol; 026import org.apache.reef.runtime.common.client.api.JobSubmissionHandler; 027import org.apache.reef.runtime.common.files.ClasspathProvider; 028import org.apache.reef.runtime.common.files.JobJarMaker; 029import org.apache.reef.runtime.common.files.REEFFileNames; 030import org.apache.reef.runtime.common.launch.JavaLaunchCommandBuilder; 031import org.apache.reef.runtime.common.parameters.JVMHeapSlack; 032import org.apache.reef.runtime.hdinsight.client.yarnrest.*; 033import org.apache.reef.tang.Configuration; 034import org.apache.reef.tang.Configurations; 035import org.apache.reef.tang.annotations.Parameter; 036import org.apache.reef.tang.formats.ConfigurationSerializer; 037 038import javax.inject.Inject; 039import java.io.File; 040import java.io.IOException; 041import java.util.List; 042import java.util.logging.Level; 043import java.util.logging.Logger; 044 045/** 046 * Handles job submission to a HDInsight instance. 047 */ 048@ClientSide 049@Private 050public final class HDInsightJobSubmissionHandler implements JobSubmissionHandler { 051 052 private static final Logger LOG = Logger.getLogger(HDInsightJobSubmissionHandler.class.getName()); 053 054 private final AzureUploader uploader; 055 private final JobJarMaker jobJarMaker; 056 private final HDInsightInstance hdInsightInstance; 057 private final ConfigurationSerializer configurationSerializer; 058 private final REEFFileNames filenames; 059 private final ClasspathProvider classpath; 060 private final double jvmHeapSlack; 061 062 @Inject 063 HDInsightJobSubmissionHandler(final AzureUploader uploader, 064 final JobJarMaker jobJarMaker, 065 final HDInsightInstance hdInsightInstance, 066 final ConfigurationSerializer configurationSerializer, 067 final REEFFileNames filenames, 068 final ClasspathProvider classpath, 069 final @Parameter(JVMHeapSlack.class) double jvmHeapSlack) { 070 this.uploader = uploader; 071 this.jobJarMaker = jobJarMaker; 072 this.hdInsightInstance = hdInsightInstance; 073 this.configurationSerializer = configurationSerializer; 074 this.filenames = filenames; 075 this.classpath = classpath; 076 this.jvmHeapSlack = jvmHeapSlack; 077 } 078 079 @Override 080 public void close() { 081 LOG.log(Level.WARNING, ".close() is inconsequential with the HDInsight runtime"); 082 } 083 084 @Override 085 public void onNext(final ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto) { 086 087 try { 088 089 LOG.log(Level.FINE, "Requesting Application ID from HDInsight."); 090 final ApplicationID applicationID = this.hdInsightInstance.getApplicationID(); 091 092 LOG.log(Level.INFO, "Submitting application {0} to YARN.", applicationID.getId()); 093 094 LOG.log(Level.FINE, "Creating a job folder on Azure."); 095 final String jobFolderURL = this.uploader.createJobFolder(applicationID.getId()); 096 097 LOG.log(Level.FINE, "Assembling Configuration for the Driver."); 098 final Configuration driverConfiguration = 099 makeDriverConfiguration(jobSubmissionProto, applicationID.getId(), jobFolderURL); 100 101 LOG.log(Level.FINE, "Making Job JAR."); 102 final File jobSubmissionJarFile = 103 this.jobJarMaker.createJobSubmissionJAR(jobSubmissionProto, driverConfiguration); 104 105 LOG.log(Level.FINE, "Uploading Job JAR to Azure."); 106 final FileResource uploadedFile = this.uploader.uploadFile(jobSubmissionJarFile); 107 108 LOG.log(Level.FINE, "Assembling application submission."); 109 final String command = getCommandString(jobSubmissionProto); 110 111 final ApplicationSubmission applicationSubmission = new ApplicationSubmission() 112 .setApplicationId(applicationID.getId()) 113 .setApplicationName(jobSubmissionProto.getIdentifier()) 114 .setResource(getResource(jobSubmissionProto)) 115 .setContainerInfo(new ContainerInfo() 116 .addFileResource(this.filenames.getREEFFolderName(), uploadedFile) 117 .addCommand(command)); 118 119 this.hdInsightInstance.submitApplication(applicationSubmission); 120 LOG.log(Level.INFO, "Submitted application to HDInsight. The application id is: {0}", applicationID.getId()); 121 122 } catch (final IOException ex) { 123 LOG.log(Level.SEVERE, "Error submitting HDInsight request", ex); 124 throw new RuntimeException(ex); 125 } 126 } 127 128 /** 129 * Extracts the resource demands from the jobSubmissionProto. 130 */ 131 private final Resource getResource( 132 final ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto) { 133 134 return new Resource() 135 .setMemory(String.valueOf(jobSubmissionProto.getDriverMemory())) 136 .setvCores("1"); 137 } 138 139 /** 140 * Assembles the command to execute the Driver. 141 */ 142 private String getCommandString( 143 final ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto) { 144 return StringUtils.join(getCommandList(jobSubmissionProto), ' '); 145 } 146 147 /** 148 * Assembles the command to execute the Driver in list form. 149 */ 150 private List<String> getCommandList( 151 final ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto) { 152 153 return new JavaLaunchCommandBuilder() 154 .setJavaPath("%JAVA_HOME%/bin/java") 155 .setErrorHandlerRID(jobSubmissionProto.getRemoteId()) 156 .setLaunchID(jobSubmissionProto.getIdentifier()) 157 .setConfigurationFileName(this.filenames.getDriverConfigurationPath()) 158 .setClassPath(this.classpath.getDriverClasspath()) 159 .setMemory(jobSubmissionProto.getDriverMemory()) 160 .setStandardErr(ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + this.filenames.getDriverStderrFileName()) 161 .setStandardOut(ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + this.filenames.getDriverStdoutFileName()) 162 .build(); 163 } 164 165 private Configuration makeDriverConfiguration( 166 final ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto, 167 final String applicationId, 168 final String jobFolderURL) throws IOException { 169 170 final Configuration hdinsightDriverConfiguration = HDInsightDriverConfiguration.CONF 171 .set(HDInsightDriverConfiguration.JOB_IDENTIFIER, applicationId) 172 .set(HDInsightDriverConfiguration.JOB_SUBMISSION_DIRECTORY, jobFolderURL) 173 .set(HDInsightDriverConfiguration.JVM_HEAP_SLACK, this.jvmHeapSlack) 174 .build(); 175 176 return Configurations.merge( 177 this.configurationSerializer.fromString(jobSubmissionProto.getConfiguration()), 178 hdinsightDriverConfiguration); 179 } 180}