001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.io.network.naming; 020 021import org.apache.reef.io.network.naming.exception.NamingRuntimeException; 022import org.apache.reef.io.network.naming.parameters.*; 023import org.apache.reef.io.network.naming.serialization.NamingLookupResponse; 024import org.apache.reef.io.network.naming.serialization.NamingMessage; 025import org.apache.reef.io.network.naming.serialization.NamingRegisterResponse; 026import org.apache.reef.tang.annotations.Parameter; 027import org.apache.reef.wake.EventHandler; 028import org.apache.reef.wake.Identifier; 029import org.apache.reef.wake.IdentifierFactory; 030import org.apache.reef.wake.impl.SyncStage; 031import org.apache.reef.wake.remote.Codec; 032import org.apache.reef.wake.remote.address.LocalAddressProvider; 033import org.apache.reef.wake.remote.impl.TransportEvent; 034import org.apache.reef.wake.remote.transport.Transport; 035import org.apache.reef.wake.remote.transport.TransportFactory; 036 037import javax.inject.Inject; 038import java.io.IOException; 039import java.net.InetSocketAddress; 040import java.util.concurrent.BlockingQueue; 041import java.util.concurrent.LinkedBlockingQueue; 042import java.util.logging.Level; 043import java.util.logging.Logger; 044 045/** 046 * Naming client looking up remote server. 047 */ 048public final class NameClient implements NameResolver { 049 private static final Logger LOG = Logger.getLogger(NameClient.class.getName()); 050 051 private NameLookupClient lookupClient; 052 private NameRegistryClient registryClient; 053 private Transport transport; 054 055 /** 056 * Constructs a naming client. 057 * 058 * @param serverAddr a server address 059 * @param serverPort a server port number 060 * @param timeout timeout in ms 061 * @param factory an identifier factory 062 * @param retryCount the number of retries 063 * @param retryTimeout retry timeout 064 * @param localAddressProvider a local address provider 065 * @param tpFactory transport factory 066 */ 067 @Inject 068 private NameClient( 069 @Parameter(NameResolverNameServerAddr.class) final String serverAddr, 070 @Parameter(NameResolverNameServerPort.class) final int serverPort, 071 @Parameter(NameResolverCacheTimeout.class) final long timeout, 072 @Parameter(NameResolverIdentifierFactory.class) final IdentifierFactory factory, 073 @Parameter(NameResolverRetryCount.class) final int retryCount, 074 @Parameter(NameResolverRetryTimeout.class) final int retryTimeout, 075 final LocalAddressProvider localAddressProvider, 076 final TransportFactory tpFactory) { 077 078 final BlockingQueue<NamingLookupResponse> replyLookupQueue = new LinkedBlockingQueue<>(); 079 final BlockingQueue<NamingRegisterResponse> replyRegisterQueue = new LinkedBlockingQueue<>(); 080 final Codec<NamingMessage> codec = NamingCodecFactory.createFullCodec(factory); 081 082 this.transport = tpFactory.newInstance(localAddressProvider.getLocalAddress(), 0, 083 new SyncStage<>(new NamingClientEventHandler( 084 new NamingResponseHandler(replyLookupQueue, replyRegisterQueue), codec)), 085 null, retryCount, retryTimeout); 086 087 this.lookupClient = new NameLookupClient(serverAddr, serverPort, timeout, factory, 088 retryCount, retryTimeout, replyLookupQueue, this.transport); 089 090 this.registryClient = new NameRegistryClient(serverAddr, serverPort, timeout, 091 factory, replyRegisterQueue, this.transport); 092 } 093 094 /** 095 * Registers an (identifier, address) mapping. 096 * 097 * @param id an identifier 098 * @param addr an Internet socket address 099 */ 100 @Override 101 public void register(final Identifier id, final InetSocketAddress addr) 102 throws Exception { 103 LOG.log(Level.FINE, "Register {0} : {1}", new Object[]{id, addr}); 104 this.registryClient.register(id, addr); 105 } 106 107 /** 108 * Unregisters an identifier. 109 * 110 * @param id an identifier 111 */ 112 @Override 113 public void unregister(final Identifier id) throws IOException { 114 this.registryClient.unregister(id); 115 } 116 117 /** 118 * Finds an address for an identifier. 119 * 120 * @param id an identifier 121 * @return an Internet socket address 122 */ 123 @Override 124 public InetSocketAddress lookup(final Identifier id) throws Exception { 125 return this.lookupClient.lookup(id); 126 } 127 128 /** 129 * Retrieves an address for an identifier remotely. 130 * 131 * @param id an identifier 132 * @return an Internet socket address 133 * @throws Exception 134 */ 135 public InetSocketAddress remoteLookup(final Identifier id) throws Exception { 136 return this.lookupClient.remoteLookup(id); 137 } 138 139 /** 140 * Closes resources. 141 */ 142 @Override 143 public void close() throws Exception { 144 145 if (this.lookupClient != null) { 146 this.lookupClient.close(); 147 } 148 149 if (this.registryClient != null) { 150 this.registryClient.close(); 151 } 152 153 if (this.transport != null) { 154 this.transport.close(); 155 } 156 } 157} 158 159/** 160 * Naming client transport event handler. 161 */ 162class NamingClientEventHandler implements EventHandler<TransportEvent> { 163 164 private static final Logger LOG = Logger.getLogger(NamingClientEventHandler.class.getName()); 165 166 private final EventHandler<NamingMessage> handler; 167 private final Codec<NamingMessage> codec; 168 169 NamingClientEventHandler( 170 final EventHandler<NamingMessage> handler, final Codec<NamingMessage> codec) { 171 this.handler = handler; 172 this.codec = codec; 173 } 174 175 @Override 176 public void onNext(final TransportEvent value) { 177 LOG.log(Level.FINE, "Transport: ", value); 178 this.handler.onNext(this.codec.decode(value.getData())); 179 } 180} 181 182/** 183 * Naming response message handler. 184 */ 185class NamingResponseHandler implements EventHandler<NamingMessage> { 186 private static final Logger LOG = Logger.getLogger(NamingResponseHandler.class.getName()); 187 188 private final BlockingQueue<NamingLookupResponse> replyLookupQueue; 189 private final BlockingQueue<NamingRegisterResponse> replyRegisterQueue; 190 191 NamingResponseHandler(final BlockingQueue<NamingLookupResponse> replyLookupQueue, 192 final BlockingQueue<NamingRegisterResponse> replyRegisterQueue) { 193 this.replyLookupQueue = replyLookupQueue; 194 this.replyRegisterQueue = replyRegisterQueue; 195 } 196 197 @Override 198 public void onNext(final NamingMessage value) { 199 if (value instanceof NamingLookupResponse) { 200 if (!replyLookupQueue.offer((NamingLookupResponse) value)) { 201 LOG.log(Level.FINEST, "Element {0} was not added to the queue", value); 202 } 203 } else if (value instanceof NamingRegisterResponse) { 204 if (!replyRegisterQueue.offer((NamingRegisterResponse) value)) { 205 LOG.log(Level.FINEST, "Element {0} was not added to the queue", value); 206 } 207 } else { 208 throw new NamingRuntimeException("Unknown naming response message"); 209 } 210 211 } 212 213}