001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.common.client; 020 021import org.apache.reef.annotations.Provided; 022import org.apache.reef.annotations.audience.ClientSide; 023import org.apache.reef.annotations.audience.Private; 024import org.apache.reef.client.REEF; 025import org.apache.reef.proto.ClientRuntimeProtocol.JobSubmissionProto; 026import org.apache.reef.runtime.common.client.api.JobSubmissionHandler; 027import org.apache.reef.runtime.common.launch.parameters.ErrorHandlerRID; 028import org.apache.reef.tang.Configuration; 029import org.apache.reef.tang.annotations.Name; 030import org.apache.reef.tang.annotations.NamedParameter; 031import org.apache.reef.util.REEFVersion; 032import org.apache.reef.util.logging.LoggingScope; 033import org.apache.reef.util.logging.LoggingScopeFactory; 034 035import javax.inject.Inject; 036import java.util.logging.Level; 037import java.util.logging.Logger; 038 039@ClientSide 040@Provided 041@Private 042public final class REEFImplementation implements REEF { 043 044 private final static Logger LOG = Logger.getLogger(REEFImplementation.class.getName()); 045 046 private final JobSubmissionHandler jobSubmissionHandler; 047 private final RunningJobs runningJobs; 048 private final JobSubmissionHelper jobSubmissionHelper; 049 private final ClientWireUp clientWireUp; 050 private final LoggingScopeFactory loggingScopeFactory; 051 052 /** 053 * @param jobSubmissionHandler 054 * @param runningJobs 055 * @param jobSubmissionHelper 056 * @param jobStatusMessageHandler is passed only to make sure it is instantiated 057 * @param clientWireUp 058 * @param reefVersion provides the current version of REEF. 059 */ 060 @Inject 061 REEFImplementation(final JobSubmissionHandler jobSubmissionHandler, 062 final RunningJobs runningJobs, 063 final JobSubmissionHelper jobSubmissionHelper, 064 final JobStatusMessageHandler jobStatusMessageHandler, 065 final ClientWireUp clientWireUp, 066 final LoggingScopeFactory loggingScopeFactory, 067 final REEFVersion reefVersion) { 068 this.jobSubmissionHandler = jobSubmissionHandler; 069 this.runningJobs = runningJobs; 070 this.jobSubmissionHelper = jobSubmissionHelper; 071 this.clientWireUp = clientWireUp; 072 clientWireUp.performWireUp(); 073 this.loggingScopeFactory = loggingScopeFactory; 074 reefVersion.logVersion(); 075 } 076 077 @Override 078 public final void close() { 079 this.runningJobs.closeAllJobs(); 080 this.clientWireUp.close(); 081 } 082 083 @Override 084 public void submit(final Configuration driverConf) { 085 try (LoggingScope ls = this.loggingScopeFactory.reefSubmit()) { 086 final JobSubmissionProto submissionMessage; 087 try { 088 if (this.clientWireUp.isClientPresent()) { 089 submissionMessage = this.jobSubmissionHelper.getJobsubmissionProto(driverConf) 090 .setRemoteId(this.clientWireUp.getRemoteManagerIdentifier()) 091 .build(); 092 } else { 093 submissionMessage = this.jobSubmissionHelper.getJobsubmissionProto(driverConf) 094 .setRemoteId(ErrorHandlerRID.NONE) 095 .build(); 096 } 097 } catch (final Exception e) { 098 throw new RuntimeException("Exception while processing driver configuration.", e); 099 } 100 101 this.jobSubmissionHandler.onNext(submissionMessage); 102 } 103 } 104 105 @NamedParameter(doc = "The driver remote identifier.") 106 public final static class DriverRemoteIdentifier implements Name<String> { 107 } 108 109 110}