This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.io.storage;
020
021import org.apache.reef.exception.evaluator.ServiceRuntimeException;
022
023import java.io.DataInputStream;
024import java.io.IOException;
025import java.io.InputStream;
026import java.util.Iterator;
027
028public class FramingInputStream extends DataInputStream implements Iterable<byte[]> {
029
030  public FramingInputStream(final InputStream in) {
031    super(in);
032  }
033
034  public byte[] readFrame() throws IOException {
035    final int i = readInt();
036    if (i == -1) {
037      return null;
038    }
039    final byte[] b = new byte[i];
040    readFully(b);
041    return b;
042  }
043
044  @Override
045  public Iterator<byte[]> iterator() {
046    try {
047      return new Iterator<byte[]>() {
048        private byte[] cur = readFrame();
049
050        @Override
051        public boolean hasNext() {
052          return cur != null;
053        }
054
055        @Override
056        public byte[] next() {
057          final byte[] ret = cur;
058          try {
059            cur = readFrame();
060          } catch (final IOException e) {
061            throw new ServiceRuntimeException(e);
062          }
063          return ret;
064        }
065
066        @Override
067        public void remove() {
068          throw new UnsupportedOperationException();
069        }
070      };
071    } catch (final IOException e) {
072      throw new ServiceRuntimeException(e);
073    }
074  }
075
076}