/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.reef.io.network.impl;

import org.apache.reef.io.Tuple;
import org.apache.reef.io.naming.Naming;
import org.apache.reef.io.network.Connection;
import org.apache.reef.io.network.ConnectionFactory;
import org.apache.reef.io.network.Message;
import org.apache.reef.io.network.TransportFactory;
import org.apache.reef.io.network.naming.NameCache;
import org.apache.reef.io.network.naming.NameClient;
import org.apache.reef.io.network.naming.NameLookupClient;
import org.apache.reef.io.network.naming.NameServerParameters;
import org.apache.reef.tang.Injector;
import org.apache.reef.tang.Tang;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.tang.exceptions.InjectionException;
import org.apache.reef.wake.*;
import org.apache.reef.wake.impl.LoggingEventHandler;
import org.apache.reef.wake.impl.SingleThreadStage;
import org.apache.reef.wake.remote.Codec;
import org.apache.reef.wake.remote.impl.TransportEvent;
import org.apache.reef.wake.remote.transport.LinkListener;
import org.apache.reef.wake.remote.transport.Transport;

043import javax.inject.Inject; 044import java.net.InetSocketAddress; 045import java.util.concurrent.ConcurrentHashMap; 046import java.util.concurrent.ConcurrentMap; 047import java.util.logging.Level; 048import java.util.logging.Logger; 049 050/** 051 * Network service for Task 052 */ 053public final class NetworkService<T> implements Stage, ConnectionFactory<T> { 054 055 private static final Logger LOG = Logger.getLogger(NetworkService.class.getName()); 056 057 private static final int retryCount; 058 private static final int retryTimeout; 059 060 static { 061 try { 062 final Injector injector = Tang.Factory.getTang().newInjector(); 063 retryCount = injector.getNamedInstance(NameLookupClient.RetryCount.class); 064 retryTimeout = injector.getNamedInstance(NameLookupClient.RetryTimeout.class); 065 } catch (final InjectionException ex) { 066 final String msg = "Exception while trying to find default values for retryCount & Timeout"; 067 LOG.log(Level.SEVERE, msg, ex); 068 throw new RuntimeException(msg, ex); 069 } 070 } 071 072 private final IdentifierFactory factory; 073 private final Codec<T> codec; 074 private final Transport transport; 075 private final NameClient nameClient; 076 private final ConcurrentMap<Identifier, Connection<T>> idToConnMap = new ConcurrentHashMap<>(); 077 private final EStage<Tuple<Identifier, InetSocketAddress>> nameServiceRegisteringStage; 078 private final EStage<Identifier> nameServiceUnregisteringStage; 079 private Identifier myId; 080 081 public NetworkService(final IdentifierFactory factory, 082 final int nsPort, 083 final String nameServerAddr, 084 final int nameServerPort, 085 final Codec<T> codec, 086 final TransportFactory tpFactory, 087 final EventHandler<Message<T>> recvHandler, 088 final EventHandler<Exception> exHandler) { 089 this(factory, nsPort, nameServerAddr, nameServerPort, 090 retryCount, retryTimeout, codec, tpFactory, recvHandler, exHandler); 091 } 092 093 @Inject 094 public NetworkService( 095 final 
@Parameter(NetworkServiceParameters.NetworkServiceIdentifierFactory.class) IdentifierFactory factory, 096 final @Parameter(NetworkServiceParameters.NetworkServicePort.class) int nsPort, 097 final @Parameter(NameServerParameters.NameServerAddr.class) String nameServerAddr, 098 final @Parameter(NameServerParameters.NameServerPort.class) int nameServerPort, 099 final @Parameter(NameLookupClient.RetryCount.class) int retryCount, 100 final @Parameter(NameLookupClient.RetryTimeout.class) int retryTimeout, 101 final @Parameter(NetworkServiceParameters.NetworkServiceCodec.class) Codec<T> codec, 102 final @Parameter(NetworkServiceParameters.NetworkServiceTransportFactory.class) TransportFactory tpFactory, 103 final @Parameter(NetworkServiceParameters.NetworkServiceHandler.class) EventHandler<Message<T>> recvHandler, 104 final @Parameter(NetworkServiceParameters.NetworkServiceExceptionHandler.class) EventHandler<Exception> exHandler) { 105 106 this.factory = factory; 107 this.codec = codec; 108 this.transport = tpFactory.create(nsPort, 109 new LoggingEventHandler<TransportEvent>(), 110 new MessageHandler<T>(recvHandler, codec, factory), exHandler); 111 112 this.nameClient = new NameClient(nameServerAddr, nameServerPort, 113 factory, retryCount, retryTimeout, new NameCache(30000)); 114 115 this.nameServiceRegisteringStage = new SingleThreadStage<>( 116 "NameServiceRegisterer", new EventHandler<Tuple<Identifier, InetSocketAddress>>() { 117 @Override 118 public void onNext(final Tuple<Identifier, InetSocketAddress> tuple) { 119 try { 120 nameClient.register(tuple.getKey(), tuple.getValue()); 121 LOG.log(Level.FINEST, "Registered {0} with nameservice", tuple.getKey()); 122 } catch (final Exception ex) { 123 final String msg = "Unable to register " + tuple.getKey() + "with name service"; 124 LOG.log(Level.WARNING, msg, ex); 125 throw new RuntimeException(msg, ex); 126 } 127 } 128 }, 5); 129 130 this.nameServiceUnregisteringStage = new SingleThreadStage<>( 131 
"NameServiceRegisterer", new EventHandler<Identifier>() { 132 @Override 133 public void onNext(final Identifier id) { 134 try { 135 nameClient.unregister(id); 136 LOG.log(Level.FINEST, "Unregistered {0} with nameservice", id); 137 } catch (final Exception ex) { 138 final String msg = "Unable to unregister " + id + " with name service"; 139 LOG.log(Level.WARNING, msg, ex); 140 throw new RuntimeException(msg, ex); 141 } 142 } 143 }, 5); 144 } 145 146 public void registerId(final Identifier id) { 147 this.myId = id; 148 final Tuple<Identifier, InetSocketAddress> tuple = 149 new Tuple<>(id, (InetSocketAddress) this.transport.getLocalAddress()); 150 LOG.log(Level.FINEST, "Binding {0} to NetworkService@({1})", 151 new Object[]{tuple.getKey(), tuple.getValue()}); 152 this.nameServiceRegisteringStage.onNext(tuple); 153 } 154 155 public void unregisterId(Identifier id) { 156 this.myId = null; 157 LOG.log(Level.FINEST, "Unbinding {0} to NetworkService@({1})", 158 new Object[]{id, this.transport.getLocalAddress()}); 159 this.nameServiceUnregisteringStage.onNext(id); 160 } 161 162 public Identifier getMyId() { 163 return this.myId; 164 } 165 166 public Transport getTransport() { 167 return this.transport; 168 } 169 170 public Codec<T> getCodec() { 171 return this.codec; 172 } 173 174 public Naming getNameClient() { 175 return this.nameClient; 176 } 177 178 public IdentifierFactory getIdentifierFactory() { 179 return this.factory; 180 } 181 182 void remove(final Identifier id) { 183 this.idToConnMap.remove(id); 184 } 185 186 @Override 187 public void close() throws Exception { 188 LOG.log(Level.FINE, "Shutting down"); 189 this.transport.close(); 190 this.nameClient.close(); 191 } 192 193 @Override 194 public Connection<T> newConnection(final Identifier destId) { 195 196 if (this.myId == null) { 197 throw new RuntimeException( 198 "Trying to establish a connection from a Network Service that is not bound to any task"); 199 } 200 201 final Connection<T> conn = 
this.idToConnMap.get(destId); 202 if (conn != null) { 203 return conn; 204 } 205 206 final Connection<T> newConnection = new NSConnection<T>( 207 this.myId, destId, new LinkListener<T>() { 208 @Override 209 public void messageReceived(final Object message) { 210 } 211 }, this); 212 213 final Connection<T> existing = this.idToConnMap.putIfAbsent(destId, newConnection); 214 return existing == null ? newConnection : existing; 215 } 216} 217 218class MessageHandler<T> implements EventHandler<TransportEvent> { 219 220 private final EventHandler<Message<T>> handler; 221 private final NSMessageCodec<T> codec; 222 223 public MessageHandler(final EventHandler<Message<T>> handler, 224 final Codec<T> codec, final IdentifierFactory factory) { 225 this.handler = handler; 226 this.codec = new NSMessageCodec<T>(codec, factory); 227 } 228 229 @Override 230 public void onNext(final TransportEvent value) { 231 final byte[] data = value.getData(); 232 final NSMessage<T> obj = this.codec.decode(data); 233 this.handler.onNext(obj); 234 } 235}