/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.reef.wake.remote.impl;

import org.apache.reef.wake.EStage;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.WakeParameters;
import org.apache.reef.wake.impl.DefaultThreadFactory;
import org.apache.reef.wake.impl.ThreadPoolStage;
import org.apache.reef.wake.remote.exception.RemoteRuntimeException;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Receive incoming events and dispatch to correct handlers.
 *
 * <p>Incoming {@link TransportEvent}s are forwarded to an internal
 * {@link ThreadPoolStage} that is backed by a fixed-size thread pool owned by
 * this stage. The pool is shut down in {@link #close()}.
 */
public class RemoteReceiverStage implements EStage<TransportEvent> {

  private static final Logger LOG = Logger.getLogger(RemoteReceiverStage.class.getName());

  private final EventHandler<TransportEvent> handler;
  private final ThreadPoolStage<TransportEvent> stage;
  private final ExecutorService executor; // for decoupling

  /** Time, in milliseconds, that {@link #close()} waits for the executor to terminate. */
  private final long shutdownTimeout = WakeParameters.REMOTE_EXECUTOR_SHUTDOWN_TIMEOUT;

  /**
   * Constructs a remote receiver stage.
   *
   * @param handler      the handler of remote events
   * @param errorHandler the exception handler
   * @param numThreads   the number of threads
   */
  public RemoteReceiverStage(final EventHandler<RemoteEvent<byte[]>> handler,
                             final EventHandler<Throwable> errorHandler,
                             final int numThreads) {

    // Adapt the remote-event handler so that it can consume transport events.
    this.handler = new RemoteReceiverEventHandler(handler);

    this.executor = Executors.newFixedThreadPool(
        numThreads, new DefaultThreadFactory(RemoteReceiverStage.class.getName()));

    this.stage = new ThreadPoolStage<>(this.handler, this.executor, errorHandler);
  }

  /**
   * Handles the received event by handing it over to the internal thread pool stage.
   *
   * @param value the event
   */
  @Override
  public void onNext(final TransportEvent value) {
    LOG.log(Level.FINEST, "{0}", value);
    this.stage.onNext(value);
  }

  /**
   * Closes the stage.
   *
   * <p>Initiates an orderly shutdown of the executor and waits up to
   * {@code shutdownTimeout} milliseconds for it to terminate. If it does not
   * terminate in time, the executor is shut down forcibly and the number of
   * tasks that never started is logged.
   *
   * @throws RemoteRuntimeException if the calling thread is interrupted while
   *     waiting for termination; the thread's interrupt status is restored
   *     before the exception is thrown
   * @throws Exception declared by the overridden method signature
   */
  @Override
  public void close() throws Exception {
    LOG.log(Level.FINE, "close");

    // The executor field is final and always assigned in the constructor,
    // so no null check is needed here.
    this.executor.shutdown();
    try {
      // wait for threads to finish for timeout
      if (!this.executor.awaitTermination(this.shutdownTimeout, TimeUnit.MILLISECONDS)) {
        LOG.log(Level.WARNING, "Executor did not terminate in {0} ms.", this.shutdownTimeout);
        final List<Runnable> droppedRunnables = this.executor.shutdownNow();
        LOG.log(Level.WARNING, "Executor dropped {0} tasks.", droppedRunnables.size());
      }
    } catch (final InterruptedException e) {
      LOG.log(Level.WARNING, "Close interrupted", e);
      // We can no longer wait for an orderly shutdown, so cancel outstanding
      // work instead of leaving the pool half shut down.
      this.executor.shutdownNow();
      // Catching InterruptedException clears the interrupt flag; re-assert it
      // so that code further up the stack can still observe the interruption.
      Thread.currentThread().interrupt();
      throw new RemoteRuntimeException(e);
    }
  }
}