001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.wake.rx.impl; 020 021import org.apache.reef.tang.annotations.Parameter; 022import org.apache.reef.wake.StageConfiguration.StageName; 023import org.apache.reef.wake.StageConfiguration.StageObserver; 024import org.apache.reef.wake.impl.StageManager; 025import org.apache.reef.wake.rx.AbstractRxStage; 026import org.apache.reef.wake.rx.Observer; 027 028import javax.inject.Inject; 029 030/** 031 * Stage that executes the observer synchronously. 032 * 033 * @param <T> type 034 */ 035public final class RxSyncStage<T> extends AbstractRxStage<T> { 036 037 private final Observer<T> observer; 038 039 /** 040 * Constructs a Rx synchronous stage. 041 * 042 * @param observer the observer 043 */ 044 @Inject 045 public RxSyncStage(@Parameter(StageObserver.class) final Observer<T> observer) { 046 this(observer.getClass().getName(), observer); 047 } 048 049 /** 050 * Constructs a Rx synchronous stage. 051 * 052 * @param name the stage name 053 * @param observer the observer 054 */ 055 @Inject 056 public RxSyncStage(@Parameter(StageName.class) final String name, 057 @Parameter(StageObserver.class) final Observer<T> observer) { 058 super(name); 059 this.observer = observer; 060 StageManager.instance().register(this); 061 } 062 063 /** 064 * Provides the observer with the new value. 065 * 066 * @param value the new value 067 */ 068 @Override 069 public void onNext(final T value) { 070 beforeOnNext(); 071 observer.onNext(value); 072 afterOnNext(); 073 } 074 075 /** 076 * Notifies the observer that the provider has experienced an error 077 * condition. 078 * 079 * @param error the error 080 */ 081 @Override 082 public void onError(final Exception error) { 083 observer.onError(error); 084 } 085 086 /** 087 * Notifies the observer that the provider has finished sending push-based 088 * notifications. 089 */ 090 @Override 091 public void onCompleted() { 092 observer.onCompleted(); 093 } 094 095 /** 096 * Closes the stage. 097 * 098 * @throws Exception 099 */ 100 @Override 101 public void close() throws Exception { 102 } 103 104}