This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.wake.impl;
020
021import org.apache.reef.tang.annotations.Parameter;
022import org.apache.reef.wake.EventHandler;
023import org.apache.reef.wake.Stage;
024import org.apache.reef.wake.StageConfiguration.StageHandler;
025import org.apache.reef.wake.StageConfiguration.StageName;
026import org.apache.reef.wake.StageConfiguration.TimerInitialDelay;
027import org.apache.reef.wake.StageConfiguration.TimerPeriod;
028import org.apache.reef.wake.WakeParameters;
029
030import javax.inject.Inject;
031import java.util.List;
032import java.util.concurrent.Executors;
033import java.util.concurrent.ScheduledExecutorService;
034import java.util.concurrent.TimeUnit;
035import java.util.concurrent.atomic.AtomicBoolean;
036import java.util.logging.Level;
037import java.util.logging.Logger;
038
039/**
040 * Stage that triggers an event handler periodically.
041 */
042public final class TimerStage implements Stage {
043  private static final Logger LOG = Logger.getLogger(TimerStage.class.getName());
044
045  private final AtomicBoolean closed = new AtomicBoolean(false);
046  private final ScheduledExecutorService executor;
047  private final PeriodicEvent event = new PeriodicEvent();
048  private final long shutdownTimeout = WakeParameters.EXECUTOR_SHUTDOWN_TIMEOUT;
049
050  /**
051   * Constructs a timer stage with no initial delay.
052   *
053   * @param handler an event handler
054   * @param period  a period in milli-seconds
055   */
056  @Inject
057  public TimerStage(@Parameter(StageHandler.class) final EventHandler<PeriodicEvent> handler,
058                    @Parameter(TimerPeriod.class) final long period) {
059    this(handler, 0, period);
060  }
061
062  /**
063   * Constructs a timer stage with no initial delay.
064   *
065   * @param name the stage name
066   * @param handler an event handler
067   * @param period  a period in milli-seconds
068   */
069  @Inject
070  public TimerStage(@Parameter(StageName.class) final String name,
071                    @Parameter(StageHandler.class) final EventHandler<PeriodicEvent> handler,
072                    @Parameter(TimerPeriod.class) final long period) {
073    this(name, handler, 0, period);
074  }
075
076  /**
077   * Constructs a timer stage.
078   *
079   * @param handler      an event handler
080   * @param initialDelay an initial delay
081   * @param period       a period in milli-seconds
082   */
083  @Inject
084  public TimerStage(@Parameter(StageHandler.class) final EventHandler<PeriodicEvent> handler,
085                    @Parameter(TimerInitialDelay.class) final long initialDelay,
086                    @Parameter(TimerPeriod.class) final long period) {
087    this(handler.getClass().getName(), handler, initialDelay, period);
088  }
089
090  /**
091   * Constructs a timer stage.
092   *
093   * @param name         the stage name
094   * @param handler      an event handler
095   * @param initialDelay an initial delay
096   * @param period       a period in milli-seconds
097   */
098  @Inject
099  public TimerStage(@Parameter(StageName.class) final String name,
100                    @Parameter(StageHandler.class) final EventHandler<PeriodicEvent> handler,
101                    @Parameter(TimerInitialDelay.class) final long initialDelay,
102                    @Parameter(TimerPeriod.class) final long period) {
103    this.executor = Executors.newScheduledThreadPool(1, new DefaultThreadFactory(name));
104    executor.scheduleAtFixedRate(new Runnable() {
105
106      @Override
107      public void run() {
108        if (LOG.isLoggable(Level.FINEST)) {
109          LOG.log(Level.FINEST, "{0} {1}", new Object[]{name, event});
110        }
111        handler.onNext(event);
112      }
113
114    }, initialDelay, period, TimeUnit.MILLISECONDS);
115    StageManager.instance().register(this);
116  }
117
118
119  /**
120   * Closes resources.
121   *
122   * @throws Exception
123   */
124  @Override
125  public void close() throws Exception {
126    if (closed.compareAndSet(false, true)) {
127      executor.shutdown();
128      if (!executor.awaitTermination(shutdownTimeout, TimeUnit.MILLISECONDS)) {
129        LOG.log(Level.WARNING, "Executor did not terminate in " + shutdownTimeout + "ms.");
130        final List<Runnable> droppedRunnables = executor.shutdownNow();
131        LOG.log(Level.WARNING, "Executor dropped " + droppedRunnables.size() + " tasks.");
132      }
133    }
134  }
135
136}
137