This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.io.network.impl;
020
021import org.apache.reef.io.Tuple;
022import org.apache.reef.io.naming.Naming;
023import org.apache.reef.io.network.Connection;
024import org.apache.reef.io.network.ConnectionFactory;
025import org.apache.reef.io.network.Message;
026import org.apache.reef.io.network.naming.NameResolver;
027import org.apache.reef.tang.annotations.Parameter;
028import org.apache.reef.wake.*;
029import org.apache.reef.wake.impl.LoggingEventHandler;
030import org.apache.reef.wake.impl.SingleThreadStage;
031import org.apache.reef.wake.remote.Codec;
032import org.apache.reef.wake.remote.impl.TransportEvent;
033import org.apache.reef.wake.remote.transport.Transport;
034import org.apache.reef.wake.remote.transport.TransportFactory;
035import org.apache.reef.wake.remote.transport.netty.LoggingLinkListener;
036
037import javax.inject.Inject;
038import java.net.InetSocketAddress;
039import java.util.concurrent.ConcurrentHashMap;
040import java.util.concurrent.ConcurrentMap;
041import java.util.logging.Level;
042import java.util.logging.Logger;
043
044/**
045 * Network service for Task.
046 */
047public final class NetworkService<T> implements Stage, ConnectionFactory<T> {
048
049  private static final Logger LOG = Logger.getLogger(NetworkService.class.getName());
050
051  private final IdentifierFactory factory;
052  private final Codec<T> codec;
053  private final Transport transport;
054  private final NameResolver nameResolver;
055  private final ConcurrentMap<Identifier, Connection<T>> idToConnMap = new ConcurrentHashMap<>();
056  private final EStage<Tuple<Identifier, InetSocketAddress>> nameServiceRegisteringStage;
057  private final EStage<Identifier> nameServiceUnregisteringStage;
058  private Identifier myId;
059
060  @Inject
061  private NetworkService(
062      @Parameter(NetworkServiceParameters.NetworkServiceIdentifierFactory.class) final IdentifierFactory factory,
063      @Parameter(NetworkServiceParameters.NetworkServicePort.class) final int nsPort,
064      final NameResolver nameResolver,
065      @Parameter(NetworkServiceParameters.NetworkServiceCodec.class) final Codec<T> codec,
066      @Parameter(NetworkServiceParameters.NetworkServiceTransportFactory.class) final TransportFactory tpFactory,
067      @Parameter(NetworkServiceParameters.NetworkServiceHandler.class) final EventHandler<Message<T>> recvHandler,
068      @Parameter(NetworkServiceParameters.NetworkServiceExceptionHandler.class)
069      final EventHandler<Exception> exHandler) {
070    this.factory = factory;
071    this.codec = codec;
072    this.transport = tpFactory.newInstance(nsPort,
073        new LoggingEventHandler<TransportEvent>(),
074        new MessageHandler<T>(recvHandler, codec, factory), exHandler);
075
076    this.nameResolver = nameResolver;
077
078    this.nameServiceRegisteringStage = new SingleThreadStage<>(
079        "NameServiceRegisterer", new EventHandler<Tuple<Identifier, InetSocketAddress>>() {
080          @Override
081          public void onNext(final Tuple<Identifier, InetSocketAddress> tuple) {
082            try {
083              nameResolver.register(tuple.getKey(), tuple.getValue());
084              LOG.log(Level.FINEST, "Registered {0} with nameservice", tuple.getKey());
085            } catch (final Exception ex) {
086              final String msg = "Unable to register " + tuple.getKey() + "with name service";
087              LOG.log(Level.WARNING, msg, ex);
088              throw new RuntimeException(msg, ex);
089            }
090          }
091        }, 5);
092
093    this.nameServiceUnregisteringStage = new SingleThreadStage<>(
094        "NameServiceRegisterer", new EventHandler<Identifier>() {
095          @Override
096          public void onNext(final Identifier id) {
097            try {
098              nameResolver.unregister(id);
099              LOG.log(Level.FINEST, "Unregistered {0} with nameservice", id);
100            } catch (final Exception ex) {
101              final String msg = "Unable to unregister " + id + " with name service";
102              LOG.log(Level.WARNING, msg, ex);
103              throw new RuntimeException(msg, ex);
104            }
105          }
106        }, 5);
107  }
108
109  public void registerId(final Identifier id) {
110    this.myId = id;
111    final Tuple<Identifier, InetSocketAddress> tuple =
112        new Tuple<>(id, (InetSocketAddress) this.transport.getLocalAddress());
113    LOG.log(Level.FINEST, "Binding {0} to NetworkService@({1})",
114        new Object[]{tuple.getKey(), tuple.getValue()});
115    this.nameServiceRegisteringStage.onNext(tuple);
116  }
117
118  public void unregisterId(final Identifier id) {
119    this.myId = null;
120    LOG.log(Level.FINEST, "Unbinding {0} to NetworkService@({1})",
121        new Object[]{id, this.transport.getLocalAddress()});
122    this.nameServiceUnregisteringStage.onNext(id);
123  }
124
125  public Identifier getMyId() {
126    return this.myId;
127  }
128
129  public Transport getTransport() {
130    return this.transport;
131  }
132
133  public Codec<T> getCodec() {
134    return this.codec;
135  }
136
137  public Naming getNameClient() {
138    return this.nameResolver;
139  }
140
141  public IdentifierFactory getIdentifierFactory() {
142    return this.factory;
143  }
144
145  void remove(final Identifier id) {
146    this.idToConnMap.remove(id);
147  }
148
149  @Override
150  public void close() throws Exception {
151    LOG.log(Level.FINE, "Shutting down");
152    this.transport.close();
153    this.nameResolver.close();
154  }
155
156  @Override
157  public Connection<T> newConnection(final Identifier destId) {
158
159    if (this.myId == null) {
160      throw new RuntimeException(
161          "Trying to establish a connection from a Network Service that is not bound to any task");
162    }
163
164    final Connection<T> conn = this.idToConnMap.get(destId);
165    if (conn != null) {
166      return conn;
167    }
168
169    final Connection<T> newConnection = new NSConnection<>(
170        this.myId, destId, new LoggingLinkListener<T>(), this);
171
172    final Connection<T> existing = this.idToConnMap.putIfAbsent(destId, newConnection);
173    return existing == null ? newConnection : existing;
174  }
175
176  @Override
177  public Identifier getConnectionFactoryId() {
178    throw new UnsupportedOperationException();
179  }
180
181  @Override
182  public Identifier getLocalEndPointId() {
183    throw new UnsupportedOperationException();
184  }
185}
186
187class MessageHandler<T> implements EventHandler<TransportEvent> {
188
189  private final EventHandler<Message<T>> handler;
190  private final NSMessageCodec<T> codec;
191
192  MessageHandler(final EventHandler<Message<T>> handler,
193                 final Codec<T> codec, final IdentifierFactory factory) {
194    this.handler = handler;
195    this.codec = new NSMessageCodec<>(codec, factory);
196  }
197
198  @Override
199  public void onNext(final TransportEvent value) {
200    final byte[] data = value.getData();
201    final NSMessage<T> obj = this.codec.decode(data);
202    this.handler.onNext(obj);
203  }
204}