This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.hdinsight.client.yarnrest;
020
021import org.apache.commons.io.IOUtils;
022import org.apache.commons.lang.NotImplementedException;
023import org.apache.http.Header;
024import org.apache.http.HttpHost;
025import org.apache.http.auth.AuthScope;
026import org.apache.http.auth.UsernamePasswordCredentials;
027import org.apache.http.client.AuthCache;
028import org.apache.http.client.CredentialsProvider;
029import org.apache.http.client.methods.CloseableHttpResponse;
030import org.apache.http.client.methods.HttpGet;
031import org.apache.http.client.methods.HttpPost;
032import org.apache.http.client.protocol.HttpClientContext;
033import org.apache.http.entity.ContentType;
034import org.apache.http.entity.StringEntity;
035import org.apache.http.impl.auth.BasicScheme;
036import org.apache.http.impl.client.BasicAuthCache;
037import org.apache.http.impl.client.BasicCredentialsProvider;
038import org.apache.http.impl.client.CloseableHttpClient;
039import org.apache.http.message.BasicHeader;
040import org.apache.reef.runtime.hdinsight.parameters.HDInsightInstanceURL;
041import org.apache.reef.runtime.hdinsight.parameters.HDInsightPassword;
042import org.apache.reef.runtime.hdinsight.parameters.HDInsightUsername;
043import org.apache.reef.tang.annotations.Parameter;
044import org.codehaus.jackson.map.ObjectMapper;
045
046import javax.inject.Inject;
047import java.io.IOException;
048import java.io.StringWriter;
049import java.net.URI;
050import java.net.URISyntaxException;
051import java.util.List;
052import java.util.logging.Level;
053import java.util.logging.Logger;
054
055/**
056 * Represents an HDInsight instance.
057 */
058public final class HDInsightInstance {
059
060  private static final Logger LOG = Logger.getLogger(HDInsightInstance.class.getName());
061  private static final String APPLICATION_KILL_MESSAGE = "{\"app:{\"state\":\"KILLED\"}}";
062
063  private final ObjectMapper objectMapper = new ObjectMapper();
064  private final Header[] headers;
065  private final HttpClientContext httpClientContext;
066
067  private final String instanceUrl;
068  private final String username;
069  private final CloseableHttpClient httpClient;
070
071  @Inject
072  HDInsightInstance(final @Parameter(HDInsightUsername.class) String username,
073                    final @Parameter(HDInsightPassword.class) String password,
074                    final @Parameter(HDInsightInstanceURL.class) String instanceUrl,
075                    final CloseableHttpClient client) throws URISyntaxException, IOException {
076    this.httpClient = client;
077    this.instanceUrl = instanceUrl.endsWith("/") ? instanceUrl : instanceUrl + "/";
078    this.username = username;
079    final String host = this.getHost();
080    this.headers = new Header[]{
081        new BasicHeader("Host", host)
082    };
083    this.httpClientContext = getClientContext(host, username, password);
084  }
085
086  /**
087   * Request an ApplicationId from the cluster.
088   *
089   * @return
090   * @throws IOException
091   */
092  public ApplicationID getApplicationID() throws IOException {
093    final String url = "ws/v1/cluster/appids?user.name=" + this.username;
094    final HttpPost post = preparePost(url);
095    try (final CloseableHttpResponse response = this.httpClient.execute(post, this.httpClientContext)) {
096      final String message = IOUtils.toString(response.getEntity().getContent());
097      final ApplicationID result = this.objectMapper.readValue(message, ApplicationID.class);
098      return result;
099    }
100  }
101
102  /**
103   * Submits an application for execution.
104   *
105   * @param applicationSubmission
106   * @throws IOException
107   */
108  public void submitApplication(final ApplicationSubmission applicationSubmission) throws IOException {
109
110    final String applicationId = applicationSubmission.getApplicationId();
111    final String url = "ws/v1/cluster/apps/" + applicationId + "?user.name=" + this.username;
112    final HttpPost post = preparePost(url);
113
114    final StringWriter writer = new StringWriter();
115    try {
116      this.objectMapper.writeValue(writer, applicationSubmission);
117    } catch (final IOException e) {
118      throw new RuntimeException(e);
119    }
120    final String message = writer.toString();
121    LOG.log(Level.FINE, "Sending:\n{0}", message.replace("\n", "\n\t"));
122    post.setEntity(new StringEntity(message, ContentType.APPLICATION_JSON));
123
124    try (final CloseableHttpResponse response = this.httpClient.execute(post, this.httpClientContext)) {
125      final String responseMessage = IOUtils.toString(response.getEntity().getContent());
126      LOG.log(Level.FINE, "Response: {0}", responseMessage.replace("\n", "\n\t"));
127    }
128  }
129
130  /**
131   * Issues a YARN kill command to the application.
132   *
133   * @param applicationId
134   */
135  public void killApplication(final String applicationId) {
136    throw new NotImplementedException();
137  }
138
139  public List<ApplicationState> listApplications() throws IOException {
140    final String url = "ws/v1/cluster/apps";
141    final HttpGet get = prepareGet(url);
142    try (final CloseableHttpResponse response = this.httpClient.execute(get, this.httpClientContext)) {
143      final String message = IOUtils.toString(response.getEntity().getContent());
144      final ApplicationResponse result = this.objectMapper.readValue(message, ApplicationResponse.class);
145      return result.getApplicationStates();
146    }
147  }
148
149  /**
150   * @param applicationId
151   * @return the URL that can be used to issue application level messages.
152   */
153  public String getApplicationURL(final String applicationId) {
154    return "ws/v1/cluster/apps/" + applicationId;
155  }
156
157  private final String getHost() throws URISyntaxException {
158    final URI uri = new URI(this.instanceUrl);
159    return uri.getHost();
160  }
161
162  /**
163   * Creates a HttpGet request with all the common headers.
164   *
165   * @param url
166   * @return
167   */
168  private HttpGet prepareGet(final String url) {
169    final HttpGet httpGet = new HttpGet(this.instanceUrl + url);
170    for (final Header header : this.headers) {
171      httpGet.addHeader(header);
172    }
173    return httpGet;
174  }
175
176  /**
177   * Creates a HttpPost request with all the common headers.
178   *
179   * @param url
180   * @return
181   */
182  private HttpPost preparePost(final String url) {
183    final HttpPost httpPost = new HttpPost(this.instanceUrl + url);
184    for (final Header header : this.headers) {
185      httpPost.addHeader(header);
186    }
187    return httpPost;
188  }
189
190
191  private HttpClientContext getClientContext(final String hostname, final String username, final String password) throws IOException {
192    final HttpHost targetHost = new HttpHost(hostname, 443, "https");
193    final HttpClientContext result = HttpClientContext.create();
194    // Setup credentials provider
195    final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
196    credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
197    result.setCredentialsProvider(credentialsProvider);
198
199    // Setup preemptive authentication
200    final AuthCache authCache = new BasicAuthCache();
201    final BasicScheme basicAuth = new BasicScheme();
202    authCache.put(targetHost, basicAuth);
203    result.setAuthCache(authCache);
204    final HttpGet httpget = new HttpGet("/");
205
206    // Prime the cache
207    try (final CloseableHttpResponse response = this.httpClient.execute(targetHost, httpget, result)) {
208    }
209    return result;
210  }
211}