This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.io.watcher;
020
021import org.apache.reef.annotations.Unstable;
022import org.apache.reef.io.watcher.util.RunnableExecutingHandler;
023import org.apache.reef.tang.annotations.Name;
024import org.apache.reef.tang.annotations.NamedParameter;
025import org.apache.reef.tang.annotations.Parameter;
026import org.apache.reef.wake.EStage;
027import org.apache.reef.wake.impl.ThreadPoolStage;
028
029import javax.inject.Inject;
030import java.io.File;
031import java.io.FileOutputStream;
032import java.io.IOException;
033import java.io.OutputStreamWriter;
034import java.io.PrintWriter;
035import java.nio.charset.Charset;
036import java.text.DateFormat;
037import java.text.SimpleDateFormat;
038import java.util.Date;
039import java.util.logging.Level;
040import java.util.logging.Logger;
041
042/**
043 * Write events to a file in the root directory of the driver.
044 */
045@Unstable
046public final class FileEventStream implements EventStream {
047  private static final Logger LOG = Logger.getLogger(FileEventStream.class.getName());
048
049  private final DateFormat dateFormat;
050  private final PrintWriter printWriter;
051  private final EStage<Runnable> singleThreadedExecutor;
052
053  @Inject
054  private FileEventStream(@Parameter(Path.class) final String path) {
055    this.dateFormat = new SimpleDateFormat("[yyyy.MM.dd HH:mm:ss.SSSS]");
056    this.singleThreadedExecutor = new ThreadPoolStage<>(new RunnableExecutingHandler(), 1);
057
058    try {
059      final OutputStreamWriter writer = new OutputStreamWriter(
060          new FileOutputStream(createFileWithPath(path)), Charset.forName("UTF-8"));
061      this.printWriter = new PrintWriter(writer);
062    } catch (final IOException e) {
063      throw new RuntimeException(e);
064    }
065  }
066
067  private File createFileWithPath(final String path) throws IOException {
068    final File file = new File(path);
069    final File parent = file.getParentFile();
070    if (parent != null && !parent.exists() && !parent.mkdirs()) {
071      LOG.log(Level.WARNING, "Failed to create [{0}]", parent.getAbsolutePath());
072    }
073    if (!file.exists() && !file.createNewFile()) {
074      LOG.log(Level.WARNING, "Failed to create [{0}]", file.getAbsolutePath());
075    }
076    return file;
077  }
078
079  @Override
080  public void onEvent(final EventType type, final String jsonEncodedEvent) {
081    final long timestamp = System.currentTimeMillis();
082    singleThreadedExecutor.onNext(new Runnable() {
083      @Override
084      public void run() {
085        final String eventDescription = new StringBuilder()
086            .append(dateFormat.format(new Date(timestamp)))
087            .append(" [")
088            .append(type)
089            .append("] ")
090            .append(jsonEncodedEvent)
091            .toString();
092
093        printWriter.println(eventDescription);
094
095        if (type == EventType.RuntimeStop) {
096          onRuntimeStop();
097        }
098      }
099    });
100  }
101
102  private void onRuntimeStop() {
103    printWriter.flush();
104    printWriter.close();
105  }
106
107  @NamedParameter(doc = "The relative path of the reporting file.", default_value = "watcher_report.txt")
108  public static final class Path implements Name<String> {
109  }
110}