001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.io.network.naming; 020 021import org.apache.reef.io.naming.Naming; 022import org.apache.reef.io.network.Cache; 023import org.apache.reef.io.network.naming.exception.NamingRuntimeException; 024import org.apache.reef.io.network.naming.serialization.NamingLookupResponse; 025import org.apache.reef.io.network.naming.serialization.NamingMessage; 026import org.apache.reef.io.network.naming.serialization.NamingRegisterResponse; 027import org.apache.reef.wake.EventHandler; 028import org.apache.reef.wake.Identifier; 029import org.apache.reef.wake.IdentifierFactory; 030import org.apache.reef.wake.Stage; 031import org.apache.reef.wake.impl.SyncStage; 032import org.apache.reef.wake.remote.Codec; 033import org.apache.reef.wake.remote.NetUtils; 034import org.apache.reef.wake.remote.impl.TransportEvent; 035import org.apache.reef.wake.remote.transport.Transport; 036import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport; 037 038import java.io.IOException; 039import java.net.InetSocketAddress; 040import java.util.concurrent.BlockingQueue; 041import java.util.concurrent.LinkedBlockingQueue; 042import java.util.logging.Level; 043import java.util.logging.Logger; 044 045/** 046 * Naming client 047 */ 048public class NameClient implements Stage, Naming { 049 private static final Logger LOG = Logger.getLogger(NameClient.class.getName()); 050 051 private NameLookupClient lookupClient; 052 private NameRegistryClient registryClient; 053 private Transport transport; 054 055 /** 056 * Constructs a naming client 057 * 058 * @param serverAddr a server address 059 * @param serverPort a server port number 060 * @param factory an identifier factory 061 * @param cache a cache 062 */ 063 public NameClient(String serverAddr, int serverPort, 064 IdentifierFactory factory, int retryCount, int retryTimeout, 065 Cache<Identifier, InetSocketAddress> cache) { 066 this(serverAddr, serverPort, 10000, factory, retryCount, retryTimeout, cache); 067 } 068 069 /** 070 * Constructs a naming client 071 * 072 * @param serverAddr a server address 073 * @param serverPort a server port number 074 * @param timeout timeout in ms 075 * @param factory an identifier factory 076 * @param cache a cache 077 */ 078 public NameClient(final String serverAddr, final int serverPort, final long timeout, 079 final IdentifierFactory factory, final int retryCount, final int retryTimeout, 080 final Cache<Identifier, InetSocketAddress> cache) { 081 082 final BlockingQueue<NamingLookupResponse> replyLookupQueue = new LinkedBlockingQueue<NamingLookupResponse>(); 083 final BlockingQueue<NamingRegisterResponse> replyRegisterQueue = new LinkedBlockingQueue<NamingRegisterResponse>(); 084 final Codec<NamingMessage> codec = NamingCodecFactory.createFullCodec(factory); 085 086 this.transport = new NettyMessagingTransport(NetUtils.getLocalAddress(), 0, 087 new SyncStage<>(new NamingClientEventHandler( 088 new NamingResponseHandler(replyLookupQueue, replyRegisterQueue), codec)), 089 null, retryCount, retryTimeout); 090 091 this.lookupClient = new NameLookupClient(serverAddr, serverPort, timeout, 092 factory, retryCount, retryTimeout, replyLookupQueue, this.transport, cache); 093 094 this.registryClient = new NameRegistryClient(serverAddr, serverPort, timeout, 095 factory, replyRegisterQueue, this.transport); 096 } 097 098 /** 099 * Registers an (identifier, address) mapping 100 * 101 * @param id an identifier 102 * @param addr an Internet socket address 103 */ 104 @Override 105 public void register(final Identifier id, final InetSocketAddress addr) 106 throws Exception { 107 LOG.log(Level.FINE, "Refister {0} : {1}", new Object[]{id, addr}); 108 this.registryClient.register(id, addr); 109 } 110 111 /** 112 * Unregisters an identifier 113 * 114 * @param id an identifier 115 */ 116 @Override 117 public void unregister(final Identifier id) throws IOException { 118 this.registryClient.unregister(id); 119 } 120 121 /** 122 * Finds an address for an identifier 123 * 124 * @param id an identifier 125 * @return an Internet socket address 126 */ 127 @Override 128 public InetSocketAddress lookup(final Identifier id) throws Exception { 129 return this.lookupClient.lookup(id); 130 } 131 132 /** 133 * Retrieves an address for an identifier remotely 134 * 135 * @param id an identifier 136 * @return an Internet socket address 137 * @throws Exception 138 */ 139 public InetSocketAddress remoteLookup(final Identifier id) throws Exception { 140 return this.lookupClient.remoteLookup(id); 141 } 142 143 /** 144 * Closes resources 145 */ 146 @Override 147 public void close() throws Exception { 148 149 if (this.lookupClient != null) { 150 this.lookupClient.close(); 151 } 152 153 if (this.registryClient != null) { 154 this.registryClient.close(); 155 } 156 157 if (this.transport != null) { 158 this.transport.close(); 159 } 160 } 161} 162 163/** 164 * Naming client transport event handler 165 */ 166class NamingClientEventHandler implements EventHandler<TransportEvent> { 167 168 private static final Logger LOG = Logger.getLogger(NamingClientEventHandler.class.getName()); 169 170 private final EventHandler<NamingMessage> handler; 171 private final Codec<NamingMessage> codec; 172 173 public NamingClientEventHandler( 174 final EventHandler<NamingMessage> handler, final Codec<NamingMessage> codec) { 175 this.handler = handler; 176 this.codec = codec; 177 } 178 179 @Override 180 public void onNext(final TransportEvent value) { 181 LOG.log(Level.FINE, "Transport: ", value); 182 this.handler.onNext(this.codec.decode(value.getData())); 183 } 184} 185 186/** 187 * Naming response message handler 188 */ 189class NamingResponseHandler implements EventHandler<NamingMessage> { 190 191 private final BlockingQueue<NamingLookupResponse> replyLookupQueue; 192 private final BlockingQueue<NamingRegisterResponse> replyRegisterQueue; 193 194 NamingResponseHandler(BlockingQueue<NamingLookupResponse> replyLookupQueue, 195 BlockingQueue<NamingRegisterResponse> replyRegisterQueue) { 196 this.replyLookupQueue = replyLookupQueue; 197 this.replyRegisterQueue = replyRegisterQueue; 198 } 199 200 @Override 201 public void onNext(NamingMessage value) { 202 if (value instanceof NamingLookupResponse) { 203 replyLookupQueue.offer((NamingLookupResponse) value); 204 } else if (value instanceof NamingRegisterResponse) { 205 replyRegisterQueue.offer((NamingRegisterResponse) value); 206 } else { 207 throw new NamingRuntimeException("Unknown naming response message"); 208 } 209 210 } 211 212}