This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.io.network.naming;
020
021import org.apache.reef.io.naming.Naming;
022import org.apache.reef.io.network.Cache;
023import org.apache.reef.io.network.naming.exception.NamingRuntimeException;
024import org.apache.reef.io.network.naming.serialization.NamingLookupResponse;
025import org.apache.reef.io.network.naming.serialization.NamingMessage;
026import org.apache.reef.io.network.naming.serialization.NamingRegisterResponse;
027import org.apache.reef.wake.EventHandler;
028import org.apache.reef.wake.Identifier;
029import org.apache.reef.wake.IdentifierFactory;
030import org.apache.reef.wake.Stage;
031import org.apache.reef.wake.impl.SyncStage;
032import org.apache.reef.wake.remote.Codec;
033import org.apache.reef.wake.remote.NetUtils;
034import org.apache.reef.wake.remote.impl.TransportEvent;
035import org.apache.reef.wake.remote.transport.Transport;
036import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport;
037
038import java.io.IOException;
039import java.net.InetSocketAddress;
040import java.util.concurrent.BlockingQueue;
041import java.util.concurrent.LinkedBlockingQueue;
042import java.util.logging.Level;
043import java.util.logging.Logger;
044
045/**
046 * Naming client
047 */
048public class NameClient implements Stage, Naming {
049  private static final Logger LOG = Logger.getLogger(NameClient.class.getName());
050
051  private NameLookupClient lookupClient;
052  private NameRegistryClient registryClient;
053  private Transport transport;
054
055  /**
056   * Constructs a naming client
057   *
058   * @param serverAddr a server address
059   * @param serverPort a server port number
060   * @param factory    an identifier factory
061   * @param cache      a cache
062   */
063  public NameClient(String serverAddr, int serverPort,
064                    IdentifierFactory factory, int retryCount, int retryTimeout,
065                    Cache<Identifier, InetSocketAddress> cache) {
066    this(serverAddr, serverPort, 10000, factory, retryCount, retryTimeout, cache);
067  }
068
069  /**
070   * Constructs a naming client
071   *
072   * @param serverAddr a server address
073   * @param serverPort a server port number
074   * @param timeout    timeout in ms
075   * @param factory    an identifier factory
076   * @param cache      a cache
077   */
078  public NameClient(final String serverAddr, final int serverPort, final long timeout,
079                    final IdentifierFactory factory, final int retryCount, final int retryTimeout,
080                    final Cache<Identifier, InetSocketAddress> cache) {
081
082    final BlockingQueue<NamingLookupResponse> replyLookupQueue = new LinkedBlockingQueue<NamingLookupResponse>();
083    final BlockingQueue<NamingRegisterResponse> replyRegisterQueue = new LinkedBlockingQueue<NamingRegisterResponse>();
084    final Codec<NamingMessage> codec = NamingCodecFactory.createFullCodec(factory);
085
086    this.transport = new NettyMessagingTransport(NetUtils.getLocalAddress(), 0,
087        new SyncStage<>(new NamingClientEventHandler(
088            new NamingResponseHandler(replyLookupQueue, replyRegisterQueue), codec)),
089        null, retryCount, retryTimeout);
090
091    this.lookupClient = new NameLookupClient(serverAddr, serverPort, timeout,
092        factory, retryCount, retryTimeout, replyLookupQueue, this.transport, cache);
093
094    this.registryClient = new NameRegistryClient(serverAddr, serverPort, timeout,
095        factory, replyRegisterQueue, this.transport);
096  }
097
098  /**
099   * Registers an (identifier, address) mapping
100   *
101   * @param id   an identifier
102   * @param addr an Internet socket address
103   */
104  @Override
105  public void register(final Identifier id, final InetSocketAddress addr)
106      throws Exception {
107    LOG.log(Level.FINE, "Refister {0} : {1}", new Object[]{id, addr});
108    this.registryClient.register(id, addr);
109  }
110
111  /**
112   * Unregisters an identifier
113   *
114   * @param id an identifier
115   */
116  @Override
117  public void unregister(final Identifier id) throws IOException {
118    this.registryClient.unregister(id);
119  }
120
121  /**
122   * Finds an address for an identifier
123   *
124   * @param id an identifier
125   * @return an Internet socket address
126   */
127  @Override
128  public InetSocketAddress lookup(final Identifier id) throws Exception {
129    return this.lookupClient.lookup(id);
130  }
131
132  /**
133   * Retrieves an address for an identifier remotely
134   *
135   * @param id an identifier
136   * @return an Internet socket address
137   * @throws Exception
138   */
139  public InetSocketAddress remoteLookup(final Identifier id) throws Exception {
140    return this.lookupClient.remoteLookup(id);
141  }
142
143  /**
144   * Closes resources
145   */
146  @Override
147  public void close() throws Exception {
148
149    if (this.lookupClient != null) {
150      this.lookupClient.close();
151    }
152
153    if (this.registryClient != null) {
154      this.registryClient.close();
155    }
156
157    if (this.transport != null) {
158      this.transport.close();
159    }
160  }
161}
162
163/**
164 * Naming client transport event handler
165 */
166class NamingClientEventHandler implements EventHandler<TransportEvent> {
167
168  private static final Logger LOG = Logger.getLogger(NamingClientEventHandler.class.getName());
169
170  private final EventHandler<NamingMessage> handler;
171  private final Codec<NamingMessage> codec;
172
173  public NamingClientEventHandler(
174      final EventHandler<NamingMessage> handler, final Codec<NamingMessage> codec) {
175    this.handler = handler;
176    this.codec = codec;
177  }
178
179  @Override
180  public void onNext(final TransportEvent value) {
181    LOG.log(Level.FINE, "Transport: ", value);
182    this.handler.onNext(this.codec.decode(value.getData()));
183  }
184}
185
186/**
187 * Naming response message handler
188 */
189class NamingResponseHandler implements EventHandler<NamingMessage> {
190
191  private final BlockingQueue<NamingLookupResponse> replyLookupQueue;
192  private final BlockingQueue<NamingRegisterResponse> replyRegisterQueue;
193
194  NamingResponseHandler(BlockingQueue<NamingLookupResponse> replyLookupQueue,
195                        BlockingQueue<NamingRegisterResponse> replyRegisterQueue) {
196    this.replyLookupQueue = replyLookupQueue;
197    this.replyRegisterQueue = replyRegisterQueue;
198  }
199
200  @Override
201  public void onNext(NamingMessage value) {
202    if (value instanceof NamingLookupResponse) {
203      replyLookupQueue.offer((NamingLookupResponse) value);
204    } else if (value instanceof NamingRegisterResponse) {
205      replyRegisterQueue.offer((NamingRegisterResponse) value);
206    } else {
207      throw new NamingRuntimeException("Unknown naming response message");
208    }
209
210  }
211
212}