This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.wake.rx.impl;
020
021import org.apache.reef.tang.annotations.Parameter;
022import org.apache.reef.wake.StageConfiguration.StageName;
023import org.apache.reef.wake.StageConfiguration.StageObserver;
024import org.apache.reef.wake.impl.StageManager;
025import org.apache.reef.wake.rx.AbstractRxStage;
026import org.apache.reef.wake.rx.Observer;
027
028import javax.inject.Inject;
029
030/**
031 * Stage that executes the observer synchronously.
032 *
033 * @param <T> type
034 */
035public final class RxSyncStage<T> extends AbstractRxStage<T> {
036
037  private final Observer<T> observer;
038
039  /**
040   * Constructs a Rx synchronous stage.
041   *
042   * @param observer the observer
043   */
044  @Inject
045  public RxSyncStage(@Parameter(StageObserver.class) final Observer<T> observer) {
046    this(observer.getClass().getName(), observer);
047  }
048
049  /**
050   * Constructs a Rx synchronous stage.
051   *
052   * @param name     the stage name
053   * @param observer the observer
054   */
055  @Inject
056  public RxSyncStage(@Parameter(StageName.class) final String name,
057                     @Parameter(StageObserver.class) final Observer<T> observer) {
058    super(name);
059    this.observer = observer;
060    StageManager.instance().register(this);
061  }
062
063  /**
064   * Provides the observer with the new value.
065   *
066   * @param value the new value
067   */
068  @Override
069  public void onNext(final T value) {
070    beforeOnNext();
071    observer.onNext(value);
072    afterOnNext();
073  }
074
075  /**
076   * Notifies the observer that the provider has experienced an error
077   * condition.
078   *
079   * @param error the error
080   */
081  @Override
082  public void onError(final Exception error) {
083    observer.onError(error);
084  }
085
086  /**
087   * Notifies the observer that the provider has finished sending push-based
088   * notifications.
089   */
090  @Override
091  public void onCompleted() {
092    observer.onCompleted();
093  }
094
095  /**
096   * Closes the stage.
097   *
098   * @throws Exception
099   */
100  @Override
101  public void close() throws Exception {
102  }
103
104}