This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.common.driver.client;
020
021import org.apache.reef.annotations.audience.DriverSide;
022import org.apache.reef.annotations.audience.Private;
023import org.apache.reef.driver.parameters.ClientCloseHandlers;
024import org.apache.reef.driver.parameters.ClientCloseWithMessageHandlers;
025import org.apache.reef.driver.parameters.ClientMessageHandlers;
026import org.apache.reef.proto.ClientRuntimeProtocol;
027import org.apache.reef.runtime.common.driver.DriverStatusManager;
028import org.apache.reef.runtime.common.driver.api.AbstractDriverRuntimeConfiguration;
029import org.apache.reef.runtime.common.utils.BroadCastEventHandler;
030import org.apache.reef.runtime.common.utils.RemoteManager;
031import org.apache.reef.tang.InjectionFuture;
032import org.apache.reef.tang.annotations.Parameter;
033import org.apache.reef.wake.EventHandler;
034
035import javax.inject.Inject;
036import java.util.Set;
037import java.util.logging.Level;
038import java.util.logging.Logger;
039
040/**
041 * Represents the Client in the Driver.
042 */
043@Private
044@DriverSide
045public final class ClientManager implements EventHandler<ClientRuntimeProtocol.JobControlProto> {
046
047  private final static Logger LOG = Logger.getLogger(ClientManager.class.getName());
048
049
050  private final InjectionFuture<Set<EventHandler<Void>>> clientCloseHandlers;
051
052  private final InjectionFuture<Set<EventHandler<byte[]>>> clientCloseWithMessageHandlers;
053
054  private final InjectionFuture<Set<EventHandler<byte[]>>> clientMessageHandlers;
055
056  private final DriverStatusManager driverStatusManager;
057
058  private volatile EventHandler<Void> clientCloseDispatcher;
059
060  private volatile EventHandler<byte[]> clientCloseWithMessageDispatcher;
061
062  private volatile EventHandler<byte[]> clientMessageDispatcher;
063
064
065  @Inject
066  ClientManager(final @Parameter(ClientCloseHandlers.class) InjectionFuture<Set<EventHandler<Void>>> clientCloseHandlers,
067                final @Parameter(ClientCloseWithMessageHandlers.class) InjectionFuture<Set<EventHandler<byte[]>>> clientCloseWithMessageHandlers,
068                final @Parameter(ClientMessageHandlers.class) InjectionFuture<Set<EventHandler<byte[]>>> clientMessageHandlers,
069                final @Parameter(AbstractDriverRuntimeConfiguration.ClientRemoteIdentifier.class) String clientRID,
070                final RemoteManager remoteManager,
071                final DriverStatusManager driverStatusManager) {
072    this.driverStatusManager = driverStatusManager;
073    this.clientCloseHandlers = clientCloseHandlers;
074    this.clientCloseWithMessageHandlers = clientCloseWithMessageHandlers;
075    this.clientMessageHandlers = clientMessageHandlers;
076
077    if (!clientRID.equals(AbstractDriverRuntimeConfiguration.ClientRemoteIdentifier.NONE)) {
078      remoteManager.registerHandler(clientRID, ClientRuntimeProtocol.JobControlProto.class, this);
079    } else {
080      LOG.log(Level.FINE, "Not registering a handler for JobControlProto, as there is no client.");
081    }
082  }
083
084  /**
085   * This method reacts to control messages passed by the client to the driver. It will forward
086   * messages related to the ClientObserver interface to the Driver. It will also initiate a shutdown
087   * if the client indicates a close message.
088   *
089   * @param jobControlProto contains the client initiated control message
090   */
091  @Override
092  public synchronized void onNext(final ClientRuntimeProtocol.JobControlProto jobControlProto) {
093    if (jobControlProto.hasSignal()) {
094      if (jobControlProto.getSignal() == ClientRuntimeProtocol.Signal.SIG_TERMINATE) {
095        try {
096          if (jobControlProto.hasMessage()) {
097            getClientCloseWithMessageDispatcher().onNext(jobControlProto.getMessage().toByteArray());
098          } else {
099            getClientCloseDispatcher().onNext(null);
100          }
101        } finally {
102          this.driverStatusManager.onComplete();
103        }
104      } else {
105        LOG.log(Level.FINEST, "Unsupported signal: " + jobControlProto.getSignal());
106      }
107    } else if (jobControlProto.hasMessage()) {
108      getClientMessageDispatcher().onNext(jobControlProto.getMessage().toByteArray());
109    }
110  }
111
112  private synchronized EventHandler<Void> getClientCloseDispatcher() {
113    if (clientCloseDispatcher != null) {
114      return clientCloseDispatcher;
115    } else {
116      synchronized (this) {
117        if (clientCloseDispatcher == null)
118          clientCloseDispatcher = new BroadCastEventHandler<>(clientCloseHandlers.get());
119      }
120      return clientCloseDispatcher;
121    }
122  }
123
124  private EventHandler<byte[]> getClientCloseWithMessageDispatcher() {
125    if (clientCloseWithMessageDispatcher != null) {
126      return clientCloseWithMessageDispatcher;
127    } else {
128      synchronized (this) {
129        if (clientCloseWithMessageDispatcher == null)
130          clientCloseWithMessageDispatcher = new BroadCastEventHandler<>(clientCloseWithMessageHandlers.get());
131      }
132      return clientCloseWithMessageDispatcher;
133    }
134  }
135
136  private EventHandler<byte[]> getClientMessageDispatcher() {
137    if (clientMessageDispatcher != null) {
138      return clientMessageDispatcher;
139    } else {
140      synchronized (this) {
141        if (clientMessageDispatcher == null)
142          clientMessageDispatcher = new BroadCastEventHandler<>(clientMessageHandlers.get());
143      }
144      return clientMessageDispatcher;
145    }
146  }
147
148}