This project has retired. For details please refer to its Attic page.
Source code
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
019package org.apache.reef.wake.time.runtime;
020
021import org.apache.reef.tang.InjectionFuture;
022import org.apache.reef.tang.annotations.Parameter;
023import org.apache.reef.wake.EventHandler;
024import org.apache.reef.wake.impl.PubSubEventHandler;
025import org.apache.reef.wake.time.Clock;
026import org.apache.reef.wake.time.Time;
027import org.apache.reef.wake.time.event.Alarm;
028import org.apache.reef.wake.time.event.StartTime;
029import org.apache.reef.wake.time.event.StopTime;
030import org.apache.reef.wake.time.runtime.event.*;
031
032import javax.inject.Inject;
033import java.util.Set;
034import java.util.TreeSet;
035import java.util.logging.Level;
036import java.util.logging.Logger;
037
038/**
039 * Default implementation of clock.
040 *
041 * After invoking `RuntimeStart` and `StartTime` events initially,
042 * this invokes scheduled events on time. If there is no scheduled event,
043 * `IdleClock` event is invoked.
044 */
045public final class RuntimeClock implements Clock {
046
047  private static final Logger LOG = Logger.getLogger(RuntimeClock.class.getName());
048  private static final String CLASS_NAME = RuntimeClock.class.getCanonicalName();
049
050  /**
051   * Injectable source of current time information.
052   * Usually an instance of RealTimer that wraps the system clock.
053   */
054  private final Timer timer;
055
056  /**
057   * An ordered set of timed objects, in ascending order of their timestamps.
058   * It also serves as the main synchronization monitor for the class.
059   */
060  private final TreeSet<Time> schedule = new TreeSet<>();
061
062  /** Event handlers - populated with the injectable parameters provided to the RuntimeClock constructor. */
063  private final PubSubEventHandler<Time> handlers = new PubSubEventHandler<>();
064
065  private final InjectionFuture<Set<EventHandler<StartTime>>> startHandler;
066  private final InjectionFuture<Set<EventHandler<StopTime>>> stopHandler;
067  private final InjectionFuture<Set<EventHandler<RuntimeStart>>> runtimeStartHandler;
068  private final InjectionFuture<Set<EventHandler<RuntimeStop>>> runtimeStopHandler;
069  private final InjectionFuture<Set<EventHandler<IdleClock>>> idleHandler;
070
071  /**
072   * Timestamp of the last client alarm in the schedule.
073   * We use it to schedule a graceful shutdown event immediately after all client alarms.
074   */
075  private long lastClientAlarm = 0;
076
077  /**
078   * Number of client alarms in the schedule.
079   * We need it to determine whether event loop is idle (i.e. has no client alarms scheduled)
080   */
081  private int numClientAlarms = 0;
082
083  /** Set to true when the clock is closed. */
084  private boolean isClosed = false;
085
086  /** Exception that caused the clock to stop. */
087  private Throwable exceptionCausedStop = null;
088
089  @Inject
090  private RuntimeClock(
091      final Timer timer,
092      @Parameter(Clock.StartHandler.class)
093          final InjectionFuture<Set<EventHandler<StartTime>>> startHandler,
094      @Parameter(Clock.StopHandler.class)
095          final InjectionFuture<Set<EventHandler<StopTime>>> stopHandler,
096      @Parameter(Clock.RuntimeStartHandler.class)
097          final InjectionFuture<Set<EventHandler<RuntimeStart>>> runtimeStartHandler,
098      @Parameter(Clock.RuntimeStopHandler.class)
099          final InjectionFuture<Set<EventHandler<RuntimeStop>>> runtimeStopHandler,
100      @Parameter(Clock.IdleHandler.class)
101          final InjectionFuture<Set<EventHandler<IdleClock>>> idleHandler) {
102
103    this.timer = timer;
104    this.startHandler = startHandler;
105    this.stopHandler = stopHandler;
106    this.runtimeStartHandler = runtimeStartHandler;
107    this.runtimeStopHandler = runtimeStopHandler;
108    this.idleHandler = idleHandler;
109
110    LOG.log(Level.FINE, "RuntimeClock instantiated.");
111  }
112
113  /**
114   * Schedule a new Alarm event in `offset` milliseconds into the future,
115   * and supply an event handler to be called at that time.
116   * @param offset Number of milliseconds into the future relative to current time.
117   * @param handler Event handler to be invoked.
118   * @return Newly scheduled alarm.
119   * @throws IllegalStateException if the clock is already closed.
120   */
121  @Override
122  public Time scheduleAlarm(final int offset, final EventHandler<Alarm> handler) {
123
124    final Time alarm = new ClientAlarm(this.timer.getCurrent() + offset, handler);
125
126    if (LOG.isLoggable(Level.FINEST)) {
127
128      final int eventQueueLen;
129      synchronized (this.schedule) {
130        eventQueueLen = this.numClientAlarms;
131      }
132
133      LOG.log(Level.FINEST,
134          "Schedule alarm: {0} Outstanding client alarms: {1}",
135          new Object[] {alarm, eventQueueLen});
136    }
137
138    synchronized (this.schedule) {
139
140      if (this.isClosed) {
141        throw new IllegalStateException("Scheduling alarm on a closed clock");
142      }
143
144      if (alarm.getTimestamp() > this.lastClientAlarm) {
145        this.lastClientAlarm = alarm.getTimestamp();
146      }
147
148      assert this.numClientAlarms >= 0;
149      ++this.numClientAlarms;
150
151      this.schedule.add(alarm);
152      this.schedule.notify();
153    }
154
155    return alarm;
156  }
157
158  /**
159   * Stop the clock. Remove all other events from the schedule and fire StopTimer
160   * event immediately. It is recommended to use close() method for graceful shutdown
161   * instead of stop().
162   */
163  @Override
164  public void stop() {
165    this.stop(null);
166  }
167
168  /**
169   * Stop the clock on exception.
170   * Remove all other events from the schedule and fire StopTimer event immediately.
171   * @param exception Exception that is the cause for the stop. Can be null.
172   */
173  @Override
174  public void stop(final Throwable exception) {
175
176    LOG.entering(CLASS_NAME, "stop");
177
178    synchronized (this.schedule) {
179
180      if (this.isClosed) {
181        LOG.log(Level.FINEST, "Clock has already been closed");
182        return;
183      }
184
185      this.isClosed = true;
186      this.exceptionCausedStop = exception;
187
188      final Time stopEvent = new StopTime(this.timer.getCurrent());
189      LOG.log(Level.FINE,
190          "Stop scheduled immediately: {0} Outstanding client alarms: {1}",
191          new Object[] {stopEvent, this.numClientAlarms});
192
193      assert this.numClientAlarms >= 0;
194      this.numClientAlarms = 0;
195
196      this.schedule.clear();
197      this.schedule.add(stopEvent);
198      this.schedule.notify();
199    }
200
201    LOG.exiting(CLASS_NAME, "stop");
202  }
203
204  /**
205   * Wait for all client alarms to finish executing and gracefully shutdown the clock.
206   */
207  @Override
208  public void close() {
209
210    LOG.entering(CLASS_NAME, "close");
211
212    synchronized (this.schedule) {
213
214      if (this.isClosed) {
215        LOG.exiting(CLASS_NAME, "close", "Clock has already been closed");
216        return;
217      }
218
219      this.isClosed = true;
220
221      final Time stopEvent = new StopTime(Math.max(this.timer.getCurrent(), this.lastClientAlarm + 1));
222      LOG.log(Level.FINE,
223          "Graceful shutdown scheduled: {0} Outstanding client alarms: {1}",
224          new Object[] {stopEvent, this.numClientAlarms});
225
226      this.schedule.add(stopEvent);
227      this.schedule.notify();
228    }
229
230    LOG.exiting(CLASS_NAME, "close");
231  }
232
233  /**
234   * Check if there are no client alarms scheduled.
235   * @return True if there are no client alarms in the schedule, false otherwise.
236   */
237  @Override
238  public boolean isIdle() {
239    synchronized (this.schedule) {
240      assert this.numClientAlarms >= 0;
241      return this.numClientAlarms == 0;
242    }
243  }
244
245  /**
246   * The clock is closed after a call to stop() or close().
247   * A closed clock cannot add new alarms to the schedule, but, in case of the
248   * graceful shutdown, can still invoke previously scheduled ones.
249   * @return true if closed, false otherwise.
250   */
251  @Override
252  public boolean isClosed() {
253    synchronized (this.schedule) {
254      return this.isClosed;
255    }
256  }
257
258  /**
259   * Register event handlers for the given event class.
260   * @param eventClass Event type to handle. Must be derived from Time.
261   * @param handlers One or many event handlers that can process given event type.
262   * @param <T> Event type - must be derived from class Time. (i.e. contain a timestamp).
263   */
264  @SuppressWarnings("checkstyle:hiddenfield")
265  private <T extends Time> void subscribe(final Class<T> eventClass, final Set<EventHandler<T>> handlers) {
266    for (final EventHandler<T> handler : handlers) {
267      LOG.log(Level.FINEST, "Subscribe: event {0} handler {1}", new Object[] {eventClass.getName(), handler});
268      this.handlers.subscribe(eventClass, handler);
269    }
270  }
271
272  /**
273   * Main event loop.
274   * Set up the event handlers, and go into event loop that polls the schedule and process events in it.
275   */
276  @Override
277  public void run() {
278
279    LOG.entering(CLASS_NAME, "run");
280
281    try {
282
283      LOG.log(Level.FINE, "Subscribe event handlers");
284
285      subscribe(StartTime.class, this.startHandler.get());
286      subscribe(StopTime.class, this.stopHandler.get());
287      subscribe(RuntimeStart.class, this.runtimeStartHandler.get());
288      subscribe(RuntimeStop.class, this.runtimeStopHandler.get());
289      subscribe(IdleClock.class, this.idleHandler.get());
290
291      LOG.log(Level.FINE, "Initiate runtime start");
292      this.handlers.onNext(new RuntimeStart(this.timer.getCurrent()));
293
294      LOG.log(Level.FINE, "Initiate start time");
295      this.handlers.onNext(new StartTime(this.timer.getCurrent()));
296
297      while (true) {
298
299        LOG.log(Level.FINEST, "Enter clock main loop.");
300
301        try {
302
303          if (this.isIdle()) {
304            // Handle an idle clock event, without locking this.schedule
305            this.handlers.onNext(new IdleClock(this.timer.getCurrent()));
306          }
307
308          final Time event;
309          final int eventQueueLen;
310          synchronized (this.schedule) {
311
312            while (this.schedule.isEmpty()) {
313              this.schedule.wait();
314            }
315
316            assert this.schedule.first() != null;
317
318            // Wait until the first scheduled time is ready.
319            // NOTE: while waiting, another alarm could be scheduled with a shorter duration
320            // so the next time I go around the loop I need to revise my duration.
321            while (true) {
322              final long waitDuration = this.timer.getDuration(this.schedule.first());
323              if (waitDuration <= 0) {
324                break;
325              }
326              this.schedule.wait(waitDuration);
327            }
328
329            // Remove the event from the schedule and process it:
330            event = this.schedule.pollFirst();
331
332            if (event instanceof ClientAlarm) {
333              --this.numClientAlarms;
334              assert this.numClientAlarms >= 0;
335            }
336
337            eventQueueLen = this.numClientAlarms;
338          }
339
340          assert event != null;
341
342          LOG.log(Level.FINER,
343              "Process event: {0} Outstanding client alarms: {1}", new Object[] {event, eventQueueLen});
344
345          if (event instanceof Alarm) {
346            ((Alarm) event).run();
347          } else {
348            this.handlers.onNext(event);
349            if (event instanceof StopTime) {
350              break; // we're done.
351            }
352          }
353
354        } catch (final InterruptedException expected) {
355          LOG.log(Level.FINEST, "Wait interrupted; continue event loop.");
356        }
357      }
358
359      this.handlers.onNext(new RuntimeStop(this.timer.getCurrent(), this.exceptionCausedStop));
360
361    } catch (final Exception e) {
362
363      LOG.log(Level.SEVERE, "Error in runtime clock", e);
364      this.handlers.onNext(new RuntimeStop(this.timer.getCurrent(), e));
365
366    } finally {
367      LOG.log(Level.FINE, "Runtime clock exit");
368    }
369
370    LOG.exiting(CLASS_NAME, "run");
371  }
372}