This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.wake.examples.join;
020
021import org.apache.reef.wake.rx.Observer;
022import org.apache.reef.wake.rx.StaticObservable;
023
024import java.util.concurrent.ConcurrentSkipListSet;
025
026
027public class BlockingJoin implements StaticObservable {
028  private final Observer<TupleEvent> out;
029  private final ConcurrentSkipListSet<TupleEvent> left = new ConcurrentSkipListSet<>();
030  boolean leftDone = false;
031
032  public BlockingJoin(Observer<TupleEvent> out) {
033    this.out = out;
034  }
035
036  private synchronized void tellEveryoneLeftIsDone() {
037    leftDone = true;
038    notifyAll();
039  }
040
041  private synchronized void waitUntilLeftIsDone() {
042    while (!leftDone) {
043      try {
044        wait();
045      } catch (InterruptedException e) {
046        throw new IllegalStateException(
047            "No support for interrupted threads here!", e);
048      }
049    }
050  }
051
052  public Observer<TupleEvent> wireLeft() {
053    return new Observer<TupleEvent>() {
054
055      @Override
056      public void onNext(TupleEvent value) {
057        left.add(value);
058      }
059
060      @Override
061      public void onError(Exception error) {
062
063      }
064
065      @Override
066      public void onCompleted() {
067        tellEveryoneLeftIsDone();
068      }
069
070    };
071  }
072
073  public Observer<TupleEvent> wireRight() {
074    return new Observer<TupleEvent>() {
075
076      @Override
077      public void onNext(TupleEvent value) {
078        if (!leftDone) waitUntilLeftIsDone();
079        if (left.contains(value)) {
080          out.onNext(value);
081        }
082      }
083
084      @Override
085      public void onError(Exception error) {
086      }
087
088      @Override
089      public void onCompleted() {
090        waitUntilLeftIsDone();
091        out.onCompleted();
092      }
093    };
094  }
095}