001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.wake.impl; 020 021import org.apache.reef.tang.annotations.Name; 022import org.apache.reef.tang.annotations.NamedParameter; 023import org.apache.reef.tang.annotations.Parameter; 024import org.apache.reef.wake.Stage; 025import org.apache.reef.wake.WakeParameters; 026 027import javax.inject.Inject; 028import java.util.List; 029import java.util.concurrent.ForkJoinPool; 030import java.util.concurrent.ForkJoinTask; 031import java.util.concurrent.TimeUnit; 032import java.util.concurrent.atomic.AtomicBoolean; 033import java.util.logging.Level; 034import java.util.logging.Logger; 035 036 037/** 038 * This implementation uses the fork join framework to reduce the cost of spawning 039 * events in stages. For two participating stages back to back, the pool allows 040 * for the thread in the first stage to execute the event it submits to the second stage. 041 * These choices are made by the ForkJoinPool. 042 * 043 * So, this does sort of go against the reason for stages, but doesn't eliminate them 044 * and raises the level of abstraction that Wake sees above threads. 045 * 046 * this will only be deadlock free if blocking synchronization done by events is safe. 047 * That is no event submitted to the pool can have a producer/consumer dependency 048 * on another event submitted to the pool 049 */ 050public class WakeSharedPool implements Stage { 051 private static final Logger LOG = Logger.getLogger(WakeSharedPool.class.getName()); 052 053 // not a constant, so specify here 054 private static final int DEFAULT_PARALLELISM = Math.max(1, Runtime.getRuntime().availableProcessors() - 2); 055 private final ForkJoinPool pool; 056 private final long shutdownTimeout = WakeParameters.EXECUTOR_SHUTDOWN_TIMEOUT; 057 private AtomicBoolean closed = new AtomicBoolean(false); 058 059 @Inject 060 public WakeSharedPool(@Parameter(Parallelism.class) final int parallelism) { 061 this.pool = new ForkJoinPool(parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, 062 new Thread.UncaughtExceptionHandler() { 063 @Override 064 public void uncaughtException(final Thread t, final Throwable e) { 065 // TODO[JIRA REEF-911]: need to pass this upwards to REEF can grab it 066 } 067 }, 068 // async mode turned on so a task that invokes other tasks does not have to join on them. 069 // this is appropriate for event-based tasks, where once you submit an event to a stage it 070 // is always fire-and-forget. 071 true); 072 073 // register it with the StageManager, since the pool is meant to back stages 074 StageManager.instance().register(this); 075 } 076 077 @Inject 078 public WakeSharedPool() { 079 this(DEFAULT_PARALLELISM); 080 } 081 082 public void submit(final ForkJoinTask<?> t) { 083 if (ForkJoinTask.inForkJoinPool()) { 084 ForkJoinTask.invokeAll(t); 085 // alternatively just pool().pool.execute(t), which simply forces it to be this pool 086 // (right now we expect only one anyway) 087 } else { 088 pool.submit(t); 089 } 090 } 091 092 @Override 093 public void close() throws Exception { 094 LOG.info("ending pool stage: " + pool.toString()); 095 if (closed.compareAndSet(false, true)) { 096 pool.shutdown(); 097 if (!pool.awaitTermination(shutdownTimeout, TimeUnit.MILLISECONDS)) { 098 LOG.log(Level.WARNING, "Executor did not terminate in " + shutdownTimeout + "ms."); 099 final List<Runnable> droppedRunnables = pool.shutdownNow(); 100 LOG.log(Level.WARNING, "Executor dropped " + droppedRunnables.size() + " tasks."); 101 } 102 } 103 } 104 105 @NamedParameter 106 private static class Parallelism implements Name<Integer> { 107 } 108}