001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.io.checkpoint.fs; 020 021import org.apache.hadoop.fs.FSDataInputStream; 022import org.apache.hadoop.fs.FSDataOutputStream; 023import org.apache.hadoop.fs.FileSystem; 024import org.apache.hadoop.fs.Path; 025import org.apache.reef.io.checkpoint.CheckpointID; 026import org.apache.reef.io.checkpoint.CheckpointNamingService; 027import org.apache.reef.io.checkpoint.CheckpointService; 028import org.apache.reef.tang.annotations.Name; 029import org.apache.reef.tang.annotations.NamedParameter; 030import org.apache.reef.tang.annotations.Parameter; 031 032import javax.inject.Inject; 033import java.io.FileNotFoundException; 034import java.io.IOException; 035import java.nio.ByteBuffer; 036import java.nio.channels.Channels; 037import java.nio.channels.ReadableByteChannel; 038import java.nio.channels.WritableByteChannel; 039 040/** 041 * A FileSystem based CheckpointService. 042 * 043 * Note that this implementation creates a temporary file first and moves it to final destination at commit time. 044 */ 045public class FSCheckpointService implements CheckpointService { 046 047 private final Path basePath; 048 private final FileSystem fs; 049 private final CheckpointNamingService namingPolicy; 050 private final short replication; 051 052 @Inject 053 FSCheckpointService(final FileSystem fs, 054 @Parameter(PATH.class) final String basePath, 055 final CheckpointNamingService namingPolicy, 056 @Parameter(ReplicationFactor.class) final short replication) { 057 this.fs = fs; 058 this.basePath = new Path(basePath); 059 this.namingPolicy = namingPolicy; 060 this.replication = replication; 061 } 062 063 public FSCheckpointService(final FileSystem fs, 064 final Path base, 065 final CheckpointNamingService namingPolicy, 066 final short replication) { 067 this.fs = fs; 068 this.basePath = base; 069 this.namingPolicy = namingPolicy; 070 this.replication = replication; 071 } 072 073 static final Path tmpfile(final Path p) { 074 return new Path(p.getParent(), p.getName() + ".tmp"); 075 } 076 077 public CheckpointWriteChannel create() 078 throws IOException { 079 080 final String name = namingPolicy.getNewName(); 081 final Path p = new Path(name); 082 if (p.isUriPathAbsolute()) { 083 throw new IOException("Checkpoint name cannot be an absolute path."); 084 } 085 086 return createInternal(new Path(basePath, p)); 087 } 088 089 090 CheckpointWriteChannel createInternal(final Path name) throws IOException { 091 092 /* Create a temp file, fail if file exists. 093 The likely reason to do so (I am not the original author) is to check that the file is indeed writable. 094 Checking this directly via a file system call may lead to a time-of-check/time-of-use race condition. 095 See the pull request for REEF-1659 for discussion. 096 */ 097 return new FSCheckpointWriteChannel(name, fs.create(tmpfile(name), replication)); 098 } 099 100 @Override 101 public CheckpointReadChannel open(final CheckpointID id) 102 throws IOException, InterruptedException { 103 104 if (!(id instanceof FSCheckpointID)) { 105 throw new IllegalArgumentException( 106 "Mismatched checkpoint id type. Expected FSCheckpointID, but actually got " + id.getClass()); 107 } 108 109 return new FSCheckpointReadChannel( 110 fs.open(((FSCheckpointID) id).getPath())); 111 } 112 113 @Override 114 public CheckpointID commit(final CheckpointWriteChannel ch) 115 throws IOException, InterruptedException { 116 117 if (ch.isOpen()) { 118 ch.close(); 119 } 120 121 final FSCheckpointWriteChannel hch = (FSCheckpointWriteChannel) ch; 122 final Path dst = hch.getDestination(); 123 124 if (!fs.rename(tmpfile(dst), dst)) { 125 // attempt to clean up 126 abort(ch); 127 throw new IOException("Failed to promote checkpoint" + 128 tmpfile(dst) + " -> " + dst); 129 } 130 131 return new FSCheckpointID(hch.getDestination()); 132 } 133 134 @Override 135 public void abort(final CheckpointWriteChannel ch) throws IOException { 136 137 if (ch.isOpen()) { 138 ch.close(); 139 } 140 141 final FSCheckpointWriteChannel hch = (FSCheckpointWriteChannel) ch; 142 final Path tmp = tmpfile(hch.getDestination()); 143 144 try { 145 if (!fs.delete(tmp, false)) { 146 throw new IOException("Failed to delete a temporary checkpoint file during abort. Path: " + tmp); 147 } 148 } catch (final FileNotFoundException ignored) { 149 // IGNORE 150 } 151 152 } 153 154 @Override 155 public boolean delete(final CheckpointID id) 156 throws IOException, InterruptedException { 157 158 if (!(id instanceof FSCheckpointID)) { 159 throw new IllegalArgumentException( 160 "Mismatched checkpoint id type. Expected FSCheckpointID, but actually got " + id.getClass()); 161 } 162 163 final Path tmp = ((FSCheckpointID) id).getPath(); 164 try { 165 return fs.delete(tmp, false); 166 } catch (final FileNotFoundException ignored) { 167 // IGNORE 168 } 169 170 return true; 171 } 172 173 @NamedParameter(doc = "The path to be used to store the checkpoints.") 174 static class PATH implements Name<String> { 175 } 176 177 @NamedParameter(doc = "The replication factor to be used for the stored checkpoints", default_value = "3") 178 static class ReplicationFactor implements Name<Short> { 179 } 180 181 private static class FSCheckpointWriteChannel 182 implements CheckpointWriteChannel { 183 184 private final Path finalDst; 185 private final WritableByteChannel out; 186 private boolean isOpen = true; 187 188 FSCheckpointWriteChannel(final Path finalDst, final FSDataOutputStream out) { 189 this.finalDst = finalDst; 190 this.out = Channels.newChannel(out); 191 } 192 193 public int write(final ByteBuffer b) throws IOException { 194 return out.write(b); 195 } 196 197 public Path getDestination() { 198 return finalDst; 199 } 200 201 @Override 202 public void close() throws IOException { 203 isOpen = false; 204 out.close(); 205 } 206 207 @Override 208 public boolean isOpen() { 209 return isOpen; 210 } 211 212 } 213 214 private static class FSCheckpointReadChannel 215 implements CheckpointReadChannel { 216 217 private final ReadableByteChannel in; 218 private boolean isOpen = true; 219 220 FSCheckpointReadChannel(final FSDataInputStream in) { 221 this.in = Channels.newChannel(in); 222 } 223 224 @Override 225 public int read(final ByteBuffer bb) throws IOException { 226 return in.read(bb); 227 } 228 229 @Override 230 public void close() throws IOException { 231 isOpen = false; 232 in.close(); 233 } 234 235 @Override 236 public boolean isOpen() { 237 return isOpen; 238 } 239 240 } 241 242}