This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.wake.time.runtime;
020
021import org.apache.reef.tang.InjectionFuture;
022import org.apache.reef.tang.annotations.Parameter;
023import org.apache.reef.wake.EventHandler;
024import org.apache.reef.wake.impl.PubSubEventHandler;
025import org.apache.reef.wake.time.Clock;
026import org.apache.reef.wake.time.Time;
027import org.apache.reef.wake.time.event.Alarm;
028import org.apache.reef.wake.time.event.StartTime;
029import org.apache.reef.wake.time.event.StopTime;
030import org.apache.reef.wake.time.runtime.event.*;
031
032import javax.inject.Inject;
033import java.util.Set;
034import java.util.TreeSet;
035import java.util.logging.Level;
036import java.util.logging.Logger;
037
038/**
039 * Default implementation of clock.
040 *
041 * After invoking `RuntimeStart` and `StartTime` events initially,
042 * this invokes scheduled events on time. If there is no scheduled event,
043 * `IdleClock` event is invoked.
044 */
045public final class RuntimeClock implements Clock {
046
047  private static final Logger LOG = Logger.getLogger(Clock.class.toString());
048
049  private final Timer timer;
050
051  private final TreeSet<Time> schedule;
052
053  private final PubSubEventHandler<Time> handlers;
054
055  private final InjectionFuture<Set<EventHandler<StartTime>>> startHandler;
056  private final InjectionFuture<Set<EventHandler<StopTime>>> stopHandler;
057  private final InjectionFuture<Set<EventHandler<RuntimeStart>>> runtimeStartHandler;
058  private final InjectionFuture<Set<EventHandler<RuntimeStop>>> runtimeStopHandler;
059  private final InjectionFuture<Set<EventHandler<IdleClock>>> idleHandler;
060
061  private Throwable stoppedOnException;
062  private boolean closed = false;
063
064  @Inject
065  RuntimeClock(final Timer timer,
066               @Parameter(Clock.StartHandler.class) final InjectionFuture<Set<EventHandler<StartTime>>> startHandler,
067               @Parameter(StopHandler.class) final InjectionFuture<Set<EventHandler<StopTime>>> stopHandler,
068               @Parameter(Clock.RuntimeStartHandler.class)
069               final InjectionFuture<Set<EventHandler<RuntimeStart>>> runtimeStartHandler,
070               @Parameter(Clock.RuntimeStopHandler.class)
071               final InjectionFuture<Set<EventHandler<RuntimeStop>>> runtimeStopHandler,
072               @Parameter(IdleHandler.class) final InjectionFuture<Set<EventHandler<IdleClock>>> idleHandler) {
073    this.timer = timer;
074    this.schedule = new TreeSet<>();
075    this.handlers = new PubSubEventHandler<>();
076
077    this.startHandler = startHandler;
078    this.stopHandler = stopHandler;
079    this.runtimeStartHandler = runtimeStartHandler;
080    this.runtimeStopHandler = runtimeStopHandler;
081    this.idleHandler = idleHandler;
082
083    this.stoppedOnException = null;
084
085    LOG.log(Level.FINE, "RuntimeClock instantiated.");
086  }
087
088  @Override
089  public void scheduleAlarm(final int offset, final EventHandler<Alarm> handler) {
090    synchronized (this.schedule) {
091      if (this.closed) {
092        throw new IllegalStateException("Scheduling alarm on a closed clock");
093      }
094
095      this.schedule.add(new ClientAlarm(this.timer.getCurrent() + offset, handler));
096      this.schedule.notifyAll();
097    }
098  }
099
100  public void registerEventHandler(final Class<? extends Time> clazz, final EventHandler<Time> handler) {
101    this.handlers.subscribe(clazz, handler);
102  }
103
104  public void scheduleRuntimeAlarm(final int offset, final EventHandler<Alarm> handler) {
105    synchronized (this.schedule) {
106      this.schedule.add(new RuntimeAlarm(this.timer.getCurrent() + offset, handler));
107      this.schedule.notifyAll();
108    }
109  }
110
111  @Override
112  public void stop() {
113    this.stop(null);
114  }
115
116  @Override
117  public void stop(final Throwable stopOnException) {
118    LOG.entering(RuntimeClock.class.getCanonicalName(), "stop");
119    synchronized (this.schedule) {
120      this.schedule.clear();
121      this.schedule.add(new StopTime(timer.getCurrent()));
122      this.schedule.notifyAll();
123      this.closed = true;
124      if (this.stoppedOnException == null) {
125        this.stoppedOnException = stopOnException;
126      }
127    }
128    LOG.exiting(RuntimeClock.class.getCanonicalName(), "stop");
129  }
130
131  @Override
132  public void close() {
133    LOG.entering(RuntimeClock.class.getCanonicalName(), "close");
134    synchronized (this.schedule) {
135      if (this.closed) {
136        LOG.log(Level.INFO, "Clock is already closed");
137        return;
138      }
139      this.schedule.clear();
140      this.schedule.add(new StopTime(findAcceptableStopTime()));
141      this.schedule.notifyAll();
142      this.closed = true;
143      LOG.log(Level.INFO, "Clock.close()");
144    }
145    LOG.exiting(RuntimeClock.class.getCanonicalName(), "close");
146  }
147
148  /**
149   * Finds an acceptable stop time, which is the
150   * a time beyond that of any client alarm.
151   *
152   * @return an acceptable stop time
153   */
154  private long findAcceptableStopTime() {
155    long time = timer.getCurrent();
156    for (final Time t : this.schedule) {
157      if (t instanceof ClientAlarm) {
158        assert time <= t.getTimeStamp();
159        time = t.getTimeStamp();
160      }
161    }
162    return time + 1;
163  }
164
165
166  @Override
167  public boolean isIdle() {
168    synchronized (this.schedule) {
169      for (final Time t : this.schedule) {
170        if (t instanceof ClientAlarm) {
171          return false;
172        }
173      }
174      return true;
175    }
176  }
177
178  @SuppressWarnings("checkstyle:hiddenfield")
179  private <T extends Time> void subscribe(final Class<T> eventClass, final Set<EventHandler<T>> handlers) {
180    for (final EventHandler<T> handler : handlers) {
181      this.handlers.subscribe(eventClass, handler);
182    }
183  }
184
185  /**
186   * Logs the currently running threads.
187   *
188   * @param level  the level used for the log entry
189   * @param prefix put before the comma-separated list of threads
190   */
191  private void logThreads(final Level level, final String prefix) {
192    final StringBuilder sb = new StringBuilder(prefix);
193    for (final Thread t : Thread.getAllStackTraces().keySet()) {
194      sb.append(t.getName());
195      sb.append(", ");
196    }
197    LOG.log(level, sb.toString());
198  }
199
200  @Override
201  public void run() {
202    LOG.entering(RuntimeClock.class.getCanonicalName(), "run");
203
204    try {
205      LOG.log(Level.FINE, "Subscribe event handlers");
206      subscribe(StartTime.class, this.startHandler.get());
207      subscribe(StopTime.class, this.stopHandler.get());
208      subscribe(RuntimeStart.class, this.runtimeStartHandler.get());
209      subscribe(RuntimeStop.class, this.runtimeStopHandler.get());
210      subscribe(IdleClock.class, this.idleHandler.get());
211
212      LOG.log(Level.FINE, "Initiate runtime start");
213      this.handlers.onNext(new RuntimeStart(this.timer.getCurrent()));
214
215      LOG.log(Level.FINE, "Initiate start time");
216      final StartTime start = new StartTime(this.timer.getCurrent());
217      this.handlers.onNext(start);
218
219      while (true) {
220        LOG.log(Level.FINEST, "Entering clock main loop iteration.");
221        try {
222          if (this.isIdle()) {
223            // Handle an idle clock event, without locking this.schedule
224            this.handlers.onNext(new IdleClock(timer.getCurrent()));
225          }
226
227          Time time = null;
228          synchronized (this.schedule) {
229            while (this.schedule.isEmpty()) {
230              this.schedule.wait();
231            }
232
233            assert this.schedule.first() != null;
234
235            // Wait until the first scheduled time is ready
236            for (long duration = this.timer.getDuration(this.schedule.first().getTimeStamp());
237                 duration > 0;
238                 duration = this.timer.getDuration(this.schedule.first().getTimeStamp())) {
239              // note: while I'm waiting, another alarm could be scheduled with a shorter duration
240              // so the next time I go around the loop I need to revise my duration
241              this.schedule.wait(duration);
242            }
243            // Remove the event from the schedule and process it:
244            time = this.schedule.pollFirst();
245            assert time != null;
246          }
247
248          if (time instanceof Alarm) {
249            final Alarm alarm = (Alarm) time;
250            alarm.handle();
251          } else {
252            this.handlers.onNext(time);
253            if (time instanceof StopTime) {
254              break; // we're done.
255            }
256          }
257        } catch (final InterruptedException expected) {
258          // waiting interrupted - return to loop
259        }
260      }
261      if (this.stoppedOnException == null) {
262        this.handlers.onNext(new RuntimeStop(this.timer.getCurrent()));
263      } else {
264        this.handlers.onNext(new RuntimeStop(this.timer.getCurrent(), this.stoppedOnException));
265      }
266    } catch (final Exception e) {
267      e.printStackTrace();
268      this.handlers.onNext(new RuntimeStop(this.timer.getCurrent(), e));
269    } finally {
270      logThreads(Level.FINE, "Threads running after exiting the clock main loop: ");
271      LOG.log(Level.FINE, "Runtime clock exit");
272    }
273    LOG.exiting(RuntimeClock.class.getCanonicalName(), "run");
274
275  }
276
277
278}