/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.reef.runtime.yarn.driver.restart;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.reef.annotations.audience.Private;
import org.apache.reef.util.CloseableIterable;

import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;

/**
 * A DFS evaluator log reader/writer that records entries through the regular
 * {@code FileSystem} append operation. dfs.support.append should be true.
 */
@Private
public final class DFSEvaluatorLogAppendReaderWriter implements DFSEvaluatorLogReaderWriter {

  private final FileSystem fileSystem;
  private final Path changelogPath;
  private final DFSLineReader reader;

  /** Set once {@link #close()} has closed the FileSystem, so it is not closed a second time. */
  private boolean fsClosed = false;

  /**
   * Creates a reader/writer for the evaluator log stored at the given path.
   * @param fileSystem The FileSystem used for log writes; a line reader is also built on top of it.
   * @param changelogPath The location of the evaluator log file.
   */
  DFSEvaluatorLogAppendReaderWriter(final FileSystem fileSystem, final Path changelogPath) {
    this.fileSystem = fileSystem;
    this.changelogPath = changelogPath;
    this.reader = new DFSLineReader(fileSystem);
  }

  /**
   * Writes a formatted entry (addition or removal) for an Evaluator ID into the DFS evaluator log.
   * If the log file already exists the entry is appended to it; otherwise the file is created first.
   * The entry is written exactly as given; no separator is added by this method.
   * @param formattedEntry The formatted entry (entry with evaluator ID and addition/removal information).
   * @throws IOException if checking for the file, opening the stream, writing, or closing the writer fails.
   */
  @Override
  public synchronized void writeToEvaluatorLog(final String formattedEntry) throws IOException {
    // The existence check is made before any stream is opened.
    final boolean logAlreadyExists = this.fileSystem.exists(this.changelogPath);

    try (final BufferedWriter logWriter = openLogWriter(logAlreadyExists)) {
      logWriter.write(formattedEntry);
    }
  }

  /**
   * Opens a UTF-8 writer on the evaluator log.
   * @param appendToExisting true to append to the existing file, false to create the file.
   * @return a buffered writer positioned for writing the next entry.
   * @throws IOException if the FileSystem cannot open the stream.
   */
  private BufferedWriter openLogWriter(final boolean appendToExisting) throws IOException {
    if (appendToExisting) {
      return new BufferedWriter(new OutputStreamWriter(
          this.fileSystem.append(this.changelogPath), StandardCharsets.UTF_8));
    }

    return new BufferedWriter(new OutputStreamWriter(
        this.fileSystem.create(this.changelogPath), StandardCharsets.UTF_8));
  }

  /**
   * Reads the evaluator log through the line reader.
   * @return a closeable iterable over the lines of the evaluator log file.
   * @throws IOException if the line reader fails to read the file.
   */
  @Override
  public CloseableIterable<String> readFromEvaluatorLog() throws IOException {
    return reader.readLinesFromFile(changelogPath);
  }

  /**
   * Closes the FileSystem. Calls made after a successful close do nothing.
   * @throws Exception if closing the FileSystem fails.
   */
  @Override
  public synchronized void close() throws Exception {
    // Nothing to do when there is no FileSystem or it has already been closed.
    if (this.fileSystem == null || this.fsClosed) {
      return;
    }

    this.fileSystem.close();
    this.fsClosed = true;
  }
}