001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.common.driver.client;
020
021import org.apache.reef.annotations.audience.DriverSide;
022import org.apache.reef.annotations.audience.Private;
023import org.apache.reef.driver.parameters.ClientCloseHandlers;
024import org.apache.reef.driver.parameters.ClientCloseWithMessageHandlers;
025import org.apache.reef.driver.parameters.ClientMessageHandlers;
026import org.apache.reef.proto.ClientRuntimeProtocol;
027import org.apache.reef.runtime.common.driver.DriverStatusManager;
028import org.apache.reef.runtime.common.driver.parameters.ClientRemoteIdentifier;
029import org.apache.reef.runtime.common.utils.BroadCastEventHandler;
030import org.apache.reef.runtime.common.utils.RemoteManager;
031import org.apache.reef.tang.InjectionFuture;
032import org.apache.reef.tang.annotations.Parameter;
033import org.apache.reef.wake.EventHandler;
034
035import javax.inject.Inject;
036import java.util.Set;
037import java.util.logging.Level;
038import java.util.logging.Logger;
039
040/**
041 * Represents the Client in the Driver.
042 */
043@Private
044@DriverSide
045public final class ClientManager implements EventHandler<ClientRuntimeProtocol.JobControlProto> {
046
047  private static final Logger LOG = Logger.getLogger(ClientManager.class.getName());
048
049
050  private final InjectionFuture<Set<EventHandler<Void>>> clientCloseHandlers;
051
052  private final InjectionFuture<Set<EventHandler<byte[]>>> clientCloseWithMessageHandlers;
053
054  private final InjectionFuture<Set<EventHandler<byte[]>>> clientMessageHandlers;
055
056  private final DriverStatusManager driverStatusManager;
057
058  private volatile EventHandler<Void> clientCloseDispatcher;
059
060  private volatile EventHandler<byte[]> clientCloseWithMessageDispatcher;
061
062  private volatile EventHandler<byte[]> clientMessageDispatcher;
063
064
065  @Inject
066  ClientManager(@Parameter(ClientCloseHandlers.class)
067                final InjectionFuture<Set<EventHandler<Void>>> clientCloseHandlers,
068                @Parameter(ClientCloseWithMessageHandlers.class)
069                final InjectionFuture<Set<EventHandler<byte[]>>> clientCloseWithMessageHandlers,
070                @Parameter(ClientMessageHandlers.class)
071                final InjectionFuture<Set<EventHandler<byte[]>>> clientMessageHandlers,
072                @Parameter(ClientRemoteIdentifier.class) final String clientRID,
073                final RemoteManager remoteManager,
074                final DriverStatusManager driverStatusManager) {
075    this.driverStatusManager = driverStatusManager;
076    this.clientCloseHandlers = clientCloseHandlers;
077    this.clientCloseWithMessageHandlers = clientCloseWithMessageHandlers;
078    this.clientMessageHandlers = clientMessageHandlers;
079
080    if (!clientRID.equals(ClientRemoteIdentifier.NONE)) {
081      remoteManager.registerHandler(clientRID, ClientRuntimeProtocol.JobControlProto.class, this);
082    } else {
083      LOG.log(Level.FINE, "Not registering a handler for JobControlProto, as there is no client.");
084    }
085  }
086
087  /**
088   * This method reacts to control messages passed by the client to the driver. It will forward
089   * messages related to the ClientObserver interface to the Driver. It will also initiate a shutdown
090   * if the client indicates a close message.
091   *
092   * @param jobControlProto contains the client initiated control message
093   */
094  @Override
095  public synchronized void onNext(final ClientRuntimeProtocol.JobControlProto jobControlProto) {
096    if (jobControlProto.hasSignal()) {
097      if (jobControlProto.getSignal() == ClientRuntimeProtocol.Signal.SIG_TERMINATE) {
098        try {
099          if (jobControlProto.hasMessage()) {
100            getClientCloseWithMessageDispatcher().onNext(jobControlProto.getMessage().toByteArray());
101          } else {
102            getClientCloseDispatcher().onNext(null);
103          }
104        } finally {
105          this.driverStatusManager.onComplete();
106        }
107      } else {
108        LOG.log(Level.FINEST, "Unsupported signal: " + jobControlProto.getSignal());
109      }
110    } else if (jobControlProto.hasMessage()) {
111      getClientMessageDispatcher().onNext(jobControlProto.getMessage().toByteArray());
112    }
113  }
114
115  private synchronized EventHandler<Void> getClientCloseDispatcher() {
116    if (clientCloseDispatcher != null) {
117      return clientCloseDispatcher;
118    } else {
119      synchronized (this) {
120        if (clientCloseDispatcher == null) {
121          clientCloseDispatcher = new BroadCastEventHandler<>(clientCloseHandlers.get());
122        }
123      }
124      return clientCloseDispatcher;
125    }
126  }
127
128  private EventHandler<byte[]> getClientCloseWithMessageDispatcher() {
129    if (clientCloseWithMessageDispatcher != null) {
130      return clientCloseWithMessageDispatcher;
131    } else {
132      synchronized (this) {
133        if (clientCloseWithMessageDispatcher == null) {
134          clientCloseWithMessageDispatcher = new BroadCastEventHandler<>(clientCloseWithMessageHandlers.get());
135        }
136      }
137      return clientCloseWithMessageDispatcher;
138    }
139  }
140
141  private EventHandler<byte[]> getClientMessageDispatcher() {
142    if (clientMessageDispatcher != null) {
143      return clientMessageDispatcher;
144    } else {
145      synchronized (this) {
146        if (clientMessageDispatcher == null) {
147          clientMessageDispatcher = new BroadCastEventHandler<>(clientMessageHandlers.get());
148        }
149      }
150      return clientMessageDispatcher;
151    }
152  }
153
154}