001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.hdinsight.client; 020 021import org.apache.commons.lang.StringUtils; 022import org.apache.hadoop.yarn.api.ApplicationConstants; 023import org.apache.reef.annotations.audience.ClientSide; 024import org.apache.reef.annotations.audience.Private; 025import org.apache.reef.runtime.common.client.DriverConfigurationProvider; 026import org.apache.reef.runtime.common.client.api.JobSubmissionEvent; 027import org.apache.reef.runtime.common.client.api.JobSubmissionHandler; 028import org.apache.reef.runtime.common.files.ClasspathProvider; 029import org.apache.reef.runtime.common.files.JobJarMaker; 030import org.apache.reef.runtime.common.files.REEFFileNames; 031import org.apache.reef.runtime.common.launch.JavaLaunchCommandBuilder; 032import org.apache.reef.runtime.hdinsight.client.yarnrest.*; 033import org.apache.reef.tang.Configuration; 034 035import javax.inject.Inject; 036import java.io.File; 037import java.io.IOException; 038import java.util.Collections; 039import java.net.URI; 040import java.util.List; 041import java.util.logging.Level; 042import java.util.logging.Logger; 043 044/** 045 * Handles job submission to a HDInsight instance. 046 */ 047@ClientSide 048@Private 049public final class HDInsightJobSubmissionHandler implements JobSubmissionHandler { 050 051 private static final Logger LOG = Logger.getLogger(HDInsightJobSubmissionHandler.class.getName()); 052 053 private final AzureUploader uploader; 054 private final JobJarMaker jobJarMaker; 055 private final HDInsightInstance hdInsightInstance; 056 private final REEFFileNames filenames; 057 private final ClasspathProvider classpath; 058 private final DriverConfigurationProvider driverConfigurationProvider; 059 060 private String applicationId; 061 062 @Inject 063 HDInsightJobSubmissionHandler(final AzureUploader uploader, 064 final JobJarMaker jobJarMaker, 065 final HDInsightInstance hdInsightInstance, 066 final REEFFileNames filenames, 067 final ClasspathProvider classpath, 068 final DriverConfigurationProvider driverConfigurationProvider) { 069 this.uploader = uploader; 070 this.jobJarMaker = jobJarMaker; 071 this.hdInsightInstance = hdInsightInstance; 072 this.filenames = filenames; 073 this.classpath = classpath; 074 this.driverConfigurationProvider = driverConfigurationProvider; 075 } 076 077 @Override 078 public void close() { 079 LOG.log(Level.WARNING, ".close() is inconsequential with the HDInsight runtime"); 080 } 081 082 @Override 083 public void onNext(final JobSubmissionEvent jobSubmissionEvent) { 084 085 try { 086 087 LOG.log(Level.FINE, "Requesting Application ID from HDInsight."); 088 final String appId = this.hdInsightInstance.getApplicationID().getApplicationId(); 089 090 LOG.log(Level.INFO, "Submitting application {0} to YARN.", appId); 091 092 LOG.log(Level.FINE, "Creating a job folder on Azure."); 093 final URI jobFolderURL = this.uploader.createJobFolder(appId); 094 095 LOG.log(Level.FINE, "Assembling Configuration for the Driver."); 096 final Configuration driverConfiguration = makeDriverConfiguration(jobSubmissionEvent, appId, jobFolderURL); 097 098 LOG.log(Level.FINE, "Making Job JAR."); 099 final File jobSubmissionJarFile = 100 this.jobJarMaker.createJobSubmissionJAR(jobSubmissionEvent, driverConfiguration); 101 102 LOG.log(Level.FINE, "Uploading Job JAR to Azure."); 103 final LocalResource uploadedFile = this.uploader.uploadFile(jobSubmissionJarFile); 104 105 LOG.log(Level.FINE, "Assembling application submission."); 106 final String command = getCommandString(jobSubmissionEvent); 107 108 final ApplicationSubmission applicationSubmission = new ApplicationSubmission() 109 .setApplicationId(appId) 110 .setApplicationName(jobSubmissionEvent.getIdentifier()) 111 .setResource(getResource(jobSubmissionEvent)) 112 .setAmContainerSpec(new AmContainerSpec() 113 .addLocalResource(this.filenames.getREEFFolderName(), uploadedFile) 114 .setCommand(command)); 115 116 this.hdInsightInstance.submitApplication(applicationSubmission); 117 this.applicationId = appId; 118 LOG.log(Level.INFO, "Submitted application to HDInsight. The application id is: {0}", appId); 119 120 } catch (final IOException ex) { 121 LOG.log(Level.SEVERE, "Error submitting HDInsight request", ex); 122 throw new RuntimeException(ex); 123 } 124 } 125 126 /** 127 * Get the RM application ID. 128 * Return null if the application has not been submitted yet, or was submitted unsuccessfully. 129 * @return string application ID or null if no app has been submitted yet. 130 */ 131 @Override 132 public String getApplicationId() { 133 return this.applicationId; 134 } 135 136 /** 137 * Extracts the resource demands from the jobSubmissionEvent. 138 */ 139 private Resource getResource( 140 final JobSubmissionEvent jobSubmissionEvent) { 141 142 return new Resource() 143 .setMemory(jobSubmissionEvent.getDriverMemory().get()) 144 .setvCores(1); 145 } 146 147 /** 148 * Assembles the command to execute the Driver. 149 */ 150 private String getCommandString( 151 final JobSubmissionEvent jobSubmissionEvent) { 152 return StringUtils.join(getCommandList(jobSubmissionEvent), ' '); 153 } 154 155 /** 156 * Assembles the command to execute the Driver in list form. 157 */ 158 private List<String> getCommandList( 159 final JobSubmissionEvent jobSubmissionEvent) { 160 161 return new JavaLaunchCommandBuilder() 162 .setJavaPath("%JAVA_HOME%/bin/java") 163 .setConfigurationFilePaths(Collections.singletonList(this.filenames.getDriverConfigurationPath())) 164 .setClassPath(this.classpath.getDriverClasspath()) 165 .setMemory(jobSubmissionEvent.getDriverMemory().get()) 166 .setStandardErr(ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + this.filenames.getDriverStderrFileName()) 167 .setStandardOut(ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + this.filenames.getDriverStdoutFileName()) 168 .build(); 169 } 170 171 private Configuration makeDriverConfiguration( 172 final JobSubmissionEvent jobSubmissionEvent, 173 final String appId, 174 final URI jobFolderURL) throws IOException { 175 176 return this.driverConfigurationProvider.getDriverConfiguration( 177 jobFolderURL, jobSubmissionEvent.getRemoteId(), appId, jobSubmissionEvent.getConfiguration()); 178 } 179}