001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.common.utils; 020 021import org.apache.reef.annotations.audience.DriverSide; 022import org.apache.reef.annotations.audience.Private; 023import org.apache.reef.tang.util.MonotonicHashMap; 024import org.apache.reef.util.ExceptionHandlingEventHandler; 025import org.apache.reef.wake.EventHandler; 026import org.apache.reef.wake.impl.ThreadPoolStage; 027 028import java.util.Collections; 029import java.util.Map; 030import java.util.Set; 031import java.util.logging.Level; 032import java.util.logging.Logger; 033 034/** 035 * Delayed event router that dispatches messages to the proper event handler by type. 036 * This class is used in EvaluatorManager to isolate user threads from REEF. 037 */ 038@Private 039@DriverSide 040public final class DispatchingEStage implements AutoCloseable { 041 042 private static final Logger LOG = Logger.getLogger(DispatchingEStage.class.getName()); 043 044 /** 045 * A map of event handlers, populated in the register() method. 046 */ 047 private final Map<Class<?>, EventHandler<?>> handlers = 048 Collections.synchronizedMap(new MonotonicHashMap<Class<?>, EventHandler<?>>()); 049 /** 050 * Exception handler, one for all event handlers. 051 */ 052 private final EventHandler<Throwable> errorHandler; 053 /** 054 * Thread pool to process delayed event handler invocations. 055 */ 056 private final ThreadPoolStage<DelayedOnNext> stage; 057 058 /** 059 * @param errorHandler used for exceptions thrown from the event handlers registered. 060 * @param numThreads number of threads to allocate to dispatch events. 061 * @param stageName the name to use for the underlying stage. 062 * It will be carried over to name the Thread(s) spawned. 063 */ 064 public DispatchingEStage(final EventHandler<Throwable> errorHandler, 065 final int numThreads, 066 final String stageName) { 067 this.errorHandler = errorHandler; 068 this.stage = new ThreadPoolStage<>(stageName, 069 new EventHandler<DelayedOnNext>() { 070 @Override 071 public void onNext(final DelayedOnNext promise) { 072 promise.handler.onNext(promise.message); 073 } 074 }, numThreads 075 ); 076 077 } 078 079 /** 080 * Constructs a DispatchingEStage that uses the Thread pool and ErrorHandler of another one. 081 * 082 * @param other 083 */ 084 public DispatchingEStage(final DispatchingEStage other) { 085 this.errorHandler = other.errorHandler; 086 this.stage = other.stage; 087 } 088 089 /** 090 * Register a new event handler. 091 * 092 * @param type Message type to process with this handler. 093 * @param handlers A set of handlers that process that type of message. 094 * @param <T> Message type. 095 * @param <U> Type of message that event handler supports. Must be a subclass of T. 096 */ 097 @SuppressWarnings("checkstyle:hiddenfield") 098 public <T, U extends T> void register(final Class<T> type, final Set<EventHandler<U>> handlers) { 099 this.handlers.put(type, new ExceptionHandlingEventHandler<>( 100 new BroadCastEventHandler<>(handlers), this.errorHandler)); 101 } 102 103 /** 104 * Dispatch a new message by type. 105 * If the stage is already closed, log a warning and ignore the message. 106 * @param type Type of event handler - must match the register() call. 107 * @param message A message to process. Must be a subclass of T. 108 * @param <T> Message type that event handler supports. 109 * @param <U> input message type. Must be a subclass of T. 110 */ 111 @SuppressWarnings("unchecked") 112 public <T, U extends T> void onNext(final Class<T> type, final U message) { 113 if (this.isClosed()) { 114 LOG.log(Level.WARNING, "Dispatcher {0} already closed: ignoring message {1}: {2}", 115 new Object[] {this.stage, type.getCanonicalName(), message}); 116 } else { 117 final EventHandler<T> handler = (EventHandler<T>) this.handlers.get(type); 118 this.stage.onNext(new DelayedOnNext(handler, message)); 119 } 120 } 121 122 /** 123 * Return true if there are no messages queued or in processing, false otherwise. 124 */ 125 public boolean isEmpty() { 126 return this.stage.getQueueLength() + this.stage.getActiveCount() == 0; 127 } 128 129 /** 130 * Close the stage adn stop accepting new messages. 131 * Closes the internal thread pool. 132 */ 133 @Override 134 public void close() { 135 this.stage.close(); 136 } 137 138 /** 139 * Check if the stage can still accept messages. 140 * @return true if the stage can no longer accept messages, false otherwise. 141 */ 142 public boolean isClosed() { 143 return this.stage.isClosed(); 144 } 145 146 /** 147 * Delayed EventHandler.onNext() call. 148 * Contains a message object and EventHandler to process it. 149 */ 150 private static final class DelayedOnNext { 151 152 private final EventHandler<Object> handler; 153 private final Object message; 154 155 @SuppressWarnings("unchecked") 156 <T, U extends T> DelayedOnNext(final EventHandler<T> handler, final U message) { 157 this.handler = (EventHandler<Object>) handler; 158 this.message = message; 159 } 160 } 161}