001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.wake.time.runtime; 020 021import org.apache.reef.tang.InjectionFuture; 022import org.apache.reef.tang.annotations.Parameter; 023import org.apache.reef.wake.EventHandler; 024import org.apache.reef.wake.impl.PubSubEventHandler; 025import org.apache.reef.wake.time.Clock; 026import org.apache.reef.wake.time.Time; 027import org.apache.reef.wake.time.event.Alarm; 028import org.apache.reef.wake.time.event.StartTime; 029import org.apache.reef.wake.time.event.StopTime; 030import org.apache.reef.wake.time.runtime.event.*; 031 032import javax.inject.Inject; 033import java.util.Set; 034import java.util.TreeSet; 035import java.util.logging.Level; 036import java.util.logging.Logger; 037 038/** 039 * Default implementation of clock. 040 * 041 * After invoking `RuntimeStart` and `StartTime` events initially, 042 * this invokes scheduled events on time. If there is no scheduled event, 043 * `IdleClock` event is invoked. 044 */ 045public final class RuntimeClock implements Clock { 046 047 private static final Logger LOG = Logger.getLogger(Clock.class.toString()); 048 049 private final Timer timer; 050 051 private final TreeSet<Time> schedule; 052 053 private final PubSubEventHandler<Time> handlers; 054 055 private final InjectionFuture<Set<EventHandler<StartTime>>> startHandler; 056 private final InjectionFuture<Set<EventHandler<StopTime>>> stopHandler; 057 private final InjectionFuture<Set<EventHandler<RuntimeStart>>> runtimeStartHandler; 058 private final InjectionFuture<Set<EventHandler<RuntimeStop>>> runtimeStopHandler; 059 private final InjectionFuture<Set<EventHandler<IdleClock>>> idleHandler; 060 061 private Throwable stoppedOnException; 062 private boolean closed = false; 063 064 @Inject 065 RuntimeClock(final Timer timer, 066 @Parameter(Clock.StartHandler.class) final InjectionFuture<Set<EventHandler<StartTime>>> startHandler, 067 @Parameter(StopHandler.class) final InjectionFuture<Set<EventHandler<StopTime>>> stopHandler, 068 @Parameter(Clock.RuntimeStartHandler.class) 069 final InjectionFuture<Set<EventHandler<RuntimeStart>>> runtimeStartHandler, 070 @Parameter(Clock.RuntimeStopHandler.class) 071 final InjectionFuture<Set<EventHandler<RuntimeStop>>> runtimeStopHandler, 072 @Parameter(IdleHandler.class) final InjectionFuture<Set<EventHandler<IdleClock>>> idleHandler) { 073 this.timer = timer; 074 this.schedule = new TreeSet<>(); 075 this.handlers = new PubSubEventHandler<>(); 076 077 this.startHandler = startHandler; 078 this.stopHandler = stopHandler; 079 this.runtimeStartHandler = runtimeStartHandler; 080 this.runtimeStopHandler = runtimeStopHandler; 081 this.idleHandler = idleHandler; 082 083 this.stoppedOnException = null; 084 085 LOG.log(Level.FINE, "RuntimeClock instantiated."); 086 } 087 088 @Override 089 public void scheduleAlarm(final int offset, final EventHandler<Alarm> handler) { 090 synchronized (this.schedule) { 091 if (this.closed) { 092 throw new IllegalStateException("Scheduling alarm on a closed clock"); 093 } 094 095 this.schedule.add(new ClientAlarm(this.timer.getCurrent() + offset, handler)); 096 this.schedule.notifyAll(); 097 } 098 } 099 100 public void registerEventHandler(final Class<? extends Time> clazz, final EventHandler<Time> handler) { 101 this.handlers.subscribe(clazz, handler); 102 } 103 104 public void scheduleRuntimeAlarm(final int offset, final EventHandler<Alarm> handler) { 105 synchronized (this.schedule) { 106 this.schedule.add(new RuntimeAlarm(this.timer.getCurrent() + offset, handler)); 107 this.schedule.notifyAll(); 108 } 109 } 110 111 @Override 112 public void stop() { 113 this.stop(null); 114 } 115 116 @Override 117 public void stop(final Throwable stopOnException) { 118 LOG.entering(RuntimeClock.class.getCanonicalName(), "stop"); 119 synchronized (this.schedule) { 120 this.schedule.clear(); 121 this.schedule.add(new StopTime(timer.getCurrent())); 122 this.schedule.notifyAll(); 123 this.closed = true; 124 if (this.stoppedOnException == null) { 125 this.stoppedOnException = stopOnException; 126 } 127 } 128 LOG.exiting(RuntimeClock.class.getCanonicalName(), "stop"); 129 } 130 131 @Override 132 public void close() { 133 LOG.entering(RuntimeClock.class.getCanonicalName(), "close"); 134 synchronized (this.schedule) { 135 if (this.closed) { 136 LOG.log(Level.INFO, "Clock is already closed"); 137 return; 138 } 139 this.schedule.clear(); 140 this.schedule.add(new StopTime(findAcceptableStopTime())); 141 this.schedule.notifyAll(); 142 this.closed = true; 143 LOG.log(Level.INFO, "Clock.close()"); 144 } 145 LOG.exiting(RuntimeClock.class.getCanonicalName(), "close"); 146 } 147 148 /** 149 * Finds an acceptable stop time, which is the 150 * a time beyond that of any client alarm. 151 * 152 * @return an acceptable stop time 153 */ 154 private long findAcceptableStopTime() { 155 long time = timer.getCurrent(); 156 for (final Time t : this.schedule) { 157 if (t instanceof ClientAlarm) { 158 assert time <= t.getTimeStamp(); 159 time = t.getTimeStamp(); 160 } 161 } 162 return time + 1; 163 } 164 165 166 @Override 167 public boolean isIdle() { 168 synchronized (this.schedule) { 169 for (final Time t : this.schedule) { 170 if (t instanceof ClientAlarm) { 171 return false; 172 } 173 } 174 return true; 175 } 176 } 177 178 @SuppressWarnings("checkstyle:hiddenfield") 179 private <T extends Time> void subscribe(final Class<T> eventClass, final Set<EventHandler<T>> handlers) { 180 for (final EventHandler<T> handler : handlers) { 181 this.handlers.subscribe(eventClass, handler); 182 } 183 } 184 185 /** 186 * Logs the currently running threads. 187 * 188 * @param level the level used for the log entry 189 * @param prefix put before the comma-separated list of threads 190 */ 191 private void logThreads(final Level level, final String prefix) { 192 final StringBuilder sb = new StringBuilder(prefix); 193 for (final Thread t : Thread.getAllStackTraces().keySet()) { 194 sb.append(t.getName()); 195 sb.append(", "); 196 } 197 LOG.log(level, sb.toString()); 198 } 199 200 @Override 201 public void run() { 202 LOG.entering(RuntimeClock.class.getCanonicalName(), "run"); 203 204 try { 205 LOG.log(Level.FINE, "Subscribe event handlers"); 206 subscribe(StartTime.class, this.startHandler.get()); 207 subscribe(StopTime.class, this.stopHandler.get()); 208 subscribe(RuntimeStart.class, this.runtimeStartHandler.get()); 209 subscribe(RuntimeStop.class, this.runtimeStopHandler.get()); 210 subscribe(IdleClock.class, this.idleHandler.get()); 211 212 LOG.log(Level.FINE, "Initiate runtime start"); 213 this.handlers.onNext(new RuntimeStart(this.timer.getCurrent())); 214 215 LOG.log(Level.FINE, "Initiate start time"); 216 final StartTime start = new StartTime(this.timer.getCurrent()); 217 this.handlers.onNext(start); 218 219 while (true) { 220 LOG.log(Level.FINEST, "Entering clock main loop iteration."); 221 try { 222 if (this.isIdle()) { 223 // Handle an idle clock event, without locking this.schedule 224 this.handlers.onNext(new IdleClock(timer.getCurrent())); 225 } 226 227 Time time = null; 228 synchronized (this.schedule) { 229 while (this.schedule.isEmpty()) { 230 this.schedule.wait(); 231 } 232 233 assert this.schedule.first() != null; 234 235 // Wait until the first scheduled time is ready 236 for (long duration = this.timer.getDuration(this.schedule.first().getTimeStamp()); 237 duration > 0; 238 duration = this.timer.getDuration(this.schedule.first().getTimeStamp())) { 239 // note: while I'm waiting, another alarm could be scheduled with a shorter duration 240 // so the next time I go around the loop I need to revise my duration 241 this.schedule.wait(duration); 242 } 243 // Remove the event from the schedule and process it: 244 time = this.schedule.pollFirst(); 245 assert time != null; 246 } 247 248 if (time instanceof Alarm) { 249 final Alarm alarm = (Alarm) time; 250 alarm.handle(); 251 } else { 252 this.handlers.onNext(time); 253 if (time instanceof StopTime) { 254 break; // we're done. 255 } 256 } 257 } catch (final InterruptedException expected) { 258 // waiting interrupted - return to loop 259 } 260 } 261 if (this.stoppedOnException == null) { 262 this.handlers.onNext(new RuntimeStop(this.timer.getCurrent())); 263 } else { 264 this.handlers.onNext(new RuntimeStop(this.timer.getCurrent(), this.stoppedOnException)); 265 } 266 } catch (final Exception e) { 267 e.printStackTrace(); 268 this.handlers.onNext(new RuntimeStop(this.timer.getCurrent(), e)); 269 } finally { 270 logThreads(Level.FINE, "Threads running after exiting the clock main loop: "); 271 LOG.log(Level.FINE, "Runtime clock exit"); 272 } 273 LOG.exiting(RuntimeClock.class.getCanonicalName(), "run"); 274 275 } 276 277 278}