001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.common.client; 020 021import org.apache.reef.annotations.Provided; 022import org.apache.reef.annotations.audience.ClientSide; 023import org.apache.reef.annotations.audience.Private; 024import org.apache.reef.client.REEF; 025import org.apache.reef.client.parameters.DriverConfigurationProviders; 026import org.apache.reef.runtime.common.client.api.JobSubmissionEvent; 027import org.apache.reef.runtime.common.client.api.JobSubmissionHandler; 028import org.apache.reef.runtime.common.launch.parameters.ErrorHandlerRID; 029import org.apache.reef.tang.Configuration; 030import org.apache.reef.tang.ConfigurationBuilder; 031import org.apache.reef.tang.ConfigurationProvider; 032import org.apache.reef.tang.Tang; 033import org.apache.reef.tang.annotations.Name; 034import org.apache.reef.tang.annotations.NamedParameter; 035import org.apache.reef.tang.annotations.Parameter; 036import org.apache.reef.util.REEFVersion; 037import org.apache.reef.util.logging.LoggingScope; 038import org.apache.reef.util.logging.LoggingScopeFactory; 039 040import javax.inject.Inject; 041import java.util.Set; 042import java.util.logging.Logger; 043 044/** 045 * Default REEF implementation. 046 */ 047@ClientSide 048@Provided 049@Private 050public final class REEFImplementation implements REEF { 051 052 private static final Logger LOG = Logger.getLogger(REEFImplementation.class.getName()); 053 054 private final JobSubmissionHandler jobSubmissionHandler; 055 private final RunningJobs runningJobs; 056 private final JobSubmissionHelper jobSubmissionHelper; 057 private final ClientWireUp clientWireUp; 058 private final LoggingScopeFactory loggingScopeFactory; 059 private final Set<ConfigurationProvider> configurationProviders; 060 061 /** 062 * @param jobSubmissionHandler 063 * @param runningJobs 064 * @param jobSubmissionHelper 065 * @param jobStatusMessageHandler is passed only to make sure it is instantiated 066 * @param clientWireUp 067 * @param reefVersion provides the current version of REEF. 068 * @param configurationProviders 069 */ 070 @Inject 071 REEFImplementation(final JobSubmissionHandler jobSubmissionHandler, 072 final RunningJobs runningJobs, 073 final JobSubmissionHelper jobSubmissionHelper, 074 final JobStatusMessageHandler jobStatusMessageHandler, 075 final ClientWireUp clientWireUp, 076 final LoggingScopeFactory loggingScopeFactory, 077 final REEFVersion reefVersion, 078 @Parameter(DriverConfigurationProviders.class) 079 final Set<ConfigurationProvider> configurationProviders) { 080 this.jobSubmissionHandler = jobSubmissionHandler; 081 this.runningJobs = runningJobs; 082 this.jobSubmissionHelper = jobSubmissionHelper; 083 this.clientWireUp = clientWireUp; 084 this.configurationProviders = configurationProviders; 085 clientWireUp.performWireUp(); 086 this.loggingScopeFactory = loggingScopeFactory; 087 reefVersion.logVersion(); 088 } 089 090 @Override 091 public void close() { 092 this.runningJobs.closeAllJobs(); 093 this.clientWireUp.close(); 094 } 095 096 @Override 097 public void submit(final Configuration driverConf) { 098 try (LoggingScope ls = this.loggingScopeFactory.reefSubmit()) { 099 final Configuration driverConfiguration = createDriverConfiguration(driverConf); 100 final JobSubmissionEvent submissionMessage; 101 try { 102 if (this.clientWireUp.isClientPresent()) { 103 submissionMessage = this.jobSubmissionHelper.getJobSubmissionBuilder(driverConfiguration) 104 .setRemoteId(this.clientWireUp.getRemoteManagerIdentifier()) 105 .build(); 106 } else { 107 submissionMessage = this.jobSubmissionHelper.getJobSubmissionBuilder(driverConfiguration) 108 .setRemoteId(ErrorHandlerRID.NONE) 109 .build(); 110 } 111 } catch (final Exception e) { 112 throw new RuntimeException("Exception while processing driver configuration.", e); 113 } 114 115 this.jobSubmissionHandler.onNext(submissionMessage); 116 } 117 } 118 119 /** 120 * Assembles the final Driver Configuration by merging in all the Configurations provided by ConfigurationProviders. 121 * 122 * @param driverConfiguration 123 * @return 124 */ 125 private Configuration createDriverConfiguration(final Configuration driverConfiguration) { 126 final ConfigurationBuilder configurationBuilder = Tang.Factory.getTang() 127 .newConfigurationBuilder(driverConfiguration); 128 for (final ConfigurationProvider configurationProvider : this.configurationProviders) { 129 configurationBuilder.addConfiguration(configurationProvider.getConfiguration()); 130 } 131 return configurationBuilder.build(); 132 } 133 134 /** 135 * The driver remote identifier. 136 */ 137 @NamedParameter(doc = "The driver remote identifier.") 138 public static final class DriverRemoteIdentifier implements Name<String> { 139 } 140 141 142}