This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.wake.examples.p2p;
020
021import org.apache.reef.wake.EventHandler;
022
023import java.util.LinkedList;
024import java.util.Queue;
025import java.util.logging.Level;
026import java.util.logging.Logger;
027
028/**
029 * Performs a Pull-to-Push conversion in Wake.
030 * <p>
031 * The class pulls from a set of event sources, and pushes to a single
032 * EventHandler. If the downstream event handler blocks, this will block,
033 * providing a simple rate limiting scheme.
034 * <p>
035 * The EventSources are managed in a basic Queue.
036 *
037 * @param <T> the message type
038 */
039public final class Pull2Push<T> implements Runnable, AutoCloseable {
040
041  private final EventHandler<T> output; // The downstream EventHandler
042  private final Queue<EventSource<T>> sources = new LinkedList<>(); // The upstream event sources
043  private boolean closed = false;
044
045  /**
046   * @param output the EventHandler that receives the messages from this
047   *               Pull2Push.
048   */
049  public Pull2Push(final EventHandler<T> output) {
050    this.output = output;
051  }
052
053  /**
054   * Registers an event source.
055   *
056   * @param source The source that will be added to the queue of this
057   *               Pull2Push
058   */
059  public void register(final EventSource<T> source) {
060    this.sources.add(source);
061  }
062
063  /**
064   * Executes the message loop.
065   */
066  @Override
067  public void run() {
068
069    while (!this.closed) {
070      // Grab the next available message source, if any
071      final EventSource<T> nextSource = sources.poll();
072      if (null != nextSource) {
073        // Grab the next message from that source, if any
074        final T message = nextSource.getNext();
075        if (null != message) {
076          // Add the source to the end of the queue again.
077          sources.add(nextSource);
078          // Send the message. Note that this may block depending on the underlying EventHandler.
079          this.output.onNext(message);
080        } else {
081          // The message source has returned null as the next message. We drop the message source in that case.
082          Logger.getLogger(Pull2Push.class.getName()).log(Level.INFO, "Droping message source {0} from the queue",
083              nextSource.toString());
084        }
085      }
086    }
087  }
088
089  @Override
090  public void close() throws Exception {
091    this.closed = true;
092  }
093}