001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.io.network.naming; 020 021import org.apache.reef.io.naming.NameAssignment; 022import org.apache.reef.io.naming.NamingLookup; 023import org.apache.reef.io.network.Cache; 024import org.apache.reef.io.network.naming.exception.NamingException; 025import org.apache.reef.io.network.naming.serialization.NamingLookupRequest; 026import org.apache.reef.io.network.naming.serialization.NamingLookupResponse; 027import org.apache.reef.io.network.naming.serialization.NamingMessage; 028import org.apache.reef.tang.annotations.Name; 029import org.apache.reef.tang.annotations.NamedParameter; 030import org.apache.reef.wake.EventHandler; 031import org.apache.reef.wake.Identifier; 032import org.apache.reef.wake.IdentifierFactory; 033import org.apache.reef.wake.Stage; 034import org.apache.reef.wake.impl.SyncStage; 035import org.apache.reef.wake.remote.Codec; 036import org.apache.reef.wake.remote.NetUtils; 037import org.apache.reef.wake.remote.impl.TransportEvent; 038import org.apache.reef.wake.remote.transport.Link; 039import org.apache.reef.wake.remote.transport.Transport; 040import org.apache.reef.wake.remote.transport.netty.LoggingLinkListener; 041import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport; 042 043import java.net.InetSocketAddress; 044import java.net.SocketAddress; 045import java.util.Arrays; 046import java.util.List; 047import java.util.concurrent.BlockingQueue; 048import java.util.concurrent.Callable; 049import java.util.concurrent.LinkedBlockingQueue; 050import java.util.concurrent.TimeUnit; 051import java.util.logging.Level; 052import java.util.logging.Logger; 053 054/** 055 * Naming lookup client 056 */ 057public class NameLookupClient implements Stage, NamingLookup { 058 059 private static final Logger LOG = Logger.getLogger(NameLookupClient.class.getName()); 060 private final SocketAddress serverSocketAddr; 061 private final Transport transport; 062 private final Codec<NamingMessage> codec; 063 private final BlockingQueue<NamingLookupResponse> replyQueue; 064 private final long timeout; 065 private final Cache<Identifier, InetSocketAddress> cache; 066 private final int retryCount; 067 private final int retryTimeout; 068 069 /** 070 * Constructs a naming lookup client 071 * 072 * @param serverAddr a server address 073 * @param serverPort a server port number 074 * @param factory an identifier factory 075 * @param cache an cache 076 */ 077 public NameLookupClient(final String serverAddr, final int serverPort, 078 final IdentifierFactory factory, final int retryCount, final int retryTimeout, 079 final Cache<Identifier, InetSocketAddress> cache) { 080 this(serverAddr, serverPort, 10000, factory, retryCount, retryTimeout, cache); 081 } 082 083 /** 084 * Constructs a naming lookup client 085 * 086 * @param serverAddr a server address 087 * @param serverPort a server port number 088 * @param timeout request timeout in ms 089 * @param factory an identifier factory 090 * @param cache an cache 091 */ 092 public NameLookupClient(final String serverAddr, final int serverPort, final long timeout, 093 final IdentifierFactory factory, final int retryCount, final int retryTimeout, 094 final Cache<Identifier, InetSocketAddress> cache) { 095 096 this.serverSocketAddr = new InetSocketAddress(serverAddr, serverPort); 097 this.timeout = timeout; 098 this.cache = cache; 099 this.codec = NamingCodecFactory.createLookupCodec(factory); 100 this.replyQueue = new LinkedBlockingQueue<>(); 101 102 this.transport = new NettyMessagingTransport(NetUtils.getLocalAddress(), 0, 103 new SyncStage<>(new NamingLookupClientHandler( 104 new NamingLookupResponseHandler(this.replyQueue), this.codec)), 105 null, retryCount, retryTimeout); 106 107 this.retryCount = retryCount; 108 this.retryTimeout = retryTimeout; 109 } 110 111 NameLookupClient(final String serverAddr, final int serverPort, final long timeout, 112 final IdentifierFactory factory, final int retryCount, final int retryTimeout, 113 final BlockingQueue<NamingLookupResponse> replyQueue, final Transport transport, 114 final Cache<Identifier, InetSocketAddress> cache) { 115 116 this.serverSocketAddr = new InetSocketAddress(serverAddr, serverPort); 117 this.timeout = timeout; 118 this.cache = cache; 119 this.codec = NamingCodecFactory.createFullCodec(factory); 120 this.replyQueue = replyQueue; 121 this.transport = transport; 122 this.retryCount = retryCount; 123 this.retryTimeout = retryTimeout; 124 } 125 126 /** 127 * Finds an address for an identifier 128 * 129 * @param id an identifier 130 * @return an Internet socket address 131 */ 132 @Override 133 public InetSocketAddress lookup(final Identifier id) throws Exception { 134 135 return cache.get(id, new Callable<InetSocketAddress>() { 136 137 @Override 138 public InetSocketAddress call() throws Exception { 139 final int origRetryCount = NameLookupClient.this.retryCount; 140 int retryCount = origRetryCount; 141 while (true) { 142 try { 143 return remoteLookup(id); 144 } catch (final NamingException e) { 145 if (retryCount <= 0) { 146 throw e; 147 } else { 148 final int retryTimeout = NameLookupClient.this.retryTimeout 149 * (origRetryCount - retryCount + 1); 150 LOG.log(Level.WARNING, 151 "Caught Naming Exception while looking up " + id 152 + " with Name Server. Will retry " + retryCount 153 + " time(s) after waiting for " + retryTimeout + " msec."); 154 Thread.sleep(retryTimeout * retryCount); 155 --retryCount; 156 } 157 } 158 } 159 } 160 161 }); 162 } 163 164 /** 165 * Retrieves an address for an identifier remotely 166 * 167 * @param id an identifier 168 * @return an Internet socket address 169 * @throws Exception 170 */ 171 public InetSocketAddress remoteLookup(final Identifier id) throws Exception { 172 // the lookup is not thread-safe, because concurrent replies may 173 // be read by the wrong thread. 174 // TODO: better fix uses a map of id's after REEF-198 175 synchronized (this) { 176 177 LOG.log(Level.INFO, "Looking up {0} on NameServer {1}", new Object[]{id, serverSocketAddr}); 178 179 final List<Identifier> ids = Arrays.asList(id); 180 final Link<NamingMessage> link = transport.open(serverSocketAddr, codec, 181 new LoggingLinkListener<NamingMessage>()); 182 link.write(new NamingLookupRequest(ids)); 183 184 final NamingLookupResponse resp; 185 for (; ; ) { 186 try { 187 resp = replyQueue.poll(timeout, TimeUnit.MILLISECONDS); 188 break; 189 } catch (final InterruptedException e) { 190 LOG.log(Level.INFO, "Lookup interrupted", e); 191 throw new NamingException(e); 192 } 193 } 194 195 final List<NameAssignment> list = resp.getNameAssignments(); 196 if (list.isEmpty()) { 197 throw new NamingException("Cannot find " + id + " from the name server"); 198 } else { 199 return list.get(0).getAddress(); 200 } 201 } 202 } 203 204 /** 205 * Closes resources 206 */ 207 @Override 208 public void close() throws Exception { 209 // Should not close transport as we did not 210 // create it 211 } 212 213 @NamedParameter(doc = "When should a retry timeout(msec)?", short_name = "retryTimeout", default_value = "100") 214 public static class RetryTimeout implements Name<Integer> { 215 } 216 217 @NamedParameter(doc = "How many times should I retry?", short_name = "retryCount", default_value = "10") 218 public static class RetryCount implements Name<Integer> { 219 } 220} 221 222/** 223 * Naming lookup client transport event handler 224 */ 225class NamingLookupClientHandler implements EventHandler<TransportEvent> { 226 227 private final EventHandler<NamingLookupResponse> handler; 228 private final Codec<NamingMessage> codec; 229 230 NamingLookupClientHandler(final EventHandler<NamingLookupResponse> handler, final Codec<NamingMessage> codec) { 231 this.handler = handler; 232 this.codec = codec; 233 } 234 235 @Override 236 public void onNext(final TransportEvent value) { 237 handler.onNext((NamingLookupResponse) codec.decode(value.getData())); 238 } 239 240} 241 242/** 243 * Naming lookup response handler 244 */ 245class NamingLookupResponseHandler implements EventHandler<NamingLookupResponse> { 246 247 private final BlockingQueue<NamingLookupResponse> replyQueue; 248 249 NamingLookupResponseHandler(final BlockingQueue<NamingLookupResponse> replyQueue) { 250 this.replyQueue = replyQueue; 251 } 252 253 @Override 254 public void onNext(final NamingLookupResponse value) { 255 replyQueue.offer(value); 256 } 257}