001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.io.checkpoint.fs; 020 021import org.apache.hadoop.fs.FSDataInputStream; 022import org.apache.hadoop.fs.FSDataOutputStream; 023import org.apache.hadoop.fs.FileSystem; 024import org.apache.hadoop.fs.Path; 025import org.apache.reef.io.checkpoint.CheckpointID; 026import org.apache.reef.io.checkpoint.CheckpointNamingService; 027import org.apache.reef.io.checkpoint.CheckpointService; 028import org.apache.reef.tang.annotations.Name; 029import org.apache.reef.tang.annotations.NamedParameter; 030import org.apache.reef.tang.annotations.Parameter; 031 032import javax.inject.Inject; 033import java.io.FileNotFoundException; 034import java.io.IOException; 035import java.nio.ByteBuffer; 036import java.nio.channels.Channels; 037import java.nio.channels.ReadableByteChannel; 038import java.nio.channels.WritableByteChannel; 039 040/** 041 * A FileSystem based CheckpointService. 042 */ 043public class FSCheckpointService implements CheckpointService { 044 045 private final Path base; 046 private final FileSystem fs; 047 private final CheckpointNamingService namingPolicy; 048 private final short replication; 049 050 @Inject 051 FSCheckpointService(final FileSystem fs, 052 @Parameter(PATH.class) final String basePath, 053 final CheckpointNamingService namingPolicy, 054 @Parameter(ReplicationFactor.class) final short replication) { 055 this.fs = fs; 056 this.base = new Path(basePath); 057 this.namingPolicy = namingPolicy; 058 this.replication = replication; 059 } 060 061 public FSCheckpointService(final FileSystem fs, 062 final Path base, 063 final CheckpointNamingService namingPolicy, 064 final short replication) { 065 this.fs = fs; 066 this.base = base; 067 this.namingPolicy = namingPolicy; 068 this.replication = replication; 069 } 070 071 static final Path tmpfile(final Path p) { 072 return new Path(p.getParent(), p.getName() + ".tmp"); 073 } 074 075 public CheckpointWriteChannel create() 076 throws IOException { 077 078 final String name = namingPolicy.getNewName(); 079 080 final Path p = new Path(name); 081 if (p.isUriPathAbsolute()) { 082 throw new IOException("Checkpoint cannot be an absolute path"); 083 } 084 return createInternal(new Path(base, p)); 085 } 086 087 CheckpointWriteChannel createInternal(final Path name) throws IOException { 088 089 //create a temp file, fail if file exists 090 return new FSCheckpointWriteChannel(name, fs.create(tmpfile(name), replication)); 091 } 092 093 @Override 094 public CheckpointReadChannel open(final CheckpointID id) 095 throws IOException, InterruptedException { 096 if (!(id instanceof FSCheckpointID)) { 097 throw new IllegalArgumentException( 098 "Mismatched checkpoint type: " + id.getClass()); 099 } 100 return new FSCheckpointReadChannel( 101 fs.open(((FSCheckpointID) id).getPath())); 102 } 103 104 @Override 105 public CheckpointID commit(final CheckpointWriteChannel ch) throws IOException, 106 InterruptedException { 107 if (ch.isOpen()) { 108 ch.close(); 109 } 110 final FSCheckpointWriteChannel hch = (FSCheckpointWriteChannel) ch; 111 final Path dst = hch.getDestination(); 112 if (!fs.rename(tmpfile(dst), dst)) { 113 // attempt to clean up 114 abort(ch); 115 throw new IOException("Failed to promote checkpoint" + 116 tmpfile(dst) + " -> " + dst); 117 } 118 return new FSCheckpointID(hch.getDestination()); 119 } 120 121 @Override 122 public void abort(final CheckpointWriteChannel ch) throws IOException { 123 if (ch.isOpen()) { 124 ch.close(); 125 } 126 final FSCheckpointWriteChannel hch = (FSCheckpointWriteChannel) ch; 127 final Path tmp = tmpfile(hch.getDestination()); 128 try { 129 if (!fs.delete(tmp, false)) { 130 throw new IOException("Failed to delete checkpoint during abort"); 131 } 132 } catch (final FileNotFoundException ignored) { 133 // IGNORE 134 } 135 } 136 137 @Override 138 public boolean delete(final CheckpointID id) throws IOException, 139 InterruptedException { 140 if (!(id instanceof FSCheckpointID)) { 141 throw new IllegalArgumentException( 142 "Mismatched checkpoint type: " + id.getClass()); 143 } 144 final Path tmp = ((FSCheckpointID) id).getPath(); 145 try { 146 return fs.delete(tmp, false); 147 } catch (final FileNotFoundException ignored) { 148 // IGNORE 149 } 150 return true; 151 } 152 153 @NamedParameter(doc = "The path to be used to store the checkpoints.") 154 static class PATH implements Name<String> { 155 } 156 157 @NamedParameter(doc = "The replication factor to be used for the stored checkpoints", default_value = "3") 158 static class ReplicationFactor implements Name<Short> { 159 } 160 161 private static class FSCheckpointWriteChannel 162 implements CheckpointWriteChannel { 163 private final Path finalDst; 164 private final WritableByteChannel out; 165 private boolean isOpen = true; 166 167 FSCheckpointWriteChannel(final Path finalDst, final FSDataOutputStream out) { 168 this.finalDst = finalDst; 169 this.out = Channels.newChannel(out); 170 } 171 172 public int write(final ByteBuffer b) throws IOException { 173 return out.write(b); 174 } 175 176 public Path getDestination() { 177 return finalDst; 178 } 179 180 @Override 181 public void close() throws IOException { 182 isOpen = false; 183 out.close(); 184 } 185 186 @Override 187 public boolean isOpen() { 188 return isOpen; 189 } 190 191 } 192 193 private static class FSCheckpointReadChannel 194 implements CheckpointReadChannel { 195 196 private final ReadableByteChannel in; 197 private boolean isOpen = true; 198 199 FSCheckpointReadChannel(final FSDataInputStream in) { 200 this.in = Channels.newChannel(in); 201 } 202 203 @Override 204 public int read(final ByteBuffer bb) throws IOException { 205 return in.read(bb); 206 } 207 208 @Override 209 public void close() throws IOException { 210 isOpen = false; 211 in.close(); 212 } 213 214 @Override 215 public boolean isOpen() { 216 return isOpen; 217 } 218 219 } 220 221}