This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.wake.time.runtime;
020
021import org.apache.reef.tang.InjectionFuture;
022import org.apache.reef.tang.annotations.Parameter;
023import org.apache.reef.wake.EventHandler;
024import org.apache.reef.wake.impl.PubSubEventHandler;
025import org.apache.reef.wake.time.Clock;
026import org.apache.reef.wake.time.Time;
027import org.apache.reef.wake.time.event.Alarm;
028import org.apache.reef.wake.time.event.StartTime;
029import org.apache.reef.wake.time.event.StopTime;
030import org.apache.reef.wake.time.runtime.event.*;
031
032import javax.inject.Inject;
033import java.util.Set;
034import java.util.TreeSet;
035import java.util.logging.Level;
036import java.util.logging.Logger;
037
038public final class RuntimeClock implements Clock {
039
040  private final static Logger LOG = Logger.getLogger(Clock.class.toString());
041
042  private final Timer timer;
043
044  private final TreeSet<Time> schedule;
045
046  private final PubSubEventHandler<Time> handlers;
047
048  private final InjectionFuture<Set<EventHandler<StartTime>>> startHandler;
049  private final InjectionFuture<Set<EventHandler<StopTime>>> stopHandler;
050  private final InjectionFuture<Set<EventHandler<RuntimeStart>>> runtimeStartHandler;
051  private final InjectionFuture<Set<EventHandler<RuntimeStop>>> runtimeStopHandler;
052  private final InjectionFuture<Set<EventHandler<IdleClock>>> idleHandler;
053
054  private boolean closed = false;
055
056  @Inject
057  RuntimeClock(final Timer timer,
058               @Parameter(Clock.StartHandler.class) final InjectionFuture<Set<EventHandler<StartTime>>> startHandler,
059               @Parameter(StopHandler.class) final InjectionFuture<Set<EventHandler<StopTime>>> stopHandler,
060               @Parameter(Clock.RuntimeStartHandler.class) final InjectionFuture<Set<EventHandler<RuntimeStart>>> runtimeStartHandler,
061               @Parameter(Clock.RuntimeStopHandler.class) final InjectionFuture<Set<EventHandler<RuntimeStop>>> runtimeStopHandler,
062               @Parameter(IdleHandler.class) final InjectionFuture<Set<EventHandler<IdleClock>>> idleHandler) {
063    this.timer = timer;
064    this.schedule = new TreeSet<>();
065    this.handlers = new PubSubEventHandler<>();
066
067    this.startHandler = startHandler;
068    this.stopHandler = stopHandler;
069    this.runtimeStartHandler = runtimeStartHandler;
070    this.runtimeStopHandler = runtimeStopHandler;
071    this.idleHandler = idleHandler;
072
073    LOG.log(Level.FINE, "RuntimeClock instantiated.");
074  }
075
076  @Override
077  public final void scheduleAlarm(final int offset, final EventHandler<Alarm> handler) {
078    synchronized (this.schedule) {
079      if (this.closed) {
080        throw new IllegalStateException("Scheduling alarm on a closed clock");
081      }
082
083      this.schedule.add(new ClientAlarm(this.timer.getCurrent() + offset, handler));
084      this.schedule.notifyAll();
085    }
086  }
087
088  public final void registerEventHandler(final Class<? extends Time> clazz, final EventHandler<Time> handler) {
089    this.handlers.subscribe(clazz, handler);
090  }
091
092  public final void scheduleRuntimeAlarm(final int offset, final EventHandler<Alarm> handler) {
093    synchronized (this.schedule) {
094      this.schedule.add(new RuntimeAlarm(this.timer.getCurrent() + offset, handler));
095      this.schedule.notifyAll();
096    }
097  }
098
099  @Override
100  public final void stop() {
101    LOG.entering(RuntimeClock.class.getCanonicalName(), "stop");
102    synchronized (this.schedule) {
103      this.schedule.clear();
104      this.schedule.add(new StopTime(timer.getCurrent()));
105      this.schedule.notifyAll();
106      this.closed = true;
107    }
108    LOG.exiting(RuntimeClock.class.getCanonicalName(), "stop");
109  }
110
111  @Override
112  public final void close() {
113    LOG.entering(RuntimeClock.class.getCanonicalName(), "close");
114    synchronized (this.schedule) {
115      this.schedule.clear();
116      this.schedule.add(new StopTime(findAcceptableStopTime()));
117      this.schedule.notifyAll();
118      this.closed = true;
119      LOG.log(Level.INFO, "Clock.close()");
120    }
121    LOG.exiting(RuntimeClock.class.getCanonicalName(), "close");
122  }
123
124
125  /**
126   * Finds an acceptable stop time, which is the
127   * a time beyond that of any client alarm.
128   *
129   * @return an acceptable stop time
130   */
131  private final long findAcceptableStopTime() {
132    long time = timer.getCurrent();
133    for (Time t : this.schedule) {
134      if (t instanceof ClientAlarm) {
135        assert (time <= t.getTimeStamp());
136        time = t.getTimeStamp();
137      }
138    }
139    return time + 1;
140  }
141
142
143  @Override
144  public final boolean isIdle() {
145    synchronized (this.schedule) {
146      for (Time t : this.schedule) {
147        if (t instanceof ClientAlarm) return false;
148      }
149      return true;
150    }
151  }
152
153  private final <T extends Time> void subscribe(final Class<T> eventClass, final Set<EventHandler<T>> handlers) {
154    for (final EventHandler<T> handler : handlers) {
155      this.handlers.subscribe(eventClass, handler);
156    }
157  }
158
159  /**
160   * Logs the currently running threads.
161   *
162   * @param level  the level used for the log entry
163   * @param prefix put before the comma-separated list of threads
164   */
165  private void logThreads(final Level level, final String prefix) {
166    final StringBuilder sb = new StringBuilder(prefix);
167    for (final Thread t : Thread.getAllStackTraces().keySet()) {
168      sb.append(t.getName());
169      sb.append(", ");
170    }
171    LOG.log(level, sb.toString());
172  }
173
174  @Override
175  public final void run() {
176    LOG.entering(RuntimeClock.class.getCanonicalName(), "run");
177
178    try {
179      LOG.log(Level.FINE, "Subscribe event handlers");
180      subscribe(StartTime.class, this.startHandler.get());
181      subscribe(StopTime.class, this.stopHandler.get());
182      subscribe(RuntimeStart.class, this.runtimeStartHandler.get());
183      subscribe(RuntimeStop.class, this.runtimeStopHandler.get());
184      subscribe(IdleClock.class, this.idleHandler.get());
185
186      LOG.log(Level.FINE, "Initiate runtime start");
187      this.handlers.onNext(new RuntimeStart(this.timer.getCurrent()));
188
189      LOG.log(Level.FINE, "Initiate start time");
190      final StartTime start = new StartTime(this.timer.getCurrent());
191      this.handlers.onNext(start);
192
193      while (true) {
194        LOG.log(Level.FINEST, "Entering clock main loop iteration.");
195        try {
196          Time time = null;
197          synchronized (this.schedule) {
198            if (this.isIdle()) {
199              this.handlers.onNext(new IdleClock(timer.getCurrent()));
200            }
201
202            while (this.schedule.isEmpty()) {
203              this.schedule.wait();
204            }
205
206            assert (this.schedule.first() != null);
207
208            // Wait until the first scheduled time is ready
209            for (long duration = this.timer.getDuration(this.schedule.first().getTimeStamp());
210                 duration > 0;
211                 duration = this.timer.getDuration(this.schedule.first().getTimeStamp())) {
212              // note: while I'm waiting, another alarm could be scheduled with a shorter duration
213              // so the next time I go around the loop I need to revise my duration
214              this.schedule.wait(duration);
215            }
216            // Remove the event from the schedule and process it:
217            time = this.schedule.pollFirst();
218            assert (time != null);
219          }
220
221          if (time instanceof Alarm) {
222            final Alarm alarm = (Alarm) time;
223            alarm.handle();
224          } else {
225            this.handlers.onNext(time);
226            if (time instanceof StopTime) break; // we're done.
227          }
228        } catch (InterruptedException e) {
229        }
230      }
231      this.handlers.onNext(new RuntimeStop(this.timer.getCurrent()));
232    } catch (Exception e) {
233      e.printStackTrace();
234      this.handlers.onNext(new RuntimeStop(this.timer.getCurrent(), e));
235    } finally {
236      logThreads(Level.FINE, "Threads running after exiting the clock main loop: ");
237      LOG.log(Level.FINE, "Runtime clock exit");
238    }
239    LOG.exiting(RuntimeClock.class.getCanonicalName(), "run");
240
241  }
242
243
244}