This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.wake.profiler;
020
021import net.sf.cglib.proxy.Enhancer;
022import net.sf.cglib.proxy.MethodInterceptor;
023import net.sf.cglib.proxy.MethodProxy;
024import org.apache.reef.tang.Aspect;
025import org.apache.reef.tang.InjectionFuture;
026import org.apache.reef.tang.types.ConstructorDef;
027import org.apache.reef.tang.util.MonotonicHashMap;
028import org.apache.reef.tang.util.ReflectionUtilities;
029
030import java.lang.reflect.Constructor;
031import java.lang.reflect.InvocationTargetException;
032import java.lang.reflect.Method;
033import java.lang.reflect.Modifier;
034import java.util.*;
035import java.util.concurrent.atomic.AtomicLong;
036import java.util.logging.Logger;
037
038/**
039 * A graphical profiler class that instruments Tang-based Wake applications.
040 */
041public class WakeProfiler implements Aspect {
042  private static final Logger LOG = Logger.getLogger(WakeProfiler.class.toString());
043  private final Map<Object, Vertex<?>> vertexObject = new MonotonicHashMap<>();
044  private final Map<InjectionFuture<?>, Object> futures = new MonotonicHashMap<>();
045  private final Map<Object, Stats> stats = new MonotonicHashMap<>();
046
047  @Override
048  public Aspect createChildAspect() {
049    return this;
050  }
051
052  @SuppressWarnings("unchecked")
053  private <T> Vertex<T> getVertex(final T t) {
054    if (t instanceof Set) {
055      return (Vertex<T>) newSetVertex((Set<?>) t);
056    } else {
057      Vertex<T> v = (Vertex<T>) vertexObject.get(t);
058      // Add dummy vertices for objects that were bound volatile.
059      if (v == null) {
060        v = new Vertex<>(t);
061        vertexObject.put(t, v);
062      }
063      return v;
064    }
065  }
066
067  @SuppressWarnings("unchecked")
068  private <T> Vertex<T> getFuture(final InjectionFuture<T> future) {
069    return getVertex((T) futures.get(future));
070  }
071
072  @SuppressWarnings("unchecked")
073  private <T> Vertex<?> newSetVertex(final Set<T> s) {
074    if (vertexObject.containsKey(s)) {
075      return vertexObject.get(s);
076    }
077    if (s.size() > 1) {
078      LOG.fine("new set of size " + s.size());
079      final Vertex<?>[] sArgs = new Vertex[s.size()];
080      int k = 0;
081      for (final Object p : s) {
082        sArgs[k] = getVertex(p);
083        k++;
084      }
085      final Vertex<Set<T>> sv = new Vertex<>(s, null, sArgs);
086      vertexObject.put(s, sv);
087      return sv;
088    } else {
089      final Object p = s.iterator().next();
090      final Vertex<?> w = getVertex(p);
091      // alias the singleton set to its member
092      vertexObject.put(s, w);
093      return w;
094    }
095  }
096
097  @SuppressWarnings("unchecked")
098  @Override
099  public <T> T inject(final ConstructorDef<T> constructorDef, final Constructor<T> constructor, final Object[] args)
100      throws InvocationTargetException, IllegalAccessException, IllegalArgumentException, InstantiationException {
101    final Vertex<?>[] vArgs = new Vertex[args.length];
102    for (int i = 0; i < args.length; i++) {
103      final Object o = args[i];
104      final Vertex<?> v = getVertex(o);
105      if (o instanceof Set) {
106        LOG.fine("Got a set arg for " + constructorDef + " length " + ((Set<?>) o).size());
107      }
108      vArgs[i] = v;
109    }
110
111    T ret;
112    final Class<T> clazz = constructor.getDeclaringClass();
113    boolean isEventHandler = false;
114    for (final Method m : clazz.getDeclaredMethods()) {
115      if (m.getName().equals("onNext")) { // XXX hack: Interpose on "event handler in spirit"
116        isEventHandler = true;
117      }
118    }
119    if (isEventHandler) {
120      try {
121        if (Modifier.isFinal(clazz.getDeclaredMethod("onNext", Object.class).getModifiers())) {
122          throw new Exception(ReflectionUtilities.getFullName(clazz) + ".onNext() is final; cannot intercept it");
123        }
124        final Stats s = new Stats();
125        final Enhancer e = new Enhancer();
126        e.setSuperclass(clazz);
127        e.setCallback(new MethodInterceptor() {
128
129          @Override
130          public Object intercept(final Object object, final Method method, final Object[] args,
131                                  final MethodProxy methodProxy) throws Throwable {
132
133            if (method.getName().equals("onNext")) {
134              final long start = System.nanoTime();
135              final Object o = methodProxy.invokeSuper(object, args);
136              final long stop = System.nanoTime();
137
138              s.getMessageCount().incrementAndGet();
139              s.getSumLatency().addAndGet(stop - start);
140
141              return o;
142
143            } else {
144              return methodProxy.invokeSuper(object, args);
145            }
146          }
147        });
148        ret = (T) e.create(constructor.getParameterTypes(), args);
149        stats.put(ret, s);
150      } catch (final Exception e) {
151        LOG.warning("Wake profiler could not intercept event handler: " + e.getMessage());
152        ret = constructor.newInstance(args);
153      }
154    } else {
155      ret = constructor.newInstance(args);
156    }
157    final Vertex<T> v = new Vertex<>(ret, constructorDef, vArgs);
158    vertexObject.put(ret, v);
159    return ret;
160  }
161
162  @Override
163  public <T> void injectionFutureInstantiated(final InjectionFuture<T> arg0, final T arg1) {
164    if (!futures.containsKey(arg0)) {
165      LOG.warning("adding future " + arg0 + " instance " + arg1);
166      futures.put(arg0, arg1);
167      getVertex(arg1);
168    }
169  }
170
171  private String jsonEscape(final String s) {
172    return s
173        .replaceAll("\\\\", "\\\\\\\\")
174        .replaceAll("\\\"", "\\\\\"")
175        .replaceAll("/", "\\\\/")
176        .replaceAll("\b", "\\\\b")
177        .replaceAll("\f", "\\\\f")
178        .replaceAll("\n", "\\\\n")
179        .replaceAll("\r", "\\\\r")
180        .replaceAll("\t", "\\\\t");
181
182  }
183
184  private String join(final String sep, final List<String> tok) {
185    if (tok.size() == 0) {
186      return "";
187    }
188    final StringBuffer sb = new StringBuffer(tok.get(0));
189    for (int i = 1; i < tok.size(); i++) {
190      sb.append(sep + tok.get(i));
191    }
192    return sb.toString();
193  }
194
195  private boolean whitelist(final Object o) {
196    return true;
197  }
198
199  public String objectGraphToString() {
200    final List<Vertex<?>> vertices = new ArrayList<>();
201    final Map<Vertex<?>, Integer> offVertex = new MonotonicHashMap<>();
202
203    final StringBuffer sb = new StringBuffer("{\"nodes\":[\n");
204
205    final List<String> nodes = new ArrayList<>();
206    final LinkedList<Vertex<?>> workQueue = new LinkedList<>();
207    for (final Object o : vertexObject.keySet()) {
208      if (whitelist(o)) {
209        workQueue.add(getVertex(o));
210      }
211    }
212    for (final Object o : futures.values()) {
213      if (!vertexObject.containsKey(o) && whitelist(o)) {
214        workQueue.add(getVertex(o));
215      }
216    }
217    while (!workQueue.isEmpty()) {
218      final Vertex<?> v = workQueue.removeFirst();
219      LOG.warning("Work queue " + v);
220
221      final Object o = v.getObject();
222      final String s;
223      final String tooltip;
224      if (o instanceof InjectionFuture) {
225        s = null;
226        tooltip = null;
227      } else if (o instanceof String) {
228        s = "\"" + ((String) o) + "\"";
229        tooltip = null;
230      } else if (o instanceof Number) {
231        s = o.toString();
232        tooltip = null;
233      } else if (o instanceof Set) {
234        LOG.warning("Set of size " + ((Set<?>) o).size() + " with " + v.getOutEdges().length + " out edges");
235        s = "{...}";
236        tooltip = null;
237      } else {
238        final Stats stat = stats.get(o);
239        if (stat != null) {
240          final long cnt = stat.getMessageCount().get();
241          final long lat = stat.getSumLatency().get();
242          tooltip = ",\"count\":" + cnt + ",\"latency\":\"" + (((double) lat) / (((double) cnt) * 1000000.0) + "\"");
243          // quote the latency, since it might be nan
244        } else {
245          tooltip = null;
246        }
247        s = removeEnhancements(o.getClass().getSimpleName());
248      }
249      if (s != null) {
250        offVertex.put(v, vertices.size());
251        vertices.add(v);
252        if (tooltip == null) {
253          nodes.add("{\"name\":\"" + jsonEscape(s) + "\"}");
254        } else {
255          nodes.add("{\"name\":\"" + jsonEscape(s) + "\"" + tooltip + "}");
256        }
257
258      }
259    }
260    sb.append(join(",\n", nodes));
261    sb.append("],\n\"links\":[");
262    final List<String> links = new ArrayList<>();
263    for (final Vertex<?> v : vertices) {
264      for (final Vertex<?> w : v.getOutEdges()) {
265        LOG.fine("pointing object" + v.getObject());
266        LOG.fine("pointed to object " + w.getObject());
267        if (w.getObject() instanceof InjectionFuture) {
268          final Vertex<?> futureTarget = getFuture((InjectionFuture<?>) w.getObject()); //futures.get(w.getObject());
269          final Integer off = offVertex.get(futureTarget);
270          LOG.fine("future target " + futureTarget + " off = " + off);
271          if (off != null) {
272            links.add("{\"target\":" + offVertex.get(v) + ",\"source\":" + off + ",\"value\":" + 1.0 +
273                ",\"back\":true}");
274          }
275        } else {
276          final Integer off = offVertex.get(w);
277          if (off != null) {
278            final Stats s = stats.get(w.getObject());
279            if (s != null) {
280              links.add("{\"source\":" + offVertex.get(v) + ",\"target\":" + off + ",\"value\":" +
281                  (s.getMessageCount().get() + 3.0) + "}");
282            } else {
283              links.add("{\"source\":" + offVertex.get(v) + ",\"target\":" + off + ",\"value\":" + 1.0 + "}");
284            }
285          }
286        }
287      }
288    }
289    sb.append(join(",\n", links));
290    sb.append("]}");
291    LOG.info("JSON: " + sb.toString());
292    return sb.toString();
293  }
294
295  private String removeEnhancements(final String simpleName) {
296    return simpleName.replaceAll("\\$\\$.+$", "");
297  }
298
299  private final class Stats {
300    private AtomicLong messageCount = new AtomicLong(0);
301    private AtomicLong sumLatency = new AtomicLong(0);
302
303    AtomicLong getMessageCount() {
304      return messageCount;
305    }
306
307    AtomicLong getSumLatency() {
308      return sumLatency;
309    }
310  }
311
312}