This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.wake.examples.join;
020
021import org.apache.reef.wake.rx.Observer;
022import org.apache.reef.wake.rx.StaticObservable;
023
024import java.util.concurrent.ConcurrentSkipListSet;
025import java.util.concurrent.atomic.AtomicBoolean;
026
027/**
028 * Non-blocking Join.
029 */
030public class NonBlockingJoin implements StaticObservable {
031  private final AtomicBoolean leftDone = new AtomicBoolean(false);
032  private final AtomicBoolean completed = new AtomicBoolean(false);
033  private final AtomicBoolean sentCompleted = new AtomicBoolean(false);
034
035  private final Observer<TupleEvent> out;
036
037  private final ConcurrentSkipListSet<TupleEvent> leftTable = new ConcurrentSkipListSet<>();
038  private final ConcurrentSkipListSet<TupleEvent> rightTable = new ConcurrentSkipListSet<>();
039
040  public NonBlockingJoin(final Observer<TupleEvent> out) {
041    this.out = out;
042  }
043
044  private void drainRight() {
045    TupleEvent t;
046    if (leftDone.get()) {
047      while ((t = rightTable.pollFirst()) != null) {
048        if (leftTable.contains(t)) {
049          out.onNext(t);
050        }
051      }
052      if (completed.get()) {
053        // There cannot be any more additions to rightTable after
054        // completed is set to true, so this ensures that rightTable is
055        // really empty. (Someone could have inserted into it during the
056        // race between the previous while loop and the check of
057        // completed.)
058        while ((t = rightTable.pollFirst()) != null) {
059          if (leftTable.contains(t)) {
060            out.onNext(t);
061          }
062        }
063        if (!sentCompleted.getAndSet(true)) {
064          out.onCompleted();
065        }
066      }
067    }
068  }
069
070  public Observer<TupleEvent> wireLeft() {
071    return new Observer<TupleEvent>() {
072
073      @Override
074      public void onNext(final TupleEvent value) {
075        leftTable.add(value);
076      }
077
078      @Override
079      public void onError(final Exception error) {
080        leftTable.clear();
081        rightTable.clear();
082        out.onError(error);
083      }
084
085      @Override
086      public void onCompleted() {
087        leftDone.set(true);
088        drainRight();
089      }
090
091    };
092  }
093
094  public Observer<TupleEvent> wireRight() {
095    return new Observer<TupleEvent>() {
096
097      @Override
098      public void onNext(final TupleEvent value) {
099        if (leftTable.contains(value)) {
100          out.onNext(value);
101        } else if (!leftDone.get()) {
102          rightTable.add(value);
103        }
104        drainRight();
105      }
106
107      @Override
108      public void onError(final Exception error) {
109        leftTable.clear();
110        rightTable.clear();
111        out.onError(error);
112      }
113
114      @Override
115      public void onCompleted() {
116        completed.set(true);
117        drainRight();
118      }
119    };
120
121  }
122
123}