001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.io.storage; 020 021import org.apache.reef.exception.evaluator.ServiceException; 022import org.apache.reef.exception.evaluator.StorageException; 023import org.apache.reef.io.Accumulable; 024import org.apache.reef.io.Accumulator; 025import org.apache.reef.io.Tuple; 026import org.apache.reef.io.serialization.Serializer; 027 028import java.io.IOException; 029import java.io.OutputStream; 030 031public class FramingTupleSerializer<K, V> implements 032 Serializer<Tuple<K, V>, OutputStream> { 033 034 private final Serializer<K, OutputStream> keySerializer; 035 private final Serializer<V, OutputStream> valSerializer; 036 037 public FramingTupleSerializer( 038 final Serializer<K, OutputStream> keySerializer, 039 final Serializer<V, OutputStream> valSerializer) { 040 this.keySerializer = keySerializer; 041 this.valSerializer = valSerializer; 042 } 043 044 @Override 045 public Accumulable<Tuple<K, V>> create(final OutputStream os) { 046 final FramingOutputStream faos = new FramingOutputStream(os); 047 048 return new Accumulable<Tuple<K, V>>() { 049 050 @Override 051 public Accumulator<Tuple<K, V>> accumulator() throws ServiceException { 052 053 final Accumulator<K> keyAccumulator = keySerializer.create(faos) 054 .accumulator(); 055 final Accumulator<V> valAccumulator = valSerializer.create(faos) 056 .accumulator(); 057 return new Accumulator<Tuple<K, V>>() { 058 private boolean first = true; 059 060 @Override 061 public void add(final Tuple<K, V> datum) throws ServiceException { 062 try { 063 if (!first) { 064 faos.nextFrame(); 065 } 066 first = false; 067 keyAccumulator.add(datum.getKey()); 068 faos.nextFrame(); 069 valAccumulator.add(datum.getValue()); 070 } catch (final IOException e) { 071 throw new ServiceException(e); 072 } 073 } 074 075 @Override 076 public void close() throws ServiceException { 077 try { 078 keyAccumulator.close(); 079 valAccumulator.close(); 080 faos.close(); 081 } catch (final IOException e) { 082 throw new StorageException(e); 083 } 084 } 085 }; 086 } 087 }; 088 } 089 090}