001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.yarn.driver.restart; 020 021import org.apache.hadoop.fs.FSDataOutputStream; 022import org.apache.hadoop.fs.FileSystem; 023import org.apache.hadoop.fs.Path; 024import org.apache.hadoop.io.IOUtils; 025import org.apache.reef.annotations.audience.Private; 026 027import java.io.*; 028import java.nio.charset.StandardCharsets; 029 030/** 031 * The DFS evaluator logger that does not support append and does append by overwrite. 032 * dfs.support.append should be false. 033 */ 034@Private 035public final class DFSEvaluatorLogOverwriteWriter implements DFSEvaluatorLogWriter { 036 037 private final FileSystem fileSystem; 038 039 private final Path changelogPath; 040 041 private boolean fsClosed = false; 042 043 DFSEvaluatorLogOverwriteWriter(final FileSystem fileSystem, final Path changelogPath) { 044 this.fileSystem = fileSystem; 045 this.changelogPath = changelogPath; 046 } 047 048 /** 049 * Writes a formatted entry (addition or removal) for an Evaluator ID into the DFS evaluator log. 050 * The log is appended to by reading first, adding on the information, and then overwriting the entire 051 * log. 052 * @param formattedEntry The formatted entry (entry with evaluator ID and addition/removal information). 053 * @throws IOException when file cannot be written. 054 */ 055 @Override 056 public synchronized void writeToEvaluatorLog(final String formattedEntry) throws IOException { 057 final boolean fileCreated = this.fileSystem.exists(this.changelogPath); 058 059 if (!fileCreated) { 060 try (final BufferedWriter bw = new BufferedWriter( 061 new OutputStreamWriter(this.fileSystem.create(this.changelogPath), StandardCharsets.UTF_8))) { 062 bw.write(formattedEntry); 063 } 064 } else { 065 this.appendByDeleteAndCreate(formattedEntry); 066 } 067 } 068 069 /** 070 * For certain HDFS implementation, the append operation may not be supported (e.g., Azure blob - wasb) 071 * in this case, we will emulate the append operation by reading the content, appending entry at the end, 072 * then recreating the file with appended content. 073 * 074 * @throws java.io.IOException when the file can't be written. 075 */ 076 private void appendByDeleteAndCreate(final String appendEntry) 077 throws IOException { 078 final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); 079 080 try (final InputStream inputStream = this.fileSystem.open(this.changelogPath)) { 081 IOUtils.copyBytes(inputStream, outputStream, 4096, true); 082 } 083 084 final String newContent = outputStream.toString("UTF-8") + appendEntry; 085 this.fileSystem.delete(this.changelogPath, true); 086 087 try (final FSDataOutputStream newOutput = this.fileSystem.create(this.changelogPath); 088 final InputStream newInput = new ByteArrayInputStream(newContent.getBytes(StandardCharsets.UTF_8))) { 089 IOUtils.copyBytes(newInput, newOutput, 4096, true); 090 } 091 } 092 093 /** 094 * Closes the FileSystem. 095 * @throws Exception 096 */ 097 @Override 098 public synchronized void close() throws Exception { 099 if (this.fileSystem != null && !this.fsClosed) { 100 this.fileSystem.close(); 101 this.fsClosed = true; 102 } 103 } 104}