001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.io.network.naming; 020 021import org.apache.reef.io.naming.NamingRegistry; 022import org.apache.reef.io.network.naming.exception.NamingException; 023import org.apache.reef.io.network.naming.serialization.NamingMessage; 024import org.apache.reef.io.network.naming.serialization.NamingRegisterRequest; 025import org.apache.reef.io.network.naming.serialization.NamingRegisterResponse; 026import org.apache.reef.io.network.naming.serialization.NamingUnregisterRequest; 027import org.apache.reef.wake.EventHandler; 028import org.apache.reef.wake.Identifier; 029import org.apache.reef.wake.IdentifierFactory; 030import org.apache.reef.wake.Stage; 031import org.apache.reef.wake.impl.SyncStage; 032import org.apache.reef.wake.remote.Codec; 033import org.apache.reef.wake.remote.NetUtils; 034import org.apache.reef.wake.remote.impl.TransportEvent; 035import org.apache.reef.wake.remote.transport.Link; 036import org.apache.reef.wake.remote.transport.LinkListener; 037import org.apache.reef.wake.remote.transport.Transport; 038import org.apache.reef.wake.remote.transport.netty.LoggingLinkListener; 039import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport; 040 041import java.io.IOException; 042import java.net.InetSocketAddress; 043import java.net.SocketAddress; 044import java.util.concurrent.BlockingQueue; 045import java.util.concurrent.LinkedBlockingQueue; 046import java.util.concurrent.TimeUnit; 047import java.util.logging.Level; 048import java.util.logging.Logger; 049 050/** 051 * Naming registry client 052 */ 053public class NameRegistryClient implements Stage, NamingRegistry { 054 055 private static final Logger LOG = Logger.getLogger(NameRegistryClient.class.getName()); 056 057 private final SocketAddress serverSocketAddr; 058 private final Transport transport; 059 private final Codec<NamingMessage> codec; 060 private final BlockingQueue<NamingRegisterResponse> replyQueue; 061 private final long timeout; 062 063 /** 064 * Constructs a naming registry client 065 * 066 * @param serverAddr a name server address 067 * @param serverPort a name server port 068 * @param factory an identifier factory 069 */ 070 public NameRegistryClient( 071 final String serverAddr, final int serverPort, final IdentifierFactory factory) { 072 this(serverAddr, serverPort, 10000, factory); 073 } 074 075 /** 076 * Constructs a naming registry client 077 * 078 * @param serverAddr a name server address 079 * @param serverPort a name server port 080 * @param timeout timeout in ms 081 * @param factory an identifier factory 082 */ 083 public NameRegistryClient(final String serverAddr, final int serverPort, 084 final long timeout, final IdentifierFactory factory) { 085 086 this.serverSocketAddr = new InetSocketAddress(serverAddr, serverPort); 087 this.timeout = timeout; 088 this.codec = NamingCodecFactory.createRegistryCodec(factory); 089 this.replyQueue = new LinkedBlockingQueue<>(); 090 this.transport = new NettyMessagingTransport(NetUtils.getLocalAddress(), 0, 091 new SyncStage<>(new NamingRegistryClientHandler(new NamingRegistryResponseHandler(replyQueue), codec)), 092 null, 3, 10000); 093 } 094 095 public NameRegistryClient(final String serverAddr, final int serverPort, 096 final long timeout, final IdentifierFactory factory, 097 final BlockingQueue<NamingRegisterResponse> replyQueue, 098 final Transport transport) { 099 this.serverSocketAddr = new InetSocketAddress(serverAddr, serverPort); 100 this.timeout = timeout; 101 this.codec = NamingCodecFactory.createFullCodec(factory); 102 this.replyQueue = replyQueue; 103 this.transport = transport; 104 } 105 106 /** 107 * Registers an (identifier, address) mapping 108 * 109 * @param id an identifier 110 * @param addr an Internet socket address 111 */ 112 @Override 113 public void register(final Identifier id, final InetSocketAddress addr) throws Exception { 114 115 // needed to keep threads from reading the wrong response 116 // TODO: better fix matches replies to threads with a map after REEF-198 117 synchronized (this) { 118 119 LOG.log(Level.FINE, "Register {0} : {1}", new Object[]{id, addr}); 120 121 final Link<NamingMessage> link = this.transport.open( 122 this.serverSocketAddr, this.codec, new LoggingLinkListener<NamingMessage>()); 123 124 link.write(new NamingRegisterRequest(new NameAssignmentTuple(id, addr))); 125 126 for (; ; ) { 127 try { 128 this.replyQueue.poll(this.timeout, TimeUnit.MILLISECONDS); 129 break; 130 } catch (final InterruptedException e) { 131 LOG.log(Level.INFO, "Interrupted", e); 132 throw new NamingException(e); 133 } 134 } 135 } 136 } 137 138 /** 139 * Unregisters an identifier 140 * 141 * @param id an identifier 142 */ 143 @Override 144 public void unregister(Identifier id) throws IOException { 145 Link<NamingMessage> link = transport.open(serverSocketAddr, codec, 146 new LinkListener<NamingMessage>() { 147 @Override 148 public void messageReceived(NamingMessage message) { 149 } 150 }); 151 link.write(new NamingUnregisterRequest(id)); 152 } 153 154 /** 155 * Closes resources 156 */ 157 @Override 158 public void close() throws Exception { 159 // Should not close transport as we did not 160 // create it 161 } 162} 163 164/** 165 * Naming registry client transport event handler 166 */ 167class NamingRegistryClientHandler implements EventHandler<TransportEvent> { 168 private static final Logger LOG = Logger.getLogger(NamingRegistryClientHandler.class.getName()); 169 170 private final EventHandler<NamingRegisterResponse> handler; 171 private final Codec<NamingMessage> codec; 172 173 NamingRegistryClientHandler(EventHandler<NamingRegisterResponse> handler, Codec<NamingMessage> codec) { 174 this.handler = handler; 175 this.codec = codec; 176 } 177 178 @Override 179 public void onNext(TransportEvent value) { 180 LOG.log(Level.FINE, value.toString()); 181 handler.onNext((NamingRegisterResponse) codec.decode(value.getData())); 182 } 183} 184 185/** 186 * Naming register response handler 187 */ 188class NamingRegistryResponseHandler implements EventHandler<NamingRegisterResponse> { 189 190 private final BlockingQueue<NamingRegisterResponse> replyQueue; 191 192 NamingRegistryResponseHandler(BlockingQueue<NamingRegisterResponse> replyQueue) { 193 this.replyQueue = replyQueue; 194 } 195 196 @Override 197 public void onNext(NamingRegisterResponse value) { 198 replyQueue.offer(value); 199 } 200}