001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.wake.remote.impl; 020 021import org.apache.reef.wake.EventHandler; 022import org.apache.reef.wake.Stage; 023import org.apache.reef.wake.WakeParameters; 024import org.apache.reef.wake.impl.DefaultThreadFactory; 025import org.apache.reef.wake.remote.Encoder; 026import org.apache.reef.wake.remote.exception.RemoteRuntimeException; 027import org.apache.reef.wake.remote.transport.Transport; 028 029import java.util.List; 030import java.util.concurrent.ExecutorService; 031import java.util.concurrent.Executors; 032import java.util.concurrent.TimeUnit; 033import java.util.logging.Level; 034import java.util.logging.Logger; 035 036/** 037 * Stage to manage resources related to sending event remotely. 038 */ 039public class RemoteSenderStage implements Stage { 040 041 private static final long SHUTDOWN_TIMEOUT = WakeParameters.REMOTE_EXECUTOR_SHUTDOWN_TIMEOUT; 042 043 private static final Logger LOG = Logger.getLogger(RemoteSenderStage.class.getName()); 044 045 private final ExecutorService executor; 046 private final Encoder encoder; 047 private final Transport transport; 048 049 /** 050 * Constructs a remote sender stage. 051 * 052 * @param encoder the encoder of the event 053 * @param transport the transport to send events 054 * @param numThreads the number of threads 055 */ 056 public RemoteSenderStage(final Encoder encoder, final Transport transport, final int numThreads) { 057 this.encoder = encoder; 058 this.transport = transport; 059 this.executor = Executors.newFixedThreadPool( 060 numThreads, new DefaultThreadFactory(RemoteSenderStage.class.getName())); 061 } 062 063 /** 064 * Returns a new remote sender event handler. 065 * 066 * @return a remote sender event handler 067 */ 068 public <T> EventHandler<RemoteEvent<T>> getHandler() { 069 return new RemoteSenderEventHandler<T>(encoder, transport, executor); 070 } 071 072 /** 073 * Closes the stage. 074 */ 075 @Override 076 public void close() throws Exception { 077 LOG.log(Level.FINE, "close {0}", transport); 078 executor.shutdown(); 079 try { 080 // wait for threads to finish for timeout 081 if (!executor.awaitTermination(SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS)) { 082 LOG.log(Level.WARNING, "Executor did not terminate in {0} ms.", SHUTDOWN_TIMEOUT); 083 final List<Runnable> droppedRunnables = executor.shutdownNow(); 084 LOG.log(Level.WARNING, "Executor dropped {0} tasks.", droppedRunnables.size()); 085 } 086 } catch (final InterruptedException e) { 087 LOG.log(Level.WARNING, "Close interrupted", e); 088 throw new RemoteRuntimeException(e); 089 } 090 } 091}