001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.io.network.group.impl; 020 021 022import org.apache.reef.io.network.impl.StreamingCodec; 023import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos.GroupCommMessage.Type; 024 025import javax.inject.Inject; 026import java.io.*; 027 028/** 029 * Codec for {@link org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos.GroupCommMessage}. 030 */ 031public class GroupCommunicationMessageCodec implements StreamingCodec<GroupCommunicationMessage> { 032 033 @Inject 034 public GroupCommunicationMessageCodec() { 035 // Intentionally Blank 036 } 037 038 @Override 039 public GroupCommunicationMessage decode(final byte[] data) { 040 try (ByteArrayInputStream bais = new ByteArrayInputStream(data)) { 041 try (DataInputStream dais = new DataInputStream(bais)) { 042 return decodeFromStream(dais); 043 } 044 } catch (final IOException e) { 045 throw new RuntimeException("IOException", e); 046 } 047 } 048 049 @Override 050 public GroupCommunicationMessage decodeFromStream(final DataInputStream stream) { 051 try { 052 final String groupName = stream.readUTF(); 053 final String operName = stream.readUTF(); 054 final Type msgType = Type.valueOf(stream.readInt()); 055 final String from = stream.readUTF(); 056 final int srcVersion = stream.readInt(); 057 final String to = stream.readUTF(); 058 final int dstVersion = stream.readInt(); 059 final byte[][] gcmData = new byte[stream.readInt()][]; 060 for (int i = 0; i < gcmData.length; i++) { 061 gcmData[i] = new byte[stream.readInt()]; 062 stream.readFully(gcmData[i]); 063 } 064 return new GroupCommunicationMessage( 065 groupName, 066 operName, 067 msgType, 068 from, 069 srcVersion, 070 to, 071 dstVersion, 072 gcmData); 073 } catch (final IOException e) { 074 throw new RuntimeException("IOException", e); 075 } 076 } 077 078 @Override 079 public byte[] encode(final GroupCommunicationMessage msg) { 080 try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { 081 try (DataOutputStream daos = new DataOutputStream(baos)) { 082 encodeToStream(msg, daos); 083 } 084 return baos.toByteArray(); 085 } catch (final IOException e) { 086 throw new RuntimeException("IOException", e); 087 } 088 } 089 090 @Override 091 public void encodeToStream(final GroupCommunicationMessage msg, final DataOutputStream stream) { 092 try { 093 stream.writeUTF(msg.getGroupname()); 094 stream.writeUTF(msg.getOperatorname()); 095 stream.writeInt(msg.getType().getNumber()); 096 stream.writeUTF(msg.getSrcid()); 097 stream.writeInt(msg.getSrcVersion()); 098 stream.writeUTF(msg.getDestid()); 099 stream.writeInt(msg.getVersion()); 100 stream.writeInt(msg.getMsgsCount()); 101 for (final byte[] b : msg.getData()) { 102 stream.writeInt(b.length); 103 stream.write(b); 104 } 105 } catch (final IOException e) { 106 throw new RuntimeException("IOException", e); 107 } 108 } 109 110}