This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.io.network.naming;
020
021import org.apache.reef.io.naming.NameAssignment;
022import org.apache.reef.io.naming.NamingLookup;
023import org.apache.reef.io.network.naming.exception.NamingException;
024import org.apache.reef.io.network.naming.parameters.NameResolverCacheTimeout;
025import org.apache.reef.io.network.naming.parameters.NameResolverIdentifierFactory;
026import org.apache.reef.io.network.naming.parameters.NameResolverNameServerAddr;
027import org.apache.reef.io.network.naming.parameters.NameResolverNameServerPort;
028import org.apache.reef.io.network.naming.parameters.NameResolverRetryCount;
029import org.apache.reef.io.network.naming.parameters.NameResolverRetryTimeout;
030import org.apache.reef.io.network.naming.serialization.NamingLookupRequest;
031import org.apache.reef.io.network.naming.serialization.NamingLookupResponse;
032import org.apache.reef.io.network.naming.serialization.NamingMessage;
033import org.apache.reef.tang.annotations.Parameter;
034import org.apache.reef.util.cache.Cache;
035import org.apache.reef.wake.EventHandler;
036import org.apache.reef.wake.Identifier;
037import org.apache.reef.wake.IdentifierFactory;
038import org.apache.reef.wake.Stage;
039import org.apache.reef.wake.impl.SyncStage;
040import org.apache.reef.wake.remote.Codec;
041import org.apache.reef.wake.remote.address.LocalAddressProvider;
042import org.apache.reef.wake.remote.impl.TransportEvent;
043import org.apache.reef.wake.remote.transport.Link;
044import org.apache.reef.wake.remote.transport.Transport;
045import org.apache.reef.wake.remote.transport.TransportFactory;
046import org.apache.reef.wake.remote.transport.netty.LoggingLinkListener;
047
048import javax.inject.Inject;
049import java.net.InetSocketAddress;
050import java.net.SocketAddress;
051import java.util.Arrays;
052import java.util.List;
053import java.util.concurrent.BlockingQueue;
054import java.util.concurrent.Callable;
055import java.util.concurrent.LinkedBlockingQueue;
056import java.util.concurrent.TimeUnit;
057import java.util.logging.Level;
058import java.util.logging.Logger;
059
060/**
061 * Naming lookup client.
062 */
063public final class NameLookupClient implements Stage, NamingLookup {
064
065  private static final Logger LOG = Logger.getLogger(NameLookupClient.class.getName());
066  private final SocketAddress serverSocketAddr;
067  private final Transport transport;
068  private final Codec<NamingMessage> codec;
069  private final BlockingQueue<NamingLookupResponse> replyQueue;
070  private final long timeout;
071  private final Cache<Identifier, InetSocketAddress> cache;
072  private final int retryCount;
073  private final int retryTimeout;
074
075
076  /**
077   * Constructs a naming lookup client.
078   *
079   * @param serverAddr a server address
080   * @param serverPort a server port number
081   * @param timeout    request timeout in ms
082   * @param factory    an identifier factory
083   * @param retryCount a count of retrying lookup
084   * @param retryTimeout retry timeout
085   * @param replyQueue a reply queue
086   * @param transport  a transport
087   */
088  NameLookupClient(final String serverAddr,
089                          final int serverPort,
090                          final long timeout,
091                          final IdentifierFactory factory,
092                          final int retryCount,
093                          final int retryTimeout,
094                          final BlockingQueue<NamingLookupResponse> replyQueue,
095                          final Transport transport) {
096    this.serverSocketAddr = new InetSocketAddress(serverAddr, serverPort);
097    this.timeout = timeout;
098    this.cache = new NameCache(timeout);
099    this.codec = NamingCodecFactory.createFullCodec(factory);
100    this.replyQueue = replyQueue;
101    this.retryCount = retryCount;
102    this.retryTimeout = retryTimeout;
103    this.transport = transport;
104  }
105
106  /**
107    * Constructs a naming lookup client.
108    *
109    * @param serverAddr a server address
110    * @param serverPort a server port number
111    * @param timeout    request timeout in ms
112    * @param factory    an identifier factory
113    * @param tpFactory  a transport factory
114    */
115  @Inject
116  private NameLookupClient(
117            @Parameter(NameResolverNameServerAddr.class) final String serverAddr,
118            @Parameter(NameResolverNameServerPort.class) final int serverPort,
119            @Parameter(NameResolverCacheTimeout.class) final long timeout,
120            @Parameter(NameResolverIdentifierFactory.class) final IdentifierFactory factory,
121            @Parameter(NameResolverRetryCount.class) final int retryCount,
122            @Parameter(NameResolverRetryTimeout.class) final int retryTimeout,
123            final LocalAddressProvider localAddressProvider,
124            final TransportFactory tpFactory) {
125    this.serverSocketAddr = new InetSocketAddress(serverAddr, serverPort);
126    this.timeout = timeout;
127    this.cache = new NameCache(timeout);
128    this.codec = NamingCodecFactory.createLookupCodec(factory);
129    this.replyQueue = new LinkedBlockingQueue<>();
130
131    this.transport = tpFactory.newInstance(localAddressProvider.getLocalAddress(), 0,
132            new SyncStage<>(new NamingLookupClientHandler(
133                    new NamingLookupResponseHandler(this.replyQueue), this.codec)),
134            null, retryCount, retryTimeout);
135
136    this.retryCount = retryCount;
137    this.retryTimeout = retryTimeout;
138  }
139
140  /**
141   * Finds an address for an identifier.
142   *
143   * @param id an identifier
144   * @return an Internet socket address
145   */
146  @Override
147  public InetSocketAddress lookup(final Identifier id) throws Exception {
148
149    return cache.get(id, new Callable<InetSocketAddress>() {
150
151      @Override
152      public InetSocketAddress call() throws Exception {
153        final int origRetryCount = NameLookupClient.this.retryCount;
154        int retriesLeft = origRetryCount;
155        while (true) {
156          try {
157            return remoteLookup(id);
158          } catch (final NamingException e) {
159            if (retriesLeft <= 0) {
160              throw e;
161            } else {
162              final int currentRetryTimeout = NameLookupClient.this.retryTimeout
163                  * (origRetryCount - retriesLeft + 1);
164              LOG.log(Level.WARNING,
165                  "Caught Naming Exception while looking up " + id
166                      + " with Name Server. Will retry " + retriesLeft
167                      + " time(s) after waiting for " + currentRetryTimeout + " msec.");
168              Thread.sleep(currentRetryTimeout);
169              --retriesLeft;
170            }
171          }
172        }
173      }
174
175    });
176  }
177
178  /**
179   * Retrieves an address for an identifier remotely.
180   *
181   * @param id an identifier
182   * @return an Internet socket address
183   * @throws Exception
184   */
185  public InetSocketAddress remoteLookup(final Identifier id) throws Exception {
186    // the lookup is not thread-safe, because concurrent replies may
187    // be read by the wrong thread.
188    // TODO: better fix uses a map of id's after REEF-198
189    synchronized (this) {
190
191      LOG.log(Level.INFO, "Looking up {0} on NameServer {1}", new Object[]{id, serverSocketAddr});
192
193      final List<Identifier> ids = Arrays.asList(id);
194      final Link<NamingMessage> link = transport.open(serverSocketAddr, codec,
195          new LoggingLinkListener<NamingMessage>());
196      link.write(new NamingLookupRequest(ids));
197
198      final NamingLookupResponse resp;
199      for (;;) {
200        try {
201          resp = replyQueue.poll(timeout, TimeUnit.MILLISECONDS);
202          break;
203        } catch (final InterruptedException e) {
204          LOG.log(Level.INFO, "Lookup interrupted", e);
205          throw new NamingException(e);
206        }
207      }
208
209      final List<NameAssignment> list = resp.getNameAssignments();
210      if (list.isEmpty()) {
211        throw new NamingException("Cannot find " + id + " from the name server");
212      } else {
213        return list.get(0).getAddress();
214      }
215    }
216  }
217
218  /**
219   * Closes resources.
220   */
221  @Override
222  public void close() throws Exception {
223    // Should not close transport as we did not
224    // create it
225  }
226}
227
228/**
229 * Naming lookup client transport event handler.
230 */
231class NamingLookupClientHandler implements EventHandler<TransportEvent> {
232
233  private final EventHandler<NamingLookupResponse> handler;
234  private final Codec<NamingMessage> codec;
235
236  NamingLookupClientHandler(final EventHandler<NamingLookupResponse> handler, final Codec<NamingMessage> codec) {
237    this.handler = handler;
238    this.codec = codec;
239  }
240
241  @Override
242  public void onNext(final TransportEvent value) {
243    handler.onNext((NamingLookupResponse) codec.decode(value.getData()));
244  }
245
246}
247
248/**
249 * Naming lookup response handler.
250 */
251class NamingLookupResponseHandler implements EventHandler<NamingLookupResponse> {
252  private static final Logger LOG = Logger.getLogger(NamingLookupResponseHandler.class.getName());
253
254  private final BlockingQueue<NamingLookupResponse> replyQueue;
255
256  NamingLookupResponseHandler(final BlockingQueue<NamingLookupResponse> replyQueue) {
257    this.replyQueue = replyQueue;
258  }
259
260  @Override
261  public void onNext(final NamingLookupResponse value) {
262    if (!replyQueue.offer(value)) {
263      LOG.log(Level.FINEST, "Element {0} was not added to the queue", value);
264    }
265  }
266}