This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.wake.profiler;
020
021import net.sf.cglib.proxy.Enhancer;
022import net.sf.cglib.proxy.MethodInterceptor;
023import net.sf.cglib.proxy.MethodProxy;
024import org.apache.reef.tang.Aspect;
025import org.apache.reef.tang.InjectionFuture;
026import org.apache.reef.tang.types.ConstructorDef;
027import org.apache.reef.tang.util.MonotonicHashMap;
028import org.apache.reef.tang.util.ReflectionUtilities;
029import org.apache.reef.wake.EventHandler;
030import org.apache.reef.wake.Stage;
031import org.apache.reef.wake.rx.Observable;
032import org.apache.reef.wake.rx.Observer;
033import org.apache.reef.wake.rx.RxStage;
034
035import java.lang.reflect.Constructor;
036import java.lang.reflect.InvocationTargetException;
037import java.lang.reflect.Method;
038import java.lang.reflect.Modifier;
039import java.util.*;
040import java.util.concurrent.atomic.AtomicLong;
041import java.util.logging.Logger;
042
043public class WakeProfiler implements Aspect {
044  private final static Logger LOG = Logger.getLogger(WakeProfiler.class.toString());
045  private final Map<Object, Vertex<?>> vertex_object = new MonotonicHashMap<>();
046  private final Map<InjectionFuture<?>, Object> futures = new MonotonicHashMap<>();
047  private final Map<Object, Stats> stats = new MonotonicHashMap<>();
048
049  @Override
050  public Aspect createChildAspect() {
051    return this;
052  }
053
054  @SuppressWarnings("unchecked")
055  private <T> Vertex<T> getVertex(T t) {
056    if (t instanceof Set) {
057      return (Vertex<T>) newSetVertex((Set<?>) t);
058    } else {
059      Vertex<T> v = (Vertex<T>) vertex_object.get(t);
060      // Add dummy vertices for objects that were bound volatile.
061      if (v == null) {
062        v = new Vertex<>(t);
063        vertex_object.put(t, v);
064      }
065      return v;
066    }
067  }
068
069  @SuppressWarnings("unchecked")
070  private <T> Vertex<T> getFuture(InjectionFuture<T> future) {
071    return getVertex((T) futures.get(future));
072  }
073
074  @SuppressWarnings("unchecked")
075  private <T> Vertex<?> newSetVertex(Set<T> s) {
076    if (vertex_object.containsKey(s)) {
077      return (Vertex<Set<T>>) vertex_object.get(s);
078    }
079    if (s.size() > -1) {
080      LOG.fine("new set of size " + s.size());
081      Vertex<?>[] s_args = new Vertex[s.size()];
082      int k = 0;
083      for (Object p : s) {
084        s_args[k] = getVertex(p);
085        k++;
086      }
087      Vertex<Set<T>> sv = new Vertex<>(s, null, s_args);
088      vertex_object.put(s, sv);
089      return sv;
090//    } else if(s.size() == 1) {
091    } else {
092      Object p = s.iterator().next();
093      Vertex<?> w = getVertex(p);
094      // alias the singleton set to its member
095      vertex_object.put(s, w);
096      return w;
097    }
098//    } else {
099//    // ignore the empty set.
100//  } */
101  }
102
103  @SuppressWarnings("unchecked")
104  @Override
105  public <T> T inject(ConstructorDef<T> constructorDef, Constructor<T> constructor, Object[] args) throws InvocationTargetException, IllegalAccessException, IllegalArgumentException, InstantiationException {
106//    LOG.info("inject" + constructor + "->" + args.length);
107    Vertex<?>[] v_args = new Vertex[args.length];
108    for (int i = 0; i < args.length; i++) {
109      Object o = args[i];
110      Vertex<?> v = getVertex(o);
111      if (o instanceof Set) {
112        LOG.fine("Got a set arg for " + constructorDef + " length " + ((Set<?>) o).size());
113      } else if (o instanceof InjectionFuture) {
114//        LOG.info("Got injection future arg. "->" + o);
115      }
116      v_args[i] = v;
117    }
118
119    T ret;
120    final Class<T> clazz = constructor.getDeclaringClass();
121    boolean isEventHandler = false;
122    for (Method m : clazz.getDeclaredMethods()) {
123      if (m.getName().equals("onNext")) { // XXX hack: Interpose on "event handler in spirit"
124        isEventHandler = true;
125      }
126    }
127    if (isEventHandler) {
128      try {
129        if (Modifier.isFinal(clazz.getDeclaredMethod("onNext", Object.class).getModifiers())) {
130          throw new Exception(ReflectionUtilities.getFullName(clazz) + ".onNext() is final; cannot intercept it");
131        }
132        final Stats s = new Stats();
133        Enhancer e = new Enhancer();
134        e.setSuperclass(clazz);
135        e.setCallback(new MethodInterceptor() {
136
137          @Override
138          public Object intercept(Object object, Method method, Object[] args,
139                                  MethodProxy methodProxy) throws Throwable {
140
141            if (method.getName().equals("onNext")) {
142              long start = System.nanoTime();
143//              LOG.info(object + "." + method.getName() + " called");
144              Object o = methodProxy.invokeSuper(object, args);
145              long stop = System.nanoTime();
146
147              s.messageCount.incrementAndGet();
148              s.sumLatency.addAndGet(stop - start);
149
150              return o;
151
152            } else {
153              return methodProxy.invokeSuper(object, args);
154            }
155          }
156        });
157        ret = (T) e.create(constructor.getParameterTypes(), args);
158        stats.put(ret, s);
159      } catch (Exception e) {
160        LOG.warning("Wake profiler could not intercept event handler: " + e.getMessage());
161        ret = constructor.newInstance(args);
162      }
163    } else {
164      ret = constructor.newInstance(args);
165    }
166    Vertex<T> v = new Vertex<T>(ret, constructorDef, v_args);
167    vertex_object.put(ret, v);
168    return ret;
169  }
170
171  @Override
172  public <T> void injectionFutureInstantiated(InjectionFuture<T> arg0, T arg1) {
173    if (!futures.containsKey(arg0)) {
174      LOG.warning("adding future " + arg0 + " instance " + arg1);
175      futures.put(arg0, arg1);
176      getVertex(arg1);
177    }
178  }
179
180  private String jsonEscape(String s) {
181    return s
182        .replaceAll("\\\\", "\\\\\\\\")
183        .replaceAll("\\\"", "\\\\\"")
184        .replaceAll("/", "\\\\/")
185        .replaceAll("\b", "\\\\b")
186        .replaceAll("\f", "\\\\f")
187        .replaceAll("\n", "\\\\n")
188        .replaceAll("\r", "\\\\r")
189        .replaceAll("\t", "\\\\t");
190
191  }
192
193  private String join(String sep, List<String> tok) {
194    if (tok.size() == 0) {
195      return "";
196    }
197    StringBuffer sb = new StringBuffer(tok.get(0));
198    for (int i = 1; i < tok.size(); i++) {
199      sb.append(sep + tok.get(i));
200    }
201    return sb.toString();
202  }
203
204  private boolean whitelist(Object o) {
205    return (true
206        || (o instanceof InjectionFuture)
207        || (o instanceof Set)
208        || (o instanceof EventHandler)
209        || (o instanceof Stage)
210        || (o instanceof RxStage)
211        || (o instanceof Observer)
212        || (o instanceof Observable))
213//        && !(o instanceof Set)
214        ;
215  }
216
217  public String objectGraphToString() {
218    List<Vertex<?>> vertices = new ArrayList<>();
219    Map<Vertex<?>, Integer> off_vertex = new MonotonicHashMap<>();
220
221    StringBuffer sb = new StringBuffer("{\"nodes\":[\n");
222
223    List<String> nodes = new ArrayList<String>();
224    LinkedList<Vertex<?>> workQueue = new LinkedList<>();
225    for (Object o : vertex_object.keySet()) {
226      if (whitelist(o)) {
227        workQueue.add(getVertex(o));
228      }
229    }
230    for (Object o : futures.values()) {
231      if ((!vertex_object.containsKey(o)) && whitelist(o)) {
232        workQueue.add(getVertex(o));
233      }
234    }
235    while (!workQueue.isEmpty()) {
236      Vertex<?> v = workQueue.removeFirst();
237      LOG.warning("Work queue " + v);
238
239      Object o = v.getObject();
240      final String s;
241      final String tooltip;
242      if (o instanceof InjectionFuture) {
243        s = null;
244        tooltip = null;
245      } else if (o instanceof String) {
246        s = "\"" + ((String) o) + "\"";
247        tooltip = null;
248      } else if (o instanceof Number) {
249        s = o.toString();
250        tooltip = null;
251      } else if (o instanceof Set) {
252        LOG.warning("Set of size " + ((Set<?>) o).size() + " with " + v.getOutEdges().length + " out edges");
253        s = "{...}";
254        tooltip = null;
255////      } else if(false && (o instanceof EventHandler || o instanceof Stage)) {
256////        s = jsonEscape(v.getObject().toString());
257      } else {
258        Stats stat = stats.get(o);
259        if (stat != null) {
260          long cnt = stat.messageCount.get();
261          long lat = stat.sumLatency.get();
262          tooltip = ",\"count\":" + cnt + ",\"latency\":\"" + (((double) lat) / (((double) cnt) * 1000000.0) + "\""); // quote the latency, since it might be nan
263        } else {
264          tooltip = null;
265        }
266        s = removeEnhancements(o.getClass().getSimpleName());
267      }
268      if (s != null) {
269        off_vertex.put(v, vertices.size());
270        vertices.add(v);
271        if (tooltip == null) {
272          nodes.add("{\"name\":\"" + jsonEscape(s) + "\"}");
273        } else {
274          nodes.add("{\"name\":\"" + jsonEscape(s) + "\"" + tooltip + "}");
275        }
276
277      }
278    }
279    sb.append(join(",\n", nodes));
280    sb.append("],\n\"links\":[");
281    List<String> links = new ArrayList<>();
282    for (Vertex<?> v : vertices) {
283      for (Vertex<?> w : v.getOutEdges()) {
284        LOG.fine("pointing object" + v.getObject());
285        LOG.fine("pointed to object " + w.getObject());
286        if (w.getObject() instanceof InjectionFuture) {
287          Vertex<?> futureTarget = getFuture((InjectionFuture<?>) w.getObject()); //futures.get(w.getObject());
288          Integer off = off_vertex.get(futureTarget);
289          LOG.fine("future target " + futureTarget + " off = " + off);
290          if (off != null) {
291            links.add("{\"target\":" + off_vertex.get(v) + ",\"source\":" + off + ",\"value\":" + 1.0 + ",\"back\":true}");
292          }
293        } else {
294          Integer off = off_vertex.get(w);
295          if (off != null) {
296            Stats s = stats.get(w.getObject());
297            if (s != null) {
298              links.add("{\"source\":" + off_vertex.get(v) + ",\"target\":" + off + ",\"value\":" + (s.messageCount.get() + 3.0) + "}");
299            } else {
300              links.add("{\"source\":" + off_vertex.get(v) + ",\"target\":" + off + ",\"value\":" + 1.0 + "}");
301            }
302          }
303        }
304      }
305    }
306    sb.append(join(",\n", links));
307    sb.append("]}");
308    LOG.info("JSON: " + sb.toString());
309    return sb.toString();
310  }
311
312  private String removeEnhancements(String simpleName) {
313    return simpleName.replaceAll("\\$\\$.+$", "");
314  }
315
316  private final class Stats {
317    AtomicLong messageCount = new AtomicLong(0);
318    AtomicLong sumLatency = new AtomicLong(0);
319  }
320
321}