001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.io.network.naming; 020 021import org.apache.reef.io.naming.NameAssignment; 022import org.apache.reef.io.network.naming.serialization.*; 023import org.apache.reef.tang.Injector; 024import org.apache.reef.tang.Tang; 025import org.apache.reef.tang.annotations.Parameter; 026import org.apache.reef.tang.exceptions.InjectionException; 027import org.apache.reef.wake.EventHandler; 028import org.apache.reef.wake.Identifier; 029import org.apache.reef.wake.IdentifierFactory; 030import org.apache.reef.wake.impl.MultiEventHandler; 031import org.apache.reef.wake.impl.SyncStage; 032import org.apache.reef.wake.remote.Codec; 033import org.apache.reef.wake.remote.RemoteConfiguration; 034import org.apache.reef.wake.remote.address.LocalAddressProvider; 035import org.apache.reef.wake.remote.impl.TransportEvent; 036import org.apache.reef.wake.remote.transport.Transport; 037import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport; 038import org.apache.reef.webserver.ReefEventStateManager; 039 040import javax.inject.Inject; 041import java.net.InetSocketAddress; 042import java.util.*; 043import java.util.logging.Level; 044import java.util.logging.Logger; 045 046/** 047 * Naming server implementation. 048 */ 049public final class NameServerImpl implements NameServer { 050 051 private static final Logger LOG = Logger.getLogger(NameServer.class.getName()); 052 053 private final Transport transport; 054 private final Map<Identifier, InetSocketAddress> idToAddrMap; 055 private final ReefEventStateManager reefEventStateManager; 056 private final int port; 057 private final LocalAddressProvider localAddressProvider; 058 059 /** 060 * @param port a listening port number 061 * @param factory an identifier factory 062 * @param localAddressProvider a local address provider 063 * Constructs a name server 064 */ 065 @Inject 066 private NameServerImpl( 067 @Parameter(NameServerParameters.NameServerPort.class) final int port, 068 @Parameter(NameServerParameters.NameServerIdentifierFactory.class) final IdentifierFactory factory, 069 final LocalAddressProvider localAddressProvider) { 070 071 final Injector injector = Tang.Factory.getTang().newInjector(); 072 073 this.localAddressProvider = localAddressProvider; 074 this.reefEventStateManager = null; 075 final Codec<NamingMessage> codec = NamingCodecFactory.createFullCodec(factory); 076 final EventHandler<NamingMessage> handler = createEventHandler(codec); 077 078 injector.bindVolatileParameter(RemoteConfiguration.HostAddress.class, localAddressProvider.getLocalAddress()); 079 injector.bindVolatileParameter(RemoteConfiguration.Port.class, port); 080 injector.bindVolatileParameter(RemoteConfiguration.RemoteServerStage.class, 081 new SyncStage<>(new NamingServerHandler(handler, codec))); 082 083 try { 084 this.transport = injector.getInstance(NettyMessagingTransport.class); 085 } catch (final InjectionException e) { 086 throw new RuntimeException(e); 087 } 088 089 this.port = transport.getListeningPort(); 090 this.idToAddrMap = Collections.synchronizedMap(new HashMap<Identifier, InetSocketAddress>()); 091 092 LOG.log(Level.FINE, "NameServer starting, listening at port {0}", this.port); 093 } 094 095 private EventHandler<NamingMessage> createEventHandler(final Codec<NamingMessage> codec) { 096 097 final Map<Class<? extends NamingMessage>, EventHandler<? extends NamingMessage>> 098 clazzToHandlerMap = new HashMap<>(); 099 100 clazzToHandlerMap.put(NamingLookupRequest.class, new NamingLookupRequestHandler(this, codec)); 101 clazzToHandlerMap.put(NamingRegisterRequest.class, new NamingRegisterRequestHandler(this, codec)); 102 clazzToHandlerMap.put(NamingUnregisterRequest.class, new NamingUnregisterRequestHandler(this)); 103 final EventHandler<NamingMessage> handler = new MultiEventHandler<>(clazzToHandlerMap); 104 105 return handler; 106 } 107 /** 108 * Gets port. 109 */ 110 @Override 111 public int getPort() { 112 return port; 113 } 114 115 /** 116 * Closes resources. 117 */ 118 @Override 119 public void close() throws Exception { 120 transport.close(); 121 } 122 123 /** 124 * Registers an (identifier, address) mapping locally. 125 * 126 * @param id an identifier 127 * @param addr an Internet socket address 128 */ 129 @Override 130 public void register(final Identifier id, final InetSocketAddress addr) { 131 LOG.log(Level.FINE, "id: " + id + " addr: " + addr); 132 idToAddrMap.put(id, addr); 133 } 134 135 /** 136 * Unregisters an identifier locally. 137 * 138 * @param id an identifier 139 */ 140 @Override 141 public void unregister(final Identifier id) { 142 LOG.log(Level.FINE, "id: " + id); 143 idToAddrMap.remove(id); 144 } 145 146 /** 147 * Finds an address for an identifier locally. 148 * 149 * @param id an identifier 150 * @return an Internet socket address 151 */ 152 @Override 153 public InetSocketAddress lookup(final Identifier id) { 154 LOG.log(Level.FINE, "id: {0}", id); 155 return idToAddrMap.get(id); 156 } 157 158 /** 159 * Finds addresses for identifiers locally. 160 * 161 * @param identifiers an iterable of identifiers 162 * @return a list of name assignments 163 */ 164 @Override 165 public List<NameAssignment> lookup(final Iterable<Identifier> identifiers) { 166 LOG.log(Level.FINE, "identifiers"); 167 final List<NameAssignment> nas = new ArrayList<>(); 168 for (final Identifier id : identifiers) { 169 final InetSocketAddress addr = idToAddrMap.get(id); 170 LOG.log(Level.FINEST, "id : {0} addr: {1}", new Object[]{id, addr}); 171 if (addr != null) { 172 nas.add(new NameAssignmentTuple(id, addr)); 173 } 174 } 175 return nas; 176 } 177 178 private String getNameServerId() { 179 return this.localAddressProvider.getLocalAddress() + ":" + getPort(); 180 } 181} 182 183/** 184 * Naming server transport event handler that invokes a specific naming message handler. 185 */ 186class NamingServerHandler implements EventHandler<TransportEvent> { 187 188 private final Codec<NamingMessage> codec; 189 private final EventHandler<NamingMessage> handler; 190 191 NamingServerHandler(final EventHandler<NamingMessage> handler, final Codec<NamingMessage> codec) { 192 this.codec = codec; 193 this.handler = handler; 194 } 195 196 @Override 197 public void onNext(final TransportEvent value) { 198 final byte[] data = value.getData(); 199 final NamingMessage message = codec.decode(data); 200 message.setLink(value.getLink()); 201 handler.onNext(message); 202 } 203} 204 205/** 206 * Naming lookup request handler. 207 */ 208class NamingLookupRequestHandler implements EventHandler<NamingLookupRequest> { 209 210 private static final Logger LOG = Logger.getLogger(NamingLookupRequestHandler.class.getName()); 211 212 213 private final NameServer server; 214 private final Codec<NamingMessage> codec; 215 216 NamingLookupRequestHandler(final NameServer server, final Codec<NamingMessage> codec) { 217 this.server = server; 218 this.codec = codec; 219 } 220 221 @Override 222 public void onNext(final NamingLookupRequest value) { 223 final List<NameAssignment> nas = server.lookup(value.getIdentifiers()); 224 final byte[] resp = codec.encode(new NamingLookupResponse(nas)); 225 value.getLink().write(resp); 226 } 227} 228 229/** 230 * Naming register request handler. 231 */ 232class NamingRegisterRequestHandler implements EventHandler<NamingRegisterRequest> { 233 234 private static final Logger LOG = Logger.getLogger(NamingRegisterRequestHandler.class.getName()); 235 236 237 private final NameServer server; 238 private final Codec<NamingMessage> codec; 239 240 NamingRegisterRequestHandler(final NameServer server, final Codec<NamingMessage> codec) { 241 this.server = server; 242 this.codec = codec; 243 } 244 245 @Override 246 public void onNext(final NamingRegisterRequest value) { 247 server.register(value.getNameAssignment().getIdentifier(), value.getNameAssignment().getAddress()); 248 final byte[] resp = codec.encode(new NamingRegisterResponse(value)); 249 value.getLink().write(resp); 250 } 251} 252 253/** 254 * Naming unregister request handler. 255 */ 256class NamingUnregisterRequestHandler implements EventHandler<NamingUnregisterRequest> { 257 258 private final NameServer server; 259 260 NamingUnregisterRequestHandler(final NameServer server) { 261 this.server = server; 262 } 263 264 @Override 265 public void onNext(final NamingUnregisterRequest value) { 266 server.unregister(value.getIdentifier()); 267 } 268}