This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.io.data.loading.api;
020
021import org.apache.hadoop.mapred.InputFormat;
022import org.apache.hadoop.mapred.JobConf;
023import org.apache.hadoop.mapred.TextInputFormat;
024import org.apache.reef.client.DriverConfiguration;
025import org.apache.reef.driver.evaluator.EvaluatorRequest;
026import org.apache.reef.io.data.loading.impl.EvaluatorRequestSerializer;
027import org.apache.reef.io.data.loading.impl.InputFormatExternalConstructor;
028import org.apache.reef.io.data.loading.impl.InputFormatLoadingService;
029import org.apache.reef.io.data.loading.impl.JobConfExternalConstructor;
030import org.apache.reef.tang.Configuration;
031import org.apache.reef.tang.JavaConfigurationBuilder;
032import org.apache.reef.tang.Tang;
033import org.apache.reef.tang.annotations.Name;
034import org.apache.reef.tang.annotations.NamedParameter;
035import org.apache.reef.tang.exceptions.BindException;
036import org.apache.reef.tang.formats.ConfigurationModule;
037
038/**
039 * Builder to create a request to the DataLoadingService.
040 */
041public final class DataLoadingRequestBuilder
042    implements org.apache.reef.util.Builder<Configuration> {
043
044  private int memoryMB = -1;
045  private int numberOfCores = -1;
046  private int numberOfDesiredSplits = -1;
047  private EvaluatorRequest computeRequest = null;
048  private boolean inMemory = false;
049  private boolean renewFailedEvaluators = true;
050  private ConfigurationModule driverConfigurationModule = null;
051  private String inputFormatClass;
052  private String inputPath;
053
054  public DataLoadingRequestBuilder setNumberOfDesiredSplits(final int numberOfDesiredSplits) {
055    this.numberOfDesiredSplits = numberOfDesiredSplits;
056    return this;
057  }
058
059  /**
060   * Set the memory to be used for Evaluator allocated.
061   *
062   * @param memoryMB the amount of memory in MB
063   * @return this
064   */
065  public DataLoadingRequestBuilder setMemoryMB(final int memoryMB) {
066    this.memoryMB = memoryMB;
067    return this;
068  }
069
070  /**
071   * Set the core number to be used for Evaluator allocated.
072   *
073   * @param numberOfCores the number of cores
074   * @return this
075   */
076  public DataLoadingRequestBuilder setNumberOfCores(final int numberOfCores) {
077    this.numberOfCores = numberOfCores;
078    return this;
079  }
080
081  public DataLoadingRequestBuilder setComputeRequest(final EvaluatorRequest computeRequest) {
082    this.computeRequest = computeRequest;
083    return this;
084  }
085
086  public DataLoadingRequestBuilder loadIntoMemory(final boolean inMemory) {
087    this.inMemory = inMemory;
088    return this;
089  }
090
091  public DataLoadingRequestBuilder renewFailedEvaluators(final boolean renewFailedEvaluators) {
092    this.renewFailedEvaluators = renewFailedEvaluators;
093    return this;
094  }
095
096  public DataLoadingRequestBuilder setDriverConfigurationModule(
097      final ConfigurationModule driverConfigurationModule) {
098    this.driverConfigurationModule = driverConfigurationModule;
099    return this;
100  }
101
102  public DataLoadingRequestBuilder setInputFormatClass(
103      final Class<? extends InputFormat> inputFormatClass) {
104    this.inputFormatClass = inputFormatClass.getName();
105    return this;
106  }
107
108  public DataLoadingRequestBuilder setInputPath(final String inputPath) {
109    this.inputPath = inputPath;
110    return this;
111  }
112
113  @Override
114  public Configuration build() throws BindException {
115    if (this.driverConfigurationModule == null) {
116      throw new BindException("Driver Configuration Module is a required parameter.");
117    }
118
119    if (this.inputPath == null) {
120      throw new BindException("InputPath is a required parameter.");
121    }
122
123    if (this.inputFormatClass == null) {
124      this.inputFormatClass = TextInputFormat.class.getName();
125    }
126
127    final Configuration driverConfiguration;
128    if (renewFailedEvaluators) {
129      driverConfiguration = this.driverConfigurationModule
130          .set(DriverConfiguration.ON_DRIVER_STARTED, DataLoader.StartHandler.class)
131          .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, DataLoader.EvaluatorAllocatedHandler.class)
132          .set(DriverConfiguration.ON_EVALUATOR_FAILED, DataLoader.EvaluatorFailedHandler.class)
133          .build();
134    } else {
135      driverConfiguration = this.driverConfigurationModule
136          .set(DriverConfiguration.ON_DRIVER_STARTED, DataLoader.StartHandler.class)
137          .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, DataLoader.EvaluatorAllocatedHandler.class)
138          .build();
139    }
140
141    final JavaConfigurationBuilder jcb =
142        Tang.Factory.getTang().newConfigurationBuilder(driverConfiguration);
143
144    if (this.numberOfDesiredSplits > 0) {
145      jcb.bindNamedParameter(NumberOfDesiredSplits.class, "" + this.numberOfDesiredSplits);
146    }
147
148    if (this.memoryMB > 0) {
149      jcb.bindNamedParameter(DataLoadingEvaluatorMemoryMB.class, "" + this.memoryMB);
150    }
151
152    if (this.numberOfCores > 0) {
153      jcb.bindNamedParameter(DataLoadingEvaluatorNumberOfCores.class, "" + this.numberOfCores);
154    }
155
156    if (this.computeRequest != null) {
157      jcb.bindNamedParameter(DataLoadingComputeRequest.class,
158          EvaluatorRequestSerializer.serialize(this.computeRequest));
159    }
160
161    return jcb
162        .bindNamedParameter(LoadDataIntoMemory.class, Boolean.toString(this.inMemory))
163        .bindConstructor(InputFormat.class, InputFormatExternalConstructor.class)
164        .bindConstructor(JobConf.class, JobConfExternalConstructor.class)
165        .bindNamedParameter(JobConfExternalConstructor.InputFormatClass.class, inputFormatClass)
166        .bindNamedParameter(JobConfExternalConstructor.InputPath.class, inputPath)
167        .bindImplementation(DataLoadingService.class, InputFormatLoadingService.class)
168        .build();
169  }
170
171  @NamedParameter(short_name = "num_splits", default_value = "0")
172  public static final class NumberOfDesiredSplits implements Name<Integer> {
173  }
174
175  @NamedParameter(short_name = "dataLoadingEvaluatorMemoryMB", default_value = "4096")
176  public static final class DataLoadingEvaluatorMemoryMB implements Name<Integer> {
177  }
178
179  @NamedParameter(short_name = "dataLoadingEvaluatorCore", default_value = "1")
180  public static final class DataLoadingEvaluatorNumberOfCores implements Name<Integer> {
181  }
182
183  @NamedParameter(default_value = "NULL")
184  public static final class DataLoadingComputeRequest implements Name<String> {
185  }
186
187  @NamedParameter(default_value = "false")
188  public static final class LoadDataIntoMemory implements Name<Boolean> {
189  }
190}