/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.reef.runtime.common.driver.client;

import com.google.protobuf.ByteString;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.proto.ReefServiceProtos;
import org.apache.reef.runtime.common.driver.api.AbstractDriverRuntimeConfiguration;
import org.apache.reef.runtime.common.utils.RemoteManager;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.wake.EventHandler;

import javax.inject.Inject;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Represents the communication channel to the client.
 * <p>
 * Both {@link #send} and {@link #sendMessage} are synchronized on this instance, so
 * status updates issued through one {@code ClientConnection} are handed to the
 * underlying handler one at a time.
 */
@DriverSide
public final class ClientConnection {

  private static final Logger LOG = Logger.getLogger(ClientConnection.class.getName());

  /** Sink for all job status updates; chosen once in the constructor. */
  private final EventHandler<ReefServiceProtos.JobStatusProto> jobStatusHandler;

  /** Identifier stamped onto the status messages built by {@link #sendMessage(byte[])}. */
  private final String jobIdentifier;

  /**
   * Sets up the channel to the client.
   * <p>
   * When {@code clientRID} equals
   * {@link AbstractDriverRuntimeConfiguration.ClientRemoteIdentifier#NONE}, no remote handler
   * is requested and a {@code LoggingJobStatusHandler} is used instead. Otherwise the handler
   * is obtained from {@code remoteManager} for the given remote identifier.
   *
   * @param remoteManager used to obtain the handler that talks to the client.
   * @param clientRID     the remote identifier of the client, or
   *                      {@code ClientRemoteIdentifier.NONE} if there is no client to talk to.
   * @param jobIdentifier the identifier of the job, attached to messages sent via
   *                      {@link #sendMessage(byte[])}.
   */
  @Inject
  public ClientConnection(
      final RemoteManager remoteManager,
      @Parameter(AbstractDriverRuntimeConfiguration.ClientRemoteIdentifier.class) final String clientRID,
      @Parameter(AbstractDriverRuntimeConfiguration.JobIdentifier.class) final String jobIdentifier) {
    this.jobIdentifier = jobIdentifier;
    if (clientRID.equals(AbstractDriverRuntimeConfiguration.ClientRemoteIdentifier.NONE)) {
      LOG.log(Level.FINE, "Instantiated 'ClientConnection' without an actual connection to the client.");
      this.jobStatusHandler = new LoggingJobStatusHandler();
    } else {
      this.jobStatusHandler = remoteManager.getHandler(clientRID, ReefServiceProtos.JobStatusProto.class);
      LOG.log(Level.FINE, "Instantiated 'ClientConnection'");
    }
  }

  /**
   * Send the given JobStatus to the client.
   *
   * @param status the job status to forward to the configured status handler.
   */
  public synchronized void send(final ReefServiceProtos.JobStatusProto status) {
    // Rendering the status to text is only worth paying for when FINEST is actually enabled;
    // without the guard the string is built on every call and then discarded.
    if (LOG.isLoggable(Level.FINEST)) {
      LOG.log(Level.FINEST, "Sending:\n" + status);
    }
    this.jobStatusHandler.onNext(status);
  }

  /**
   * Send the given byte[] as a message to the client.
   * <p>
   * The bytes are copied into a {@code JobStatusProto} carrying this connection's job
   * identifier and the {@code RUNNING} state, which is then passed to
   * {@link #send(ReefServiceProtos.JobStatusProto)}.
   *
   * @param message the payload to deliver to the client.
   */
  public synchronized void sendMessage(final byte[] message) {
    this.send(ReefServiceProtos.JobStatusProto.newBuilder()
        .setIdentifier(this.jobIdentifier)
        .setState(ReefServiceProtos.State.RUNNING)
        .setMessage(ByteString.copyFrom(message))
        .build());
  }
}