This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.io.network.naming.serialization;
020
021import org.apache.reef.io.naming.NameAssignment;
022import org.apache.reef.io.network.naming.NameAssignmentTuple;
023import org.apache.reef.io.network.naming.avro.AvroNamingAssignment;
024import org.apache.reef.io.network.naming.avro.AvroNamingLookupResponse;
025import org.apache.reef.io.network.naming.exception.NamingRuntimeException;
026import org.apache.reef.wake.IdentifierFactory;
027import org.apache.reef.wake.remote.Codec;
028
029import javax.inject.Inject;
030import java.net.InetSocketAddress;
031import java.util.ArrayList;
032import java.util.List;
033
034/**
035 * Naming lookup response codec
036 */
037public final class NamingLookupResponseCodec implements Codec<NamingLookupResponse> {
038
039  private final IdentifierFactory factory;
040
041  /**
042   * Constructs a naming lookup response codec
043   *
044   * @param factory the identifier factory
045   */
046  @Inject
047  public NamingLookupResponseCodec(final IdentifierFactory factory) {
048    this.factory = factory;
049  }
050
051  /**
052   * Encodes name assignments to bytes
053   *
054   * @param obj the naming lookup response
055   * @return a byte array
056   */
057  @Override
058  public byte[] encode(NamingLookupResponse obj) {
059    final List<AvroNamingAssignment> assignments = new ArrayList<>(obj.getNameAssignments().size());
060    for (final NameAssignment nameAssignment : obj.getNameAssignments()) {
061      assignments.add(AvroNamingAssignment.newBuilder()
062          .setId(nameAssignment.getIdentifier().toString())
063          .setHost(nameAssignment.getAddress().getHostName())
064          .setPort(nameAssignment.getAddress().getPort())
065          .build());
066    }
067    return AvroUtils.toBytes(
068        AvroNamingLookupResponse.newBuilder().setTuples(assignments).build(), AvroNamingLookupResponse.class
069    );
070  }
071
072  /**
073   * Decodes bytes to an iterable of name assignments
074   *
075   * @param buf the byte array
076   * @return a naming lookup response
077   * @throws NamingRuntimeException
078   */
079  @Override
080  public NamingLookupResponse decode(final byte[] buf) {
081    final AvroNamingLookupResponse avroResponse = AvroUtils.fromBytes(buf, AvroNamingLookupResponse.class);
082    final List<NameAssignment> nas = new ArrayList<NameAssignment>(avroResponse.getTuples().size());
083    for (final AvroNamingAssignment tuple : avroResponse.getTuples()) {
084      nas.add(
085          new NameAssignmentTuple(
086              factory.getNewInstance(tuple.getId().toString()),
087              new InetSocketAddress(tuple.getHost().toString(), tuple.getPort())
088          )
089      );
090    }
091    return new NamingLookupResponse(nas);
092  }
093
094}