001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.io.data.loading.api;
020
021import org.apache.commons.lang.Validate;
022import org.apache.hadoop.mapred.InputFormat;
023import org.apache.hadoop.mapred.TextInputFormat;
024import org.apache.reef.client.DriverConfiguration;
025import org.apache.reef.driver.evaluator.EvaluatorRequest;
026import org.apache.reef.io.data.loading.impl.DistributedDataSetPartitionSerializer;
027import org.apache.reef.io.data.loading.impl.AvroEvaluatorRequestSerializer;
028import org.apache.reef.io.data.loading.impl.SingleDataCenterEvaluatorToPartitionStrategy;
029import org.apache.reef.io.data.loading.impl.DistributedDataSetPartition;
030import org.apache.reef.io.data.loading.impl.InputFormatLoadingService;
031import org.apache.reef.io.data.loading.impl.JobConfExternalConstructor;
032import org.apache.reef.io.data.loading.impl.MultiDataCenterEvaluatorToPartitionStrategy;
033import org.apache.reef.runtime.common.utils.Constants;
034import org.apache.reef.tang.Configuration;
035import org.apache.reef.tang.JavaConfigurationBuilder;
036import org.apache.reef.tang.Tang;
037import org.apache.reef.tang.annotations.Name;
038import org.apache.reef.tang.annotations.NamedParameter;
039import org.apache.reef.tang.exceptions.BindException;
040import org.apache.reef.tang.formats.ConfigurationModule;
041
042import java.util.ArrayList;
043import java.util.Iterator;
044import java.util.List;
045import java.util.Set;
046
047/**
048 * Builder to create a request to the DataLoadingService.
049 */
050public final class DataLoadingRequestBuilder
051    implements org.apache.reef.util.Builder<Configuration> {
052
053  // constant used in several places.
054  private static final int UNINITIALIZED = -1;
055  private int numberOfDesiredSplits = UNINITIALIZED;
056  private final List<EvaluatorRequest> computeRequests = new ArrayList<>();
057  private final List<EvaluatorRequest> dataRequests = new ArrayList<>();
058  private boolean inMemory = false;
059  private boolean renewFailedEvaluators = true;
060  private ConfigurationModule driverConfigurationModule = null;
061  private String inputFormatClass;
062  /**
063   * Single data center loading strategy flag. Allows to specify if the data
064   * will be loaded in machines of a single data center or not. By
065   * default, is set to true.
066   */
067  private boolean singleDataCenterStrategy = true;
068  /**
069   * Distributed dataset that can contain many distributed partitions.
070   */
071  private DistributedDataSet distributedDataSet;
072
073  /**
074   * The input path of the data to be loaded.
075   */
076  private String inputPath;
077
078  public DataLoadingRequestBuilder setNumberOfDesiredSplits(final int numberOfDesiredSplits) {
079    this.numberOfDesiredSplits = numberOfDesiredSplits;
080    return this;
081  }
082
083  /**
084   * Adds the requests to the compute requests list.
085   *
086   * @param computeRequests
087   *          the compute requests to add
088   * @return this
089   */
090  @SuppressWarnings("checkstyle:hiddenfield")
091  public DataLoadingRequestBuilder addComputeRequests(final List<EvaluatorRequest> computeRequests) {
092    for (final EvaluatorRequest computeRequest : computeRequests) {
093      addComputeRequest(computeRequest);
094    }
095    return this;
096  }
097
098  /**
099   * Adds the requests to the data requests list.
100   *
101   * @param dataRequests
102   *          the data requests to add
103   * @return this
104   */
105  @SuppressWarnings("checkstyle:hiddenfield")
106  public DataLoadingRequestBuilder addDataRequests(final List<EvaluatorRequest> dataRequests) {
107    for (final EvaluatorRequest dataRequest : dataRequests) {
108      addDataRequest(dataRequest);
109    }
110    return this;
111  }
112
113  /**
114   * Adds a single request to the compute requests list.
115   *
116   * @param computeRequest
117   *          the compute request to add
118   * @return this
119   */
120  public DataLoadingRequestBuilder addComputeRequest(final EvaluatorRequest computeRequest) {
121    this.computeRequests.add(computeRequest);
122    return this;
123  }
124
125  /**
126   * Adds a single request to the data requests list.
127   *
128   * @param dataRequest
129   *          the data request to add
130   * @return this
131   */
132  public DataLoadingRequestBuilder addDataRequest(final EvaluatorRequest dataRequest) {
133    this.dataRequests.add(dataRequest);
134    return this;
135  }
136
137  @SuppressWarnings("checkstyle:hiddenfield")
138  public DataLoadingRequestBuilder loadIntoMemory(final boolean inMemory) {
139    this.inMemory = inMemory;
140    return this;
141  }
142
143  @SuppressWarnings("checkstyle:hiddenfield")
144  public DataLoadingRequestBuilder renewFailedEvaluators(final boolean renewFailedEvaluators) {
145    this.renewFailedEvaluators = renewFailedEvaluators;
146    return this;
147  }
148
149  public DataLoadingRequestBuilder setDriverConfigurationModule(
150      final ConfigurationModule driverConfigurationModule) {
151    this.driverConfigurationModule = driverConfigurationModule;
152    return this;
153  }
154
155  public DataLoadingRequestBuilder setInputFormatClass(
156      final Class<? extends InputFormat> inputFormatClass) {
157    this.inputFormatClass = inputFormatClass.getName();
158    return this;
159  }
160
161  /**
162   * Sets the path of the folder where the data is.
163   * Internally, a distributed dataset with a unique partition is created,
164   * and {@link SingleDataCenterEvaluatorToPartitionStrategy} is binded.
165   *
166   * @param inputPath
167   *          the input path
168   * @return this
169   */
170  public DataLoadingRequestBuilder setInputPath(final String inputPath) {
171    this.inputPath = inputPath;
172    this.singleDataCenterStrategy = true;
173    return this;
174  }
175
176  /**
177   * Sets the distributed data set.
178   * Internally, a {@link MultiDataCenterEvaluatorToPartitionStrategy} is binded.
179   *
180   * @param distributedDataSet
181   *          the distributed data set
182   * @return this
183   */
184  public DataLoadingRequestBuilder setDistributedDataSet(final DistributedDataSet distributedDataSet) {
185    this.distributedDataSet = distributedDataSet;
186    this.singleDataCenterStrategy = false;
187    return this;
188  }
189
190  @Override
191  public Configuration build() throws BindException {
192    if (this.driverConfigurationModule == null) {
193      throw new BindException("Driver Configuration Module is a required parameter.");
194    }
195
196    // need to create the distributed data set
197    if (this.singleDataCenterStrategy) {
198      if (this.inputPath == null) {
199        throw new BindException("Should specify an input path.");
200      }
201      if (this.distributedDataSet != null && !this.distributedDataSet.isEmpty()) {
202        throw new BindException("You should either call setInputPath or setDistributedDataSet, but not both");
203      }
204      // Create a distributed data set with one partition, the splits defined by
205      // the user if greater than 0 or no splits, and data to be loaded from
206      // anywhere.
207      final DistributedDataSet dds = new DistributedDataSet();
208      dds.addPartition(DistributedDataSetPartition
209          .newBuilder()
210          .setPath(inputPath)
211          .setLocation(Constants.ANY_RACK)
212          .setDesiredSplits(numberOfDesiredSplits > 0 ?
213                                numberOfDesiredSplits :
214                                Integer.parseInt(NumberOfDesiredSplits.DEFAULT_DESIRED_SPLITS)).build());
215      this.distributedDataSet = dds;
216    } else {
217      if (this.inputPath != null) {
218        throw new BindException("You should either call setInputPath or setDistributedDataSet, but not both");
219      }
220    }
221
222    if (this.distributedDataSet == null || this.distributedDataSet.isEmpty()) {
223      throw new BindException("Distributed Data Set is a required parameter.");
224    }
225
226    if (this.inputFormatClass == null) {
227      this.inputFormatClass = TextInputFormat.class.getName();
228    }
229
230    final Configuration driverConfiguration;
231    if (renewFailedEvaluators) {
232      driverConfiguration = this.driverConfigurationModule
233          .set(DriverConfiguration.ON_DRIVER_STARTED, DataLoader.StartHandler.class)
234          .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, DataLoader.EvaluatorAllocatedHandler.class)
235          .set(DriverConfiguration.ON_EVALUATOR_FAILED, DataLoader.EvaluatorFailedHandler.class)
236          .build();
237    } else {
238      driverConfiguration = this.driverConfigurationModule
239          .set(DriverConfiguration.ON_DRIVER_STARTED, DataLoader.StartHandler.class)
240          .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, DataLoader.EvaluatorAllocatedHandler.class)
241          .build();
242    }
243
244    final JavaConfigurationBuilder jcb =
245        Tang.Factory.getTang().newConfigurationBuilder(driverConfiguration);
246
247    Validate.isTrue(!this.dataRequests.isEmpty(),
248        "Number of cores and memory are deprecated; you have to add specific data requests");
249    for (final EvaluatorRequest request : this.dataRequests) {
250      jcb.bindSetEntry(DataLoadingDataRequests.class, AvroEvaluatorRequestSerializer.toString(request));
251    }
252
253    // compute requests can be empty to maintain compatibility with previous code.
254    if (!this.computeRequests.isEmpty()) {
255      for (final EvaluatorRequest request : this.computeRequests) {
256        jcb.bindSetEntry(DataLoadingComputeRequests.class, AvroEvaluatorRequestSerializer.toString(request));
257      }
258    }
259
260    jcb.bindNamedParameter(LoadDataIntoMemory.class, Boolean.toString(this.inMemory))
261       .bindNamedParameter(JobConfExternalConstructor.InputFormatClass.class, inputFormatClass);
262
263    final Iterator<DistributedDataSetPartition> partitions = this.distributedDataSet.iterator();
264    while (partitions.hasNext()) {
265      jcb.bindSetEntry(
266          DistributedDataSetPartitionSerializer.DistributedDataSetPartitions.class,
267          DistributedDataSetPartitionSerializer.serialize(partitions.next()));
268    }
269
270    // we do this check for backwards compatibility, if the user defined it
271    // wants to use the single data center loading strategy, we bind that implementation.
272    if (this.singleDataCenterStrategy) {
273      jcb.bindImplementation(EvaluatorToPartitionStrategy.class, SingleDataCenterEvaluatorToPartitionStrategy.class);
274    } else {
275      // otherwise, we bind the strategy that will allow the user to specify
276      // which evaluators can load the different partitions in a multi data center network topology
277      jcb.bindImplementation(EvaluatorToPartitionStrategy.class, MultiDataCenterEvaluatorToPartitionStrategy.class);
278    }
279
280    return jcb.bindImplementation(DataLoadingService.class, InputFormatLoadingService.class).build();
281  }
282
283  @NamedParameter(short_name = "num_splits", default_value = NumberOfDesiredSplits.DEFAULT_DESIRED_SPLITS)
284  public static final class NumberOfDesiredSplits implements Name<Integer> {
285    static final String DEFAULT_DESIRED_SPLITS = "0";
286  }
287
288  /**
289   * Allows to specify a set of compute requests to send to the DataLoader.
290   */
291  @NamedParameter(doc = "Sets of compute requests to request to the DataLoader, " +
292      "i.e. evaluators requests that will not load data")
293  static final class DataLoadingComputeRequests implements Name<Set<String>> {
294  }
295
296  /**
297   * Allows to specify a set of data requests to send to the DataLoader.
298   */
299  @NamedParameter(doc = "Sets of data requests to request to the DataLoader, " +
300      "i.e. evaluators requests that will load data")
301  static final class DataLoadingDataRequests implements Name<Set<String>> {
302  }
303
304  @NamedParameter(default_value = "false")
305  public static final class LoadDataIntoMemory implements Name<Boolean> {
306  }
307}