/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.reef.io.data.loading.impl;

import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.driver.context.ActiveContext;
import org.apache.reef.driver.context.ContextConfiguration;
import org.apache.reef.driver.context.ServiceConfiguration;
import org.apache.reef.driver.evaluator.AllocatedEvaluator;
import org.apache.reef.io.data.loading.api.DataLoadingRequestBuilder;
import org.apache.reef.io.data.loading.api.DataLoadingService;
import org.apache.reef.io.data.loading.api.DataSet;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.Tang;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.tang.exceptions.BindException;

import javax.inject.Inject;
import java.io.IOException;
import java.util.Arrays;
import java.util.Random;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * An implementation of {@link DataLoadingService}
 * that uses the Hadoop {@link InputFormat} to find
 * partitions of data and request resources.
 * <p>
 * The InputFormat is injected using a Tang external constructor.
 * <p>
 * It also tries to obtain data locality in a greedy
 * fashion using {@link EvaluatorToPartitionMapper}.
 *
 * @param <K> key type of the injected {@link InputFormat}
 * @param <V> value type of the injected {@link InputFormat}
 */
@DriverSide
public class InputFormatLoadingService<K, V> implements DataLoadingService {

  private static final Logger LOG = Logger.getLogger(InputFormatLoadingService.class.getName());

  /** Identifier prefix of the contexts created by {@link #getContextConfiguration}. */
  private static final String DATA_LOAD_CONTEXT_PREFIX = "DataLoadContext-";

  /**
   * Identifier prefix for compute contexts.
   * The Random is constructed with a constant seed, so the numeric part of the
   * prefix is the same value on every run.
   */
  private static final String COMPUTE_CONTEXT_PREFIX =
      "ComputeContext-" + new Random(3381).nextInt(1 << 20) + "-";

  /** Assigns input splits to evaluators; built from all splits in the constructor. */
  private final EvaluatorToPartitionMapper<InputSplit> evaluatorToPartitionMapper;

  /** Number of splits actually returned by the InputFormat. */
  private final int numberOfPartitions;

  /** Selects InMemoryInputFormatDataSet (true) or InputFormatDataSet (false) as the DataSet. */
  private final boolean inMemory;

  /** InputFormat class name, forwarded to each evaluator's configuration. */
  private final String inputFormatClass;

  /** Input path, forwarded to each evaluator's configuration. */
  private final String inputPath;

  /**
   * Computes the input splits and prepares the split-to-evaluator mapping.
   *
   * @param inputFormat           the InputFormat used to compute the splits
   * @param jobConf               the job configuration passed to {@code getSplits}
   * @param numberOfDesiredSplits split count requested from {@code getSplits}; the number of
   *                              partitions used is the number of splits actually returned
   * @param inMemory              whether evaluators should use the in-memory DataSet
   * @param inputFormatClass      InputFormat class name forwarded to evaluators
   * @param inputPath             input path forwarded to evaluators
   * @throws RuntimeException if {@code getSplits} throws an {@link IOException}
   */
  @Inject
  public InputFormatLoadingService(
      final InputFormat<K, V> inputFormat,
      final JobConf jobConf,
      final @Parameter(DataLoadingRequestBuilder.NumberOfDesiredSplits.class) int numberOfDesiredSplits,
      final @Parameter(DataLoadingRequestBuilder.LoadDataIntoMemory.class) boolean inMemory,
      final @Parameter(JobConfExternalConstructor.InputFormatClass.class) String inputFormatClass,
      final @Parameter(JobConfExternalConstructor.InputPath.class) String inputPath) {

    this.inMemory = inMemory;
    this.inputFormatClass = inputFormatClass;
    this.inputPath = inputPath;

    try {

      final InputSplit[] inputSplits = inputFormat.getSplits(jobConf, numberOfDesiredSplits);
      // Guarded so that Arrays.toString is only computed when FINEST is enabled.
      if (LOG.isLoggable(Level.FINEST)) {
        LOG.log(Level.FINEST, "Splits: {0}", Arrays.toString(inputSplits));
      }

      this.numberOfPartitions = inputSplits.length;
      LOG.log(Level.FINE, "Number of partitions: {0}", this.numberOfPartitions);

      this.evaluatorToPartitionMapper = new EvaluatorToPartitionMapper<>(inputSplits);

    } catch (final IOException e) {
      throw new RuntimeException("Unable to get InputSplits using the specified InputFormat", e);
    }
  }

