001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.io.network.impl; 020 021import org.apache.reef.io.Tuple; 022import org.apache.reef.io.naming.Naming; 023import org.apache.reef.io.network.Connection; 024import org.apache.reef.io.network.ConnectionFactory; 025import org.apache.reef.io.network.Message; 026import org.apache.reef.io.network.naming.NameResolver; 027import org.apache.reef.tang.annotations.Parameter; 028import org.apache.reef.wake.*; 029import org.apache.reef.wake.impl.LoggingEventHandler; 030import org.apache.reef.wake.impl.SingleThreadStage; 031import org.apache.reef.wake.remote.Codec; 032import org.apache.reef.wake.remote.address.LocalAddressProvider; 033import org.apache.reef.wake.remote.impl.TransportEvent; 034import org.apache.reef.wake.remote.transport.Transport; 035import org.apache.reef.wake.remote.transport.TransportFactory; 036import org.apache.reef.wake.remote.transport.netty.LoggingLinkListener; 037 038import javax.inject.Inject; 039import java.net.InetSocketAddress; 040import java.util.concurrent.ConcurrentHashMap; 041import java.util.concurrent.ConcurrentMap; 042import java.util.logging.Level; 043import java.util.logging.Logger; 044 045/** 046 * Network service for Task. 047 */ 048public final class NetworkService<T> implements Stage, ConnectionFactory<T> { 049 050 private static final Logger LOG = Logger.getLogger(NetworkService.class.getName()); 051 052 private final IdentifierFactory factory; 053 private final Codec<T> codec; 054 private final Transport transport; 055 private final NameResolver nameResolver; 056 private final ConcurrentMap<Identifier, Connection<T>> idToConnMap = new ConcurrentHashMap<>(); 057 private final EStage<Tuple<Identifier, InetSocketAddress>> nameServiceRegisteringStage; 058 private final EStage<Identifier> nameServiceUnregisteringStage; 059 private Identifier myId; 060 061 /** 062 * @deprecated in 0.12. Use Tang to obtain an instance of this instead. 063 */ 064 @Deprecated 065 @Inject 066 public NetworkService( 067 @Parameter(NetworkServiceParameters.NetworkServiceIdentifierFactory.class) final IdentifierFactory factory, 068 @Parameter(NetworkServiceParameters.NetworkServicePort.class) final int nsPort, 069 final NameResolver nameResolver, 070 @Parameter(NetworkServiceParameters.NetworkServiceCodec.class) final Codec<T> codec, 071 @Parameter(NetworkServiceParameters.NetworkServiceTransportFactory.class) final TransportFactory tpFactory, 072 @Parameter(NetworkServiceParameters.NetworkServiceHandler.class) final EventHandler<Message<T>> recvHandler, 073 @Parameter(NetworkServiceParameters.NetworkServiceExceptionHandler.class) final EventHandler<Exception> exHandler, 074 final LocalAddressProvider localAddressProvider) { 075 076 this.factory = factory; 077 this.codec = codec; 078 this.transport = tpFactory.newInstance(nsPort, 079 new LoggingEventHandler<TransportEvent>(), 080 new MessageHandler<T>(recvHandler, codec, factory), exHandler); 081 082 this.nameResolver = nameResolver; 083 084 this.nameServiceRegisteringStage = new SingleThreadStage<>( 085 "NameServiceRegisterer", new EventHandler<Tuple<Identifier, InetSocketAddress>>() { 086 @Override 087 public void onNext(final Tuple<Identifier, InetSocketAddress> tuple) { 088 try { 089 nameResolver.register(tuple.getKey(), tuple.getValue()); 090 LOG.log(Level.FINEST, "Registered {0} with nameservice", tuple.getKey()); 091 } catch (final Exception ex) { 092 final String msg = "Unable to register " + tuple.getKey() + "with name service"; 093 LOG.log(Level.WARNING, msg, ex); 094 throw new RuntimeException(msg, ex); 095 } 096 } 097 }, 5); 098 099 this.nameServiceUnregisteringStage = new SingleThreadStage<>( 100 "NameServiceRegisterer", new EventHandler<Identifier>() { 101 @Override 102 public void onNext(final Identifier id) { 103 try { 104 nameResolver.unregister(id); 105 LOG.log(Level.FINEST, "Unregistered {0} with nameservice", id); 106 } catch (final Exception ex) { 107 final String msg = "Unable to unregister " + id + " with name service"; 108 LOG.log(Level.WARNING, msg, ex); 109 throw new RuntimeException(msg, ex); 110 } 111 } 112 }, 5); 113 } 114 115 public void registerId(final Identifier id) { 116 this.myId = id; 117 final Tuple<Identifier, InetSocketAddress> tuple = 118 new Tuple<>(id, (InetSocketAddress) this.transport.getLocalAddress()); 119 LOG.log(Level.FINEST, "Binding {0} to NetworkService@({1})", 120 new Object[]{tuple.getKey(), tuple.getValue()}); 121 this.nameServiceRegisteringStage.onNext(tuple); 122 } 123 124 public void unregisterId(final Identifier id) { 125 this.myId = null; 126 LOG.log(Level.FINEST, "Unbinding {0} to NetworkService@({1})", 127 new Object[]{id, this.transport.getLocalAddress()}); 128 this.nameServiceUnregisteringStage.onNext(id); 129 } 130 131 public Identifier getMyId() { 132 return this.myId; 133 } 134 135 public Transport getTransport() { 136 return this.transport; 137 } 138 139 public Codec<T> getCodec() { 140 return this.codec; 141 } 142 143 public Naming getNameClient() { 144 return this.nameResolver; 145 } 146 147 public IdentifierFactory getIdentifierFactory() { 148 return this.factory; 149 } 150 151 void remove(final Identifier id) { 152 this.idToConnMap.remove(id); 153 } 154 155 @Override 156 public void close() throws Exception { 157 LOG.log(Level.FINE, "Shutting down"); 158 this.transport.close(); 159 this.nameResolver.close(); 160 } 161 162 @Override 163 public Connection<T> newConnection(final Identifier destId) { 164 165 if (this.myId == null) { 166 throw new RuntimeException( 167 "Trying to establish a connection from a Network Service that is not bound to any task"); 168 } 169 170 final Connection<T> conn = this.idToConnMap.get(destId); 171 if (conn != null) { 172 return conn; 173 } 174 175 final Connection<T> newConnection = new NSConnection<>( 176 this.myId, destId, new LoggingLinkListener<T>(), this); 177 178 final Connection<T> existing = this.idToConnMap.putIfAbsent(destId, newConnection); 179 return existing == null ? newConnection : existing; 180 } 181 182 @Override 183 public Identifier getConnectionFactoryId() { 184 throw new UnsupportedOperationException(); 185 } 186 187 @Override 188 public Identifier getLocalEndPointId() { 189 throw new UnsupportedOperationException(); 190 } 191} 192 193class MessageHandler<T> implements EventHandler<TransportEvent> { 194 195 private final EventHandler<Message<T>> handler; 196 private final NSMessageCodec<T> codec; 197 198 MessageHandler(final EventHandler<Message<T>> handler, 199 final Codec<T> codec, final IdentifierFactory factory) { 200 this.handler = handler; 201 this.codec = new NSMessageCodec<>(codec, factory); 202 } 203 204 @Override 205 public void onNext(final TransportEvent value) { 206 final byte[] data = value.getData(); 207 final NSMessage<T> obj = this.codec.decode(data); 208 this.handler.onNext(obj); 209 } 210}