001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.io.network.group.impl.driver; 020 021import org.apache.reef.io.network.group.api.driver.TaskNode; 022import org.apache.reef.io.network.util.Pair; 023import org.apache.reef.wake.Identifier; 024import org.apache.reef.wake.IdentifierFactory; 025 026import java.io.*; 027import java.util.LinkedList; 028import java.util.List; 029 030/** 031 * Utility class for encoding a Topology into a byte array and vice versa. 032 */ 033public final class TopologySerializer { 034 035 /** 036 * Shouldn't be instantiated. 037 */ 038 private TopologySerializer() { 039 } 040 041 /** 042 * Recursively encode TaskNodes of a Topology into a byte array. 043 * 044 * @param root the root node of the subtree to encode 045 * @return encoded byte array 046 */ 047 public static byte[] encode(final TaskNode root) { 048 try (final ByteArrayOutputStream bstream = new ByteArrayOutputStream(); 049 final DataOutputStream dstream = new DataOutputStream(bstream)) { 050 encodeHelper(dstream, root); 051 return bstream.toByteArray(); 052 053 } catch (final IOException e) { 054 throw new RuntimeException("Exception while encoding topology of " + root.getTaskId(), e); 055 } 056 } 057 058 private static void encodeHelper(final DataOutputStream dstream, 059 final TaskNode node) throws IOException { 060 dstream.writeUTF(node.getTaskId()); 061 dstream.writeInt(node.getNumberOfChildren()); 062 for (final TaskNode child : node.getChildren()) { 063 encodeHelper(dstream, child); 064 } 065 } 066 067 /** 068 * Recursively translate a byte array into a TopologySimpleNode and a list of task Identifiers. 069 * 070 * @param data encoded byte array 071 * @param ifac IdentifierFactory needed to generate Identifiers from String Ids 072 * @return decoded TopologySimpleNode and a lexicographically sorted list of task Identifiers 073 */ 074 public static Pair<TopologySimpleNode, List<Identifier>> decode( 075 final byte[] data, 076 final IdentifierFactory ifac) { 077 078 try (final DataInputStream dstream = new DataInputStream(new ByteArrayInputStream(data))) { 079 final List<Identifier> activeSlaveTasks = new LinkedList<>(); 080 final TopologySimpleNode retNode = decodeHelper(dstream, activeSlaveTasks, ifac); 081 return new Pair<>(retNode, activeSlaveTasks); 082 083 } catch (final IOException e) { 084 throw new RuntimeException("Exception while decoding message", e); 085 } 086 } 087 088 private static TopologySimpleNode decodeHelper( 089 final DataInputStream dstream, 090 final List<Identifier> activeSlaveTasks, 091 final IdentifierFactory ifac) throws IOException { 092 final String taskId = dstream.readUTF(); 093 activeSlaveTasks.add(ifac.getNewInstance(taskId)); 094 final TopologySimpleNode retNode = new TopologySimpleNode(taskId); 095 096 final int children = dstream.readInt(); 097 for (int index = 0; index < children; index++) { 098 final TopologySimpleNode child = decodeHelper(dstream, activeSlaveTasks, ifac); 099 retNode.addChild(child); 100 } 101 102 return retNode; 103 } 104}