This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.common.client;
020
021import com.google.protobuf.ByteString;
022import org.apache.reef.annotations.audience.ClientSide;
023import org.apache.reef.annotations.audience.Private;
024import org.apache.reef.client.CompletedJob;
025import org.apache.reef.client.FailedJob;
026import org.apache.reef.client.JobMessage;
027import org.apache.reef.client.RunningJob;
028import org.apache.reef.client.parameters.JobCompletedHandler;
029import org.apache.reef.client.parameters.JobFailedHandler;
030import org.apache.reef.client.parameters.JobMessageHandler;
031import org.apache.reef.client.parameters.JobRunningHandler;
032import org.apache.reef.driver.parameters.DriverIdentifier;
033import org.apache.reef.proto.ClientRuntimeProtocol.JobControlProto;
034import org.apache.reef.proto.ClientRuntimeProtocol.Signal;
035import org.apache.reef.proto.ReefServiceProtos;
036import org.apache.reef.proto.ReefServiceProtos.JobStatusProto;
037import org.apache.reef.runtime.common.utils.ExceptionCodec;
038import org.apache.reef.runtime.common.utils.RemoteManager;
039import org.apache.reef.tang.annotations.Parameter;
040import org.apache.reef.util.Optional;
041import org.apache.reef.wake.EventHandler;
042
043import javax.inject.Inject;
044import java.util.logging.Level;
045import java.util.logging.Logger;
046
047/**
048 * Implementation of RunningJob.
049 */
050@ClientSide
051@Private
052public final class RunningJobImpl implements RunningJob, EventHandler<JobStatusProto> {
053
054  private static final Logger LOG = Logger.getLogger(RunningJob.class.getName());
055
056  private final String jobId;
057
058  private final EventHandler<JobControlProto> jobControlHandler;
059  private final EventHandler<RunningJob> runningJobEventHandler;
060  private final EventHandler<CompletedJob> completedJobEventHandler;
061  private final EventHandler<FailedJob> failedJobEventHandler;
062  private final EventHandler<JobMessage> jobMessageEventHandler;
063  private final ExceptionCodec exceptionCodec;
064
065  @Inject
066  RunningJobImpl(final RemoteManager remoteManager,
067                 @Parameter(DriverIdentifier.class) final String driverIdentifier,
068                 @Parameter(REEFImplementation.DriverRemoteIdentifier.class) final String driverRID,
069                 @Parameter(JobRunningHandler.class) final EventHandler<RunningJob> runningJobEventHandler,
070                 @Parameter(JobCompletedHandler.class) final EventHandler<CompletedJob> completedJobEventHandler,
071                 @Parameter(JobFailedHandler.class) final EventHandler<FailedJob> failedJobEventHandler,
072                 @Parameter(JobMessageHandler.class) final EventHandler<JobMessage> jobMessageEventHandler,
073                 final ExceptionCodec exceptionCodec) {
074
075    this.jobId = driverIdentifier;
076    this.runningJobEventHandler = runningJobEventHandler;
077    this.completedJobEventHandler = completedJobEventHandler;
078    this.failedJobEventHandler = failedJobEventHandler;
079    this.jobMessageEventHandler = jobMessageEventHandler;
080    this.exceptionCodec = exceptionCodec;
081    this.jobControlHandler = remoteManager.getHandler(driverRID, JobControlProto.class);
082
083    this.runningJobEventHandler.onNext(this);
084    LOG.log(Level.FINE, "Instantiated 'RunningJobImpl'");
085  }
086
087  @Override
088  public synchronized void close() {
089    this.jobControlHandler.onNext(
090        JobControlProto.newBuilder()
091            .setIdentifier(this.jobId)
092            .setSignal(Signal.SIG_TERMINATE)
093            .build()
094    );
095  }
096
097  @Override
098  public synchronized void close(final byte[] message) {
099    this.jobControlHandler.onNext(
100        JobControlProto.newBuilder()
101            .setIdentifier(this.jobId)
102            .setSignal(Signal.SIG_TERMINATE)
103            .setMessage(ByteString.copyFrom(message))
104            .build()
105    );
106  }
107
108  @Override
109  public String getId() {
110    return this.jobId;
111  }
112
113  @Override
114  public synchronized void send(final byte[] message) {
115    this.jobControlHandler.onNext(
116        JobControlProto.newBuilder()
117            .setIdentifier(this.jobId)
118            .setMessage(ByteString.copyFrom(message))
119            .build()
120    );
121  }
122
123  @Override
124  public synchronized void onNext(final JobStatusProto value) {
125
126    final ReefServiceProtos.State state = value.getState();
127    LOG.log(Level.FINEST, "Received job status: {0} from {1}",
128        new Object[]{state, value.getIdentifier()});
129
130    if (value.hasMessage()) {
131      this.jobMessageEventHandler.onNext(
132          new JobMessage(getId(), value.getMessage().toByteArray()));
133    }
134    if (state == ReefServiceProtos.State.DONE) {
135      this.completedJobEventHandler.onNext(new CompletedJobImpl(this.getId()));
136    } else if (state == ReefServiceProtos.State.FAILED) {
137      this.onJobFailure(value);
138    }
139  }
140
141  /**
142   * Inform the client of a failed job.
143   *
144   * @param jobStatusProto status of the failed job
145   */
146  private synchronized void onJobFailure(final JobStatusProto jobStatusProto) {
147    assert jobStatusProto.getState() == ReefServiceProtos.State.FAILED;
148
149    final String id = this.jobId;
150    final Optional<byte[]> data = jobStatusProto.hasException() ?
151        Optional.of(jobStatusProto.getException().toByteArray()) :
152        Optional.<byte[]>empty();
153    final Optional<Throwable> cause = this.exceptionCodec.fromBytes(data);
154
155    final String message = cause.isPresent() ?
156        cause.get().getMessage() :
157        "No Message sent by the Job";
158    final Optional<String> description = Optional.of(message);
159
160    final FailedJob failedJob = new FailedJob(id, message, description, cause, data);
161    this.failedJobEventHandler.onNext(failedJob);
162  }
163
164  @Override
165  public String toString() {
166    return "RunningJob{'" + this.jobId + "'}";
167  }
168}