001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.io.storage.ram; 020 021import org.apache.reef.exception.evaluator.StorageException; 022import org.apache.reef.io.Accumulator; 023import org.apache.reef.io.Spool; 024 025import java.util.Comparator; 026import java.util.Iterator; 027import java.util.PriorityQueue; 028 029public class SortingRamSpool<T> implements Spool<T> { 030 private final PriorityQueue<T> heap; 031 private boolean ready = false; 032 private Accumulator<T> acc = new Accumulator<T>() { 033 @Override 034 public void add(final T datum) throws StorageException { 035 if (ready) { 036 throw new IllegalStateException("add called after close!"); 037 } 038 heap.add(datum); 039 } 040 041 @Override 042 public void close() throws StorageException { 043 ready = true; 044 } 045 }; 046 private Iterator<T> it = new Iterator<T>() { 047 048 @Override 049 public boolean hasNext() { 050 return !heap.isEmpty(); 051 } 052 053 @Override 054 public T next() { 055 return heap.remove(); 056 } 057 058 @Override 059 public void remove() { 060 throw new UnsupportedOperationException( 061 "This iterator consumes the data it returns. remove() does not make any sense!"); 062 } 063 064 }; 065 066 public SortingRamSpool() { 067 heap = new PriorityQueue<>(); 068 } 069 070 public SortingRamSpool(final Comparator<T> c) { 071 heap = new PriorityQueue<>(11, c); 072 } 073 074 @Override 075 public Iterator<T> iterator() { 076 if (!ready) { 077 throw new IllegalStateException("Cannot call iterator() while accumulator is still open!"); 078 } 079 final Iterator<T> ret = it; 080 it = null; 081 return ret; 082 } 083 084 @Override 085 public Accumulator<T> accumulator() throws StorageException { 086 final Accumulator<T> ret = acc; 087 acc = null; 088 return ret; 089 } 090 091}