001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.wake.examples.join;
020
021import org.apache.reef.wake.rx.Observer;
022import org.apache.reef.wake.rx.StaticObservable;
023
024import java.util.concurrent.ConcurrentSkipListSet;
025
026/**
027 * Blocking join.
028 */
029public class BlockingJoin implements StaticObservable {
030  private final Observer<TupleEvent> out;
031  private final ConcurrentSkipListSet<TupleEvent> left = new ConcurrentSkipListSet<>();
032  private boolean leftDone = false;
033
034  public BlockingJoin(final Observer<TupleEvent> out) {
035    this.out = out;
036  }
037
038  private synchronized void tellEveryoneLeftIsDone() {
039    leftDone = true;
040    notifyAll();
041  }
042
043  private synchronized void waitUntilLeftIsDone() {
044    while (!leftDone) {
045      try {
046        wait();
047      } catch (final InterruptedException e) {
048        throw new IllegalStateException(
049            "No support for interrupted threads here!", e);
050      }
051    }
052  }
053
054  public Observer<TupleEvent> wireLeft() {
055    return new Observer<TupleEvent>() {
056
057      @Override
058      public void onNext(final TupleEvent value) {
059        left.add(value);
060      }
061
062      @Override
063      public void onError(final Exception error) {
064
065      }
066
067      @Override
068      public void onCompleted() {
069        tellEveryoneLeftIsDone();
070      }
071
072    };
073  }
074
075  public Observer<TupleEvent> wireRight() {
076    return new Observer<TupleEvent>() {
077
078      @Override
079      public void onNext(final TupleEvent value) {
080        if (!leftDone) {
081          waitUntilLeftIsDone();
082        }
083        if (left.contains(value)) {
084          out.onNext(value);
085        }
086      }
087
088      @Override
089      public void onError(final Exception error) {
090      }
091
092      @Override
093      public void onCompleted() {
094        waitUntilLeftIsDone();
095        out.onCompleted();
096      }
097    };
098  }
099}