001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.yarn.driver.restart; 020 021import org.apache.hadoop.fs.FileSystem; 022import org.apache.hadoop.fs.Path; 023import org.apache.reef.annotations.audience.Private; 024 025import java.io.BufferedWriter; 026import java.io.IOException; 027import java.io.OutputStreamWriter; 028import java.nio.charset.StandardCharsets; 029 030/** 031 * The DFS evaluator logger that performs regular append. dfs.support.append should be true. 032 */ 033@Private 034public final class DFSEvaluatorLogAppendWriter implements DFSEvaluatorLogWriter { 035 036 private final FileSystem fileSystem; 037 038 private final Path changelogPath; 039 040 private boolean fsClosed = false; 041 042 DFSEvaluatorLogAppendWriter(final FileSystem fileSystem, final Path changelogPath) { 043 this.fileSystem = fileSystem; 044 this.changelogPath = changelogPath; 045 } 046 047 /** 048 * Writes a formatted entry (addition or removal) for an Evaluator ID into the DFS evaluator log. 049 * The entry is appended regularly by an FS that supports append. 050 * @param formattedEntry The formatted entry (entry with evaluator ID and addition/removal information). 051 * @throws IOException 052 */ 053 @Override 054 public synchronized void writeToEvaluatorLog(final String formattedEntry) throws IOException { 055 final boolean fileCreated = this.fileSystem.exists(this.changelogPath); 056 057 try ( 058 final BufferedWriter bw = fileCreated ? 059 new BufferedWriter(new OutputStreamWriter( 060 this.fileSystem.append(this.changelogPath), StandardCharsets.UTF_8)) : 061 new BufferedWriter(new OutputStreamWriter( 062 this.fileSystem.create(this.changelogPath), StandardCharsets.UTF_8)) 063 ) { 064 bw.write(formattedEntry); 065 } 066 } 067 068 /** 069 * Closes the FileSystem. 070 * @throws Exception 071 */ 072 @Override 073 public synchronized void close() throws Exception { 074 if (this.fileSystem != null && !this.fsClosed) { 075 this.fileSystem.close(); 076 this.fsClosed = true; 077 } 078 } 079}