This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.wake.rx.impl;
020
021import org.apache.reef.wake.rx.Observer;
022import org.apache.reef.wake.rx.Subject;
023
024import java.util.concurrent.TimeoutException;
025
026public class TimeoutSubject<T> implements Subject<T, T> {
027  private Thread timeBomb;
028  private Observer<T> destination;
029  private boolean finished;
030
031  public TimeoutSubject(final long timeout, final Observer<T> handler) {
032    this.finished = false;
033    this.destination = handler;
034    final TimeoutSubject<T> outer = this;
035    this.timeBomb = new Thread(new Runnable() {
036      @Override
037      public void run() {
038        boolean finishedCopy;
039        synchronized (outer) {
040          if (!finished) {
041            try {
042              outer.wait(timeout);
043            } catch (InterruptedException e) {
044              return;
045            }
046          }
047          finishedCopy = finished;
048          finished = true; // lock out the caller from putting event through now
049        }
050        if (!finishedCopy) destination.onError(new TimeoutException("TimeoutSubject expired"));
051      }
052    });
053    this.timeBomb.start();
054  }
055
056  @Override
057  public void onNext(T value) {
058    boolean wasFinished;
059    synchronized (this) {
060      wasFinished = finished;
061      if (!finished) {
062        this.notify();
063        finished = true;
064      }
065    }
066    if (!wasFinished) {
067      // TODO: change Subject to specify conversion to T
068      destination.onNext(value);
069      destination.onCompleted();
070    }
071  }
072
073  @Override
074  public void onError(Exception error) {
075    this.timeBomb.interrupt();
076    destination.onError(error);
077  }
078
079  @Override
080  public void onCompleted() {
081    throw new IllegalStateException("Should not be called directly");
082  }
083
084}