This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.io.network.group.impl;
020
021
022import org.apache.reef.io.network.impl.StreamingCodec;
023import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos.GroupCommMessage.Type;
024
025import javax.inject.Inject;
026import java.io.*;
027
028/**
029 * Codec for {@link org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos.GroupCommMessage}.
030 */
031public class GroupCommunicationMessageCodec implements StreamingCodec<GroupCommunicationMessage> {
032
033  @Inject
034  public GroupCommunicationMessageCodec() {
035    // Intentionally Blank
036  }
037
038  @Override
039  public GroupCommunicationMessage decode(final byte[] data) {
040    try (ByteArrayInputStream bais = new ByteArrayInputStream(data)) {
041      try (DataInputStream dais = new DataInputStream(bais)) {
042        return decodeFromStream(dais);
043      }
044    } catch (final IOException e) {
045      throw new RuntimeException("IOException", e);
046    }
047  }
048
049  @Override
050  public GroupCommunicationMessage decodeFromStream(final DataInputStream stream) {
051    try {
052      final String groupName = stream.readUTF();
053      final String operName = stream.readUTF();
054      final Type msgType = Type.valueOf(stream.readInt());
055      final String from = stream.readUTF();
056      final int srcVersion = stream.readInt();
057      final String to = stream.readUTF();
058      final int dstVersion = stream.readInt();
059      final byte[][] gcmData = new byte[stream.readInt()][];
060      for (int i = 0; i < gcmData.length; i++) {
061        gcmData[i] = new byte[stream.readInt()];
062        stream.readFully(gcmData[i]);
063      }
064      return new GroupCommunicationMessage(
065          groupName,
066          operName,
067          msgType,
068          from,
069          srcVersion,
070          to,
071          dstVersion,
072          gcmData);
073    } catch (final IOException e) {
074      throw new RuntimeException("IOException", e);
075    }
076  }
077
078  @Override
079  public byte[] encode(final GroupCommunicationMessage msg) {
080    try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
081      try (DataOutputStream daos = new DataOutputStream(baos)) {
082        encodeToStream(msg, daos);
083      }
084      return baos.toByteArray();
085    } catch (final IOException e) {
086      throw new RuntimeException("IOException", e);
087    }
088  }
089
090  @Override
091  public void encodeToStream(final GroupCommunicationMessage msg, final DataOutputStream stream) {
092    try {
093      stream.writeUTF(msg.getGroupname());
094      stream.writeUTF(msg.getOperatorname());
095      stream.writeInt(msg.getType().getNumber());
096      stream.writeUTF(msg.getSrcid());
097      stream.writeInt(msg.getSrcVersion());
098      stream.writeUTF(msg.getDestid());
099      stream.writeInt(msg.getVersion());
100      stream.writeInt(msg.getMsgsCount());
101      for (final byte[] b : msg.getData()) {
102        stream.writeInt(b.length);
103        stream.write(b);
104      }
105    } catch (final IOException e) {
106      throw new RuntimeException("IOException", e);
107    }
108  }
109
110}