001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.hdinsight.client.yarnrest; 020 021import org.apache.commons.io.IOUtils; 022import org.apache.http.Header; 023import org.apache.http.HttpHost; 024import org.apache.http.auth.AuthScope; 025import org.apache.http.auth.UsernamePasswordCredentials; 026import org.apache.http.client.AuthCache; 027import org.apache.http.client.CredentialsProvider; 028import org.apache.http.client.methods.CloseableHttpResponse; 029import org.apache.http.client.methods.HttpGet; 030import org.apache.http.client.methods.HttpPost; 031import org.apache.http.client.methods.HttpPut; 032import org.apache.http.client.protocol.HttpClientContext; 033import org.apache.http.entity.ContentType; 034import org.apache.http.entity.StringEntity; 035import org.apache.http.impl.auth.BasicScheme; 036import org.apache.http.impl.client.BasicAuthCache; 037import org.apache.http.impl.client.BasicCredentialsProvider; 038import org.apache.http.impl.client.CloseableHttpClient; 039import org.apache.http.message.BasicHeader; 040import org.apache.reef.runtime.hdinsight.parameters.HDInsightInstanceURL; 041import org.apache.reef.runtime.hdinsight.parameters.HDInsightPassword; 042import org.apache.reef.runtime.hdinsight.parameters.HDInsightUsername; 043import org.apache.reef.tang.annotations.Parameter; 044import org.codehaus.jackson.map.ObjectMapper; 045 046import javax.inject.Inject; 047import java.io.IOException; 048import java.io.StringWriter; 049import java.net.URI; 050import java.net.URISyntaxException; 051import java.util.List; 052import java.util.logging.Level; 053import java.util.logging.Logger; 054 055/** 056 * Represents an HDInsight instance. 057 */ 058public final class HDInsightInstance { 059 060 private static final Logger LOG = Logger.getLogger(HDInsightInstance.class.getName()); 061 private static final String APPLICATION_KILL_MESSAGE = "{\"state\":\"KILLED\"}"; 062 063 private final ObjectMapper objectMapper = new ObjectMapper(); 064 private final Header[] headers; 065 private final HttpClientContext httpClientContext; 066 067 private final String instanceUrl; 068 private final CloseableHttpClient httpClient; 069 070 @Inject 071 HDInsightInstance(@Parameter(HDInsightUsername.class) final String username, 072 @Parameter(HDInsightPassword.class) final String password, 073 @Parameter(HDInsightInstanceURL.class) final String instanceUrl, 074 final CloseableHttpClient client) throws URISyntaxException, IOException { 075 this.httpClient = client; 076 this.instanceUrl = instanceUrl.endsWith("/") ? instanceUrl : instanceUrl + "/"; 077 final String host = this.getHost(); 078 this.headers = new Header[]{ 079 new BasicHeader("Host", host) 080 }; 081 this.httpClientContext = getClientContext(host, username, password); 082 } 083 084 /** 085 * Request an ApplicationId from the cluster. 086 * 087 * @return 088 * @throws IOException 089 */ 090 public ApplicationID getApplicationID() throws IOException { 091 final String url = "ws/v1/cluster/apps/new-application"; 092 final HttpPost post = preparePost(url); 093 try (final CloseableHttpResponse response = this.httpClient.execute(post, this.httpClientContext)) { 094 final String message = IOUtils.toString(response.getEntity().getContent()); 095 final ApplicationID result = this.objectMapper.readValue(message, ApplicationID.class); 096 return result; 097 } 098 } 099 100 /** 101 * Submits an application for execution. 102 * 103 * @param applicationSubmission 104 * @throws IOException 105 */ 106 public void submitApplication(final ApplicationSubmission applicationSubmission) throws IOException { 107 final String url = "ws/v1/cluster/apps"; 108 final HttpPost post = preparePost(url); 109 110 final StringWriter writer = new StringWriter(); 111 try { 112 this.objectMapper.writeValue(writer, applicationSubmission); 113 } catch (final IOException e) { 114 throw new RuntimeException(e); 115 } 116 final String message = writer.toString(); 117 LOG.log(Level.FINE, "Sending:\n{0}", message.replace("\n", "\n\t")); 118 post.setEntity(new StringEntity(message, ContentType.APPLICATION_JSON)); 119 120 try (final CloseableHttpResponse response = this.httpClient.execute(post, this.httpClientContext)) { 121 final String responseMessage = IOUtils.toString(response.getEntity().getContent()); 122 LOG.log(Level.FINE, "Response: {0}", responseMessage.replace("\n", "\n\t")); 123 } 124 } 125 126 /** 127 * Issues a YARN kill command to the application. 128 * 129 * @param applicationId 130 */ 131 public void killApplication(final String applicationId) throws IOException { 132 final String url = this.getApplicationURL(applicationId) + "/state"; 133 final HttpPut put = preparePut(url); 134 put.setEntity(new StringEntity(APPLICATION_KILL_MESSAGE, ContentType.APPLICATION_JSON)); 135 this.httpClient.execute(put, this.httpClientContext); 136 } 137 138 /** 139 * Gets the application state given a YARN application ID. 140 * @param applicationId 141 * @return Application state of the requested application. 142 */ 143 public ApplicationState getApplication(final String applicationId) throws IOException { 144 final String url = this.getApplicationURL(applicationId); 145 final HttpGet get = prepareGet(url); 146 try (final CloseableHttpResponse response = this.httpClient.execute(get, this.httpClientContext)) { 147 final String message = IOUtils.toString(response.getEntity().getContent()); 148 final ApplicationResponse result = this.objectMapper.readValue(message, ApplicationResponse.class); 149 return result.getApplicationState(); 150 } 151 } 152 153 public List<ApplicationState> listApplications() throws IOException { 154 final String url = "ws/v1/cluster/apps"; 155 final HttpGet get = prepareGet(url); 156 try (final CloseableHttpResponse response = this.httpClient.execute(get, this.httpClientContext)) { 157 final String message = IOUtils.toString(response.getEntity().getContent()); 158 final ListApplicationResponse result = this.objectMapper.readValue(message, ListApplicationResponse.class); 159 return result.getApplicationStates(); 160 } 161 } 162 163 /** 164 * @param applicationId 165 * @return the URL that can be used to issue application level messages. 166 */ 167 public String getApplicationURL(final String applicationId) { 168 return "ws/v1/cluster/apps/" + applicationId; 169 } 170 171 private String getHost() throws URISyntaxException { 172 final URI uri = new URI(this.instanceUrl); 173 return uri.getHost(); 174 } 175 176 /** 177 * Creates a HttpGet request with all the common headers. 178 * 179 * @param url 180 * @return 181 */ 182 private HttpGet prepareGet(final String url) { 183 final HttpGet httpGet = new HttpGet(this.instanceUrl + url); 184 for (final Header header : this.headers) { 185 httpGet.addHeader(header); 186 } 187 return httpGet; 188 } 189 190 /** 191 * Creates a HttpPost request with all the common headers. 192 * 193 * @param url 194 * @return 195 */ 196 private HttpPost preparePost(final String url) { 197 final HttpPost httpPost = new HttpPost(this.instanceUrl + url); 198 for (final Header header : this.headers) { 199 httpPost.addHeader(header); 200 } 201 return httpPost; 202 } 203 204 /** 205 * Creates a HttpPut request with all the common headers. 206 * @param url 207 * @return 208 */ 209 private HttpPut preparePut(final String url) { 210 final HttpPut httpPut = new HttpPut(this.instanceUrl + url); 211 for (final Header header : this.headers) { 212 httpPut.addHeader(header); 213 } 214 return httpPut; 215 } 216 217 private HttpClientContext getClientContext(final String hostname, final String username, final String password) 218 throws IOException { 219 final HttpHost targetHost = new HttpHost(hostname, 443, "https"); 220 final HttpClientContext result = HttpClientContext.create(); 221 222 // Setup credentials provider 223 final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); 224 225 credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password)); 226 result.setCredentialsProvider(credentialsProvider); 227 228 // Setup preemptive authentication 229 final AuthCache authCache = new BasicAuthCache(); 230 final BasicScheme basicAuth = new BasicScheme(); 231 authCache.put(targetHost, basicAuth); 232 result.setAuthCache(authCache); 233 final HttpGet httpget = new HttpGet("/"); 234 235 // Prime the cache 236 try (final CloseableHttpResponse response = this.httpClient.execute(targetHost, httpget, result)) { 237 // empty try block used to automatically close resources 238 } 239 return result; 240 } 241}