001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.io.storage.local; 020 021import org.apache.reef.exception.evaluator.ServiceException; 022import org.apache.reef.exception.evaluator.ServiceRuntimeException; 023import org.apache.reef.io.Accumulable; 024import org.apache.reef.io.Accumulator; 025import org.apache.reef.io.Spool; 026import org.apache.reef.io.serialization.Deserializer; 027import org.apache.reef.io.serialization.Serializer; 028 029import java.io.*; 030import java.util.ConcurrentModificationException; 031import java.util.Iterator; 032 033/** 034 * A SpoolFile backed by the filesystem. 035 * 036 * @param <T> 037 */ 038public final class SerializerFileSpool<T> implements Spool<T> { 039 040 private final File file; 041 private final Accumulator<T> accumulator; 042 private final Deserializer<T, InputStream> deserializer; 043 private boolean canAppend = true; 044 private boolean canGetAccumulator = true; 045 046 public SerializerFileSpool(final LocalStorageService service, 047 final Serializer<T, OutputStream> out, final Deserializer<T, InputStream> in) 048 throws ServiceException { 049 this.file = service.getScratchSpace().newFile(); 050 final Accumulable<T> accumulable; 051 try { 052 accumulable = out.create(new BufferedOutputStream(new FileOutputStream( 053 file))); 054 } catch (final FileNotFoundException e) { 055 throw new IllegalStateException( 056 "Unable to create temporary file:" + file, e); 057 } 058 this.deserializer = in; 059 060 final Accumulator<T> acc = accumulable.accumulator(); 061 this.accumulator = new Accumulator<T>() { 062 @Override 063 public void add(final T datum) throws ServiceException { 064 if (!canAppend) { 065 throw new ConcurrentModificationException( 066 "Attempt to append after creating iterator!"); 067 } 068 acc.add(datum); 069 } 070 071 @Override 072 public void close() throws ServiceException { 073 canAppend = false; 074 acc.close(); 075 } 076 }; 077 } 078 079 @Override 080 public Iterator<T> iterator() { 081 try { 082 if (canAppend) { 083 throw new IllegalStateException( 084 "Need to call close() on accumulator before calling iterator()!"); 085 } 086 return deserializer.create( 087 new BufferedInputStream(new FileInputStream(file))).iterator(); 088 } catch (final IOException e) { 089 throw new ServiceRuntimeException(e); 090 } 091 } 092 093 @Override 094 public Accumulator<T> accumulator() { 095 if (!canGetAccumulator) { 096 throw new UnsupportedOperationException("Can only getAccumulator() once!"); 097 } 098 canGetAccumulator = false; 099 return this.accumulator; 100 } 101}