001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.wake.remote.impl; 020 021import org.apache.reef.wake.EStage; 022import org.apache.reef.wake.EventHandler; 023import org.apache.reef.wake.WakeParameters; 024import org.apache.reef.wake.impl.DefaultThreadFactory; 025import org.apache.reef.wake.impl.ThreadPoolStage; 026import org.apache.reef.wake.remote.exception.RemoteRuntimeException; 027 028import java.net.SocketAddress; 029import java.util.List; 030import java.util.concurrent.*; 031import java.util.logging.Level; 032import java.util.logging.Logger; 033 034/** 035 * Receive incoming events and dispatch to correct handlers in order. 036 */ 037public class OrderedRemoteReceiverStage implements EStage<TransportEvent> { 038 039 private static final Logger LOG = Logger.getLogger(OrderedRemoteReceiverStage.class.getName()); 040 041 private static final String CLASS_NAME = OrderedRemoteReceiverStage.class.getSimpleName(); 042 043 private static final long SHUTDOWN_TIMEOUT = WakeParameters.REMOTE_EXECUTOR_SHUTDOWN_TIMEOUT; 044 045 private final ExecutorService pushExecutor; 046 private final ExecutorService pullExecutor; 047 048 private final ThreadPoolStage<TransportEvent> pushStage; 049 050 /** 051 * Constructs an ordered remote receiver stage. 052 * 053 * @param handler the handler of remote events 054 * @param errorHandler the exception handler 055 */ 056 public OrderedRemoteReceiverStage( 057 final EventHandler<RemoteEvent<byte[]>> handler, final EventHandler<Throwable> errorHandler) { 058 059 this.pushExecutor = Executors.newCachedThreadPool(new DefaultThreadFactory(CLASS_NAME + ":Push")); 060 this.pullExecutor = Executors.newCachedThreadPool(new DefaultThreadFactory(CLASS_NAME + ":Pull")); 061 062 final ConcurrentMap<SocketAddress, OrderedEventStream> streamMap = new ConcurrentHashMap<>(); 063 064 final ThreadPoolStage<OrderedEventStream> pullStage = new ThreadPoolStage<>( 065 new OrderedPullEventHandler(handler), this.pullExecutor, errorHandler); 066 067 this.pushStage = new ThreadPoolStage<>( 068 new OrderedPushEventHandler(streamMap, pullStage), this.pushExecutor, errorHandler); // for decoupling 069 } 070 071 @Override 072 public void onNext(final TransportEvent value) { 073 LOG.log(Level.FINEST, "Push: {0}", value); 074 this.pushStage.onNext(value); 075 } 076 077 @Override 078 public void close() throws Exception { 079 close("PushExecutor", this.pushExecutor); 080 close("PullExecutor", this.pullExecutor); 081 } 082 083 private static void close(final String name, final ExecutorService executor) { 084 LOG.log(Level.FINE, "Close {0} begin", name); 085 executor.shutdown(); 086 try { 087 // wait for threads to finish for timeout 088 if (!executor.awaitTermination(SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS)) { 089 LOG.log(Level.WARNING, "{0}: Executor did not terminate in {1} ms.", new Object[] {name, SHUTDOWN_TIMEOUT}); 090 final List<Runnable> droppedRunnables = executor.shutdownNow(); 091 LOG.log(Level.WARNING, "{0}: Executor dropped {1} tasks.", new Object[] {name, droppedRunnables.size()}); 092 } 093 } catch (final InterruptedException e) { 094 LOG.log(Level.WARNING, "Close interrupted"); 095 throw new RemoteRuntimeException(e); 096 } 097 LOG.log(Level.FINE, "Close {0} end", name); 098 } 099} 100 101class OrderedPushEventHandler implements EventHandler<TransportEvent> { 102 103 private static final Logger LOG = Logger.getLogger(OrderedPushEventHandler.class.getName()); 104 105 private final RemoteEventCodec<byte[]> codec; 106 private final ConcurrentMap<SocketAddress, OrderedEventStream> streamMap; // per remote address 107 private final ThreadPoolStage<OrderedEventStream> pullStage; 108 109 OrderedPushEventHandler(final ConcurrentMap<SocketAddress, OrderedEventStream> streamMap, 110 final ThreadPoolStage<OrderedEventStream> pullStage) { 111 this.codec = new RemoteEventCodec<>(new ByteCodec()); 112 this.streamMap = streamMap; 113 this.pullStage = pullStage; 114 } 115 116 @Override 117 public void onNext(final TransportEvent value) { 118 final RemoteEvent<byte[]> re = codec.decode(value.getData()); 119 re.setLocalAddress(value.getLocalAddress()); 120 re.setRemoteAddress(value.getRemoteAddress()); 121 122 if (LOG.isLoggable(Level.FINER)) { 123 LOG.log(Level.FINER, "{0} {1}", new Object[]{value, re}); 124 } 125 126 LOG.log(Level.FINER, "Value length is {0}", value.getData().length); 127 128 final SocketAddress addr = re.remoteAddress(); 129 OrderedEventStream stream = streamMap.get(re.remoteAddress()); 130 if (stream == null) { 131 stream = new OrderedEventStream(); 132 if (streamMap.putIfAbsent(addr, stream) != null) { 133 stream = streamMap.get(addr); 134 } 135 } 136 stream.add(re); 137 pullStage.onNext(stream); 138 } 139} 140 141class OrderedPullEventHandler implements EventHandler<OrderedEventStream> { 142 143 private static final Logger LOG = Logger.getLogger(OrderedPullEventHandler.class.getName()); 144 145 private final EventHandler<RemoteEvent<byte[]>> handler; 146 147 OrderedPullEventHandler(final EventHandler<RemoteEvent<byte[]>> handler) { 148 this.handler = handler; 149 } 150 151 @Override 152 public void onNext(final OrderedEventStream stream) { 153 if (LOG.isLoggable(Level.FINER)) { 154 LOG.log(Level.FINER, "{0}", stream); 155 } 156 157 synchronized (stream) { 158 RemoteEvent<byte[]> event; 159 while ((event = stream.consume()) != null) { 160 handler.onNext(event); 161 } 162 } 163 } 164} 165 166class OrderedEventStream { 167 private static final Logger LOG = Logger.getLogger(OrderedEventStream.class.getName()); 168 private final BlockingQueue<RemoteEvent<byte[]>> queue; // a queue of remote events 169 private long nextSeq; // the number of the next event to consume 170 171 OrderedEventStream() { 172 queue = new PriorityBlockingQueue<>(11, new RemoteEventComparator<byte[]>()); 173 nextSeq = 0; 174 } 175 176 synchronized void add(final RemoteEvent<byte[]> event) { 177 queue.add(event); 178 } 179 180 synchronized RemoteEvent<byte[]> consume() { 181 RemoteEvent<byte[]> event = queue.peek(); 182 if (event != null) { 183 184 if (event.getSeq() == nextSeq) { 185 event = queue.poll(); 186 ++nextSeq; 187 return event; 188 } else { 189 LOG.log(Level.FINER, "Event sequence is {0} does not match expected {1}", 190 new Object[]{event.getSeq(), nextSeq}); 191 } 192 } else { 193 LOG.log(Level.FINER, "Event is null"); 194 } 195 196 return null; 197 } 198}