001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.vortex.common; 020 021import com.esotericsoftware.kryo.Kryo; 022import com.esotericsoftware.kryo.io.Input; 023import com.esotericsoftware.kryo.io.Output; 024import com.esotericsoftware.kryo.pool.KryoFactory; 025import com.esotericsoftware.kryo.pool.KryoPool; 026import de.javakaffee.kryoserializers.UnmodifiableCollectionsSerializer; 027import org.apache.reef.annotations.Unstable; 028import org.apache.reef.annotations.audience.Private; 029 030import javax.inject.Inject; 031import java.io.ByteArrayInputStream; 032import java.io.ByteArrayOutputStream; 033 034/** 035 * The one and only serializer for the Vortex protocol. 036 */ 037@Private 038@Unstable 039public final class KryoUtils { 040 /** 041 * For reducing Kryo object instantiation cost. 042 */ 043 private final KryoPool kryoPool; 044 045 @Inject 046 private KryoUtils() { 047 final KryoFactory factory = new KryoFactory() { 048 @Override 049 public Kryo create() { 050 final Kryo kryo = new Kryo(); 051 UnmodifiableCollectionsSerializer.registerSerializers(kryo); // Required to serialize/deserialize Throwable 052 return kryo; 053 } 054 }; 055 kryoPool = new KryoPool.Builder(factory).softReferences().build(); 056 } 057 058 public byte[] serialize(final Object object) { 059 try (final Output out = new Output(new ByteArrayOutputStream())) { 060 final Kryo kryo = kryoPool.borrow(); 061 kryo.writeClassAndObject(out, object); 062 kryoPool.release(kryo); 063 return out.toBytes(); 064 } 065 } 066 067 public Object deserialize(final byte[] bytes) { 068 try (final Input input = new Input(new ByteArrayInputStream(bytes))) { 069 final Kryo kryo = kryoPool.borrow(); 070 final Object object = kryo.readClassAndObject(input); 071 kryoPool.release(kryo); 072 return object; 073 } 074 } 075}