This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.io.storage.ram;
020
021import org.apache.reef.exception.evaluator.StorageException;
022import org.apache.reef.io.Accumulator;
023import org.apache.reef.io.Spool;
024
025import java.util.Comparator;
026import java.util.Iterator;
027import java.util.PriorityQueue;
028
029public class SortingRamSpool<T> implements Spool<T> {
030  private final PriorityQueue<T> heap;
031  private boolean ready = false;
032  private Accumulator<T> acc = new Accumulator<T>() {
033    @Override
034    public void add(final T datum) throws StorageException {
035      if (ready) {
036        throw new IllegalStateException("add called after close!");
037      }
038      heap.add(datum);
039    }
040
041    @Override
042    public void close() throws StorageException {
043      ready = true;
044    }
045  };
046  private Iterator<T> it = new Iterator<T>() {
047
048    @Override
049    public boolean hasNext() {
050      return !heap.isEmpty();
051    }
052
053    @Override
054    public T next() {
055      return heap.remove();
056    }
057
058    @Override
059    public void remove() {
060      throw new UnsupportedOperationException(
061          "This iterator consumes the data it returns. remove() does not make any sense!");
062    }
063
064  };
065
066  public SortingRamSpool() {
067    heap = new PriorityQueue<>();
068  }
069
070  public SortingRamSpool(final Comparator<T> c) {
071    heap = new PriorityQueue<>(11, c);
072  }
073
074  @Override
075  public Iterator<T> iterator() {
076    if (!ready) {
077      throw new IllegalStateException("Cannot call iterator() while accumulator is still open!");
078    }
079    final Iterator<T> ret = it;
080    it = null;
081    return ret;
082  }
083
084  @Override
085  public Accumulator<T> accumulator() throws StorageException {
086    final Accumulator<T> ret = acc;
087    acc = null;
088    return ret;
089  }
090
091}