This project has retired. For details please refer to its Attic page.
Source code
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
019package org.apache.reef.runtime.hdinsight.client.yarnrest;
020
021import org.apache.commons.io.IOUtils;
022import org.apache.http.Header;
023import org.apache.http.HttpHost;
024import org.apache.http.auth.AuthScope;
025import org.apache.http.auth.UsernamePasswordCredentials;
026import org.apache.http.client.AuthCache;
027import org.apache.http.client.CredentialsProvider;
028import org.apache.http.client.methods.CloseableHttpResponse;
029import org.apache.http.client.methods.HttpGet;
030import org.apache.http.client.methods.HttpPost;
031import org.apache.http.client.methods.HttpPut;
032import org.apache.http.client.protocol.HttpClientContext;
033import org.apache.http.entity.ContentType;
034import org.apache.http.entity.StringEntity;
035import org.apache.http.impl.auth.BasicScheme;
036import org.apache.http.impl.client.BasicAuthCache;
037import org.apache.http.impl.client.BasicCredentialsProvider;
038import org.apache.http.impl.client.CloseableHttpClient;
039import org.apache.http.message.BasicHeader;
040import org.apache.reef.runtime.hdinsight.parameters.HDInsightInstanceURL;
041import org.apache.reef.runtime.hdinsight.parameters.HDInsightPassword;
042import org.apache.reef.runtime.hdinsight.parameters.HDInsightUsername;
043import org.apache.reef.tang.annotations.Parameter;
044import org.codehaus.jackson.map.ObjectMapper;
045
046import javax.inject.Inject;
047import java.io.IOException;
048import java.io.StringWriter;
049import java.net.URI;
050import java.net.URISyntaxException;
051import java.util.List;
052import java.util.logging.Level;
053import java.util.logging.Logger;
054
055/**
056 * Represents an HDInsight instance.
057 */
058public final class HDInsightInstance {
059
060  private static final Logger LOG = Logger.getLogger(HDInsightInstance.class.getName());
061  private static final String APPLICATION_KILL_MESSAGE = "{\"state\":\"KILLED\"}";
062
063  private final ObjectMapper objectMapper = new ObjectMapper();
064  private final Header[] headers;
065  private final HttpClientContext httpClientContext;
066
067  private final String instanceUrl;
068  private final CloseableHttpClient httpClient;
069
070  @Inject
071  HDInsightInstance(@Parameter(HDInsightUsername.class) final String username,
072                    @Parameter(HDInsightPassword.class) final String password,
073                    @Parameter(HDInsightInstanceURL.class) final String instanceUrl,
074                    final CloseableHttpClient client) throws URISyntaxException, IOException {
075    this.httpClient = client;
076    this.instanceUrl = instanceUrl.endsWith("/") ? instanceUrl : instanceUrl + "/";
077    final String host = this.getHost();
078    this.headers = new Header[]{
079        new BasicHeader("Host", host)
080    };
081    this.httpClientContext = getClientContext(host, username, password);
082  }
083
084  /**
085   * Request an ApplicationId from the cluster.
086   *
087   * @return
088   * @throws IOException
089   */
090  public ApplicationID getApplicationID() throws IOException {
091    final String url = "ws/v1/cluster/apps/new-application";
092    final HttpPost post = preparePost(url);
093    try (final CloseableHttpResponse response = this.httpClient.execute(post, this.httpClientContext)) {
094      final String message = IOUtils.toString(response.getEntity().getContent());
095      final ApplicationID result = this.objectMapper.readValue(message, ApplicationID.class);
096      return result;
097    }
098  }
099
100  /**
101   * Submits an application for execution.
102   *
103   * @param applicationSubmission
104   * @throws IOException
105   */
106  public void submitApplication(final ApplicationSubmission applicationSubmission) throws IOException {
107    final String url = "ws/v1/cluster/apps";
108    final HttpPost post = preparePost(url);
109
110    final StringWriter writer = new StringWriter();
111    try {
112      this.objectMapper.writeValue(writer, applicationSubmission);
113    } catch (final IOException e) {
114      throw new RuntimeException(e);
115    }
116    final String message = writer.toString();
117    LOG.log(Level.FINE, "Sending:\n{0}", message.replace("\n", "\n\t"));
118    post.setEntity(new StringEntity(message, ContentType.APPLICATION_JSON));
119
120    try (final CloseableHttpResponse response = this.httpClient.execute(post, this.httpClientContext)) {
121      final String responseMessage = IOUtils.toString(response.getEntity().getContent());
122      LOG.log(Level.FINE, "Response: {0}", responseMessage.replace("\n", "\n\t"));
123    }
124  }
125
126  /**
127   * Issues a YARN kill command to the application.
128   *
129   * @param applicationId
130   */
131  public void killApplication(final String applicationId) throws IOException {
132    final String url = this.getApplicationURL(applicationId) + "/state";
133    final HttpPut put = preparePut(url);
134    put.setEntity(new StringEntity(APPLICATION_KILL_MESSAGE, ContentType.APPLICATION_JSON));
135    this.httpClient.execute(put, this.httpClientContext);
136  }
137
138  /**
139   * Gets the application state given a YARN application ID.
140   * @param applicationId
141   * @return Application state of the requested application.
142   */
143  public ApplicationState getApplication(final String applicationId) throws IOException {
144    final String url = this.getApplicationURL(applicationId);
145    final HttpGet get = prepareGet(url);
146    try (final CloseableHttpResponse response = this.httpClient.execute(get, this.httpClientContext)) {
147      final String message = IOUtils.toString(response.getEntity().getContent());
148      final ApplicationResponse result = this.objectMapper.readValue(message, ApplicationResponse.class);
149      return result.getApplicationState();
150    }
151  }
152
153  public List<ApplicationState> listApplications() throws IOException {
154    final String url = "ws/v1/cluster/apps";
155    final HttpGet get = prepareGet(url);
156    try (final CloseableHttpResponse response = this.httpClient.execute(get, this.httpClientContext)) {
157      final String message = IOUtils.toString(response.getEntity().getContent());
158      final ListApplicationResponse result = this.objectMapper.readValue(message, ListApplicationResponse.class);
159      return result.getApplicationStates();
160    }
161  }
162
163  /**
164   * @param applicationId
165   * @return the URL that can be used to issue application level messages.
166   */
167  public String getApplicationURL(final String applicationId) {
168    return "ws/v1/cluster/apps/" + applicationId;
169  }
170
171  private String getHost() throws URISyntaxException {
172    final URI uri = new URI(this.instanceUrl);
173    return uri.getHost();
174  }
175
176  /**
177   * Creates a HttpGet request with all the common headers.
178   *
179   * @param url
180   * @return
181   */
182  private HttpGet prepareGet(final String url) {
183    final HttpGet httpGet = new HttpGet(this.instanceUrl + url);
184    for (final Header header : this.headers) {
185      httpGet.addHeader(header);
186    }
187    return httpGet;
188  }
189
190  /**
191   * Creates a HttpPost request with all the common headers.
192   *
193   * @param url
194   * @return
195   */
196  private HttpPost preparePost(final String url) {
197    final HttpPost httpPost = new HttpPost(this.instanceUrl + url);
198    for (final Header header : this.headers) {
199      httpPost.addHeader(header);
200    }
201    return httpPost;
202  }
203
204  /**
205   * Creates a HttpPut request with all the common headers.
206   * @param url
207   * @return
208   */
209  private HttpPut preparePut(final String url) {
210    final HttpPut httpPut = new HttpPut(this.instanceUrl + url);
211    for (final Header header : this.headers) {
212      httpPut.addHeader(header);
213    }
214    return httpPut;
215  }
216
217  private HttpClientContext getClientContext(final String hostname, final String username, final String password)
218      throws IOException {
219    final HttpHost targetHost = new HttpHost(hostname, 443, "https");
220    final HttpClientContext result = HttpClientContext.create();
221
222    // Setup credentials provider
223    final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
224
225    credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
226    result.setCredentialsProvider(credentialsProvider);
227
228    // Setup preemptive authentication
229    final AuthCache authCache = new BasicAuthCache();
230    final BasicScheme basicAuth = new BasicScheme();
231    authCache.put(targetHost, basicAuth);
232    result.setAuthCache(authCache);
233    final HttpGet httpget = new HttpGet("/");
234
235    // Prime the cache
236    try (final CloseableHttpResponse response = this.httpClient.execute(targetHost, httpget, result)) {
237      // empty try block used to automatically close resources
238    }
239    return result;
240  }
241}