/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.reef.io.network.naming;

import org.apache.reef.io.naming.NameAssignment;
import org.apache.reef.io.naming.NamingLookup;
import org.apache.reef.io.network.naming.exception.NamingException;
import org.apache.reef.io.network.naming.parameters.NameResolverCacheTimeout;
import org.apache.reef.io.network.naming.parameters.NameResolverIdentifierFactory;
import org.apache.reef.io.network.naming.parameters.NameResolverNameServerAddr;
import org.apache.reef.io.network.naming.parameters.NameResolverNameServerPort;
import org.apache.reef.io.network.naming.parameters.NameResolverRetryCount;
import org.apache.reef.io.network.naming.parameters.NameResolverRetryTimeout;
import org.apache.reef.io.network.naming.serialization.NamingLookupRequest;
import org.apache.reef.io.network.naming.serialization.NamingLookupResponse;
import org.apache.reef.io.network.naming.serialization.NamingMessage;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.util.cache.Cache;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.Identifier;
import org.apache.reef.wake.IdentifierFactory;
import org.apache.reef.wake.Stage;
import org.apache.reef.wake.impl.SyncStage;
import org.apache.reef.wake.remote.Codec;
import org.apache.reef.wake.remote.address.LocalAddressProvider;
import org.apache.reef.wake.remote.impl.TransportEvent;
import org.apache.reef.wake.remote.transport.Link;
import org.apache.reef.wake.remote.transport.Transport;
import org.apache.reef.wake.remote.transport.TransportFactory;
import org.apache.reef.wake.remote.transport.netty.LoggingLinkListener;

import javax.inject.Inject;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Naming lookup client.
 *
 * <p>Resolves an {@link Identifier} to an {@link InetSocketAddress} by sending a
 * {@link NamingLookupRequest} to the configured name server over a {@link Transport}
 * and waiting for the matching {@link NamingLookupResponse} on a reply queue.
 * Results are served through a {@link Cache}; failed remote lookups are retried
 * with a linearly growing delay.
 */
public final class NameLookupClient implements Stage, NamingLookup {

  private static final Logger LOG = Logger.getLogger(NameLookupClient.class.getName());

  private final SocketAddress serverSocketAddr;
  private final Transport transport;
  private final Codec<NamingMessage> codec;
  private final BlockingQueue<NamingLookupResponse> replyQueue;
  /** Handed to the {@code NameCache} and also used as the reply wait time (ms) in {@link #remoteLookup}. */
  private final long timeout;
  private final Cache<Identifier, InetSocketAddress> cache;
  private final int retryCount;
  private final int retryTimeout;

  /**
   * Constructs a naming lookup client on top of an existing transport and reply queue.
   *
   * @param serverAddr   a server address
   * @param serverPort   a server port number
   * @param timeout      request timeout in ms
   * @param factory      an identifier factory
   * @param retryCount   a count of retrying lookup
   * @param retryTimeout retry timeout
   * @param replyQueue   a reply queue
   * @param transport    a transport
   */
  NameLookupClient(final String serverAddr,
                   final int serverPort,
                   final long timeout,
                   final IdentifierFactory factory,
                   final int retryCount,
                   final int retryTimeout,
                   final BlockingQueue<NamingLookupResponse> replyQueue,
                   final Transport transport) {
    this.serverSocketAddr = new InetSocketAddress(serverAddr, serverPort);
    this.timeout = timeout;
    this.cache = new NameCache(timeout);
    this.codec = NamingCodecFactory.createFullCodec(factory);
    this.replyQueue = replyQueue;
    this.retryCount = retryCount;
    this.retryTimeout = retryTimeout;
    this.transport = transport;
  }

  /**
   * Constructs a naming lookup client that creates its own transport and reply queue.
   *
   * @param serverAddr           a server address
   * @param serverPort           a server port number
   * @param timeout              request timeout in ms
   * @param factory              an identifier factory
   * @param retryCount           a count of retrying lookup
   * @param retryTimeout         retry timeout
   * @param localAddressProvider supplies the local address the new transport is created with
   * @param tpFactory            a transport factory
   */
  @Inject
  private NameLookupClient(
      @Parameter(NameResolverNameServerAddr.class) final String serverAddr,
      @Parameter(NameResolverNameServerPort.class) final int serverPort,
      @Parameter(NameResolverCacheTimeout.class) final long timeout,
      @Parameter(NameResolverIdentifierFactory.class) final IdentifierFactory factory,
      @Parameter(NameResolverRetryCount.class) final int retryCount,
      @Parameter(NameResolverRetryTimeout.class) final int retryTimeout,
      final LocalAddressProvider localAddressProvider,
      final TransportFactory tpFactory) {
    this.serverSocketAddr = new InetSocketAddress(serverAddr, serverPort);
    this.timeout = timeout;
    this.cache = new NameCache(timeout);
    this.codec = NamingCodecFactory.createLookupCodec(factory);
    this.replyQueue = new LinkedBlockingQueue<>();

    // Incoming transport events are decoded and pushed onto replyQueue,
    // where remoteLookup() picks them up.
    this.transport = tpFactory.newInstance(localAddressProvider.getLocalAddress(), 0,
        new SyncStage<>(new NamingLookupClientHandler(
            new NamingLookupResponseHandler(this.replyQueue), this.codec)),
        null, retryCount, retryTimeout);

    this.retryCount = retryCount;
    this.retryTimeout = retryTimeout;
  }