  /**
   * @return the number of input splits returned by the InputFormat
   */
  @Override
  public int getNumberOfPartitions() {
    return this.numberOfPartitions;
  }

  /**
   * Builds the context configuration for an allocated evaluator.
   * The context identifier is the data-load prefix followed by the index of the split
   * assigned to that evaluator.
   *
   * @param allocatedEvaluator the evaluator the context will run on
   * @return a context configuration with the data-load identifier set
   */
  @Override
  public Configuration getContextConfiguration(final AllocatedEvaluator allocatedEvaluator) {

    final NumberedSplit<InputSplit> numberedSplit = getSplitForEvaluator(allocatedEvaluator);

    return ContextConfiguration.CONF
        .set(ContextConfiguration.IDENTIFIER, DATA_LOAD_CONTEXT_PREFIX + numberedSplit.getIndex())
        .build();
  }

  /**
   * Builds the service configuration for an allocated evaluator.
   * It starts the selected DataSet implementation as a service and binds it as the
   * {@link DataSet}. It also binds the InputFormat class, the input path and the
   * serialized split assigned to the evaluator, plus the external constructors for
   * {@link InputSplit} and {@link JobConf}.
   *
   * @param allocatedEvaluator the evaluator the service will run on
   * @return the service configuration
   * @throws RuntimeException wrapping a {@link BindException} if the configuration cannot be built
   */
  @Override
  public Configuration getServiceConfiguration(final AllocatedEvaluator allocatedEvaluator) {

    try {

      final NumberedSplit<InputSplit> numberedSplit = getSplitForEvaluator(allocatedEvaluator);

      // Chosen once so the started service and the DataSet binding always agree.
      final Class<? extends DataSet> dataSetClass =
          this.inMemory ? InMemoryInputFormatDataSet.class : InputFormatDataSet.class;

      final Configuration serviceConfiguration = ServiceConfiguration.CONF
          .set(ServiceConfiguration.SERVICES, dataSetClass)
          .build();

      return Tang.Factory.getTang().newConfigurationBuilder(serviceConfiguration)
          .bindImplementation(DataSet.class, dataSetClass)
          .bindNamedParameter(JobConfExternalConstructor.InputFormatClass.class, inputFormatClass)
          .bindNamedParameter(JobConfExternalConstructor.InputPath.class, inputPath)
          .bindNamedParameter(
              InputSplitExternalConstructor.SerializedInputSplit.class,
              WritableSerializer.serialize(numberedSplit.getEntry()))
          .bindConstructor(InputSplit.class, InputSplitExternalConstructor.class)
          .bindConstructor(JobConf.class, JobConfExternalConstructor.class)
          .build();

    } catch (final BindException ex) {
      final String evalId = allocatedEvaluator.getId();
      final String msg = "Unable to create configuration for evaluator " + evalId;
      LOG.log(Level.WARNING, msg, ex);
      throw new RuntimeException(msg, ex);
    }
  }

  /**
   * Gets the split assigned to an evaluator, using its node name and evaluator id.
   * NOTE(review): this is called from both getContextConfiguration and
   * getServiceConfiguration for the same evaluator. It assumes the mapper returns the
   * same split for repeated calls with the same evaluator id; confirm in
   * EvaluatorToPartitionMapper.
   */
  private NumberedSplit<InputSplit> getSplitForEvaluator(final AllocatedEvaluator allocatedEvaluator) {
    return this.evaluatorToPartitionMapper.getInputSplit(
        allocatedEvaluator.getEvaluatorDescriptor().getNodeDescriptor().getName(),
        allocatedEvaluator.getId());
  }

  /**
   * @return the identifier prefix to use for compute contexts
   */
  @Override
  public String getComputeContextIdPrefix() {
    return COMPUTE_CONTEXT_PREFIX;
  }

  /**
   * @param context an active context
   * @return true if the context id starts with the compute-context prefix
   */
  @Override
  public boolean isComputeContext(final ActiveContext context) {
    return context.getId().startsWith(COMPUTE_CONTEXT_PREFIX);
  }

  /**
   * @param context an active context
   * @return true if the context id starts with the data-load-context prefix
   */
  @Override
  public boolean isDataLoadedContext(final ActiveContext context) {
    return context.getId().startsWith(DATA_LOAD_CONTEXT_PREFIX);
  }
}