/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.reef.io.network.naming.serialization;

import org.apache.reef.io.naming.NameAssignment;
import org.apache.reef.io.network.naming.NameAssignmentTuple;
import org.apache.reef.io.network.naming.avro.AvroNamingAssignment;
import org.apache.reef.io.network.naming.avro.AvroNamingLookupResponse;
import org.apache.reef.io.network.naming.exception.NamingRuntimeException;
import org.apache.reef.wake.IdentifierFactory;
import org.apache.reef.wake.remote.Codec;

import javax.inject.Inject;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

/**
 * Codec that converts a {@link NamingLookupResponse} to and from its
 * Avro ({@link AvroNamingLookupResponse}) byte representation.
 */
public final class NamingLookupResponseCodec implements Codec<NamingLookupResponse> {

  /** Used while decoding to rebuild identifiers from their string form. */
  private final IdentifierFactory factory;

  /**
   * Creates a codec for naming lookup responses.
   *
   * @param factory the identifier factory used to recreate identifiers on decode
   */
  @Inject
  public NamingLookupResponseCodec(final IdentifierFactory factory) {
    this.factory = factory;
  }

  /**
   * Serializes every name assignment of the given response into a single
   * Avro-encoded byte array.
   *
   * @param obj the naming lookup response to serialize
   * @return the serialized bytes
   */
  @Override
  public byte[] encode(final NamingLookupResponse obj) {
    final List<AvroNamingAssignment> avroTuples = new ArrayList<>(obj.getNameAssignments().size());
    for (final NameAssignment assignment : obj.getNameAssignments()) {
      avroTuples.add(toAvro(assignment));
    }

    final AvroNamingLookupResponse avroResponse =
        AvroNamingLookupResponse.newBuilder().setTuples(avroTuples).build();
    return AvroUtils.toBytes(avroResponse, AvroNamingLookupResponse.class);
  }

  /**
   * Deserializes an Avro-encoded byte array back into a naming lookup response.
   *
   * @param buf the serialized bytes
   * @return the naming lookup response holding one name assignment per decoded tuple
   * @throws NamingRuntimeException presumably raised by the underlying deserialization
   *                                when the bytes cannot be read; the throwing code is
   *                                not part of this class
   */
  @Override
  public NamingLookupResponse decode(final byte[] buf) {
    final AvroNamingLookupResponse avroResponse = AvroUtils.fromBytes(buf, AvroNamingLookupResponse.class);
    final List<NameAssignment> decoded = new ArrayList<>(avroResponse.getTuples().size());
    for (final AvroNamingAssignment avroTuple : avroResponse.getTuples()) {
      decoded.add(fromAvro(avroTuple));
    }
    return new NamingLookupResponse(decoded);
  }

  /**
   * Converts one name assignment into its Avro record: the identifier's string
   * form plus the host name and port of its socket address.
   */
  private static AvroNamingAssignment toAvro(final NameAssignment assignment) {
    final String id = assignment.getIdentifier().toString();
    final String host = assignment.getAddress().getHostName();
    final int port = assignment.getAddress().getPort();
    return AvroNamingAssignment.newBuilder()
        .setId(id)
        .setHost(host)
        .setPort(port)
        .build();
  }

  /**
   * Converts one Avro record back into a name assignment, recreating the
   * identifier through the injected factory and the address from host and port.
   */
  private NameAssignment fromAvro(final AvroNamingAssignment avroTuple) {
    return new NameAssignmentTuple(
        factory.getNewInstance(avroTuple.getId().toString()),
        new InetSocketAddress(avroTuple.getHost().toString(), avroTuple.getPort()));
  }

}