001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.io.network.naming; 020 021import org.apache.reef.io.naming.NameAssignment; 022import org.apache.reef.io.network.naming.serialization.*; 023import org.apache.reef.tang.Injector; 024import org.apache.reef.tang.Tang; 025import org.apache.reef.tang.annotations.Parameter; 026import org.apache.reef.tang.exceptions.InjectionException; 027import org.apache.reef.wake.EventHandler; 028import org.apache.reef.wake.Identifier; 029import org.apache.reef.wake.IdentifierFactory; 030import org.apache.reef.wake.impl.MultiEventHandler; 031import org.apache.reef.wake.impl.SyncStage; 032import org.apache.reef.wake.remote.Codec; 033import org.apache.reef.wake.remote.RemoteConfiguration; 034import org.apache.reef.wake.remote.address.LocalAddressProvider; 035import org.apache.reef.wake.remote.impl.TransportEvent; 036import org.apache.reef.wake.remote.transport.Transport; 037import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport; 038import org.apache.reef.webserver.ReefEventStateManager; 039 040import javax.inject.Inject; 041import java.net.InetSocketAddress; 042import java.util.*; 043import java.util.logging.Level; 044import java.util.logging.Logger; 045 046/** 047 * Naming server implementation. 048 */ 049public final class NameServerImpl implements NameServer { 050 051 private static final Logger LOG = Logger.getLogger(NameServer.class.getName()); 052 053 private final Transport transport; 054 private final Map<Identifier, InetSocketAddress> idToAddrMap; 055 private final ReefEventStateManager reefEventStateManager; 056 private final int port; 057 private final LocalAddressProvider localAddressProvider; 058 059 /** 060 * @param port a listening port number 061 * @param factory an identifier factory 062 * @param localAddressProvider a local address provider 063 * Constructs a name server 064 * @deprecated in 0.12. Use Tang to obtain an instance of this or, better, NameServer, instead. 065 */ 066 @Deprecated 067 @Inject 068 public NameServerImpl( 069 @Parameter(NameServerParameters.NameServerPort.class) final int port, 070 @Parameter(NameServerParameters.NameServerIdentifierFactory.class) final IdentifierFactory factory, 071 final LocalAddressProvider localAddressProvider) { 072 073 final Injector injector = Tang.Factory.getTang().newInjector(); 074 075 this.localAddressProvider = localAddressProvider; 076 this.reefEventStateManager = null; 077 final Codec<NamingMessage> codec = NamingCodecFactory.createFullCodec(factory); 078 final EventHandler<NamingMessage> handler = createEventHandler(codec); 079 080 injector.bindVolatileParameter(RemoteConfiguration.HostAddress.class, localAddressProvider.getLocalAddress()); 081 injector.bindVolatileParameter(RemoteConfiguration.Port.class, port); 082 injector.bindVolatileParameter(RemoteConfiguration.RemoteServerStage.class, 083 new SyncStage<>(new NamingServerHandler(handler, codec))); 084 085 try { 086 this.transport = injector.getInstance(NettyMessagingTransport.class); 087 } catch (final InjectionException e) { 088 throw new RuntimeException(e); 089 } 090 091 this.port = transport.getListeningPort(); 092 this.idToAddrMap = Collections.synchronizedMap(new HashMap<Identifier, InetSocketAddress>()); 093 094 LOG.log(Level.FINE, "NameServer starting, listening at port {0}", this.port); 095 } 096 097 private EventHandler<NamingMessage> createEventHandler(final Codec<NamingMessage> codec) { 098 099 final Map<Class<? extends NamingMessage>, EventHandler<? extends NamingMessage>> 100 clazzToHandlerMap = new HashMap<>(); 101 102 clazzToHandlerMap.put(NamingLookupRequest.class, new NamingLookupRequestHandler(this, codec)); 103 clazzToHandlerMap.put(NamingRegisterRequest.class, new NamingRegisterRequestHandler(this, codec)); 104 clazzToHandlerMap.put(NamingUnregisterRequest.class, new NamingUnregisterRequestHandler(this)); 105 final EventHandler<NamingMessage> handler = new MultiEventHandler<>(clazzToHandlerMap); 106 107 return handler; 108 } 109 /** 110 * Gets port. 111 */ 112 @Override 113 public int getPort() { 114 return port; 115 } 116 117 /** 118 * Closes resources. 119 */ 120 @Override 121 public void close() throws Exception { 122 transport.close(); 123 } 124 125 /** 126 * Registers an (identifier, address) mapping locally. 127 * 128 * @param id an identifier 129 * @param addr an Internet socket address 130 */ 131 @Override 132 public void register(final Identifier id, final InetSocketAddress addr) { 133 LOG.log(Level.FINE, "id: " + id + " addr: " + addr); 134 idToAddrMap.put(id, addr); 135 } 136 137 /** 138 * Unregisters an identifier locally. 139 * 140 * @param id an identifier 141 */ 142 @Override 143 public void unregister(final Identifier id) { 144 LOG.log(Level.FINE, "id: " + id); 145 idToAddrMap.remove(id); 146 } 147 148 /** 149 * Finds an address for an identifier locally. 150 * 151 * @param id an identifier 152 * @return an Internet socket address 153 */ 154 @Override 155 public InetSocketAddress lookup(final Identifier id) { 156 LOG.log(Level.FINE, "id: {0}", id); 157 return idToAddrMap.get(id); 158 } 159 160 /** 161 * Finds addresses for identifiers locally. 162 * 163 * @param identifiers an iterable of identifiers 164 * @return a list of name assignments 165 */ 166 @Override 167 public List<NameAssignment> lookup(final Iterable<Identifier> identifiers) { 168 LOG.log(Level.FINE, "identifiers"); 169 final List<NameAssignment> nas = new ArrayList<>(); 170 for (final Identifier id : identifiers) { 171 final InetSocketAddress addr = idToAddrMap.get(id); 172 LOG.log(Level.FINEST, "id : {0} addr: {1}", new Object[]{id, addr}); 173 if (addr != null) { 174 nas.add(new NameAssignmentTuple(id, addr)); 175 } 176 } 177 return nas; 178 } 179 180 private String getNameServerId() { 181 return this.localAddressProvider.getLocalAddress() + ":" + getPort(); 182 } 183} 184 185/** 186 * Naming server transport event handler that invokes a specific naming message handler. 187 */ 188class NamingServerHandler implements EventHandler<TransportEvent> { 189 190 private final Codec<NamingMessage> codec; 191 private final EventHandler<NamingMessage> handler; 192 193 NamingServerHandler(final EventHandler<NamingMessage> handler, final Codec<NamingMessage> codec) { 194 this.codec = codec; 195 this.handler = handler; 196 } 197 198 @Override 199 public void onNext(final TransportEvent value) { 200 final byte[] data = value.getData(); 201 final NamingMessage message = codec.decode(data); 202 message.setLink(value.getLink()); 203 handler.onNext(message); 204 } 205} 206 207/** 208 * Naming lookup request handler. 209 */ 210class NamingLookupRequestHandler implements EventHandler<NamingLookupRequest> { 211 212 private static final Logger LOG = Logger.getLogger(NamingLookupRequestHandler.class.getName()); 213 214 215 private final NameServer server; 216 private final Codec<NamingMessage> codec; 217 218 NamingLookupRequestHandler(final NameServer server, final Codec<NamingMessage> codec) { 219 this.server = server; 220 this.codec = codec; 221 } 222 223 @Override 224 public void onNext(final NamingLookupRequest value) { 225 final List<NameAssignment> nas = server.lookup(value.getIdentifiers()); 226 final byte[] resp = codec.encode(new NamingLookupResponse(nas)); 227 value.getLink().write(resp); 228 } 229} 230 231/** 232 * Naming register request handler. 233 */ 234class NamingRegisterRequestHandler implements EventHandler<NamingRegisterRequest> { 235 236 private static final Logger LOG = Logger.getLogger(NamingRegisterRequestHandler.class.getName()); 237 238 239 private final NameServer server; 240 private final Codec<NamingMessage> codec; 241 242 NamingRegisterRequestHandler(final NameServer server, final Codec<NamingMessage> codec) { 243 this.server = server; 244 this.codec = codec; 245 } 246 247 @Override 248 public void onNext(final NamingRegisterRequest value) { 249 server.register(value.getNameAssignment().getIdentifier(), value.getNameAssignment().getAddress()); 250 final byte[] resp = codec.encode(new NamingRegisterResponse(value)); 251 value.getLink().write(resp); 252 } 253} 254 255/** 256 * Naming unregister request handler. 257 */ 258class NamingUnregisterRequestHandler implements EventHandler<NamingUnregisterRequest> { 259 260 private final NameServer server; 261 262 NamingUnregisterRequestHandler(final NameServer server) { 263 this.server = server; 264 } 265 266 @Override 267 public void onNext(final NamingUnregisterRequest value) { 268 server.unregister(value.getIdentifier()); 269 } 270}