001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.client; 020 021import org.apache.reef.annotations.Provided; 022import org.apache.reef.annotations.audience.ClientSide; 023import org.apache.reef.annotations.audience.Public; 024import org.apache.reef.driver.ProgressProvider; 025import org.apache.reef.driver.context.ActiveContext; 026import org.apache.reef.driver.context.ClosedContext; 027import org.apache.reef.driver.context.ContextMessage; 028import org.apache.reef.driver.context.FailedContext; 029import org.apache.reef.driver.evaluator.AllocatedEvaluator; 030import org.apache.reef.driver.evaluator.CompletedEvaluator; 031import org.apache.reef.driver.evaluator.FailedEvaluator; 032import org.apache.reef.driver.parameters.*; 033import org.apache.reef.driver.task.*; 034import org.apache.reef.runtime.common.driver.DriverRuntimeConfiguration; 035import org.apache.reef.runtime.common.driver.parameters.EvaluatorIdlenessThreadPoolSize; 036import org.apache.reef.runtime.common.driver.parameters.EvaluatorIdlenessWaitInMilliseconds; 037import org.apache.reef.tang.formats.*; 038import org.apache.reef.wake.EventHandler; 039import org.apache.reef.wake.time.Clock; 040import org.apache.reef.wake.time.event.StartTime; 041import org.apache.reef.wake.time.event.StopTime; 042 043/** 044 * A ConfigurationModule for Drivers. 045 */ 046@ClientSide 047@Public 048@Provided 049public final class DriverConfiguration extends ConfigurationModuleBuilder { 050 051 /** 052 * Identifies the driver and therefore the JOB. Expect to see this e.g. on YARN's dashboard. 053 */ 054 public static final OptionalParameter<String> DRIVER_IDENTIFIER = new OptionalParameter<>(); 055 056 /** 057 * The amount of memory to be allocated for the Driver. This is the size of the AM container in YARN. 058 */ 059 public static final OptionalParameter<Integer> DRIVER_MEMORY = new OptionalParameter<>(); 060 061 /** 062 * Files to be made available on the Driver and all Evaluators. 063 */ 064 public static final OptionalParameter<String> GLOBAL_FILES = new OptionalParameter<>(); 065 066 /** 067 * Libraries to be made available on the Driver and all Evaluators. 068 */ 069 public static final OptionalParameter<String> GLOBAL_LIBRARIES = new OptionalParameter<>(); 070 071 /** 072 * Files to be made available on the Driver only. 073 */ 074 public static final OptionalParameter<String> LOCAL_FILES = new OptionalParameter<>(); 075 076 /** 077 * Libraries to be made available on the Driver only. 078 */ 079 public static final OptionalParameter<String> LOCAL_LIBRARIES = new OptionalParameter<>(); 080 081 /** 082 * Job submission directory to be used by driver. This is the folder on the DFS used to stage the files 083 * for the Driver and subsequently for the Evaluators. It will be created if it doesn't exist yet. 084 * If this is set by the user, user must make sure its uniqueness across different jobs. 085 */ 086 public static final OptionalParameter<String> DRIVER_JOB_SUBMISSION_DIRECTORY = new OptionalParameter<>(); 087 088 /** 089 * The event handler invoked right after the driver boots up. 090 */ 091 public static final RequiredImpl<EventHandler<StartTime>> ON_DRIVER_STARTED = new RequiredImpl<>(); 092 093 /** 094 * The event handler invoked right before the driver shuts down. Defaults to ignore. 095 */ 096 public static final OptionalImpl<EventHandler<StopTime>> ON_DRIVER_STOP = new OptionalImpl<>(); 097 098 // ***** EVALUATOR HANDLER BINDINGS: 099 100 /** 101 * Event handler for allocated evaluators. Defaults to returning the evaluator if not bound. 102 */ 103 public static final OptionalImpl<EventHandler<AllocatedEvaluator>> ON_EVALUATOR_ALLOCATED = new OptionalImpl<>(); 104 105 /** 106 * Event handler for completed evaluators. Defaults to logging if not bound. 107 */ 108 public static final OptionalImpl<EventHandler<CompletedEvaluator>> ON_EVALUATOR_COMPLETED = new OptionalImpl<>(); 109 110 /** 111 * Event handler for failed evaluators. Defaults to job failure if not bound. 112 */ 113 public static final OptionalImpl<EventHandler<FailedEvaluator>> ON_EVALUATOR_FAILED = new OptionalImpl<>(); 114 115 // ***** TASK HANDLER BINDINGS: 116 117 /** 118 * Event handler for task messages. Defaults to logging if not bound. 119 */ 120 public static final OptionalImpl<EventHandler<TaskMessage>> ON_TASK_MESSAGE = new OptionalImpl<>(); 121 122 /** 123 * Event handler for completed tasks. Defaults to closing the context the task ran on if not bound. 124 */ 125 public static final OptionalImpl<EventHandler<CompletedTask>> ON_TASK_COMPLETED = new OptionalImpl<>(); 126 127 /** 128 * Event handler for failed tasks. Defaults to job failure if not bound. 129 */ 130 public static final OptionalImpl<EventHandler<FailedTask>> ON_TASK_FAILED = new OptionalImpl<>(); 131 132 /** 133 * Event handler for running tasks. Defaults to logging if not bound. 134 */ 135 public static final OptionalImpl<EventHandler<RunningTask>> ON_TASK_RUNNING = new OptionalImpl<>(); 136 137 /** 138 * Event handler for suspended tasks. Defaults to job failure if not bound. Rationale: many jobs don't support 139 * task suspension. Hence, this parameter should be optional. The only sane default is to crash the job, then. 140 */ 141 public static final OptionalImpl<EventHandler<SuspendedTask>> ON_TASK_SUSPENDED = new OptionalImpl<>(); 142 143 // ***** CLIENT HANDLER BINDINGS: 144 145 /** 146 * Event handler for client messages. Defaults to logging if not bound. 147 */ 148 public static final OptionalImpl<EventHandler<byte[]>> ON_CLIENT_MESSAGE = new OptionalImpl<>(); 149 150 /** 151 * Event handler for close messages sent by the client. Defaults to job failure if not bound. 152 */ 153 public static final OptionalImpl<EventHandler<Void>> ON_CLIENT_CLOSED = new OptionalImpl<>(); 154 155 /** 156 * Event handler for close messages sent by the client. Defaults to job failure if not bound. 157 */ 158 public static final OptionalImpl<EventHandler<byte[]>> ON_CLIENT_CLOSED_MESSAGE = new OptionalImpl<>(); 159 160 // ***** CONTEXT HANDLER BINDINGS: 161 162 /** 163 * Event handler for active context. Defaults to closing the context if not bound. 164 */ 165 public static final OptionalImpl<EventHandler<ActiveContext>> ON_CONTEXT_ACTIVE = new OptionalImpl<>(); 166 167 /** 168 * Event handler for closed context. Defaults to logging if not bound. 169 */ 170 public static final OptionalImpl<EventHandler<ClosedContext>> ON_CONTEXT_CLOSED = new OptionalImpl<>(); 171 172 /** 173 * Event handler for closed context. Defaults to job failure if not bound. 174 */ 175 public static final OptionalImpl<EventHandler<FailedContext>> ON_CONTEXT_FAILED = new OptionalImpl<>(); 176 177 /** 178 * Event handler for context messages. Defaults to logging if not bound. 179 */ 180 public static final OptionalImpl<EventHandler<ContextMessage>> ON_CONTEXT_MESSAGE = new OptionalImpl<>(); 181 182 // ***** MISC 183 184 /** 185 * Progress provider. See {@link ProgressProvider}. 186 */ 187 public static final OptionalImpl<ProgressProvider> PROGRESS_PROVIDER = new OptionalImpl<>(); 188 189 /** 190 * Number of threads allocated per evaluator to dispatch events from this Evaluator. 191 */ 192 public static final OptionalParameter<Integer> EVALUATOR_DISPATCHER_THREADS = new OptionalParameter<>(); 193 194 /** 195 * The number of submissions that the resource manager will attempt to submit the application. Defaults to 1. 196 */ 197 public static final OptionalParameter<Integer> MAX_APPLICATION_SUBMISSIONS = new OptionalParameter<>(); 198 199 /** 200 * The number of Threads in a Driver to verify the completion of Evaluators. 201 * Used by {@link org.apache.reef.runtime.common.driver.evaluator.EvaluatorIdlenessThreadPool}. 202 */ 203 public static final OptionalParameter<Integer> EVALUATOR_IDLENESS_THREAD_POOL_SIZE = new OptionalParameter<>(); 204 205 /** 206 * The number of Threads in a Driver to verify the completion of Evaluators. 207 * Used by {@link org.apache.reef.runtime.common.driver.evaluator.EvaluatorIdlenessThreadPool}. 208 */ 209 public static final OptionalParameter<Long> EVALUATOR_IDLENESS_WAIT_IN_MS = new OptionalParameter<>(); 210 211 /** 212 * ConfigurationModule to fill out to get a legal Driver Configuration. 213 */ 214 public static final ConfigurationModule CONF = new DriverConfiguration().merge(DriverRuntimeConfiguration.CONF) 215 .bindNamedParameter(DriverIdentifier.class, DRIVER_IDENTIFIER) 216 .bindNamedParameter(DriverMemory.class, DRIVER_MEMORY) 217 .bindNamedParameter(DriverJobSubmissionDirectory.class, DRIVER_JOB_SUBMISSION_DIRECTORY) 218 .bindNamedParameter(MaxApplicationSubmissions.class, MAX_APPLICATION_SUBMISSIONS) 219 .bindSetEntry(JobGlobalFiles.class, GLOBAL_FILES) 220 .bindSetEntry(JobGlobalLibraries.class, GLOBAL_LIBRARIES) 221 .bindSetEntry(DriverLocalFiles.class, LOCAL_FILES) 222 .bindSetEntry(DriverLocalLibraries.class, LOCAL_LIBRARIES) 223 224 // Driver start/stop handlers 225 .bindSetEntry(DriverStartHandler.class, ON_DRIVER_STARTED) 226 .bindSetEntry(Clock.StartHandler.class, org.apache.reef.runtime.common.driver.DriverStartHandler.class) 227 .bindSetEntry(Clock.StopHandler.class, ON_DRIVER_STOP) 228 229 // Evaluator handlers 230 .bindSetEntry(EvaluatorAllocatedHandlers.class, ON_EVALUATOR_ALLOCATED) 231 .bindSetEntry(EvaluatorCompletedHandlers.class, ON_EVALUATOR_COMPLETED) 232 .bindSetEntry(EvaluatorFailedHandlers.class, ON_EVALUATOR_FAILED) 233 234 // Task handlers 235 .bindSetEntry(TaskRunningHandlers.class, ON_TASK_RUNNING) 236 .bindSetEntry(TaskFailedHandlers.class, ON_TASK_FAILED) 237 .bindSetEntry(TaskMessageHandlers.class, ON_TASK_MESSAGE) 238 .bindSetEntry(TaskCompletedHandlers.class, ON_TASK_COMPLETED) 239 .bindSetEntry(TaskSuspendedHandlers.class, ON_TASK_SUSPENDED) 240 241 // Context handlers 242 .bindSetEntry(ContextActiveHandlers.class, ON_CONTEXT_ACTIVE) 243 .bindSetEntry(ContextClosedHandlers.class, ON_CONTEXT_CLOSED) 244 .bindSetEntry(ContextMessageHandlers.class, ON_CONTEXT_MESSAGE) 245 .bindSetEntry(ContextFailedHandlers.class, ON_CONTEXT_FAILED) 246 247 // Client handlers 248 .bindSetEntry(ClientMessageHandlers.class, ON_CLIENT_MESSAGE) 249 .bindSetEntry(ClientCloseHandlers.class, ON_CLIENT_CLOSED) 250 .bindSetEntry(ClientCloseWithMessageHandlers.class, ON_CLIENT_CLOSED_MESSAGE) 251 252 // Various parameters 253 .bindNamedParameter(EvaluatorDispatcherThreads.class, EVALUATOR_DISPATCHER_THREADS) 254 .bindNamedParameter(EvaluatorIdlenessThreadPoolSize.class, EVALUATOR_IDLENESS_THREAD_POOL_SIZE) 255 .bindNamedParameter(EvaluatorIdlenessWaitInMilliseconds.class, EVALUATOR_IDLENESS_WAIT_IN_MS) 256 .bindImplementation(ProgressProvider.class, PROGRESS_PROVIDER) 257 .build(); 258}