001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.wake.impl; 020 021import org.apache.reef.tang.annotations.Parameter; 022import org.apache.reef.wake.AbstractEStage; 023import org.apache.reef.wake.EventHandler; 024import org.apache.reef.wake.StageConfiguration.ErrorHandler; 025import org.apache.reef.wake.StageConfiguration.StageHandler; 026import org.apache.reef.wake.StageConfiguration.StageName; 027 028import javax.inject.Inject; 029import java.util.logging.Level; 030import java.util.logging.Logger; 031 032/** 033 * Stage that synchronously executes an event handler. 034 * 035 * @param <T> type 036 */ 037public final class SyncStage<T> extends AbstractEStage<T> { 038 039 private static final Logger LOG = Logger.getLogger(SyncStage.class.getName()); 040 041 private final EventHandler<T> handler; 042 private final EventHandler<Throwable> errorHandler; 043 044 /** 045 * Constructs a synchronous stage. 046 * 047 * @param handler the event handler 048 */ 049 @Inject 050 public SyncStage(@Parameter(StageHandler.class) final EventHandler<T> handler) { 051 this(handler.getClass().getName(), handler, null); 052 } 053 054 /** 055 * Constructs a synchronous stage. 056 * 057 * @param name the stage name 058 * @param handler the event handler 059 */ 060 @Inject 061 public SyncStage(@Parameter(StageName.class) final String name, 062 @Parameter(StageHandler.class) final EventHandler<T> handler) { 063 this(name, handler, null); 064 } 065 066 /** 067 * Constructs a synchronous stage. 068 * 069 * @param name the stage name 070 * @param handler the event handler 071 * @param errorHandler the error handler 072 */ 073 @Inject 074 public SyncStage(@Parameter(StageName.class) final String name, 075 @Parameter(StageHandler.class) final EventHandler<T> handler, 076 @Parameter(ErrorHandler.class) final EventHandler<Throwable> errorHandler) { 077 super(name); 078 this.handler = handler; 079 this.errorHandler = errorHandler; 080 StageManager.instance().register(this); 081 } 082 083 /** 084 * Invokes the handler for the event. 085 * 086 * @param value the event 087 */ 088 @Override 089 @SuppressWarnings("checkstyle:illegalcatch") 090 public void onNext(final T value) { 091 beforeOnNext(); 092 try { 093 handler.onNext(value); 094 } catch (final Throwable t) { 095 if (errorHandler != null) { 096 errorHandler.onNext(t); 097 } else { 098 LOG.log(Level.SEVERE, name + " Exception from event handler", t); 099 throw t; 100 } 101 } 102 afterOnNext(); 103 } 104 105 /** 106 * Closes resources. 107 * 108 * @throws Exception 109 */ 110 @Override 111 public void close() throws Exception { 112 } 113 114}