This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.wake.impl;
020
021import org.apache.reef.tang.annotations.Parameter;
022import org.apache.reef.wake.AbstractEStage;
023import org.apache.reef.wake.EventHandler;
024import org.apache.reef.wake.StageConfiguration;
025
026import javax.inject.Inject;
027import java.util.concurrent.ForkJoinTask;
028import java.util.logging.Logger;
029
030/**
031 * This Wake event handling stage uses a {@link java.util.concurrent.ForkJoinPool}
032 * to submit tasks. The advantage is that underlying workers
033 * have separate queues instead of sharing one. The queues are load
034 * balanced with work stealing.
035 * <p>
036 * The pool is provided to the constructor, so multiple stages
037 * may use the same pool.
038 * <p>
039 * Some advantage in throughput over other stage implementations should be seen
040 * when one wake stage is submitting to another using the same
041 * {@link WakeSharedPool}. In this case, the new event may be executed
042 * directly by that thread.
043 *
044 * @param <T> type of events
045 */
046public class ForkPoolStage<T> extends AbstractEStage<T> {
047  private static final Logger LOG = Logger.getLogger(ForkPoolStage.class.getName());
048
049  private final EventHandler<T> handler;
050  private final WakeSharedPool pool;
051
052  @Inject
053  public ForkPoolStage(@Parameter(StageConfiguration.StageName.class) final String stageName,
054                       @Parameter(StageConfiguration.StageHandler.class) final EventHandler<T> handler,
055                       final WakeSharedPool sharedPool
056  ) {
057    super(stageName);
058    this.pool = sharedPool;
059    this.handler = handler;
060    //TODO[JIRA REEF-911]: should WakeSharedPool register its stages?
061
062    StageManager.instance().register(this);
063  }
064
065  @Inject
066  public ForkPoolStage(@Parameter(StageConfiguration.StageHandler.class) final EventHandler<T> handler,
067                       final WakeSharedPool sharedPool) {
068    this(ForkPoolStage.class.getName(), handler, sharedPool);
069  }
070
071  @Override
072  public void onNext(final T value) {
073    beforeOnNext();
074    pool.submit(new ForkJoinTask<T>() {
075      @Override
076      public T getRawResult() {
077        // tasks have no results because they are events
078        // this may be used for extensions
079        return null;
080      }
081
082      @Override
083      protected void setRawResult(final T value) {
084        // tasks have no results because they are events
085        // this may be used for extensions
086      }
087
088      @Override
089      protected boolean exec() {
090        handler.onNext(value);
091        afterOnNext();
092        return true;
093      }
094    });
095  }
096
097
098  @Override
099  public void close() throws Exception {
100    LOG.warning("close(): " + pool.getClass().getName() + " " + pool + " must really be close()'d");
101  }
102
103}