This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.io.network.naming;
020
021import org.apache.reef.io.naming.NameAssignment;
022import org.apache.reef.io.network.naming.serialization.*;
023import org.apache.reef.tang.Injector;
024import org.apache.reef.tang.Tang;
025import org.apache.reef.tang.annotations.Parameter;
026import org.apache.reef.tang.exceptions.InjectionException;
027import org.apache.reef.wake.EventHandler;
028import org.apache.reef.wake.Identifier;
029import org.apache.reef.wake.IdentifierFactory;
030import org.apache.reef.wake.impl.MultiEventHandler;
031import org.apache.reef.wake.impl.SyncStage;
032import org.apache.reef.wake.remote.Codec;
033import org.apache.reef.wake.remote.RemoteConfiguration;
034import org.apache.reef.wake.remote.address.LocalAddressProvider;
035import org.apache.reef.wake.remote.impl.TransportEvent;
036import org.apache.reef.wake.remote.transport.Transport;
037import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport;
038import org.apache.reef.webserver.ReefEventStateManager;
039
040import javax.inject.Inject;
041import java.net.InetSocketAddress;
042import java.util.*;
043import java.util.logging.Level;
044import java.util.logging.Logger;
045
046/**
047 * Naming server implementation.
048 */
049public final class NameServerImpl implements NameServer {
050
051  private static final Logger LOG = Logger.getLogger(NameServer.class.getName());
052
053  private final Transport transport;
054  private final Map<Identifier, InetSocketAddress> idToAddrMap;
055  private final ReefEventStateManager reefEventStateManager;
056  private final int port;
057  private final LocalAddressProvider localAddressProvider;
058
059  /**
060   * @param port    a listening port number
061   * @param factory an identifier factory
062   * @param localAddressProvider a local address provider
063   * Constructs a name server
064   * @deprecated in 0.12. Use Tang to obtain an instance of this or, better, NameServer, instead.
065   */
066  @Deprecated
067  @Inject
068  public NameServerImpl(
069      @Parameter(NameServerParameters.NameServerPort.class) final int port,
070      @Parameter(NameServerParameters.NameServerIdentifierFactory.class) final IdentifierFactory factory,
071      final LocalAddressProvider localAddressProvider) {
072
073    final Injector injector = Tang.Factory.getTang().newInjector();
074
075    this.localAddressProvider = localAddressProvider;
076    this.reefEventStateManager = null;
077    final Codec<NamingMessage> codec = NamingCodecFactory.createFullCodec(factory);
078    final EventHandler<NamingMessage> handler = createEventHandler(codec);
079
080    injector.bindVolatileParameter(RemoteConfiguration.HostAddress.class, localAddressProvider.getLocalAddress());
081    injector.bindVolatileParameter(RemoteConfiguration.Port.class, port);
082    injector.bindVolatileParameter(RemoteConfiguration.RemoteServerStage.class,
083        new SyncStage<>(new NamingServerHandler(handler, codec)));
084
085    try {
086      this.transport = injector.getInstance(NettyMessagingTransport.class);
087    } catch (final InjectionException e) {
088      throw new RuntimeException(e);
089    }
090
091    this.port = transport.getListeningPort();
092    this.idToAddrMap = Collections.synchronizedMap(new HashMap<Identifier, InetSocketAddress>());
093
094    LOG.log(Level.FINE, "NameServer starting, listening at port {0}", this.port);
095  }
096
097  private EventHandler<NamingMessage> createEventHandler(final Codec<NamingMessage> codec) {
098
099    final Map<Class<? extends NamingMessage>, EventHandler<? extends NamingMessage>>
100        clazzToHandlerMap = new HashMap<>();
101
102    clazzToHandlerMap.put(NamingLookupRequest.class, new NamingLookupRequestHandler(this, codec));
103    clazzToHandlerMap.put(NamingRegisterRequest.class, new NamingRegisterRequestHandler(this, codec));
104    clazzToHandlerMap.put(NamingUnregisterRequest.class, new NamingUnregisterRequestHandler(this));
105    final EventHandler<NamingMessage> handler = new MultiEventHandler<>(clazzToHandlerMap);
106
107    return handler;
108  }
109  /**
110   * Gets port.
111   */
112  @Override
113  public int getPort() {
114    return port;
115  }
116
117  /**
118   * Closes resources.
119   */
120  @Override
121  public void close() throws Exception {
122    transport.close();
123  }
124
125  /**
126   * Registers an (identifier, address) mapping locally.
127   *
128   * @param id   an identifier
129   * @param addr an Internet socket address
130   */
131  @Override
132  public void register(final Identifier id, final InetSocketAddress addr) {
133    LOG.log(Level.FINE, "id: " + id + " addr: " + addr);
134    idToAddrMap.put(id, addr);
135  }
136
137  /**
138   * Unregisters an identifier locally.
139   *
140   * @param id an identifier
141   */
142  @Override
143  public void unregister(final Identifier id) {
144    LOG.log(Level.FINE, "id: " + id);
145    idToAddrMap.remove(id);
146  }
147
148  /**
149   * Finds an address for an identifier locally.
150   *
151   * @param id an identifier
152   * @return an Internet socket address
153   */
154  @Override
155  public InetSocketAddress lookup(final Identifier id) {
156    LOG.log(Level.FINE, "id: {0}", id);
157    return idToAddrMap.get(id);
158  }
159
160  /**
161   * Finds addresses for identifiers locally.
162   *
163   * @param identifiers an iterable of identifiers
164   * @return a list of name assignments
165   */
166  @Override
167  public List<NameAssignment> lookup(final Iterable<Identifier> identifiers) {
168    LOG.log(Level.FINE, "identifiers");
169    final List<NameAssignment> nas = new ArrayList<>();
170    for (final Identifier id : identifiers) {
171      final InetSocketAddress addr = idToAddrMap.get(id);
172      LOG.log(Level.FINEST, "id : {0} addr: {1}", new Object[]{id, addr});
173      if (addr != null) {
174        nas.add(new NameAssignmentTuple(id, addr));
175      }
176    }
177    return nas;
178  }
179
180  private String getNameServerId() {
181    return this.localAddressProvider.getLocalAddress() + ":" + getPort();
182  }
183}
184
185/**
186 * Naming server transport event handler that invokes a specific naming message handler.
187 */
188class NamingServerHandler implements EventHandler<TransportEvent> {
189
190  private final Codec<NamingMessage> codec;
191  private final EventHandler<NamingMessage> handler;
192
193  NamingServerHandler(final EventHandler<NamingMessage> handler, final Codec<NamingMessage> codec) {
194    this.codec = codec;
195    this.handler = handler;
196  }
197
198  @Override
199  public void onNext(final TransportEvent value) {
200    final byte[] data = value.getData();
201    final NamingMessage message = codec.decode(data);
202    message.setLink(value.getLink());
203    handler.onNext(message);
204  }
205}
206
207/**
208 * Naming lookup request handler.
209 */
210class NamingLookupRequestHandler implements EventHandler<NamingLookupRequest> {
211
212  private static final Logger LOG = Logger.getLogger(NamingLookupRequestHandler.class.getName());
213
214
215  private final NameServer server;
216  private final Codec<NamingMessage> codec;
217
218  NamingLookupRequestHandler(final NameServer server, final Codec<NamingMessage> codec) {
219    this.server = server;
220    this.codec = codec;
221  }
222
223  @Override
224  public void onNext(final NamingLookupRequest value) {
225    final List<NameAssignment> nas = server.lookup(value.getIdentifiers());
226    final byte[] resp = codec.encode(new NamingLookupResponse(nas));
227    value.getLink().write(resp);
228  }
229}
230
231/**
232 * Naming register request handler.
233 */
234class NamingRegisterRequestHandler implements EventHandler<NamingRegisterRequest> {
235
236  private static final Logger LOG = Logger.getLogger(NamingRegisterRequestHandler.class.getName());
237
238
239  private final NameServer server;
240  private final Codec<NamingMessage> codec;
241
242  NamingRegisterRequestHandler(final NameServer server, final Codec<NamingMessage> codec) {
243    this.server = server;
244    this.codec = codec;
245  }
246
247  @Override
248  public void onNext(final NamingRegisterRequest value) {
249    server.register(value.getNameAssignment().getIdentifier(), value.getNameAssignment().getAddress());
250    final byte[] resp = codec.encode(new NamingRegisterResponse(value));
251    value.getLink().write(resp);
252  }
253}
254
255/**
256 * Naming unregister request handler.
257 */
258class NamingUnregisterRequestHandler implements EventHandler<NamingUnregisterRequest> {
259
260  private final NameServer server;
261
262  NamingUnregisterRequestHandler(final NameServer server) {
263    this.server = server;
264  }
265
266  @Override
267  public void onNext(final NamingUnregisterRequest value) {
268    server.unregister(value.getIdentifier());
269  }
270}