/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.reef.examples.hello;

import org.apache.reef.client.DriverConfiguration;
import org.apache.reef.client.DriverLauncher;
import org.apache.reef.client.LauncherStatus;
import org.apache.reef.client.parameters.DriverConfigurationProviders;
import org.apache.reef.io.TcpPortConfigurationProvider;
import org.apache.reef.runtime.yarn.client.YarnClientConfiguration;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.Tang;
import org.apache.reef.tang.exceptions.BindException;
import org.apache.reef.tang.exceptions.InjectionException;
import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeBegin;
import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeCount;
import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeTryCount;

import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * The Client for running HelloREEF with tcp port configuration on YARN.
 *
 * <p>Up to three optional positional command line arguments override the
 * default TCP port settings: the first port of the range, the number of
 * ports in the range, and the maximum number of tries.
 */
public final class HelloReefYarnTcp {

  private static final Logger LOG = Logger.getLogger(HelloReefYarnTcp.class.getName());

  /**
   * Number of milliseconds to wait for the job to complete.
   */
  private static final int JOB_TIMEOUT = 150000; // 150 sec.

  /** First TCP port of the range, used when no first argument is given. */
  private static final int DEFAULT_TCP_BEGIN_PORT = 8900;

  /** Number of TCP ports in the range, used when no second argument is given. */
  private static final int DEFAULT_TCP_RANGE_COUNT = 10;

  /** Maximum number of port tries, used when no third argument is given. */
  private static final int DEFAULT_TCP_RANGE_TRY_COUNT = 1111;

  /**
   * Builds the runtime configuration: the YARN client configuration extended
   * with {@link TcpPortConfigurationProvider} as a driver configuration
   * provider and the three TCP port range parameters.
   *
   * @param tcpBeginPort  the first tcp port number to try
   * @param tcpRangeCount the number of tcp ports in the range
   * @param tcpTryCount   maximum number of tries for port numbers
   * @return the configuration of the runtime
   */
  private static Configuration getRuntimeConfiguration(
      final int tcpBeginPort,
      final int tcpRangeCount,
      final int tcpTryCount) {

    return Tang.Factory.getTang().newConfigurationBuilder(YarnClientConfiguration.CONF.build())
        .bindSetEntry(DriverConfigurationProviders.class, TcpPortConfigurationProvider.class)
        .bindNamedParameter(TcpPortRangeBegin.class, Integer.toString(tcpBeginPort))
        .bindNamedParameter(TcpPortRangeCount.class, Integer.toString(tcpRangeCount))
        .bindNamedParameter(TcpPortRangeTryCount.class, Integer.toString(tcpTryCount))
        .build();
  }

  /**
   * Builds the driver configuration, registering the location this class was
   * loaded from as a global library and wiring the HelloDriver handlers.
   *
   * @return the configuration of the HelloREEF driver.
   */
  private static Configuration getDriverConfiguration() {
    // NOTE(review): URL.getFile() returns the path without URL-decoding, so a
    // location containing spaces or other escaped characters is passed on in
    // its encoded form - confirm that the consumer of GLOBAL_LIBRARIES copes.
    return DriverConfiguration.CONF
        .set(DriverConfiguration.GLOBAL_LIBRARIES,
            HelloReefYarnTcp.class.getProtectionDomain().getCodeSource().getLocation().getFile())
        .set(DriverConfiguration.DRIVER_IDENTIFIER, "HelloREEF")
        .set(DriverConfiguration.ON_DRIVER_STARTED, HelloDriver.StartHandler.class)
        .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, HelloDriver.EvaluatorAllocatedHandler.class)
        .build();
  }

  /**
   * Reads an optional integer command line argument.
   *
   * @param args         command line parameters.
   * @param index        position of the argument to read.
   * @param defaultValue value to return when the argument is absent.
   * @param name         parameter name used in the error message.
   * @return the parsed argument, or {@code defaultValue} if {@code args} has no element at {@code index}.
   * @throws NumberFormatException if the argument is present but is not a valid integer.
   */
  private static int parseIntArg(
      final String[] args,
      final int index,
      final int defaultValue,
      final String name) {

    if (args.length <= index) {
      return defaultValue;
    }

    try {
      return Integer.parseInt(args[index]);
    } catch (final NumberFormatException ex) {
      // Same exception type as a bare parseInt, but the message now says
      // which parameter was malformed.
      final NumberFormatException detailed = new NumberFormatException(
          "Invalid value for " + name + " (argument " + (index + 1) + "): \""
              + args[index] + "\" is not an integer");
      detailed.initCause(ex);
      throw detailed;
    }
  }

  /**
   * Start Hello REEF job.
   *
   * @param args command line parameters: optional tcp begin port, tcp range count and tcp try count.
   * @throws BindException      configuration error.
   * @throws InjectionException configuration error.
   * @throws NumberFormatException if a supplied argument is not a valid integer.
   */
  public static void main(final String[] args) throws BindException, InjectionException {
    final int tcpBeginPort = parseIntArg(args, 0, DEFAULT_TCP_BEGIN_PORT, "tcpBeginPort");
    final int tcpRangeCount = parseIntArg(args, 1, DEFAULT_TCP_RANGE_COUNT, "tcpRangeCount");
    final int tcpTryCount = parseIntArg(args, 2, DEFAULT_TCP_RANGE_TRY_COUNT, "tcpTryCount");

    final Configuration runtimeConf = getRuntimeConfiguration(tcpBeginPort, tcpRangeCount, tcpTryCount);
    final Configuration driverConf = getDriverConfiguration();

    final LauncherStatus status = DriverLauncher
        .getLauncher(runtimeConf)
        .run(driverConf, JOB_TIMEOUT);
    LOG.log(Level.INFO, "REEF job completed: {0}", status);
  }

  /**
   * Empty private constructor to prohibit instantiation of utility class.
   */
  private HelloReefYarnTcp() {
  }
}