This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.io.checkpoint.fs;
020
021import org.apache.hadoop.fs.FSDataInputStream;
022import org.apache.hadoop.fs.FSDataOutputStream;
023import org.apache.hadoop.fs.FileSystem;
024import org.apache.hadoop.fs.Path;
025import org.apache.reef.io.checkpoint.CheckpointID;
026import org.apache.reef.io.checkpoint.CheckpointNamingService;
027import org.apache.reef.io.checkpoint.CheckpointService;
028import org.apache.reef.tang.annotations.Name;
029import org.apache.reef.tang.annotations.NamedParameter;
030import org.apache.reef.tang.annotations.Parameter;
031
032import javax.inject.Inject;
033import java.io.FileNotFoundException;
034import java.io.IOException;
035import java.nio.ByteBuffer;
036import java.nio.channels.Channels;
037import java.nio.channels.ReadableByteChannel;
038import java.nio.channels.WritableByteChannel;
039
040/**
041 * A FileSystem based CheckpointService.
042 *
043 * Note that this implementation creates a temporary file first and moves it to final destination at commit time.
044 */
045public class FSCheckpointService implements CheckpointService {
046
047  private final Path basePath;
048  private final FileSystem fs;
049  private final CheckpointNamingService namingPolicy;
050  private final short replication;
051
052  @Inject
053  FSCheckpointService(final FileSystem fs,
054                      @Parameter(PATH.class) final String basePath,
055                      final CheckpointNamingService namingPolicy,
056                      @Parameter(ReplicationFactor.class) final short replication) {
057    this.fs = fs;
058    this.basePath = new Path(basePath);
059    this.namingPolicy = namingPolicy;
060    this.replication = replication;
061  }
062
063  public FSCheckpointService(final FileSystem fs,
064                             final Path base,
065                             final CheckpointNamingService namingPolicy,
066                             final short replication) {
067    this.fs = fs;
068    this.basePath = base;
069    this.namingPolicy = namingPolicy;
070    this.replication = replication;
071  }
072
073  static final Path tmpfile(final Path p) {
074    return new Path(p.getParent(), p.getName() + ".tmp");
075  }
076
077  public CheckpointWriteChannel create()
078      throws IOException {
079
080    final String name = namingPolicy.getNewName();
081    final Path p = new Path(name);
082    if (p.isUriPathAbsolute()) {
083      throw new IOException("Checkpoint name cannot be an absolute path.");
084    }
085
086    return createInternal(new Path(basePath, p));
087  }
088
089
090  CheckpointWriteChannel createInternal(final Path name) throws IOException {
091
092   /*  Create a temp file, fail if file exists.
093       The likely reason to do so (I am not the original author) is to check that the file is indeed writable.
094       Checking this directly via a file system call may lead to a time-of-check/time-of-use race condition.
095       See the pull request for REEF-1659 for discussion.
096   */
097    return new FSCheckpointWriteChannel(name, fs.create(tmpfile(name), replication));
098  }
099
100  @Override
101  public CheckpointReadChannel open(final CheckpointID id)
102      throws IOException, InterruptedException {
103
104    if (!(id instanceof FSCheckpointID)) {
105      throw new IllegalArgumentException(
106          "Mismatched checkpoint id type. Expected FSCheckpointID, but actually got " + id.getClass());
107    }
108
109    return new FSCheckpointReadChannel(
110        fs.open(((FSCheckpointID) id).getPath()));
111  }
112
113  @Override
114  public CheckpointID commit(final CheckpointWriteChannel ch)
115          throws IOException, InterruptedException {
116
117    if (ch.isOpen()) {
118      ch.close();
119    }
120
121    final FSCheckpointWriteChannel hch = (FSCheckpointWriteChannel) ch;
122    final Path dst = hch.getDestination();
123
124    if (!fs.rename(tmpfile(dst), dst)) {
125      // attempt to clean up
126      abort(ch);
127      throw new IOException("Failed to promote checkpoint" +
128          tmpfile(dst) + " -> " + dst);
129    }
130
131    return new FSCheckpointID(hch.getDestination());
132  }
133
134  @Override
135  public void abort(final CheckpointWriteChannel ch) throws IOException {
136
137    if (ch.isOpen()) {
138      ch.close();
139    }
140
141    final FSCheckpointWriteChannel hch = (FSCheckpointWriteChannel) ch;
142    final Path tmp = tmpfile(hch.getDestination());
143
144    try {
145      if (!fs.delete(tmp, false)) {
146        throw new IOException("Failed to delete a temporary checkpoint file during abort. Path: " + tmp);
147      }
148    } catch (final FileNotFoundException ignored) {
149      // IGNORE
150    }
151
152  }
153
154  @Override
155  public boolean delete(final CheckpointID id)
156          throws IOException, InterruptedException {
157
158    if (!(id instanceof FSCheckpointID)) {
159      throw new IllegalArgumentException(
160              "Mismatched checkpoint id type. Expected FSCheckpointID, but actually got " + id.getClass());
161    }
162
163    final Path tmp = ((FSCheckpointID) id).getPath();
164    try {
165      return fs.delete(tmp, false);
166    } catch (final FileNotFoundException ignored) {
167      // IGNORE
168    }
169
170    return true;
171  }
172
173  @NamedParameter(doc = "The path to be used to store the checkpoints.")
174  static class PATH implements Name<String> {
175  }
176
177  @NamedParameter(doc = "The replication factor to be used for the stored checkpoints", default_value = "3")
178  static class ReplicationFactor implements Name<Short> {
179  }
180
181  private static class FSCheckpointWriteChannel
182      implements CheckpointWriteChannel {
183
184    private final Path finalDst;
185    private final WritableByteChannel out;
186    private boolean isOpen = true;
187
188    FSCheckpointWriteChannel(final Path finalDst, final FSDataOutputStream out) {
189      this.finalDst = finalDst;
190      this.out = Channels.newChannel(out);
191    }
192
193    public int write(final ByteBuffer b) throws IOException {
194      return out.write(b);
195    }
196
197    public Path getDestination() {
198      return finalDst;
199    }
200
201    @Override
202    public void close() throws IOException {
203      isOpen = false;
204      out.close();
205    }
206
207    @Override
208    public boolean isOpen() {
209      return isOpen;
210    }
211
212  }
213
214  private static class FSCheckpointReadChannel
215      implements CheckpointReadChannel {
216
217    private final ReadableByteChannel in;
218    private boolean isOpen = true;
219
220    FSCheckpointReadChannel(final FSDataInputStream in) {
221      this.in = Channels.newChannel(in);
222    }
223
224    @Override
225    public int read(final ByteBuffer bb) throws IOException {
226      return in.read(bb);
227    }
228
229    @Override
230    public void close() throws IOException {
231      isOpen = false;
232      in.close();
233    }
234
235    @Override
236    public boolean isOpen() {
237      return isOpen;
238    }
239
240  }
241
242}