001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.wake.impl; 020 021import org.apache.reef.tang.annotations.Parameter; 022import org.apache.reef.wake.AbstractEStage; 023import org.apache.reef.wake.EventHandler; 024import org.apache.reef.wake.StageConfiguration.*; 025import org.apache.reef.wake.WakeParameters; 026import org.apache.reef.wake.exception.WakeRuntimeException; 027 028import javax.inject.Inject; 029import java.util.List; 030import java.util.concurrent.*; 031import java.util.logging.Level; 032import java.util.logging.Logger; 033 034/** 035 * Stage that executes an event handler with a thread pool. 036 * 037 * @param <T> type 038 */ 039public final class ThreadPoolStage<T> extends AbstractEStage<T> { 040 041 private static final Logger LOG = Logger.getLogger(ThreadPoolStage.class.getName()); 042 043 private static final long SHUTDOWN_TIMEOUT = WakeParameters.EXECUTOR_SHUTDOWN_TIMEOUT; 044 045 private final EventHandler<T> handler; 046 private final EventHandler<Throwable> errorHandler; 047 private final ExecutorService executor; 048 private final int numThreads; 049 050 /** 051 * Constructs a thread-pool stage. 052 * 053 * @param handler the event handler to execute 054 * @param numThreads the number of threads to use 055 * @throws WakeRuntimeException 056 */ 057 @Inject 058 public ThreadPoolStage(@Parameter(StageHandler.class) final EventHandler<T> handler, 059 @Parameter(NumberOfThreads.class) final int numThreads) { 060 this(handler.getClass().getName(), handler, numThreads, null); 061 } 062 063 /** 064 * Constructs a thread-pool stage. 065 * 066 * @param name the stage name 067 * @param handler the event handler to execute 068 * @param numThreads the number of threads to use 069 * @param errorHandler the error handler 070 * @throws WakeRuntimeException 071 */ 072 @Inject 073 public ThreadPoolStage(@Parameter(StageName.class) final String name, 074 @Parameter(StageHandler.class) final EventHandler<T> handler, 075 @Parameter(NumberOfThreads.class) final int numThreads, 076 @Parameter(ErrorHandler.class) final EventHandler<Throwable> errorHandler) { 077 super(name); 078 this.handler = handler; 079 this.errorHandler = errorHandler; 080 if (numThreads <= 0) { 081 throw new WakeRuntimeException(name + " numThreads " + numThreads + " is less than or equal to 0"); 082 } 083 this.numThreads = numThreads; 084 this.executor = Executors.newFixedThreadPool(numThreads, new DefaultThreadFactory(name)); 085 StageManager.instance().register(this); 086 } 087 088 /** 089 * Constructs a thread-pool stage. 090 * 091 * @param name the stage name 092 * @param handler the event handler to execute 093 * @param numThreads the number of threads to use 094 * @throws WakeRuntimeException 095 */ 096 @Inject 097 public ThreadPoolStage(@Parameter(StageName.class) final String name, 098 @Parameter(StageHandler.class) final EventHandler<T> handler, 099 @Parameter(NumberOfThreads.class) final int numThreads) { 100 this(name, handler, numThreads, null); 101 } 102 103 /** 104 * Constructs a thread-pool stage. 105 * 106 * @param handler the event handler to execute 107 * @param executor the external executor service provided 108 */ 109 @Inject 110 public ThreadPoolStage(@Parameter(StageHandler.class) final EventHandler<T> handler, 111 @Parameter(StageExecutorService.class) final ExecutorService executor) { 112 this(handler.getClass().getName(), handler, executor); 113 } 114 115 116 /** 117 * Constructs a thread-pool stage. 118 * 119 * @param handler the event handler to execute 120 * @param executor the external executor service provided 121 * @param errorHandler the error handler 122 */ 123 @Inject 124 public ThreadPoolStage(@Parameter(StageHandler.class) final EventHandler<T> handler, 125 @Parameter(StageExecutorService.class) final ExecutorService executor, 126 @Parameter(ErrorHandler.class) final EventHandler<Throwable> errorHandler) { 127 this(handler.getClass().getName(), handler, executor, errorHandler); 128 } 129 130 /** 131 * Constructs a thread-pool stage. 132 * 133 * @param name the stage name 134 * @param handler the event handler to execute 135 * @param executor the external executor service provided 136 * for consistent tracking, it is recommended to create executor with {@link DefaultThreadFactory} 137 */ 138 @Inject 139 public ThreadPoolStage(@Parameter(StageName.class) final String name, 140 @Parameter(StageHandler.class) final EventHandler<T> handler, 141 @Parameter(StageExecutorService.class) final ExecutorService executor) { 142 this(name, handler, executor, null); 143 } 144 145 /** 146 * Constructs a thread-pool stage. 147 * 148 * @param name the stage name 149 * @param handler the event handler to execute 150 * @param executor the external executor service provided 151 * for consistent tracking, it is recommended to create executor with {@link DefaultThreadFactory} 152 * @param errorHandler the error handler 153 */ 154 @Inject 155 public ThreadPoolStage(@Parameter(StageName.class) final String name, 156 @Parameter(StageHandler.class) final EventHandler<T> handler, 157 @Parameter(StageExecutorService.class) final ExecutorService executor, 158 @Parameter(ErrorHandler.class) final EventHandler<Throwable> errorHandler) { 159 super(name); 160 this.handler = handler; 161 this.errorHandler = errorHandler; 162 this.numThreads = 0; 163 this.executor = executor; 164 StageManager.instance().register(this); 165 } 166 167 /** 168 * Handles the event using a thread in the thread pool. 169 * 170 * @param value the event 171 */ 172 @Override 173 @SuppressWarnings("checkstyle:illegalcatch") 174 public void onNext(final T value) { 175 beforeOnNext(); 176 try { 177 executor.submit(new Runnable() { 178 179 @Override 180 public void run() { 181 try { 182 handler.onNext(value); 183 } catch (final Throwable t) { 184 if (errorHandler != null) { 185 errorHandler.onNext(t); 186 } else { 187 LOG.log(Level.SEVERE, name + " Exception from event handler", t); 188 throw t; 189 } 190 } finally { 191 afterOnNext(); 192 } 193 } 194 195 }); 196 } catch (final Exception e) { 197 LOG.log(Level.SEVERE, "Encountered error when submitting to executor in ThreadPoolStage."); 198 afterOnNext(); 199 throw e; 200 } 201 202 } 203 204 /** 205 * Closes resources. 206 */ 207 @Override 208 public void close() { 209 210 if (closed.compareAndSet(false, true) && numThreads > 0) { 211 212 LOG.log(Level.FINEST, "Closing ThreadPoolStage {0}: begin", this.name); 213 214 executor.shutdown(); 215 216 boolean isTerminated = false; 217 try { 218 isTerminated = executor.awaitTermination(SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS); 219 } catch (final InterruptedException ex) { 220 LOG.log(Level.WARNING, "Interrupted closing ThreadPoolStage " + this.name, ex); 221 } 222 223 if (!isTerminated) { 224 final List<Runnable> droppedRunnables = executor.shutdownNow(); 225 LOG.log(Level.SEVERE, 226 "Closing ThreadPoolStage {0}: Executor did not terminate in {1} ms. Dropping {2} tasks", 227 new Object[] {this.name, SHUTDOWN_TIMEOUT, droppedRunnables.size()}); 228 } 229 230 if (!executor.isTerminated()) { 231 LOG.log(Level.SEVERE, "Closing ThreadPoolStage {0}: Executor failed to terminate.", this.name); 232 } 233 234 LOG.log(Level.FINEST, "Closing ThreadPoolStage {0}: end", this.name); 235 } 236 } 237 238 /** 239 * Gets the queue length of this stage. 240 * 241 * @return the queue length 242 */ 243 public int getQueueLength() { 244 return ((ThreadPoolExecutor) executor).getQueue().size(); 245 } 246 247 /** 248 * Gets the active count of this stage. 249 * @return the active count 250 */ 251 public int getActiveCount() { 252 return (int)(getInMeter().getCount() - getOutMeter().getCount()); 253 } 254}