001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.wake.impl; 020 021import org.apache.reef.wake.EventHandler; 022 023import java.util.ArrayList; 024import java.util.concurrent.BlockingQueue; 025import java.util.concurrent.LinkedBlockingQueue; 026import java.util.concurrent.atomic.AtomicInteger; 027 028/** 029 * An EventHandler that blocks until a set number of Events has been received. 030 * Once they have been received, the downstream event handler is called with an 031 * Iterable of the events spooled. 032 * <p> 033 * onNext is thread safe 034 * 035 * @param <T> type of events 036 * @see BlockingSignalEventHandler 037 */ 038public final class BlockingEventHandler<T> implements EventHandler<T> { 039 040 private final int expectedSize; 041 private final EventHandler<Iterable<T>> destination; 042 private final AtomicInteger cursor; 043 // TODO[JIRA REEF-911]: a queue is likely overly conservative given that we only need 044 // to preserve order of those pairs of events that didn't race (have an ordering) 045 private BlockingQueue<T> events = new LinkedBlockingQueue<>(); 046 047 public BlockingEventHandler(final int expectedSize, final EventHandler<Iterable<T>> destination) { 048 this.expectedSize = expectedSize; 049 this.destination = destination; 050 this.cursor = new AtomicInteger(0); 051 } 052 053 @Override 054 public void onNext(final T event) { 055 this.events.add(event); 056 final int newCursor = this.cursor.incrementAndGet(); 057 058 if (newCursor % expectedSize == 0) { 059 // TODO[JIRA REEF-911]: There is a race here where the person draining the events might 060 // not include their event as the last one. I'm going to assume this does not 061 // matter, since all events will still be drained exactly once by someone in 062 // the proper order 063 064 final ArrayList<T> nonConcurrent = new ArrayList<>(expectedSize); 065 synchronized (events) { 066 067 // drainTo(maxElements) does not suffice because it has undefined behavior for 068 // any modifications (a better spec would possibly be undefined behavior except for appends) 069 070 // TODO[JIRA REEF-911]: a non-locking implementation will simply atomically update the head of the 071 // queue to index=expectedSize, so that the drainer may drain without synchronization 072 for (int i = 0; i < expectedSize; i++) { 073 nonConcurrent.add(events.poll()); 074 } 075 } 076 this.destination.onNext(nonConcurrent); 077 } 078 } 079}