001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.io.storage; 020 021import org.apache.reef.exception.evaluator.ServiceException; 022import org.apache.reef.exception.evaluator.ServiceRuntimeException; 023import org.apache.reef.io.Tuple; 024import org.apache.reef.io.serialization.Deserializer; 025 026import java.io.DataInputStream; 027import java.io.IOException; 028import java.io.InputStream; 029import java.util.Iterator; 030import java.util.NoSuchElementException; 031 032public class FramingTupleDeserializer<K, V> implements 033 Deserializer<Tuple<K, V>, InputStream> { 034 035 private final Deserializer<K, InputStream> keyDeserializer; 036 private final Deserializer<V, InputStream> valDeserializer; 037 038 public FramingTupleDeserializer(final Deserializer<K, InputStream> keyDeserializer, 039 final Deserializer<V, InputStream> valDeserializer) { 040 this.keyDeserializer = keyDeserializer; 041 this.valDeserializer = valDeserializer; 042 } 043 044 @Override 045 public Iterable<Tuple<K, V>> create(final InputStream ins) { 046 final DataInputStream in = new DataInputStream(ins); 047 final Iterable<K> keyItbl = keyDeserializer.create(in); 048 final Iterable<V> valItbl = valDeserializer.create(in); 049 return new Iterable<Tuple<K, V>>() { 050 @Override 051 public Iterator<Tuple<K, V>> iterator() { 052 final Iterator<K> keyIt = keyItbl.iterator(); 053 final Iterator<V> valIt = valItbl.iterator(); 054 try { 055 return new Iterator<Tuple<K, V>>() { 056 057 private int readFrameLength() throws ServiceException { 058 try { 059 return in.readInt(); 060 } catch (final IOException e) { 061 throw new ServiceRuntimeException(e); 062 } 063 } 064 065 private int nextFrameLength = readFrameLength(); 066 067 @Override 068 public boolean hasNext() { 069 return nextFrameLength != -1; 070 } 071 072 @Override 073 public Tuple<K, V> next() { 074 try { 075 if (nextFrameLength == -1) { 076 throw new NoSuchElementException(); 077 } 078 final K k = keyIt.next(); 079 readFrameLength(); 080 final V v = valIt.next(); 081 nextFrameLength = readFrameLength(); 082 return new Tuple<>(k, v); 083 } catch (final ServiceException e) { 084 throw new ServiceRuntimeException(e); 085 } 086 } 087 088 @Override 089 public void remove() { 090 throw new UnsupportedOperationException(); 091 } 092 }; 093 } catch (final ServiceException e) { 094 throw new ServiceRuntimeException(e); 095 } 096 } 097 }; 098 } 099 100}