001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.wake.impl; 020 021import org.apache.reef.wake.EventHandler; 022import org.apache.reef.wake.exception.WakeRuntimeException; 023 024import java.util.HashMap; 025import java.util.LinkedList; 026import java.util.List; 027import java.util.Map; 028import java.util.concurrent.locks.ReadWriteLock; 029import java.util.concurrent.locks.ReentrantReadWriteLock; 030import java.util.logging.Level; 031import java.util.logging.Logger; 032 033/** 034 * Event handler that provides publish/subscribe interfaces. 035 * 036 * @param <T> type 037 */ 038public class PubSubEventHandler<T> implements EventHandler<T> { 039 040 private static final Logger LOG = Logger.getLogger(PubSubEventHandler.class.getCanonicalName()); 041 private final Map<Class<? extends T>, List<EventHandler<? extends T>>> clazzToListOfHandlersMap; 042 private final ReadWriteLock lock = new ReentrantReadWriteLock(); 043 044 /** 045 * Constructs a pub-sub event handler. 046 */ 047 public PubSubEventHandler() { 048 this.clazzToListOfHandlersMap = new HashMap<>(); 049 } 050 051 /** 052 * Constructs a pub-sub event handler with initial subscribed event handlers. 053 * 054 * @param clazzToListOfHandlersMap a map of event class types to lists of event handlers 055 */ 056 public PubSubEventHandler(final Map<Class<? extends T>, List<EventHandler<? extends T>>> clazzToListOfHandlersMap) { 057 this.clazzToListOfHandlersMap = clazzToListOfHandlersMap; 058 } 059 060 /** 061 * Subscribes an event handler for an event class type. 062 * 063 * @param clazz an event class 064 * @param handler an event handler 065 */ 066 public void subscribe(final Class<? extends T> clazz, final EventHandler<? extends T> handler) { 067 lock.writeLock().lock(); 068 try { 069 List<EventHandler<? extends T>> list = clazzToListOfHandlersMap.get(clazz); 070 if (list == null) { 071 list = new LinkedList<EventHandler<? extends T>>(); 072 clazzToListOfHandlersMap.put(clazz, list); 073 } 074 list.add(handler); 075 } finally { 076 lock.writeLock().unlock(); 077 } 078 } 079 080 /** 081 * Invokes subscribed handlers for the event class type. 082 * 083 * @param event an event 084 * @throws WakeRuntimeException 085 */ 086 @Override 087 public void onNext(final T event) { 088 LOG.log(Level.FINEST, "Invoked for event: {0}", event); 089 lock.readLock().lock(); 090 final List<EventHandler<? extends T>> list; 091 try { 092 list = clazzToListOfHandlersMap.get(event.getClass()); 093 if (list == null) { 094 throw new WakeRuntimeException("No event " + event.getClass() + " handler"); 095 } 096 for (final EventHandler<? extends T> handler : list) { 097 LOG.log(Level.FINEST, "Invoking {0}", handler); 098 ((EventHandler<T>) handler).onNext(event); 099 } 100 } finally { 101 lock.readLock().unlock(); 102 } 103 } 104 105}