001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.hdinsight.client; 020 021import org.apache.commons.lang.StringUtils; 022import org.apache.hadoop.yarn.api.ApplicationConstants; 023import org.apache.reef.annotations.audience.ClientSide; 024import org.apache.reef.annotations.audience.Private; 025import org.apache.reef.runtime.common.client.DriverConfigurationProvider; 026import org.apache.reef.runtime.common.client.api.JobSubmissionEvent; 027import org.apache.reef.runtime.common.client.api.JobSubmissionHandler; 028import org.apache.reef.runtime.common.files.ClasspathProvider; 029import org.apache.reef.runtime.common.files.JobJarMaker; 030import org.apache.reef.runtime.common.files.REEFFileNames; 031import org.apache.reef.runtime.common.launch.JavaLaunchCommandBuilder; 032import org.apache.reef.runtime.hdinsight.client.yarnrest.*; 033import org.apache.reef.tang.Configuration; 034 035import javax.inject.Inject; 036import java.io.File; 037import java.io.IOException; 038import java.util.Collections; 039import java.net.URI; 040import java.util.List; 041import java.util.logging.Level; 042import java.util.logging.Logger; 043 044/** 045 * Handles job submission to a HDInsight instance. 046 */ 047@ClientSide 048@Private 049public final class HDInsightJobSubmissionHandler implements JobSubmissionHandler { 050 051 private static final Logger LOG = Logger.getLogger(HDInsightJobSubmissionHandler.class.getName()); 052 053 private final AzureUploader uploader; 054 private final JobJarMaker jobJarMaker; 055 private final HDInsightInstance hdInsightInstance; 056 private final REEFFileNames filenames; 057 private final ClasspathProvider classpath; 058 private final DriverConfigurationProvider driverConfigurationProvider; 059 060 @Inject 061 HDInsightJobSubmissionHandler(final AzureUploader uploader, 062 final JobJarMaker jobJarMaker, 063 final HDInsightInstance hdInsightInstance, 064 final REEFFileNames filenames, 065 final ClasspathProvider classpath, 066 final DriverConfigurationProvider driverConfigurationProvider) { 067 this.uploader = uploader; 068 this.jobJarMaker = jobJarMaker; 069 this.hdInsightInstance = hdInsightInstance; 070 this.filenames = filenames; 071 this.classpath = classpath; 072 this.driverConfigurationProvider = driverConfigurationProvider; 073 } 074 075 @Override 076 public void close() { 077 LOG.log(Level.WARNING, ".close() is inconsequential with the HDInsight runtime"); 078 } 079 080 @Override 081 public void onNext(final JobSubmissionEvent jobSubmissionEvent) { 082 083 try { 084 085 LOG.log(Level.FINE, "Requesting Application ID from HDInsight."); 086 final ApplicationID applicationID = this.hdInsightInstance.getApplicationID(); 087 088 LOG.log(Level.INFO, "Submitting application {0} to YARN.", applicationID.getApplicationId()); 089 090 LOG.log(Level.FINE, "Creating a job folder on Azure."); 091 final URI jobFolderURL = this.uploader.createJobFolder(applicationID.getApplicationId()); 092 093 LOG.log(Level.FINE, "Assembling Configuration for the Driver."); 094 final Configuration driverConfiguration = 095 makeDriverConfiguration(jobSubmissionEvent, applicationID.getApplicationId(), jobFolderURL); 096 097 LOG.log(Level.FINE, "Making Job JAR."); 098 final File jobSubmissionJarFile = 099 this.jobJarMaker.createJobSubmissionJAR(jobSubmissionEvent, driverConfiguration); 100 101 LOG.log(Level.FINE, "Uploading Job JAR to Azure."); 102 final LocalResource uploadedFile = this.uploader.uploadFile(jobSubmissionJarFile); 103 104 LOG.log(Level.FINE, "Assembling application submission."); 105 final String command = getCommandString(jobSubmissionEvent); 106 107 final ApplicationSubmission applicationSubmission = new ApplicationSubmission() 108 .setApplicationId(applicationID.getApplicationId()) 109 .setApplicationName(jobSubmissionEvent.getIdentifier()) 110 .setResource(getResource(jobSubmissionEvent)) 111 .setAmContainerSpec(new AmContainerSpec() 112 .addLocalResource(this.filenames.getREEFFolderName(), uploadedFile) 113 .setCommand(command)); 114 115 this.hdInsightInstance.submitApplication(applicationSubmission); 116 LOG.log(Level.INFO, "Submitted application to HDInsight. The application id is: {0}", 117 applicationID.getApplicationId()); 118 119 } catch (final IOException ex) { 120 LOG.log(Level.SEVERE, "Error submitting HDInsight request", ex); 121 throw new RuntimeException(ex); 122 } 123 } 124 125 /** 126 * Extracts the resource demands from the jobSubmissionEvent. 127 */ 128 private Resource getResource( 129 final JobSubmissionEvent jobSubmissionEvent) { 130 131 return new Resource() 132 .setMemory(jobSubmissionEvent.getDriverMemory().get()) 133 .setvCores(1); 134 } 135 136 /** 137 * Assembles the command to execute the Driver. 138 */ 139 private String getCommandString( 140 final JobSubmissionEvent jobSubmissionEvent) { 141 return StringUtils.join(getCommandList(jobSubmissionEvent), ' '); 142 } 143 144 /** 145 * Assembles the command to execute the Driver in list form. 146 */ 147 private List<String> getCommandList( 148 final JobSubmissionEvent jobSubmissionEvent) { 149 150 return new JavaLaunchCommandBuilder() 151 .setJavaPath("%JAVA_HOME%/bin/java") 152 .setConfigurationFilePaths(Collections.singletonList(this.filenames.getDriverConfigurationPath())) 153 .setClassPath(this.classpath.getDriverClasspath()) 154 .setMemory(jobSubmissionEvent.getDriverMemory().get()) 155 .setStandardErr(ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + this.filenames.getDriverStderrFileName()) 156 .setStandardOut(ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + this.filenames.getDriverStdoutFileName()) 157 .build(); 158 } 159 160 private Configuration makeDriverConfiguration( 161 final JobSubmissionEvent jobSubmissionEvent, 162 final String applicationId, 163 final URI jobFolderURL) throws IOException { 164 165 return this.driverConfigurationProvider.getDriverConfiguration(jobFolderURL, 166 jobSubmissionEvent.getRemoteId(), 167 applicationId, 168 jobSubmissionEvent.getConfiguration()); 169 } 170}