001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.wake.impl; 020 021import org.apache.reef.tang.annotations.Parameter; 022import org.apache.reef.wake.AbstractEStage; 023import org.apache.reef.wake.EventHandler; 024import org.apache.reef.wake.StageConfiguration.Capacity; 025import org.apache.reef.wake.StageConfiguration.StageHandler; 026import org.apache.reef.wake.StageConfiguration.StageName; 027 028import javax.inject.Inject; 029import java.util.concurrent.ArrayBlockingQueue; 030import java.util.concurrent.BlockingQueue; 031import java.util.concurrent.atomic.AtomicBoolean; 032import java.util.logging.Level; 033import java.util.logging.Logger; 034 035/** 036 * Single thread stage that runs the event handler. 037 * 038 * @param <T> type 039 */ 040public final class SingleThreadStage<T> extends AbstractEStage<T> { 041 private static final Logger LOG = Logger.getLogger(SingleThreadStage.class.getName()); 042 043 private final BlockingQueue<T> queue; 044 private final Thread thread; 045 private final AtomicBoolean interrupted; 046 047 /** 048 * Constructs a single thread stage. 049 * 050 * @param handler the event handler to execute 051 * @param capacity the queue capacity 052 */ 053 @Inject 054 public SingleThreadStage(@Parameter(StageHandler.class) final EventHandler<T> handler, 055 @Parameter(Capacity.class) final int capacity) { 056 this(handler.getClass().getName(), handler, capacity); 057 } 058 059 /** 060 * Constructs a single thread stage. 061 * 062 * @param name the stage name 063 * @param handler the event handler to execute 064 * @param capacity the queue capacity 065 */ 066 @Inject 067 public SingleThreadStage(@Parameter(StageName.class) final String name, 068 @Parameter(StageHandler.class) final EventHandler<T> handler, 069 @Parameter(Capacity.class) final int capacity) { 070 super(name); 071 queue = new ArrayBlockingQueue<T>(capacity); 072 interrupted = new AtomicBoolean(false); 073 thread = new Thread(new Producer<T>(name, queue, handler, interrupted)); 074 thread.setName("SingleThreadStage<" + name + ">"); 075 thread.start(); 076 StageManager.instance().register(this); 077 } 078 079 /** 080 * Puts the value to the queue, which will be processed by the handler later. 081 * if the queue is full, IllegalStateException is thrown 082 * 083 * @param value the value 084 * @throws IllegalStateException 085 */ 086 @Override 087 public void onNext(final T value) { 088 beforeOnNext(); 089 queue.add(value); 090 } 091 092 /** 093 * Closes the stage. 094 * 095 * @throws Exception 096 */ 097 @Override 098 public void close() throws Exception { 099 if (closed.compareAndSet(false, true)) { 100 interrupted.set(true); 101 thread.interrupt(); 102 } 103 } 104 105 106 /** 107 * Takes events from the queue and provides them to the handler. 108 */ 109 private class Producer<U> implements Runnable { 110 111 private final String name; 112 private final BlockingQueue<U> queue; 113 private final EventHandler<U> handler; 114 private final AtomicBoolean interrupted; 115 116 Producer(final String name, final BlockingQueue<U> queue, final EventHandler<U> handler, 117 final AtomicBoolean interrupted) { 118 this.name = name; 119 this.queue = queue; 120 this.handler = handler; 121 this.interrupted = interrupted; 122 } 123 124 @Override 125 public void run() { 126 while (true) { 127 try { 128 final U value = queue.take(); 129 handler.onNext(value); 130 SingleThreadStage.this.afterOnNext(); 131 } catch (final InterruptedException e) { 132 if (interrupted.get()) { 133 LOG.log(Level.FINEST, name + " Closing Producer due to interruption"); 134 break; 135 } 136 } catch (final Exception t) { 137 LOG.log(Level.SEVERE, name + " Exception from event handler", t); 138 throw t; 139 } 140 } 141 } 142 } 143 144} 145