001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.wake.remote.transport.netty; 020 021import org.apache.reef.tang.Injector; 022import org.apache.reef.tang.Tang; 023import org.apache.reef.tang.exceptions.InjectionException; 024import org.apache.reef.wake.EStage; 025import org.apache.reef.wake.EventHandler; 026import org.apache.reef.wake.impl.SyncStage; 027import org.apache.reef.wake.remote.RemoteConfiguration; 028import org.apache.reef.wake.remote.address.LocalAddressProvider; 029import org.apache.reef.wake.remote.impl.TransportEvent; 030import org.apache.reef.wake.remote.ports.TcpPortProvider; 031import org.apache.reef.wake.remote.transport.Transport; 032import org.apache.reef.wake.remote.transport.TransportFactory; 033 034import javax.inject.Inject; 035 036/** 037 * Factory that creates a messaging transport. 038 */ 039public final class MessagingTransportFactory implements TransportFactory { 040 041 private final String localAddress; 042 043 @Inject 044 private MessagingTransportFactory(final LocalAddressProvider localAddressProvider) { 045 this.localAddress = localAddressProvider.getLocalAddress(); 046 } 047 048 /** 049 * Creates a transport. 050 * 051 * @param port a listening port 052 * @param clientHandler a transport client side handler 053 * @param serverHandler a transport server side handler 054 * @param exHandler a exception handler 055 */ 056 @Override 057 public Transport newInstance(final int port, 058 final EventHandler<TransportEvent> clientHandler, 059 final EventHandler<TransportEvent> serverHandler, 060 final EventHandler<Exception> exHandler) { 061 062 final Injector injector = Tang.Factory.getTang().newInjector(); 063 injector.bindVolatileParameter(RemoteConfiguration.HostAddress.class, this.localAddress); 064 injector.bindVolatileParameter(RemoteConfiguration.Port.class, port); 065 injector.bindVolatileParameter(RemoteConfiguration.RemoteClientStage.class, new SyncStage<>(clientHandler)); 066 injector.bindVolatileParameter(RemoteConfiguration.RemoteServerStage.class, new SyncStage<>(serverHandler)); 067 068 final Transport transport; 069 try { 070 transport = injector.getInstance(NettyMessagingTransport.class); 071 transport.registerErrorHandler(exHandler); 072 return transport; 073 } catch (final InjectionException e) { 074 throw new RuntimeException(e); 075 } 076 } 077 078 /** 079 * Creates a transport. 080 * 081 * @param hostAddress a host address 082 * @param port a listening port 083 * @param clientStage a client stage 084 * @param serverStage a server stage 085 * @param numberOfTries a number of tries 086 * @param retryTimeout a timeout for retry 087 */ 088 @Override 089 public Transport newInstance(final String hostAddress, 090 final int port, 091 final EStage<TransportEvent> clientStage, 092 final EStage<TransportEvent> serverStage, 093 final int numberOfTries, 094 final int retryTimeout) { 095 try { 096 TcpPortProvider tcpPortProvider = Tang.Factory.getTang().newInjector().getInstance(TcpPortProvider.class); 097 return newInstance(hostAddress, port, clientStage, 098 serverStage, numberOfTries, retryTimeout, tcpPortProvider); 099 } catch (final InjectionException e) { 100 throw new RuntimeException(e); 101 } 102 } 103 104 /** 105 * Creates a transport. 106 * 107 * @param hostAddress a host address 108 * @param port a listening port 109 * @param clientStage a client stage 110 * @param serverStage a server stage 111 * @param numberOfTries a number of tries 112 * @param retryTimeout a timeout for retry 113 * @param tcpPortProvider a provider for TCP port 114 */ 115 @Override 116 public Transport newInstance(final String hostAddress, 117 final int port, 118 final EStage<TransportEvent> clientStage, 119 final EStage<TransportEvent> serverStage, 120 final int numberOfTries, 121 final int retryTimeout, 122 final TcpPortProvider tcpPortProvider) { 123 124 final Injector injector = Tang.Factory.getTang().newInjector(); 125 injector.bindVolatileParameter(RemoteConfiguration.HostAddress.class, hostAddress); 126 injector.bindVolatileParameter(RemoteConfiguration.Port.class, port); 127 injector.bindVolatileParameter(RemoteConfiguration.RemoteClientStage.class, clientStage); 128 injector.bindVolatileParameter(RemoteConfiguration.RemoteServerStage.class, serverStage); 129 injector.bindVolatileParameter(RemoteConfiguration.NumberOfTries.class, numberOfTries); 130 injector.bindVolatileParameter(RemoteConfiguration.RetryTimeout.class, retryTimeout); 131 injector.bindVolatileInstance(TcpPortProvider.class, tcpPortProvider); 132 try { 133 return injector.getInstance(NettyMessagingTransport.class); 134 } catch (final InjectionException e) { 135 throw new RuntimeException(e); 136 } 137 } 138}