001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.io.data.loading.api; 020 021import org.apache.hadoop.mapred.InputFormat; 022import org.apache.hadoop.mapred.JobConf; 023import org.apache.hadoop.mapred.TextInputFormat; 024import org.apache.reef.client.DriverConfiguration; 025import org.apache.reef.driver.evaluator.EvaluatorRequest; 026import org.apache.reef.io.data.loading.impl.EvaluatorRequestSerializer; 027import org.apache.reef.io.data.loading.impl.InputFormatExternalConstructor; 028import org.apache.reef.io.data.loading.impl.InputFormatLoadingService; 029import org.apache.reef.io.data.loading.impl.JobConfExternalConstructor; 030import org.apache.reef.tang.Configuration; 031import org.apache.reef.tang.JavaConfigurationBuilder; 032import org.apache.reef.tang.Tang; 033import org.apache.reef.tang.annotations.Name; 034import org.apache.reef.tang.annotations.NamedParameter; 035import org.apache.reef.tang.exceptions.BindException; 036import org.apache.reef.tang.formats.ConfigurationModule; 037 038/** 039 * Builder to create a request to the DataLoadingService. 040 */ 041public final class DataLoadingRequestBuilder 042 implements org.apache.reef.util.Builder<Configuration> { 043 044 private int memoryMB = -1; 045 private int numberOfCores = -1; 046 private int numberOfDesiredSplits = -1; 047 private EvaluatorRequest computeRequest = null; 048 private boolean inMemory = false; 049 private boolean renewFailedEvaluators = true; 050 private ConfigurationModule driverConfigurationModule = null; 051 private String inputFormatClass; 052 private String inputPath; 053 054 public DataLoadingRequestBuilder setNumberOfDesiredSplits(final int numberOfDesiredSplits) { 055 this.numberOfDesiredSplits = numberOfDesiredSplits; 056 return this; 057 } 058 059 /** 060 * Set the memory to be used for Evaluator allocated. 061 * 062 * @param memoryMB the amount of memory in MB 063 * @return this 064 */ 065 public DataLoadingRequestBuilder setMemoryMB(final int memoryMB) { 066 this.memoryMB = memoryMB; 067 return this; 068 } 069 070 /** 071 * Set the core number to be used for Evaluator allocated. 072 * 073 * @param numberOfCores the number of cores 074 * @return this 075 */ 076 public DataLoadingRequestBuilder setNumberOfCores(final int numberOfCores) { 077 this.numberOfCores = numberOfCores; 078 return this; 079 } 080 081 public DataLoadingRequestBuilder setComputeRequest(final EvaluatorRequest computeRequest) { 082 this.computeRequest = computeRequest; 083 return this; 084 } 085 086 public DataLoadingRequestBuilder loadIntoMemory(final boolean inMemory) { 087 this.inMemory = inMemory; 088 return this; 089 } 090 091 public DataLoadingRequestBuilder renewFailedEvaluators(final boolean renewFailedEvaluators) { 092 this.renewFailedEvaluators = renewFailedEvaluators; 093 return this; 094 } 095 096 public DataLoadingRequestBuilder setDriverConfigurationModule( 097 final ConfigurationModule driverConfigurationModule) { 098 this.driverConfigurationModule = driverConfigurationModule; 099 return this; 100 } 101 102 public DataLoadingRequestBuilder setInputFormatClass( 103 final Class<? extends InputFormat> inputFormatClass) { 104 this.inputFormatClass = inputFormatClass.getName(); 105 return this; 106 } 107 108 public DataLoadingRequestBuilder setInputPath(final String inputPath) { 109 this.inputPath = inputPath; 110 return this; 111 } 112 113 @Override 114 public Configuration build() throws BindException { 115 if (this.driverConfigurationModule == null) { 116 throw new BindException("Driver Configuration Module is a required parameter."); 117 } 118 119 if (this.inputPath == null) { 120 throw new BindException("InputPath is a required parameter."); 121 } 122 123 if (this.inputFormatClass == null) { 124 this.inputFormatClass = TextInputFormat.class.getName(); 125 } 126 127 final Configuration driverConfiguration; 128 if (renewFailedEvaluators) { 129 driverConfiguration = this.driverConfigurationModule 130 .set(DriverConfiguration.ON_DRIVER_STARTED, DataLoader.StartHandler.class) 131 .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, DataLoader.EvaluatorAllocatedHandler.class) 132 .set(DriverConfiguration.ON_EVALUATOR_FAILED, DataLoader.EvaluatorFailedHandler.class) 133 .build(); 134 } else { 135 driverConfiguration = this.driverConfigurationModule 136 .set(DriverConfiguration.ON_DRIVER_STARTED, DataLoader.StartHandler.class) 137 .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, DataLoader.EvaluatorAllocatedHandler.class) 138 .build(); 139 } 140 141 final JavaConfigurationBuilder jcb = 142 Tang.Factory.getTang().newConfigurationBuilder(driverConfiguration); 143 144 if (this.numberOfDesiredSplits > 0) { 145 jcb.bindNamedParameter(NumberOfDesiredSplits.class, "" + this.numberOfDesiredSplits); 146 } 147 148 if (this.memoryMB > 0) { 149 jcb.bindNamedParameter(DataLoadingEvaluatorMemoryMB.class, "" + this.memoryMB); 150 } 151 152 if (this.numberOfCores > 0) { 153 jcb.bindNamedParameter(DataLoadingEvaluatorNumberOfCores.class, "" + this.numberOfCores); 154 } 155 156 if (this.computeRequest != null) { 157 jcb.bindNamedParameter(DataLoadingComputeRequest.class, 158 EvaluatorRequestSerializer.serialize(this.computeRequest)); 159 } 160 161 return jcb 162 .bindNamedParameter(LoadDataIntoMemory.class, Boolean.toString(this.inMemory)) 163 .bindConstructor(InputFormat.class, InputFormatExternalConstructor.class) 164 .bindConstructor(JobConf.class, JobConfExternalConstructor.class) 165 .bindNamedParameter(JobConfExternalConstructor.InputFormatClass.class, inputFormatClass) 166 .bindNamedParameter(JobConfExternalConstructor.InputPath.class, inputPath) 167 .bindImplementation(DataLoadingService.class, InputFormatLoadingService.class) 168 .build(); 169 } 170 171 @NamedParameter(short_name = "num_splits", default_value = "0") 172 public static final class NumberOfDesiredSplits implements Name<Integer> { 173 } 174 175 @NamedParameter(short_name = "dataLoadingEvaluatorMemoryMB", default_value = "4096") 176 public static final class DataLoadingEvaluatorMemoryMB implements Name<Integer> { 177 } 178 179 @NamedParameter(short_name = "dataLoadingEvaluatorCore", default_value = "1") 180 public static final class DataLoadingEvaluatorNumberOfCores implements Name<Integer> { 181 } 182 183 @NamedParameter(default_value = "NULL") 184 public static final class DataLoadingComputeRequest implements Name<String> { 185 } 186 187 @NamedParameter(default_value = "false") 188 public static final class LoadDataIntoMemory implements Name<Boolean> { 189 } 190}