001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.io.network.naming; 020 021import org.apache.reef.io.naming.NamingRegistry; 022import org.apache.reef.io.network.naming.exception.NamingException; 023import org.apache.reef.io.network.naming.serialization.NamingMessage; 024import org.apache.reef.io.network.naming.serialization.NamingRegisterRequest; 025import org.apache.reef.io.network.naming.serialization.NamingRegisterResponse; 026import org.apache.reef.io.network.naming.serialization.NamingUnregisterRequest; 027import org.apache.reef.tang.Injector; 028import org.apache.reef.tang.Tang; 029import org.apache.reef.tang.exceptions.InjectionException; 030import org.apache.reef.wake.EventHandler; 031import org.apache.reef.wake.Identifier; 032import org.apache.reef.wake.IdentifierFactory; 033import org.apache.reef.wake.Stage; 034import org.apache.reef.wake.impl.SyncStage; 035import org.apache.reef.wake.remote.Codec; 036import org.apache.reef.wake.remote.RemoteConfiguration; 037import org.apache.reef.wake.remote.address.LocalAddressProvider; 038import org.apache.reef.wake.remote.impl.TransportEvent; 039import org.apache.reef.wake.remote.transport.Link; 040import org.apache.reef.wake.remote.transport.Transport; 041import org.apache.reef.wake.remote.transport.netty.LoggingLinkListener; 042import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport; 043 044import java.io.IOException; 045import java.net.InetSocketAddress; 046import java.net.SocketAddress; 047import java.util.concurrent.BlockingQueue; 048import java.util.concurrent.LinkedBlockingQueue; 049import java.util.concurrent.TimeUnit; 050import java.util.logging.Level; 051import java.util.logging.Logger; 052 053/** 054 * Naming registry client. 055 */ 056public class NameRegistryClient implements Stage, NamingRegistry { 057 058 private static final Logger LOG = Logger.getLogger(NameRegistryClient.class.getName()); 059 060 private final SocketAddress serverSocketAddr; 061 private final Transport transport; 062 private final Codec<NamingMessage> codec; 063 private final BlockingQueue<NamingRegisterResponse> replyQueue; 064 private final long timeout; 065 066 /** 067 * Constructs a naming registry client. 068 * 069 * @param serverAddr a name server address 070 * @param serverPort a name server port 071 * @param factory an identifier factory 072 */ 073 NameRegistryClient( 074 final String serverAddr, final int serverPort, final IdentifierFactory factory, 075 final LocalAddressProvider localAddressProvider) { 076 this(serverAddr, serverPort, 10000, factory, localAddressProvider); 077 } 078 079 /** 080 * Constructs a naming registry client. 081 * 082 * @param serverAddr a name server address 083 * @param serverPort a name server port 084 * @param timeout timeout in ms 085 * @param factory an identifier factory 086 */ 087 NameRegistryClient(final String serverAddr, 088 final int serverPort, 089 final long timeout, 090 final IdentifierFactory factory, 091 final LocalAddressProvider localAddressProvider) { 092 093 this.serverSocketAddr = new InetSocketAddress(serverAddr, serverPort); 094 this.timeout = timeout; 095 this.codec = NamingCodecFactory.createRegistryCodec(factory); 096 this.replyQueue = new LinkedBlockingQueue<>(); 097 098 final Injector injector = Tang.Factory.getTang().newInjector(); 099 injector.bindVolatileParameter(RemoteConfiguration.HostAddress.class, localAddressProvider.getLocalAddress()); 100 injector.bindVolatileParameter(RemoteConfiguration.RemoteClientStage.class, 101 new SyncStage<>(new NamingRegistryClientHandler(new NamingRegistryResponseHandler(replyQueue), codec))); 102 103 try { 104 this.transport = injector.getInstance(NettyMessagingTransport.class); 105 } catch (final InjectionException e) { 106 throw new RuntimeException(e); 107 } 108 } 109 110 NameRegistryClient(final String serverAddr, final int serverPort, 111 final long timeout, final IdentifierFactory factory, 112 final BlockingQueue<NamingRegisterResponse> replyQueue, 113 final Transport transport) { 114 this.serverSocketAddr = new InetSocketAddress(serverAddr, serverPort); 115 this.timeout = timeout; 116 this.codec = NamingCodecFactory.createFullCodec(factory); 117 this.replyQueue = replyQueue; 118 this.transport = transport; 119 } 120 121 /** 122 * Registers an (identifier, address) mapping. 123 * 124 * @param id an identifier 125 * @param addr an Internet socket address 126 */ 127 @Override 128 public void register(final Identifier id, final InetSocketAddress addr) throws Exception { 129 130 // needed to keep threads from reading the wrong response 131 // TODO: better fix matches replies to threads with a map after REEF-198 132 synchronized (this) { 133 134 LOG.log(Level.FINE, "Register {0} : {1}", new Object[]{id, addr}); 135 136 final Link<NamingMessage> link = this.transport.open( 137 this.serverSocketAddr, this.codec, new LoggingLinkListener<NamingMessage>()); 138 139 link.write(new NamingRegisterRequest(new NameAssignmentTuple(id, addr))); 140 141 for (;;) { 142 try { 143 this.replyQueue.poll(this.timeout, TimeUnit.MILLISECONDS); 144 break; 145 } catch (final InterruptedException e) { 146 LOG.log(Level.INFO, "Interrupted", e); 147 throw new NamingException(e); 148 } 149 } 150 } 151 } 152 153 /** 154 * Unregisters an identifier. 155 * 156 * @param id an identifier 157 */ 158 @Override 159 public void unregister(final Identifier id) throws IOException { 160 final Link<NamingMessage> link = transport.open(serverSocketAddr, codec, 161 new LoggingLinkListener<NamingMessage>()); 162 link.write(new NamingUnregisterRequest(id)); 163 } 164 165 /** 166 * Closes resources. 167 */ 168 @Override 169 public void close() throws Exception { 170 // Should not close transport as we did not 171 // create it 172 } 173} 174 175/** 176 * Naming registry client transport event handler. 177 */ 178class NamingRegistryClientHandler implements EventHandler<TransportEvent> { 179 private static final Logger LOG = Logger.getLogger(NamingRegistryClientHandler.class.getName()); 180 181 private final EventHandler<NamingRegisterResponse> handler; 182 private final Codec<NamingMessage> codec; 183 184 NamingRegistryClientHandler(final EventHandler<NamingRegisterResponse> handler, final Codec<NamingMessage> codec) { 185 this.handler = handler; 186 this.codec = codec; 187 } 188 189 @Override 190 public void onNext(final TransportEvent value) { 191 LOG.log(Level.FINE, value.toString()); 192 handler.onNext((NamingRegisterResponse) codec.decode(value.getData())); 193 } 194} 195 196/** 197 * Naming register response handler. 198 */ 199class NamingRegistryResponseHandler implements EventHandler<NamingRegisterResponse> { 200 private static final Logger LOG = Logger.getLogger(NamingRegistryResponseHandler.class.getName()); 201 202 private final BlockingQueue<NamingRegisterResponse> replyQueue; 203 204 NamingRegistryResponseHandler(final BlockingQueue<NamingRegisterResponse> replyQueue) { 205 this.replyQueue = replyQueue; 206 } 207 208 @Override 209 public void onNext(final NamingRegisterResponse value) { 210 if (!replyQueue.offer(value)) { 211 LOG.log(Level.FINEST, "Element {0} was not added to the queue", value); 212 } 213 } 214}