This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.wake.impl;
020
021import org.apache.reef.tang.annotations.Parameter;
022import org.apache.reef.wake.AbstractEStage;
023import org.apache.reef.wake.EventHandler;
024import org.apache.reef.wake.StageConfiguration.Capacity;
025import org.apache.reef.wake.StageConfiguration.StageHandler;
026import org.apache.reef.wake.StageConfiguration.StageName;
027
028import javax.inject.Inject;
029import java.util.concurrent.ArrayBlockingQueue;
030import java.util.concurrent.BlockingQueue;
031import java.util.concurrent.atomic.AtomicBoolean;
032import java.util.logging.Level;
033import java.util.logging.Logger;
034
035/**
036 * Single thread stage that runs the event handler.
037 *
038 * @param <T> type
039 */
040public final class SingleThreadStage<T> extends AbstractEStage<T> {
041  private static final Logger LOG = Logger.getLogger(SingleThreadStage.class.getName());
042
043  private final BlockingQueue<T> queue;
044  private final Thread thread;
045  private final AtomicBoolean interrupted;
046
047  /**
048   * Constructs a single thread stage.
049   *
050   * @param handler  the event handler to execute
051   * @param capacity the queue capacity
052   */
053  @Inject
054  public SingleThreadStage(@Parameter(StageHandler.class) final EventHandler<T> handler,
055                           @Parameter(Capacity.class) final int capacity) {
056    this(handler.getClass().getName(), handler, capacity);
057  }
058
059  /**
060   * Constructs a single thread stage.
061   *
062   * @param name     the stage name
063   * @param handler  the event handler to execute
064   * @param capacity the queue capacity
065   */
066  @Inject
067  public SingleThreadStage(@Parameter(StageName.class) final String name,
068                           @Parameter(StageHandler.class) final EventHandler<T> handler,
069                           @Parameter(Capacity.class) final int capacity) {
070    super(name);
071    queue = new ArrayBlockingQueue<T>(capacity);
072    interrupted = new AtomicBoolean(false);
073    thread = new Thread(new Producer<T>(name, queue, handler, interrupted));
074    thread.setName("SingleThreadStage<" + name + ">");
075    thread.start();
076    StageManager.instance().register(this);
077  }
078
079  /**
080   * Puts the value to the queue, which will be processed by the handler later.
081   * if the queue is full, IllegalStateException is thrown
082   *
083   * @param value the value
084   * @throws IllegalStateException
085   */
086  @Override
087  public void onNext(final T value) {
088    beforeOnNext();
089    queue.add(value);
090  }
091
092  /**
093   * Closes the stage.
094   *
095   * @throws Exception
096   */
097  @Override
098  public void close() throws Exception {
099    if (closed.compareAndSet(false, true)) {
100      interrupted.set(true);
101      thread.interrupt();
102    }
103  }
104
105
106  /**
107   * Takes events from the queue and provides them to the handler.
108   */
109  private class Producer<U> implements Runnable {
110
111    private final String name;
112    private final BlockingQueue<U> queue;
113    private final EventHandler<U> handler;
114    private final AtomicBoolean interrupted;
115
116    Producer(final String name, final BlockingQueue<U> queue, final EventHandler<U> handler,
117             final AtomicBoolean interrupted) {
118      this.name = name;
119      this.queue = queue;
120      this.handler = handler;
121      this.interrupted = interrupted;
122    }
123
124    @Override
125    public void run() {
126      while (true) {
127        try {
128          final U value = queue.take();
129          handler.onNext(value);
130          SingleThreadStage.this.afterOnNext();
131        } catch (final InterruptedException e) {
132          if (interrupted.get()) {
133            LOG.log(Level.FINEST, name + " Closing Producer due to interruption");
134            break;
135          }
136        } catch (final Exception t) {
137          LOG.log(Level.SEVERE, name + " Exception from event handler", t);
138          throw t;
139        }
140      }
141    }
142  }
143
144}
145