This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.io.network.naming;
020
021import org.apache.reef.io.naming.NameAssignment;
022import org.apache.reef.io.network.naming.serialization.*;
023import org.apache.reef.tang.annotations.Parameter;
024import org.apache.reef.wake.EventHandler;
025import org.apache.reef.wake.Identifier;
026import org.apache.reef.wake.IdentifierFactory;
027import org.apache.reef.wake.Stage;
028import org.apache.reef.wake.impl.MultiEventHandler;
029import org.apache.reef.wake.impl.SyncStage;
030import org.apache.reef.wake.remote.Codec;
031import org.apache.reef.wake.remote.NetUtils;
032import org.apache.reef.wake.remote.impl.TransportEvent;
033import org.apache.reef.wake.remote.transport.Transport;
034import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport;
035import org.apache.reef.webserver.AvroReefServiceInfo;
036import org.apache.reef.webserver.ReefEventStateManager;
037
038import javax.inject.Inject;
039import java.io.IOException;
040import java.net.InetSocketAddress;
041import java.util.*;
042import java.util.logging.Level;
043import java.util.logging.Logger;
044
045/**
046 * Naming server implementation
047 */
048public class NameServerImpl implements NameServer {
049
050  private static final Logger LOG = Logger.getLogger(NameServer.class.getName());
051
052  private final Transport transport;
053  private final Map<Identifier, InetSocketAddress> idToAddrMap;
054  private final ReefEventStateManager reefEventStateManager;
055  private final int port;
056
057  /**
058   * @param port    a listening port number
059   * @param factory an identifier factory
060   * @deprecated inject the NameServer instead of new it up
061   * Constructs a name server
062   */
063  // TODO: All existing NameServer usage is currently new-up, need to make them injected as well.
064  @Deprecated
065  public NameServerImpl(
066      final int port,
067      final IdentifierFactory factory) {
068
069    this.reefEventStateManager = null;
070    final Codec<NamingMessage> codec = NamingCodecFactory.createFullCodec(factory);
071    final EventHandler<NamingMessage> handler = createEventHandler(codec);
072
073    this.transport = new NettyMessagingTransport(NetUtils.getLocalAddress(), port, null,
074        new SyncStage<>(new NamingServerHandler(handler, codec)), 3, 10000);
075
076    this.port = transport.getListeningPort();
077    this.idToAddrMap = Collections.synchronizedMap(new HashMap<Identifier, InetSocketAddress>());
078
079    LOG.log(Level.FINE, "NameServer starting, listening at port {0}", this.port);
080  }
081
082
083  /**
084   * Constructs a name server
085   *
086   * @param port                  a listening port number
087   * @param factory               an identifier factory
088   * @param reefEventStateManager the event state manager used to register name server info
089   */
090  @Inject
091  public NameServerImpl(
092      final @Parameter(NameServerParameters.NameServerPort.class) int port,
093      final @Parameter(NameServerParameters.NameServerIdentifierFactory.class) IdentifierFactory factory,
094      final ReefEventStateManager reefEventStateManager) {
095
096    this.reefEventStateManager = reefEventStateManager;
097    final Codec<NamingMessage> codec = NamingCodecFactory.createFullCodec(factory);
098    final EventHandler<NamingMessage> handler = createEventHandler(codec);
099
100    this.transport = new NettyMessagingTransport(NetUtils.getLocalAddress(), port, null,
101        new SyncStage<>(new NamingServerHandler(handler, codec)), 3, 10000);
102
103    this.port = transport.getListeningPort();
104    this.idToAddrMap = Collections.synchronizedMap(new HashMap<Identifier, InetSocketAddress>());
105
106    this.reefEventStateManager.registerServiceInfo(
107        AvroReefServiceInfo.newBuilder()
108            .setServiceName("NameServer")
109            .setServiceInfo(getNameServerId())
110            .build());
111    LOG.log(Level.FINE, "NameServer starting, listening at port {0}", this.port);
112  }
113
114  private EventHandler<NamingMessage> createEventHandler(final Codec<NamingMessage> codec) {
115
116    final Map<Class<? extends NamingMessage>, EventHandler<? extends NamingMessage>>
117        clazzToHandlerMap = new HashMap<>();
118
119    clazzToHandlerMap.put(NamingLookupRequest.class, new NamingLookupRequestHandler(this, codec));
120    clazzToHandlerMap.put(NamingRegisterRequest.class, new NamingRegisterRequestHandler(this, codec));
121    clazzToHandlerMap.put(NamingUnregisterRequest.class, new NamingUnregisterRequestHandler(this));
122    final EventHandler<NamingMessage> handler = new MultiEventHandler<>(clazzToHandlerMap);
123
124    return handler;
125  }
126
127  /**
128   * Gets port
129   */
130  @Override
131  public int getPort() {
132    return port;
133  }
134
135  /**
136   * Closes resources
137   */
138  @Override
139  public void close() throws Exception {
140    transport.close();
141  }
142
143  /**
144   * Registers an (identifier, address) mapping locally
145   *
146   * @param id   an identifier
147   * @param addr an Internet socket address
148   */
149  @Override
150  public void register(final Identifier id, final InetSocketAddress addr) {
151    LOG.log(Level.FINE, "id: " + id + " addr: " + addr);
152    idToAddrMap.put(id, addr);
153  }
154
155  /**
156   * Unregisters an identifier locally
157   *
158   * @param id an identifier
159   */
160  @Override
161  public void unregister(final Identifier id) {
162    LOG.log(Level.FINE, "id: " + id);
163    idToAddrMap.remove(id);
164  }
165
166  /**
167   * Finds an address for an identifier locally
168   *
169   * @param id an identifier
170   * @return an Internet socket address
171   */
172  @Override
173  public InetSocketAddress lookup(final Identifier id) {
174    LOG.log(Level.FINE, "id: {0}", id);
175    return idToAddrMap.get(id);
176  }
177
178  /**
179   * Finds addresses for identifiers locally
180   *
181   * @param identifiers an iterable of identifiers
182   * @return a list of name assignments
183   */
184  @Override
185  public List<NameAssignment> lookup(final Iterable<Identifier> identifiers) {
186    LOG.log(Level.FINE, "identifiers");
187    final List<NameAssignment> nas = new ArrayList<>();
188    for (final Identifier id : identifiers) {
189      final InetSocketAddress addr = idToAddrMap.get(id);
190      LOG.log(Level.FINEST, "id : {0} addr: {1}", new Object[]{id, addr});
191      if (addr != null) {
192        nas.add(new NameAssignmentTuple(id, addr));
193      }
194    }
195    return nas;
196  }
197
198  private String getNameServerId() {
199    return NetUtils.getLocalAddress() + ":" + getPort();
200  }
201}
202
203/**
204 * Naming server transport event handler that invokes a specific naming message handler
205 */
206class NamingServerHandler implements EventHandler<TransportEvent> {
207
208  private final Codec<NamingMessage> codec;
209  private final EventHandler<NamingMessage> handler;
210
211  NamingServerHandler(final EventHandler<NamingMessage> handler, final Codec<NamingMessage> codec) {
212    this.codec = codec;
213    this.handler = handler;
214  }
215
216  @Override
217  public void onNext(final TransportEvent value) {
218    final byte[] data = value.getData();
219    final NamingMessage message = codec.decode(data);
220    message.setLink(value.getLink());
221    handler.onNext(message);
222  }
223}
224
225/**
226 * Naming lookup request handler
227 */
228class NamingLookupRequestHandler implements EventHandler<NamingLookupRequest> {
229
230  private static final Logger LOG = Logger.getLogger(NamingLookupRequestHandler.class.getName());
231
232
233  private final NameServer server;
234  private final Codec<NamingMessage> codec;
235
236  public NamingLookupRequestHandler(final NameServer server, final Codec<NamingMessage> codec) {
237    this.server = server;
238    this.codec = codec;
239  }
240
241  @Override
242  public void onNext(final NamingLookupRequest value) {
243    final List<NameAssignment> nas = server.lookup(value.getIdentifiers());
244    final byte[] resp = codec.encode(new NamingLookupResponse(nas));
245    try {
246      value.getLink().write(resp);
247    } catch (final IOException e) {
248      //Actually, there is no way Link.write can throw and IOException
249      //after netty4 merge. This needs to cleaned up
250      LOG.throwing("NamingLookupRequestHandler", "onNext", e);
251    }
252  }
253}
254
255/**
256 * Naming register request handler
257 */
258class NamingRegisterRequestHandler implements EventHandler<NamingRegisterRequest> {
259
260  private static final Logger LOG = Logger.getLogger(NamingRegisterRequestHandler.class.getName());
261
262
263  private final NameServer server;
264  private final Codec<NamingMessage> codec;
265
266  public NamingRegisterRequestHandler(final NameServer server, final Codec<NamingMessage> codec) {
267    this.server = server;
268    this.codec = codec;
269  }
270
271  @Override
272  public void onNext(final NamingRegisterRequest value) {
273    server.register(value.getNameAssignment().getIdentifier(), value.getNameAssignment().getAddress());
274    final byte[] resp = codec.encode(new NamingRegisterResponse(value));
275    try {
276      value.getLink().write(resp);
277    } catch (final IOException e) {
278      //Actually, there is no way Link.write can throw and IOException
279      //after netty4 merge. This needs to cleaned up
280      LOG.throwing("NamingRegisterRequestHandler", "onNext", e);
281    }
282  }
283}
284
285/**
286 * Naming unregister request handler
287 */
288class NamingUnregisterRequestHandler implements EventHandler<NamingUnregisterRequest> {
289
290  private final NameServer server;
291
292  public NamingUnregisterRequestHandler(final NameServer server) {
293    this.server = server;
294  }
295
296  @Override
297  public void onNext(final NamingUnregisterRequest value) {
298    server.unregister(value.getIdentifier());
299  }
300}