001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.examples.group.bgd;
020
021import org.apache.hadoop.mapred.TextInputFormat;
022import org.apache.reef.client.DriverConfiguration;
023import org.apache.reef.client.DriverLauncher;
024import org.apache.reef.client.LauncherStatus;
025import org.apache.reef.client.REEF;
026import org.apache.reef.driver.evaluator.EvaluatorRequest;
027import org.apache.reef.examples.group.bgd.parameters.*;
028import org.apache.reef.io.data.loading.api.DataLoadingRequestBuilder;
029import org.apache.reef.io.network.group.impl.config.parameters.TreeTopologyFanOut;
030import org.apache.reef.io.network.group.impl.driver.GroupCommService;
031import org.apache.reef.tang.Configuration;
032import org.apache.reef.tang.Configurations;
033import org.apache.reef.tang.JavaConfigurationBuilder;
034import org.apache.reef.tang.Tang;
035import org.apache.reef.tang.annotations.Parameter;
036import org.apache.reef.tang.formats.CommandLine;
037import org.apache.reef.util.EnvironmentUtils;
038
039import javax.inject.Inject;
040
041/**
042 * A client to submit BGD Jobs.
043 */
044public class BGDClient {
045  private final String input;
046  private final int numSplits;
047  private final int memory;
048
049  private final BGDControlParameters bgdControlParameters;
050  private final int fanOut;
051
052  @Inject
053  public BGDClient(@Parameter(InputDir.class) final String input,
054                   @Parameter(NumSplits.class) final int numSplits,
055                   @Parameter(EvaluatorMemory.class) final int memory,
056                   @Parameter(TreeTopologyFanOut.class) final int fanOut,
057                   final BGDControlParameters bgdControlParameters) {
058    this.input = input;
059    this.fanOut = fanOut;
060    this.bgdControlParameters = bgdControlParameters;
061    this.numSplits = numSplits;
062    this.memory = memory;
063  }
064
065  /**
066   * Runs BGD on the given runtime.
067   *
068   * @param runtimeConfiguration the runtime to run on.
069   * @param jobName              the name of the job on the runtime.
070   */
071  public void submit(final Configuration runtimeConfiguration, final String jobName) throws Exception {
072    final Configuration driverConfiguration = getDriverConfiguration(jobName);
073    Tang.Factory.getTang().newInjector(runtimeConfiguration).getInstance(REEF.class).submit(driverConfiguration);
074  }
075
076  /**
077   * Runs BGD on the given runtime - with timeout.
078   *
079   * @param runtimeConfiguration the runtime to run on.
080   * @param jobName              the name of the job on the runtime.
081   * @param timeout              the time after which the job will be killed if not completed, in ms
082   * @return job completion status
083   */
084  public LauncherStatus run(final Configuration runtimeConfiguration,
085                            final String jobName, final int timeout) throws Exception {
086    final Configuration driverConfiguration = getDriverConfiguration(jobName);
087    return DriverLauncher.getLauncher(runtimeConfiguration).run(driverConfiguration, timeout);
088  }
089
090  private Configuration getDriverConfiguration(final String jobName) {
091    return Configurations.merge(
092        getDataLoadConfiguration(jobName),
093        GroupCommService.getConfiguration(fanOut),
094        this.bgdControlParameters.getConfiguration());
095  }
096
097  private Configuration getDataLoadConfiguration(final String jobName) {
098    final EvaluatorRequest computeRequest = EvaluatorRequest.newBuilder()
099        .setNumber(1)
100        .setMemory(memory)
101        .build();
102    final Configuration dataLoadConfiguration = new DataLoadingRequestBuilder()
103        .setInputFormatClass(TextInputFormat.class)
104        .setInputPath(input)
105        .setNumberOfDesiredSplits(numSplits)
106        .addComputeRequest(computeRequest)
107        .renewFailedEvaluators(false)
108        .setDriverConfigurationModule(DriverConfiguration.CONF
109            .setMultiple(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getAllClasspathJars())
110            .set(DriverConfiguration.DRIVER_MEMORY, Integer.toString(memory))
111            .set(DriverConfiguration.ON_CONTEXT_ACTIVE, BGDDriver.ContextActiveHandler.class)
112            .set(DriverConfiguration.ON_TASK_RUNNING, BGDDriver.TaskRunningHandler.class)
113            .set(DriverConfiguration.ON_TASK_FAILED, BGDDriver.TaskFailedHandler.class)
114            .set(DriverConfiguration.ON_TASK_COMPLETED, BGDDriver.TaskCompletedHandler.class)
115            .set(DriverConfiguration.DRIVER_IDENTIFIER, jobName))
116        .build();
117    return dataLoadConfiguration;
118  }
119
120  public static final BGDClient fromCommandLine(final String[] args) throws Exception {
121    final JavaConfigurationBuilder configurationBuilder = Tang.Factory.getTang().newConfigurationBuilder();
122    final CommandLine commandLine = new CommandLine(configurationBuilder)
123        .registerShortNameOfClass(InputDir.class)
124        .registerShortNameOfClass(Timeout.class)
125        .registerShortNameOfClass(EvaluatorMemory.class)
126        .registerShortNameOfClass(NumSplits.class)
127        .registerShortNameOfClass(TreeTopologyFanOut.class);
128    BGDControlParameters.registerShortNames(commandLine);
129    commandLine.processCommandLine(args);
130    return Tang.Factory.getTang().newInjector(configurationBuilder.build()).getInstance(BGDClient.class);
131  }
132}