This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.webserver;
020
021import org.apache.reef.driver.ProgressProvider;
022import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
023import org.apache.reef.driver.parameters.ClientCloseHandlers;
024import org.apache.reef.runtime.common.files.REEFFileNames;
025import org.apache.reef.tang.InjectionFuture;
026import org.apache.reef.tang.Tang;
027import org.apache.reef.tang.annotations.Parameter;
028import org.apache.reef.tang.exceptions.InjectionException;
029import org.apache.reef.util.logging.LogLevelName;
030import org.apache.reef.util.logging.LogParser;
031import org.apache.reef.util.logging.LoggingScopeFactory;
032import org.apache.reef.util.logging.LoggingScopeImpl;
033import org.apache.reef.wake.EventHandler;
034
035import javax.inject.Inject;
036import javax.servlet.ServletException;
037import javax.servlet.http.HttpServletResponse;
038import java.io.IOException;
039import java.io.PrintWriter;
040import java.net.InetSocketAddress;
041import java.nio.charset.StandardCharsets;
042import java.nio.file.Files;
043import java.nio.file.Paths;
044import java.util.Arrays;
045import java.util.ArrayList;
046import java.util.List;
047import java.util.Map;
048import java.util.Set;
049import java.util.logging.Level;
050import java.util.logging.Logger;
051
052/**
053 * Http handler for REEF events.
054 */
055public final class HttpServerReefEventHandler implements HttpHandler {
056
057  private static final Logger LOG = Logger.getLogger(HttpServerReefEventHandler.class.getName());
058
059  private static final String VER = "v1";
060  private final String driverStdoutFile;
061  private final String driverStderrFile;
062
063  private final ReefEventStateManager reefStateManager;
064  private final Set<EventHandler<Void>> clientCloseHandlers;
065  private final LoggingScopeFactory loggingScopeFactory;
066  private final InjectionFuture<ProgressProvider> progressProvider;
067
068  /**
069   * Log level string prefix in the log lines.
070   */
071  private final String logLevelPrefix;
072
073  /**
074   * specification that would match URI request.
075   */
076  private String uriSpecification = "Reef";
077
078  @Inject
079  public HttpServerReefEventHandler(
080      final ReefEventStateManager reefStateManager,
081      @Parameter(ClientCloseHandlers.class) final Set<EventHandler<Void>> clientCloseHandlers,
082      @Parameter(LogLevelName.class) final String logLevel,
083      final LoggingScopeFactory loggingScopeFactory,
084      final REEFFileNames reefFileNames,
085      final InjectionFuture<ProgressProvider> progressProvider) {
086    this.reefStateManager = reefStateManager;
087    this.clientCloseHandlers = clientCloseHandlers;
088    this.loggingScopeFactory = loggingScopeFactory;
089    this.logLevelPrefix = new StringBuilder().append(logLevel).append(": ").toString();
090    this.progressProvider = progressProvider;
091    driverStdoutFile = reefFileNames.getDriverStdoutFileName();
092    driverStderrFile = reefFileNames.getDriverStderrFileName();
093  }
094
095  /**
096   * read a file and output it as a String.
097   */
098  private static String readFile(final String fileName) throws IOException {
099    return new String(Files.readAllBytes(Paths.get(fileName)), StandardCharsets.UTF_8);
100  }
101
102  /**
103   * @return URI specification for the handler.
104   */
105  @Override
106  public String getUriSpecification() {
107    return uriSpecification;
108  }
109
110  /**
111   * set URI specification.
112   */
113  public void setUriSpecification(final String s) {
114    uriSpecification = s;
115  }
116
117  /**
118   * Event handler that is called when receiving a http request.
119   */
120  @Override
121  public void onHttpRequest(
122      final ParsedHttpRequest parsedHttpRequest,
123      final HttpServletResponse response) throws IOException, ServletException {
124
125    LOG.log(Level.INFO, "HttpServerReefEventHandler in webserver onHttpRequest is called: {0}",
126        parsedHttpRequest.getRequestUri());
127
128    final String version = parsedHttpRequest.getVersion().toLowerCase();
129    final String target = parsedHttpRequest.getTargetEntity().toLowerCase();
130
131    switch (target) {
132    case "evaluators": {
133      final String queryStr = parsedHttpRequest.getQueryString();
134      if (queryStr == null || queryStr.isEmpty()) {
135        if (version.equals(VER)) {
136          writeEvaluatorsJsonOutput(response);
137        } else {
138          writeEvaluatorsWebOutput(response);
139        }
140      } else {
141        handleQueries(response, parsedHttpRequest.getQueryMap(), version);
142      }
143      break;
144    }
145    case "driver":
146      if (version.equals(VER)) {
147        writeDriverJsonInformation(response);
148      } else {
149        writeDriverWebInformation(response);
150      }
151      break;
152    case "close":
153      for (final EventHandler<Void> e : clientCloseHandlers) {
154        e.onNext(null);
155      }
156      response.getWriter().println("Enforced closing");
157      break;
158    case "kill":
159      reefStateManager.onClientKill();
160      response.getWriter().println("Killing");
161      break;
162    case "duration":
163      final ArrayList<String> lines =
164          LogParser.getFilteredLinesFromFile(driverStderrFile, LoggingScopeImpl.DURATION, LoggingScopeImpl.TOKEN, null);
165      writeLines(response, lines, "Performance...");
166      break;
167    case "stages":
168      final ArrayList<String> starts =
169          LogParser.getFilteredLinesFromFile(driverStderrFile, LoggingScopeImpl.START_PREFIX, logLevelPrefix, null);
170      final ArrayList<String> exits =
171          LogParser.getFilteredLinesFromFile(driverStderrFile, LoggingScopeImpl.EXIT_PREFIX, logLevelPrefix,
172              LoggingScopeImpl.DURATION);
173      final ArrayList<String> startsStages = LogParser.findStages(starts, LogParser.START_INDICATORS);
174      final ArrayList<String> endStages = LogParser.findStages(exits, LogParser.END_INDICATORS);
175      final ArrayList<String> result = LogParser.mergeStages(startsStages, endStages);
176      writeLines(response, result, "Current Stages...");
177      break;
178    case "logfile":
179      final List<String> names = parsedHttpRequest.getQueryMap().get("filename");
180      final PrintWriter writer = response.getWriter();
181      if (names == null || names.size() == 0) {
182        writer.println("File name is not provided");
183      } else {
184        final String fileName = names.get(0);
185        if (!fileName.equals(driverStdoutFile) && !fileName.equals(driverStderrFile)) {
186          writer.println(String.format("Unsupported file names: [%s] ", fileName));
187        } else {
188          try {
189            final byte[] outputBody = readFile(fileName).getBytes(StandardCharsets.UTF_8);
190            writer.print(Arrays.toString(outputBody));
191          } catch (final IOException e) {
192            writer.println(String.format("Cannot find the log file: [%s].", fileName));
193          }
194        }
195      }
196      break;
197    case "progress":
198      response.getWriter().println(progressProvider.get().getProgress());
199      break;
200    default:
201      response.getWriter().println(String.format("Unsupported query for entity: [%s].", target));
202    }
203  }
204
205  /**
206   * handle HTTP queries.
207   * Example of a query: http://localhost:8080/reef/Evaluators/?id=Node-2-1403225213803&id=Node-1-1403225213712
208   */
209  private void handleQueries(
210      final HttpServletResponse response,
211      final Map<String, List<String>> queries,
212      final String version) throws IOException {
213
214    LOG.log(Level.INFO, "HttpServerReefEventHandler handleQueries is called");
215
216    for (final Map.Entry<String, List<String>> entry : queries.entrySet()) {
217      final String queryTarget = entry.getKey().toLowerCase();
218
219      switch (queryTarget) {
220      case "id":
221        if (version.equals(VER)) {
222          writeEvaluatorInfoJsonOutput(response, entry.getValue());
223        } else {
224          writeEvaluatorInfoWebOutput(response, entry.getValue());
225        }
226        break;
227      default:
228        response.getWriter().println("Unsupported query : " + queryTarget);
229        break;
230      }
231    }
232  }
233
234  /**
235   * Write Evaluator info as JSON format to HTTP Response.
236   */
237  private void writeEvaluatorInfoJsonOutput(
238      final HttpServletResponse response, final List<String> ids) throws IOException {
239    try {
240      final EvaluatorInfoSerializer serializer =
241          Tang.Factory.getTang().newInjector().getInstance(EvaluatorInfoSerializer.class);
242      final AvroEvaluatorsInfo evaluatorsInfo =
243          serializer.toAvro(ids, this.reefStateManager.getEvaluators());
244      writeResponse(response, serializer.toString(evaluatorsInfo));
245    } catch (final InjectionException e) {
246      LOG.log(Level.SEVERE, "Error in injecting EvaluatorInfoSerializer.", e);
247      writeResponse(response, "Error in injecting EvaluatorInfoSerializer: " + e);
248    }
249  }
250
251  /**
252   * Write Evaluator info on the Response so that to display on web page directly.
253   * This is for direct browser queries.
254   */
255  private void writeEvaluatorInfoWebOutput(
256      final HttpServletResponse response, final List<String> ids) throws IOException {
257
258    for (final String id : ids) {
259
260      final EvaluatorDescriptor evaluatorDescriptor = this.reefStateManager.getEvaluators().get(id);
261      final PrintWriter writer = response.getWriter();
262
263      if (evaluatorDescriptor != null) {
264        final String nodeId = evaluatorDescriptor.getNodeDescriptor().getId();
265        final String nodeName = evaluatorDescriptor.getNodeDescriptor().getName();
266        final InetSocketAddress address =
267            evaluatorDescriptor.getNodeDescriptor().getInetSocketAddress();
268
269        writer.println("Evaluator Id: " + id);
270        writer.write("<br/>");
271        writer.println("Evaluator Node Id: " + nodeId);
272        writer.write("<br/>");
273        writer.println("Evaluator Node Name: " + nodeName);
274        writer.write("<br/>");
275        writer.println("Evaluator InternetAddress: " + address);
276        writer.write("<br/>");
277        writer.println("Evaluator Memory: " + evaluatorDescriptor.getMemory());
278        writer.write("<br/>");
279        writer.println("Evaluator Core: " + evaluatorDescriptor.getNumberOfCores());
280        writer.write("<br/>");
281        writer.println("Evaluator Type: " + evaluatorDescriptor.getProcess());
282        writer.write("<br/>");
283        writer.println("Evaluator Runtime Name: " + evaluatorDescriptor.getRuntimeName());
284        writer.write("<br/>");
285      } else {
286        writer.println("Incorrect Evaluator Id: " + id);
287      }
288    }
289  }
290
291  /**
292   * Get all evaluator ids and send it back to response as JSON.
293   */
294  private void writeEvaluatorsJsonOutput(final HttpServletResponse response) throws IOException {
295    LOG.log(Level.INFO, "HttpServerReefEventHandler writeEvaluatorsJsonOutput is called");
296    try {
297      final EvaluatorListSerializer serializer =
298          Tang.Factory.getTang().newInjector().getInstance(EvaluatorListSerializer.class);
299      final AvroEvaluatorList evaluatorList = serializer.toAvro(
300          this.reefStateManager.getEvaluators(), this.reefStateManager.getEvaluators().size(),
301          this.reefStateManager.getStartTime());
302      writeResponse(response, serializer.toString(evaluatorList));
303    } catch (final InjectionException e) {
304      LOG.log(Level.SEVERE, "Error in injecting EvaluatorListSerializer.", e);
305      writeResponse(response, "Error in injecting EvaluatorListSerializer: " + e);
306    }
307  }
308
309  /**
310   * Get all evaluator ids and send it back to response so that can be displayed on web.
311   *
312   * @param response
313   * @throws IOException
314   */
315  private void writeEvaluatorsWebOutput(final HttpServletResponse response) throws IOException {
316
317    LOG.log(Level.INFO, "HttpServerReefEventHandler writeEvaluatorsWebOutput is called");
318
319    final PrintWriter writer = response.getWriter();
320
321    writer.println("<h1>Evaluators:</h1>");
322
323    for (final Map.Entry<String, EvaluatorDescriptor> entry
324        : this.reefStateManager.getEvaluators().entrySet()) {
325
326      final String key = entry.getKey();
327      final EvaluatorDescriptor descriptor = entry.getValue();
328
329      writer.println("Evaluator Id: " + key);
330      writer.write("<br/>");
331      writer.println("Evaluator Name: " + descriptor.getNodeDescriptor().getName());
332      writer.write("<br/>");
333    }
334    writer.write("<br/>");
335    writer.println("Total number of Evaluators: " + this.reefStateManager.getEvaluators().size());
336    writer.write("<br/>");
337    writer.println(String.format("Driver Start Time:[%s]", this.reefStateManager.getStartTime()));
338  }
339
340  /**
341   * Write Driver Info as JSON string to Response.
342   */
343  private void writeDriverJsonInformation(final HttpServletResponse response) throws IOException {
344
345    LOG.log(Level.INFO, "HttpServerReefEventHandler writeDriverJsonInformation invoked.");
346
347    try {
348      final DriverInfoSerializer serializer =
349          Tang.Factory.getTang().newInjector().getInstance(DriverInfoSerializer.class);
350      final AvroDriverInfo driverInfo = serializer.toAvro(
351          this.reefStateManager.getDriverEndpointIdentifier(), this.reefStateManager.getStartTime(),
352          this.reefStateManager.getServicesInfo());
353      writeResponse(response, serializer.toString(driverInfo));
354    } catch (final InjectionException e) {
355      LOG.log(Level.SEVERE, "Error in injecting DriverInfoSerializer.", e);
356      writeResponse(response, "Error in injecting DriverInfoSerializer: " + e);
357    }
358  }
359
360  /**
361   * Write a String to HTTP Response.
362   */
363  private void writeResponse(final HttpServletResponse response, final String data) throws IOException {
364    final byte[] outputBody = data.getBytes(StandardCharsets.UTF_8);
365    response.getOutputStream().write(outputBody);
366  }
367
368  /**
369   * Get driver information.
370   */
371  private void writeDriverWebInformation(final HttpServletResponse response) throws IOException {
372
373    LOG.log(Level.INFO, "HttpServerReefEventHandler writeDriverWebInformation invoked.");
374
375    final PrintWriter writer = response.getWriter();
376
377    writer.println("<h1>Driver Information:</h1>");
378
379    writer.println(String.format("Driver Remote Identifier:[%s]",
380        this.reefStateManager.getDriverEndpointIdentifier()));
381    writer.write("<br/><br/>");
382
383    writer.println(String.format("Services registered on Driver:"));
384    writer.write("<br/><br/>");
385    for (final AvroReefServiceInfo service : this.reefStateManager.getServicesInfo()) {
386      writer.println(String.format("Service: [%s] , Information: [%s]", service.getServiceName(),
387          service.getServiceInfo()));
388      writer.write("<br/><br/>");
389    }
390
391    writer.println(String.format("Driver Start Time:[%s]", this.reefStateManager.getStartTime()));
392  }
393
394  /**
395   * Write lines in ArrayList to the response writer.
396   * @param response
397   * @param lines
398   * @param header
399   * @throws IOException
400   */
401  private void writeLines(final HttpServletResponse response, final ArrayList<String> lines, final String header)
402      throws IOException {
403    LOG.log(Level.INFO, "HttpServerReefEventHandler writeLines is called");
404
405    final PrintWriter writer = response.getWriter();
406
407    writer.println("<h1>" + header + "</h1>");
408
409    for (final String line : lines) {
410      writer.println(line);
411      writer.write("<br/>");
412    }
413    writer.write("<br/>");
414  }
415}