001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.examples.scheduler; 020 021import org.apache.commons.cli.ParseException; 022import org.apache.reef.client.DriverConfiguration; 023import org.apache.reef.client.REEF; 024import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration; 025import org.apache.reef.tang.Configuration; 026import org.apache.reef.tang.Configurations; 027import org.apache.reef.tang.Tang; 028import org.apache.reef.tang.annotations.Name; 029import org.apache.reef.tang.annotations.NamedParameter; 030import org.apache.reef.tang.exceptions.InjectionException; 031import org.apache.reef.tang.formats.CommandLine; 032import org.apache.reef.util.EnvironmentUtils; 033import org.apache.reef.webserver.HttpHandlerConfiguration; 034 035import java.io.IOException; 036 037/** 038 * REEF TaskScheduler. 039 */ 040public final class SchedulerREEF { 041 042 /** 043 * Command line parameter = true to reuse evaluators, 044 * or false to allocate/close for each iteration 045 */ 046 @NamedParameter(doc = "Whether or not to reuse evaluators", 047 short_name = "retain", default_value = "true") 048 public static final class Retain implements Name<Boolean> { 049 } 050 051 /** 052 * @return The http configuration to use reef-webserver 053 */ 054 private final static Configuration getHttpConf() { 055 final Configuration httpHandlerConf = HttpHandlerConfiguration.CONF 056 .set(HttpHandlerConfiguration.HTTP_HANDLERS, SchedulerHttpHandler.class) 057 .build(); 058 return httpHandlerConf; 059 } 060 061 /** 062 * @return The Driver configuration. 063 */ 064 private final static Configuration getDriverConf() { 065 final Configuration driverConf = DriverConfiguration.CONF 066 .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(SchedulerDriver.class)) 067 .set(DriverConfiguration.DRIVER_IDENTIFIER, "TaskScheduler") 068 .set(DriverConfiguration.ON_DRIVER_STARTED, SchedulerDriver.StartHandler.class) 069 .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, SchedulerDriver.EvaluatorAllocatedHandler.class) 070 .set(DriverConfiguration.ON_CONTEXT_ACTIVE, SchedulerDriver.ActiveContextHandler.class) 071 .set(DriverConfiguration.ON_TASK_COMPLETED, SchedulerDriver.CompletedTaskHandler.class) 072 .build(); 073 074 return driverConf; 075 } 076 077 /** 078 * Run the Task scheduler. If '-retain true' option is passed via command line, 079 * the scheduler reuses evaluators to submit new Tasks. 080 * @param runtimeConf The runtime configuration (e.g. Local, YARN, etc) 081 * @param args Command line arguments. 082 * @throws InjectionException 083 * @throws java.io.IOException 084 */ 085 public static void runTaskScheduler(final Configuration runtimeConf, final String[] args) 086 throws InjectionException, IOException, ParseException { 087 final Tang tang = Tang.Factory.getTang(); 088 089 final Configuration commandLineConf = CommandLine.parseToConfiguration(args, Retain.class); 090 091 // Merge the configurations to run Driver 092 final Configuration driverConf = Configurations.merge(getDriverConf(), getHttpConf(), commandLineConf); 093 094 final REEF reef = tang.newInjector(runtimeConf).getInstance(REEF.class); 095 reef.submit(driverConf); 096 } 097 098 /** 099 * Main program 100 * @param args 101 * @throws InjectionException 102 */ 103 public final static void main(String[] args) throws InjectionException, IOException, ParseException { 104 final Configuration runtimeConfiguration = LocalRuntimeConfiguration.CONF 105 .set(LocalRuntimeConfiguration.NUMBER_OF_THREADS, 3) 106 .build(); 107 runTaskScheduler(runtimeConfiguration, args); 108 } 109}