001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.wake.examples.accumulate;
020
021
022import org.apache.reef.wake.Stage;
023import org.apache.reef.wake.rx.Observer;
024
025import java.util.Map;
026import java.util.concurrent.ConcurrentSkipListMap;
027
028/**
029 * key-value pair Combiner stage.
030 *
031 * @param <K> key
032 * @param <V> value
033 */
034public class CombinerStage<K extends Comparable<K>, V> implements Stage {
035
036  private final Combiner<K, V> c;
037  private final Observer<Map.Entry<K, V>> o;
038  private final OutputThread worker = new OutputThread();
039  private final ConcurrentSkipListMap<K, V> register = new ConcurrentSkipListMap<>();
040  private volatile boolean done = false;
041
042  public CombinerStage(final Combiner<K, V> c, final Observer<Map.Entry<K, V>> o) {
043    this.c = c;
044    this.o = o;
045    worker.start();
046  }
047
048  public Observer<Map.Entry<K, V>> wireIn() {
049    return new Observer<Map.Entry<K, V>>() {
050      @Override
051      public void onNext(final Map.Entry<K, V> pair) {
052        V old;
053        V newVal;
054        final boolean wasEmpty = register.isEmpty();
055        boolean succ = false;
056
057        while (!succ) {
058          old = register.get(pair.getKey());
059          newVal = c.combine(pair.getKey(), old, pair.getValue());
060          if (old == null) {
061            succ = null == register.putIfAbsent(pair.getKey(), newVal);
062          } else {
063            succ = register.replace(pair.getKey(), old, newVal);
064          }
065        }
066
067        if (wasEmpty) {
068          synchronized (register) {
069            register.notify();
070          }
071        }
072      }
073
074      @Override
075      public void onError(final Exception error) {
076        o.onError(error);
077      }
078
079      @Override
080      public void onCompleted() {
081        synchronized (register) {
082          done = true;
083          if (register.isEmpty()) {
084            register.notify();
085          }
086        }
087      }
088    };
089  }
090
091  @Override
092  public void close() throws Exception {
093    worker.join();
094  }
095
096  /**
097   * key-value pair Combiner Interface.
098   *
099   * @param <K> key
100   * @param <V> value
101   */
102  public interface Combiner<K extends Comparable<K>, V> {
103    V combine(K key, V old, V cur);
104  }
105
106  /**
107   * A comparable key-value pair.
108   *
109   * @param <K> key
110   * @param <V> value
111   */
112  public static class Pair<K extends Comparable<K>, V> implements Map.Entry<K, V>, Comparable<Map.Entry<K, V>> {
113    private final K k;
114    private final V v;
115
116    public Pair(final K k, final V v) {
117      this.k = k;
118      this.v = v;
119    }
120
121    @Override
122    public boolean equals(final Object o) {
123      if (this == o) {
124        return true;
125      }
126      if (o == null || getClass() != o.getClass()) {
127        return false;
128      }
129
130      Pair<K, V> pair = (Pair<K, V>) o;
131      return k.compareTo(pair.getKey()) == 0;
132    }
133
134    @Override
135    public int hashCode() {
136      return k.hashCode();
137    }
138
139    @Override
140    public int compareTo(final Map.Entry<K, V> arg0) {
141      return k.compareTo(arg0.getKey());
142    }
143
144    @Override
145    public K getKey() {
146      return k;
147    }
148
149    @Override
150    public V getValue() {
151      return v;
152    }
153
154    @Override
155    public V setValue(final V value) {
156      throw new UnsupportedOperationException();
157    }
158  }
159
160  private class OutputThread extends Thread {
161    OutputThread() {
162      super("grouper-output-thread");
163    }
164
165    @Override
166    public void run() {
167      while (true) {
168        if (register.isEmpty()) {
169          synchronized (register) {
170            while (register.isEmpty() && !done) {
171              try {
172                register.wait();
173              } catch (final InterruptedException e) {
174                throw new IllegalStateException(e);
175              }
176            }
177            if (done) {
178              break;
179            }
180          }
181        }
182        Map.Entry<K, V> cursor = register.pollFirstEntry();
183        while (cursor != null) {
184          o.onNext(cursor);
185          final K nextKey = register.higherKey(cursor.getKey());
186
187          /* If there is more than one OutputThread worker then the remove() -> null case
188           * must be handled
189           */
190          cursor = (nextKey == null) ? null : new Pair<>(nextKey, register.remove(nextKey));
191        }
192      }
193      o.onCompleted();
194    }
195  }
196
197}