This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.io.network.impl;
020
021import org.apache.reef.io.Tuple;
022import org.apache.reef.io.naming.Naming;
023import org.apache.reef.io.network.Connection;
024import org.apache.reef.io.network.ConnectionFactory;
025import org.apache.reef.io.network.Message;
026import org.apache.reef.io.network.naming.NameResolver;
027import org.apache.reef.tang.annotations.Parameter;
028import org.apache.reef.wake.*;
029import org.apache.reef.wake.impl.LoggingEventHandler;
030import org.apache.reef.wake.impl.SingleThreadStage;
031import org.apache.reef.wake.remote.Codec;
032import org.apache.reef.wake.remote.address.LocalAddressProvider;
033import org.apache.reef.wake.remote.impl.TransportEvent;
034import org.apache.reef.wake.remote.transport.Transport;
035import org.apache.reef.wake.remote.transport.TransportFactory;
036import org.apache.reef.wake.remote.transport.netty.LoggingLinkListener;
037
038import javax.inject.Inject;
039import java.net.InetSocketAddress;
040import java.util.concurrent.ConcurrentHashMap;
041import java.util.concurrent.ConcurrentMap;
042import java.util.logging.Level;
043import java.util.logging.Logger;
044
045/**
046 * Network service for Task.
047 */
048public final class NetworkService<T> implements Stage, ConnectionFactory<T> {
049
050  private static final Logger LOG = Logger.getLogger(NetworkService.class.getName());
051
052  private final IdentifierFactory factory;
053  private final Codec<T> codec;
054  private final Transport transport;
055  private final NameResolver nameResolver;
056  private final ConcurrentMap<Identifier, Connection<T>> idToConnMap = new ConcurrentHashMap<>();
057  private final EStage<Tuple<Identifier, InetSocketAddress>> nameServiceRegisteringStage;
058  private final EStage<Identifier> nameServiceUnregisteringStage;
059  private Identifier myId;
060
061  /**
062   * @deprecated in 0.12. Use Tang to obtain an instance of this instead.
063   */
064  @Deprecated
065  @Inject
066  public NetworkService(
067      @Parameter(NetworkServiceParameters.NetworkServiceIdentifierFactory.class) final IdentifierFactory factory,
068      @Parameter(NetworkServiceParameters.NetworkServicePort.class) final int nsPort,
069      final NameResolver nameResolver,
070      @Parameter(NetworkServiceParameters.NetworkServiceCodec.class) final Codec<T> codec,
071      @Parameter(NetworkServiceParameters.NetworkServiceTransportFactory.class) final TransportFactory tpFactory,
072      @Parameter(NetworkServiceParameters.NetworkServiceHandler.class) final EventHandler<Message<T>> recvHandler,
073      @Parameter(NetworkServiceParameters.NetworkServiceExceptionHandler.class) final EventHandler<Exception> exHandler,
074      final LocalAddressProvider localAddressProvider) {
075
076    this.factory = factory;
077    this.codec = codec;
078    this.transport = tpFactory.newInstance(nsPort,
079        new LoggingEventHandler<TransportEvent>(),
080        new MessageHandler<T>(recvHandler, codec, factory), exHandler);
081
082    this.nameResolver = nameResolver;
083
084    this.nameServiceRegisteringStage = new SingleThreadStage<>(
085        "NameServiceRegisterer", new EventHandler<Tuple<Identifier, InetSocketAddress>>() {
086          @Override
087          public void onNext(final Tuple<Identifier, InetSocketAddress> tuple) {
088            try {
089              nameResolver.register(tuple.getKey(), tuple.getValue());
090              LOG.log(Level.FINEST, "Registered {0} with nameservice", tuple.getKey());
091            } catch (final Exception ex) {
092              final String msg = "Unable to register " + tuple.getKey() + "with name service";
093              LOG.log(Level.WARNING, msg, ex);
094              throw new RuntimeException(msg, ex);
095            }
096          }
097        }, 5);
098
099    this.nameServiceUnregisteringStage = new SingleThreadStage<>(
100        "NameServiceRegisterer", new EventHandler<Identifier>() {
101          @Override
102          public void onNext(final Identifier id) {
103            try {
104              nameResolver.unregister(id);
105              LOG.log(Level.FINEST, "Unregistered {0} with nameservice", id);
106            } catch (final Exception ex) {
107              final String msg = "Unable to unregister " + id + " with name service";
108              LOG.log(Level.WARNING, msg, ex);
109              throw new RuntimeException(msg, ex);
110            }
111          }
112        }, 5);
113  }
114
115  public void registerId(final Identifier id) {
116    this.myId = id;
117    final Tuple<Identifier, InetSocketAddress> tuple =
118        new Tuple<>(id, (InetSocketAddress) this.transport.getLocalAddress());
119    LOG.log(Level.FINEST, "Binding {0} to NetworkService@({1})",
120        new Object[]{tuple.getKey(), tuple.getValue()});
121    this.nameServiceRegisteringStage.onNext(tuple);
122  }
123
124  public void unregisterId(final Identifier id) {
125    this.myId = null;
126    LOG.log(Level.FINEST, "Unbinding {0} to NetworkService@({1})",
127        new Object[]{id, this.transport.getLocalAddress()});
128    this.nameServiceUnregisteringStage.onNext(id);
129  }
130
131  public Identifier getMyId() {
132    return this.myId;
133  }
134
135  public Transport getTransport() {
136    return this.transport;
137  }
138
139  public Codec<T> getCodec() {
140    return this.codec;
141  }
142
143  public Naming getNameClient() {
144    return this.nameResolver;
145  }
146
147  public IdentifierFactory getIdentifierFactory() {
148    return this.factory;
149  }
150
151  void remove(final Identifier id) {
152    this.idToConnMap.remove(id);
153  }
154
155  @Override
156  public void close() throws Exception {
157    LOG.log(Level.FINE, "Shutting down");
158    this.transport.close();
159    this.nameResolver.close();
160  }
161
162  @Override
163  public Connection<T> newConnection(final Identifier destId) {
164
165    if (this.myId == null) {
166      throw new RuntimeException(
167          "Trying to establish a connection from a Network Service that is not bound to any task");
168    }
169
170    final Connection<T> conn = this.idToConnMap.get(destId);
171    if (conn != null) {
172      return conn;
173    }
174
175    final Connection<T> newConnection = new NSConnection<>(
176        this.myId, destId, new LoggingLinkListener<T>(), this);
177
178    final Connection<T> existing = this.idToConnMap.putIfAbsent(destId, newConnection);
179    return existing == null ? newConnection : existing;
180  }
181
182  @Override
183  public Identifier getConnectionFactoryId() {
184    throw new UnsupportedOperationException();
185  }
186
187  @Override
188  public Identifier getLocalEndPointId() {
189    throw new UnsupportedOperationException();
190  }
191}
192
193class MessageHandler<T> implements EventHandler<TransportEvent> {
194
195  private final EventHandler<Message<T>> handler;
196  private final NSMessageCodec<T> codec;
197
198  MessageHandler(final EventHandler<Message<T>> handler,
199                 final Codec<T> codec, final IdentifierFactory factory) {
200    this.handler = handler;
201    this.codec = new NSMessageCodec<>(codec, factory);
202  }
203
204  @Override
205  public void onNext(final TransportEvent value) {
206    final byte[] data = value.getData();
207    final NSMessage<T> obj = this.codec.decode(data);
208    this.handler.onNext(obj);
209  }
210}