This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.wake.examples.join;
020
021import org.apache.reef.wake.rx.Observer;
022import org.apache.reef.wake.rx.StaticObservable;
023
024import java.util.concurrent.ConcurrentSkipListSet;
025import java.util.concurrent.atomic.AtomicBoolean;
026
027
028public class NonBlockingJoin implements StaticObservable {
029  private final AtomicBoolean leftDone = new AtomicBoolean(false);
030  private final AtomicBoolean completed = new AtomicBoolean(false);
031  private final AtomicBoolean sentCompleted = new AtomicBoolean(false);
032
033  private final Observer<TupleEvent> out;
034
035  private final ConcurrentSkipListSet<TupleEvent> leftTable = new ConcurrentSkipListSet<TupleEvent>();
036  private final ConcurrentSkipListSet<TupleEvent> rightTable = new ConcurrentSkipListSet<TupleEvent>();
037
038  public NonBlockingJoin(Observer<TupleEvent> out) {
039    this.out = out;
040  }
041
042  private void drainRight() {
043    TupleEvent t;
044    if (leftDone.get()) {
045      while ((t = rightTable.pollFirst()) != null) {
046        if (leftTable.contains(t)) {
047          out.onNext(t);
048        }
049      }
050      if (completed.get()) {
051        // There cannot be any more additions to rightTable after
052        // completed is set to true, so this ensures that rightTable is
053        // really empty. (Someone could have inserted into it during the
054        // race between the previous while loop and the check of
055        // completed.)
056        while ((t = rightTable.pollFirst()) != null) {
057          if (leftTable.contains(t)) {
058            out.onNext(t);
059          }
060        }
061        if (sentCompleted.getAndSet(true) == false) {
062          out.onCompleted();
063        }
064      }
065    }
066  }
067
068  public Observer<TupleEvent> wireLeft() {
069    return new Observer<TupleEvent>() {
070
071      @Override
072      public void onNext(TupleEvent value) {
073        leftTable.add(value);
074      }
075
076      @Override
077      public void onError(Exception error) {
078        leftTable.clear();
079        rightTable.clear();
080        out.onError(error);
081      }
082
083      @Override
084      public void onCompleted() {
085        leftDone.set(true);
086        drainRight();
087      }
088
089    };
090  }
091
092  public Observer<TupleEvent> wireRight() {
093    return new Observer<TupleEvent>() {
094
095      @Override
096      public void onNext(TupleEvent value) {
097        if (leftTable.contains(value)) {
098          out.onNext(value);
099        } else if (!leftDone.get()) {
100          rightTable.add(value);
101        }
102        drainRight();
103      }
104
105      @Override
106      public void onError(Exception error) {
107        leftTable.clear();
108        rightTable.clear();
109        out.onError(error);
110      }
111
112      @Override
113      public void onCompleted() {
114        completed.set(true);
115        drainRight();
116      }
117    };
118
119  }
120
121}