001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.wake.examples.join; 020 021import org.apache.reef.wake.rx.Observer; 022import org.apache.reef.wake.rx.StaticObservable; 023 024import java.util.concurrent.ConcurrentSkipListSet; 025import java.util.concurrent.atomic.AtomicBoolean; 026 027/** 028 * Non-blocking Join. 029 */ 030public class NonBlockingJoin implements StaticObservable { 031 private final AtomicBoolean leftDone = new AtomicBoolean(false); 032 private final AtomicBoolean completed = new AtomicBoolean(false); 033 private final AtomicBoolean sentCompleted = new AtomicBoolean(false); 034 035 private final Observer<TupleEvent> out; 036 037 private final ConcurrentSkipListSet<TupleEvent> leftTable = new ConcurrentSkipListSet<>(); 038 private final ConcurrentSkipListSet<TupleEvent> rightTable = new ConcurrentSkipListSet<>(); 039 040 public NonBlockingJoin(final Observer<TupleEvent> out) { 041 this.out = out; 042 } 043 044 private void drainRight() { 045 TupleEvent t; 046 if (leftDone.get()) { 047 while ((t = rightTable.pollFirst()) != null) { 048 if (leftTable.contains(t)) { 049 out.onNext(t); 050 } 051 } 052 if (completed.get()) { 053 // There cannot be any more additions to rightTable after 054 // completed is set to true, so this ensures that rightTable is 055 // really empty. (Someone could have inserted into it during the 056 // race between the previous while loop and the check of 057 // completed.) 058 while ((t = rightTable.pollFirst()) != null) { 059 if (leftTable.contains(t)) { 060 out.onNext(t); 061 } 062 } 063 if (!sentCompleted.getAndSet(true)) { 064 out.onCompleted(); 065 } 066 } 067 } 068 } 069 070 public Observer<TupleEvent> wireLeft() { 071 return new Observer<TupleEvent>() { 072 073 @Override 074 public void onNext(final TupleEvent value) { 075 leftTable.add(value); 076 } 077 078 @Override 079 public void onError(final Exception error) { 080 leftTable.clear(); 081 rightTable.clear(); 082 out.onError(error); 083 } 084 085 @Override 086 public void onCompleted() { 087 leftDone.set(true); 088 drainRight(); 089 } 090 091 }; 092 } 093 094 public Observer<TupleEvent> wireRight() { 095 return new Observer<TupleEvent>() { 096 097 @Override 098 public void onNext(final TupleEvent value) { 099 if (leftTable.contains(value)) { 100 out.onNext(value); 101 } else if (!leftDone.get()) { 102 rightTable.add(value); 103 } 104 drainRight(); 105 } 106 107 @Override 108 public void onError(final Exception error) { 109 leftTable.clear(); 110 rightTable.clear(); 111 out.onError(error); 112 } 113 114 @Override 115 public void onCompleted() { 116 completed.set(true); 117 drainRight(); 118 } 119 }; 120 121 } 122 123}