001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.webserver; 020 021import org.apache.reef.driver.ProgressProvider; 022import org.apache.reef.driver.evaluator.EvaluatorDescriptor; 023import org.apache.reef.driver.parameters.ClientCloseHandlers; 024import org.apache.reef.runtime.common.files.REEFFileNames; 025import org.apache.reef.tang.InjectionFuture; 026import org.apache.reef.tang.Tang; 027import org.apache.reef.tang.annotations.Parameter; 028import org.apache.reef.tang.exceptions.InjectionException; 029import org.apache.reef.util.logging.LogLevelName; 030import org.apache.reef.util.logging.LogParser; 031import org.apache.reef.util.logging.LoggingScopeFactory; 032import org.apache.reef.util.logging.LoggingScopeImpl; 033import org.apache.reef.wake.EventHandler; 034 035import javax.inject.Inject; 036import javax.servlet.ServletException; 037import javax.servlet.http.HttpServletResponse; 038import java.io.IOException; 039import java.io.PrintWriter; 040import java.net.InetSocketAddress; 041import java.nio.charset.StandardCharsets; 042import java.nio.file.Files; 043import java.nio.file.Paths; 044import java.util.Arrays; 045import java.util.ArrayList; 046import java.util.List; 047import java.util.Map; 048import java.util.Set; 049import java.util.logging.Level; 050import java.util.logging.Logger; 051 052/** 053 * Http handler for REEF events. 054 */ 055public final class HttpServerReefEventHandler implements HttpHandler { 056 057 private static final Logger LOG = Logger.getLogger(HttpServerReefEventHandler.class.getName()); 058 059 private static final String VER = "v1"; 060 private final String driverStdoutFile; 061 private final String driverStderrFile; 062 063 private final ReefEventStateManager reefStateManager; 064 private final Set<EventHandler<Void>> clientCloseHandlers; 065 private final LoggingScopeFactory loggingScopeFactory; 066 private final InjectionFuture<ProgressProvider> progressProvider; 067 068 /** 069 * Log level string prefix in the log lines. 070 */ 071 private final String logLevelPrefix; 072 073 /** 074 * specification that would match URI request. 075 */ 076 private String uriSpecification = "Reef"; 077 078 @Inject 079 public HttpServerReefEventHandler( 080 final ReefEventStateManager reefStateManager, 081 @Parameter(ClientCloseHandlers.class) final Set<EventHandler<Void>> clientCloseHandlers, 082 @Parameter(LogLevelName.class) final String logLevel, 083 final LoggingScopeFactory loggingScopeFactory, 084 final REEFFileNames reefFileNames, 085 final InjectionFuture<ProgressProvider> progressProvider) { 086 this.reefStateManager = reefStateManager; 087 this.clientCloseHandlers = clientCloseHandlers; 088 this.loggingScopeFactory = loggingScopeFactory; 089 this.logLevelPrefix = new StringBuilder().append(logLevel).append(": ").toString(); 090 this.progressProvider = progressProvider; 091 driverStdoutFile = reefFileNames.getDriverStdoutFileName(); 092 driverStderrFile = reefFileNames.getDriverStderrFileName(); 093 } 094 095 /** 096 * read a file and output it as a String. 097 */ 098 private static String readFile(final String fileName) throws IOException { 099 return new String(Files.readAllBytes(Paths.get(fileName)), StandardCharsets.UTF_8); 100 } 101 102 /** 103 * @return URI specification for the handler. 104 */ 105 @Override 106 public String getUriSpecification() { 107 return uriSpecification; 108 } 109 110 /** 111 * set URI specification. 112 */ 113 public void setUriSpecification(final String s) { 114 uriSpecification = s; 115 } 116 117 /** 118 * Event handler that is called when receiving a http request. 119 */ 120 @Override 121 public void onHttpRequest( 122 final ParsedHttpRequest parsedHttpRequest, 123 final HttpServletResponse response) throws IOException, ServletException { 124 125 LOG.log(Level.INFO, "HttpServerReefEventHandler in webserver onHttpRequest is called: {0}", 126 parsedHttpRequest.getRequestUri()); 127 128 final String version = parsedHttpRequest.getVersion().toLowerCase(); 129 final String target = parsedHttpRequest.getTargetEntity().toLowerCase(); 130 131 switch (target) { 132 case "evaluators": { 133 final String queryStr = parsedHttpRequest.getQueryString(); 134 if (queryStr == null || queryStr.isEmpty()) { 135 if (version.equals(VER)) { 136 writeEvaluatorsJsonOutput(response); 137 } else { 138 writeEvaluatorsWebOutput(response); 139 } 140 } else { 141 handleQueries(response, parsedHttpRequest.getQueryMap(), version); 142 } 143 break; 144 } 145 case "driver": 146 if (version.equals(VER)) { 147 writeDriverJsonInformation(response); 148 } else { 149 writeDriverWebInformation(response); 150 } 151 break; 152 case "close": 153 for (final EventHandler<Void> e : clientCloseHandlers) { 154 e.onNext(null); 155 } 156 response.getWriter().println("Enforced closing"); 157 break; 158 case "kill": 159 reefStateManager.onClientKill(); 160 response.getWriter().println("Killing"); 161 break; 162 case "duration": 163 final ArrayList<String> lines = 164 LogParser.getFilteredLinesFromFile(driverStderrFile, LoggingScopeImpl.DURATION, LoggingScopeImpl.TOKEN, null); 165 writeLines(response, lines, "Performance..."); 166 break; 167 case "stages": 168 final ArrayList<String> starts = 169 LogParser.getFilteredLinesFromFile(driverStderrFile, LoggingScopeImpl.START_PREFIX, logLevelPrefix, null); 170 final ArrayList<String> exits = 171 LogParser.getFilteredLinesFromFile(driverStderrFile, LoggingScopeImpl.EXIT_PREFIX, logLevelPrefix, 172 LoggingScopeImpl.DURATION); 173 final ArrayList<String> startsStages = LogParser.findStages(starts, LogParser.START_INDICATORS); 174 final ArrayList<String> endStages = LogParser.findStages(exits, LogParser.END_INDICATORS); 175 final ArrayList<String> result = LogParser.mergeStages(startsStages, endStages); 176 writeLines(response, result, "Current Stages..."); 177 break; 178 case "logfile": 179 final List<String> names = parsedHttpRequest.getQueryMap().get("filename"); 180 final PrintWriter writer = response.getWriter(); 181 if (names == null || names.size() == 0) { 182 writer.println("File name is not provided"); 183 } else { 184 final String fileName = names.get(0); 185 if (!fileName.equals(driverStdoutFile) && !fileName.equals(driverStderrFile)) { 186 writer.println(String.format("Unsupported file names: [%s] ", fileName)); 187 } else { 188 try { 189 final byte[] outputBody = readFile(fileName).getBytes(StandardCharsets.UTF_8); 190 writer.print(Arrays.toString(outputBody)); 191 } catch (final IOException e) { 192 writer.println(String.format("Cannot find the log file: [%s].", fileName)); 193 } 194 } 195 } 196 break; 197 case "progress": 198 response.getWriter().println(progressProvider.get().getProgress()); 199 break; 200 default: 201 response.getWriter().println(String.format("Unsupported query for entity: [%s].", target)); 202 } 203 } 204 205 /** 206 * handle HTTP queries. 207 * Example of a query: http://localhost:8080/reef/Evaluators/?id=Node-2-1403225213803&id=Node-1-1403225213712 208 */ 209 private void handleQueries( 210 final HttpServletResponse response, 211 final Map<String, List<String>> queries, 212 final String version) throws IOException { 213 214 LOG.log(Level.INFO, "HttpServerReefEventHandler handleQueries is called"); 215 216 for (final Map.Entry<String, List<String>> entry : queries.entrySet()) { 217 final String queryTarget = entry.getKey().toLowerCase(); 218 219 switch (queryTarget) { 220 case "id": 221 if (version.equals(VER)) { 222 writeEvaluatorInfoJsonOutput(response, entry.getValue()); 223 } else { 224 writeEvaluatorInfoWebOutput(response, entry.getValue()); 225 } 226 break; 227 default: 228 response.getWriter().println("Unsupported query : " + queryTarget); 229 break; 230 } 231 } 232 } 233 234 /** 235 * Write Evaluator info as JSON format to HTTP Response. 236 */ 237 private void writeEvaluatorInfoJsonOutput( 238 final HttpServletResponse response, final List<String> ids) throws IOException { 239 try { 240 final EvaluatorInfoSerializer serializer = 241 Tang.Factory.getTang().newInjector().getInstance(EvaluatorInfoSerializer.class); 242 final AvroEvaluatorsInfo evaluatorsInfo = 243 serializer.toAvro(ids, this.reefStateManager.getEvaluators()); 244 writeResponse(response, serializer.toString(evaluatorsInfo)); 245 } catch (final InjectionException e) { 246 LOG.log(Level.SEVERE, "Error in injecting EvaluatorInfoSerializer.", e); 247 writeResponse(response, "Error in injecting EvaluatorInfoSerializer: " + e); 248 } 249 } 250 251 /** 252 * Write Evaluator info on the Response so that to display on web page directly. 253 * This is for direct browser queries. 254 */ 255 private void writeEvaluatorInfoWebOutput( 256 final HttpServletResponse response, final List<String> ids) throws IOException { 257 258 for (final String id : ids) { 259 260 final EvaluatorDescriptor evaluatorDescriptor = this.reefStateManager.getEvaluators().get(id); 261 final PrintWriter writer = response.getWriter(); 262 263 if (evaluatorDescriptor != null) { 264 final String nodeId = evaluatorDescriptor.getNodeDescriptor().getId(); 265 final String nodeName = evaluatorDescriptor.getNodeDescriptor().getName(); 266 final InetSocketAddress address = 267 evaluatorDescriptor.getNodeDescriptor().getInetSocketAddress(); 268 269 writer.println("Evaluator Id: " + id); 270 writer.write("<br/>"); 271 writer.println("Evaluator Node Id: " + nodeId); 272 writer.write("<br/>"); 273 writer.println("Evaluator Node Name: " + nodeName); 274 writer.write("<br/>"); 275 writer.println("Evaluator InternetAddress: " + address); 276 writer.write("<br/>"); 277 writer.println("Evaluator Memory: " + evaluatorDescriptor.getMemory()); 278 writer.write("<br/>"); 279 writer.println("Evaluator Core: " + evaluatorDescriptor.getNumberOfCores()); 280 writer.write("<br/>"); 281 writer.println("Evaluator Type: " + evaluatorDescriptor.getProcess()); 282 writer.write("<br/>"); 283 writer.println("Evaluator Runtime Name: " + evaluatorDescriptor.getRuntimeName()); 284 writer.write("<br/>"); 285 } else { 286 writer.println("Incorrect Evaluator Id: " + id); 287 } 288 } 289 } 290 291 /** 292 * Get all evaluator ids and send it back to response as JSON. 293 */ 294 private void writeEvaluatorsJsonOutput(final HttpServletResponse response) throws IOException { 295 LOG.log(Level.INFO, "HttpServerReefEventHandler writeEvaluatorsJsonOutput is called"); 296 try { 297 final EvaluatorListSerializer serializer = 298 Tang.Factory.getTang().newInjector().getInstance(EvaluatorListSerializer.class); 299 final AvroEvaluatorList evaluatorList = serializer.toAvro( 300 this.reefStateManager.getEvaluators(), this.reefStateManager.getEvaluators().size(), 301 this.reefStateManager.getStartTime()); 302 writeResponse(response, serializer.toString(evaluatorList)); 303 } catch (final InjectionException e) { 304 LOG.log(Level.SEVERE, "Error in injecting EvaluatorListSerializer.", e); 305 writeResponse(response, "Error in injecting EvaluatorListSerializer: " + e); 306 } 307 } 308 309 /** 310 * Get all evaluator ids and send it back to response so that can be displayed on web. 311 * 312 * @param response 313 * @throws IOException 314 */ 315 private void writeEvaluatorsWebOutput(final HttpServletResponse response) throws IOException { 316 317 LOG.log(Level.INFO, "HttpServerReefEventHandler writeEvaluatorsWebOutput is called"); 318 319 final PrintWriter writer = response.getWriter(); 320 321 writer.println("<h1>Evaluators:</h1>"); 322 323 for (final Map.Entry<String, EvaluatorDescriptor> entry 324 : this.reefStateManager.getEvaluators().entrySet()) { 325 326 final String key = entry.getKey(); 327 final EvaluatorDescriptor descriptor = entry.getValue(); 328 329 writer.println("Evaluator Id: " + key); 330 writer.write("<br/>"); 331 writer.println("Evaluator Name: " + descriptor.getNodeDescriptor().getName()); 332 writer.write("<br/>"); 333 } 334 writer.write("<br/>"); 335 writer.println("Total number of Evaluators: " + this.reefStateManager.getEvaluators().size()); 336 writer.write("<br/>"); 337 writer.println(String.format("Driver Start Time:[%s]", this.reefStateManager.getStartTime())); 338 } 339 340 /** 341 * Write Driver Info as JSON string to Response. 342 */ 343 private void writeDriverJsonInformation(final HttpServletResponse response) throws IOException { 344 345 LOG.log(Level.INFO, "HttpServerReefEventHandler writeDriverJsonInformation invoked."); 346 347 try { 348 final DriverInfoSerializer serializer = 349 Tang.Factory.getTang().newInjector().getInstance(DriverInfoSerializer.class); 350 final AvroDriverInfo driverInfo = serializer.toAvro( 351 this.reefStateManager.getDriverEndpointIdentifier(), this.reefStateManager.getStartTime(), 352 this.reefStateManager.getServicesInfo()); 353 writeResponse(response, serializer.toString(driverInfo)); 354 } catch (final InjectionException e) { 355 LOG.log(Level.SEVERE, "Error in injecting DriverInfoSerializer.", e); 356 writeResponse(response, "Error in injecting DriverInfoSerializer: " + e); 357 } 358 } 359 360 /** 361 * Write a String to HTTP Response. 362 */ 363 private void writeResponse(final HttpServletResponse response, final String data) throws IOException { 364 final byte[] outputBody = data.getBytes(StandardCharsets.UTF_8); 365 response.getOutputStream().write(outputBody); 366 } 367 368 /** 369 * Get driver information. 370 */ 371 private void writeDriverWebInformation(final HttpServletResponse response) throws IOException { 372 373 LOG.log(Level.INFO, "HttpServerReefEventHandler writeDriverWebInformation invoked."); 374 375 final PrintWriter writer = response.getWriter(); 376 377 writer.println("<h1>Driver Information:</h1>"); 378 379 writer.println(String.format("Driver Remote Identifier:[%s]", 380 this.reefStateManager.getDriverEndpointIdentifier())); 381 writer.write("<br/><br/>"); 382 383 writer.println(String.format("Services registered on Driver:")); 384 writer.write("<br/><br/>"); 385 for (final AvroReefServiceInfo service : this.reefStateManager.getServicesInfo()) { 386 writer.println(String.format("Service: [%s] , Information: [%s]", service.getServiceName(), 387 service.getServiceInfo())); 388 writer.write("<br/><br/>"); 389 } 390 391 writer.println(String.format("Driver Start Time:[%s]", this.reefStateManager.getStartTime())); 392 } 393 394 /** 395 * Write lines in ArrayList to the response writer. 396 * @param response 397 * @param lines 398 * @param header 399 * @throws IOException 400 */ 401 private void writeLines(final HttpServletResponse response, final ArrayList<String> lines, final String header) 402 throws IOException { 403 LOG.log(Level.INFO, "HttpServerReefEventHandler writeLines is called"); 404 405 final PrintWriter writer = response.getWriter(); 406 407 writer.println("<h1>" + header + "</h1>"); 408 409 for (final String line : lines) { 410 writer.println(line); 411 writer.write("<br/>"); 412 } 413 writer.write("<br/>"); 414 } 415}