001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.io.data.loading.impl; 020 021import org.apache.hadoop.mapred.InputFormat; 022import org.apache.hadoop.mapred.InputSplit; 023import org.apache.reef.annotations.audience.DriverSide; 024import org.apache.reef.io.data.loading.api.DataLoadingService; 025 026import java.io.IOException; 027import java.util.Map; 028import java.util.concurrent.BlockingQueue; 029import java.util.concurrent.ConcurrentHashMap; 030import java.util.concurrent.ConcurrentMap; 031import java.util.concurrent.LinkedBlockingQueue; 032import java.util.logging.Level; 033import java.util.logging.Logger; 034 035/** 036 * Class that tracks the mapping between 037 * evaluators & the data partition assigned 038 * to those evaluators. Its part of the 039 * implementation of a {@link DataLoadingService} 040 * that uses the Hadoop {@link InputFormat} to 041 * partition the data and request resources 042 * accordingly 043 * <p/> 044 * This is an online version which satisfies 045 * requests in a greedy way. 046 * 047 * @param <V> 048 */ 049@DriverSide 050public class EvaluatorToPartitionMapper<V extends InputSplit> { 051 private static final Logger LOG = Logger 052 .getLogger(EvaluatorToPartitionMapper.class.getName()); 053 054 private final ConcurrentMap<String, BlockingQueue<NumberedSplit<V>>> locationToSplits = new ConcurrentHashMap<>(); 055 private final ConcurrentMap<String, NumberedSplit<V>> evaluatorToSplits = new ConcurrentHashMap<>(); 056 private final BlockingQueue<NumberedSplit<V>> unallocatedSplits = new LinkedBlockingQueue<>(); 057 058 /** 059 * Initializes the locations of splits mapping 060 * 061 * @param splits 062 */ 063 public EvaluatorToPartitionMapper(V[] splits) { 064 try { 065 for (int splitNum = 0; splitNum < splits.length; splitNum++) { 066 LOG.log(Level.FINE, "Processing split: " + splitNum); 067 final V split = splits[splitNum]; 068 final String[] locations = split.getLocations(); 069 final NumberedSplit<V> numberedSplit = new NumberedSplit<V>(split, splitNum); 070 unallocatedSplits.add(numberedSplit); 071 for (final String location : locations) { 072 BlockingQueue<NumberedSplit<V>> newSplitQue = new LinkedBlockingQueue<NumberedSplit<V>>(); 073 final BlockingQueue<NumberedSplit<V>> splitQue = locationToSplits.putIfAbsent(location, 074 newSplitQue); 075 if (splitQue != null) { 076 newSplitQue = splitQue; 077 } 078 newSplitQue.add(numberedSplit); 079 } 080 } 081 for (Map.Entry<String, BlockingQueue<NumberedSplit<V>>> locSplit : locationToSplits.entrySet()) { 082 LOG.log(Level.FINE, locSplit.getKey() + ": " + locSplit.getValue().toString()); 083 } 084 } catch (IOException e) { 085 throw new RuntimeException( 086 "Unable to get InputSplits using the specified InputFormat", e); 087 } 088 } 089 090 /** 091 * Get an input split to be assigned to this 092 * evaluator 093 * <p/> 094 * Allocates one if its not already allocated 095 * 096 * @param evaluatorId 097 * @return 098 */ 099 public NumberedSplit<V> getInputSplit(final String hostName, final String evaluatorId) { 100 synchronized (evaluatorToSplits) { 101 if (evaluatorToSplits.containsKey(evaluatorId)) { 102 LOG.log(Level.FINE, "Found an already allocated partition"); 103 LOG.log(Level.FINE, evaluatorToSplits.toString()); 104 return evaluatorToSplits.get(evaluatorId); 105 } 106 } 107 LOG.log(Level.FINE, "allocated partition not found"); 108 if (locationToSplits.containsKey(hostName)) { 109 LOG.log(Level.FINE, "Found partitions possibly hosted for " + evaluatorId + " at " + hostName); 110 final NumberedSplit<V> split = allocateSplit(evaluatorId, locationToSplits.get(hostName)); 111 LOG.log(Level.FINE, evaluatorToSplits.toString()); 112 if (split != null) { 113 return split; 114 } 115 } 116 //pick random 117 LOG.log( 118 Level.FINE, 119 hostName 120 + " does not host any partitions or someone else took partitions hosted here. Picking a random one"); 121 final NumberedSplit<V> split = allocateSplit(evaluatorId, unallocatedSplits); 122 LOG.log(Level.FINE, evaluatorToSplits.toString()); 123 if (split != null) { 124 return split; 125 } 126 throw new RuntimeException("Unable to find an input partition to evaluator " + evaluatorId); 127 } 128 129 private NumberedSplit<V> allocateSplit(final String evaluatorId, 130 final BlockingQueue<NumberedSplit<V>> value) { 131 if (value == null) { 132 LOG.log(Level.FINE, "Queue of splits can't be empty. Returning null"); 133 return null; 134 } 135 while (true) { 136 final NumberedSplit<V> split = value.poll(); 137 if (split == null) 138 return null; 139 if (value == unallocatedSplits || unallocatedSplits.remove(split)) { 140 LOG.log(Level.FINE, "Found split-" + split.getIndex() + " in the queue"); 141 final NumberedSplit<V> old = evaluatorToSplits.putIfAbsent(evaluatorId, split); 142 if (old != null) { 143 final String msg = "Trying to assign different partitions to the same evaluator " + 144 "is not supported"; 145 LOG.severe(msg); 146 throw new RuntimeException(msg); 147 } else { 148 LOG.log(Level.FINE, "Returning " + split.getIndex()); 149 return split; 150 } 151 } 152 } 153 } 154}