This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.io.network.naming;
020
021import org.apache.reef.io.naming.NamingRegistry;
022import org.apache.reef.io.network.naming.exception.NamingException;
023import org.apache.reef.io.network.naming.serialization.NamingMessage;
024import org.apache.reef.io.network.naming.serialization.NamingRegisterRequest;
025import org.apache.reef.io.network.naming.serialization.NamingRegisterResponse;
026import org.apache.reef.io.network.naming.serialization.NamingUnregisterRequest;
027import org.apache.reef.tang.Injector;
028import org.apache.reef.tang.Tang;
029import org.apache.reef.tang.exceptions.InjectionException;
030import org.apache.reef.wake.EventHandler;
031import org.apache.reef.wake.Identifier;
032import org.apache.reef.wake.IdentifierFactory;
033import org.apache.reef.wake.Stage;
034import org.apache.reef.wake.impl.SyncStage;
035import org.apache.reef.wake.remote.Codec;
036import org.apache.reef.wake.remote.RemoteConfiguration;
037import org.apache.reef.wake.remote.address.LocalAddressProvider;
038import org.apache.reef.wake.remote.impl.TransportEvent;
039import org.apache.reef.wake.remote.transport.Link;
040import org.apache.reef.wake.remote.transport.Transport;
041import org.apache.reef.wake.remote.transport.netty.LoggingLinkListener;
042import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport;
043
044import java.io.IOException;
045import java.net.InetSocketAddress;
046import java.net.SocketAddress;
047import java.util.concurrent.BlockingQueue;
048import java.util.concurrent.LinkedBlockingQueue;
049import java.util.concurrent.TimeUnit;
050import java.util.logging.Level;
051import java.util.logging.Logger;
052
053/**
054 * Naming registry client.
055 */
056public class NameRegistryClient implements Stage, NamingRegistry {
057
058  private static final Logger LOG = Logger.getLogger(NameRegistryClient.class.getName());
059
060  private final SocketAddress serverSocketAddr;
061  private final Transport transport;
062  private final Codec<NamingMessage> codec;
063  private final BlockingQueue<NamingRegisterResponse> replyQueue;
064  private final long timeout;
065
066  /**
067   * Constructs a naming registry client.
068   *
069   * @param serverAddr a name server address
070   * @param serverPort a name server port
071   * @param factory    an identifier factory
072   */
073  NameRegistryClient(
074      final String serverAddr, final int serverPort, final IdentifierFactory factory,
075      final LocalAddressProvider localAddressProvider) {
076    this(serverAddr, serverPort, 10000, factory, localAddressProvider);
077  }
078
079  /**
080   * Constructs a naming registry client.
081   *
082   * @param serverAddr a name server address
083   * @param serverPort a name server port
084   * @param timeout    timeout in ms
085   * @param factory    an identifier factory
086   */
087  NameRegistryClient(final String serverAddr,
088                            final int serverPort,
089                            final long timeout,
090                            final IdentifierFactory factory,
091                            final LocalAddressProvider localAddressProvider) {
092
093    this.serverSocketAddr = new InetSocketAddress(serverAddr, serverPort);
094    this.timeout = timeout;
095    this.codec = NamingCodecFactory.createRegistryCodec(factory);
096    this.replyQueue = new LinkedBlockingQueue<>();
097
098    final Injector injector = Tang.Factory.getTang().newInjector();
099    injector.bindVolatileParameter(RemoteConfiguration.HostAddress.class, localAddressProvider.getLocalAddress());
100    injector.bindVolatileParameter(RemoteConfiguration.RemoteClientStage.class,
101        new SyncStage<>(new NamingRegistryClientHandler(new NamingRegistryResponseHandler(replyQueue), codec)));
102
103    try {
104      this.transport = injector.getInstance(NettyMessagingTransport.class);
105    } catch (final InjectionException e) {
106      throw new RuntimeException(e);
107    }
108  }
109
110  NameRegistryClient(final String serverAddr, final int serverPort,
111                            final long timeout, final IdentifierFactory factory,
112                            final BlockingQueue<NamingRegisterResponse> replyQueue,
113                            final Transport transport) {
114    this.serverSocketAddr = new InetSocketAddress(serverAddr, serverPort);
115    this.timeout = timeout;
116    this.codec = NamingCodecFactory.createFullCodec(factory);
117    this.replyQueue = replyQueue;
118    this.transport = transport;
119  }
120
121  /**
122   * Registers an (identifier, address) mapping.
123   *
124   * @param id   an identifier
125   * @param addr an Internet socket address
126   */
127  @Override
128  public void register(final Identifier id, final InetSocketAddress addr) throws Exception {
129
130    // needed to keep threads from reading the wrong response
131    // TODO: better fix matches replies to threads with a map after REEF-198
132    synchronized (this) {
133
134      LOG.log(Level.FINE, "Register {0} : {1}", new Object[]{id, addr});
135
136      final Link<NamingMessage> link = this.transport.open(
137          this.serverSocketAddr, this.codec, new LoggingLinkListener<NamingMessage>());
138
139      link.write(new NamingRegisterRequest(new NameAssignmentTuple(id, addr)));
140
141      for (;;) {
142        try {
143          this.replyQueue.poll(this.timeout, TimeUnit.MILLISECONDS);
144          break;
145        } catch (final InterruptedException e) {
146          LOG.log(Level.INFO, "Interrupted", e);
147          throw new NamingException(e);
148        }
149      }
150    }
151  }
152
153  /**
154   * Unregisters an identifier.
155   *
156   * @param id an identifier
157   */
158  @Override
159  public void unregister(final Identifier id) throws IOException {
160    final Link<NamingMessage> link = transport.open(serverSocketAddr, codec,
161        new LoggingLinkListener<NamingMessage>());
162    link.write(new NamingUnregisterRequest(id));
163  }
164
165  /**
166   * Closes resources.
167   */
168  @Override
169  public void close() throws Exception {
170    // Should not close transport as we did not
171    // create it
172  }
173}
174
175/**
176 * Naming registry client transport event handler.
177 */
178class NamingRegistryClientHandler implements EventHandler<TransportEvent> {
179  private static final Logger LOG = Logger.getLogger(NamingRegistryClientHandler.class.getName());
180
181  private final EventHandler<NamingRegisterResponse> handler;
182  private final Codec<NamingMessage> codec;
183
184  NamingRegistryClientHandler(final EventHandler<NamingRegisterResponse> handler, final Codec<NamingMessage> codec) {
185    this.handler = handler;
186    this.codec = codec;
187  }
188
189  @Override
190  public void onNext(final TransportEvent value) {
191    LOG.log(Level.FINE, value.toString());
192    handler.onNext((NamingRegisterResponse) codec.decode(value.getData()));
193  }
194}
195
196/**
197 * Naming register response handler.
198 */
199class NamingRegistryResponseHandler implements EventHandler<NamingRegisterResponse> {
200  private static final Logger LOG = Logger.getLogger(NamingRegistryResponseHandler.class.getName());
201
202  private final BlockingQueue<NamingRegisterResponse> replyQueue;
203
204  NamingRegistryResponseHandler(final BlockingQueue<NamingRegisterResponse> replyQueue) {
205    this.replyQueue = replyQueue;
206  }
207
208  @Override
209  public void onNext(final NamingRegisterResponse value) {
210    if (!replyQueue.offer(value)) {
211      LOG.log(Level.FINEST, "Element {0} was not added to the queue", value);
212    }
213  }
214}