This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.io.network.group.impl.utils;
020
021import org.apache.reef.wake.remote.Decoder;
022
023import javax.inject.Inject;
024import java.io.ByteArrayInputStream;
025import java.io.DataInputStream;
026import java.io.IOException;
027import java.util.HashMap;
028import java.util.Map;
029import java.util.logging.Level;
030import java.util.logging.Logger;
031
032/**
033 * Decode messages that was created by {@code ScatterEncoder}.
034 */
035public final class ScatterDecoder implements Decoder<ScatterData> {
036  private static final Logger LOG = Logger.getLogger(ScatterDecoder.class.getName());
037
038  @Inject
039  ScatterDecoder() {
040  }
041
042  public ScatterData decode(final byte[] data) {
043    try (final DataInputStream dstream = new DataInputStream(new ByteArrayInputStream(data))) {
044      final int elementCount = dstream.readInt();
045
046      // first read data that I should receive
047      final byte[][] myData = new byte[elementCount][];
048      for (int index = 0; index < elementCount; index++) {
049        final int encodedElementLength = dstream.readInt();
050        myData[index] =  new byte[encodedElementLength];
051        if (dstream.read(myData[index]) == -1) {
052          LOG.log(Level.FINE, "No data read because end of stream was reached");
053        }
054      }
055
056      // and then read the data intended for my children
057      final Map<String, byte[]> childDataMap = new HashMap<>();
058      while (dstream.available() > 0) {
059        final String childId = dstream.readUTF();
060        final byte[] childData = new byte[dstream.readInt()];
061        if (dstream.read(childData) == -1) {
062          LOG.log(Level.FINE, "No data read because end of stream was reached");
063        }
064        childDataMap.put(childId, childData);
065      }
066
067      return new ScatterData(myData, childDataMap);
068
069    } catch (final IOException e) {
070      throw new RuntimeException("IOException", e);
071    }
072  }
073}