001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.io.data.loading.api; 020 021import org.apache.commons.lang.Validate; 022import org.apache.hadoop.mapred.InputFormat; 023import org.apache.hadoop.mapred.TextInputFormat; 024import org.apache.reef.client.DriverConfiguration; 025import org.apache.reef.driver.evaluator.EvaluatorRequest; 026import org.apache.reef.io.data.loading.impl.DistributedDataSetPartitionSerializer; 027import org.apache.reef.io.data.loading.impl.AvroEvaluatorRequestSerializer; 028import org.apache.reef.io.data.loading.impl.SingleDataCenterEvaluatorToPartitionStrategy; 029import org.apache.reef.io.data.loading.impl.DistributedDataSetPartition; 030import org.apache.reef.io.data.loading.impl.InputFormatLoadingService; 031import org.apache.reef.io.data.loading.impl.JobConfExternalConstructor; 032import org.apache.reef.io.data.loading.impl.MultiDataCenterEvaluatorToPartitionStrategy; 033import org.apache.reef.runtime.common.utils.Constants; 034import org.apache.reef.tang.Configuration; 035import org.apache.reef.tang.JavaConfigurationBuilder; 036import org.apache.reef.tang.Tang; 037import org.apache.reef.tang.annotations.Name; 038import org.apache.reef.tang.annotations.NamedParameter; 039import org.apache.reef.tang.exceptions.BindException; 040import org.apache.reef.tang.formats.ConfigurationModule; 041 042import java.util.ArrayList; 043import java.util.Iterator; 044import java.util.List; 045import java.util.Set; 046 047/** 048 * Builder to create a request to the DataLoadingService. 049 */ 050public final class DataLoadingRequestBuilder 051 implements org.apache.reef.util.Builder<Configuration> { 052 053 // constant used in several places. 054 private static final int UNINITIALIZED = -1; 055 private int numberOfDesiredSplits = UNINITIALIZED; 056 private final List<EvaluatorRequest> computeRequests = new ArrayList<>(); 057 private final List<EvaluatorRequest> dataRequests = new ArrayList<>(); 058 private boolean inMemory = false; 059 private boolean renewFailedEvaluators = true; 060 private ConfigurationModule driverConfigurationModule = null; 061 private String inputFormatClass; 062 /** 063 * Single data center loading strategy flag. Allows to specify if the data 064 * will be loaded in machines of a single data center or not. By 065 * default, is set to true. 066 */ 067 private boolean singleDataCenterStrategy = true; 068 /** 069 * Distributed dataset that can contain many distributed partitions. 070 */ 071 private DistributedDataSet distributedDataSet; 072 073 /** 074 * The input path of the data to be loaded. 075 */ 076 private String inputPath; 077 078 public DataLoadingRequestBuilder setNumberOfDesiredSplits(final int numberOfDesiredSplits) { 079 this.numberOfDesiredSplits = numberOfDesiredSplits; 080 return this; 081 } 082 083 /** 084 * Adds the requests to the compute requests list. 085 * 086 * @param computeRequests 087 * the compute requests to add 088 * @return this 089 */ 090 @SuppressWarnings("checkstyle:hiddenfield") 091 public DataLoadingRequestBuilder addComputeRequests(final List<EvaluatorRequest> computeRequests) { 092 for (final EvaluatorRequest computeRequest : computeRequests) { 093 addComputeRequest(computeRequest); 094 } 095 return this; 096 } 097 098 /** 099 * Adds the requests to the data requests list. 100 * 101 * @param dataRequests 102 * the data requests to add 103 * @return this 104 */ 105 @SuppressWarnings("checkstyle:hiddenfield") 106 public DataLoadingRequestBuilder addDataRequests(final List<EvaluatorRequest> dataRequests) { 107 for (final EvaluatorRequest dataRequest : dataRequests) { 108 addDataRequest(dataRequest); 109 } 110 return this; 111 } 112 113 /** 114 * Adds a single request to the compute requests list. 115 * 116 * @param computeRequest 117 * the compute request to add 118 * @return this 119 */ 120 public DataLoadingRequestBuilder addComputeRequest(final EvaluatorRequest computeRequest) { 121 this.computeRequests.add(computeRequest); 122 return this; 123 } 124 125 /** 126 * Adds a single request to the data requests list. 127 * 128 * @param dataRequest 129 * the data request to add 130 * @return this 131 */ 132 public DataLoadingRequestBuilder addDataRequest(final EvaluatorRequest dataRequest) { 133 this.dataRequests.add(dataRequest); 134 return this; 135 } 136 137 @SuppressWarnings("checkstyle:hiddenfield") 138 public DataLoadingRequestBuilder loadIntoMemory(final boolean inMemory) { 139 this.inMemory = inMemory; 140 return this; 141 } 142 143 @SuppressWarnings("checkstyle:hiddenfield") 144 public DataLoadingRequestBuilder renewFailedEvaluators(final boolean renewFailedEvaluators) { 145 this.renewFailedEvaluators = renewFailedEvaluators; 146 return this; 147 } 148 149 public DataLoadingRequestBuilder setDriverConfigurationModule( 150 final ConfigurationModule driverConfigurationModule) { 151 this.driverConfigurationModule = driverConfigurationModule; 152 return this; 153 } 154 155 public DataLoadingRequestBuilder setInputFormatClass( 156 final Class<? extends InputFormat> inputFormatClass) { 157 this.inputFormatClass = inputFormatClass.getName(); 158 return this; 159 } 160 161 /** 162 * Sets the path of the folder where the data is. 163 * Internally, a distributed dataset with a unique partition is created, 164 * and {@link SingleDataCenterEvaluatorToPartitionStrategy} is binded. 165 * 166 * @param inputPath 167 * the input path 168 * @return this 169 */ 170 public DataLoadingRequestBuilder setInputPath(final String inputPath) { 171 this.inputPath = inputPath; 172 this.singleDataCenterStrategy = true; 173 return this; 174 } 175 176 /** 177 * Sets the distributed data set. 178 * Internally, a {@link MultiDataCenterEvaluatorToPartitionStrategy} is binded. 179 * 180 * @param distributedDataSet 181 * the distributed data set 182 * @return this 183 */ 184 public DataLoadingRequestBuilder setDistributedDataSet(final DistributedDataSet distributedDataSet) { 185 this.distributedDataSet = distributedDataSet; 186 this.singleDataCenterStrategy = false; 187 return this; 188 } 189 190 @Override 191 public Configuration build() throws BindException { 192 if (this.driverConfigurationModule == null) { 193 throw new BindException("Driver Configuration Module is a required parameter."); 194 } 195 196 // need to create the distributed data set 197 if (this.singleDataCenterStrategy) { 198 if (this.inputPath == null) { 199 throw new BindException("Should specify an input path."); 200 } 201 if (this.distributedDataSet != null && !this.distributedDataSet.isEmpty()) { 202 throw new BindException("You should either call setInputPath or setDistributedDataSet, but not both"); 203 } 204 // Create a distributed data set with one partition, the splits defined by 205 // the user if greater than 0 or no splits, and data to be loaded from 206 // anywhere. 207 final DistributedDataSet dds = new DistributedDataSet(); 208 dds.addPartition(DistributedDataSetPartition 209 .newBuilder() 210 .setPath(inputPath) 211 .setLocation(Constants.ANY_RACK) 212 .setDesiredSplits(numberOfDesiredSplits > 0 ? 213 numberOfDesiredSplits : 214 Integer.parseInt(NumberOfDesiredSplits.DEFAULT_DESIRED_SPLITS)).build()); 215 this.distributedDataSet = dds; 216 } else { 217 if (this.inputPath != null) { 218 throw new BindException("You should either call setInputPath or setDistributedDataSet, but not both"); 219 } 220 } 221 222 if (this.distributedDataSet == null || this.distributedDataSet.isEmpty()) { 223 throw new BindException("Distributed Data Set is a required parameter."); 224 } 225 226 if (this.inputFormatClass == null) { 227 this.inputFormatClass = TextInputFormat.class.getName(); 228 } 229 230 final Configuration driverConfiguration; 231 if (renewFailedEvaluators) { 232 driverConfiguration = this.driverConfigurationModule 233 .set(DriverConfiguration.ON_DRIVER_STARTED, DataLoader.StartHandler.class) 234 .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, DataLoader.EvaluatorAllocatedHandler.class) 235 .set(DriverConfiguration.ON_EVALUATOR_FAILED, DataLoader.EvaluatorFailedHandler.class) 236 .build(); 237 } else { 238 driverConfiguration = this.driverConfigurationModule 239 .set(DriverConfiguration.ON_DRIVER_STARTED, DataLoader.StartHandler.class) 240 .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, DataLoader.EvaluatorAllocatedHandler.class) 241 .build(); 242 } 243 244 final JavaConfigurationBuilder jcb = 245 Tang.Factory.getTang().newConfigurationBuilder(driverConfiguration); 246 247 Validate.isTrue(!this.dataRequests.isEmpty(), 248 "Number of cores and memory are deprecated; you have to add specific data requests"); 249 for (final EvaluatorRequest request : this.dataRequests) { 250 jcb.bindSetEntry(DataLoadingDataRequests.class, AvroEvaluatorRequestSerializer.toString(request)); 251 } 252 253 // compute requests can be empty to maintain compatibility with previous code. 254 if (!this.computeRequests.isEmpty()) { 255 for (final EvaluatorRequest request : this.computeRequests) { 256 jcb.bindSetEntry(DataLoadingComputeRequests.class, AvroEvaluatorRequestSerializer.toString(request)); 257 } 258 } 259 260 jcb.bindNamedParameter(LoadDataIntoMemory.class, Boolean.toString(this.inMemory)) 261 .bindNamedParameter(JobConfExternalConstructor.InputFormatClass.class, inputFormatClass); 262 263 final Iterator<DistributedDataSetPartition> partitions = this.distributedDataSet.iterator(); 264 while (partitions.hasNext()) { 265 jcb.bindSetEntry( 266 DistributedDataSetPartitionSerializer.DistributedDataSetPartitions.class, 267 DistributedDataSetPartitionSerializer.serialize(partitions.next())); 268 } 269 270 // we do this check for backwards compatibility, if the user defined it 271 // wants to use the single data center loading strategy, we bind that implementation. 272 if (this.singleDataCenterStrategy) { 273 jcb.bindImplementation(EvaluatorToPartitionStrategy.class, SingleDataCenterEvaluatorToPartitionStrategy.class); 274 } else { 275 // otherwise, we bind the strategy that will allow the user to specify 276 // which evaluators can load the different partitions in a multi data center network topology 277 jcb.bindImplementation(EvaluatorToPartitionStrategy.class, MultiDataCenterEvaluatorToPartitionStrategy.class); 278 } 279 280 return jcb.bindImplementation(DataLoadingService.class, InputFormatLoadingService.class).build(); 281 } 282 283 @NamedParameter(short_name = "num_splits", default_value = NumberOfDesiredSplits.DEFAULT_DESIRED_SPLITS) 284 public static final class NumberOfDesiredSplits implements Name<Integer> { 285 static final String DEFAULT_DESIRED_SPLITS = "0"; 286 } 287 288 /** 289 * Allows to specify a set of compute requests to send to the DataLoader. 290 */ 291 @NamedParameter(doc = "Sets of compute requests to request to the DataLoader, " + 292 "i.e. evaluators requests that will not load data") 293 static final class DataLoadingComputeRequests implements Name<Set<String>> { 294 } 295 296 /** 297 * Allows to specify a set of data requests to send to the DataLoader. 298 */ 299 @NamedParameter(doc = "Sets of data requests to request to the DataLoader, " + 300 "i.e. evaluators requests that will load data") 301 static final class DataLoadingDataRequests implements Name<Set<String>> { 302 } 303 304 @NamedParameter(default_value = "false") 305 public static final class LoadDataIntoMemory implements Name<Boolean> { 306 } 307}