001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.io.storage; 020 021import org.apache.reef.exception.evaluator.ServiceException; 022import org.apache.reef.exception.evaluator.StorageException; 023import org.apache.reef.io.Accumulable; 024import org.apache.reef.io.Accumulator; 025import org.apache.reef.io.Tuple; 026import org.apache.reef.io.serialization.Serializer; 027 028import java.io.IOException; 029import java.io.OutputStream; 030 031public class FramingTupleSerializer<K, V> implements 032 Serializer<Tuple<K, V>, OutputStream> { 033 034 private final Serializer<K, OutputStream> keySerializer; 035 private final Serializer<V, OutputStream> valSerializer; 036 037 public FramingTupleSerializer( 038 final Serializer<K, OutputStream> keySerializer, 039 final Serializer<V, OutputStream> valSerializer) { 040 this.keySerializer = keySerializer; 041 this.valSerializer = valSerializer; 042 } 043 044 @Override 045 public Accumulable<Tuple<K, V>> create(final OutputStream os) { 046 final FramingOutputStream faos = new FramingOutputStream(os); 047 048 return new Accumulable<Tuple<K, V>>() { 049 050 @Override 051 public Accumulator<Tuple<K, V>> accumulator() throws ServiceException { 052 053 final Accumulator<K> keyAccumulator = keySerializer.create(faos) 054 .accumulator(); 055 final Accumulator<V> valAccumulator = valSerializer.create(faos) 056 .accumulator(); 057 return new Accumulator<Tuple<K, V>>() { 058 boolean first = true; 059 060 @Override 061 public void add(Tuple<K, V> datum) throws ServiceException { 062 if (!first) { 063 faos.nextFrame(); 064 } 065 first = false; 066 keyAccumulator.add(datum.getKey()); 067 faos.nextFrame(); 068 valAccumulator.add(datum.getValue()); 069 } 070 071 @Override 072 public void close() throws ServiceException { 073 try { 074 keyAccumulator.close(); 075 valAccumulator.close(); 076 faos.close(); 077 } catch (IOException e) { 078 throw new StorageException(e); 079 } 080 } 081 }; 082 } 083 }; 084 } 085 086}