/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.reef.wake.examples.accumulate;


import org.apache.reef.wake.Stage;
import org.apache.reef.wake.rx.Observer;

import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

/**
 * key-value pair Combiner stage.
 *
 * <p>Incoming pairs are merged per key into a sorted register using the supplied
 * {@link Combiner}. A single output thread, started by the constructor, removes
 * entries from the register and forwards them to the downstream observer.
 *
 * @param <K> key
 * @param <V> value
 */
public class CombinerStage<K extends Comparable<K>, V> implements Stage {

  private final Combiner<K, V> c;
  private final Observer<Map.Entry<K, V>> o;
  private final OutputThread worker = new OutputThread();
  /** Pending combined values; also used as the monitor for wait/notify. */
  private final ConcurrentSkipListMap<K, V> register = new ConcurrentSkipListMap<>();
  /** Set by the input observer's {@code onCompleted}; read by the output thread. */
  private volatile boolean done = false;

  /**
   * Creates the stage and starts its output thread.
   *
   * @param c combiner used to merge a new value into the value already registered for a key
   * @param o downstream observer that receives the combined entries
   */
  public CombinerStage(final Combiner<K, V> c, final Observer<Map.Entry<K, V>> o) {
    this.c = c;
    this.o = o;
    worker.start();
  }

  /**
   * Returns the observer through which pairs are fed into this stage.
   *
   * @return a new input observer bound to this stage
   */
  public Observer<Map.Entry<K, V>> wireIn() {
    return new Observer<Map.Entry<K, V>>() {
      @Override
      public void onNext(final Map.Entry<K, V> pair) {
        final K key = pair.getKey();
        boolean succ = false;

        // Optimistic retry: re-read and re-combine until the map accepts the
        // update without another thread having changed the entry in between.
        while (!succ) {
          final V old = register.get(key);
          final V newVal = c.combine(key, old, pair.getValue());
          if (old == null) {
            succ = null == register.putIfAbsent(key, newVal);
          } else {
            succ = register.replace(key, old, newVal);
          }
        }

        // Always signal after the insert. Deciding from an emptiness check taken
        // before the insert can miss the case where the output thread drained the
        // register and started waiting in between, leaving it asleep with data pending.
        synchronized (register) {
          register.notifyAll();
        }
      }

      @Override
      public void onError(final Exception error) {
        o.onError(error);
      }

      @Override
      public void onCompleted() {
        synchronized (register) {
          done = true;
          // Wake the output thread unconditionally so it can re-evaluate its exit condition.
          register.notifyAll();
        }
      }
    };
  }

  /**
   * Blocks until the output thread has terminated.
   *
   * @throws Exception if the calling thread is interrupted while waiting
   */
  @Override
  public void close() throws Exception {
    worker.join();
  }

  /**
   * key-value pair Combiner Interface.
   *
   * @param <K> key
   * @param <V> value
   */
  public interface Combiner<K extends Comparable<K>, V> {
    /**
     * Combines a new value with the value currently registered for a key.
     *
     * @param key the key being updated
     * @param old the value currently registered for {@code key}, or {@code null} if there is none
     * @param cur the newly arrived value
     * @return the value to register for {@code key}
     */
    V combine(K key, V old, V cur);
  }

  /**
   * A comparable key-value pair.
   *
   * <p>Equality, hash code and ordering are all determined by the key alone.
   *
   * @param <K> key
   * @param <V> value
   */
  public static class Pair<K extends Comparable<K>, V> implements Map.Entry<K, V>, Comparable<Map.Entry<K, V>> {
    private final K k;
    private final V v;

    public Pair(final K k, final V v) {
      this.k = k;
      this.v = v;
    }

    @Override
    public boolean equals(final Object o) {
      if (this == o) {
        return true;
      }
      if (o == null || getClass() != o.getClass()) {
        return false;
      }

      // The runtime class was checked above; the type arguments are erased and cannot be.
      @SuppressWarnings("unchecked")
      final Pair<K, V> pair = (Pair<K, V>) o;
      return k.compareTo(pair.getKey()) == 0;
    }

    @Override
    public int hashCode() {
      return k.hashCode();
    }

    @Override
    public int compareTo(final Map.Entry<K, V> arg0) {
      return k.compareTo(arg0.getKey());
    }

    @Override
    public K getKey() {
      return k;
    }

    @Override
    public V getValue() {
      return v;
    }

    /**
     * Not supported; the pair is immutable.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public V setValue(final V value) {
      throw new UnsupportedOperationException();
    }
  }

  /**
   * Thread that removes entries from the register and hands them to the downstream observer.
   */
  private class OutputThread extends Thread {
    OutputThread() {
      super("grouper-output-thread");
    }

    @Override
    public void run() {
      while (true) {
        if (register.isEmpty()) {
          synchronized (register) {
            while (register.isEmpty() && !done) {
              try {
                register.wait();
              } catch (final InterruptedException e) {
                // Restore the interrupt status before giving up.
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
              }
            }
            // Leave only when input is complete AND nothing is left to emit;
            // entries that arrived just before completion must still be drained.
            if (done && register.isEmpty()) {
              break;
            }
          }
        }

        // Sweep the register in ascending key order starting from the smallest key.
        Map.Entry<K, V> cursor = register.pollFirstEntry();
        while (cursor != null) {
          o.onNext(cursor);
          final K nextKey = register.higherKey(cursor.getKey());
          cursor = null;
          if (nextKey != null) {
            final V value = register.remove(nextKey);
            // remove() yields null only if something else took the entry first;
            // end this sweep rather than emit a pair with a null value.
            if (value != null) {
              cursor = new Pair<>(nextKey, value);
            }
          }
        }
      }
      o.onCompleted();
    }
  }

}