001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.wake.profiler; 020 021import net.sf.cglib.proxy.Enhancer; 022import net.sf.cglib.proxy.MethodInterceptor; 023import net.sf.cglib.proxy.MethodProxy; 024import org.apache.reef.tang.Aspect; 025import org.apache.reef.tang.InjectionFuture; 026import org.apache.reef.tang.types.ConstructorDef; 027import org.apache.reef.tang.util.MonotonicHashMap; 028import org.apache.reef.tang.util.ReflectionUtilities; 029 030import java.lang.reflect.Constructor; 031import java.lang.reflect.InvocationTargetException; 032import java.lang.reflect.Method; 033import java.lang.reflect.Modifier; 034import java.util.*; 035import java.util.concurrent.atomic.AtomicLong; 036import java.util.logging.Logger; 037 038/** 039 * A graphical profiler class that instruments Tang-based Wake applications. 040 */ 041public class WakeProfiler implements Aspect { 042 private static final Logger LOG = Logger.getLogger(WakeProfiler.class.toString()); 043 private final Map<Object, Vertex<?>> vertexObject = new MonotonicHashMap<>(); 044 private final Map<InjectionFuture<?>, Object> futures = new MonotonicHashMap<>(); 045 private final Map<Object, Stats> stats = new MonotonicHashMap<>(); 046 047 @Override 048 public Aspect createChildAspect() { 049 return this; 050 } 051 052 @SuppressWarnings("unchecked") 053 private <T> Vertex<T> getVertex(final T t) { 054 if (t instanceof Set) { 055 return (Vertex<T>) newSetVertex((Set<?>) t); 056 } else { 057 Vertex<T> v = (Vertex<T>) vertexObject.get(t); 058 // Add dummy vertices for objects that were bound volatile. 059 if (v == null) { 060 v = new Vertex<>(t); 061 vertexObject.put(t, v); 062 } 063 return v; 064 } 065 } 066 067 @SuppressWarnings("unchecked") 068 private <T> Vertex<T> getFuture(final InjectionFuture<T> future) { 069 return getVertex((T) futures.get(future)); 070 } 071 072 @SuppressWarnings("unchecked") 073 private <T> Vertex<?> newSetVertex(final Set<T> s) { 074 if (vertexObject.containsKey(s)) { 075 return vertexObject.get(s); 076 } 077 if (s.size() > 1) { 078 LOG.fine("new set of size " + s.size()); 079 final Vertex<?>[] sArgs = new Vertex[s.size()]; 080 int k = 0; 081 for (final Object p : s) { 082 sArgs[k] = getVertex(p); 083 k++; 084 } 085 final Vertex<Set<T>> sv = new Vertex<>(s, null, sArgs); 086 vertexObject.put(s, sv); 087 return sv; 088 } else { 089 final Object p = s.iterator().next(); 090 final Vertex<?> w = getVertex(p); 091 // alias the singleton set to its member 092 vertexObject.put(s, w); 093 return w; 094 } 095 } 096 097 @SuppressWarnings("unchecked") 098 @Override 099 public <T> T inject(final ConstructorDef<T> constructorDef, final Constructor<T> constructor, final Object[] args) 100 throws InvocationTargetException, IllegalAccessException, IllegalArgumentException, InstantiationException { 101 final Vertex<?>[] vArgs = new Vertex[args.length]; 102 for (int i = 0; i < args.length; i++) { 103 final Object o = args[i]; 104 final Vertex<?> v = getVertex(o); 105 if (o instanceof Set) { 106 LOG.fine("Got a set arg for " + constructorDef + " length " + ((Set<?>) o).size()); 107 } 108 vArgs[i] = v; 109 } 110 111 T ret; 112 final Class<T> clazz = constructor.getDeclaringClass(); 113 boolean isEventHandler = false; 114 for (final Method m : clazz.getDeclaredMethods()) { 115 if (m.getName().equals("onNext")) { // XXX hack: Interpose on "event handler in spirit" 116 isEventHandler = true; 117 } 118 } 119 if (isEventHandler) { 120 try { 121 if (Modifier.isFinal(clazz.getDeclaredMethod("onNext", Object.class).getModifiers())) { 122 throw new Exception(ReflectionUtilities.getFullName(clazz) + ".onNext() is final; cannot intercept it"); 123 } 124 final Stats s = new Stats(); 125 final Enhancer e = new Enhancer(); 126 e.setSuperclass(clazz); 127 e.setCallback(new MethodInterceptor() { 128 129 @Override 130 public Object intercept(final Object object, final Method method, final Object[] args, 131 final MethodProxy methodProxy) throws Throwable { 132 133 if (method.getName().equals("onNext")) { 134 final long start = System.nanoTime(); 135 final Object o = methodProxy.invokeSuper(object, args); 136 final long stop = System.nanoTime(); 137 138 s.getMessageCount().incrementAndGet(); 139 s.getSumLatency().addAndGet(stop - start); 140 141 return o; 142 143 } else { 144 return methodProxy.invokeSuper(object, args); 145 } 146 } 147 }); 148 ret = (T) e.create(constructor.getParameterTypes(), args); 149 stats.put(ret, s); 150 } catch (final Exception e) { 151 LOG.warning("Wake profiler could not intercept event handler: " + e.getMessage()); 152 ret = constructor.newInstance(args); 153 } 154 } else { 155 ret = constructor.newInstance(args); 156 } 157 final Vertex<T> v = new Vertex<>(ret, constructorDef, vArgs); 158 vertexObject.put(ret, v); 159 return ret; 160 } 161 162 @Override 163 public <T> void injectionFutureInstantiated(final InjectionFuture<T> arg0, final T arg1) { 164 if (!futures.containsKey(arg0)) { 165 LOG.warning("adding future " + arg0 + " instance " + arg1); 166 futures.put(arg0, arg1); 167 getVertex(arg1); 168 } 169 } 170 171 private String jsonEscape(final String s) { 172 return s 173 .replaceAll("\\\\", "\\\\\\\\") 174 .replaceAll("\\\"", "\\\\\"") 175 .replaceAll("/", "\\\\/") 176 .replaceAll("\b", "\\\\b") 177 .replaceAll("\f", "\\\\f") 178 .replaceAll("\n", "\\\\n") 179 .replaceAll("\r", "\\\\r") 180 .replaceAll("\t", "\\\\t"); 181 182 } 183 184 private String join(final String sep, final List<String> tok) { 185 if (tok.size() == 0) { 186 return ""; 187 } 188 final StringBuffer sb = new StringBuffer(tok.get(0)); 189 for (int i = 1; i < tok.size(); i++) { 190 sb.append(sep + tok.get(i)); 191 } 192 return sb.toString(); 193 } 194 195 private boolean whitelist(final Object o) { 196 return true; 197 } 198 199 public String objectGraphToString() { 200 final List<Vertex<?>> vertices = new ArrayList<>(); 201 final Map<Vertex<?>, Integer> offVertex = new MonotonicHashMap<>(); 202 203 final StringBuffer sb = new StringBuffer("{\"nodes\":[\n"); 204 205 final List<String> nodes = new ArrayList<>(); 206 final LinkedList<Vertex<?>> workQueue = new LinkedList<>(); 207 for (final Object o : vertexObject.keySet()) { 208 if (whitelist(o)) { 209 workQueue.add(getVertex(o)); 210 } 211 } 212 for (final Object o : futures.values()) { 213 if (!vertexObject.containsKey(o) && whitelist(o)) { 214 workQueue.add(getVertex(o)); 215 } 216 } 217 while (!workQueue.isEmpty()) { 218 final Vertex<?> v = workQueue.removeFirst(); 219 LOG.warning("Work queue " + v); 220 221 final Object o = v.getObject(); 222 final String s; 223 final String tooltip; 224 if (o instanceof InjectionFuture) { 225 s = null; 226 tooltip = null; 227 } else if (o instanceof String) { 228 s = "\"" + ((String) o) + "\""; 229 tooltip = null; 230 } else if (o instanceof Number) { 231 s = o.toString(); 232 tooltip = null; 233 } else if (o instanceof Set) { 234 LOG.warning("Set of size " + ((Set<?>) o).size() + " with " + v.getOutEdges().length + " out edges"); 235 s = "{...}"; 236 tooltip = null; 237 } else { 238 final Stats stat = stats.get(o); 239 if (stat != null) { 240 final long cnt = stat.getMessageCount().get(); 241 final long lat = stat.getSumLatency().get(); 242 tooltip = ",\"count\":" + cnt + ",\"latency\":\"" + (((double) lat) / (((double) cnt) * 1000000.0) + "\""); 243 // quote the latency, since it might be nan 244 } else { 245 tooltip = null; 246 } 247 s = removeEnhancements(o.getClass().getSimpleName()); 248 } 249 if (s != null) { 250 offVertex.put(v, vertices.size()); 251 vertices.add(v); 252 if (tooltip == null) { 253 nodes.add("{\"name\":\"" + jsonEscape(s) + "\"}"); 254 } else { 255 nodes.add("{\"name\":\"" + jsonEscape(s) + "\"" + tooltip + "}"); 256 } 257 258 } 259 } 260 sb.append(join(",\n", nodes)); 261 sb.append("],\n\"links\":["); 262 final List<String> links = new ArrayList<>(); 263 for (final Vertex<?> v : vertices) { 264 for (final Vertex<?> w : v.getOutEdges()) { 265 LOG.fine("pointing object" + v.getObject()); 266 LOG.fine("pointed to object " + w.getObject()); 267 if (w.getObject() instanceof InjectionFuture) { 268 final Vertex<?> futureTarget = getFuture((InjectionFuture<?>) w.getObject()); //futures.get(w.getObject()); 269 final Integer off = offVertex.get(futureTarget); 270 LOG.fine("future target " + futureTarget + " off = " + off); 271 if (off != null) { 272 links.add("{\"target\":" + offVertex.get(v) + ",\"source\":" + off + ",\"value\":" + 1.0 + 273 ",\"back\":true}"); 274 } 275 } else { 276 final Integer off = offVertex.get(w); 277 if (off != null) { 278 final Stats s = stats.get(w.getObject()); 279 if (s != null) { 280 links.add("{\"source\":" + offVertex.get(v) + ",\"target\":" + off + ",\"value\":" + 281 (s.getMessageCount().get() + 3.0) + "}"); 282 } else { 283 links.add("{\"source\":" + offVertex.get(v) + ",\"target\":" + off + ",\"value\":" + 1.0 + "}"); 284 } 285 } 286 } 287 } 288 } 289 sb.append(join(",\n", links)); 290 sb.append("]}"); 291 LOG.info("JSON: " + sb.toString()); 292 return sb.toString(); 293 } 294 295 private String removeEnhancements(final String simpleName) { 296 return simpleName.replaceAll("\\$\\$.+$", ""); 297 } 298 299 private final class Stats { 300 private AtomicLong messageCount = new AtomicLong(0); 301 private AtomicLong sumLatency = new AtomicLong(0); 302 303 AtomicLong getMessageCount() { 304 return messageCount; 305 } 306 307 AtomicLong getSumLatency() { 308 return sumLatency; 309 } 310 } 311 312}