001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.io.network.impl; 020 021import org.apache.reef.exception.evaluator.NetworkException; 022import org.apache.reef.io.Tuple; 023import org.apache.reef.io.network.ConnectionFactory; 024import org.apache.reef.io.network.Message; 025import org.apache.reef.io.network.NetworkConnectionService; 026import org.apache.reef.io.network.exception.NetworkRuntimeException; 027import org.apache.reef.io.network.impl.config.NetworkConnectionServiceIdFactory; 028import org.apache.reef.io.network.impl.config.NetworkConnectionServicePort; 029import org.apache.reef.io.network.naming.NameResolver; 030import org.apache.reef.tang.annotations.Parameter; 031import org.apache.reef.wake.EStage; 032import org.apache.reef.wake.EventHandler; 033import org.apache.reef.wake.Identifier; 034import org.apache.reef.wake.IdentifierFactory; 035import org.apache.reef.wake.impl.SingleThreadStage; 036import org.apache.reef.wake.remote.Codec; 037import org.apache.reef.wake.remote.impl.TransportEvent; 038import org.apache.reef.wake.remote.transport.Link; 039import org.apache.reef.wake.remote.transport.LinkListener; 040import org.apache.reef.wake.remote.transport.Transport; 041import org.apache.reef.wake.remote.transport.TransportFactory; 042 043import javax.inject.Inject; 044import java.net.InetSocketAddress; 045import java.net.SocketAddress; 046import java.util.concurrent.ConcurrentHashMap; 047import java.util.concurrent.ConcurrentMap; 048import java.util.concurrent.atomic.AtomicBoolean; 049import java.util.logging.Level; 050import java.util.logging.Logger; 051 052/** 053 * Default Network connection service implementation. 054 */ 055public final class NetworkConnectionServiceImpl implements NetworkConnectionService { 056 057 private static final Logger LOG = Logger.getLogger(NetworkConnectionServiceImpl.class.getName()); 058 059 /** 060 * An identifier factory registering network connection service id. 061 */ 062 private final IdentifierFactory idFactory; 063 /** 064 * A name resolver looking up nameserver. 065 */ 066 private final NameResolver nameResolver; 067 /** 068 * A messaging transport. 069 */ 070 private final Transport transport; 071 /** 072 * A map of (id of connection factory, a connection factory instance). 073 */ 074 private final ConcurrentMap<String, NetworkConnectionFactory> connFactoryMap; 075 076 /** 077 * A network connection service message codec. 078 */ 079 private final Codec<NetworkConnectionServiceMessage> nsCodec; 080 /** 081 * A network connection service link listener. 082 */ 083 private final LinkListener<NetworkConnectionServiceMessage> nsLinkListener; 084 /** 085 * A stage registering identifiers to nameServer. 086 */ 087 private final EStage<Tuple<Identifier, InetSocketAddress>> nameServiceRegisteringStage; 088 /** 089 * A stage unregistering identifiers from nameServer. 090 */ 091 private final EStage<Identifier> nameServiceUnregisteringStage; 092 /** 093 * A boolean flag that indicates whether the NetworkConnectionService is closed. 094 */ 095 private final AtomicBoolean isClosed; 096 /** 097 * A DELIMITER to make a concatenated end point id {{connectionFactoryId}}{{DELIMITER}}{{localEndPointId}}. 098 */ 099 private static final String DELIMITER = "/"; 100 101 @Inject 102 private NetworkConnectionServiceImpl( 103 @Parameter(NetworkConnectionServiceIdFactory.class) final IdentifierFactory idFactory, 104 @Parameter(NetworkConnectionServicePort.class) final int nsPort, 105 final TransportFactory transportFactory, 106 final NameResolver nameResolver) { 107 this.idFactory = idFactory; 108 this.connFactoryMap = new ConcurrentHashMap<>(); 109 this.nsCodec = new NetworkConnectionServiceMessageCodec(idFactory, connFactoryMap); 110 this.nsLinkListener = new NetworkConnectionServiceLinkListener(connFactoryMap); 111 final EventHandler<TransportEvent> recvHandler = 112 new NetworkConnectionServiceReceiveHandler(connFactoryMap, nsCodec); 113 this.nameResolver = nameResolver; 114 this.transport = transportFactory.newInstance(nsPort, recvHandler, recvHandler, 115 new NetworkConnectionServiceExceptionHandler()); 116 117 this.nameServiceRegisteringStage = new SingleThreadStage<>( 118 "NameServiceRegisterer", new EventHandler<Tuple<Identifier, InetSocketAddress>>() { 119 @Override 120 public void onNext(final Tuple<Identifier, InetSocketAddress> tuple) { 121 try { 122 nameResolver.register(tuple.getKey(), tuple.getValue()); 123 LOG.log(Level.FINEST, "Registered {0} with nameservice", tuple.getKey()); 124 } catch (final Exception ex) { 125 final String msg = "Unable to register " + tuple.getKey() + " with name service"; 126 LOG.log(Level.WARNING, msg, ex); 127 throw new RuntimeException(msg, ex); 128 } 129 } 130 }, 5); 131 132 this.nameServiceUnregisteringStage = new SingleThreadStage<>( 133 "NameServiceRegisterer", new EventHandler<Identifier>() { 134 @Override 135 public void onNext(final Identifier id) { 136 try { 137 nameResolver.unregister(id); 138 LOG.log(Level.FINEST, "Unregistered {0} with nameservice", id); 139 } catch (final Exception ex) { 140 final String msg = "Unable to unregister " + id + " with name service"; 141 LOG.log(Level.WARNING, msg, ex); 142 throw new RuntimeException(msg, ex); 143 } 144 } 145 }, 5); 146 147 this.isClosed = new AtomicBoolean(); 148 } 149 150 151 private void checkBeforeRegistration(final String connectionFactoryId) { 152 if (isClosed.get()) { 153 throw new NetworkRuntimeException("Unable to register new ConnectionFactory to closed NetworkConnectionService"); 154 } 155 156 if (connFactoryMap.get(connectionFactoryId) != null) { 157 throw new NetworkRuntimeException("ConnectionFactory " + connectionFactoryId + " was already registered."); 158 } 159 160 if (connectionFactoryId.contains(DELIMITER)) { 161 throw new NetworkRuntimeException( 162 "The ConnectionFactoryId " + connectionFactoryId + " should not contain " + DELIMITER); 163 } 164 } 165 166 @Override 167 public <T> ConnectionFactory<T> registerConnectionFactory( 168 final Identifier connectionFactoryId, 169 final Codec<T> codec, 170 final EventHandler<Message<T>> eventHandler, 171 final LinkListener<Message<T>> linkListener, 172 final Identifier localEndPointId) { 173 final String id = connectionFactoryId.toString(); 174 checkBeforeRegistration(id); 175 176 final NetworkConnectionFactory<T> connectionFactory = new NetworkConnectionFactory<>( 177 this, connectionFactoryId, codec, eventHandler, linkListener, localEndPointId); 178 final Identifier localId = getEndPointIdWithConnectionFactoryId(connectionFactoryId, localEndPointId); 179 nameServiceRegisteringStage.onNext(new Tuple<>(localId, (InetSocketAddress) transport.getLocalAddress())); 180 181 if (connFactoryMap.putIfAbsent(id, connectionFactory) != null) { 182 throw new NetworkRuntimeException("ConnectionFactory " + connectionFactoryId + " was already registered."); 183 } 184 185 LOG.log(Level.INFO, "ConnectionFactory {0} was registered", id); 186 187 return connectionFactory; 188 } 189 190 @Override 191 public void unregisterConnectionFactory(final Identifier connFactoryId) { 192 final String id = connFactoryId.toString(); 193 final NetworkConnectionFactory connFactory = connFactoryMap.remove(id); 194 if (connFactory != null) { 195 LOG.log(Level.INFO, "ConnectionFactory {0} was unregistered", id); 196 197 final Identifier localId = getEndPointIdWithConnectionFactoryId( 198 connFactoryId, connFactory.getLocalEndPointId()); 199 nameServiceUnregisteringStage.onNext(localId); 200 } else { 201 LOG.log(Level.WARNING, "ConnectionFactory of {0} is null", id); 202 } 203 } 204 205 206 /** 207 * Open a channel for destination identifier of NetworkConnectionService. 208 * @param connectionFactoryId 209 * @param remoteEndPointId 210 * @throws NetworkException 211 */ 212 <T> Link<NetworkConnectionServiceMessage<T>> openLink( 213 final Identifier connectionFactoryId, final Identifier remoteEndPointId) throws NetworkException { 214 final Identifier remoteId = getEndPointIdWithConnectionFactoryId(connectionFactoryId, remoteEndPointId); 215 try { 216 final SocketAddress address = nameResolver.lookup(remoteId); 217 if (address == null) { 218 throw new NetworkException("Lookup " + remoteId + " is null"); 219 } 220 return transport.open(address, nsCodec, nsLinkListener); 221 } catch(final Exception e) { 222 throw new NetworkException(e); 223 } 224 } 225 226 227 private Identifier getEndPointIdWithConnectionFactoryId( 228 final Identifier connectionFactoryId, final Identifier endPointId) { 229 final String identifier = connectionFactoryId.toString() + DELIMITER + endPointId.toString(); 230 return idFactory.getNewInstance(identifier); 231 } 232 233 /** 234 * Gets a ConnectionFactory. 235 * @param connFactoryId the identifier of the ConnectionFactory 236 */ 237 @Override 238 public <T> ConnectionFactory<T> getConnectionFactory(final Identifier connFactoryId) { 239 final ConnectionFactory<T> connFactory = connFactoryMap.get(connFactoryId.toString()); 240 if (connFactory == null) { 241 throw new RuntimeException("Cannot find ConnectionFactory of " + connFactoryId + "."); 242 } 243 return connFactory; 244 } 245 246 @Override 247 public void close() throws Exception { 248 if (isClosed.compareAndSet(false, true)) { 249 LOG.log(Level.FINE, "Shutting down"); 250 this.nameServiceRegisteringStage.close(); 251 this.nameServiceUnregisteringStage.close(); 252 this.nameResolver.close(); 253 this.transport.close(); 254 } 255 } 256}