001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.hdinsight.client.yarnrest; 020 021import org.apache.commons.io.IOUtils; 022import org.apache.commons.lang.NotImplementedException; 023import org.apache.http.Header; 024import org.apache.http.HttpHost; 025import org.apache.http.auth.AuthScope; 026import org.apache.http.auth.UsernamePasswordCredentials; 027import org.apache.http.client.AuthCache; 028import org.apache.http.client.CredentialsProvider; 029import org.apache.http.client.methods.CloseableHttpResponse; 030import org.apache.http.client.methods.HttpGet; 031import org.apache.http.client.methods.HttpPost; 032import org.apache.http.client.protocol.HttpClientContext; 033import org.apache.http.entity.ContentType; 034import org.apache.http.entity.StringEntity; 035import org.apache.http.impl.auth.BasicScheme; 036import org.apache.http.impl.client.BasicAuthCache; 037import org.apache.http.impl.client.BasicCredentialsProvider; 038import org.apache.http.impl.client.CloseableHttpClient; 039import org.apache.http.message.BasicHeader; 040import org.apache.reef.runtime.hdinsight.parameters.HDInsightInstanceURL; 041import org.apache.reef.runtime.hdinsight.parameters.HDInsightPassword; 042import org.apache.reef.runtime.hdinsight.parameters.HDInsightUsername; 043import org.apache.reef.tang.annotations.Parameter; 044import org.codehaus.jackson.map.ObjectMapper; 045 046import javax.inject.Inject; 047import java.io.IOException; 048import java.io.StringWriter; 049import java.net.URI; 050import java.net.URISyntaxException; 051import java.util.List; 052import java.util.logging.Level; 053import java.util.logging.Logger; 054 055/** 056 * Represents an HDInsight instance. 057 */ 058public final class HDInsightInstance { 059 060 private static final Logger LOG = Logger.getLogger(HDInsightInstance.class.getName()); 061 private static final String APPLICATION_KILL_MESSAGE = "{\"app:{\"state\":\"KILLED\"}}"; 062 063 private final ObjectMapper objectMapper = new ObjectMapper(); 064 private final Header[] headers; 065 private final HttpClientContext httpClientContext; 066 067 private final String instanceUrl; 068 private final String username; 069 private final CloseableHttpClient httpClient; 070 071 @Inject 072 HDInsightInstance(final @Parameter(HDInsightUsername.class) String username, 073 final @Parameter(HDInsightPassword.class) String password, 074 final @Parameter(HDInsightInstanceURL.class) String instanceUrl, 075 final CloseableHttpClient client) throws URISyntaxException, IOException { 076 this.httpClient = client; 077 this.instanceUrl = instanceUrl.endsWith("/") ? instanceUrl : instanceUrl + "/"; 078 this.username = username; 079 final String host = this.getHost(); 080 this.headers = new Header[]{ 081 new BasicHeader("Host", host) 082 }; 083 this.httpClientContext = getClientContext(host, username, password); 084 } 085 086 /** 087 * Request an ApplicationId from the cluster. 088 * 089 * @return 090 * @throws IOException 091 */ 092 public ApplicationID getApplicationID() throws IOException { 093 final String url = "ws/v1/cluster/appids?user.name=" + this.username; 094 final HttpPost post = preparePost(url); 095 try (final CloseableHttpResponse response = this.httpClient.execute(post, this.httpClientContext)) { 096 final String message = IOUtils.toString(response.getEntity().getContent()); 097 final ApplicationID result = this.objectMapper.readValue(message, ApplicationID.class); 098 return result; 099 } 100 } 101 102 /** 103 * Submits an application for execution. 104 * 105 * @param applicationSubmission 106 * @throws IOException 107 */ 108 public void submitApplication(final ApplicationSubmission applicationSubmission) throws IOException { 109 110 final String applicationId = applicationSubmission.getApplicationId(); 111 final String url = "ws/v1/cluster/apps/" + applicationId + "?user.name=" + this.username; 112 final HttpPost post = preparePost(url); 113 114 final StringWriter writer = new StringWriter(); 115 try { 116 this.objectMapper.writeValue(writer, applicationSubmission); 117 } catch (final IOException e) { 118 throw new RuntimeException(e); 119 } 120 final String message = writer.toString(); 121 LOG.log(Level.FINE, "Sending:\n{0}", message.replace("\n", "\n\t")); 122 post.setEntity(new StringEntity(message, ContentType.APPLICATION_JSON)); 123 124 try (final CloseableHttpResponse response = this.httpClient.execute(post, this.httpClientContext)) { 125 final String responseMessage = IOUtils.toString(response.getEntity().getContent()); 126 LOG.log(Level.FINE, "Response: {0}", responseMessage.replace("\n", "\n\t")); 127 } 128 } 129 130 /** 131 * Issues a YARN kill command to the application. 132 * 133 * @param applicationId 134 */ 135 public void killApplication(final String applicationId) { 136 throw new NotImplementedException(); 137 } 138 139 public List<ApplicationState> listApplications() throws IOException { 140 final String url = "ws/v1/cluster/apps"; 141 final HttpGet get = prepareGet(url); 142 try (final CloseableHttpResponse response = this.httpClient.execute(get, this.httpClientContext)) { 143 final String message = IOUtils.toString(response.getEntity().getContent()); 144 final ApplicationResponse result = this.objectMapper.readValue(message, ApplicationResponse.class); 145 return result.getApplicationStates(); 146 } 147 } 148 149 /** 150 * @param applicationId 151 * @return the URL that can be used to issue application level messages. 152 */ 153 public String getApplicationURL(final String applicationId) { 154 return "ws/v1/cluster/apps/" + applicationId; 155 } 156 157 private final String getHost() throws URISyntaxException { 158 final URI uri = new URI(this.instanceUrl); 159 return uri.getHost(); 160 } 161 162 /** 163 * Creates a HttpGet request with all the common headers. 164 * 165 * @param url 166 * @return 167 */ 168 private HttpGet prepareGet(final String url) { 169 final HttpGet httpGet = new HttpGet(this.instanceUrl + url); 170 for (final Header header : this.headers) { 171 httpGet.addHeader(header); 172 } 173 return httpGet; 174 } 175 176 /** 177 * Creates a HttpPost request with all the common headers. 178 * 179 * @param url 180 * @return 181 */ 182 private HttpPost preparePost(final String url) { 183 final HttpPost httpPost = new HttpPost(this.instanceUrl + url); 184 for (final Header header : this.headers) { 185 httpPost.addHeader(header); 186 } 187 return httpPost; 188 } 189 190 191 private HttpClientContext getClientContext(final String hostname, final String username, final String password) throws IOException { 192 final HttpHost targetHost = new HttpHost(hostname, 443, "https"); 193 final HttpClientContext result = HttpClientContext.create(); 194 // Setup credentials provider 195 final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); 196 credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password)); 197 result.setCredentialsProvider(credentialsProvider); 198 199 // Setup preemptive authentication 200 final AuthCache authCache = new BasicAuthCache(); 201 final BasicScheme basicAuth = new BasicScheme(); 202 authCache.put(targetHost, basicAuth); 203 result.setAuthCache(authCache); 204 final HttpGet httpget = new HttpGet("/"); 205 206 // Prime the cache 207 try (final CloseableHttpResponse response = this.httpClient.execute(targetHost, httpget, result)) { 208 } 209 return result; 210 } 211}