This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.webserver;
020
021import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
022import org.apache.reef.driver.parameters.ClientCloseHandlers;
023import org.apache.reef.runtime.common.files.REEFFileNames;
024import org.apache.reef.tang.Tang;
025import org.apache.reef.tang.annotations.Parameter;
026import org.apache.reef.tang.exceptions.InjectionException;
027import org.apache.reef.util.logging.LogLevelName;
028import org.apache.reef.util.logging.LogParser;
029import org.apache.reef.util.logging.LoggingScopeFactory;
030import org.apache.reef.util.logging.LoggingScopeImpl;
031import org.apache.reef.wake.EventHandler;
032import javax.inject.Inject;
033import javax.servlet.ServletException;
034import javax.servlet.http.HttpServletResponse;
035import java.io.IOException;
036import java.io.PrintWriter;
037import java.net.InetSocketAddress;
038import java.nio.charset.Charset;
039import java.nio.file.Files;
040import java.nio.file.Paths;
041import java.util.ArrayList;
042import java.util.List;
043import java.util.Map;
044import java.util.Set;
045import java.util.logging.Level;
046import java.util.logging.Logger;
047
048public final class HttpServerReefEventHandler implements HttpHandler {
049
050  private static final Logger LOG = Logger.getLogger(HttpServerReefEventHandler.class.getName());
051
052  private static final String ver = "v1";
053  private final String driverStdoutFile;
054  private final String driverStderrFile;
055
056  private final ReefEventStateManager reefStateManager;
057  private final Set<EventHandler<Void>> clientCloseHandlers;
058  private final LoggingScopeFactory loggingScopeFactory;
059
060  /**
061   * Log level string prefix in the log lines
062   */
063  private final String logLevelPrefix;
064
065  /**
066   * specification that would match URI request.
067   */
068  private String uriSpecification = "Reef";
069
070  @Inject
071  public HttpServerReefEventHandler(
072      final ReefEventStateManager reefStateManager,
073      final @Parameter(ClientCloseHandlers.class) Set<EventHandler<Void>> clientCloseHandlers,
074      final @Parameter(LogLevelName.class) String logLevel,
075      final LoggingScopeFactory loggingScopeFactory,
076      final REEFFileNames reefFileNames) {
077    this.reefStateManager = reefStateManager;
078    this.clientCloseHandlers = clientCloseHandlers;
079    this.loggingScopeFactory = loggingScopeFactory;
080    this.logLevelPrefix = new StringBuilder().append(logLevel).append(": ").toString();
081    driverStdoutFile = reefFileNames.getDriverStdoutFileName();
082    driverStderrFile = reefFileNames.getDriverStderrFileName();
083  }
084
085  /**
086   * read a file and output it as a String.
087   */
088  private static String readFile(final String fileName) throws IOException {
089    return new String(Files.readAllBytes(Paths.get(fileName)));
090  }
091
092  /**
093   * @return URI specification for the handler.
094   */
095  @Override
096  public String getUriSpecification() {
097    return uriSpecification;
098  }
099
100  /**
101   * set URI specification.
102   */
103  public void setUriSpecification(final String s) {
104    uriSpecification = s;
105  }
106
107  /**
108   * Event handler that is called when receiving a http request.
109   */
110  @Override
111  public void onHttpRequest(
112      final ParsedHttpRequest parsedHttpRequest,
113      final HttpServletResponse response) throws IOException, ServletException {
114
115    LOG.log(Level.INFO, "HttpServerReefEventHandler in webserver onHttpRequest is called: {0}",
116        parsedHttpRequest.getRequestUri());
117
118    final String version = parsedHttpRequest.getVersion().toLowerCase();
119    final String target = parsedHttpRequest.getTargetEntity().toLowerCase();
120
121    switch (target) {
122      case "evaluators": {
123        final String queryStr = parsedHttpRequest.getQueryString();
124        if (queryStr == null || queryStr.isEmpty()) {
125          if (version.equals(ver)) {
126            writeEvaluatorsJsonOutput(response);
127          } else {
128            writeEvaluatorsWebOutput(response);
129          }
130        } else {
131          handleQueries(response, parsedHttpRequest.getQueryMap(), version);
132        }
133        break;
134      }
135      case "driver":
136        if (version.equals(ver)) {
137          writeDriverJsonInformation(response);
138        } else {
139          writeDriverWebInformation(response);
140        }
141        break;
142      case "close":
143        for (final EventHandler<Void> e : clientCloseHandlers) {
144          e.onNext(null);
145        }
146        response.getWriter().println("Enforced closing");
147        break;
148      case "kill":
149        reefStateManager.OnClientKill();
150        response.getWriter().println("Killing");
151        break;
152      case "duration":
153        final ArrayList<String> lines = LogParser.getFilteredLinesFromFile(driverStderrFile, LoggingScopeImpl.DURATION, LoggingScopeImpl.TOKEN, null);
154        writeLines(response, lines, "Performance...");
155        break;
156      case "stages":
157        final ArrayList<String> starts = LogParser.getFilteredLinesFromFile(driverStderrFile, LoggingScopeImpl.START_PREFIX, logLevelPrefix, null);
158        final ArrayList<String> exits = LogParser.getFilteredLinesFromFile(driverStderrFile, LoggingScopeImpl.EXIT_PREFIX, logLevelPrefix, LoggingScopeImpl.DURATION);
159        final ArrayList<String> startsStages = LogParser.findStages(starts, LogParser.startIndicators);
160        final ArrayList<String> endStages = LogParser.findStages(exits, LogParser.endIndicators);
161        final ArrayList<String> result = LogParser.mergeStages(startsStages, endStages);
162        writeLines(response, result, "Current Stages...");
163        break;
164      case "logfile":
165        final List names = parsedHttpRequest.getQueryMap().get("filename");
166        if (names == null || names.size() == 0) {
167          response.getWriter().println(String.format("File name is not provided"));
168        }
169
170        final String fileName = (String)names.get(0);
171        if (!fileName.equals(driverStdoutFile) && !fileName.equals(driverStderrFile)) {
172          response.getWriter().println(String.format("Unsupported file names: [%s] ", fileName));
173        }
174        try {
175          final byte[] outputBody = readFile((String) names.get(0)).getBytes(Charset.forName("UTF-8"));
176          response.getOutputStream().write(outputBody);
177        } catch(IOException e) {
178          response.getWriter().println(String.format("Cannot find the log file: [%s].", fileName));
179        }
180        break;
181      default:
182        response.getWriter().println(String.format("Unsupported query for entity: [%s].", target));
183    }
184  }
185
186  /**
187   * handle HTTP queries
188   * Example of a query: http://localhost:8080/reef/Evaluators/?id=Node-2-1403225213803&id=Node-1-1403225213712
189   */
190  private void handleQueries(
191      final HttpServletResponse response,
192      final Map<String, List<String>> queries,
193      final String version) throws IOException {
194
195    LOG.log(Level.INFO, "HttpServerReefEventHandler handleQueries is called");
196
197    for (final Map.Entry<String, List<String>> entry : queries.entrySet()) {
198      final String queryTarget = entry.getKey().toLowerCase();
199
200      switch (queryTarget) {
201        case "id":
202          if (version.equals(ver)) {
203            writeEvaluatorInfoJsonOutput(response, entry.getValue());
204          } else {
205            writeEvaluatorInfoWebOutput(response, entry.getValue());
206          }
207          break;
208        default:
209          response.getWriter().println("Unsupported query : " + queryTarget);
210          break;
211      }
212    }
213  }
214
215  /**
216   * Write Evaluator info as JSON format to HTTP Response.
217   */
218  private void writeEvaluatorInfoJsonOutput(
219      final HttpServletResponse response, final List<String> ids) throws IOException {
220    try {
221      final EvaluatorInfoSerializer serializer =
222          Tang.Factory.getTang().newInjector().getInstance(EvaluatorInfoSerializer.class);
223      final AvroEvaluatorsInfo evaluatorsInfo =
224          serializer.toAvro(ids, this.reefStateManager.getEvaluators());
225      writeResponse(response, serializer.toString(evaluatorsInfo));
226    } catch (final InjectionException e) {
227      LOG.log(Level.SEVERE, "Error in injecting EvaluatorInfoSerializer.", e);
228      writeResponse(response, "Error in injecting EvaluatorInfoSerializer: " + e);
229    }
230  }
231
232  /**
233   * Write Evaluator info on the Response so that to display on web page directly.
234   * This is for direct browser queries.
235   */
236  private void writeEvaluatorInfoWebOutput(
237      final HttpServletResponse response, final List<String> ids) throws IOException {
238
239    for (final String id : ids) {
240
241      final EvaluatorDescriptor evaluatorDescriptor = this.reefStateManager.getEvaluators().get(id);
242      final PrintWriter writer = response.getWriter();
243
244      if (evaluatorDescriptor != null) {
245        final String nodeId = evaluatorDescriptor.getNodeDescriptor().getId();
246        final String nodeName = evaluatorDescriptor.getNodeDescriptor().getName();
247        final InetSocketAddress address =
248            evaluatorDescriptor.getNodeDescriptor().getInetSocketAddress();
249
250        writer.println("Evaluator Id: " + id);
251        writer.write("<br/>");
252        writer.println("Evaluator Node Id: " + nodeId);
253        writer.write("<br/>");
254        writer.println("Evaluator Node Name: " + nodeName);
255        writer.write("<br/>");
256        writer.println("Evaluator InternetAddress: " + address);
257        writer.write("<br/>");
258        writer.println("Evaluator Memory: " + evaluatorDescriptor.getMemory());
259        writer.write("<br/>");
260        writer.println("Evaluator Core: " + evaluatorDescriptor.getNumberOfCores());
261        writer.write("<br/>");
262        writer.println("Evaluator Type: " + evaluatorDescriptor.getType());
263        writer.write("<br/>");
264      } else {
265        writer.println("Incorrect Evaluator Id: " + id);
266      }
267    }
268  }
269
270  /**
271   * Get all evaluator ids and send it back to response as JSON.
272   */
273  private void writeEvaluatorsJsonOutput(final HttpServletResponse response) throws IOException {
274    LOG.log(Level.INFO, "HttpServerReefEventHandler getEvaluators is called");
275    try {
276      final EvaluatorListSerializer serializer =
277          Tang.Factory.getTang().newInjector().getInstance(EvaluatorListSerializer.class);
278      final AvroEvaluatorList evaluatorList = serializer.toAvro(
279          this.reefStateManager.getEvaluators(), this.reefStateManager.getEvaluators().size(),
280          this.reefStateManager.getStartTime());
281      writeResponse(response, serializer.toString(evaluatorList));
282    } catch (final InjectionException e) {
283      LOG.log(Level.SEVERE, "Error in injecting EvaluatorListSerializer.", e);
284      writeResponse(response, "Error in injecting EvaluatorListSerializer: " + e);
285    }
286  }
287
288  /**
289   * Get all evaluator ids and send it back to response so that can be displayed on web
290   *
291   * @param response
292   * @throws IOException
293   */
294  private void writeEvaluatorsWebOutput(final HttpServletResponse response) throws IOException {
295
296    LOG.log(Level.INFO, "HttpServerReefEventHandler getEvaluators is called");
297
298    final PrintWriter writer = response.getWriter();
299
300    writer.println("<h1>Evaluators:</h1>");
301
302    for (final Map.Entry<String, EvaluatorDescriptor> entry
303        : this.reefStateManager.getEvaluators().entrySet()) {
304
305      final String key = entry.getKey();
306      final EvaluatorDescriptor descriptor = entry.getValue();
307
308      writer.println("Evaluator Id: " + key);
309      writer.write("<br/>");
310      writer.println("Evaluator Name: " + descriptor.getNodeDescriptor().getName());
311      writer.write("<br/>");
312    }
313    writer.write("<br/>");
314    writer.println("Total number of Evaluators: " + this.reefStateManager.getEvaluators().size());
315    writer.write("<br/>");
316    writer.println(String.format("Driver Start Time:[%s]", this.reefStateManager.getStartTime()));
317  }
318
319  /**
320   * Write Driver Info as JSON string to Response.
321   */
322  private void writeDriverJsonInformation(final HttpServletResponse response) throws IOException {
323    try {
324      final DriverInfoSerializer serializer =
325          Tang.Factory.getTang().newInjector().getInstance(DriverInfoSerializer.class);
326      final AvroDriverInfo driverInfo = serializer.toAvro(
327          this.reefStateManager.getDriverEndpointIdentifier(), this.reefStateManager.getStartTime(), this.reefStateManager.getServicesInfo());
328      writeResponse(response, serializer.toString(driverInfo));
329    } catch (final InjectionException e) {
330      LOG.log(Level.SEVERE, "Error in injecting DriverInfoSerializer.", e);
331      writeResponse(response, "Error in injecting DriverInfoSerializer: " + e);
332    }
333  }
334
335  /**
336   * Write a String to HTTP Response.
337   */
338  private void writeResponse(final HttpServletResponse response, final String data) throws IOException {
339    final byte[] outputBody = data.getBytes(Charset.forName("UTF-8"));
340    response.getOutputStream().write(outputBody);
341  }
342
343  /**
344   * Get driver information.
345   */
346  private void writeDriverWebInformation(final HttpServletResponse response) throws IOException {
347
348    LOG.log(Level.INFO, "HttpServerReefEventHandler writeDriverInformation invoked.");
349
350    final PrintWriter writer = response.getWriter();
351
352    writer.println("<h1>Driver Information:</h1>");
353
354    writer.println(String.format("Driver Remote Identifier:[%s]",
355        this.reefStateManager.getDriverEndpointIdentifier()));
356    writer.write("<br/><br/>");
357
358    writer.println(String.format("Services registered on Driver:"));
359    writer.write("<br/><br/>");
360    for (final AvroReefServiceInfo service : this.reefStateManager.getServicesInfo()) {
361      writer.println(String.format("Service: [%s] , Information: [%s]", service.getServiceName(), service.getServiceInfo()));
362      writer.write("<br/><br/>");
363    }
364
365    writer.println(String.format("Driver Start Time:[%s]", this.reefStateManager.getStartTime()));
366  }
367
368  /**
369   * Write lines in ArrayList to the response writer
370   * @param response
371   * @param lines
372   * @param header
373   * @throws IOException
374   */
375  private void writeLines(final HttpServletResponse response, final ArrayList<String> lines, final String header) throws IOException {
376    LOG.log(Level.INFO, "HttpServerReefEventHandler writeLines is called");
377
378    final PrintWriter writer = response.getWriter();
379
380    writer.println("<h1>" + header + "</h1>");
381
382    for (final String line : lines) {
383      writer.println(line);
384      writer.write("<br/>");
385    }
386    writer.write("<br/>");
387  }
388}