001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.common.client; 020 021import com.google.protobuf.ByteString; 022import org.apache.reef.annotations.audience.ClientSide; 023import org.apache.reef.annotations.audience.Private; 024import org.apache.reef.client.CompletedJob; 025import org.apache.reef.client.FailedJob; 026import org.apache.reef.client.JobMessage; 027import org.apache.reef.client.RunningJob; 028import org.apache.reef.client.parameters.JobCompletedHandler; 029import org.apache.reef.client.parameters.JobFailedHandler; 030import org.apache.reef.client.parameters.JobMessageHandler; 031import org.apache.reef.client.parameters.JobRunningHandler; 032import org.apache.reef.driver.parameters.DriverIdentifier; 033import org.apache.reef.proto.ClientRuntimeProtocol.JobControlProto; 034import org.apache.reef.proto.ClientRuntimeProtocol.Signal; 035import org.apache.reef.proto.ReefServiceProtos; 036import org.apache.reef.proto.ReefServiceProtos.JobStatusProto; 037import org.apache.reef.runtime.common.utils.ExceptionCodec; 038import org.apache.reef.runtime.common.utils.RemoteManager; 039import org.apache.reef.tang.annotations.Parameter; 040import org.apache.reef.util.Optional; 041import org.apache.reef.wake.EventHandler; 042 043import javax.inject.Inject; 044import java.util.logging.Level; 045import java.util.logging.Logger; 046 047/** 048 * Implementation of RunningJob. 049 */ 050@ClientSide 051@Private 052public final class RunningJobImpl implements RunningJob, EventHandler<JobStatusProto> { 053 054 private static final Logger LOG = Logger.getLogger(RunningJob.class.getName()); 055 056 private final String jobId; 057 058 private final EventHandler<JobControlProto> jobControlHandler; 059 private final EventHandler<RunningJob> runningJobEventHandler; 060 private final EventHandler<CompletedJob> completedJobEventHandler; 061 private final EventHandler<FailedJob> failedJobEventHandler; 062 private final EventHandler<JobMessage> jobMessageEventHandler; 063 private final ExceptionCodec exceptionCodec; 064 065 @Inject 066 RunningJobImpl(final RemoteManager remoteManager, 067 @Parameter(DriverIdentifier.class) final String driverIdentifier, 068 @Parameter(REEFImplementation.DriverRemoteIdentifier.class) final String driverRID, 069 @Parameter(JobRunningHandler.class) final EventHandler<RunningJob> runningJobEventHandler, 070 @Parameter(JobCompletedHandler.class) final EventHandler<CompletedJob> completedJobEventHandler, 071 @Parameter(JobFailedHandler.class) final EventHandler<FailedJob> failedJobEventHandler, 072 @Parameter(JobMessageHandler.class) final EventHandler<JobMessage> jobMessageEventHandler, 073 final ExceptionCodec exceptionCodec) { 074 075 this.jobId = driverIdentifier; 076 this.runningJobEventHandler = runningJobEventHandler; 077 this.completedJobEventHandler = completedJobEventHandler; 078 this.failedJobEventHandler = failedJobEventHandler; 079 this.jobMessageEventHandler = jobMessageEventHandler; 080 this.exceptionCodec = exceptionCodec; 081 this.jobControlHandler = remoteManager.getHandler(driverRID, JobControlProto.class); 082 083 this.runningJobEventHandler.onNext(this); 084 LOG.log(Level.FINE, "Instantiated 'RunningJobImpl'"); 085 } 086 087 @Override 088 public synchronized void close() { 089 this.jobControlHandler.onNext( 090 JobControlProto.newBuilder() 091 .setIdentifier(this.jobId) 092 .setSignal(Signal.SIG_TERMINATE) 093 .build() 094 ); 095 } 096 097 @Override 098 public synchronized void close(final byte[] message) { 099 this.jobControlHandler.onNext( 100 JobControlProto.newBuilder() 101 .setIdentifier(this.jobId) 102 .setSignal(Signal.SIG_TERMINATE) 103 .setMessage(ByteString.copyFrom(message)) 104 .build() 105 ); 106 } 107 108 @Override 109 public String getId() { 110 return this.jobId; 111 } 112 113 @Override 114 public synchronized void send(final byte[] message) { 115 this.jobControlHandler.onNext( 116 JobControlProto.newBuilder() 117 .setIdentifier(this.jobId) 118 .setMessage(ByteString.copyFrom(message)) 119 .build() 120 ); 121 } 122 123 @Override 124 public synchronized void onNext(final JobStatusProto value) { 125 126 final ReefServiceProtos.State state = value.getState(); 127 LOG.log(Level.FINEST, "Received job status: {0} from {1}", 128 new Object[]{state, value.getIdentifier()}); 129 130 if (value.hasMessage()) { 131 this.jobMessageEventHandler.onNext( 132 new JobMessage(getId(), value.getMessage().toByteArray())); 133 } 134 if (state == ReefServiceProtos.State.DONE) { 135 this.completedJobEventHandler.onNext(new CompletedJobImpl(this.getId())); 136 } else if (state == ReefServiceProtos.State.FAILED) { 137 this.onJobFailure(value); 138 } 139 } 140 141 /** 142 * Inform the client of a failed job. 143 * 144 * @param jobStatusProto status of the failed job 145 */ 146 private synchronized void onJobFailure(final JobStatusProto jobStatusProto) { 147 assert jobStatusProto.getState() == ReefServiceProtos.State.FAILED; 148 149 final String id = this.jobId; 150 final Optional<byte[]> data = jobStatusProto.hasException() ? 151 Optional.of(jobStatusProto.getException().toByteArray()) : 152 Optional.<byte[]>empty(); 153 final Optional<Throwable> cause = this.exceptionCodec.fromBytes(data); 154 155 final String message = cause.isPresent() ? 156 cause.get().getMessage() : 157 "No Message sent by the Job"; 158 final Optional<String> description = Optional.of(message); 159 160 final FailedJob failedJob = new FailedJob(id, message, description, cause, data); 161 this.failedJobEventHandler.onNext(failedJob); 162 } 163 164 @Override 165 public String toString() { 166 return "RunningJob{'" + this.jobId + "'}"; 167 } 168}