This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.io.storage.local;
020
021import org.apache.reef.exception.evaluator.ServiceException;
022import org.apache.reef.exception.evaluator.ServiceRuntimeException;
023import org.apache.reef.io.Accumulable;
024import org.apache.reef.io.Accumulator;
025import org.apache.reef.io.Spool;
026import org.apache.reef.io.serialization.Deserializer;
027import org.apache.reef.io.serialization.Serializer;
028
029import java.io.*;
030import java.util.ConcurrentModificationException;
031import java.util.Iterator;
032
033/**
034 * A SpoolFile backed by the filesystem.
035 *
036 * @param <T>
037 */
038public final class SerializerFileSpool<T> implements Spool<T> {
039
040  private final File file;
041  private final Accumulator<T> accumulator;
042  private final Deserializer<T, InputStream> deserializer;
043  private boolean canAppend = true;
044  private boolean canGetAccumulator = true;
045
046  public SerializerFileSpool(final LocalStorageService service,
047                             final Serializer<T, OutputStream> out, final Deserializer<T, InputStream> in)
048      throws ServiceException {
049    this.file = service.getScratchSpace().newFile();
050    final Accumulable<T> accumulable;
051    try {
052      accumulable = out.create(new BufferedOutputStream(new FileOutputStream(
053          file)));
054    } catch (final FileNotFoundException e) {
055      throw new IllegalStateException(
056          "Unable to create temporary file:" + file, e);
057    }
058    this.deserializer = in;
059
060    final Accumulator<T> acc = accumulable.accumulator();
061    this.accumulator = new Accumulator<T>() {
062      @Override
063      public void add(final T datum) throws ServiceException {
064        if (!canAppend) {
065          throw new ConcurrentModificationException(
066              "Attempt to append after creating iterator!");
067        }
068        acc.add(datum);
069      }
070
071      @Override
072      public void close() throws ServiceException {
073        canAppend = false;
074        acc.close();
075      }
076    };
077  }
078
079  @Override
080  public Iterator<T> iterator() {
081    try {
082      if (canAppend) {
083        throw new IllegalStateException(
084            "Need to call close() on accumulator before calling iterator()!");
085      }
086      return deserializer.create(
087          new BufferedInputStream(new FileInputStream(file))).iterator();
088    } catch (final IOException e) {
089      throw new ServiceRuntimeException(e);
090    }
091  }
092
093  @Override
094  public Accumulator<T> accumulator() {
095    if (!canGetAccumulator) {
096      throw new UnsupportedOperationException("Can only getAccumulator() once!");
097    }
098    canGetAccumulator = false;
099    return this.accumulator;
100  }
101}