001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.wake.impl; 020 021import org.apache.reef.tang.annotations.Parameter; 022import org.apache.reef.wake.EventHandler; 023import org.apache.reef.wake.Stage; 024import org.apache.reef.wake.StageConfiguration.StageHandler; 025import org.apache.reef.wake.StageConfiguration.StageName; 026import org.apache.reef.wake.StageConfiguration.TimerInitialDelay; 027import org.apache.reef.wake.StageConfiguration.TimerPeriod; 028import org.apache.reef.wake.WakeParameters; 029 030import javax.inject.Inject; 031import java.util.List; 032import java.util.concurrent.Executors; 033import java.util.concurrent.ScheduledExecutorService; 034import java.util.concurrent.TimeUnit; 035import java.util.concurrent.atomic.AtomicBoolean; 036import java.util.logging.Level; 037import java.util.logging.Logger; 038 039/** 040 * Stage that triggers an event handler periodically. 041 */ 042public final class TimerStage implements Stage { 043 private static final Logger LOG = Logger.getLogger(TimerStage.class.getName()); 044 045 private final AtomicBoolean closed = new AtomicBoolean(false); 046 private final ScheduledExecutorService executor; 047 private final PeriodicEvent event = new PeriodicEvent(); 048 private final long shutdownTimeout = WakeParameters.EXECUTOR_SHUTDOWN_TIMEOUT; 049 050 /** 051 * Constructs a timer stage with no initial delay. 052 * 053 * @param handler an event handler 054 * @param period a period in milli-seconds 055 */ 056 @Inject 057 public TimerStage(@Parameter(StageHandler.class) final EventHandler<PeriodicEvent> handler, 058 @Parameter(TimerPeriod.class) final long period) { 059 this(handler, 0, period); 060 } 061 062 /** 063 * Constructs a timer stage with no initial delay. 064 * 065 * @param name the stage name 066 * @param handler an event handler 067 * @param period a period in milli-seconds 068 */ 069 @Inject 070 public TimerStage(@Parameter(StageName.class) final String name, 071 @Parameter(StageHandler.class) final EventHandler<PeriodicEvent> handler, 072 @Parameter(TimerPeriod.class) final long period) { 073 this(name, handler, 0, period); 074 } 075 076 /** 077 * Constructs a timer stage. 078 * 079 * @param handler an event handler 080 * @param initialDelay an initial delay 081 * @param period a period in milli-seconds 082 */ 083 @Inject 084 public TimerStage(@Parameter(StageHandler.class) final EventHandler<PeriodicEvent> handler, 085 @Parameter(TimerInitialDelay.class) final long initialDelay, 086 @Parameter(TimerPeriod.class) final long period) { 087 this(handler.getClass().getName(), handler, initialDelay, period); 088 } 089 090 /** 091 * Constructs a timer stage. 092 * 093 * @param name the stage name 094 * @param handler an event handler 095 * @param initialDelay an initial delay 096 * @param period a period in milli-seconds 097 */ 098 @Inject 099 public TimerStage(@Parameter(StageName.class) final String name, 100 @Parameter(StageHandler.class) final EventHandler<PeriodicEvent> handler, 101 @Parameter(TimerInitialDelay.class) final long initialDelay, 102 @Parameter(TimerPeriod.class) final long period) { 103 this.executor = Executors.newScheduledThreadPool(1, new DefaultThreadFactory(name)); 104 executor.scheduleAtFixedRate(new Runnable() { 105 106 @Override 107 public void run() { 108 if (LOG.isLoggable(Level.FINEST)) { 109 LOG.log(Level.FINEST, "{0} {1}", new Object[]{name, event}); 110 } 111 handler.onNext(event); 112 } 113 114 }, initialDelay, period, TimeUnit.MILLISECONDS); 115 StageManager.instance().register(this); 116 } 117 118 119 /** 120 * Closes resources. 121 * 122 * @throws Exception 123 */ 124 @Override 125 public void close() throws Exception { 126 if (closed.compareAndSet(false, true)) { 127 executor.shutdown(); 128 if (!executor.awaitTermination(shutdownTimeout, TimeUnit.MILLISECONDS)) { 129 LOG.log(Level.WARNING, "Executor did not terminate in " + shutdownTimeout + "ms."); 130 final List<Runnable> droppedRunnables = executor.shutdownNow(); 131 LOG.log(Level.WARNING, "Executor dropped " + droppedRunnables.size() + " tasks."); 132 } 133 } 134 } 135 136} 137