001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.client;
020
021import org.apache.reef.annotations.Provided;
022import org.apache.reef.annotations.audience.ClientSide;
023import org.apache.reef.annotations.audience.Public;
024import org.apache.reef.driver.ProgressProvider;
025import org.apache.reef.driver.context.ActiveContext;
026import org.apache.reef.driver.context.ClosedContext;
027import org.apache.reef.driver.context.ContextMessage;
028import org.apache.reef.driver.context.FailedContext;
029import org.apache.reef.driver.evaluator.AllocatedEvaluator;
030import org.apache.reef.driver.evaluator.CompletedEvaluator;
031import org.apache.reef.driver.evaluator.FailedEvaluator;
032import org.apache.reef.driver.parameters.*;
033import org.apache.reef.driver.task.*;
034import org.apache.reef.runtime.common.driver.DriverRuntimeConfiguration;
035import org.apache.reef.runtime.common.driver.parameters.EvaluatorIdlenessThreadPoolSize;
036import org.apache.reef.runtime.common.driver.parameters.EvaluatorIdlenessWaitInMilliseconds;
037import org.apache.reef.tang.formats.*;
038import org.apache.reef.wake.EventHandler;
039import org.apache.reef.wake.time.Clock;
040import org.apache.reef.wake.time.event.StartTime;
041import org.apache.reef.wake.time.event.StopTime;
042
043/**
044 * A ConfigurationModule for Drivers.
045 */
046@ClientSide
047@Public
048@Provided
049public final class DriverConfiguration extends ConfigurationModuleBuilder {
050
051  /**
052   * Identifies the driver and therefore the JOB. Expect to see this e.g. on YARN's dashboard.
053   */
054  public static final OptionalParameter<String> DRIVER_IDENTIFIER = new OptionalParameter<>();
055
056  /**
057   * The amount of memory to be allocated for the Driver. This is the size of the AM container in YARN.
058   */
059  public static final OptionalParameter<Integer> DRIVER_MEMORY = new OptionalParameter<>();
060
061  /**
062   * Files to be made available on the Driver and all Evaluators.
063   */
064  public static final OptionalParameter<String> GLOBAL_FILES = new OptionalParameter<>();
065
066  /**
067   * Libraries to be made available on the Driver and all Evaluators.
068   */
069  public static final OptionalParameter<String> GLOBAL_LIBRARIES = new OptionalParameter<>();
070
071  /**
072   * Files to be made available on the Driver only.
073   */
074  public static final OptionalParameter<String> LOCAL_FILES = new OptionalParameter<>();
075
076  /**
077   * Libraries to be made available on the Driver only.
078   */
079  public static final OptionalParameter<String> LOCAL_LIBRARIES = new OptionalParameter<>();
080
081  /**
082   * Job submission directory to be used by driver. This is the folder on the DFS used to stage the files
083   * for the Driver and subsequently for the Evaluators. It will be created if it doesn't exist yet.
084   * If this is set by the user, user must make sure its uniqueness across different jobs.
085   */
086  public static final OptionalParameter<String> DRIVER_JOB_SUBMISSION_DIRECTORY = new OptionalParameter<>();
087
088  /**
089   * The event handler invoked right after the driver boots up.
090   */
091  public static final RequiredImpl<EventHandler<StartTime>> ON_DRIVER_STARTED = new RequiredImpl<>();
092
093  /**
094   * The event handler invoked right before the driver shuts down. Defaults to ignore.
095   */
096  public static final OptionalImpl<EventHandler<StopTime>> ON_DRIVER_STOP = new OptionalImpl<>();
097
098  // ***** EVALUATOR HANDLER BINDINGS:
099
100  /**
101   * Event handler for allocated evaluators. Defaults to returning the evaluator if not bound.
102   */
103  public static final OptionalImpl<EventHandler<AllocatedEvaluator>> ON_EVALUATOR_ALLOCATED = new OptionalImpl<>();
104
105  /**
106   * Event handler for completed evaluators. Defaults to logging if not bound.
107   */
108  public static final OptionalImpl<EventHandler<CompletedEvaluator>> ON_EVALUATOR_COMPLETED = new OptionalImpl<>();
109
110  /**
111   * Event handler for failed evaluators. Defaults to job failure if not bound.
112   */
113  public static final OptionalImpl<EventHandler<FailedEvaluator>> ON_EVALUATOR_FAILED = new OptionalImpl<>();
114
115  // ***** TASK HANDLER BINDINGS:
116
117  /**
118   * Event handler for task messages. Defaults to logging if not bound.
119   */
120  public static final OptionalImpl<EventHandler<TaskMessage>> ON_TASK_MESSAGE = new OptionalImpl<>();
121
122  /**
123   * Event handler for completed tasks. Defaults to closing the context the task ran on if not bound.
124   */
125  public static final OptionalImpl<EventHandler<CompletedTask>> ON_TASK_COMPLETED = new OptionalImpl<>();
126
127  /**
128   * Event handler for failed tasks. Defaults to job failure if not bound.
129   */
130  public static final OptionalImpl<EventHandler<FailedTask>> ON_TASK_FAILED = new OptionalImpl<>();
131
132  /**
133   * Event handler for running tasks. Defaults to logging if not bound.
134   */
135  public static final OptionalImpl<EventHandler<RunningTask>> ON_TASK_RUNNING = new OptionalImpl<>();
136
137  /**
138   * Event handler for suspended tasks. Defaults to job failure if not bound. Rationale: many jobs don't support
139   * task suspension. Hence, this parameter should be optional. The only sane default is to crash the job, then.
140   */
141  public static final OptionalImpl<EventHandler<SuspendedTask>> ON_TASK_SUSPENDED = new OptionalImpl<>();
142
143  // ***** CLIENT HANDLER BINDINGS:
144
145  /**
146   * Event handler for client messages. Defaults to logging if not bound.
147   */
148  public static final OptionalImpl<EventHandler<byte[]>> ON_CLIENT_MESSAGE = new OptionalImpl<>();
149
150  /**
151   * Event handler for close messages sent by the client. Defaults to job failure if not bound.
152   */
153  public static final OptionalImpl<EventHandler<Void>> ON_CLIENT_CLOSED = new OptionalImpl<>();
154
155  /**
156   * Event handler for close messages sent by the client. Defaults to job failure if not bound.
157   */
158  public static final OptionalImpl<EventHandler<byte[]>> ON_CLIENT_CLOSED_MESSAGE = new OptionalImpl<>();
159
160  // ***** CONTEXT HANDLER BINDINGS:
161
162  /**
163   * Event handler for active context. Defaults to closing the context if not bound.
164   */
165  public static final OptionalImpl<EventHandler<ActiveContext>> ON_CONTEXT_ACTIVE = new OptionalImpl<>();
166
167  /**
168   * Event handler for closed context. Defaults to logging if not bound.
169   */
170  public static final OptionalImpl<EventHandler<ClosedContext>> ON_CONTEXT_CLOSED = new OptionalImpl<>();
171
172  /**
173   * Event handler for closed context. Defaults to job failure if not bound.
174   */
175  public static final OptionalImpl<EventHandler<FailedContext>> ON_CONTEXT_FAILED = new OptionalImpl<>();
176
177  /**
178   * Event handler for context messages. Defaults to logging if not bound.
179   */
180  public static final OptionalImpl<EventHandler<ContextMessage>> ON_CONTEXT_MESSAGE = new OptionalImpl<>();
181
182  // ***** MISC
183
184  /**
185   * Progress provider. See {@link ProgressProvider}.
186   */
187  public static final OptionalImpl<ProgressProvider> PROGRESS_PROVIDER = new OptionalImpl<>();
188
189  /**
190   * Number of threads allocated per evaluator to dispatch events from this Evaluator.
191   */
192  public static final OptionalParameter<Integer> EVALUATOR_DISPATCHER_THREADS = new OptionalParameter<>();
193
194  /**
195   * The number of submissions that the resource manager will attempt to submit the application. Defaults to 1.
196   */
197  public static final OptionalParameter<Integer> MAX_APPLICATION_SUBMISSIONS = new OptionalParameter<>();
198
199  /**
200   * The number of Threads in a Driver to verify the completion of Evaluators.
201   * Used by {@link org.apache.reef.runtime.common.driver.evaluator.EvaluatorIdlenessThreadPool}.
202   */
203  public static final OptionalParameter<Integer> EVALUATOR_IDLENESS_THREAD_POOL_SIZE = new OptionalParameter<>();
204
205  /**
206   * The number of Threads in a Driver to verify the completion of Evaluators.
207   * Used by {@link org.apache.reef.runtime.common.driver.evaluator.EvaluatorIdlenessThreadPool}.
208   */
209  public static final OptionalParameter<Long> EVALUATOR_IDLENESS_WAIT_IN_MS = new OptionalParameter<>();
210
211  /**
212   * ConfigurationModule to fill out to get a legal Driver Configuration.
213   */
214  public static final ConfigurationModule CONF = new DriverConfiguration().merge(DriverRuntimeConfiguration.CONF)
215      .bindNamedParameter(DriverIdentifier.class, DRIVER_IDENTIFIER)
216      .bindNamedParameter(DriverMemory.class, DRIVER_MEMORY)
217      .bindNamedParameter(DriverJobSubmissionDirectory.class, DRIVER_JOB_SUBMISSION_DIRECTORY)
218      .bindNamedParameter(MaxApplicationSubmissions.class, MAX_APPLICATION_SUBMISSIONS)
219      .bindSetEntry(JobGlobalFiles.class, GLOBAL_FILES)
220      .bindSetEntry(JobGlobalLibraries.class, GLOBAL_LIBRARIES)
221      .bindSetEntry(DriverLocalFiles.class, LOCAL_FILES)
222      .bindSetEntry(DriverLocalLibraries.class, LOCAL_LIBRARIES)
223
224          // Driver start/stop handlers
225      .bindSetEntry(DriverStartHandler.class, ON_DRIVER_STARTED)
226      .bindSetEntry(Clock.StartHandler.class, org.apache.reef.runtime.common.driver.DriverStartHandler.class)
227      .bindSetEntry(Clock.StopHandler.class, ON_DRIVER_STOP)
228
229          // Evaluator handlers
230      .bindSetEntry(EvaluatorAllocatedHandlers.class, ON_EVALUATOR_ALLOCATED)
231      .bindSetEntry(EvaluatorCompletedHandlers.class, ON_EVALUATOR_COMPLETED)
232      .bindSetEntry(EvaluatorFailedHandlers.class, ON_EVALUATOR_FAILED)
233
234          // Task handlers
235      .bindSetEntry(TaskRunningHandlers.class, ON_TASK_RUNNING)
236      .bindSetEntry(TaskFailedHandlers.class, ON_TASK_FAILED)
237      .bindSetEntry(TaskMessageHandlers.class, ON_TASK_MESSAGE)
238      .bindSetEntry(TaskCompletedHandlers.class, ON_TASK_COMPLETED)
239      .bindSetEntry(TaskSuspendedHandlers.class, ON_TASK_SUSPENDED)
240
241          // Context handlers
242      .bindSetEntry(ContextActiveHandlers.class, ON_CONTEXT_ACTIVE)
243      .bindSetEntry(ContextClosedHandlers.class, ON_CONTEXT_CLOSED)
244      .bindSetEntry(ContextMessageHandlers.class, ON_CONTEXT_MESSAGE)
245      .bindSetEntry(ContextFailedHandlers.class, ON_CONTEXT_FAILED)
246
247          // Client handlers
248      .bindSetEntry(ClientMessageHandlers.class, ON_CLIENT_MESSAGE)
249      .bindSetEntry(ClientCloseHandlers.class, ON_CLIENT_CLOSED)
250      .bindSetEntry(ClientCloseWithMessageHandlers.class, ON_CLIENT_CLOSED_MESSAGE)
251
252          // Various parameters
253      .bindNamedParameter(EvaluatorDispatcherThreads.class, EVALUATOR_DISPATCHER_THREADS)
254      .bindNamedParameter(EvaluatorIdlenessThreadPoolSize.class, EVALUATOR_IDLENESS_THREAD_POOL_SIZE)
255      .bindNamedParameter(EvaluatorIdlenessWaitInMilliseconds.class, EVALUATOR_IDLENESS_WAIT_IN_MS)
256      .bindImplementation(ProgressProvider.class, PROGRESS_PROVIDER)
257      .build();
258}