001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.examples.group.bgd; 020 021import org.apache.hadoop.mapred.TextInputFormat; 022import org.apache.reef.client.DriverConfiguration; 023import org.apache.reef.client.DriverLauncher; 024import org.apache.reef.client.LauncherStatus; 025import org.apache.reef.client.REEF; 026import org.apache.reef.driver.evaluator.EvaluatorRequest; 027import org.apache.reef.examples.group.bgd.parameters.*; 028import org.apache.reef.io.data.loading.api.DataLoadingRequestBuilder; 029import org.apache.reef.io.network.group.impl.config.parameters.TreeTopologyFanOut; 030import org.apache.reef.io.network.group.impl.driver.GroupCommService; 031import org.apache.reef.tang.Configuration; 032import org.apache.reef.tang.Configurations; 033import org.apache.reef.tang.JavaConfigurationBuilder; 034import org.apache.reef.tang.Tang; 035import org.apache.reef.tang.annotations.Parameter; 036import org.apache.reef.tang.formats.CommandLine; 037import org.apache.reef.util.EnvironmentUtils; 038 039import javax.inject.Inject; 040 041/** 042 * A client to submit BGD Jobs. 043 */ 044public class BGDClient { 045 private final String input; 046 private final int numSplits; 047 private final int memory; 048 049 private final BGDControlParameters bgdControlParameters; 050 private final int fanOut; 051 052 @Inject 053 public BGDClient(@Parameter(InputDir.class) final String input, 054 @Parameter(NumSplits.class) final int numSplits, 055 @Parameter(EvaluatorMemory.class) final int memory, 056 @Parameter(TreeTopologyFanOut.class) final int fanOut, 057 final BGDControlParameters bgdControlParameters) { 058 this.input = input; 059 this.fanOut = fanOut; 060 this.bgdControlParameters = bgdControlParameters; 061 this.numSplits = numSplits; 062 this.memory = memory; 063 } 064 065 /** 066 * Runs BGD on the given runtime. 067 * 068 * @param runtimeConfiguration the runtime to run on. 069 * @param jobName the name of the job on the runtime. 070 */ 071 public void submit(final Configuration runtimeConfiguration, final String jobName) throws Exception { 072 final Configuration driverConfiguration = getDriverConfiguration(jobName); 073 Tang.Factory.getTang().newInjector(runtimeConfiguration).getInstance(REEF.class).submit(driverConfiguration); 074 } 075 076 /** 077 * Runs BGD on the given runtime - with timeout. 078 * 079 * @param runtimeConfiguration the runtime to run on. 080 * @param jobName the name of the job on the runtime. 081 * @param timeout the time after which the job will be killed if not completed, in ms 082 * @return job completion status 083 */ 084 public LauncherStatus run(final Configuration runtimeConfiguration, 085 final String jobName, final int timeout) throws Exception { 086 final Configuration driverConfiguration = getDriverConfiguration(jobName); 087 return DriverLauncher.getLauncher(runtimeConfiguration).run(driverConfiguration, timeout); 088 } 089 090 private Configuration getDriverConfiguration(final String jobName) { 091 return Configurations.merge( 092 getDataLoadConfiguration(jobName), 093 GroupCommService.getConfiguration(fanOut), 094 this.bgdControlParameters.getConfiguration()); 095 } 096 097 private Configuration getDataLoadConfiguration(final String jobName) { 098 final EvaluatorRequest computeRequest = EvaluatorRequest.newBuilder() 099 .setNumber(1) 100 .setMemory(memory) 101 .build(); 102 final Configuration dataLoadConfiguration = new DataLoadingRequestBuilder() 103 .setInputFormatClass(TextInputFormat.class) 104 .setInputPath(input) 105 .setNumberOfDesiredSplits(numSplits) 106 .addComputeRequest(computeRequest) 107 .renewFailedEvaluators(false) 108 .setDriverConfigurationModule(DriverConfiguration.CONF 109 .setMultiple(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getAllClasspathJars()) 110 .set(DriverConfiguration.DRIVER_MEMORY, Integer.toString(memory)) 111 .set(DriverConfiguration.ON_CONTEXT_ACTIVE, BGDDriver.ContextActiveHandler.class) 112 .set(DriverConfiguration.ON_TASK_RUNNING, BGDDriver.TaskRunningHandler.class) 113 .set(DriverConfiguration.ON_TASK_FAILED, BGDDriver.TaskFailedHandler.class) 114 .set(DriverConfiguration.ON_TASK_COMPLETED, BGDDriver.TaskCompletedHandler.class) 115 .set(DriverConfiguration.DRIVER_IDENTIFIER, jobName)) 116 .build(); 117 return dataLoadConfiguration; 118 } 119 120 public static final BGDClient fromCommandLine(final String[] args) throws Exception { 121 final JavaConfigurationBuilder configurationBuilder = Tang.Factory.getTang().newConfigurationBuilder(); 122 final CommandLine commandLine = new CommandLine(configurationBuilder) 123 .registerShortNameOfClass(InputDir.class) 124 .registerShortNameOfClass(Timeout.class) 125 .registerShortNameOfClass(EvaluatorMemory.class) 126 .registerShortNameOfClass(NumSplits.class) 127 .registerShortNameOfClass(TreeTopologyFanOut.class); 128 BGDControlParameters.registerShortNames(commandLine); 129 commandLine.processCommandLine(args); 130 return Tang.Factory.getTang().newInjector(configurationBuilder.build()).getInstance(BGDClient.class); 131 } 132}