This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.io.network.naming;
020
021import org.apache.reef.io.naming.NamingRegistry;
022import org.apache.reef.io.network.naming.exception.NamingException;
023import org.apache.reef.io.network.naming.serialization.NamingMessage;
024import org.apache.reef.io.network.naming.serialization.NamingRegisterRequest;
025import org.apache.reef.io.network.naming.serialization.NamingRegisterResponse;
026import org.apache.reef.io.network.naming.serialization.NamingUnregisterRequest;
027import org.apache.reef.wake.EventHandler;
028import org.apache.reef.wake.Identifier;
029import org.apache.reef.wake.IdentifierFactory;
030import org.apache.reef.wake.Stage;
031import org.apache.reef.wake.impl.SyncStage;
032import org.apache.reef.wake.remote.Codec;
033import org.apache.reef.wake.remote.NetUtils;
034import org.apache.reef.wake.remote.impl.TransportEvent;
035import org.apache.reef.wake.remote.transport.Link;
036import org.apache.reef.wake.remote.transport.LinkListener;
037import org.apache.reef.wake.remote.transport.Transport;
038import org.apache.reef.wake.remote.transport.netty.LoggingLinkListener;
039import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport;
040
041import java.io.IOException;
042import java.net.InetSocketAddress;
043import java.net.SocketAddress;
044import java.util.concurrent.BlockingQueue;
045import java.util.concurrent.LinkedBlockingQueue;
046import java.util.concurrent.TimeUnit;
047import java.util.logging.Level;
048import java.util.logging.Logger;
049
050/**
051 * Naming registry client
052 */
053public class NameRegistryClient implements Stage, NamingRegistry {
054
055  private static final Logger LOG = Logger.getLogger(NameRegistryClient.class.getName());
056
057  private final SocketAddress serverSocketAddr;
058  private final Transport transport;
059  private final Codec<NamingMessage> codec;
060  private final BlockingQueue<NamingRegisterResponse> replyQueue;
061  private final long timeout;
062
063  /**
064   * Constructs a naming registry client
065   *
066   * @param serverAddr a name server address
067   * @param serverPort a name server port
068   * @param factory    an identifier factory
069   */
070  public NameRegistryClient(
071      final String serverAddr, final int serverPort, final IdentifierFactory factory) {
072    this(serverAddr, serverPort, 10000, factory);
073  }
074
075  /**
076   * Constructs a naming registry client
077   *
078   * @param serverAddr a name server address
079   * @param serverPort a name server port
080   * @param timeout    timeout in ms
081   * @param factory    an identifier factory
082   */
083  public NameRegistryClient(final String serverAddr, final int serverPort,
084                            final long timeout, final IdentifierFactory factory) {
085
086    this.serverSocketAddr = new InetSocketAddress(serverAddr, serverPort);
087    this.timeout = timeout;
088    this.codec = NamingCodecFactory.createRegistryCodec(factory);
089    this.replyQueue = new LinkedBlockingQueue<>();
090    this.transport = new NettyMessagingTransport(NetUtils.getLocalAddress(), 0,
091        new SyncStage<>(new NamingRegistryClientHandler(new NamingRegistryResponseHandler(replyQueue), codec)),
092        null, 3, 10000);
093  }
094
095  public NameRegistryClient(final String serverAddr, final int serverPort,
096                            final long timeout, final IdentifierFactory factory,
097                            final BlockingQueue<NamingRegisterResponse> replyQueue,
098                            final Transport transport) {
099    this.serverSocketAddr = new InetSocketAddress(serverAddr, serverPort);
100    this.timeout = timeout;
101    this.codec = NamingCodecFactory.createFullCodec(factory);
102    this.replyQueue = replyQueue;
103    this.transport = transport;
104  }
105
106  /**
107   * Registers an (identifier, address) mapping
108   *
109   * @param id   an identifier
110   * @param addr an Internet socket address
111   */
112  @Override
113  public void register(final Identifier id, final InetSocketAddress addr) throws Exception {
114
115    // needed to keep threads from reading the wrong response
116    // TODO: better fix matches replies to threads with a map after REEF-198
117    synchronized (this) {
118
119      LOG.log(Level.FINE, "Register {0} : {1}", new Object[]{id, addr});
120
121      final Link<NamingMessage> link = this.transport.open(
122          this.serverSocketAddr, this.codec, new LoggingLinkListener<NamingMessage>());
123
124      link.write(new NamingRegisterRequest(new NameAssignmentTuple(id, addr)));
125
126      for (; ; ) {
127        try {
128          this.replyQueue.poll(this.timeout, TimeUnit.MILLISECONDS);
129          break;
130        } catch (final InterruptedException e) {
131          LOG.log(Level.INFO, "Interrupted", e);
132          throw new NamingException(e);
133        }
134      }
135    }
136  }
137
138  /**
139   * Unregisters an identifier
140   *
141   * @param id an identifier
142   */
143  @Override
144  public void unregister(Identifier id) throws IOException {
145    Link<NamingMessage> link = transport.open(serverSocketAddr, codec,
146        new LinkListener<NamingMessage>() {
147          @Override
148          public void messageReceived(NamingMessage message) {
149          }
150        });
151    link.write(new NamingUnregisterRequest(id));
152  }
153
154  /**
155   * Closes resources
156   */
157  @Override
158  public void close() throws Exception {
159    // Should not close transport as we did not
160    // create it
161  }
162}
163
164/**
165 * Naming registry client transport event handler
166 */
167class NamingRegistryClientHandler implements EventHandler<TransportEvent> {
168  private static final Logger LOG = Logger.getLogger(NamingRegistryClientHandler.class.getName());
169
170  private final EventHandler<NamingRegisterResponse> handler;
171  private final Codec<NamingMessage> codec;
172
173  NamingRegistryClientHandler(EventHandler<NamingRegisterResponse> handler, Codec<NamingMessage> codec) {
174    this.handler = handler;
175    this.codec = codec;
176  }
177
178  @Override
179  public void onNext(TransportEvent value) {
180    LOG.log(Level.FINE, value.toString());
181    handler.onNext((NamingRegisterResponse) codec.decode(value.getData()));
182  }
183}
184
185/**
186 * Naming register response handler
187 */
188class NamingRegistryResponseHandler implements EventHandler<NamingRegisterResponse> {
189
190  private final BlockingQueue<NamingRegisterResponse> replyQueue;
191
192  NamingRegistryResponseHandler(BlockingQueue<NamingRegisterResponse> replyQueue) {
193    this.replyQueue = replyQueue;
194  }
195
196  @Override
197  public void onNext(NamingRegisterResponse value) {
198    replyQueue.offer(value);
199  }
200}