This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.wake.remote.impl;
020
021import org.apache.reef.wake.EStage;
022import org.apache.reef.wake.EventHandler;
023import org.apache.reef.wake.WakeParameters;
024import org.apache.reef.wake.impl.DefaultThreadFactory;
025import org.apache.reef.wake.impl.ThreadPoolStage;
026import org.apache.reef.wake.remote.exception.RemoteRuntimeException;
027
028import java.net.SocketAddress;
029import java.util.List;
030import java.util.concurrent.*;
031import java.util.logging.Level;
032import java.util.logging.Logger;
033
034/**
035 * Receive incoming events and dispatch to correct handlers in order.
036 */
037public class OrderedRemoteReceiverStage implements EStage<TransportEvent> {
038
039  private static final Logger LOG = Logger.getLogger(OrderedRemoteReceiverStage.class.getName());
040
041  private static final String CLASS_NAME = OrderedRemoteReceiverStage.class.getSimpleName();
042
043  private static final long SHUTDOWN_TIMEOUT = WakeParameters.REMOTE_EXECUTOR_SHUTDOWN_TIMEOUT;
044
045  private final ExecutorService pushExecutor;
046  private final ExecutorService pullExecutor;
047
048  private final ThreadPoolStage<TransportEvent> pushStage;
049
050  /**
051   * Constructs an ordered remote receiver stage.
052   *
053   * @param handler      the handler of remote events
054   * @param errorHandler the exception handler
055   */
056  public OrderedRemoteReceiverStage(
057      final EventHandler<RemoteEvent<byte[]>> handler, final EventHandler<Throwable> errorHandler) {
058
059    this.pushExecutor = Executors.newCachedThreadPool(new DefaultThreadFactory(CLASS_NAME + ":Push"));
060    this.pullExecutor = Executors.newCachedThreadPool(new DefaultThreadFactory(CLASS_NAME + ":Pull"));
061
062    final ConcurrentMap<SocketAddress, OrderedEventStream> streamMap = new ConcurrentHashMap<>();
063
064    final ThreadPoolStage<OrderedEventStream> pullStage = new ThreadPoolStage<>(
065        new OrderedPullEventHandler(handler), this.pullExecutor, errorHandler);
066
067    this.pushStage = new ThreadPoolStage<>(
068        new OrderedPushEventHandler(streamMap, pullStage), this.pushExecutor, errorHandler); // for decoupling
069  }
070
071  @Override
072  public void onNext(final TransportEvent value) {
073    LOG.log(Level.FINEST, "Push: {0}", value);
074    this.pushStage.onNext(value);
075  }
076
077  @Override
078  public void close() throws Exception {
079    close("PushExecutor", this.pushExecutor);
080    close("PullExecutor", this.pullExecutor);
081  }
082
083  private static void close(final String name, final ExecutorService executor) {
084    LOG.log(Level.FINE, "Close {0} begin", name);
085    executor.shutdown();
086    try {
087      // wait for threads to finish for timeout
088      if (!executor.awaitTermination(SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS)) {
089        LOG.log(Level.WARNING, "{0}: Executor did not terminate in {1} ms.", new Object[] {name, SHUTDOWN_TIMEOUT});
090        final List<Runnable> droppedRunnables = executor.shutdownNow();
091        LOG.log(Level.WARNING, "{0}: Executor dropped {1} tasks.", new Object[] {name, droppedRunnables.size()});
092      }
093    } catch (final InterruptedException e) {
094      LOG.log(Level.WARNING, "Close interrupted");
095      throw new RemoteRuntimeException(e);
096    }
097    LOG.log(Level.FINE, "Close {0} end", name);
098  }
099}
100
101class OrderedPushEventHandler implements EventHandler<TransportEvent> {
102
103  private static final Logger LOG = Logger.getLogger(OrderedPushEventHandler.class.getName());
104
105  private final RemoteEventCodec<byte[]> codec;
106  private final ConcurrentMap<SocketAddress, OrderedEventStream> streamMap; // per remote address
107  private final ThreadPoolStage<OrderedEventStream> pullStage;
108
109  OrderedPushEventHandler(final ConcurrentMap<SocketAddress, OrderedEventStream> streamMap,
110                          final ThreadPoolStage<OrderedEventStream> pullStage) {
111    this.codec = new RemoteEventCodec<>(new ByteCodec());
112    this.streamMap = streamMap;
113    this.pullStage = pullStage;
114  }
115
116  @Override
117  public void onNext(final TransportEvent value) {
118    final RemoteEvent<byte[]> re = codec.decode(value.getData());
119    re.setLocalAddress(value.getLocalAddress());
120    re.setRemoteAddress(value.getRemoteAddress());
121
122    if (LOG.isLoggable(Level.FINER)) {
123      LOG.log(Level.FINER, "{0} {1}", new Object[]{value, re});
124    }
125
126    LOG.log(Level.FINER, "Value length is {0}", value.getData().length);
127
128    final SocketAddress addr = re.remoteAddress();
129    OrderedEventStream stream = streamMap.get(re.remoteAddress());
130    if (stream == null) {
131      stream = new OrderedEventStream();
132      if (streamMap.putIfAbsent(addr, stream) != null) {
133        stream = streamMap.get(addr);
134      }
135    }
136    stream.add(re);
137    pullStage.onNext(stream);
138  }
139}
140
141class OrderedPullEventHandler implements EventHandler<OrderedEventStream> {
142
143  private static final Logger LOG = Logger.getLogger(OrderedPullEventHandler.class.getName());
144
145  private final EventHandler<RemoteEvent<byte[]>> handler;
146
147  OrderedPullEventHandler(final EventHandler<RemoteEvent<byte[]>> handler) {
148    this.handler = handler;
149  }
150
151  @Override
152  public void onNext(final OrderedEventStream stream) {
153    if (LOG.isLoggable(Level.FINER)) {
154      LOG.log(Level.FINER, "{0}", stream);
155    }
156
157    synchronized (stream) {
158      RemoteEvent<byte[]> event;
159      while ((event = stream.consume()) != null) {
160        handler.onNext(event);
161      }
162    }
163  }
164}
165
166class OrderedEventStream {
167  private static final Logger LOG = Logger.getLogger(OrderedEventStream.class.getName());
168  private final BlockingQueue<RemoteEvent<byte[]>> queue; // a queue of remote events
169  private long nextSeq; // the number of the next event to consume
170
171  OrderedEventStream() {
172    queue = new PriorityBlockingQueue<>(11, new RemoteEventComparator<byte[]>());
173    nextSeq = 0;
174  }
175
176  synchronized void add(final RemoteEvent<byte[]> event) {
177    queue.add(event);
178  }
179
180  synchronized RemoteEvent<byte[]> consume() {
181    RemoteEvent<byte[]> event = queue.peek();
182    if (event != null) {
183
184      if (event.getSeq() == nextSeq) {
185        event = queue.poll();
186        ++nextSeq;
187        return event;
188      } else {
189        LOG.log(Level.FINER, "Event sequence is {0} does not match expected {1}",
190            new Object[]{event.getSeq(), nextSeq});
191      }
192    } else {
193      LOG.log(Level.FINER, "Event is null");
194    }
195
196    return null;
197  }
198}