This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.common.client;
020
021import org.apache.reef.annotations.Provided;
022import org.apache.reef.annotations.audience.ClientSide;
023import org.apache.reef.annotations.audience.Private;
024import org.apache.reef.client.REEF;
025import org.apache.reef.client.SubmittedJob;
026import org.apache.reef.client.parameters.DriverConfigurationProviders;
027import org.apache.reef.client.parameters.JobSubmittedHandler;
028import org.apache.reef.runtime.common.client.api.JobSubmissionEvent;
029import org.apache.reef.runtime.common.client.api.JobSubmissionHandler;
030import org.apache.reef.runtime.common.launch.parameters.ErrorHandlerRID;
031import org.apache.reef.tang.*;
032import org.apache.reef.tang.annotations.Name;
033import org.apache.reef.tang.annotations.NamedParameter;
034import org.apache.reef.tang.annotations.Parameter;
035import org.apache.reef.util.REEFVersion;
036import org.apache.reef.util.logging.LoggingScope;
037import org.apache.reef.util.logging.LoggingScopeFactory;
038import org.apache.reef.wake.EventHandler;
039
040import javax.inject.Inject;
041import java.util.Set;
042import java.util.logging.Level;
043import java.util.logging.Logger;
044
045/**
046 * Default REEF implementation.
047 */
048@ClientSide
049@Provided
050@Private
051public final class REEFImplementation implements REEF {
052
053  private static final Logger LOG = Logger.getLogger(REEFImplementation.class.getName());
054
055  private final JobSubmissionHandler jobSubmissionHandler;
056  private final JobSubmissionHelper jobSubmissionHelper;
057  private final InjectionFuture<EventHandler<SubmittedJob>> jobSubmittedHandler;
058  private final RunningJobs runningJobs;
059  private final ClientWireUp clientWireUp;
060  private final LoggingScopeFactory loggingScopeFactory;
061  private final Set<ConfigurationProvider> configurationProviders;
062
063  /**
064   * @param jobSubmissionHandler
065   * @param jobSubmissionHelper
066   * @param jobStatusMessageHandler is passed only to make sure it is instantiated
067   * @param runningJobs
068   * @param clientWireUp
069   * @param reefVersion provides the current version of REEF.
070   * @param configurationProviders
071   */
072  @Inject
073  private REEFImplementation(
074        final JobSubmissionHandler jobSubmissionHandler,
075        final JobSubmissionHelper jobSubmissionHelper,
076        final JobStatusMessageHandler jobStatusMessageHandler,
077        final RunningJobs runningJobs,
078        final ClientWireUp clientWireUp,
079        final LoggingScopeFactory loggingScopeFactory,
080        final REEFVersion reefVersion,
081        @Parameter(JobSubmittedHandler.class) final InjectionFuture<EventHandler<SubmittedJob>> jobSubmittedHandler,
082        @Parameter(DriverConfigurationProviders.class) final Set<ConfigurationProvider> configurationProviders) {
083
084    this.jobSubmissionHandler = jobSubmissionHandler;
085    this.jobSubmittedHandler = jobSubmittedHandler;
086    this.jobSubmissionHelper = jobSubmissionHelper;
087    this.runningJobs = runningJobs;
088    this.clientWireUp = clientWireUp;
089    this.configurationProviders = configurationProviders;
090    this.loggingScopeFactory = loggingScopeFactory;
091
092    clientWireUp.performWireUp();
093    reefVersion.logVersion();
094  }
095
096  @Override
097  public void close() {
098
099    LOG.log(Level.FINE, "Close REEF: shutdown jobs");
100    this.runningJobs.closeAllJobs();
101
102    LOG.log(Level.FINE, "Close REEF: shutdown client");
103    this.clientWireUp.close();
104
105    LOG.log(Level.FINE, "Close REEF: shutdown job submitter");
106    try {
107      this.jobSubmissionHandler.close();
108    } catch (final Exception ex) {
109      LOG.log(Level.WARNING, "Could not shutdown job submitter", ex);
110    }
111
112    LOG.log(Level.FINE, "Close REEF: done");
113  }
114
115  @Override
116  public void submit(final Configuration driverConf) {
117    try (LoggingScope ls = this.loggingScopeFactory.reefSubmit()) {
118      final Configuration driverConfiguration = createDriverConfiguration(driverConf);
119      final JobSubmissionEvent submissionMessage;
120      try {
121        if (this.clientWireUp.isClientPresent()) {
122          submissionMessage = this.jobSubmissionHelper.getJobSubmissionBuilder(driverConfiguration)
123              .setRemoteId(this.clientWireUp.getRemoteManagerIdentifier())
124              .build();
125        } else {
126          submissionMessage = this.jobSubmissionHelper.getJobSubmissionBuilder(driverConfiguration)
127              .setRemoteId(ErrorHandlerRID.NONE)
128              .build();
129        }
130      } catch (final Exception e) {
131        throw new RuntimeException("Exception while processing driver configuration.", e);
132      }
133
134      this.jobSubmissionHandler.onNext(submissionMessage);
135
136      this.jobSubmittedHandler.get().onNext(
137          new SubmittedJobImpl(this.jobSubmissionHandler.getApplicationId()));
138    }
139  }
140
141  /**
142   * Assembles the final Driver Configuration by merging in all the Configurations provided by ConfigurationProviders.
143   *
144   * @param driverConfiguration
145   * @return
146   */
147  private Configuration createDriverConfiguration(final Configuration driverConfiguration) {
148    final ConfigurationBuilder configurationBuilder = Tang.Factory.getTang()
149        .newConfigurationBuilder(driverConfiguration);
150    for (final ConfigurationProvider configurationProvider : this.configurationProviders) {
151      configurationBuilder.addConfiguration(configurationProvider.getConfiguration());
152    }
153    return configurationBuilder.build();
154  }
155
156  /**
157   * The driver remote identifier.
158   */
159  @NamedParameter(doc = "The driver remote identifier.")
160  public static final class DriverRemoteIdentifier implements Name<String> {
161  }
162}