001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.io.network.naming; 020 021import org.apache.reef.io.network.naming.exception.NamingRuntimeException; 022import org.apache.reef.io.network.naming.parameters.*; 023import org.apache.reef.io.network.naming.serialization.NamingLookupResponse; 024import org.apache.reef.io.network.naming.serialization.NamingMessage; 025import org.apache.reef.io.network.naming.serialization.NamingRegisterResponse; 026import org.apache.reef.tang.annotations.Parameter; 027import org.apache.reef.wake.EventHandler; 028import org.apache.reef.wake.Identifier; 029import org.apache.reef.wake.IdentifierFactory; 030import org.apache.reef.wake.impl.SyncStage; 031import org.apache.reef.wake.remote.Codec; 032import org.apache.reef.wake.remote.address.LocalAddressProvider; 033import org.apache.reef.wake.remote.impl.TransportEvent; 034import org.apache.reef.wake.remote.transport.Transport; 035import org.apache.reef.wake.remote.transport.TransportFactory; 036 037import javax.inject.Inject; 038import java.io.IOException; 039import java.net.InetSocketAddress; 040import java.util.concurrent.BlockingQueue; 041import java.util.concurrent.LinkedBlockingQueue; 042import java.util.logging.Level; 043import java.util.logging.Logger; 044 045/** 046 * Naming client looking up remote server. 047 */ 048public final class NameClient implements NameResolver { 049 private static final Logger LOG = Logger.getLogger(NameClient.class.getName()); 050 051 private NameLookupClient lookupClient; 052 private NameRegistryClient registryClient; 053 private Transport transport; 054 055 /** 056 * Constructs a naming client. 057 * 058 * @param serverAddr a server address 059 * @param serverPort a server port number 060 * @param timeout timeout in ms 061 * @param factory an identifier factory 062 * @param retryCount the number of retries 063 * @param retryTimeout retry timeout 064 * @param localAddressProvider a local address provider 065 * @param tpFactory transport factory 066 */ 067 @Inject 068 @SuppressWarnings("deprecation") 069 private NameClient( 070 @Parameter(NameResolverNameServerAddr.class) final String serverAddr, 071 @Parameter(NameResolverNameServerPort.class) final int serverPort, 072 @Parameter(NameResolverCacheTimeout.class) final long timeout, 073 @Parameter(NameResolverIdentifierFactory.class) final IdentifierFactory factory, 074 @Parameter(NameResolverRetryCount.class) final int retryCount, 075 @Parameter(NameResolverRetryTimeout.class) final int retryTimeout, 076 final LocalAddressProvider localAddressProvider, 077 final TransportFactory tpFactory) { 078 079 final BlockingQueue<NamingLookupResponse> replyLookupQueue = new LinkedBlockingQueue<>(); 080 final BlockingQueue<NamingRegisterResponse> replyRegisterQueue = new LinkedBlockingQueue<>(); 081 final Codec<NamingMessage> codec = NamingCodecFactory.createFullCodec(factory); 082 083 this.transport = tpFactory.newInstance(localAddressProvider.getLocalAddress(), 0, 084 new SyncStage<>(new NamingClientEventHandler( 085 new NamingResponseHandler(replyLookupQueue, replyRegisterQueue), codec)), 086 null, retryCount, retryTimeout); 087 088 this.lookupClient = new NameLookupClient(serverAddr, serverPort, timeout, factory, 089 retryCount, retryTimeout, replyLookupQueue, this.transport); 090 091 this.registryClient = new NameRegistryClient(serverAddr, serverPort, timeout, 092 factory, replyRegisterQueue, this.transport); 093 } 094 095 /** 096 * Registers an (identifier, address) mapping. 097 * 098 * @param id an identifier 099 * @param addr an Internet socket address 100 */ 101 @Override 102 public void register(final Identifier id, final InetSocketAddress addr) 103 throws Exception { 104 LOG.log(Level.FINE, "Register {0} : {1}", new Object[]{id, addr}); 105 this.registryClient.register(id, addr); 106 } 107 108 /** 109 * Unregisters an identifier. 110 * 111 * @param id an identifier 112 */ 113 @Override 114 public void unregister(final Identifier id) throws IOException { 115 this.registryClient.unregister(id); 116 } 117 118 /** 119 * Finds an address for an identifier. 120 * 121 * @param id an identifier 122 * @return an Internet socket address 123 */ 124 @Override 125 public InetSocketAddress lookup(final Identifier id) throws Exception { 126 return this.lookupClient.lookup(id); 127 } 128 129 /** 130 * Retrieves an address for an identifier remotely. 131 * 132 * @param id an identifier 133 * @return an Internet socket address 134 * @throws Exception 135 */ 136 public InetSocketAddress remoteLookup(final Identifier id) throws Exception { 137 return this.lookupClient.remoteLookup(id); 138 } 139 140 /** 141 * Closes resources. 142 */ 143 @Override 144 public void close() throws Exception { 145 146 if (this.lookupClient != null) { 147 this.lookupClient.close(); 148 } 149 150 if (this.registryClient != null) { 151 this.registryClient.close(); 152 } 153 154 if (this.transport != null) { 155 this.transport.close(); 156 } 157 } 158} 159 160/** 161 * Naming client transport event handler. 162 */ 163class NamingClientEventHandler implements EventHandler<TransportEvent> { 164 165 private static final Logger LOG = Logger.getLogger(NamingClientEventHandler.class.getName()); 166 167 private final EventHandler<NamingMessage> handler; 168 private final Codec<NamingMessage> codec; 169 170 NamingClientEventHandler( 171 final EventHandler<NamingMessage> handler, final Codec<NamingMessage> codec) { 172 this.handler = handler; 173 this.codec = codec; 174 } 175 176 @Override 177 public void onNext(final TransportEvent value) { 178 LOG.log(Level.FINE, "Transport: ", value); 179 this.handler.onNext(this.codec.decode(value.getData())); 180 } 181} 182 183/** 184 * Naming response message handler. 185 */ 186class NamingResponseHandler implements EventHandler<NamingMessage> { 187 private static final Logger LOG = Logger.getLogger(NamingResponseHandler.class.getName()); 188 189 private final BlockingQueue<NamingLookupResponse> replyLookupQueue; 190 private final BlockingQueue<NamingRegisterResponse> replyRegisterQueue; 191 192 NamingResponseHandler(final BlockingQueue<NamingLookupResponse> replyLookupQueue, 193 final BlockingQueue<NamingRegisterResponse> replyRegisterQueue) { 194 this.replyLookupQueue = replyLookupQueue; 195 this.replyRegisterQueue = replyRegisterQueue; 196 } 197 198 @Override 199 public void onNext(final NamingMessage value) { 200 if (value instanceof NamingLookupResponse) { 201 if (!replyLookupQueue.offer((NamingLookupResponse) value)) { 202 LOG.log(Level.FINEST, "Element {0} was not added to the queue", value); 203 } 204 } else if (value instanceof NamingRegisterResponse) { 205 if (!replyRegisterQueue.offer((NamingRegisterResponse) value)) { 206 LOG.log(Level.FINEST, "Element {0} was not added to the queue", value); 207 } 208 } else { 209 throw new NamingRuntimeException("Unknown naming response message"); 210 } 211 212 } 213 214}