001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.common.driver.client; 020 021import org.apache.reef.annotations.audience.DriverSide; 022import org.apache.reef.annotations.audience.Private; 023import org.apache.reef.driver.parameters.ClientCloseHandlers; 024import org.apache.reef.driver.parameters.ClientCloseWithMessageHandlers; 025import org.apache.reef.driver.parameters.ClientMessageHandlers; 026import org.apache.reef.proto.ClientRuntimeProtocol; 027import org.apache.reef.runtime.common.driver.DriverStatusManager; 028import org.apache.reef.runtime.common.driver.api.AbstractDriverRuntimeConfiguration; 029import org.apache.reef.runtime.common.utils.BroadCastEventHandler; 030import org.apache.reef.runtime.common.utils.RemoteManager; 031import org.apache.reef.tang.InjectionFuture; 032import org.apache.reef.tang.annotations.Parameter; 033import org.apache.reef.wake.EventHandler; 034 035import javax.inject.Inject; 036import java.util.Set; 037import java.util.logging.Level; 038import java.util.logging.Logger; 039 040/** 041 * Represents the Client in the Driver. 042 */ 043@Private 044@DriverSide 045public final class ClientManager implements EventHandler<ClientRuntimeProtocol.JobControlProto> { 046 047 private final static Logger LOG = Logger.getLogger(ClientManager.class.getName()); 048 049 050 private final InjectionFuture<Set<EventHandler<Void>>> clientCloseHandlers; 051 052 private final InjectionFuture<Set<EventHandler<byte[]>>> clientCloseWithMessageHandlers; 053 054 private final InjectionFuture<Set<EventHandler<byte[]>>> clientMessageHandlers; 055 056 private final DriverStatusManager driverStatusManager; 057 058 private volatile EventHandler<Void> clientCloseDispatcher; 059 060 private volatile EventHandler<byte[]> clientCloseWithMessageDispatcher; 061 062 private volatile EventHandler<byte[]> clientMessageDispatcher; 063 064 065 @Inject 066 ClientManager(final @Parameter(ClientCloseHandlers.class) InjectionFuture<Set<EventHandler<Void>>> clientCloseHandlers, 067 final @Parameter(ClientCloseWithMessageHandlers.class) InjectionFuture<Set<EventHandler<byte[]>>> clientCloseWithMessageHandlers, 068 final @Parameter(ClientMessageHandlers.class) InjectionFuture<Set<EventHandler<byte[]>>> clientMessageHandlers, 069 final @Parameter(AbstractDriverRuntimeConfiguration.ClientRemoteIdentifier.class) String clientRID, 070 final RemoteManager remoteManager, 071 final DriverStatusManager driverStatusManager) { 072 this.driverStatusManager = driverStatusManager; 073 this.clientCloseHandlers = clientCloseHandlers; 074 this.clientCloseWithMessageHandlers = clientCloseWithMessageHandlers; 075 this.clientMessageHandlers = clientMessageHandlers; 076 077 if (!clientRID.equals(AbstractDriverRuntimeConfiguration.ClientRemoteIdentifier.NONE)) { 078 remoteManager.registerHandler(clientRID, ClientRuntimeProtocol.JobControlProto.class, this); 079 } else { 080 LOG.log(Level.FINE, "Not registering a handler for JobControlProto, as there is no client."); 081 } 082 } 083 084 /** 085 * This method reacts to control messages passed by the client to the driver. It will forward 086 * messages related to the ClientObserver interface to the Driver. It will also initiate a shutdown 087 * if the client indicates a close message. 088 * 089 * @param jobControlProto contains the client initiated control message 090 */ 091 @Override 092 public synchronized void onNext(final ClientRuntimeProtocol.JobControlProto jobControlProto) { 093 if (jobControlProto.hasSignal()) { 094 if (jobControlProto.getSignal() == ClientRuntimeProtocol.Signal.SIG_TERMINATE) { 095 try { 096 if (jobControlProto.hasMessage()) { 097 getClientCloseWithMessageDispatcher().onNext(jobControlProto.getMessage().toByteArray()); 098 } else { 099 getClientCloseDispatcher().onNext(null); 100 } 101 } finally { 102 this.driverStatusManager.onComplete(); 103 } 104 } else { 105 LOG.log(Level.FINEST, "Unsupported signal: " + jobControlProto.getSignal()); 106 } 107 } else if (jobControlProto.hasMessage()) { 108 getClientMessageDispatcher().onNext(jobControlProto.getMessage().toByteArray()); 109 } 110 } 111 112 private synchronized EventHandler<Void> getClientCloseDispatcher() { 113 if (clientCloseDispatcher != null) { 114 return clientCloseDispatcher; 115 } else { 116 synchronized (this) { 117 if (clientCloseDispatcher == null) 118 clientCloseDispatcher = new BroadCastEventHandler<>(clientCloseHandlers.get()); 119 } 120 return clientCloseDispatcher; 121 } 122 } 123 124 private EventHandler<byte[]> getClientCloseWithMessageDispatcher() { 125 if (clientCloseWithMessageDispatcher != null) { 126 return clientCloseWithMessageDispatcher; 127 } else { 128 synchronized (this) { 129 if (clientCloseWithMessageDispatcher == null) 130 clientCloseWithMessageDispatcher = new BroadCastEventHandler<>(clientCloseWithMessageHandlers.get()); 131 } 132 return clientCloseWithMessageDispatcher; 133 } 134 } 135 136 private EventHandler<byte[]> getClientMessageDispatcher() { 137 if (clientMessageDispatcher != null) { 138 return clientMessageDispatcher; 139 } else { 140 synchronized (this) { 141 if (clientMessageDispatcher == null) 142 clientMessageDispatcher = new BroadCastEventHandler<>(clientMessageHandlers.get()); 143 } 144 return clientMessageDispatcher; 145 } 146 } 147 148}