This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.wake.impl;
020
021import org.apache.reef.tang.annotations.Parameter;
022import org.apache.reef.wake.AbstractEStage;
023import org.apache.reef.wake.EventHandler;
024import org.apache.reef.wake.StageConfiguration.*;
025import org.apache.reef.wake.WakeParameters;
026import org.apache.reef.wake.exception.WakeRuntimeException;
027
028import javax.inject.Inject;
029import java.util.List;
030import java.util.concurrent.*;
031import java.util.logging.Level;
032import java.util.logging.Logger;
033
034/**
035 * Stage that executes an event handler with a thread pool.
036 *
037 * @param <T> type
038 */
039public final class ThreadPoolStage<T> extends AbstractEStage<T> {
040
041  private static final Logger LOG = Logger.getLogger(ThreadPoolStage.class.getName());
042
043  private static final long SHUTDOWN_TIMEOUT = WakeParameters.EXECUTOR_SHUTDOWN_TIMEOUT;
044
045  private final EventHandler<T> handler;
046  private final EventHandler<Throwable> errorHandler;
047  private final ExecutorService executor;
048  private final int numThreads;
049
050  /**
051   * Constructs a thread-pool stage.
052   *
053   * @param handler    the event handler to execute
054   * @param numThreads the number of threads to use
055   * @throws WakeRuntimeException
056   */
057  @Inject
058  public ThreadPoolStage(@Parameter(StageHandler.class) final EventHandler<T> handler,
059                         @Parameter(NumberOfThreads.class) final int numThreads) {
060    this(handler.getClass().getName(), handler, numThreads, null);
061  }
062
063  /**
064   * Constructs a thread-pool stage.
065   *
066   * @param name         the stage name
067   * @param handler      the event handler to execute
068   * @param numThreads   the number of threads to use
069   * @param errorHandler the error handler
070   * @throws WakeRuntimeException
071   */
072  @Inject
073  public ThreadPoolStage(@Parameter(StageName.class) final String name,
074                         @Parameter(StageHandler.class) final EventHandler<T> handler,
075                         @Parameter(NumberOfThreads.class) final int numThreads,
076                         @Parameter(ErrorHandler.class) final EventHandler<Throwable> errorHandler) {
077    super(name);
078    this.handler = handler;
079    this.errorHandler = errorHandler;
080    if (numThreads <= 0) {
081      throw new WakeRuntimeException(name + " numThreads " + numThreads + " is less than or equal to 0");
082    }
083    this.numThreads = numThreads;
084    this.executor = Executors.newFixedThreadPool(numThreads, new DefaultThreadFactory(name));
085    StageManager.instance().register(this);
086  }
087
088  /**
089   * Constructs a thread-pool stage.
090   *
091   * @param name       the stage name
092   * @param handler    the event handler to execute
093   * @param numThreads the number of threads to use
094   * @throws WakeRuntimeException
095   */
096  @Inject
097  public ThreadPoolStage(@Parameter(StageName.class) final String name,
098                         @Parameter(StageHandler.class) final EventHandler<T> handler,
099                         @Parameter(NumberOfThreads.class) final int numThreads) {
100    this(name, handler, numThreads, null);
101  }
102
103  /**
104   * Constructs a thread-pool stage.
105   *
106   * @param handler  the event handler to execute
107   * @param executor the external executor service provided
108   */
109  @Inject
110  public ThreadPoolStage(@Parameter(StageHandler.class) final EventHandler<T> handler,
111                         @Parameter(StageExecutorService.class) final ExecutorService executor) {
112    this(handler.getClass().getName(), handler, executor);
113  }
114
115
116  /**
117   * Constructs a thread-pool stage.
118   *
119   * @param handler      the event handler to execute
120   * @param executor     the external executor service provided
121   * @param errorHandler the error handler
122   */
123  @Inject
124  public ThreadPoolStage(@Parameter(StageHandler.class) final EventHandler<T> handler,
125                         @Parameter(StageExecutorService.class) final ExecutorService executor,
126                         @Parameter(ErrorHandler.class) final EventHandler<Throwable> errorHandler) {
127    this(handler.getClass().getName(), handler, executor, errorHandler);
128  }
129
130  /**
131   * Constructs a thread-pool stage.
132   *
133   * @param name     the stage name
134   * @param handler  the event handler to execute
135   * @param executor the external executor service provided
136   *                 for consistent tracking, it is recommended to create executor with {@link DefaultThreadFactory}
137   */
138  @Inject
139  public ThreadPoolStage(@Parameter(StageName.class) final String name,
140                         @Parameter(StageHandler.class) final EventHandler<T> handler,
141                         @Parameter(StageExecutorService.class) final ExecutorService executor) {
142    this(name, handler, executor, null);
143  }
144
145  /**
146   * Constructs a thread-pool stage.
147   *
148   * @param name         the stage name
149   * @param handler      the event handler to execute
150   * @param executor     the external executor service provided
151   *                     for consistent tracking, it is recommended to create executor with {@link DefaultThreadFactory}
152   * @param errorHandler the error handler
153   */
154  @Inject
155  public ThreadPoolStage(@Parameter(StageName.class) final String name,
156                         @Parameter(StageHandler.class) final EventHandler<T> handler,
157                         @Parameter(StageExecutorService.class) final ExecutorService executor,
158                         @Parameter(ErrorHandler.class) final EventHandler<Throwable> errorHandler) {
159    super(name);
160    this.handler = handler;
161    this.errorHandler = errorHandler;
162    this.numThreads = 0;
163    this.executor = executor;
164    StageManager.instance().register(this);
165  }
166
167  /**
168   * Handles the event using a thread in the thread pool.
169   *
170   * @param value the event
171   */
172  @Override
173  @SuppressWarnings("checkstyle:illegalcatch")
174  public void onNext(final T value) {
175    beforeOnNext();
176    try {
177      executor.submit(new Runnable() {
178
179        @Override
180        public void run() {
181          try {
182            handler.onNext(value);
183          } catch (final Throwable t) {
184            if (errorHandler != null) {
185              errorHandler.onNext(t);
186            } else {
187              LOG.log(Level.SEVERE, name + " Exception from event handler", t);
188              throw t;
189            }
190          } finally {
191            afterOnNext();
192          }
193        }
194
195      });
196    } catch (final Exception e) {
197      LOG.log(Level.SEVERE, "Encountered error when submitting to executor in ThreadPoolStage.");
198      afterOnNext();
199      throw e;
200    }
201
202  }
203
204  /**
205   * Closes resources.
206   */
207  @Override
208  public void close() {
209
210    if (closed.compareAndSet(false, true) && numThreads > 0) {
211
212      LOG.log(Level.FINEST, "Closing ThreadPoolStage {0}: begin", this.name);
213
214      executor.shutdown();
215
216      boolean isTerminated = false;
217      try {
218        isTerminated = executor.awaitTermination(SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
219      } catch (final InterruptedException ex) {
220        LOG.log(Level.WARNING, "Interrupted closing ThreadPoolStage " + this.name, ex);
221      }
222
223      if (!isTerminated) {
224        final List<Runnable> droppedRunnables = executor.shutdownNow();
225        LOG.log(Level.SEVERE,
226            "Closing ThreadPoolStage {0}: Executor did not terminate in {1} ms. Dropping {2} tasks",
227            new Object[] {this.name, SHUTDOWN_TIMEOUT, droppedRunnables.size()});
228      }
229
230      if (!executor.isTerminated()) {
231        LOG.log(Level.SEVERE, "Closing ThreadPoolStage {0}: Executor failed to terminate.", this.name);
232      }
233
234      LOG.log(Level.FINEST, "Closing ThreadPoolStage {0}: end", this.name);
235    }
236  }
237
238  /**
239   * Gets the queue length of this stage.
240   *
241   * @return the queue length
242   */
243  public int getQueueLength() {
244    return ((ThreadPoolExecutor) executor).getQueue().size();
245  }
246
247  /**
248   * Gets the active count of this stage.
249   * @return the active count
250   */
251  public int getActiveCount() {
252    return (int)(getInMeter().getCount() - getOutMeter().getCount());
253  }
254}