This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.wake.impl;
020
021import org.apache.reef.wake.AbstractEStage;
022import org.apache.reef.wake.EventHandler;
023
024import java.util.List;
025import java.util.concurrent.ExecutorService;
026import java.util.concurrent.Executors;
027import java.util.concurrent.TimeUnit;
028import java.util.logging.Logger;
029
030/**
031 * This stage uses a thread pool to schedule events in parallel.
032 * Should be used when input events are already materialized in a List and
033 * can be fired in any order.
034 */
035public class IndependentIterationsThreadPoolStage<T> extends AbstractEStage<List<T>> {
036
037  private final int granularity;
038  private EventHandler<T> handler;
039  private ExecutorService executor;
040
041  /**
042   * Create a thread pool with fixed threads.
043   *
044   * @param handler     an event handler
045   * @param numThreads  fixed number of threads available in the pool
046   * @param granularity maximum number of events executed serially.
047   *                    The right choice will balance task spawn overhead with parallelism.
048   */
049  public IndependentIterationsThreadPoolStage(
050      final EventHandler<T> handler, final int numThreads, final int granularity) {
051    super(handler.getClass().getName());
052    this.handler = handler;
053    this.executor = Executors.newFixedThreadPool(numThreads);
054    this.granularity = granularity;
055  }
056
057  private Runnable newTask(final List<T> iterations) {
058    return new Runnable() {
059      @Override
060      public void run() {
061        for (final T e : iterations) {
062          handler.onNext(e);
063        }
064      }
065    };
066  }
067
068  @Override
069  public void onNext(final List<T> iterations) {
070    Logger.getAnonymousLogger().info("Execute new task [" + iterations.size());
071    final int size = iterations.size();
072    for (int i = 0; i < size; i += granularity) {
073      int toIndex = i + granularity;
074      toIndex = toIndex > size ? size : toIndex;
075      executor.execute(newTask(iterations.subList(i, toIndex)));
076    }
077  }
078
079  @Override
080  public void close() throws Exception {
081    executor.shutdown();
082    executor.awaitTermination(1000, TimeUnit.DAYS);
083  }
084
085
086}