  /**
   * Finds an address for an identifier.
   *
   * <p>The lookup goes through the cache; the cache is given a loader that calls
   * {@link #remoteLookup(Identifier)} and, on {@link NamingException}, retries up to
   * {@code retryCount} times, sleeping {@code retryTimeout * attemptNumber} msec
   * before each retry.
   *
   * @param id an identifier
   * @return an Internet socket address
   * @throws Exception if the lookup still fails after all retries, or the thread is
   *                   interrupted while waiting between retries
   */
  @Override
  public InetSocketAddress lookup(final Identifier id) throws Exception {

    return cache.get(id, new Callable<InetSocketAddress>() {

      @Override
      public InetSocketAddress call() throws Exception {
        final int origRetryCount = NameLookupClient.this.retryCount;
        int retriesLeft = origRetryCount;
        while (true) {
          try {
            return remoteLookup(id);
          } catch (final NamingException e) {
            if (retriesLeft <= 0) {
              throw e;
            } else {
              // Linear back-off: the n-th retry waits n * retryTimeout msec.
              final int currentRetryTimeout = NameLookupClient.this.retryTimeout
                  * (origRetryCount - retriesLeft + 1);
              LOG.log(Level.WARNING,
                  "Caught Naming Exception while looking up " + id
                      + " with Name Server. Will retry " + retriesLeft
                      + " time(s) after waiting for " + currentRetryTimeout + " msec.");
              Thread.sleep(currentRetryTimeout);
              --retriesLeft;
            }
          }
        }
      }

    });
  }

  /**
   * Retrieves an address for an identifier remotely.
   *
   * <p>Sends a lookup request for {@code id} to the name server and waits up to
   * {@code timeout} msec for a reply on the reply queue.
   *
   * @param id an identifier
   * @return an Internet socket address
   * @throws NamingException if no reply arrives within the timeout, the waiting thread
   *                         is interrupted, or the reply contains no assignment
   * @throws Exception       any other failure raised while opening the link or writing to it
   */
  public InetSocketAddress remoteLookup(final Identifier id) throws Exception {
    // the lookup is not thread-safe, because concurrent replies may
    // be read by the wrong thread.
    // TODO: better fix uses a map of id's after REEF-198
    synchronized (this) {

      LOG.log(Level.INFO, "Looking up {0} on NameServer {1}", new Object[]{id, serverSocketAddr});

      final List<Identifier> ids = Arrays.asList(id);
      final Link<NamingMessage> link = transport.open(serverSocketAddr, codec,
          new LoggingLinkListener<NamingMessage>());
      link.write(new NamingLookupRequest(ids));

      final NamingLookupResponse resp;
      try {
        resp = replyQueue.poll(timeout, TimeUnit.MILLISECONDS);
      } catch (final InterruptedException e) {
        LOG.log(Level.INFO, "Lookup interrupted", e);
        // Preserve the interrupt status for callers further up the stack.
        Thread.currentThread().interrupt();
        throw new NamingException(e);
      }

      // poll() returns null when the wait time elapses without a reply. Report that
      // as a NamingException so the retry logic in lookup() applies, instead of
      // failing with a NullPointerException below.
      // NOTE(review): a reply arriving after this timeout stays in replyQueue and
      // would be read by a later lookup - confirm whether stale replies need draining.
      if (resp == null) {
        throw new NamingException("Timed out after " + timeout
            + " msec waiting for the name server " + serverSocketAddr
            + " to reply to the lookup of " + id);
      }

      final List<NameAssignment> list = resp.getNameAssignments();
      if (list.isEmpty()) {
        throw new NamingException("Cannot find " + id + " from the name server");
      } else {
        return list.get(0).getAddress();
      }
    }
  }

  /**
   * Closes resources. Currently a no-op.
   */
  @Override
  public void close() throws Exception {
    // Should not close transport as we did not
    // create it
    // NOTE(review): the @Inject constructor does create its transport via
    // TransportFactory.newInstance - confirm who is responsible for closing it.
  }
}

/**
 * Naming lookup client transport event handler.
 *
 * <p>Decodes the payload of each transport event with the codec and forwards
 * the result, cast to {@link NamingLookupResponse}, to the wrapped handler.
 */
class NamingLookupClientHandler implements EventHandler<TransportEvent> {

  private final EventHandler<NamingLookupResponse> handler;
  private final Codec<NamingMessage> codec;

  /**
   * @param handler the handler that receives decoded lookup responses
   * @param codec   the codec used to decode the raw event data
   */
  NamingLookupClientHandler(final EventHandler<NamingLookupResponse> handler, final Codec<NamingMessage> codec) {
    this.handler = handler;
    this.codec = codec;
  }

  /**
   * Decodes the event data and passes it on as a {@link NamingLookupResponse}.
   *
   * @param value the transport event carrying the encoded message
   */
  @Override
  public void onNext(final TransportEvent value) {
    handler.onNext((NamingLookupResponse) codec.decode(value.getData()));
  }

}

/**
 * Naming lookup response handler.
 *
 * <p>Offers each received response to the reply queue.
 */
class NamingLookupResponseHandler implements EventHandler<NamingLookupResponse> {
  private static final Logger LOG = Logger.getLogger(NamingLookupResponseHandler.class.getName());

  private final BlockingQueue<NamingLookupResponse> replyQueue;

  /**
   * @param replyQueue the queue that received responses are offered to
   */
  NamingLookupResponseHandler(final BlockingQueue<NamingLookupResponse> replyQueue) {
    this.replyQueue = replyQueue;
  }

  /**
   * Offers the response to the reply queue; logs at FINEST if the queue rejects it.
   *
   * @param value the lookup response to enqueue
   */
  @Override
  public void onNext(final NamingLookupResponse value) {
    if (!replyQueue.offer(value)) {
      LOG.log(Level.FINEST, "Element {0} was not added to the queue", value);
    }
  }
}