001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.io.watcher; 020 021import org.apache.reef.annotations.Unstable; 022import org.apache.reef.io.watcher.util.RunnableExecutingHandler; 023import org.apache.reef.tang.annotations.Name; 024import org.apache.reef.tang.annotations.NamedParameter; 025import org.apache.reef.tang.annotations.Parameter; 026import org.apache.reef.wake.EStage; 027import org.apache.reef.wake.impl.ThreadPoolStage; 028 029import javax.inject.Inject; 030import java.io.File; 031import java.io.FileOutputStream; 032import java.io.IOException; 033import java.io.OutputStreamWriter; 034import java.io.PrintWriter; 035import java.nio.charset.Charset; 036import java.text.DateFormat; 037import java.text.SimpleDateFormat; 038import java.util.Date; 039import java.util.logging.Level; 040import java.util.logging.Logger; 041 042/** 043 * Write events to a file in the root directory of the driver. 044 */ 045@Unstable 046public final class FileEventStream implements EventStream { 047 private static final Logger LOG = Logger.getLogger(FileEventStream.class.getName()); 048 049 private final DateFormat dateFormat; 050 private final PrintWriter printWriter; 051 private final EStage<Runnable> singleThreadedExecutor; 052 053 @Inject 054 private FileEventStream(@Parameter(Path.class) final String path) { 055 this.dateFormat = new SimpleDateFormat("[yyyy.MM.dd HH:mm:ss.SSSS]"); 056 this.singleThreadedExecutor = new ThreadPoolStage<>(new RunnableExecutingHandler(), 1); 057 058 try { 059 final OutputStreamWriter writer = new OutputStreamWriter( 060 new FileOutputStream(createFileWithPath(path)), Charset.forName("UTF-8")); 061 this.printWriter = new PrintWriter(writer); 062 } catch (final IOException e) { 063 throw new RuntimeException(e); 064 } 065 } 066 067 private File createFileWithPath(final String path) throws IOException { 068 final File file = new File(path); 069 final File parent = file.getParentFile(); 070 if (parent != null && !parent.exists() && !parent.mkdirs()) { 071 LOG.log(Level.WARNING, "Failed to create [{0}]", parent.getAbsolutePath()); 072 } 073 if (!file.exists() && !file.createNewFile()) { 074 LOG.log(Level.WARNING, "Failed to create [{0}]", file.getAbsolutePath()); 075 } 076 return file; 077 } 078 079 @Override 080 public void onEvent(final EventType type, final String jsonEncodedEvent) { 081 final long timestamp = System.currentTimeMillis(); 082 singleThreadedExecutor.onNext(new Runnable() { 083 @Override 084 public void run() { 085 final String eventDescription = new StringBuilder() 086 .append(dateFormat.format(new Date(timestamp))) 087 .append(" [") 088 .append(type) 089 .append("] ") 090 .append(jsonEncodedEvent) 091 .toString(); 092 093 printWriter.println(eventDescription); 094 095 if (type == EventType.RuntimeStop) { 096 onRuntimeStop(); 097 } 098 } 099 }); 100 } 101 102 private void onRuntimeStop() { 103 printWriter.flush(); 104 printWriter.close(); 105 } 106 107 @NamedParameter(doc = "The relative path of the reporting file.", default_value = "watcher_report.txt") 108 public static final class Path implements Name<String> { 109 } 110}