001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.yarn.driver.restart;
020
021import org.apache.hadoop.fs.FileSystem;
022import org.apache.hadoop.fs.Path;
023import org.apache.reef.annotations.audience.Private;
024import org.apache.reef.util.CloseableIterable;
025
026import java.io.BufferedWriter;
027import java.io.IOException;
028import java.io.OutputStreamWriter;
029import java.nio.charset.StandardCharsets;
030
031/**
032 * The DFS evaluator logger that performs regular append. dfs.support.append should be true.
033 */
034@Private
035public final class DFSEvaluatorLogAppendReaderWriter implements DFSEvaluatorLogReaderWriter {
036
037  private final FileSystem fileSystem;
038  private final Path changelogPath;
039  private final DFSLineReader reader;
040
041  private boolean fsClosed = false;
042
043  DFSEvaluatorLogAppendReaderWriter(final FileSystem fileSystem, final Path changelogPath) {
044    this.fileSystem = fileSystem;
045    this.changelogPath = changelogPath;
046    this.reader = new DFSLineReader(fileSystem);
047  }
048
049  /**
050   * Writes a formatted entry (addition or removal) for an Evaluator ID into the DFS evaluator log.
051   * The entry is appended regularly by an FS that supports append.
052   * @param formattedEntry The formatted entry (entry with evaluator ID and addition/removal information).
053   * @throws IOException
054   */
055  @Override
056  public synchronized void writeToEvaluatorLog(final String formattedEntry) throws IOException {
057    final boolean fileCreated = this.fileSystem.exists(this.changelogPath);
058
059    try (
060        final BufferedWriter bw = fileCreated ?
061            new BufferedWriter(new OutputStreamWriter(
062                this.fileSystem.append(this.changelogPath), StandardCharsets.UTF_8)) :
063            new BufferedWriter(new OutputStreamWriter(
064                this.fileSystem.create(this.changelogPath), StandardCharsets.UTF_8))
065    ) {
066      bw.write(formattedEntry);
067    }
068  }
069
070  @Override
071  public CloseableIterable<String> readFromEvaluatorLog() throws IOException {
072    return reader.readLinesFromFile(changelogPath);
073  }
074
075  /**
076   * Closes the FileSystem.
077   * @throws Exception
078   */
079  @Override
080  public synchronized void close() throws Exception {
081    if (this.fileSystem != null && !this.fsClosed) {
082      this.fileSystem.close();
083      this.fsClosed = true;
084    }
085  }
086}