This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.io.network.naming;
020
021import org.apache.reef.io.naming.NameAssignment;
022import org.apache.reef.io.naming.NamingLookup;
023import org.apache.reef.io.network.Cache;
024import org.apache.reef.io.network.naming.exception.NamingException;
025import org.apache.reef.io.network.naming.serialization.NamingLookupRequest;
026import org.apache.reef.io.network.naming.serialization.NamingLookupResponse;
027import org.apache.reef.io.network.naming.serialization.NamingMessage;
028import org.apache.reef.tang.annotations.Name;
029import org.apache.reef.tang.annotations.NamedParameter;
030import org.apache.reef.wake.EventHandler;
031import org.apache.reef.wake.Identifier;
032import org.apache.reef.wake.IdentifierFactory;
033import org.apache.reef.wake.Stage;
034import org.apache.reef.wake.impl.SyncStage;
035import org.apache.reef.wake.remote.Codec;
036import org.apache.reef.wake.remote.NetUtils;
037import org.apache.reef.wake.remote.impl.TransportEvent;
038import org.apache.reef.wake.remote.transport.Link;
039import org.apache.reef.wake.remote.transport.Transport;
040import org.apache.reef.wake.remote.transport.netty.LoggingLinkListener;
041import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport;
042
043import java.net.InetSocketAddress;
044import java.net.SocketAddress;
045import java.util.Arrays;
046import java.util.List;
047import java.util.concurrent.BlockingQueue;
048import java.util.concurrent.Callable;
049import java.util.concurrent.LinkedBlockingQueue;
050import java.util.concurrent.TimeUnit;
051import java.util.logging.Level;
052import java.util.logging.Logger;
053
054/**
055 * Naming lookup client
056 */
057public class NameLookupClient implements Stage, NamingLookup {
058
059  private static final Logger LOG = Logger.getLogger(NameLookupClient.class.getName());
060  private final SocketAddress serverSocketAddr;
061  private final Transport transport;
062  private final Codec<NamingMessage> codec;
063  private final BlockingQueue<NamingLookupResponse> replyQueue;
064  private final long timeout;
065  private final Cache<Identifier, InetSocketAddress> cache;
066  private final int retryCount;
067  private final int retryTimeout;
068
069  /**
070   * Constructs a naming lookup client
071   *
072   * @param serverAddr a server address
073   * @param serverPort a server port number
074   * @param factory    an identifier factory
075   * @param cache      an cache
076   */
077  public NameLookupClient(final String serverAddr, final int serverPort,
078                          final IdentifierFactory factory, final int retryCount, final int retryTimeout,
079                          final Cache<Identifier, InetSocketAddress> cache) {
080    this(serverAddr, serverPort, 10000, factory, retryCount, retryTimeout, cache);
081  }
082
083  /**
084   * Constructs a naming lookup client
085   *
086   * @param serverAddr a server address
087   * @param serverPort a server port number
088   * @param timeout    request timeout in ms
089   * @param factory    an identifier factory
090   * @param cache      an cache
091   */
092  public NameLookupClient(final String serverAddr, final int serverPort, final long timeout,
093                          final IdentifierFactory factory, final int retryCount, final int retryTimeout,
094                          final Cache<Identifier, InetSocketAddress> cache) {
095
096    this.serverSocketAddr = new InetSocketAddress(serverAddr, serverPort);
097    this.timeout = timeout;
098    this.cache = cache;
099    this.codec = NamingCodecFactory.createLookupCodec(factory);
100    this.replyQueue = new LinkedBlockingQueue<>();
101
102    this.transport = new NettyMessagingTransport(NetUtils.getLocalAddress(), 0,
103        new SyncStage<>(new NamingLookupClientHandler(
104            new NamingLookupResponseHandler(this.replyQueue), this.codec)),
105        null, retryCount, retryTimeout);
106
107    this.retryCount = retryCount;
108    this.retryTimeout = retryTimeout;
109  }
110
111  NameLookupClient(final String serverAddr, final int serverPort, final long timeout,
112                   final IdentifierFactory factory, final int retryCount, final int retryTimeout,
113                   final BlockingQueue<NamingLookupResponse> replyQueue, final Transport transport,
114                   final Cache<Identifier, InetSocketAddress> cache) {
115
116    this.serverSocketAddr = new InetSocketAddress(serverAddr, serverPort);
117    this.timeout = timeout;
118    this.cache = cache;
119    this.codec = NamingCodecFactory.createFullCodec(factory);
120    this.replyQueue = replyQueue;
121    this.transport = transport;
122    this.retryCount = retryCount;
123    this.retryTimeout = retryTimeout;
124  }
125
126  /**
127   * Finds an address for an identifier
128   *
129   * @param id an identifier
130   * @return an Internet socket address
131   */
132  @Override
133  public InetSocketAddress lookup(final Identifier id) throws Exception {
134
135    return cache.get(id, new Callable<InetSocketAddress>() {
136
137      @Override
138      public InetSocketAddress call() throws Exception {
139        final int origRetryCount = NameLookupClient.this.retryCount;
140        int retryCount = origRetryCount;
141        while (true) {
142          try {
143            return remoteLookup(id);
144          } catch (final NamingException e) {
145            if (retryCount <= 0) {
146              throw e;
147            } else {
148              final int retryTimeout = NameLookupClient.this.retryTimeout
149                  * (origRetryCount - retryCount + 1);
150              LOG.log(Level.WARNING,
151                  "Caught Naming Exception while looking up " + id
152                      + " with Name Server. Will retry " + retryCount
153                      + " time(s) after waiting for " + retryTimeout + " msec.");
154              Thread.sleep(retryTimeout * retryCount);
155              --retryCount;
156            }
157          }
158        }
159      }
160
161    });
162  }
163
164  /**
165   * Retrieves an address for an identifier remotely
166   *
167   * @param id an identifier
168   * @return an Internet socket address
169   * @throws Exception
170   */
171  public InetSocketAddress remoteLookup(final Identifier id) throws Exception {
172    // the lookup is not thread-safe, because concurrent replies may
173    // be read by the wrong thread.
174    // TODO: better fix uses a map of id's after REEF-198
175    synchronized (this) {
176
177      LOG.log(Level.INFO, "Looking up {0} on NameServer {1}", new Object[]{id, serverSocketAddr});
178
179      final List<Identifier> ids = Arrays.asList(id);
180      final Link<NamingMessage> link = transport.open(serverSocketAddr, codec,
181          new LoggingLinkListener<NamingMessage>());
182      link.write(new NamingLookupRequest(ids));
183
184      final NamingLookupResponse resp;
185      for (; ; ) {
186        try {
187          resp = replyQueue.poll(timeout, TimeUnit.MILLISECONDS);
188          break;
189        } catch (final InterruptedException e) {
190          LOG.log(Level.INFO, "Lookup interrupted", e);
191          throw new NamingException(e);
192        }
193      }
194
195      final List<NameAssignment> list = resp.getNameAssignments();
196      if (list.isEmpty()) {
197        throw new NamingException("Cannot find " + id + " from the name server");
198      } else {
199        return list.get(0).getAddress();
200      }
201    }
202  }
203
204  /**
205   * Closes resources
206   */
207  @Override
208  public void close() throws Exception {
209    // Should not close transport as we did not
210    // create it
211  }
212
213  @NamedParameter(doc = "When should a retry timeout(msec)?", short_name = "retryTimeout", default_value = "100")
214  public static class RetryTimeout implements Name<Integer> {
215  }
216
217  @NamedParameter(doc = "How many times should I retry?", short_name = "retryCount", default_value = "10")
218  public static class RetryCount implements Name<Integer> {
219  }
220}
221
222/**
223 * Naming lookup client transport event handler
224 */
225class NamingLookupClientHandler implements EventHandler<TransportEvent> {
226
227  private final EventHandler<NamingLookupResponse> handler;
228  private final Codec<NamingMessage> codec;
229
230  NamingLookupClientHandler(final EventHandler<NamingLookupResponse> handler, final Codec<NamingMessage> codec) {
231    this.handler = handler;
232    this.codec = codec;
233  }
234
235  @Override
236  public void onNext(final TransportEvent value) {
237    handler.onNext((NamingLookupResponse) codec.decode(value.getData()));
238  }
239
240}
241
242/**
243 * Naming lookup response handler
244 */
245class NamingLookupResponseHandler implements EventHandler<NamingLookupResponse> {
246
247  private final BlockingQueue<NamingLookupResponse> replyQueue;
248
249  NamingLookupResponseHandler(final BlockingQueue<NamingLookupResponse> replyQueue) {
250    this.replyQueue = replyQueue;
251  }
252
253  @Override
254  public void onNext(final NamingLookupResponse value) {
255    replyQueue.offer(value);
256  }
257}