001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.io.network.naming; 020 021import org.apache.reef.io.naming.NameAssignment; 022import org.apache.reef.io.network.naming.serialization.*; 023import org.apache.reef.tang.annotations.Parameter; 024import org.apache.reef.wake.EventHandler; 025import org.apache.reef.wake.Identifier; 026import org.apache.reef.wake.IdentifierFactory; 027import org.apache.reef.wake.Stage; 028import org.apache.reef.wake.impl.MultiEventHandler; 029import org.apache.reef.wake.impl.SyncStage; 030import org.apache.reef.wake.remote.Codec; 031import org.apache.reef.wake.remote.NetUtils; 032import org.apache.reef.wake.remote.impl.TransportEvent; 033import org.apache.reef.wake.remote.transport.Transport; 034import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport; 035import org.apache.reef.webserver.AvroReefServiceInfo; 036import org.apache.reef.webserver.ReefEventStateManager; 037 038import javax.inject.Inject; 039import java.io.IOException; 040import java.net.InetSocketAddress; 041import java.util.*; 042import java.util.logging.Level; 043import java.util.logging.Logger; 044 045/** 046 * Naming server implementation 047 */ 048public class NameServerImpl implements NameServer { 049 050 private static final Logger LOG = Logger.getLogger(NameServer.class.getName()); 051 052 private final Transport transport; 053 private final Map<Identifier, InetSocketAddress> idToAddrMap; 054 private final ReefEventStateManager reefEventStateManager; 055 private final int port; 056 057 /** 058 * @param port a listening port number 059 * @param factory an identifier factory 060 * @deprecated inject the NameServer instead of new it up 061 * Constructs a name server 062 */ 063 // TODO: All existing NameServer usage is currently new-up, need to make them injected as well. 064 @Deprecated 065 public NameServerImpl( 066 final int port, 067 final IdentifierFactory factory) { 068 069 this.reefEventStateManager = null; 070 final Codec<NamingMessage> codec = NamingCodecFactory.createFullCodec(factory); 071 final EventHandler<NamingMessage> handler = createEventHandler(codec); 072 073 this.transport = new NettyMessagingTransport(NetUtils.getLocalAddress(), port, null, 074 new SyncStage<>(new NamingServerHandler(handler, codec)), 3, 10000); 075 076 this.port = transport.getListeningPort(); 077 this.idToAddrMap = Collections.synchronizedMap(new HashMap<Identifier, InetSocketAddress>()); 078 079 LOG.log(Level.FINE, "NameServer starting, listening at port {0}", this.port); 080 } 081 082 083 /** 084 * Constructs a name server 085 * 086 * @param port a listening port number 087 * @param factory an identifier factory 088 * @param reefEventStateManager the event state manager used to register name server info 089 */ 090 @Inject 091 public NameServerImpl( 092 final @Parameter(NameServerParameters.NameServerPort.class) int port, 093 final @Parameter(NameServerParameters.NameServerIdentifierFactory.class) IdentifierFactory factory, 094 final ReefEventStateManager reefEventStateManager) { 095 096 this.reefEventStateManager = reefEventStateManager; 097 final Codec<NamingMessage> codec = NamingCodecFactory.createFullCodec(factory); 098 final EventHandler<NamingMessage> handler = createEventHandler(codec); 099 100 this.transport = new NettyMessagingTransport(NetUtils.getLocalAddress(), port, null, 101 new SyncStage<>(new NamingServerHandler(handler, codec)), 3, 10000); 102 103 this.port = transport.getListeningPort(); 104 this.idToAddrMap = Collections.synchronizedMap(new HashMap<Identifier, InetSocketAddress>()); 105 106 this.reefEventStateManager.registerServiceInfo( 107 AvroReefServiceInfo.newBuilder() 108 .setServiceName("NameServer") 109 .setServiceInfo(getNameServerId()) 110 .build()); 111 LOG.log(Level.FINE, "NameServer starting, listening at port {0}", this.port); 112 } 113 114 private EventHandler<NamingMessage> createEventHandler(final Codec<NamingMessage> codec) { 115 116 final Map<Class<? extends NamingMessage>, EventHandler<? extends NamingMessage>> 117 clazzToHandlerMap = new HashMap<>(); 118 119 clazzToHandlerMap.put(NamingLookupRequest.class, new NamingLookupRequestHandler(this, codec)); 120 clazzToHandlerMap.put(NamingRegisterRequest.class, new NamingRegisterRequestHandler(this, codec)); 121 clazzToHandlerMap.put(NamingUnregisterRequest.class, new NamingUnregisterRequestHandler(this)); 122 final EventHandler<NamingMessage> handler = new MultiEventHandler<>(clazzToHandlerMap); 123 124 return handler; 125 } 126 127 /** 128 * Gets port 129 */ 130 @Override 131 public int getPort() { 132 return port; 133 } 134 135 /** 136 * Closes resources 137 */ 138 @Override 139 public void close() throws Exception { 140 transport.close(); 141 } 142 143 /** 144 * Registers an (identifier, address) mapping locally 145 * 146 * @param id an identifier 147 * @param addr an Internet socket address 148 */ 149 @Override 150 public void register(final Identifier id, final InetSocketAddress addr) { 151 LOG.log(Level.FINE, "id: " + id + " addr: " + addr); 152 idToAddrMap.put(id, addr); 153 } 154 155 /** 156 * Unregisters an identifier locally 157 * 158 * @param id an identifier 159 */ 160 @Override 161 public void unregister(final Identifier id) { 162 LOG.log(Level.FINE, "id: " + id); 163 idToAddrMap.remove(id); 164 } 165 166 /** 167 * Finds an address for an identifier locally 168 * 169 * @param id an identifier 170 * @return an Internet socket address 171 */ 172 @Override 173 public InetSocketAddress lookup(final Identifier id) { 174 LOG.log(Level.FINE, "id: {0}", id); 175 return idToAddrMap.get(id); 176 } 177 178 /** 179 * Finds addresses for identifiers locally 180 * 181 * @param identifiers an iterable of identifiers 182 * @return a list of name assignments 183 */ 184 @Override 185 public List<NameAssignment> lookup(final Iterable<Identifier> identifiers) { 186 LOG.log(Level.FINE, "identifiers"); 187 final List<NameAssignment> nas = new ArrayList<>(); 188 for (final Identifier id : identifiers) { 189 final InetSocketAddress addr = idToAddrMap.get(id); 190 LOG.log(Level.FINEST, "id : {0} addr: {1}", new Object[]{id, addr}); 191 if (addr != null) { 192 nas.add(new NameAssignmentTuple(id, addr)); 193 } 194 } 195 return nas; 196 } 197 198 private String getNameServerId() { 199 return NetUtils.getLocalAddress() + ":" + getPort(); 200 } 201} 202 203/** 204 * Naming server transport event handler that invokes a specific naming message handler 205 */ 206class NamingServerHandler implements EventHandler<TransportEvent> { 207 208 private final Codec<NamingMessage> codec; 209 private final EventHandler<NamingMessage> handler; 210 211 NamingServerHandler(final EventHandler<NamingMessage> handler, final Codec<NamingMessage> codec) { 212 this.codec = codec; 213 this.handler = handler; 214 } 215 216 @Override 217 public void onNext(final TransportEvent value) { 218 final byte[] data = value.getData(); 219 final NamingMessage message = codec.decode(data); 220 message.setLink(value.getLink()); 221 handler.onNext(message); 222 } 223} 224 225/** 226 * Naming lookup request handler 227 */ 228class NamingLookupRequestHandler implements EventHandler<NamingLookupRequest> { 229 230 private static final Logger LOG = Logger.getLogger(NamingLookupRequestHandler.class.getName()); 231 232 233 private final NameServer server; 234 private final Codec<NamingMessage> codec; 235 236 public NamingLookupRequestHandler(final NameServer server, final Codec<NamingMessage> codec) { 237 this.server = server; 238 this.codec = codec; 239 } 240 241 @Override 242 public void onNext(final NamingLookupRequest value) { 243 final List<NameAssignment> nas = server.lookup(value.getIdentifiers()); 244 final byte[] resp = codec.encode(new NamingLookupResponse(nas)); 245 try { 246 value.getLink().write(resp); 247 } catch (final IOException e) { 248 //Actually, there is no way Link.write can throw and IOException 249 //after netty4 merge. This needs to cleaned up 250 LOG.throwing("NamingLookupRequestHandler", "onNext", e); 251 } 252 } 253} 254 255/** 256 * Naming register request handler 257 */ 258class NamingRegisterRequestHandler implements EventHandler<NamingRegisterRequest> { 259 260 private static final Logger LOG = Logger.getLogger(NamingRegisterRequestHandler.class.getName()); 261 262 263 private final NameServer server; 264 private final Codec<NamingMessage> codec; 265 266 public NamingRegisterRequestHandler(final NameServer server, final Codec<NamingMessage> codec) { 267 this.server = server; 268 this.codec = codec; 269 } 270 271 @Override 272 public void onNext(final NamingRegisterRequest value) { 273 server.register(value.getNameAssignment().getIdentifier(), value.getNameAssignment().getAddress()); 274 final byte[] resp = codec.encode(new NamingRegisterResponse(value)); 275 try { 276 value.getLink().write(resp); 277 } catch (final IOException e) { 278 //Actually, there is no way Link.write can throw and IOException 279 //after netty4 merge. This needs to cleaned up 280 LOG.throwing("NamingRegisterRequestHandler", "onNext", e); 281 } 282 } 283} 284 285/** 286 * Naming unregister request handler 287 */ 288class NamingUnregisterRequestHandler implements EventHandler<NamingUnregisterRequest> { 289 290 private final NameServer server; 291 292 public NamingUnregisterRequestHandler(final NameServer server) { 293 this.server = server; 294 } 295 296 @Override 297 public void onNext(final NamingUnregisterRequest value) { 298 server.unregister(value.getIdentifier()); 299 } 300}