001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.common.client; 020 021import org.apache.reef.annotations.Provided; 022import org.apache.reef.annotations.audience.ClientSide; 023import org.apache.reef.annotations.audience.Private; 024import org.apache.reef.client.REEF; 025import org.apache.reef.client.SubmittedJob; 026import org.apache.reef.client.parameters.DriverConfigurationProviders; 027import org.apache.reef.client.parameters.JobSubmittedHandler; 028import org.apache.reef.runtime.common.client.api.JobSubmissionEvent; 029import org.apache.reef.runtime.common.client.api.JobSubmissionHandler; 030import org.apache.reef.runtime.common.launch.parameters.ErrorHandlerRID; 031import org.apache.reef.tang.*; 032import org.apache.reef.tang.annotations.Name; 033import org.apache.reef.tang.annotations.NamedParameter; 034import org.apache.reef.tang.annotations.Parameter; 035import org.apache.reef.util.REEFVersion; 036import org.apache.reef.util.logging.LoggingScope; 037import org.apache.reef.util.logging.LoggingScopeFactory; 038import org.apache.reef.wake.EventHandler; 039 040import javax.inject.Inject; 041import java.util.Set; 042import java.util.logging.Level; 043import java.util.logging.Logger; 044 045/** 046 * Default REEF implementation. 047 */ 048@ClientSide 049@Provided 050@Private 051public final class REEFImplementation implements REEF { 052 053 private static final Logger LOG = Logger.getLogger(REEFImplementation.class.getName()); 054 055 private final JobSubmissionHandler jobSubmissionHandler; 056 private final JobSubmissionHelper jobSubmissionHelper; 057 private final InjectionFuture<EventHandler<SubmittedJob>> jobSubmittedHandler; 058 private final RunningJobs runningJobs; 059 private final ClientWireUp clientWireUp; 060 private final LoggingScopeFactory loggingScopeFactory; 061 private final Set<ConfigurationProvider> configurationProviders; 062 063 /** 064 * @param jobSubmissionHandler 065 * @param jobSubmissionHelper 066 * @param jobStatusMessageHandler is passed only to make sure it is instantiated 067 * @param runningJobs 068 * @param clientWireUp 069 * @param reefVersion provides the current version of REEF. 070 * @param configurationProviders 071 */ 072 @Inject 073 private REEFImplementation( 074 final JobSubmissionHandler jobSubmissionHandler, 075 final JobSubmissionHelper jobSubmissionHelper, 076 final JobStatusMessageHandler jobStatusMessageHandler, 077 final RunningJobs runningJobs, 078 final ClientWireUp clientWireUp, 079 final LoggingScopeFactory loggingScopeFactory, 080 final REEFVersion reefVersion, 081 @Parameter(JobSubmittedHandler.class) final InjectionFuture<EventHandler<SubmittedJob>> jobSubmittedHandler, 082 @Parameter(DriverConfigurationProviders.class) final Set<ConfigurationProvider> configurationProviders) { 083 084 this.jobSubmissionHandler = jobSubmissionHandler; 085 this.jobSubmittedHandler = jobSubmittedHandler; 086 this.jobSubmissionHelper = jobSubmissionHelper; 087 this.runningJobs = runningJobs; 088 this.clientWireUp = clientWireUp; 089 this.configurationProviders = configurationProviders; 090 this.loggingScopeFactory = loggingScopeFactory; 091 092 clientWireUp.performWireUp(); 093 reefVersion.logVersion(); 094 } 095 096 @Override 097 public void close() { 098 099 LOG.log(Level.FINE, "Close REEF: shutdown jobs"); 100 this.runningJobs.closeAllJobs(); 101 102 LOG.log(Level.FINE, "Close REEF: shutdown client"); 103 this.clientWireUp.close(); 104 105 LOG.log(Level.FINE, "Close REEF: shutdown job submitter"); 106 try { 107 this.jobSubmissionHandler.close(); 108 } catch (final Exception ex) { 109 LOG.log(Level.WARNING, "Could not shutdown job submitter", ex); 110 } 111 112 LOG.log(Level.FINE, "Close REEF: done"); 113 } 114 115 @Override 116 public void submit(final Configuration driverConf) { 117 try (LoggingScope ls = this.loggingScopeFactory.reefSubmit()) { 118 final Configuration driverConfiguration = createDriverConfiguration(driverConf); 119 final JobSubmissionEvent submissionMessage; 120 try { 121 if (this.clientWireUp.isClientPresent()) { 122 submissionMessage = this.jobSubmissionHelper.getJobSubmissionBuilder(driverConfiguration) 123 .setRemoteId(this.clientWireUp.getRemoteManagerIdentifier()) 124 .build(); 125 } else { 126 submissionMessage = this.jobSubmissionHelper.getJobSubmissionBuilder(driverConfiguration) 127 .setRemoteId(ErrorHandlerRID.NONE) 128 .build(); 129 } 130 } catch (final Exception e) { 131 throw new RuntimeException("Exception while processing driver configuration.", e); 132 } 133 134 this.jobSubmissionHandler.onNext(submissionMessage); 135 136 this.jobSubmittedHandler.get().onNext( 137 new SubmittedJobImpl(this.jobSubmissionHandler.getApplicationId())); 138 } 139 } 140 141 /** 142 * Assembles the final Driver Configuration by merging in all the Configurations provided by ConfigurationProviders. 143 * 144 * @param driverConfiguration 145 * @return 146 */ 147 private Configuration createDriverConfiguration(final Configuration driverConfiguration) { 148 final ConfigurationBuilder configurationBuilder = Tang.Factory.getTang() 149 .newConfigurationBuilder(driverConfiguration); 150 for (final ConfigurationProvider configurationProvider : this.configurationProviders) { 151 configurationBuilder.addConfiguration(configurationProvider.getConfiguration()); 152 } 153 return configurationBuilder.build(); 154 } 155 156 /** 157 * The driver remote identifier. 158 */ 159 @NamedParameter(doc = "The driver remote identifier.") 160 public static final class DriverRemoteIdentifier implements Name<String> { 161 } 162}