This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.io.data.loading.impl;
020
021import org.apache.commons.codec.binary.Base64;
022import org.apache.hadoop.io.Writable;
023import org.apache.hadoop.mapred.JobConf;
024import org.apache.hadoop.util.ReflectionUtils;
025import org.apache.reef.io.serialization.Codec;
026
027import java.io.*;
028
029/**
030 * A serializer class that serializes {@link Writable}s
031 * into Strings using the {@link Codec} below, which
032 * encodes and decodes {@link Writable}s.
033 * By default this stores the class name in the serialized
034 * form so that the specific type can be instantiated on
035 * de-serialization. However, this also needs the JobConf
036 * to be passed in during de-serialization.
037 */
038public final class WritableSerializer {
039  public static <E extends Writable> String serialize(final E writable) {
040    final WritableCodec<E> writableCodec = new WritableCodec<>();
041    return Base64.encodeBase64String(writableCodec.encode(writable));
042  }
043
044  public static <E extends Writable> E deserialize(final String serializedWritable) {
045    final WritableCodec<E> writableCodec = new WritableCodec<>();
046    return writableCodec.decode(Base64.decodeBase64(serializedWritable));
047  }
048
049  public static <E extends Writable> E deserialize(final String serializedWritable, final JobConf jobConf) {
050    final WritableCodec<E> writableCodec = new WritableCodec<>(jobConf);
051    return writableCodec.decode(Base64.decodeBase64(serializedWritable));
052  }
053
054  static class WritableCodec<E extends Writable> implements Codec<E> {
055    private final JobConf jobConf;
056
057    WritableCodec(final JobConf jobConf) {
058      this.jobConf = jobConf;
059    }
060
061    WritableCodec() {
062      this.jobConf = new JobConf();
063    }
064
065    @Override
066    public E decode(final byte[] bytes) {
067      final ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
068      try (DataInputStream dais = new DataInputStream(bais)) {
069        final String className = dais.readUTF();
070        final E writable = (E) ReflectionUtils.newInstance(Class.forName(className), jobConf);
071        writable.readFields(dais);
072        return writable;
073      } catch (final IOException e) {
074        throw new RuntimeException("Could not de-serialize JobConf", e);
075      } catch (final ClassNotFoundException e) {
076        throw new RuntimeException("Could not instantiate specific writable class", e);
077      }
078    }
079
080    @Override
081    public byte[] encode(final E writable) {
082      final ByteArrayOutputStream baos = new ByteArrayOutputStream();
083      try (final DataOutputStream daos = new DataOutputStream(baos)) {
084        daos.writeUTF(writable.getClass().getName());
085        writable.write(daos);
086        return baos.toByteArray();
087      } catch (final IOException e) {
088        throw new RuntimeException("Could not serialize JobConf", e);
089      }
090    }
091  }
092
093  /**
094   * Empty private constructor to prohibit instantiation of utility class.
095   */
096  private WritableSerializer() {
097  }
098}