001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.io.network.impl; 020 021import org.apache.reef.io.Tuple; 022import org.apache.reef.io.naming.Naming; 023import org.apache.reef.io.network.Connection; 024import org.apache.reef.io.network.ConnectionFactory; 025import org.apache.reef.io.network.Message; 026import org.apache.reef.io.network.naming.NameResolver; 027import org.apache.reef.tang.annotations.Parameter; 028import org.apache.reef.wake.*; 029import org.apache.reef.wake.impl.LoggingEventHandler; 030import org.apache.reef.wake.impl.SingleThreadStage; 031import org.apache.reef.wake.remote.Codec; 032import org.apache.reef.wake.remote.impl.TransportEvent; 033import org.apache.reef.wake.remote.transport.Transport; 034import org.apache.reef.wake.remote.transport.TransportFactory; 035import org.apache.reef.wake.remote.transport.netty.LoggingLinkListener; 036 037import javax.inject.Inject; 038import java.net.InetSocketAddress; 039import java.util.concurrent.ConcurrentHashMap; 040import java.util.concurrent.ConcurrentMap; 041import java.util.logging.Level; 042import java.util.logging.Logger; 043 044/** 045 * Network service for Task. 046 */ 047public final class NetworkService<T> implements Stage, ConnectionFactory<T> { 048 049 private static final Logger LOG = Logger.getLogger(NetworkService.class.getName()); 050 051 private final IdentifierFactory factory; 052 private final Codec<T> codec; 053 private final Transport transport; 054 private final NameResolver nameResolver; 055 private final ConcurrentMap<Identifier, Connection<T>> idToConnMap = new ConcurrentHashMap<>(); 056 private final EStage<Tuple<Identifier, InetSocketAddress>> nameServiceRegisteringStage; 057 private final EStage<Identifier> nameServiceUnregisteringStage; 058 private Identifier myId; 059 060 @Inject 061 private NetworkService( 062 @Parameter(NetworkServiceParameters.NetworkServiceIdentifierFactory.class) final IdentifierFactory factory, 063 @Parameter(NetworkServiceParameters.NetworkServicePort.class) final int nsPort, 064 final NameResolver nameResolver, 065 @Parameter(NetworkServiceParameters.NetworkServiceCodec.class) final Codec<T> codec, 066 @Parameter(NetworkServiceParameters.NetworkServiceTransportFactory.class) final TransportFactory tpFactory, 067 @Parameter(NetworkServiceParameters.NetworkServiceHandler.class) final EventHandler<Message<T>> recvHandler, 068 @Parameter(NetworkServiceParameters.NetworkServiceExceptionHandler.class) 069 final EventHandler<Exception> exHandler) { 070 this.factory = factory; 071 this.codec = codec; 072 this.transport = tpFactory.newInstance(nsPort, 073 new LoggingEventHandler<TransportEvent>(), 074 new MessageHandler<T>(recvHandler, codec, factory), exHandler); 075 076 this.nameResolver = nameResolver; 077 078 this.nameServiceRegisteringStage = new SingleThreadStage<>( 079 "NameServiceRegisterer", new EventHandler<Tuple<Identifier, InetSocketAddress>>() { 080 @Override 081 public void onNext(final Tuple<Identifier, InetSocketAddress> tuple) { 082 try { 083 nameResolver.register(tuple.getKey(), tuple.getValue()); 084 LOG.log(Level.FINEST, "Registered {0} with nameservice", tuple.getKey()); 085 } catch (final Exception ex) { 086 final String msg = "Unable to register " + tuple.getKey() + "with name service"; 087 LOG.log(Level.WARNING, msg, ex); 088 throw new RuntimeException(msg, ex); 089 } 090 } 091 }, 5); 092 093 this.nameServiceUnregisteringStage = new SingleThreadStage<>( 094 "NameServiceRegisterer", new EventHandler<Identifier>() { 095 @Override 096 public void onNext(final Identifier id) { 097 try { 098 nameResolver.unregister(id); 099 LOG.log(Level.FINEST, "Unregistered {0} with nameservice", id); 100 } catch (final Exception ex) { 101 final String msg = "Unable to unregister " + id + " with name service"; 102 LOG.log(Level.WARNING, msg, ex); 103 throw new RuntimeException(msg, ex); 104 } 105 } 106 }, 5); 107 } 108 109 public void registerId(final Identifier id) { 110 this.myId = id; 111 final Tuple<Identifier, InetSocketAddress> tuple = 112 new Tuple<>(id, (InetSocketAddress) this.transport.getLocalAddress()); 113 LOG.log(Level.FINEST, "Binding {0} to NetworkService@({1})", 114 new Object[]{tuple.getKey(), tuple.getValue()}); 115 this.nameServiceRegisteringStage.onNext(tuple); 116 } 117 118 public void unregisterId(final Identifier id) { 119 this.myId = null; 120 LOG.log(Level.FINEST, "Unbinding {0} to NetworkService@({1})", 121 new Object[]{id, this.transport.getLocalAddress()}); 122 this.nameServiceUnregisteringStage.onNext(id); 123 } 124 125 public Identifier getMyId() { 126 return this.myId; 127 } 128 129 public Transport getTransport() { 130 return this.transport; 131 } 132 133 public Codec<T> getCodec() { 134 return this.codec; 135 } 136 137 public Naming getNameClient() { 138 return this.nameResolver; 139 } 140 141 public IdentifierFactory getIdentifierFactory() { 142 return this.factory; 143 } 144 145 void remove(final Identifier id) { 146 this.idToConnMap.remove(id); 147 } 148 149 @Override 150 public void close() throws Exception { 151 LOG.log(Level.FINE, "Shutting down"); 152 this.transport.close(); 153 this.nameResolver.close(); 154 } 155 156 @Override 157 public Connection<T> newConnection(final Identifier destId) { 158 159 if (this.myId == null) { 160 throw new RuntimeException( 161 "Trying to establish a connection from a Network Service that is not bound to any task"); 162 } 163 164 final Connection<T> conn = this.idToConnMap.get(destId); 165 if (conn != null) { 166 return conn; 167 } 168 169 final Connection<T> newConnection = new NSConnection<>( 170 this.myId, destId, new LoggingLinkListener<T>(), this); 171 172 final Connection<T> existing = this.idToConnMap.putIfAbsent(destId, newConnection); 173 return existing == null ? newConnection : existing; 174 } 175 176 @Override 177 public Identifier getConnectionFactoryId() { 178 throw new UnsupportedOperationException(); 179 } 180 181 @Override 182 public Identifier getLocalEndPointId() { 183 throw new UnsupportedOperationException(); 184 } 185} 186 187class MessageHandler<T> implements EventHandler<TransportEvent> { 188 189 private final EventHandler<Message<T>> handler; 190 private final NSMessageCodec<T> codec; 191 192 MessageHandler(final EventHandler<Message<T>> handler, 193 final Codec<T> codec, final IdentifierFactory factory) { 194 this.handler = handler; 195 this.codec = new NSMessageCodec<>(codec, factory); 196 } 197 198 @Override 199 public void onNext(final TransportEvent value) { 200 final byte[] data = value.getData(); 201 final NSMessage<T> obj = this.codec.decode(data); 202 this.handler.onNext(obj); 203 } 204}