This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.io.network.impl;
020
021import org.apache.reef.exception.evaluator.NetworkException;
022import org.apache.reef.io.Tuple;
023import org.apache.reef.io.network.ConnectionFactory;
024import org.apache.reef.io.network.Message;
025import org.apache.reef.io.network.NetworkConnectionService;
026import org.apache.reef.io.network.exception.NetworkRuntimeException;
027import org.apache.reef.io.network.impl.config.NetworkConnectionServiceIdFactory;
028import org.apache.reef.io.network.impl.config.NetworkConnectionServicePort;
029import org.apache.reef.io.network.naming.NameResolver;
030import org.apache.reef.tang.annotations.Parameter;
031import org.apache.reef.wake.EStage;
032import org.apache.reef.wake.EventHandler;
033import org.apache.reef.wake.Identifier;
034import org.apache.reef.wake.IdentifierFactory;
035import org.apache.reef.wake.impl.SingleThreadStage;
036import org.apache.reef.wake.remote.Codec;
037import org.apache.reef.wake.remote.impl.TransportEvent;
038import org.apache.reef.wake.remote.transport.Link;
039import org.apache.reef.wake.remote.transport.LinkListener;
040import org.apache.reef.wake.remote.transport.Transport;
041import org.apache.reef.wake.remote.transport.TransportFactory;
042
043import javax.inject.Inject;
044import java.net.InetSocketAddress;
045import java.net.SocketAddress;
046import java.util.concurrent.ConcurrentHashMap;
047import java.util.concurrent.ConcurrentMap;
048import java.util.concurrent.atomic.AtomicBoolean;
049import java.util.logging.Level;
050import java.util.logging.Logger;
051
052/**
053 * Default Network connection service implementation.
054 */
055public final class NetworkConnectionServiceImpl implements NetworkConnectionService {
056
057  private static final Logger LOG = Logger.getLogger(NetworkConnectionServiceImpl.class.getName());
058
059  /**
060   * An identifier factory registering network connection service id.
061   */
062  private final IdentifierFactory idFactory;
063  /**
064   * A name resolver looking up nameserver.
065   */
066  private final NameResolver nameResolver;
067  /**
068   * A messaging transport.
069   */
070  private final Transport transport;
071  /**
072   * A map of (id of connection factory, a connection factory instance).
073   */
074  private final ConcurrentMap<String, NetworkConnectionFactory> connFactoryMap;
075
076  /**
077   * A network connection service message codec.
078   */
079  private final Codec<NetworkConnectionServiceMessage> nsCodec;
080  /**
081   * A network connection service link listener.
082   */
083  private final LinkListener<NetworkConnectionServiceMessage> nsLinkListener;
084  /**
085   * A stage registering identifiers to nameServer.
086   */
087  private final EStage<Tuple<Identifier, InetSocketAddress>> nameServiceRegisteringStage;
088  /**
089   * A stage unregistering identifiers from nameServer.
090   */
091  private final EStage<Identifier> nameServiceUnregisteringStage;
092  /**
093   * A boolean flag that indicates whether the NetworkConnectionService is closed.
094   */
095  private final AtomicBoolean isClosed;
096  /**
097   * A DELIMITER to make a concatenated end point id {{connectionFactoryId}}{{DELIMITER}}{{localEndPointId}}.
098   */
099  private static final String DELIMITER = "/";
100
101  @Inject
102  private NetworkConnectionServiceImpl(
103      @Parameter(NetworkConnectionServiceIdFactory.class) final IdentifierFactory idFactory,
104      @Parameter(NetworkConnectionServicePort.class) final int nsPort,
105      final TransportFactory transportFactory,
106      final NameResolver nameResolver) {
107    this.idFactory = idFactory;
108    this.connFactoryMap = new ConcurrentHashMap<>();
109    this.nsCodec = new NetworkConnectionServiceMessageCodec(idFactory, connFactoryMap);
110    this.nsLinkListener = new NetworkConnectionServiceLinkListener(connFactoryMap);
111    final EventHandler<TransportEvent> recvHandler =
112        new NetworkConnectionServiceReceiveHandler(connFactoryMap, nsCodec);
113    this.nameResolver = nameResolver;
114    this.transport = transportFactory.newInstance(nsPort, recvHandler, recvHandler,
115        new NetworkConnectionServiceExceptionHandler());
116
117    this.nameServiceRegisteringStage = new SingleThreadStage<>(
118        "NameServiceRegisterer", new EventHandler<Tuple<Identifier, InetSocketAddress>>() {
119          @Override
120          public void onNext(final Tuple<Identifier, InetSocketAddress> tuple) {
121            try {
122              nameResolver.register(tuple.getKey(), tuple.getValue());
123              LOG.log(Level.FINEST, "Registered {0} with nameservice", tuple.getKey());
124            } catch (final Exception ex) {
125              final String msg = "Unable to register " + tuple.getKey() + " with name service";
126              LOG.log(Level.WARNING, msg, ex);
127              throw new RuntimeException(msg, ex);
128            }
129          }
130        }, 5);
131
132    this.nameServiceUnregisteringStage = new SingleThreadStage<>(
133        "NameServiceRegisterer", new EventHandler<Identifier>() {
134          @Override
135          public void onNext(final Identifier id) {
136            try {
137              nameResolver.unregister(id);
138              LOG.log(Level.FINEST, "Unregistered {0} with nameservice", id);
139            } catch (final Exception ex) {
140              final String msg = "Unable to unregister " + id + " with name service";
141              LOG.log(Level.WARNING, msg, ex);
142              throw new RuntimeException(msg, ex);
143            }
144          }
145        }, 5);
146
147    this.isClosed = new AtomicBoolean();
148  }
149
150
151  private void checkBeforeRegistration(final String connectionFactoryId) {
152    if (isClosed.get()) {
153      throw new NetworkRuntimeException("Unable to register new ConnectionFactory to closed NetworkConnectionService");
154    }
155
156    if (connFactoryMap.get(connectionFactoryId) != null) {
157      throw new NetworkRuntimeException("ConnectionFactory " + connectionFactoryId + " was already registered.");
158    }
159
160    if (connectionFactoryId.contains(DELIMITER)) {
161      throw new NetworkRuntimeException(
162          "The ConnectionFactoryId " + connectionFactoryId + " should not contain " + DELIMITER);
163    }
164  }
165
166  @Override
167  public <T> ConnectionFactory<T> registerConnectionFactory(
168      final Identifier connectionFactoryId,
169      final Codec<T> codec,
170      final EventHandler<Message<T>> eventHandler,
171      final LinkListener<Message<T>> linkListener,
172      final Identifier localEndPointId) {
173    final String id = connectionFactoryId.toString();
174    checkBeforeRegistration(id);
175
176    final NetworkConnectionFactory<T> connectionFactory = new NetworkConnectionFactory<>(
177        this, connectionFactoryId, codec, eventHandler, linkListener, localEndPointId);
178    final Identifier localId = getEndPointIdWithConnectionFactoryId(connectionFactoryId, localEndPointId);
179    nameServiceRegisteringStage.onNext(new Tuple<>(localId, (InetSocketAddress) transport.getLocalAddress()));
180
181    if (connFactoryMap.putIfAbsent(id, connectionFactory) != null) {
182      throw new NetworkRuntimeException("ConnectionFactory " + connectionFactoryId + " was already registered.");
183    }
184
185    LOG.log(Level.INFO, "ConnectionFactory {0} was registered", id);
186
187    return connectionFactory;
188  }
189
190  @Override
191  public void unregisterConnectionFactory(final Identifier connFactoryId) {
192    final String id = connFactoryId.toString();
193    final NetworkConnectionFactory connFactory = connFactoryMap.remove(id);
194    if (connFactory != null) {
195      LOG.log(Level.INFO, "ConnectionFactory {0} was unregistered", id);
196
197      final Identifier localId = getEndPointIdWithConnectionFactoryId(
198            connFactoryId, connFactory.getLocalEndPointId());
199      nameServiceUnregisteringStage.onNext(localId);
200    } else {
201      LOG.log(Level.WARNING, "ConnectionFactory of {0} is null", id);
202    }
203  }
204
205
206  /**
207   * Open a channel for destination identifier of NetworkConnectionService.
208   * @param connectionFactoryId
209   * @param remoteEndPointId
210   * @throws NetworkException
211   */
212  <T> Link<NetworkConnectionServiceMessage<T>> openLink(
213      final Identifier connectionFactoryId, final Identifier remoteEndPointId) throws NetworkException {
214    final Identifier remoteId = getEndPointIdWithConnectionFactoryId(connectionFactoryId, remoteEndPointId);
215    try {
216      final SocketAddress address = nameResolver.lookup(remoteId);
217      if (address == null) {
218        throw new NetworkException("Lookup " + remoteId + " is null");
219      }
220      return transport.open(address, nsCodec, nsLinkListener);
221    } catch(final Exception e) {
222      throw new NetworkException(e);
223    }
224  }
225
226
227  private Identifier getEndPointIdWithConnectionFactoryId(
228      final Identifier connectionFactoryId, final Identifier endPointId) {
229    final String identifier = connectionFactoryId.toString() + DELIMITER + endPointId.toString();
230    return idFactory.getNewInstance(identifier);
231  }
232
233  /**
234   * Gets a ConnectionFactory.
235   * @param connFactoryId the identifier of the ConnectionFactory
236   */
237  @Override
238  public <T> ConnectionFactory<T> getConnectionFactory(final Identifier connFactoryId) {
239    final ConnectionFactory<T> connFactory = connFactoryMap.get(connFactoryId.toString());
240    if (connFactory == null) {
241      throw new RuntimeException("Cannot find ConnectionFactory of " + connFactoryId + ".");
242    }
243    return connFactory;
244  }
245
246  @Override
247  public void close() throws Exception {
248    if (isClosed.compareAndSet(false, true)) {
249      LOG.log(Level.FINE, "Shutting down");
250      this.nameServiceRegisteringStage.close();
251      this.nameServiceUnregisteringStage.close();
252      this.nameResolver.close();
253      this.transport.close();
254    }
255  }
256}