/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.reef.io.data.loading.impl;

import org.apache.hadoop.mapred.InputSplit;
import org.apache.reef.annotations.Unstable;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.driver.catalog.NodeDescriptor;
import org.apache.reef.runtime.common.utils.Constants;
import org.apache.reef.tang.annotations.Parameter;

import java.util.Collections;
import java.util.Iterator;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.inject.Inject;

/**
 * This is an online version which satisfies requests based on the locations the
 * users ask the data to be loaded, for multiple data center network topologies.
 *
 * Allocation first attempts an exact match of the evaluator's rack name against
 * the split locations, and then falls back to a prefix match against the
 * normalized locations, from most to least specific.
 */
@DriverSide
@Unstable
public final class MultiDataCenterEvaluatorToPartitionStrategy extends AbstractEvaluatorToPartitionStrategy {
  private static final Logger LOG = Logger.getLogger(MultiDataCenterEvaluatorToPartitionStrategy.class.getName());

  /**
   * Sorted set in reverse order, to keep track of the locations from most to
   * least specific. For example: [/dc1/room1, /dc1].
   */
  private Set<String> normalizedLocations;
  /**
   * Partial locations where we want to allocate, in case exact match does not work.
   * Keys are the normalized locations; values are the splits registered under them.
   */
  private ConcurrentMap<String, BlockingQueue<NumberedSplit<InputSplit>>> partialLocationsToSplits;


  @Inject
  MultiDataCenterEvaluatorToPartitionStrategy(
      @Parameter(JobConfExternalConstructor.InputFormatClass.class) final String inputFormatClassName,
      @Parameter(DistributedDataSetPartitionSerializer.DistributedDataSetPartitions.class)
      final Set<String> serializedDataPartitions) {
    super(inputFormatClassName, serializedDataPartitions);
  }

  /**
   * Creates the objects to be used in updateLocations and tryAllocate methods.
   *
   * NOTE(review): the two fields are assigned here rather than in field
   * initializers; presumably the superclass invokes this hook while it is being
   * constructed, in which case inline initializers would run afterwards and
   * discard the state built so far. Confirm against the superclass before
   * making the fields final.
   */
  @Override
  protected void setUp() {
    normalizedLocations = new TreeSet<>(Collections.reverseOrder());
    partialLocationsToSplits = new ConcurrentHashMap<>();
  }

  /**
   * {@inheritDoc}.
   * Saves locationToSplits and partialLocations as well.
   *
   * @param numberedSplit the split to register, both under its raw location
   *                      and under its normalized location
   */
  @Override
  protected void updateLocations(final NumberedSplit<InputSplit> numberedSplit) {
    final String location = numberedSplit.getLocation();
    addLocationMapping(locationToSplits, numberedSplit, location);
    final String normalizedLocation = normalize(location);
    // Register the queue before publishing the key, so that every entry of
    // normalizedLocations always has a queue in partialLocationsToSplits.
    addLocationMapping(partialLocationsToSplits, numberedSplit, normalizedLocation);
    normalizedLocations.add(normalizedLocation);
  }

  /**
   * {@inheritDoc}. Tries to allocate on exact rack match, if it cannot, then it
   * tries to get a partial match using the partialLocations map.
   *
   * @param nodeDescriptor descriptor of the node; its rack name drives the matching
   * @param evaluatorId    the evaluator the split is being allocated to
   * @return the allocated split, or {@code null} when neither the exact nor
   *         any partial match yields one
   */
  @Override
  protected NumberedSplit<InputSplit> tryAllocate(final NodeDescriptor nodeDescriptor, final String evaluatorId) {
    final String rackName = nodeDescriptor.getRackDescriptor().getName();
    LOG.log(Level.FINE, "Trying an exact match on rack name {0}", rackName);
    // Single lookup instead of containsKey() followed by get().
    final BlockingQueue<NumberedSplit<InputSplit>> exactMatches = locationToSplits.get(rackName);
    if (exactMatches != null) {
      LOG.log(Level.FINE, "Found splits possibly hosted for {0} at {1}", new Object[] {evaluatorId, rackName});
      final NumberedSplit<InputSplit> split = allocateSplit(evaluatorId, exactMatches);
      if (split != null) {
        return split;
      }
    }
    LOG.fine("No success, trying based on a partial match on locations");
    // Iteration order is reverse natural order, so longer (more specific)
    // locations sharing a prefix are tried before shorter ones.
    final Iterator<String> it = normalizedLocations.iterator();
    while (it.hasNext()) {
      final String possibleLocation = it.next();
      LOG.log(Level.FINE, "Trying on possible location {0}", possibleLocation);
      // NOTE(review): this is a plain string-prefix test, so a location such as
      // "/dc1" also matches a rack named "/dc10/room1". Confirm whether matching
      // should respect path-segment boundaries.
      if (rackName.startsWith(possibleLocation)) {
        LOG.log(Level.FINE, "Found splits possibly hosted for {0} at {1} for rack {2}", new Object[] {evaluatorId,
            possibleLocation, rackName});
        final NumberedSplit<InputSplit> split = allocateSplit(evaluatorId,
            partialLocationsToSplits.get(possibleLocation));
        if (split != null) {
          return split;
        }
      }
    }
    LOG.fine("Nothing found");
    return null;
  }

  /**
   * Appends a split to the queue registered under the given location, creating
   * the queue first when the location has none yet.
   *
   * @param concurrentMap the location-to-splits map to update
   * @param numberedSplit the split to append
   * @param location      the key under which the split is stored
   */
  private void addLocationMapping(final ConcurrentMap<String,
      BlockingQueue<NumberedSplit<InputSplit>>> concurrentMap,
      final NumberedSplit<InputSplit> numberedSplit, final String location) {
    BlockingQueue<NumberedSplit<InputSplit>> splitQueue = concurrentMap.get(location);
    if (splitQueue == null) {
      final BlockingQueue<NumberedSplit<InputSplit>> newSplitQueue = new LinkedBlockingQueue<>();
      // putIfAbsent makes the creation atomic: if another caller installed a
      // queue in the meantime we reuse theirs, instead of overwriting it and
      // losing the splits it already holds.
      final BlockingQueue<NumberedSplit<InputSplit>> existing = concurrentMap.putIfAbsent(location, newSplitQueue);
      splitQueue = existing == null ? newSplitQueue : existing;
    }
    splitQueue.add(numberedSplit);
  }

  /**
   * Normalizes a location so that it can be used as a prefix when matching
   * rack names: it is made to start with the rack path separator, and any
   * trailing wildcard or separator is removed.
   *
   * @param location the location as reported by the split
   * @return the normalized location
   */
  private String normalize(final String location) {
    String loc = location;
    // should start with a separator
    if (!loc.startsWith(Constants.RACK_PATH_SEPARATOR)) {
      loc = Constants.RACK_PATH_SEPARATOR + loc;
    }
    // if it is just /*, return /
    if (loc.equals(Constants.RACK_PATH_SEPARATOR + Constants.ANY_RACK)) {
      return Constants.RACK_PATH_SEPARATOR;
    }
    // remove the ending ANY or path separator
    // NOTE(review): one character is dropped per pass, which assumes both
    // constants are single-character strings - confirm. Also note that an
    // input consisting only of the separator is reduced to the empty string
    // here, unlike the "/*" case above which yields the separator.
    while (loc.endsWith(Constants.ANY_RACK) || loc.endsWith(Constants.RACK_PATH_SEPARATOR)) {
      loc = loc.substring(0, loc.length() - 1);
    }
    return loc;
  }
}