This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.io.network.naming;
020
021import org.apache.reef.io.naming.NameAssignment;
022import org.apache.reef.io.network.naming.serialization.*;
023import org.apache.reef.tang.Injector;
024import org.apache.reef.tang.Tang;
025import org.apache.reef.tang.annotations.Parameter;
026import org.apache.reef.tang.exceptions.InjectionException;
027import org.apache.reef.wake.EventHandler;
028import org.apache.reef.wake.Identifier;
029import org.apache.reef.wake.IdentifierFactory;
030import org.apache.reef.wake.impl.MultiEventHandler;
031import org.apache.reef.wake.impl.SyncStage;
032import org.apache.reef.wake.remote.Codec;
033import org.apache.reef.wake.remote.RemoteConfiguration;
034import org.apache.reef.wake.remote.address.LocalAddressProvider;
035import org.apache.reef.wake.remote.impl.TransportEvent;
036import org.apache.reef.wake.remote.transport.Transport;
037import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport;
038import org.apache.reef.webserver.ReefEventStateManager;
039
040import javax.inject.Inject;
041import java.net.InetSocketAddress;
042import java.util.*;
043import java.util.logging.Level;
044import java.util.logging.Logger;
045
046/**
047 * Naming server implementation.
048 */
049public final class NameServerImpl implements NameServer {
050
051  private static final Logger LOG = Logger.getLogger(NameServer.class.getName());
052
053  private final Transport transport;
054  private final Map<Identifier, InetSocketAddress> idToAddrMap;
055  private final ReefEventStateManager reefEventStateManager;
056  private final int port;
057  private final LocalAddressProvider localAddressProvider;
058
059  /**
060   * @param port    a listening port number
061   * @param factory an identifier factory
062   * @param localAddressProvider a local address provider
063   * Constructs a name server
064   */
065  @Inject
066  private NameServerImpl(
067      @Parameter(NameServerParameters.NameServerPort.class) final int port,
068      @Parameter(NameServerParameters.NameServerIdentifierFactory.class) final IdentifierFactory factory,
069      final LocalAddressProvider localAddressProvider) {
070
071    final Injector injector = Tang.Factory.getTang().newInjector();
072
073    this.localAddressProvider = localAddressProvider;
074    this.reefEventStateManager = null;
075    final Codec<NamingMessage> codec = NamingCodecFactory.createFullCodec(factory);
076    final EventHandler<NamingMessage> handler = createEventHandler(codec);
077
078    injector.bindVolatileParameter(RemoteConfiguration.HostAddress.class, localAddressProvider.getLocalAddress());
079    injector.bindVolatileParameter(RemoteConfiguration.Port.class, port);
080    injector.bindVolatileParameter(RemoteConfiguration.RemoteServerStage.class,
081        new SyncStage<>(new NamingServerHandler(handler, codec)));
082
083    try {
084      this.transport = injector.getInstance(NettyMessagingTransport.class);
085    } catch (final InjectionException e) {
086      throw new RuntimeException(e);
087    }
088
089    this.port = transport.getListeningPort();
090    this.idToAddrMap = Collections.synchronizedMap(new HashMap<Identifier, InetSocketAddress>());
091
092    LOG.log(Level.FINE, "NameServer starting, listening at port {0}", this.port);
093  }
094
095  private EventHandler<NamingMessage> createEventHandler(final Codec<NamingMessage> codec) {
096
097    final Map<Class<? extends NamingMessage>, EventHandler<? extends NamingMessage>>
098        clazzToHandlerMap = new HashMap<>();
099
100    clazzToHandlerMap.put(NamingLookupRequest.class, new NamingLookupRequestHandler(this, codec));
101    clazzToHandlerMap.put(NamingRegisterRequest.class, new NamingRegisterRequestHandler(this, codec));
102    clazzToHandlerMap.put(NamingUnregisterRequest.class, new NamingUnregisterRequestHandler(this));
103    final EventHandler<NamingMessage> handler = new MultiEventHandler<>(clazzToHandlerMap);
104
105    return handler;
106  }
107  /**
108   * Gets port.
109   */
110  @Override
111  public int getPort() {
112    return port;
113  }
114
115  /**
116   * Closes resources.
117   */
118  @Override
119  public void close() throws Exception {
120    transport.close();
121  }
122
123  /**
124   * Registers an (identifier, address) mapping locally.
125   *
126   * @param id   an identifier
127   * @param addr an Internet socket address
128   */
129  @Override
130  public void register(final Identifier id, final InetSocketAddress addr) {
131    LOG.log(Level.FINE, "id: " + id + " addr: " + addr);
132    idToAddrMap.put(id, addr);
133  }
134
135  /**
136   * Unregisters an identifier locally.
137   *
138   * @param id an identifier
139   */
140  @Override
141  public void unregister(final Identifier id) {
142    LOG.log(Level.FINE, "id: " + id);
143    idToAddrMap.remove(id);
144  }
145
146  /**
147   * Finds an address for an identifier locally.
148   *
149   * @param id an identifier
150   * @return an Internet socket address
151   */
152  @Override
153  public InetSocketAddress lookup(final Identifier id) {
154    LOG.log(Level.FINE, "id: {0}", id);
155    return idToAddrMap.get(id);
156  }
157
158  /**
159   * Finds addresses for identifiers locally.
160   *
161   * @param identifiers an iterable of identifiers
162   * @return a list of name assignments
163   */
164  @Override
165  public List<NameAssignment> lookup(final Iterable<Identifier> identifiers) {
166    LOG.log(Level.FINE, "identifiers");
167    final List<NameAssignment> nas = new ArrayList<>();
168    for (final Identifier id : identifiers) {
169      final InetSocketAddress addr = idToAddrMap.get(id);
170      LOG.log(Level.FINEST, "id : {0} addr: {1}", new Object[]{id, addr});
171      if (addr != null) {
172        nas.add(new NameAssignmentTuple(id, addr));
173      }
174    }
175    return nas;
176  }
177
178  private String getNameServerId() {
179    return this.localAddressProvider.getLocalAddress() + ":" + getPort();
180  }
181}
182
183/**
184 * Naming server transport event handler that invokes a specific naming message handler.
185 */
186class NamingServerHandler implements EventHandler<TransportEvent> {
187
188  private final Codec<NamingMessage> codec;
189  private final EventHandler<NamingMessage> handler;
190
191  NamingServerHandler(final EventHandler<NamingMessage> handler, final Codec<NamingMessage> codec) {
192    this.codec = codec;
193    this.handler = handler;
194  }
195
196  @Override
197  public void onNext(final TransportEvent value) {
198    final byte[] data = value.getData();
199    final NamingMessage message = codec.decode(data);
200    message.setLink(value.getLink());
201    handler.onNext(message);
202  }
203}
204
205/**
206 * Naming lookup request handler.
207 */
208class NamingLookupRequestHandler implements EventHandler<NamingLookupRequest> {
209
210  private static final Logger LOG = Logger.getLogger(NamingLookupRequestHandler.class.getName());
211
212
213  private final NameServer server;
214  private final Codec<NamingMessage> codec;
215
216  NamingLookupRequestHandler(final NameServer server, final Codec<NamingMessage> codec) {
217    this.server = server;
218    this.codec = codec;
219  }
220
221  @Override
222  public void onNext(final NamingLookupRequest value) {
223    final List<NameAssignment> nas = server.lookup(value.getIdentifiers());
224    final byte[] resp = codec.encode(new NamingLookupResponse(nas));
225    value.getLink().write(resp);
226  }
227}
228
229/**
230 * Naming register request handler.
231 */
232class NamingRegisterRequestHandler implements EventHandler<NamingRegisterRequest> {
233
234  private static final Logger LOG = Logger.getLogger(NamingRegisterRequestHandler.class.getName());
235
236
237  private final NameServer server;
238  private final Codec<NamingMessage> codec;
239
240  NamingRegisterRequestHandler(final NameServer server, final Codec<NamingMessage> codec) {
241    this.server = server;
242    this.codec = codec;
243  }
244
245  @Override
246  public void onNext(final NamingRegisterRequest value) {
247    server.register(value.getNameAssignment().getIdentifier(), value.getNameAssignment().getAddress());
248    final byte[] resp = codec.encode(new NamingRegisterResponse(value));
249    value.getLink().write(resp);
250  }
251}
252
253/**
254 * Naming unregister request handler.
255 */
256class NamingUnregisterRequestHandler implements EventHandler<NamingUnregisterRequest> {
257
258  private final NameServer server;
259
260  NamingUnregisterRequestHandler(final NameServer server) {
261    this.server = server;
262  }
263
264  @Override
265  public void onNext(final NamingUnregisterRequest value) {
266    server.unregister(value.getIdentifier());
267  }
268}