This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.yarn.driver.restart;
020
021import org.apache.hadoop.fs.FSDataOutputStream;
022import org.apache.hadoop.fs.FileSystem;
023import org.apache.hadoop.fs.Path;
024import org.apache.hadoop.io.IOUtils;
025import org.apache.reef.annotations.audience.Private;
026
027import java.io.*;
028import java.nio.charset.StandardCharsets;
029
030/**
031 * The DFS evaluator logger that does not support append and does append by overwrite.
032 * dfs.support.append should be false.
033 */
034@Private
035public final class DFSEvaluatorLogOverwriteWriter implements DFSEvaluatorLogWriter {
036
037  private final FileSystem fileSystem;
038
039  private final Path changelogPath;
040
041  private boolean fsClosed = false;
042
043  DFSEvaluatorLogOverwriteWriter(final FileSystem fileSystem, final Path changelogPath) {
044    this.fileSystem = fileSystem;
045    this.changelogPath = changelogPath;
046  }
047
048  /**
049   * Writes a formatted entry (addition or removal) for an Evaluator ID into the DFS evaluator log.
050   * The log is appended to by reading first, adding on the information, and then overwriting the entire
051   * log.
052   * @param formattedEntry The formatted entry (entry with evaluator ID and addition/removal information).
053   * @throws IOException when file cannot be written.
054   */
055  @Override
056  public synchronized void writeToEvaluatorLog(final String formattedEntry) throws IOException {
057    final boolean fileCreated = this.fileSystem.exists(this.changelogPath);
058
059    if (!fileCreated) {
060      try (final BufferedWriter bw = new BufferedWriter(
061              new OutputStreamWriter(this.fileSystem.create(this.changelogPath), StandardCharsets.UTF_8))) {
062        bw.write(formattedEntry);
063      }
064    } else {
065      this.appendByDeleteAndCreate(formattedEntry);
066    }
067  }
068
069  /**
070   * For certain HDFS implementation, the append operation may not be supported (e.g., Azure blob - wasb)
071   * in this case, we will emulate the append operation by reading the content, appending entry at the end,
072   * then recreating the file with appended content.
073   *
074   * @throws java.io.IOException when the file can't be written.
075   */
076  private void appendByDeleteAndCreate(final String appendEntry)
077      throws IOException {
078    final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
079
080    try (final InputStream inputStream = this.fileSystem.open(this.changelogPath)) {
081      IOUtils.copyBytes(inputStream, outputStream, 4096, true);
082    }
083
084    final String newContent = outputStream.toString("UTF-8") + appendEntry;
085    this.fileSystem.delete(this.changelogPath, true);
086
087    try (final FSDataOutputStream newOutput = this.fileSystem.create(this.changelogPath);
088         final InputStream newInput = new ByteArrayInputStream(newContent.getBytes(StandardCharsets.UTF_8))) {
089      IOUtils.copyBytes(newInput, newOutput, 4096, true);
090    }
091  }
092
093  /**
094   * Closes the FileSystem.
095   * @throws Exception
096   */
097  @Override
098  public synchronized void close() throws Exception {
099    if (this.fileSystem != null && !this.fsClosed) {
100      this.fileSystem.close();
101      this.fsClosed = true;
102    }
103  }
104}