001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.examples.data.output; 020 021import org.apache.reef.annotations.audience.ClientSide; 022import org.apache.reef.client.DriverConfiguration; 023import org.apache.reef.client.DriverLauncher; 024import org.apache.reef.client.LauncherStatus; 025import org.apache.reef.io.data.output.*; 026import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration; 027import org.apache.reef.runtime.yarn.client.YarnClientConfiguration; 028import org.apache.reef.tang.Configuration; 029import org.apache.reef.tang.Injector; 030import org.apache.reef.tang.JavaConfigurationBuilder; 031import org.apache.reef.tang.Tang; 032import org.apache.reef.tang.annotations.Name; 033import org.apache.reef.tang.annotations.NamedParameter; 034import org.apache.reef.tang.exceptions.BindException; 035import org.apache.reef.tang.exceptions.InjectionException; 036import org.apache.reef.tang.formats.CommandLine; 037import org.apache.reef.util.EnvironmentUtils; 038 039import java.io.File; 040import java.io.IOException; 041import java.util.logging.Level; 042import java.util.logging.Logger; 043 044/** 045 * Client for the output service demo app. 046 */ 047@ClientSide 048public final class OutputServiceREEF { 049 private static final Logger LOG = Logger.getLogger(OutputServiceREEF.class.getName()); 050 051 public static void main(final String[] args) 052 throws InjectionException, BindException, IOException { 053 054 final Tang tang = Tang.Factory.getTang(); 055 final JavaConfigurationBuilder cb = tang.newConfigurationBuilder(); 056 new CommandLine(cb) 057 .registerShortNameOfClass(Local.class) 058 .registerShortNameOfClass(TimeOut.class) 059 .registerShortNameOfClass(OutputDir.class) 060 .processCommandLine(args); 061 062 final Injector injector = tang.newInjector(cb.build()); 063 final boolean isLocal = injector.getNamedInstance(Local.class); 064 final String outputDir = injector.getNamedInstance(OutputDir.class); 065 final int jobTimeout = injector.getNamedInstance(TimeOut.class) * 60 * 1000; 066 067 final Configuration driverConf = getDriverConf(); 068 final Configuration outputServiceConf = getOutputServiceConf(isLocal, outputDir); 069 final Configuration submittedConfiguration = Tang.Factory.getTang() 070 .newConfigurationBuilder(driverConf, outputServiceConf) 071 .build(); 072 final LauncherStatus state = DriverLauncher.getLauncher(getRuntimeConf(isLocal)) 073 .run(submittedConfiguration, jobTimeout); 074 075 LOG.log(Level.INFO, "REEF job completed: {0}", state); 076 } 077 078 /** 079 * @param isLocal true for local runtime, or false for YARN runtime. 080 * @return The runtime configuration 081 */ 082 private static Configuration getRuntimeConf(final boolean isLocal) { 083 final Configuration runtimeConf; 084 if (isLocal) { 085 LOG.log(Level.INFO, "Running the output service demo on the local runtime"); 086 runtimeConf = LocalRuntimeConfiguration.CONF 087 .set(LocalRuntimeConfiguration.MAX_NUMBER_OF_EVALUATORS, 3) 088 .build(); 089 } else { 090 LOG.log(Level.INFO, "Running the output service demo on YARN"); 091 runtimeConf = YarnClientConfiguration.CONF.build(); 092 } 093 return runtimeConf; 094 } 095 096 /** 097 * @return The Driver configuration. 098 */ 099 private static Configuration getDriverConf() { 100 final Configuration driverConf = DriverConfiguration.CONF 101 .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(OutputServiceDriver.class)) 102 .set(DriverConfiguration.DRIVER_IDENTIFIER, "OutputServiceREEF") 103 .set(DriverConfiguration.ON_DRIVER_STARTED, OutputServiceDriver.StartHandler.class) 104 .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, OutputServiceDriver.EvaluatorAllocatedHandler.class) 105 .set(DriverConfiguration.ON_CONTEXT_ACTIVE, OutputServiceDriver.ActiveContextHandler.class) 106 .build(); 107 108 return driverConf; 109 } 110 111 /** 112 * @param isLocal true for local runtime, or false for YARN runtime. 113 * @param outputDir path of the output directory. 114 * @return The configuration to use OutputService 115 */ 116 private static Configuration getOutputServiceConf(final boolean isLocal, final String outputDir) { 117 final Configuration outputServiceConf; 118 if (isLocal) { 119 outputServiceConf = TaskOutputServiceBuilder.CONF 120 .set(TaskOutputServiceBuilder.TASK_OUTPUT_STREAM_PROVIDER, TaskOutputStreamProviderLocal.class) 121 .set(TaskOutputServiceBuilder.OUTPUT_PATH, getAbsolutePath(outputDir)) 122 .build(); 123 } else { 124 outputServiceConf = TaskOutputServiceBuilder.CONF 125 .set(TaskOutputServiceBuilder.TASK_OUTPUT_STREAM_PROVIDER, TaskOutputStreamProviderHDFS.class) 126 .set(TaskOutputServiceBuilder.OUTPUT_PATH, outputDir) 127 .build(); 128 } 129 return outputServiceConf; 130 } 131 132 /** 133 * transform the given relative path into the absolute path based on the current directory where a user runs the demo. 134 * @param relativePath relative path 135 * @return absolute path 136 */ 137 private static String getAbsolutePath(final String relativePath) { 138 final File outputFile = new File(relativePath); 139 return outputFile.getAbsolutePath(); 140 } 141 142 /** 143 * Command line parameter = true to run locally, or false to run on YARN. 144 */ 145 @NamedParameter(doc = "Whether or not to run on the local runtime", 146 short_name = "local", default_value = "true") 147 public static final class Local implements Name<Boolean> { 148 } 149 150 /** 151 * Command line parameter = number of minutes before timeout. 152 */ 153 @NamedParameter(doc = "Number of minutes before timeout", 154 short_name = "timeout", default_value = "2") 155 public static final class TimeOut implements Name<Integer> { 156 } 157 158 /** 159 * Command line parameter = path of the output directory. 160 */ 161 @NamedParameter(doc = "Path of the output directory", 162 short_name = "output") 163 public static final class OutputDir implements Name<String> { 164 } 165 166 /** 167 * Empty private constructor to prohibit instantiation of utility class. 168 */ 169 private OutputServiceREEF() { 170 } 171}