001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.wake.impl; 020 021import org.apache.reef.tang.annotations.Unit; 022import org.apache.reef.wake.EventHandler; 023 024import javax.inject.Inject; 025import java.util.logging.Level; 026import java.util.logging.Logger; 027 028/** 029 * An EventHandler combines two events of different types into a single Pair of events. 030 * Handler will block until both events are received. 031 * <p> 032 * onNext is thread safe 033 * 034 * @param <L> type of event 035 * @param <R> type of event 036 * @see BlockingEventHandler 037 */ 038@Unit 039public final class MergingEventHandler<L, R> { 040 041 private static final Logger LOG = Logger.getLogger(MergingEventHandler.class.getName()); 042 private final EventHandler<L> left = new Left(); 043 private final EventHandler<R> right = new Right(); 044 private final Object mutex = new Object(); 045 private final EventHandler<Pair<L, R>> destination; 046 private L leftEvent; 047 private R rightEvent; 048 049 public EventHandler<L> getLeft() { 050 return left; 051 } 052 053 public EventHandler<R> getRight() { 054 return right; 055 } 056 057 @Inject 058 public MergingEventHandler(final EventHandler<Pair<L, R>> destination) { 059 this.destination = destination; 060 reset(); 061 } 062 063 /** 064 * Not thread safe. Must be externally synchronized. 065 */ 066 private void reset() { 067 rightEvent = null; 068 leftEvent = null; 069 } 070 071 /** 072 * A pair having two independent typed items. 073 * 074 * @param <S1> a type of first item 075 * @param <S2> a type of second item 076 */ 077 public static final class Pair<S1, S2> { 078 private final S1 first; 079 private final S2 second; 080 081 public S1 getFirst() { 082 return first; 083 } 084 085 public S2 getSecond() { 086 return second; 087 } 088 089 private Pair(final S1 s1, final S2 s2) { 090 this.first = s1; 091 this.second = s2; 092 } 093 } 094 095 private class Left implements EventHandler<L> { 096 097 @Override 098 public void onNext(final L event) { 099 100 L leftRef = null; 101 R rightRef = null; 102 103 synchronized (mutex) { 104 105 while (leftEvent != null) { 106 try { 107 mutex.wait(); 108 } catch (final InterruptedException e) { 109 LOG.log(Level.SEVERE, "Wait interrupted.", e); 110 } 111 } 112 113 if (LOG.isLoggable(Level.FINEST)) { 114 LOG.log(Level.FINEST, "{0} producing left {1}", 115 new Object[]{Thread.currentThread(), event}); 116 } 117 118 leftEvent = event; 119 leftRef = event; 120 121 if (rightEvent != null) { 122 rightRef = rightEvent; 123 reset(); 124 mutex.notifyAll(); 125 } 126 } 127 128 if (rightRef != null) { 129 // I get to fire the event 130 destination.onNext(new Pair<L, R>(leftRef, rightRef)); 131 } 132 } 133 } 134 135 private class Right implements EventHandler<R> { 136 137 @Override 138 public void onNext(final R event) { 139 140 L leftRef = null; 141 R rightRef = null; 142 143 synchronized (mutex) { 144 145 while (rightEvent != null) { 146 try { 147 mutex.wait(); 148 } catch (final InterruptedException e) { 149 LOG.log(Level.SEVERE, "Wait interrupted.", e); 150 } 151 } 152 153 if (LOG.isLoggable(Level.FINEST)) { 154 LOG.log(Level.FINEST, "{0} producing right {1}", 155 new Object[]{Thread.currentThread(), event}); 156 } 157 158 rightEvent = event; 159 rightRef = event; 160 161 if (leftEvent != null) { 162 leftRef = leftEvent; 163 reset(); 164 mutex.notifyAll(); 165 } 166 } 167 168 if (leftRef != null) { 169 // I get to fire the event 170 destination.onNext(new Pair<L, R>(leftRef, rightRef)); 171 } 172 } 173 } 174}