001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.wake.time.runtime; 020 021import org.apache.reef.tang.InjectionFuture; 022import org.apache.reef.tang.annotations.Parameter; 023import org.apache.reef.wake.EventHandler; 024import org.apache.reef.wake.impl.PubSubEventHandler; 025import org.apache.reef.wake.time.Clock; 026import org.apache.reef.wake.time.Time; 027import org.apache.reef.wake.time.event.Alarm; 028import org.apache.reef.wake.time.event.StartTime; 029import org.apache.reef.wake.time.event.StopTime; 030import org.apache.reef.wake.time.runtime.event.*; 031 032import javax.inject.Inject; 033import java.util.Set; 034import java.util.TreeSet; 035import java.util.logging.Level; 036import java.util.logging.Logger; 037 038public final class RuntimeClock implements Clock { 039 040 private final static Logger LOG = Logger.getLogger(Clock.class.toString()); 041 042 private final Timer timer; 043 044 private final TreeSet<Time> schedule; 045 046 private final PubSubEventHandler<Time> handlers; 047 048 private final InjectionFuture<Set<EventHandler<StartTime>>> startHandler; 049 private final InjectionFuture<Set<EventHandler<StopTime>>> stopHandler; 050 private final InjectionFuture<Set<EventHandler<RuntimeStart>>> runtimeStartHandler; 051 private final InjectionFuture<Set<EventHandler<RuntimeStop>>> runtimeStopHandler; 052 private final InjectionFuture<Set<EventHandler<IdleClock>>> idleHandler; 053 054 private boolean closed = false; 055 056 @Inject 057 RuntimeClock(final Timer timer, 058 @Parameter(Clock.StartHandler.class) final InjectionFuture<Set<EventHandler<StartTime>>> startHandler, 059 @Parameter(StopHandler.class) final InjectionFuture<Set<EventHandler<StopTime>>> stopHandler, 060 @Parameter(Clock.RuntimeStartHandler.class) final InjectionFuture<Set<EventHandler<RuntimeStart>>> runtimeStartHandler, 061 @Parameter(Clock.RuntimeStopHandler.class) final InjectionFuture<Set<EventHandler<RuntimeStop>>> runtimeStopHandler, 062 @Parameter(IdleHandler.class) final InjectionFuture<Set<EventHandler<IdleClock>>> idleHandler) { 063 this.timer = timer; 064 this.schedule = new TreeSet<>(); 065 this.handlers = new PubSubEventHandler<>(); 066 067 this.startHandler = startHandler; 068 this.stopHandler = stopHandler; 069 this.runtimeStartHandler = runtimeStartHandler; 070 this.runtimeStopHandler = runtimeStopHandler; 071 this.idleHandler = idleHandler; 072 073 LOG.log(Level.FINE, "RuntimeClock instantiated."); 074 } 075 076 @Override 077 public final void scheduleAlarm(final int offset, final EventHandler<Alarm> handler) { 078 synchronized (this.schedule) { 079 if (this.closed) { 080 throw new IllegalStateException("Scheduling alarm on a closed clock"); 081 } 082 083 this.schedule.add(new ClientAlarm(this.timer.getCurrent() + offset, handler)); 084 this.schedule.notifyAll(); 085 } 086 } 087 088 public final void registerEventHandler(final Class<? extends Time> clazz, final EventHandler<Time> handler) { 089 this.handlers.subscribe(clazz, handler); 090 } 091 092 public final void scheduleRuntimeAlarm(final int offset, final EventHandler<Alarm> handler) { 093 synchronized (this.schedule) { 094 this.schedule.add(new RuntimeAlarm(this.timer.getCurrent() + offset, handler)); 095 this.schedule.notifyAll(); 096 } 097 } 098 099 @Override 100 public final void stop() { 101 LOG.entering(RuntimeClock.class.getCanonicalName(), "stop"); 102 synchronized (this.schedule) { 103 this.schedule.clear(); 104 this.schedule.add(new StopTime(timer.getCurrent())); 105 this.schedule.notifyAll(); 106 this.closed = true; 107 } 108 LOG.exiting(RuntimeClock.class.getCanonicalName(), "stop"); 109 } 110 111 @Override 112 public final void close() { 113 LOG.entering(RuntimeClock.class.getCanonicalName(), "close"); 114 synchronized (this.schedule) { 115 this.schedule.clear(); 116 this.schedule.add(new StopTime(findAcceptableStopTime())); 117 this.schedule.notifyAll(); 118 this.closed = true; 119 LOG.log(Level.INFO, "Clock.close()"); 120 } 121 LOG.exiting(RuntimeClock.class.getCanonicalName(), "close"); 122 } 123 124 125 /** 126 * Finds an acceptable stop time, which is the 127 * a time beyond that of any client alarm. 128 * 129 * @return an acceptable stop time 130 */ 131 private final long findAcceptableStopTime() { 132 long time = timer.getCurrent(); 133 for (Time t : this.schedule) { 134 if (t instanceof ClientAlarm) { 135 assert (time <= t.getTimeStamp()); 136 time = t.getTimeStamp(); 137 } 138 } 139 return time + 1; 140 } 141 142 143 @Override 144 public final boolean isIdle() { 145 synchronized (this.schedule) { 146 for (Time t : this.schedule) { 147 if (t instanceof ClientAlarm) return false; 148 } 149 return true; 150 } 151 } 152 153 private final <T extends Time> void subscribe(final Class<T> eventClass, final Set<EventHandler<T>> handlers) { 154 for (final EventHandler<T> handler : handlers) { 155 this.handlers.subscribe(eventClass, handler); 156 } 157 } 158 159 /** 160 * Logs the currently running threads. 161 * 162 * @param level the level used for the log entry 163 * @param prefix put before the comma-separated list of threads 164 */ 165 private void logThreads(final Level level, final String prefix) { 166 final StringBuilder sb = new StringBuilder(prefix); 167 for (final Thread t : Thread.getAllStackTraces().keySet()) { 168 sb.append(t.getName()); 169 sb.append(", "); 170 } 171 LOG.log(level, sb.toString()); 172 } 173 174 @Override 175 public final void run() { 176 LOG.entering(RuntimeClock.class.getCanonicalName(), "run"); 177 178 try { 179 LOG.log(Level.FINE, "Subscribe event handlers"); 180 subscribe(StartTime.class, this.startHandler.get()); 181 subscribe(StopTime.class, this.stopHandler.get()); 182 subscribe(RuntimeStart.class, this.runtimeStartHandler.get()); 183 subscribe(RuntimeStop.class, this.runtimeStopHandler.get()); 184 subscribe(IdleClock.class, this.idleHandler.get()); 185 186 LOG.log(Level.FINE, "Initiate runtime start"); 187 this.handlers.onNext(new RuntimeStart(this.timer.getCurrent())); 188 189 LOG.log(Level.FINE, "Initiate start time"); 190 final StartTime start = new StartTime(this.timer.getCurrent()); 191 this.handlers.onNext(start); 192 193 while (true) { 194 LOG.log(Level.FINEST, "Entering clock main loop iteration."); 195 try { 196 Time time = null; 197 synchronized (this.schedule) { 198 if (this.isIdle()) { 199 this.handlers.onNext(new IdleClock(timer.getCurrent())); 200 } 201 202 while (this.schedule.isEmpty()) { 203 this.schedule.wait(); 204 } 205 206 assert (this.schedule.first() != null); 207 208 // Wait until the first scheduled time is ready 209 for (long duration = this.timer.getDuration(this.schedule.first().getTimeStamp()); 210 duration > 0; 211 duration = this.timer.getDuration(this.schedule.first().getTimeStamp())) { 212 // note: while I'm waiting, another alarm could be scheduled with a shorter duration 213 // so the next time I go around the loop I need to revise my duration 214 this.schedule.wait(duration); 215 } 216 // Remove the event from the schedule and process it: 217 time = this.schedule.pollFirst(); 218 assert (time != null); 219 } 220 221 if (time instanceof Alarm) { 222 final Alarm alarm = (Alarm) time; 223 alarm.handle(); 224 } else { 225 this.handlers.onNext(time); 226 if (time instanceof StopTime) break; // we're done. 227 } 228 } catch (InterruptedException e) { 229 } 230 } 231 this.handlers.onNext(new RuntimeStop(this.timer.getCurrent())); 232 } catch (Exception e) { 233 e.printStackTrace(); 234 this.handlers.onNext(new RuntimeStop(this.timer.getCurrent(), e)); 235 } finally { 236 logThreads(Level.FINE, "Threads running after exiting the clock main loop: "); 237 LOG.log(Level.FINE, "Runtime clock exit"); 238 } 239 LOG.exiting(RuntimeClock.class.getCanonicalName(), "run"); 240 241 } 242 243 244}