This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.io.storage;
020
021import org.apache.reef.exception.evaluator.ServiceException;
022import org.apache.reef.exception.evaluator.StorageException;
023import org.apache.reef.io.Accumulable;
024import org.apache.reef.io.Accumulator;
025import org.apache.reef.io.Tuple;
026import org.apache.reef.io.serialization.Serializer;
027
028import java.io.IOException;
029import java.io.OutputStream;
030
031public class FramingTupleSerializer<K, V> implements
032    Serializer<Tuple<K, V>, OutputStream> {
033
034  private final Serializer<K, OutputStream> keySerializer;
035  private final Serializer<V, OutputStream> valSerializer;
036
037  public FramingTupleSerializer(
038      final Serializer<K, OutputStream> keySerializer,
039      final Serializer<V, OutputStream> valSerializer) {
040    this.keySerializer = keySerializer;
041    this.valSerializer = valSerializer;
042  }
043
044  @Override
045  public Accumulable<Tuple<K, V>> create(final OutputStream os) {
046    final FramingOutputStream faos = new FramingOutputStream(os);
047
048    return new Accumulable<Tuple<K, V>>() {
049
050      @Override
051      public Accumulator<Tuple<K, V>> accumulator() throws ServiceException {
052
053        final Accumulator<K> keyAccumulator = keySerializer.create(faos)
054            .accumulator();
055        final Accumulator<V> valAccumulator = valSerializer.create(faos)
056            .accumulator();
057        return new Accumulator<Tuple<K, V>>() {
058          private boolean first = true;
059
060          @Override
061          public void add(final Tuple<K, V> datum) throws ServiceException {
062            try {
063              if (!first) {
064                faos.nextFrame();
065              }
066              first = false;
067              keyAccumulator.add(datum.getKey());
068              faos.nextFrame();
069              valAccumulator.add(datum.getValue());
070            } catch (final IOException e) {
071              throw new ServiceException(e);
072            }
073          }
074
075          @Override
076          public void close() throws ServiceException {
077            try {
078              keyAccumulator.close();
079              valAccumulator.close();
080              faos.close();
081            } catch (final IOException e) {
082              throw new StorageException(e);
083            }
084          }
085        };
086      }
087    };
088  }
089
090}