/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.reef.wake.examples.p2p;

import org.apache.reef.wake.EventHandler;

import java.util.LinkedList;
import java.util.Queue;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Performs a Pull-to-Push conversion in Wake.
 * <p>
 * The class pulls from a set of event sources, and pushes to a single
 * EventHandler. If the downstream event handler blocks, this will block,
 * providing a simple rate limiting scheme.
 * <p>
 * The EventSources are managed in a basic Queue and are served round-robin:
 * a source that yields a message is moved to the tail of the queue, a source
 * that yields {@code null} is dropped.
 * <p>
 * Threading: {@link #close()} may be called from a different thread than the
 * one executing {@link #run()}; the shutdown flag is {@code volatile} so that
 * the message loop observes it. The source queue itself is a plain
 * {@link LinkedList} and {@link #register(EventSource)} performs no
 * synchronization, so callers that register sources while {@link #run()} is
 * executing on another thread must provide their own synchronization.
 *
 * @param <T> the message type
 */
public final class Pull2Push<T> implements Runnable, AutoCloseable {

  private static final Logger LOG = Logger.getLogger(Pull2Push.class.getName());

  private final EventHandler<T> output; // The downstream EventHandler
  private final Queue<EventSource<T>> sources = new LinkedList<>(); // The upstream event sources

  // Written by close() and read by run(), typically on different threads.
  // volatile guarantees the message loop sees the update and terminates.
  private volatile boolean closed = false;

  /**
   * @param output the EventHandler that receives the messages from this
   *               Pull2Push.
   */
  public Pull2Push(final EventHandler<T> output) {
    this.output = output;
  }

  /**
   * Registers an event source.
   *
   * @param source The source that will be added to the queue of this
   *               Pull2Push
   */
  public void register(final EventSource<T> source) {
    this.sources.add(source);
  }

  /**
   * Executes the message loop.
   * <p>
   * Until {@link #close()} has been called, repeatedly takes the source at the
   * head of the queue and asks it for its next message. A non-null message is
   * forwarded to the downstream EventHandler and the source is re-queued; a
   * {@code null} message causes the source to be dropped and logged at INFO.
   * <p>
   * Note that the loop does not wait when the queue is empty; it simply polls
   * again on the next iteration.
   */
  @Override
  public void run() {

    while (!this.closed) {
      // Grab the next available message source, if any
      final EventSource<T> nextSource = this.sources.poll();
      if (null != nextSource) {
        // Grab the next message from that source, if any
        final T message = nextSource.getNext();
        if (null != message) {
          // Add the source to the end of the queue again.
          this.sources.add(nextSource);
          // Send the message. Note that this may block depending on the underlying EventHandler.
          this.output.onNext(message);
        } else {
          // The message source has returned null as the next message. We drop the message source in that case.
          LOG.log(Level.INFO, "Droping message source {0} from the queue", nextSource.toString());
        }
      }
    }
  }

  /**
   * Requests termination of the message loop.
   * <p>
   * Only sets the shutdown flag; {@link #run()} checks it at the top of each
   * iteration, so a message currently being delivered is completed first.
   *
   * @throws Exception never thrown by this implementation; declared to match
   *                   {@link AutoCloseable#close()}.
   */
  @Override
  public void close() throws Exception {
    this.closed = true;
  }
}