001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.wake.rx.impl; 020 021import org.apache.reef.tang.annotations.Parameter; 022import org.apache.reef.wake.StageConfiguration.NumberOfThreads; 023import org.apache.reef.wake.StageConfiguration.StageName; 024import org.apache.reef.wake.StageConfiguration.StageObserver; 025import org.apache.reef.wake.WakeParameters; 026import org.apache.reef.wake.exception.WakeRuntimeException; 027import org.apache.reef.wake.impl.DefaultThreadFactory; 028import org.apache.reef.wake.impl.StageManager; 029import org.apache.reef.wake.rx.AbstractRxStage; 030import org.apache.reef.wake.rx.Observer; 031 032import javax.inject.Inject; 033import java.util.List; 034import java.util.concurrent.*; 035import java.util.logging.Level; 036import java.util.logging.Logger; 037 038/** 039 * Stage that executes the observer with a thread pool. 040 * <p> 041 * {@code onNext}'s will be arbitrarily subject to reordering, as with most stages. 042 * <p> 043 * All {@code onNext}'s for which returning from the method call 044 * happens-before the call to {@code onComplete} will maintain 045 * this relationship when passed to the observer. 046 * <p> 047 * Any {@code onNext} whose return is not ordered before 048 * {@code onComplete} may or may not get dropped. 049 * 050 * @param <T> type of event 051 */ 052public final class RxThreadPoolStage<T> extends AbstractRxStage<T> { 053 private static final Logger LOG = Logger.getLogger(RxThreadPoolStage.class.getName()); 054 055 private final Observer<T> observer; 056 private final ExecutorService executor; 057 private final long shutdownTimeout = WakeParameters.EXECUTOR_SHUTDOWN_TIMEOUT; 058 private ExecutorService completionExecutor; 059 private DefaultThreadFactory tf; 060 061 /** 062 * Constructs a Rx thread pool stage. 063 * 064 * @param observer the observer to execute 065 * @param numThreads the number of threads 066 */ 067 @Inject 068 public RxThreadPoolStage(@Parameter(StageObserver.class) final Observer<T> observer, 069 @Parameter(NumberOfThreads.class) final int numThreads) { 070 this(observer.getClass().getName(), observer, numThreads); 071 } 072 073 /** 074 * Constructs a Rx thread pool stage. 075 * 076 * @param name the stage name 077 * @param observer the observer to execute 078 * @param numThreads the number of threads 079 */ 080 @Inject 081 public RxThreadPoolStage(@Parameter(StageName.class) final String name, 082 @Parameter(StageObserver.class) final Observer<T> observer, 083 @Parameter(NumberOfThreads.class) final int numThreads) { 084 super(name); 085 this.observer = observer; 086 if (numThreads <= 0) { 087 throw new WakeRuntimeException(name + " numThreads " + numThreads + " is less than or equal to 0"); 088 } 089 tf = new DefaultThreadFactory(name); 090 this.executor = Executors.newFixedThreadPool(numThreads, tf); 091 this.completionExecutor = Executors.newSingleThreadExecutor(tf); 092 StageManager.instance().register(this); 093 } 094 095 /** 096 * Provides the observer with the new value. 097 * 098 * @param value the new value 099 */ 100 @Override 101 public void onNext(final T value) { 102 beforeOnNext(); 103 executor.submit(new Runnable() { 104 105 @Override 106 public void run() { 107 observer.onNext(value); 108 afterOnNext(); 109 } 110 }); 111 } 112 113 /** 114 * Notifies the observer that the provider has experienced an error 115 * condition. 116 * 117 * @param error the error 118 */ 119 @Override 120 public void onError(final Exception error) { 121 submitCompletion(new Runnable() { 122 123 @Override 124 public void run() { 125 observer.onError(error); 126 } 127 128 }); 129 } 130 131 /** 132 * Notifies the observer that the provider has finished sending push-based 133 * notifications. 134 */ 135 @Override 136 public void onCompleted() { 137 submitCompletion(new Runnable() { 138 139 @Override 140 public void run() { 141 observer.onCompleted(); 142 } 143 144 }); 145 } 146 147 private void submitCompletion(final Runnable r) { 148 executor.shutdown(); 149 completionExecutor.submit(new Runnable() { 150 151 @Override 152 public void run() { 153 try { 154 // no timeout for completion, only close() 155 if (!executor.awaitTermination(3153600000L, TimeUnit.SECONDS)) { 156 TimeoutException e = new TimeoutException("Executor terminated due to unrequired timeout"); 157 LOG.log(Level.SEVERE, e.getMessage()); 158 observer.onError(e); 159 } 160 } catch (final InterruptedException e) { 161 e.printStackTrace(); 162 observer.onError(e); 163 } 164 r.run(); 165 } 166 }); 167 } 168 169 /** 170 * Closes the stage. 171 */ 172 @Override 173 public void close() throws Exception { 174 if (closed.compareAndSet(false, true)) { 175 executor.shutdown(); 176 completionExecutor.shutdown(); 177 if (!executor.awaitTermination(shutdownTimeout, TimeUnit.MILLISECONDS)) { 178 LOG.log(Level.WARNING, "Executor did not terminate in " + shutdownTimeout + "ms."); 179 final List<Runnable> droppedRunnables = executor.shutdownNow(); 180 LOG.log(Level.WARNING, "Executor dropped " + droppedRunnables.size() + " tasks."); 181 } 182 if (!completionExecutor.awaitTermination(shutdownTimeout, TimeUnit.MILLISECONDS)) { 183 LOG.log(Level.WARNING, "Executor did not terminate in " + shutdownTimeout + "ms."); 184 final List<Runnable> droppedRunnables = completionExecutor.shutdownNow(); 185 LOG.log(Level.WARNING, "Completion executor dropped " + droppedRunnables.size() + " tasks."); 186 } 187 } 188 } 189 190 /** 191 * Gets the queue length of this stage. 192 * 193 * @return the queue length 194 */ 195 public int getQueueLength() { 196 return ((ThreadPoolExecutor) executor).getQueue().size(); 197 } 198}