001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.wake.impl;
020
021import org.apache.reef.wake.EventHandler;
022
023import java.util.ArrayList;
024import java.util.concurrent.BlockingQueue;
025import java.util.concurrent.LinkedBlockingQueue;
026import java.util.concurrent.atomic.AtomicInteger;
027
028/**
029 * An EventHandler that blocks until a set number of Events has been received.
030 * Once they have been received, the downstream event handler is called with an
031 * Iterable of the events spooled.
032 * <p>
033 * onNext is thread safe
034 *
035 * @param <T> type of events
036 * @see BlockingSignalEventHandler
037 */
038public final class BlockingEventHandler<T> implements EventHandler<T> {
039
040  private final int expectedSize;
041  private final EventHandler<Iterable<T>> destination;
042  private final AtomicInteger cursor;
043  // TODO[JIRA REEF-911]: a queue is likely overly conservative given that we only need
044  // to preserve order of those pairs of events that didn't race (have an ordering)
045  private BlockingQueue<T> events = new LinkedBlockingQueue<>();
046
047  public BlockingEventHandler(final int expectedSize, final EventHandler<Iterable<T>> destination) {
048    this.expectedSize = expectedSize;
049    this.destination = destination;
050    this.cursor = new AtomicInteger(0);
051  }
052
053  @Override
054  public void onNext(final T event) {
055    this.events.add(event);
056    final int newCursor = this.cursor.incrementAndGet();
057
058    if (newCursor % expectedSize == 0) {
059      // TODO[JIRA REEF-911]: There is a race here where the person draining the events might
060      // not include their event as the last one. I'm going to assume this does not
061      // matter, since all events will still be drained exactly once by someone in
062      // the proper order
063
064      final ArrayList<T> nonConcurrent = new ArrayList<>(expectedSize);
065      synchronized (events) {
066
067        // drainTo(maxElements) does not suffice because it has undefined behavior for
068        // any modifications (a better spec would possibly be undefined behavior except for appends)
069
070        // TODO[JIRA REEF-911]: a non-locking implementation will simply atomically update the head of the
071        // queue to index=expectedSize, so that the drainer may drain without synchronization
072        for (int i = 0; i < expectedSize; i++) {
073          nonConcurrent.add(events.poll());
074        }
075      }
076      this.destination.onNext(nonConcurrent);
077    }
078  }
079}