This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.wake.impl;
020
021import org.apache.reef.tang.annotations.Parameter;
022import org.apache.reef.wake.AbstractEStage;
023import org.apache.reef.wake.EventHandler;
024import org.apache.reef.wake.StageConfiguration.ErrorHandler;
025import org.apache.reef.wake.StageConfiguration.StageHandler;
026import org.apache.reef.wake.StageConfiguration.StageName;
027
028import javax.inject.Inject;
029import java.util.logging.Level;
030import java.util.logging.Logger;
031
032/**
033 * Stage that synchronously executes an event handler.
034 *
035 * @param <T> type
036 */
037public final class SyncStage<T> extends AbstractEStage<T> {
038
039  private static final Logger LOG = Logger.getLogger(SyncStage.class.getName());
040
041  private final EventHandler<T> handler;
042  private final EventHandler<Throwable> errorHandler;
043
044  /**
045   * Constructs a synchronous stage.
046   *
047   * @param handler the event handler
048   */
049  @Inject
050  public SyncStage(@Parameter(StageHandler.class) final EventHandler<T> handler) {
051    this(handler.getClass().getName(), handler, null);
052  }
053
054  /**
055   * Constructs a synchronous stage.
056   *
057   * @param name the stage name
058   * @param handler the event handler
059   */
060  @Inject
061  public SyncStage(@Parameter(StageName.class) final String name,
062                   @Parameter(StageHandler.class) final EventHandler<T> handler) {
063    this(name, handler, null);
064  }
065
066  /**
067   * Constructs a synchronous stage.
068   *
069   * @param name the stage name
070   * @param handler      the event handler
071   * @param errorHandler the error handler
072   */
073  @Inject
074  public SyncStage(@Parameter(StageName.class) final String name,
075                   @Parameter(StageHandler.class) final EventHandler<T> handler,
076                   @Parameter(ErrorHandler.class) final EventHandler<Throwable> errorHandler) {
077    super(name);
078    this.handler = handler;
079    this.errorHandler = errorHandler;
080    StageManager.instance().register(this);
081  }
082
083  /**
084   * Invokes the handler for the event.
085   *
086   * @param value the event
087   */
088  @Override
089  @SuppressWarnings("checkstyle:illegalcatch")
090  public void onNext(final T value) {
091    beforeOnNext();
092    try {
093      handler.onNext(value);
094    } catch (final Throwable t) {
095      if (errorHandler != null) {
096        errorHandler.onNext(t);
097      } else {
098        LOG.log(Level.SEVERE, name + " Exception from event handler", t);
099        throw t;
100      }
101    }
102    afterOnNext();
103  }
104
105  /**
106   * Closes resources.
107   *
108   * @throws Exception
109   */
110  @Override
111  public void close() throws Exception {
112  }
113
114}