This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.yarn.driver.restart;
020
021import org.apache.hadoop.fs.FileSystem;
022import org.apache.hadoop.fs.Path;
023import org.apache.reef.annotations.audience.Private;
024
025import java.io.BufferedWriter;
026import java.io.IOException;
027import java.io.OutputStreamWriter;
028import java.nio.charset.StandardCharsets;
029
030/**
031 * The DFS evaluator logger that performs regular append. dfs.support.append should be true.
032 */
033@Private
034public final class DFSEvaluatorLogAppendWriter implements DFSEvaluatorLogWriter {
035
036  private final FileSystem fileSystem;
037
038  private final Path changelogPath;
039
040  private boolean fsClosed = false;
041
042  DFSEvaluatorLogAppendWriter(final FileSystem fileSystem, final Path changelogPath) {
043    this.fileSystem = fileSystem;
044    this.changelogPath = changelogPath;
045  }
046
047  /**
048   * Writes a formatted entry (addition or removal) for an Evaluator ID into the DFS evaluator log.
049   * The entry is appended regularly by an FS that supports append.
050   * @param formattedEntry The formatted entry (entry with evaluator ID and addition/removal information).
051   * @throws IOException
052   */
053  @Override
054  public synchronized void writeToEvaluatorLog(final String formattedEntry) throws IOException {
055    final boolean fileCreated = this.fileSystem.exists(this.changelogPath);
056
057    try (
058        final BufferedWriter bw = fileCreated ?
059            new BufferedWriter(new OutputStreamWriter(
060                this.fileSystem.append(this.changelogPath), StandardCharsets.UTF_8)) :
061            new BufferedWriter(new OutputStreamWriter(
062                this.fileSystem.create(this.changelogPath), StandardCharsets.UTF_8))
063    ) {
064      bw.write(formattedEntry);
065    }
066  }
067
068  /**
069   * Closes the FileSystem.
070   * @throws Exception
071   */
072  @Override
073  public synchronized void close() throws Exception {
074    if (this.fileSystem != null && !this.fsClosed) {
075      this.fileSystem.close();
076      this.fsClosed = true;
077    }
078  }
079}