This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.wake.rx.impl;
020
021import org.apache.reef.wake.rx.Observer;
022import org.apache.reef.wake.rx.Subject;
023
024import javax.inject.Inject;
025
026/**
027 * A Subject that relays all messages to its subscribers.
028 *
029 * @param <T>
030 */
031public class SimpleSubject<T> implements Subject<T, T> {
032
033  private final Observer<T> observer;
034
035  /**
036   * Constructs a simple subject.
037   *
038   * @param observer the observer
039   */
040  @Inject
041  public SimpleSubject(final Observer<T> observer) {
042    this.observer = observer;
043  }
044
045  /**
046   * Provides the observer with the new value.
047   *
048   * @param value the new value
049   */
050  @Override
051  public void onNext(final T value) {
052    this.observer.onNext(value);
053  }
054
055  /**
056   * Provides the observer with the error.
057   *
058   * @param error the error
059   */
060  @Override
061  public void onError(final Exception error) {
062    this.observer.onError(error);
063  }
064
065  /**
066   * Provides the observer with it has finished sending push-based
067   * notifications.
068   */
069  @Override
070  public void onCompleted() {
071    this.observer.onCompleted();
072  }
073}