001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.wake.impl; 020 021import org.apache.reef.tang.annotations.Unit; 022import org.apache.reef.wake.EventHandler; 023 024import javax.inject.Inject; 025import java.util.logging.Level; 026import java.util.logging.Logger; 027 028/** 029 * An EventHandler combines two events of different types into a single Pair of events. 030 * Handler will block until both events are received. 031 * <p/> 032 * onNext is thread safe 033 * 034 * @param <L> type of event 035 * @param <R> type of event 036 * @see BlockingEventHandler 037 */ 038@Unit 039public final class MergingEventHandler<L, R> { 040 041 private static Logger LOG = Logger.getLogger(MergingEventHandler.class.getName()); 042 public final EventHandler<L> left = new Left(); 043 public final EventHandler<R> right = new Right(); 044 private final Object mutex = new Object(); 045 private final EventHandler<Pair<L, R>> destination; 046 private L leftEvent; 047 private R rightEvent; 048 049 @Inject 050 public MergingEventHandler(final EventHandler<Pair<L, R>> destination) { 051 this.destination = destination; 052 reset(); 053 } 054 055 /* 056 * Not thread safe. Must be externally synchronized. 057 */ 058 private void reset() { 059 rightEvent = null; 060 leftEvent = null; 061 } 062 063 public static class Pair<S1, S2> { 064 public final S1 first; 065 public final S2 second; 066 067 private Pair(S1 s1, S2 s2) { 068 this.first = s1; 069 this.second = s2; 070 } 071 } 072 073 private class Left implements EventHandler<L> { 074 075 @Override 076 public void onNext(final L event) { 077 078 L leftRef = null; 079 R rightRef = null; 080 081 synchronized (mutex) { 082 083 while (leftEvent != null) { 084 try { 085 mutex.wait(); 086 } catch (final InterruptedException e) { 087 LOG.log(Level.SEVERE, "Wait interrupted.", e); 088 } 089 } 090 091 if (LOG.isLoggable(Level.FINEST)) { 092 LOG.log(Level.FINEST, "{0} producing left {1}", 093 new Object[]{Thread.currentThread(), event}); 094 } 095 096 leftEvent = event; 097 leftRef = event; 098 099 if (rightEvent != null) { 100 rightRef = rightEvent; 101 reset(); 102 mutex.notifyAll(); 103 } 104 } 105 106 if (rightRef != null) { 107 // I get to fire the event 108 destination.onNext(new Pair<L, R>(leftRef, rightRef)); 109 } 110 } 111 } 112 113 private class Right implements EventHandler<R> { 114 115 @Override 116 public void onNext(final R event) { 117 118 L leftRef = null; 119 R rightRef = null; 120 121 synchronized (mutex) { 122 123 while (rightEvent != null) { 124 try { 125 mutex.wait(); 126 } catch (final InterruptedException e) { 127 LOG.log(Level.SEVERE, "Wait interrupted.", e); 128 } 129 } 130 131 if (LOG.isLoggable(Level.FINEST)) { 132 LOG.log(Level.FINEST, "{0} producing right {1}", 133 new Object[]{Thread.currentThread(), event}); 134 } 135 136 rightEvent = event; 137 rightRef = event; 138 139 if (leftEvent != null) { 140 leftRef = leftEvent; 141 reset(); 142 mutex.notifyAll(); 143 } 144 } 145 146 if (leftRef != null) { 147 // I get to fire the event 148 destination.onNext(new Pair<L, R>(leftRef, rightRef)); 149 } 150 } 151 } 152}