/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.reef.wake.impl;

import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.wake.AbstractEStage;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.StageConfiguration;

import javax.inject.Inject;
import java.util.concurrent.ForkJoinTask;
import java.util.logging.Logger;

/**
 * This Wake event handling stage uses a {@link java.util.concurrent.ForkJoinPool}
 * to submit tasks. The advantage is that underlying workers
 * have separate queues instead of sharing one. The queues are load
 * balanced with work stealing.
 * <p>
 * The pool is provided to the constructor, so multiple stages
 * may use the same pool.
 * <p>
 * Some advantage in throughput over other stage implementations should be seen
 * when one wake stage is submitting to another using the same
 * {@link WakeSharedPool}. In this case, the new event may be executed
 * directly by that thread.
043 * 044 * @param <T> type of events 045 */ 046public class ForkPoolStage<T> extends AbstractEStage<T> { 047 private static final Logger LOG = Logger.getLogger(ForkPoolStage.class.getName()); 048 049 private final EventHandler<T> handler; 050 private final WakeSharedPool pool; 051 052 @Inject 053 public ForkPoolStage(@Parameter(StageConfiguration.StageName.class) final String stageName, 054 @Parameter(StageConfiguration.StageHandler.class) final EventHandler<T> handler, 055 final WakeSharedPool sharedPool 056 ) { 057 super(stageName); 058 this.pool = sharedPool; 059 this.handler = handler; 060 //TODO[JIRA REEF-911]: should WakeSharedPool register its stages? 061 062 StageManager.instance().register(this); 063 } 064 065 @Inject 066 public ForkPoolStage(@Parameter(StageConfiguration.StageHandler.class) final EventHandler<T> handler, 067 final WakeSharedPool sharedPool) { 068 this(ForkPoolStage.class.getName(), handler, sharedPool); 069 } 070 071 @Override 072 public void onNext(final T value) { 073 beforeOnNext(); 074 pool.submit(new ForkJoinTask<T>() { 075 @Override 076 public T getRawResult() { 077 // tasks have no results because they are events 078 // this may be used for extensions 079 return null; 080 } 081 082 @Override 083 protected void setRawResult(final T value) { 084 // tasks have no results because they are events 085 // this may be used for extensions 086 } 087 088 @Override 089 protected boolean exec() { 090 handler.onNext(value); 091 afterOnNext(); 092 return true; 093 } 094 }); 095 } 096 097 098 @Override 099 public void close() throws Exception { 100 LOG.warning("close(): " + pool.getClass().getName() + " " + pool + " must really be close()'d"); 101 } 102 103}