/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.reef.io.data.loading.impl;

import org.apache.commons.lang.Validate;
import org.apache.commons.math3.util.Pair;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.reef.annotations.Unstable;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.driver.catalog.NodeDescriptor;
import org.apache.reef.io.data.loading.api.EvaluatorToPartitionStrategy;
import org.apache.reef.tang.ExternalConstructor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * This is an abstract class useful for {@link EvaluatorToPartitionStrategy}
 * implementations. Contains a template implementation of
 * {@link EvaluatorToPartitionStrategy#getInputSplit(NodeDescriptor, String)}
 * that calls abstract methods implemented by subclasses. If your implementation
 * does not need this logic, you should just implement the
 * {@link EvaluatorToPartitionStrategy} interface and not extend this class.
 */
@DriverSide
@Unstable
public abstract class AbstractEvaluatorToPartitionStrategy implements EvaluatorToPartitionStrategy<InputSplit> {
  private static final Logger LOG = Logger.getLogger(AbstractEvaluatorToPartitionStrategy.class.getName());

  /** Splits grouped by the location key chosen by the subclass in {@link #updateLocations(NumberedSplit)}. */
  protected final ConcurrentMap<String, BlockingQueue<NumberedSplit<InputSplit>>> locationToSplits;
  /** The split handed out to each evaluator, keyed by evaluator id. */
  protected final ConcurrentMap<String, NumberedSplit<InputSplit>> evaluatorToSplits;
  /** Every split that has not been handed out to an evaluator yet. */
  protected final BlockingQueue<NumberedSplit<InputSplit>> unallocatedSplits;

  /** Sum of the lengths of the split arrays returned by the input format; assigned once in the constructor. */
  private final int totalNumberOfSplits;

  /**
   * Computes the input splits of every data partition and registers them as unallocated.
   * <p>
   * Note that {@link #setUp()} and {@link #updateLocations(NumberedSplit)} are invoked
   * from this constructor, i.e. before any subclass constructor body has run.
   *
   * @param inputFormatClassName
   *          the class name of the input format used to compute the splits, must not be empty
   * @param serializedDataPartitions
   *          the serialized {@link DistributedDataSetPartition}s to load, must not be empty
   * @throws RuntimeException
   *           if the input format fails to compute the splits of a partition
   */
  @SuppressWarnings("rawtypes")
  AbstractEvaluatorToPartitionStrategy(
      final String inputFormatClassName, final Set<String> serializedDataPartitions) {
    LOG.fine("AbstractEvaluatorToPartitionStrategy injected");
    Validate.notEmpty(inputFormatClassName);
    Validate.notEmpty(serializedDataPartitions);

    locationToSplits = new ConcurrentHashMap<>();
    evaluatorToSplits = new ConcurrentHashMap<>();
    unallocatedSplits = new LinkedBlockingQueue<>();
    setUp();

    final Map<DistributedDataSetPartition, InputSplit[]> splitsPerPartition = new HashMap<>();
    // Accumulated locally so that the field can be final and is written exactly once.
    int splitCount = 0;
    for (final String serializedDataPartition : serializedDataPartitions) {
      final DistributedDataSetPartition dp = DistributedDataSetPartitionSerializer.deserialize(serializedDataPartition);
      final ExternalConstructor<JobConf> jobConfExternalConstructor = new JobConfExternalConstructor(
          inputFormatClassName, dp.getPath());
      try {
        final JobConf jobConf = jobConfExternalConstructor.newInstance();
        final InputFormat inputFormat = jobConf.getInputFormat();
        final InputSplit[] inputSplits = inputFormat.getSplits(jobConf, dp.getDesiredSplits());
        if (LOG.isLoggable(Level.FINEST)) {
          LOG.log(Level.FINEST, "Splits for partition: {0} {1}", new Object[] {dp, Arrays.toString(inputSplits)});
        }
        splitCount += inputSplits.length;
        splitsPerPartition.put(dp, inputSplits);
      } catch (final IOException e) {
        throw new RuntimeException("Unable to get InputSplits using the specified InputFormat", e);
      }
    }
    // Assigned before init() so that the total is already visible while the splits are registered.
    this.totalNumberOfSplits = splitCount;
    init(splitsPerPartition);
    LOG.log(Level.FINE, "Total Number of splits: {0}", this.totalNumberOfSplits);
  }

  /**
   * Initializes the locations of the splits where we'd like to be loaded into.
   * Sets all the splits to unallocated.
   *
   * @param splitsPerPartition
   *          a map containing the input splits per data partition
   */
  private void init(final Map<DistributedDataSetPartition, InputSplit[]> splitsPerPartition) {
    final Pair<InputSplit[], DistributedDataSetPartition[]>
        splitsAndPartitions = getSplitsAndPartitions(splitsPerPartition);
    final InputSplit[] splits = splitsAndPartitions.getFirst();
    final DistributedDataSetPartition[] partitions = splitsAndPartitions.getSecond();
    Validate.isTrue(splits.length == partitions.length);
    // The position in the flattened array becomes the index of the numbered split.
    for (int splitNum = 0; splitNum < splits.length; splitNum++) {
      LOG.log(Level.FINE, "Processing split: " + splitNum);
      final InputSplit split = splits[splitNum];
      final NumberedSplit<InputSplit> numberedSplit = new NumberedSplit<>(split, splitNum,
          partitions[splitNum]);
      unallocatedSplits.add(numberedSplit);
      updateLocations(numberedSplit);
    }
    if (LOG.isLoggable(Level.FINE)) {
      for (final Map.Entry<String, BlockingQueue<NumberedSplit<InputSplit>>> locSplit : locationToSplits.entrySet()) {
        LOG.log(Level.FINE, locSplit.getKey() + ": " + locSplit.getValue().toString());
      }
    }
  }

  /**
   * Each strategy should update the locations where they want the split to be
   * loaded into. For example, the split physical location, certain node,
   * certain rack.
   *
   * @param numberedSplit
   *          the numberedSplit
   */
  protected abstract void updateLocations(final NumberedSplit<InputSplit> numberedSplit);

  /**
   * Tries to allocate a split in an evaluator based on some particular rule.
   * For example, based on the rack name, randomly, etc.
   *
   * @param nodeDescriptor
   *          the node descriptor to extract information from
   * @param evaluatorId
   *          the evaluator id where we want to allocate the numberedSplit
   * @return a numberedSplit or null if couldn't allocate one
   */
  protected abstract NumberedSplit<InputSplit> tryAllocate(NodeDescriptor nodeDescriptor, String evaluatorId);

  /**
   * Called in the constructor. Allows children to setUp the objects they will
   * need in
   * {@link AbstractEvaluatorToPartitionStrategy#updateLocations(NumberedSplit)}
   * and
   * {@link AbstractEvaluatorToPartitionStrategy#tryAllocate(NodeDescriptor, String)}
   * methods.
   * By default we provide an empty implementation.
   */
  protected void setUp() {
    // empty implementation by default
  }

  /**
   * Get an input split to be assigned to this evaluator.
   * <p>
   * Allocates one if it's not already allocated. The host name of the node is
   * tried first; if no split can be taken from there, the decision is delegated
   * to {@link #tryAllocate(NodeDescriptor, String)}.
   *
   * @param nodeDescriptor
   *          the descriptor of the node the evaluator runs on
   * @param evaluatorId
   *          the id of the evaluator requesting a split
   * @return a numberedSplit
   * @throws RuntimeException
   *           if couldn't find any split
   */
  @Override
  public NumberedSplit<InputSplit> getInputSplit(final NodeDescriptor nodeDescriptor, final String evaluatorId) {
    synchronized (evaluatorToSplits) {
      // A single lookup is enough: the backing ConcurrentHashMap never stores null values.
      final NumberedSplit<InputSplit> existing = evaluatorToSplits.get(evaluatorId);
      if (existing != null) {
        // Guarded so the whole map is only rendered, while holding the monitor, when FINE is enabled.
        if (LOG.isLoggable(Level.FINE)) {
          LOG.log(Level.FINE, "Found an already allocated split, {0}", evaluatorToSplits.toString());
        }
        return existing;
      }
    }
    // always first try to allocate based on the hostName
    final String hostName = nodeDescriptor.getName();
    LOG.log(Level.FINE, "Allocated split not found, trying on {0}", hostName);
    if (locationToSplits.containsKey(hostName)) {
      LOG.log(Level.FINE, "Found splits possibly hosted for {0} at {1}", new Object[] {evaluatorId, hostName});
      final NumberedSplit<InputSplit> split = allocateSplit(evaluatorId, locationToSplits.get(hostName));
      if (split != null) {
        return split;
      }
    }
    LOG.log(Level.FINE, "{0} does not host any splits or someone else took splits hosted here. Picking other ones",
        hostName);
    final NumberedSplit<InputSplit> split = tryAllocate(nodeDescriptor, evaluatorId);
    if (split == null) {
      throw new RuntimeException("Unable to find an input split to evaluator " + evaluatorId);
    }
    // Guarded so the map is not rendered on every successful allocation when FINE is disabled.
    if (LOG.isLoggable(Level.FINE)) {
      LOG.log(Level.FINE, evaluatorToSplits.toString());
    }
    return split;
  }

  /**
   * @return the total number of splits computed in the constructor
   */
  @Override
  public int getNumberOfSplits() {
    return this.totalNumberOfSplits;
  }

  /**
   * Flattens the per-partition splits into two parallel arrays.
   *
   * @param splitsPerPartition
   *          a map containing the input splits per data partition
   * @return a pair of equally sized arrays; the partition at position i is the
   *         one that the split at position i belongs to
   */
  private Pair<InputSplit[], DistributedDataSetPartition[]> getSplitsAndPartitions(
      final Map<DistributedDataSetPartition, InputSplit[]> splitsPerPartition) {
    final List<InputSplit> inputSplits = new ArrayList<>();
    final List<DistributedDataSetPartition> partitions = new ArrayList<>();
    for (final Entry<DistributedDataSetPartition, InputSplit[]> entry : splitsPerPartition.entrySet()) {
      final DistributedDataSetPartition partition = entry.getKey();
      final InputSplit[] splits = entry.getValue();
      for (final InputSplit split : splits) {
        inputSplits.add(split);
        partitions.add(partition);
      }
    }
    return new Pair<>(inputSplits.toArray(new InputSplit[inputSplits.size()]),
        partitions.toArray(new DistributedDataSetPartition[partitions.size()]));
  }

  /**
   * Allocates the first available split into the evaluator.
   * <p>
   * Splits are polled from the given queue until one is found that is still
   * unallocated; splits that were already taken through another queue are skipped.
   *
   * @param evaluatorId
   *          the evaluator id
   * @param value
   *          the queue of splits
   * @return a numberedSplit or null if it cannot find one
   * @throws RuntimeException
   *           if the evaluator already has a split assigned
   */
  protected NumberedSplit<InputSplit> allocateSplit(final String evaluatorId,
      final BlockingQueue<NumberedSplit<InputSplit>> value) {
    if (value == null) {
      LOG.log(Level.FINE, "Queue of splits can't be empty. Returning null");
      return null;
    }
    while (true) {
      final NumberedSplit<InputSplit> split = value.poll();
      if (split == null) {
        return null;
      }
      // A split polled from a location queue only counts if it can still be removed from the
      // unallocated queue; when polling the unallocated queue itself, poll() already removed it.
      if (value == unallocatedSplits || unallocatedSplits.remove(split)) {
        LOG.log(Level.FINE, "Found split-" + split.getIndex() + " in the queue");
        final NumberedSplit<InputSplit> old = evaluatorToSplits.putIfAbsent(evaluatorId, split);
        if (old != null) {
          // NOTE(review): at this point the polled split has been taken out of the queues and is
          // not put back before throwing - confirm this is intended.
          throw new RuntimeException("Trying to assign different splits to the same evaluator is not supported");
        } else {
          LOG.log(Level.FINE, "Returning " + split.getIndex());
          return split;
        }
      }
    }
  }
}