This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.io.checkpoint.fs;
020
021import org.apache.hadoop.fs.FSDataInputStream;
022import org.apache.hadoop.fs.FSDataOutputStream;
023import org.apache.hadoop.fs.FileSystem;
024import org.apache.hadoop.fs.Path;
025import org.apache.reef.io.checkpoint.CheckpointID;
026import org.apache.reef.io.checkpoint.CheckpointNamingService;
027import org.apache.reef.io.checkpoint.CheckpointService;
028import org.apache.reef.tang.annotations.Name;
029import org.apache.reef.tang.annotations.NamedParameter;
030import org.apache.reef.tang.annotations.Parameter;
031
032import javax.inject.Inject;
033import java.io.FileNotFoundException;
034import java.io.IOException;
035import java.nio.ByteBuffer;
036import java.nio.channels.Channels;
037import java.nio.channels.ReadableByteChannel;
038import java.nio.channels.WritableByteChannel;
039
040/**
041 * A FileSystem based CheckpointService.
042 */
043public class FSCheckpointService implements CheckpointService {
044
045  private final Path base;
046  private final FileSystem fs;
047  private final CheckpointNamingService namingPolicy;
048  private final short replication;
049
050  @Inject
051  FSCheckpointService(final FileSystem fs,
052                      @Parameter(PATH.class) final String basePath,
053                      final CheckpointNamingService namingPolicy,
054                      @Parameter(ReplicationFactor.class) final short replication) {
055    this.fs = fs;
056    this.base = new Path(basePath);
057    this.namingPolicy = namingPolicy;
058    this.replication = replication;
059  }
060
061  public FSCheckpointService(final FileSystem fs,
062                             final Path base,
063                             final CheckpointNamingService namingPolicy,
064                             final short replication) {
065    this.fs = fs;
066    this.base = base;
067    this.namingPolicy = namingPolicy;
068    this.replication = replication;
069  }
070
071  static final Path tmpfile(final Path p) {
072    return new Path(p.getParent(), p.getName() + ".tmp");
073  }
074
075  public CheckpointWriteChannel create()
076      throws IOException {
077
078    final String name = namingPolicy.getNewName();
079
080    final Path p = new Path(name);
081    if (p.isUriPathAbsolute()) {
082      throw new IOException("Checkpoint cannot be an absolute path");
083    }
084    return createInternal(new Path(base, p));
085  }
086
087  CheckpointWriteChannel createInternal(final Path name) throws IOException {
088
089    //create a temp file, fail if file exists
090    return new FSCheckpointWriteChannel(name, fs.create(tmpfile(name), replication));
091  }
092
093  @Override
094  public CheckpointReadChannel open(final CheckpointID id)
095      throws IOException, InterruptedException {
096    if (!(id instanceof FSCheckpointID)) {
097      throw new IllegalArgumentException(
098          "Mismatched checkpoint type: " + id.getClass());
099    }
100    return new FSCheckpointReadChannel(
101        fs.open(((FSCheckpointID) id).getPath()));
102  }
103
104  @Override
105  public CheckpointID commit(final CheckpointWriteChannel ch) throws IOException,
106      InterruptedException {
107    if (ch.isOpen()) {
108      ch.close();
109    }
110    final FSCheckpointWriteChannel hch = (FSCheckpointWriteChannel) ch;
111    final Path dst = hch.getDestination();
112    if (!fs.rename(tmpfile(dst), dst)) {
113      // attempt to clean up
114      abort(ch);
115      throw new IOException("Failed to promote checkpoint" +
116          tmpfile(dst) + " -> " + dst);
117    }
118    return new FSCheckpointID(hch.getDestination());
119  }
120
121  @Override
122  public void abort(final CheckpointWriteChannel ch) throws IOException {
123    if (ch.isOpen()) {
124      ch.close();
125    }
126    final FSCheckpointWriteChannel hch = (FSCheckpointWriteChannel) ch;
127    final Path tmp = tmpfile(hch.getDestination());
128    try {
129      if (!fs.delete(tmp, false)) {
130        throw new IOException("Failed to delete checkpoint during abort");
131      }
132    } catch (final FileNotFoundException ignored) {
133      // IGNORE
134    }
135  }
136
137  @Override
138  public boolean delete(final CheckpointID id) throws IOException,
139      InterruptedException {
140    if (!(id instanceof FSCheckpointID)) {
141      throw new IllegalArgumentException(
142          "Mismatched checkpoint type: " + id.getClass());
143    }
144    final Path tmp = ((FSCheckpointID) id).getPath();
145    try {
146      return fs.delete(tmp, false);
147    } catch (final FileNotFoundException ignored) {
148      // IGNORE
149    }
150    return true;
151  }
152
153  @NamedParameter(doc = "The path to be used to store the checkpoints.")
154  static class PATH implements Name<String> {
155  }
156
157  @NamedParameter(doc = "The replication factor to be used for the stored checkpoints", default_value = "3")
158  static class ReplicationFactor implements Name<Short> {
159  }
160
161  private static class FSCheckpointWriteChannel
162      implements CheckpointWriteChannel {
163    private final Path finalDst;
164    private final WritableByteChannel out;
165    private boolean isOpen = true;
166
167    FSCheckpointWriteChannel(final Path finalDst, final FSDataOutputStream out) {
168      this.finalDst = finalDst;
169      this.out = Channels.newChannel(out);
170    }
171
172    public int write(final ByteBuffer b) throws IOException {
173      return out.write(b);
174    }
175
176    public Path getDestination() {
177      return finalDst;
178    }
179
180    @Override
181    public void close() throws IOException {
182      isOpen = false;
183      out.close();
184    }
185
186    @Override
187    public boolean isOpen() {
188      return isOpen;
189    }
190
191  }
192
193  private static class FSCheckpointReadChannel
194      implements CheckpointReadChannel {
195
196    private final ReadableByteChannel in;
197    private boolean isOpen = true;
198
199    FSCheckpointReadChannel(final FSDataInputStream in) {
200      this.in = Channels.newChannel(in);
201    }
202
203    @Override
204    public int read(final ByteBuffer bb) throws IOException {
205      return in.read(bb);
206    }
207
208    @Override
209    public void close() throws IOException {
210      isOpen = false;
211      in.close();
212    }
213
214    @Override
215    public boolean isOpen() {
216      return isOpen;
217    }
218
219  }
220
221}