001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.yarn.driver.restart;
020
021import org.apache.hadoop.fs.FSDataOutputStream;
022import org.apache.hadoop.fs.FileStatus;
023import org.apache.hadoop.fs.FileSystem;
024import org.apache.hadoop.fs.Path;
025import org.apache.hadoop.io.IOUtils;
026import org.apache.reef.annotations.audience.Private;
027import org.apache.reef.util.CloseableIterable;
028
029import java.io.*;
030import java.nio.charset.StandardCharsets;
031
032/**
033 * The DFS evaluator logger that does not support append and does append by overwrite.
034 * dfs.support.append should be false.
035 */
036@Private
037public final class DFSEvaluatorLogOverwriteReaderWriter implements DFSEvaluatorLogReaderWriter {
038
039  private final FileSystem fileSystem;
040
041  private final DFSLineReader reader;
042  private final Path changeLogPath;
043  private final Path changeLogAltPath;
044
045  // This is the last path we will be writing to.
046  private Path pathToWriteTo = null;
047
048  private boolean fsClosed = false;
049
050  DFSEvaluatorLogOverwriteReaderWriter(final FileSystem fileSystem, final Path changeLogPath) {
051    this.fileSystem = fileSystem;
052    this.changeLogPath = changeLogPath;
053    this.changeLogAltPath = new Path(changeLogPath + ".alt");
054    this.reader = new DFSLineReader(fileSystem);
055  }
056
057  /**
058   * Writes a formatted entry (addition or removal) for an Evaluator ID into the DFS evaluator log.
059   * The log is appended to by reading first, adding on the information, and then overwriting the entire log.
060   * Since the {@link FileSystem} does not support appends, this {@link DFSEvaluatorLogReaderWriter}
061   * uses a two-file approach, where when we write, we always overwrite the older file.
062   * @param formattedEntry The formatted entry (entry with evaluator ID and addition/removal information).
063   * @throws IOException when file cannot be written.
064   */
065  @Override
066  public synchronized void writeToEvaluatorLog(final String formattedEntry) throws IOException {
067    final Path writePath = getWritePath();
068
069    // readPath is always not the writePath.
070    final Path readPath = getAlternativePath(writePath);
071
072    try (final FSDataOutputStream outputStream = this.fileSystem.create(writePath, true)) {
073      InputStream inputStream = null;
074      try {
075        final InputStream newEntryInputStream = new ByteArrayInputStream(
076            formattedEntry.getBytes(StandardCharsets.UTF_8));
077
078        if (fileSystem.exists(readPath)) {
079          inputStream = new SequenceInputStream(
080              this.fileSystem.open(readPath), newEntryInputStream);
081        } else {
082          inputStream = newEntryInputStream;
083        }
084
085        IOUtils.copyBytes(inputStream, outputStream, 4096, false);
086      } finally {
087        outputStream.hsync();
088        if (inputStream != null) {
089          inputStream.close();
090        }
091      }
092    }
093  }
094
095  /**
096   * Since the {@link FileSystem} does not support appends, this {@link DFSEvaluatorLogReaderWriter}
097   * uses a two-file approach, where when we read, we always read from the newer file.
098   */
099  @Override
100  public synchronized CloseableIterable<String> readFromEvaluatorLog() throws IOException {
101    return reader.readLinesFromFile(getLongerFile());
102  }
103
104  /**
105   * Gets the alternative path. Returns one of changeLogPath and changeLogAltPath.
106   */
107  private synchronized Path getAlternativePath(final Path path) {
108    if (path.equals(changeLogPath)) {
109      return changeLogAltPath;
110    }
111
112    return changeLogPath;
113  }
114
115  /**
116   * Gets the path to write to.
117   */
118  private synchronized Path getWritePath() throws IOException {
119    if (pathToWriteTo == null) {
120      // If we have not yet written before, check existence of files.
121      final boolean originalExists = fileSystem.exists(changeLogPath);
122      final boolean altExists = fileSystem.exists(changeLogAltPath);
123
124      if (originalExists && altExists) {
125        final FileStatus originalStatus = fileSystem.getFileStatus(changeLogPath);
126        final FileStatus altStatus = fileSystem.getFileStatus(changeLogAltPath);
127
128        // Return the shorter file.
129        // TODO[JIRA REEF-1413]: This approach will not be able to work in REEF-1413.
130        // TODO[JIRA REEF-1413]: Note that we cannot use last modified time because Azure blob's HDFS API only
131        // TODO[JIRA REEF-1413]: supports time resolution up to a second.
132        final long originalLen = originalStatus.getLen();
133        final long altLen = altStatus.getLen();
134
135        if (originalLen < altLen) {
136          pathToWriteTo = changeLogPath;
137        } else {
138          pathToWriteTo = changeLogAltPath;
139        }
140      } else if (originalExists) {
141        // Return the file that does not exist.
142        pathToWriteTo = changeLogAltPath;
143      } else {
144        pathToWriteTo = changeLogPath;
145      }
146    }
147
148    final Path returnPath = pathToWriteTo;
149    pathToWriteTo = getAlternativePath(pathToWriteTo);
150
151    return returnPath;
152  }
153
154  private synchronized Path getLongerFile() throws IOException {
155    final boolean originalExists = fileSystem.exists(changeLogPath);
156    final boolean altExists = fileSystem.exists(changeLogAltPath);
157
158    // If both files exist, return the newest file path.
159    if (originalExists && altExists) {
160      final FileStatus originalStatus = fileSystem.getFileStatus(changeLogPath);
161      final FileStatus altStatus = fileSystem.getFileStatus(changeLogAltPath);
162
163      final long originalLastModTime = originalStatus.getLen();
164      final long altLastModTime = altStatus.getLen();
165
166      // Return the newer file.
167      if (originalLastModTime >= altLastModTime) {
168        return changeLogPath;
169      }
170
171      return changeLogAltPath;
172    } else if (altExists) {
173      // If only the alt file exists, return the alt file path.
174      return changeLogAltPath;
175    }
176
177    // If only the original file exists or if neither exist, return the original file path.
178    return changeLogPath;
179  }
180
181  /**
182   * Closes the FileSystem.
183   * @throws Exception
184   */
185  @Override
186  public synchronized void close() throws Exception {
187    if (this.fileSystem != null && !this.fsClosed) {
188      this.fileSystem.close();
189      this.fsClosed = true;
190    }
191  }
192}