001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.wake.profiler; 020 021import net.sf.cglib.proxy.Enhancer; 022import net.sf.cglib.proxy.MethodInterceptor; 023import net.sf.cglib.proxy.MethodProxy; 024import org.apache.reef.tang.Aspect; 025import org.apache.reef.tang.InjectionFuture; 026import org.apache.reef.tang.types.ConstructorDef; 027import org.apache.reef.tang.util.MonotonicHashMap; 028import org.apache.reef.tang.util.ReflectionUtilities; 029import org.apache.reef.wake.EventHandler; 030import org.apache.reef.wake.Stage; 031import org.apache.reef.wake.rx.Observable; 032import org.apache.reef.wake.rx.Observer; 033import org.apache.reef.wake.rx.RxStage; 034 035import java.lang.reflect.Constructor; 036import java.lang.reflect.InvocationTargetException; 037import java.lang.reflect.Method; 038import java.lang.reflect.Modifier; 039import java.util.*; 040import java.util.concurrent.atomic.AtomicLong; 041import java.util.logging.Logger; 042 043public class WakeProfiler implements Aspect { 044 private final static Logger LOG = Logger.getLogger(WakeProfiler.class.toString()); 045 private final Map<Object, Vertex<?>> vertex_object = new MonotonicHashMap<>(); 046 private final Map<InjectionFuture<?>, Object> futures = new MonotonicHashMap<>(); 047 private final Map<Object, Stats> stats = new MonotonicHashMap<>(); 048 049 @Override 050 public Aspect createChildAspect() { 051 return this; 052 } 053 054 @SuppressWarnings("unchecked") 055 private <T> Vertex<T> getVertex(T t) { 056 if (t instanceof Set) { 057 return (Vertex<T>) newSetVertex((Set<?>) t); 058 } else { 059 Vertex<T> v = (Vertex<T>) vertex_object.get(t); 060 // Add dummy vertices for objects that were bound volatile. 061 if (v == null) { 062 v = new Vertex<>(t); 063 vertex_object.put(t, v); 064 } 065 return v; 066 } 067 } 068 069 @SuppressWarnings("unchecked") 070 private <T> Vertex<T> getFuture(InjectionFuture<T> future) { 071 return getVertex((T) futures.get(future)); 072 } 073 074 @SuppressWarnings("unchecked") 075 private <T> Vertex<?> newSetVertex(Set<T> s) { 076 if (vertex_object.containsKey(s)) { 077 return (Vertex<Set<T>>) vertex_object.get(s); 078 } 079 if (s.size() > -1) { 080 LOG.fine("new set of size " + s.size()); 081 Vertex<?>[] s_args = new Vertex[s.size()]; 082 int k = 0; 083 for (Object p : s) { 084 s_args[k] = getVertex(p); 085 k++; 086 } 087 Vertex<Set<T>> sv = new Vertex<>(s, null, s_args); 088 vertex_object.put(s, sv); 089 return sv; 090// } else if(s.size() == 1) { 091 } else { 092 Object p = s.iterator().next(); 093 Vertex<?> w = getVertex(p); 094 // alias the singleton set to its member 095 vertex_object.put(s, w); 096 return w; 097 } 098// } else { 099// // ignore the empty set. 100// } */ 101 } 102 103 @SuppressWarnings("unchecked") 104 @Override 105 public <T> T inject(ConstructorDef<T> constructorDef, Constructor<T> constructor, Object[] args) throws InvocationTargetException, IllegalAccessException, IllegalArgumentException, InstantiationException { 106// LOG.info("inject" + constructor + "->" + args.length); 107 Vertex<?>[] v_args = new Vertex[args.length]; 108 for (int i = 0; i < args.length; i++) { 109 Object o = args[i]; 110 Vertex<?> v = getVertex(o); 111 if (o instanceof Set) { 112 LOG.fine("Got a set arg for " + constructorDef + " length " + ((Set<?>) o).size()); 113 } else if (o instanceof InjectionFuture) { 114// LOG.info("Got injection future arg. "->" + o); 115 } 116 v_args[i] = v; 117 } 118 119 T ret; 120 final Class<T> clazz = constructor.getDeclaringClass(); 121 boolean isEventHandler = false; 122 for (Method m : clazz.getDeclaredMethods()) { 123 if (m.getName().equals("onNext")) { // XXX hack: Interpose on "event handler in spirit" 124 isEventHandler = true; 125 } 126 } 127 if (isEventHandler) { 128 try { 129 if (Modifier.isFinal(clazz.getDeclaredMethod("onNext", Object.class).getModifiers())) { 130 throw new Exception(ReflectionUtilities.getFullName(clazz) + ".onNext() is final; cannot intercept it"); 131 } 132 final Stats s = new Stats(); 133 Enhancer e = new Enhancer(); 134 e.setSuperclass(clazz); 135 e.setCallback(new MethodInterceptor() { 136 137 @Override 138 public Object intercept(Object object, Method method, Object[] args, 139 MethodProxy methodProxy) throws Throwable { 140 141 if (method.getName().equals("onNext")) { 142 long start = System.nanoTime(); 143// LOG.info(object + "." + method.getName() + " called"); 144 Object o = methodProxy.invokeSuper(object, args); 145 long stop = System.nanoTime(); 146 147 s.messageCount.incrementAndGet(); 148 s.sumLatency.addAndGet(stop - start); 149 150 return o; 151 152 } else { 153 return methodProxy.invokeSuper(object, args); 154 } 155 } 156 }); 157 ret = (T) e.create(constructor.getParameterTypes(), args); 158 stats.put(ret, s); 159 } catch (Exception e) { 160 LOG.warning("Wake profiler could not intercept event handler: " + e.getMessage()); 161 ret = constructor.newInstance(args); 162 } 163 } else { 164 ret = constructor.newInstance(args); 165 } 166 Vertex<T> v = new Vertex<T>(ret, constructorDef, v_args); 167 vertex_object.put(ret, v); 168 return ret; 169 } 170 171 @Override 172 public <T> void injectionFutureInstantiated(InjectionFuture<T> arg0, T arg1) { 173 if (!futures.containsKey(arg0)) { 174 LOG.warning("adding future " + arg0 + " instance " + arg1); 175 futures.put(arg0, arg1); 176 getVertex(arg1); 177 } 178 } 179 180 private String jsonEscape(String s) { 181 return s 182 .replaceAll("\\\\", "\\\\\\\\") 183 .replaceAll("\\\"", "\\\\\"") 184 .replaceAll("/", "\\\\/") 185 .replaceAll("\b", "\\\\b") 186 .replaceAll("\f", "\\\\f") 187 .replaceAll("\n", "\\\\n") 188 .replaceAll("\r", "\\\\r") 189 .replaceAll("\t", "\\\\t"); 190 191 } 192 193 private String join(String sep, List<String> tok) { 194 if (tok.size() == 0) { 195 return ""; 196 } 197 StringBuffer sb = new StringBuffer(tok.get(0)); 198 for (int i = 1; i < tok.size(); i++) { 199 sb.append(sep + tok.get(i)); 200 } 201 return sb.toString(); 202 } 203 204 private boolean whitelist(Object o) { 205 return (true 206 || (o instanceof InjectionFuture) 207 || (o instanceof Set) 208 || (o instanceof EventHandler) 209 || (o instanceof Stage) 210 || (o instanceof RxStage) 211 || (o instanceof Observer) 212 || (o instanceof Observable)) 213// && !(o instanceof Set) 214 ; 215 } 216 217 public String objectGraphToString() { 218 List<Vertex<?>> vertices = new ArrayList<>(); 219 Map<Vertex<?>, Integer> off_vertex = new MonotonicHashMap<>(); 220 221 StringBuffer sb = new StringBuffer("{\"nodes\":[\n"); 222 223 List<String> nodes = new ArrayList<String>(); 224 LinkedList<Vertex<?>> workQueue = new LinkedList<>(); 225 for (Object o : vertex_object.keySet()) { 226 if (whitelist(o)) { 227 workQueue.add(getVertex(o)); 228 } 229 } 230 for (Object o : futures.values()) { 231 if ((!vertex_object.containsKey(o)) && whitelist(o)) { 232 workQueue.add(getVertex(o)); 233 } 234 } 235 while (!workQueue.isEmpty()) { 236 Vertex<?> v = workQueue.removeFirst(); 237 LOG.warning("Work queue " + v); 238 239 Object o = v.getObject(); 240 final String s; 241 final String tooltip; 242 if (o instanceof InjectionFuture) { 243 s = null; 244 tooltip = null; 245 } else if (o instanceof String) { 246 s = "\"" + ((String) o) + "\""; 247 tooltip = null; 248 } else if (o instanceof Number) { 249 s = o.toString(); 250 tooltip = null; 251 } else if (o instanceof Set) { 252 LOG.warning("Set of size " + ((Set<?>) o).size() + " with " + v.getOutEdges().length + " out edges"); 253 s = "{...}"; 254 tooltip = null; 255//// } else if(false && (o instanceof EventHandler || o instanceof Stage)) { 256//// s = jsonEscape(v.getObject().toString()); 257 } else { 258 Stats stat = stats.get(o); 259 if (stat != null) { 260 long cnt = stat.messageCount.get(); 261 long lat = stat.sumLatency.get(); 262 tooltip = ",\"count\":" + cnt + ",\"latency\":\"" + (((double) lat) / (((double) cnt) * 1000000.0) + "\""); // quote the latency, since it might be nan 263 } else { 264 tooltip = null; 265 } 266 s = removeEnhancements(o.getClass().getSimpleName()); 267 } 268 if (s != null) { 269 off_vertex.put(v, vertices.size()); 270 vertices.add(v); 271 if (tooltip == null) { 272 nodes.add("{\"name\":\"" + jsonEscape(s) + "\"}"); 273 } else { 274 nodes.add("{\"name\":\"" + jsonEscape(s) + "\"" + tooltip + "}"); 275 } 276 277 } 278 } 279 sb.append(join(",\n", nodes)); 280 sb.append("],\n\"links\":["); 281 List<String> links = new ArrayList<>(); 282 for (Vertex<?> v : vertices) { 283 for (Vertex<?> w : v.getOutEdges()) { 284 LOG.fine("pointing object" + v.getObject()); 285 LOG.fine("pointed to object " + w.getObject()); 286 if (w.getObject() instanceof InjectionFuture) { 287 Vertex<?> futureTarget = getFuture((InjectionFuture<?>) w.getObject()); //futures.get(w.getObject()); 288 Integer off = off_vertex.get(futureTarget); 289 LOG.fine("future target " + futureTarget + " off = " + off); 290 if (off != null) { 291 links.add("{\"target\":" + off_vertex.get(v) + ",\"source\":" + off + ",\"value\":" + 1.0 + ",\"back\":true}"); 292 } 293 } else { 294 Integer off = off_vertex.get(w); 295 if (off != null) { 296 Stats s = stats.get(w.getObject()); 297 if (s != null) { 298 links.add("{\"source\":" + off_vertex.get(v) + ",\"target\":" + off + ",\"value\":" + (s.messageCount.get() + 3.0) + "}"); 299 } else { 300 links.add("{\"source\":" + off_vertex.get(v) + ",\"target\":" + off + ",\"value\":" + 1.0 + "}"); 301 } 302 } 303 } 304 } 305 } 306 sb.append(join(",\n", links)); 307 sb.append("]}"); 308 LOG.info("JSON: " + sb.toString()); 309 return sb.toString(); 310 } 311 312 private String removeEnhancements(String simpleName) { 313 return simpleName.replaceAll("\\$\\$.+$", ""); 314 } 315 316 private final class Stats { 317 AtomicLong messageCount = new AtomicLong(0); 318 AtomicLong sumLatency = new AtomicLong(0); 319 } 320 321}