001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.wake.time.runtime; 020 021import org.apache.reef.tang.InjectionFuture; 022import org.apache.reef.tang.annotations.Parameter; 023import org.apache.reef.wake.EventHandler; 024import org.apache.reef.wake.impl.PubSubEventHandler; 025import org.apache.reef.wake.time.Clock; 026import org.apache.reef.wake.time.Time; 027import org.apache.reef.wake.time.event.Alarm; 028import org.apache.reef.wake.time.event.StartTime; 029import org.apache.reef.wake.time.event.StopTime; 030import org.apache.reef.wake.time.runtime.event.*; 031 032import javax.inject.Inject; 033import java.util.Set; 034import java.util.TreeSet; 035import java.util.logging.Level; 036import java.util.logging.Logger; 037 038/** 039 * Default implementation of clock. 040 * 041 * After invoking `RuntimeStart` and `StartTime` events initially, 042 * this invokes scheduled events on time. If there is no scheduled event, 043 * `IdleClock` event is invoked. 044 */ 045public final class RuntimeClock implements Clock { 046 047 private static final Logger LOG = Logger.getLogger(RuntimeClock.class.getName()); 048 private static final String CLASS_NAME = RuntimeClock.class.getCanonicalName(); 049 050 /** 051 * Injectable source of current time information. 052 * Usually an instance of RealTimer that wraps the system clock. 053 */ 054 private final Timer timer; 055 056 /** 057 * An ordered set of timed objects, in ascending order of their timestamps. 058 * It also serves as the main synchronization monitor for the class. 059 */ 060 private final TreeSet<Time> schedule = new TreeSet<>(); 061 062 /** Event handlers - populated with the injectable parameters provided to the RuntimeClock constructor. */ 063 private final PubSubEventHandler<Time> handlers = new PubSubEventHandler<>(); 064 065 private final InjectionFuture<Set<EventHandler<StartTime>>> startHandler; 066 private final InjectionFuture<Set<EventHandler<StopTime>>> stopHandler; 067 private final InjectionFuture<Set<EventHandler<RuntimeStart>>> runtimeStartHandler; 068 private final InjectionFuture<Set<EventHandler<RuntimeStop>>> runtimeStopHandler; 069 private final InjectionFuture<Set<EventHandler<IdleClock>>> idleHandler; 070 071 /** 072 * Timestamp of the last client alarm in the schedule. 073 * We use it to schedule a graceful shutdown event immediately after all client alarms. 074 */ 075 private long lastClientAlarm = 0; 076 077 /** 078 * Number of client alarms in the schedule. 079 * We need it to determine whether event loop is idle (i.e. has no client alarms scheduled) 080 */ 081 private int numClientAlarms = 0; 082 083 /** Set to true when the clock is closed. */ 084 private boolean isClosed = false; 085 086 /** Exception that caused the clock to stop. */ 087 private Throwable exceptionCausedStop = null; 088 089 @Inject 090 private RuntimeClock( 091 final Timer timer, 092 @Parameter(Clock.StartHandler.class) 093 final InjectionFuture<Set<EventHandler<StartTime>>> startHandler, 094 @Parameter(Clock.StopHandler.class) 095 final InjectionFuture<Set<EventHandler<StopTime>>> stopHandler, 096 @Parameter(Clock.RuntimeStartHandler.class) 097 final InjectionFuture<Set<EventHandler<RuntimeStart>>> runtimeStartHandler, 098 @Parameter(Clock.RuntimeStopHandler.class) 099 final InjectionFuture<Set<EventHandler<RuntimeStop>>> runtimeStopHandler, 100 @Parameter(Clock.IdleHandler.class) 101 final InjectionFuture<Set<EventHandler<IdleClock>>> idleHandler) { 102 103 this.timer = timer; 104 this.startHandler = startHandler; 105 this.stopHandler = stopHandler; 106 this.runtimeStartHandler = runtimeStartHandler; 107 this.runtimeStopHandler = runtimeStopHandler; 108 this.idleHandler = idleHandler; 109 110 LOG.log(Level.FINE, "RuntimeClock instantiated."); 111 } 112 113 /** 114 * Schedule a new Alarm event in `offset` milliseconds into the future, 115 * and supply an event handler to be called at that time. 116 * @param offset Number of milliseconds into the future relative to current time. 117 * @param handler Event handler to be invoked. 118 * @return Newly scheduled alarm. 119 * @throws IllegalStateException if the clock is already closed. 120 */ 121 @Override 122 public Time scheduleAlarm(final int offset, final EventHandler<Alarm> handler) { 123 124 final Time alarm = new ClientAlarm(this.timer.getCurrent() + offset, handler); 125 126 if (LOG.isLoggable(Level.FINEST)) { 127 128 final int eventQueueLen; 129 synchronized (this.schedule) { 130 eventQueueLen = this.numClientAlarms; 131 } 132 133 LOG.log(Level.FINEST, 134 "Schedule alarm: {0} Outstanding client alarms: {1}", 135 new Object[] {alarm, eventQueueLen}); 136 } 137 138 synchronized (this.schedule) { 139 140 if (this.isClosed) { 141 throw new IllegalStateException("Scheduling alarm on a closed clock"); 142 } 143 144 if (alarm.getTimestamp() > this.lastClientAlarm) { 145 this.lastClientAlarm = alarm.getTimestamp(); 146 } 147 148 assert this.numClientAlarms >= 0; 149 ++this.numClientAlarms; 150 151 this.schedule.add(alarm); 152 this.schedule.notify(); 153 } 154 155 return alarm; 156 } 157 158 /** 159 * Stop the clock. Remove all other events from the schedule and fire StopTimer 160 * event immediately. It is recommended to use close() method for graceful shutdown 161 * instead of stop(). 162 */ 163 @Override 164 public void stop() { 165 this.stop(null); 166 } 167 168 /** 169 * Stop the clock on exception. 170 * Remove all other events from the schedule and fire StopTimer event immediately. 171 * @param exception Exception that is the cause for the stop. Can be null. 172 */ 173 @Override 174 public void stop(final Throwable exception) { 175 176 LOG.entering(CLASS_NAME, "stop"); 177 178 synchronized (this.schedule) { 179 180 if (this.isClosed) { 181 LOG.log(Level.FINEST, "Clock has already been closed"); 182 return; 183 } 184 185 this.isClosed = true; 186 this.exceptionCausedStop = exception; 187 188 final Time stopEvent = new StopTime(this.timer.getCurrent()); 189 LOG.log(Level.FINE, 190 "Stop scheduled immediately: {0} Outstanding client alarms: {1}", 191 new Object[] {stopEvent, this.numClientAlarms}); 192 193 assert this.numClientAlarms >= 0; 194 this.numClientAlarms = 0; 195 196 this.schedule.clear(); 197 this.schedule.add(stopEvent); 198 this.schedule.notify(); 199 } 200 201 LOG.exiting(CLASS_NAME, "stop"); 202 } 203 204 /** 205 * Wait for all client alarms to finish executing and gracefully shutdown the clock. 206 */ 207 @Override 208 public void close() { 209 210 LOG.entering(CLASS_NAME, "close"); 211 212 synchronized (this.schedule) { 213 214 if (this.isClosed) { 215 LOG.exiting(CLASS_NAME, "close", "Clock has already been closed"); 216 return; 217 } 218 219 this.isClosed = true; 220 221 final Time stopEvent = new StopTime(Math.max(this.timer.getCurrent(), this.lastClientAlarm + 1)); 222 LOG.log(Level.FINE, 223 "Graceful shutdown scheduled: {0} Outstanding client alarms: {1}", 224 new Object[] {stopEvent, this.numClientAlarms}); 225 226 this.schedule.add(stopEvent); 227 this.schedule.notify(); 228 } 229 230 LOG.exiting(CLASS_NAME, "close"); 231 } 232 233 /** 234 * Check if there are no client alarms scheduled. 235 * @return True if there are no client alarms in the schedule, false otherwise. 236 */ 237 @Override 238 public boolean isIdle() { 239 synchronized (this.schedule) { 240 assert this.numClientAlarms >= 0; 241 return this.numClientAlarms == 0; 242 } 243 } 244 245 /** 246 * The clock is closed after a call to stop() or close(). 247 * A closed clock cannot add new alarms to the schedule, but, in case of the 248 * graceful shutdown, can still invoke previously scheduled ones. 249 * @return true if closed, false otherwise. 250 */ 251 @Override 252 public boolean isClosed() { 253 synchronized (this.schedule) { 254 return this.isClosed; 255 } 256 } 257 258 /** 259 * Register event handlers for the given event class. 260 * @param eventClass Event type to handle. Must be derived from Time. 261 * @param handlers One or many event handlers that can process given event type. 262 * @param <T> Event type - must be derived from class Time. (i.e. contain a timestamp). 263 */ 264 @SuppressWarnings("checkstyle:hiddenfield") 265 private <T extends Time> void subscribe(final Class<T> eventClass, final Set<EventHandler<T>> handlers) { 266 for (final EventHandler<T> handler : handlers) { 267 LOG.log(Level.FINEST, "Subscribe: event {0} handler {1}", new Object[] {eventClass.getName(), handler}); 268 this.handlers.subscribe(eventClass, handler); 269 } 270 } 271 272 /** 273 * Main event loop. 274 * Set up the event handlers, and go into event loop that polls the schedule and process events in it. 275 */ 276 @Override 277 public void run() { 278 279 LOG.entering(CLASS_NAME, "run"); 280 281 try { 282 283 LOG.log(Level.FINE, "Subscribe event handlers"); 284 285 subscribe(StartTime.class, this.startHandler.get()); 286 subscribe(StopTime.class, this.stopHandler.get()); 287 subscribe(RuntimeStart.class, this.runtimeStartHandler.get()); 288 subscribe(RuntimeStop.class, this.runtimeStopHandler.get()); 289 subscribe(IdleClock.class, this.idleHandler.get()); 290 291 LOG.log(Level.FINE, "Initiate runtime start"); 292 this.handlers.onNext(new RuntimeStart(this.timer.getCurrent())); 293 294 LOG.log(Level.FINE, "Initiate start time"); 295 this.handlers.onNext(new StartTime(this.timer.getCurrent())); 296 297 while (true) { 298 299 LOG.log(Level.FINEST, "Enter clock main loop."); 300 301 try { 302 303 if (this.isIdle()) { 304 // Handle an idle clock event, without locking this.schedule 305 this.handlers.onNext(new IdleClock(this.timer.getCurrent())); 306 } 307 308 final Time event; 309 final int eventQueueLen; 310 synchronized (this.schedule) { 311 312 while (this.schedule.isEmpty()) { 313 this.schedule.wait(); 314 } 315 316 assert this.schedule.first() != null; 317 318 // Wait until the first scheduled time is ready. 319 // NOTE: while waiting, another alarm could be scheduled with a shorter duration 320 // so the next time I go around the loop I need to revise my duration. 321 while (true) { 322 final long waitDuration = this.timer.getDuration(this.schedule.first()); 323 if (waitDuration <= 0) { 324 break; 325 } 326 this.schedule.wait(waitDuration); 327 } 328 329 // Remove the event from the schedule and process it: 330 event = this.schedule.pollFirst(); 331 332 if (event instanceof ClientAlarm) { 333 --this.numClientAlarms; 334 assert this.numClientAlarms >= 0; 335 } 336 337 eventQueueLen = this.numClientAlarms; 338 } 339 340 assert event != null; 341 342 LOG.log(Level.FINER, 343 "Process event: {0} Outstanding client alarms: {1}", new Object[] {event, eventQueueLen}); 344 345 if (event instanceof Alarm) { 346 ((Alarm) event).run(); 347 } else { 348 this.handlers.onNext(event); 349 if (event instanceof StopTime) { 350 break; // we're done. 351 } 352 } 353 354 } catch (final InterruptedException expected) { 355 LOG.log(Level.FINEST, "Wait interrupted; continue event loop."); 356 } 357 } 358 359 this.handlers.onNext(new RuntimeStop(this.timer.getCurrent(), this.exceptionCausedStop)); 360 361 } catch (final Exception e) { 362 363 LOG.log(Level.SEVERE, "Error in runtime clock", e); 364 this.handlers.onNext(new RuntimeStop(this.timer.getCurrent(), e)); 365 366 } finally { 367 LOG.log(Level.FINE, "Runtime clock exit"); 368 } 369 370 LOG.exiting(CLASS_NAME, "run"); 371 } 372}