001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.yarn.driver.restart; 020 021import org.apache.hadoop.fs.FSDataOutputStream; 022import org.apache.hadoop.fs.FileStatus; 023import org.apache.hadoop.fs.FileSystem; 024import org.apache.hadoop.fs.Path; 025import org.apache.hadoop.io.IOUtils; 026import org.apache.reef.annotations.audience.Private; 027import org.apache.reef.util.CloseableIterable; 028 029import java.io.*; 030import java.nio.charset.StandardCharsets; 031 032/** 033 * The DFS evaluator logger that does not support append and does append by overwrite. 034 * dfs.support.append should be false. 035 */ 036@Private 037public final class DFSEvaluatorLogOverwriteReaderWriter implements DFSEvaluatorLogReaderWriter { 038 039 private final FileSystem fileSystem; 040 041 private final DFSLineReader reader; 042 private final Path changeLogPath; 043 private final Path changeLogAltPath; 044 045 // This is the last path we will be writing to. 046 private Path pathToWriteTo = null; 047 048 private boolean fsClosed = false; 049 050 DFSEvaluatorLogOverwriteReaderWriter(final FileSystem fileSystem, final Path changeLogPath) { 051 this.fileSystem = fileSystem; 052 this.changeLogPath = changeLogPath; 053 this.changeLogAltPath = new Path(changeLogPath + ".alt"); 054 this.reader = new DFSLineReader(fileSystem); 055 } 056 057 /** 058 * Writes a formatted entry (addition or removal) for an Evaluator ID into the DFS evaluator log. 059 * The log is appended to by reading first, adding on the information, and then overwriting the entire log. 060 * Since the {@link FileSystem} does not support appends, this {@link DFSEvaluatorLogReaderWriter} 061 * uses a two-file approach, where when we write, we always overwrite the older file. 062 * @param formattedEntry The formatted entry (entry with evaluator ID and addition/removal information). 063 * @throws IOException when file cannot be written. 064 */ 065 @Override 066 public synchronized void writeToEvaluatorLog(final String formattedEntry) throws IOException { 067 final Path writePath = getWritePath(); 068 069 // readPath is always not the writePath. 070 final Path readPath = getAlternativePath(writePath); 071 072 try (final FSDataOutputStream outputStream = this.fileSystem.create(writePath, true)) { 073 InputStream inputStream = null; 074 try { 075 final InputStream newEntryInputStream = new ByteArrayInputStream( 076 formattedEntry.getBytes(StandardCharsets.UTF_8)); 077 078 if (fileSystem.exists(readPath)) { 079 inputStream = new SequenceInputStream( 080 this.fileSystem.open(readPath), newEntryInputStream); 081 } else { 082 inputStream = newEntryInputStream; 083 } 084 085 IOUtils.copyBytes(inputStream, outputStream, 4096, false); 086 } finally { 087 outputStream.hsync(); 088 if (inputStream != null) { 089 inputStream.close(); 090 } 091 } 092 } 093 } 094 095 /** 096 * Since the {@link FileSystem} does not support appends, this {@link DFSEvaluatorLogReaderWriter} 097 * uses a two-file approach, where when we read, we always read from the newer file. 098 */ 099 @Override 100 public synchronized CloseableIterable<String> readFromEvaluatorLog() throws IOException { 101 return reader.readLinesFromFile(getLongerFile()); 102 } 103 104 /** 105 * Gets the alternative path. Returns one of changeLogPath and changeLogAltPath. 106 */ 107 private synchronized Path getAlternativePath(final Path path) { 108 if (path.equals(changeLogPath)) { 109 return changeLogAltPath; 110 } 111 112 return changeLogPath; 113 } 114 115 /** 116 * Gets the path to write to. 117 */ 118 private synchronized Path getWritePath() throws IOException { 119 if (pathToWriteTo == null) { 120 // If we have not yet written before, check existence of files. 121 final boolean originalExists = fileSystem.exists(changeLogPath); 122 final boolean altExists = fileSystem.exists(changeLogAltPath); 123 124 if (originalExists && altExists) { 125 final FileStatus originalStatus = fileSystem.getFileStatus(changeLogPath); 126 final FileStatus altStatus = fileSystem.getFileStatus(changeLogAltPath); 127 128 // Return the shorter file. 129 // TODO[JIRA REEF-1413]: This approach will not be able to work in REEF-1413. 130 // TODO[JIRA REEF-1413]: Note that we cannot use last modified time because Azure blob's HDFS API only 131 // TODO[JIRA REEF-1413]: supports time resolution up to a second. 132 final long originalLen = originalStatus.getLen(); 133 final long altLen = altStatus.getLen(); 134 135 if (originalLen < altLen) { 136 pathToWriteTo = changeLogPath; 137 } else { 138 pathToWriteTo = changeLogAltPath; 139 } 140 } else if (originalExists) { 141 // Return the file that does not exist. 142 pathToWriteTo = changeLogAltPath; 143 } else { 144 pathToWriteTo = changeLogPath; 145 } 146 } 147 148 final Path returnPath = pathToWriteTo; 149 pathToWriteTo = getAlternativePath(pathToWriteTo); 150 151 return returnPath; 152 } 153 154 private synchronized Path getLongerFile() throws IOException { 155 final boolean originalExists = fileSystem.exists(changeLogPath); 156 final boolean altExists = fileSystem.exists(changeLogAltPath); 157 158 // If both files exist, return the newest file path. 159 if (originalExists && altExists) { 160 final FileStatus originalStatus = fileSystem.getFileStatus(changeLogPath); 161 final FileStatus altStatus = fileSystem.getFileStatus(changeLogAltPath); 162 163 final long originalLastModTime = originalStatus.getLen(); 164 final long altLastModTime = altStatus.getLen(); 165 166 // Return the newer file. 167 if (originalLastModTime >= altLastModTime) { 168 return changeLogPath; 169 } 170 171 return changeLogAltPath; 172 } else if (altExists) { 173 // If only the alt file exists, return the alt file path. 174 return changeLogAltPath; 175 } 176 177 // If only the original file exists or if neither exist, return the original file path. 178 return changeLogPath; 179 } 180 181 /** 182 * Closes the FileSystem. 183 * @throws Exception 184 */ 185 @Override 186 public synchronized void close() throws Exception { 187 if (this.fileSystem != null && !this.fsClosed) { 188 this.fileSystem.close(); 189 this.fsClosed = true; 190 } 191 } 192}