This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.io.data.loading.impl;
020
021import org.apache.hadoop.mapred.InputFormat;
022import org.apache.hadoop.mapred.InputSplit;
023import org.apache.reef.annotations.audience.DriverSide;
024import org.apache.reef.io.data.loading.api.DataLoadingService;
025
026import java.io.IOException;
027import java.util.Map;
028import java.util.concurrent.BlockingQueue;
029import java.util.concurrent.ConcurrentHashMap;
030import java.util.concurrent.ConcurrentMap;
031import java.util.concurrent.LinkedBlockingQueue;
032import java.util.logging.Level;
033import java.util.logging.Logger;
034
035/**
036 * Class that tracks the mapping between
037 * evaluators & the data partition assigned
038 * to those evaluators. Its part of the
039 * implementation of a {@link DataLoadingService}
040 * that uses the Hadoop {@link InputFormat} to
041 * partition the data and request resources
042 * accordingly
043 * <p/>
044 * This is an online version which satisfies
045 * requests in a greedy way.
046 *
047 * @param <V>
048 */
049@DriverSide
050public class EvaluatorToPartitionMapper<V extends InputSplit> {
051  private static final Logger LOG = Logger
052      .getLogger(EvaluatorToPartitionMapper.class.getName());
053
054  private final ConcurrentMap<String, BlockingQueue<NumberedSplit<V>>> locationToSplits = new ConcurrentHashMap<>();
055  private final ConcurrentMap<String, NumberedSplit<V>> evaluatorToSplits = new ConcurrentHashMap<>();
056  private final BlockingQueue<NumberedSplit<V>> unallocatedSplits = new LinkedBlockingQueue<>();
057
058  /**
059   * Initializes the locations of splits mapping
060   *
061   * @param splits
062   */
063  public EvaluatorToPartitionMapper(V[] splits) {
064    try {
065      for (int splitNum = 0; splitNum < splits.length; splitNum++) {
066        LOG.log(Level.FINE, "Processing split: " + splitNum);
067        final V split = splits[splitNum];
068        final String[] locations = split.getLocations();
069        final NumberedSplit<V> numberedSplit = new NumberedSplit<V>(split, splitNum);
070        unallocatedSplits.add(numberedSplit);
071        for (final String location : locations) {
072          BlockingQueue<NumberedSplit<V>> newSplitQue = new LinkedBlockingQueue<NumberedSplit<V>>();
073          final BlockingQueue<NumberedSplit<V>> splitQue = locationToSplits.putIfAbsent(location,
074              newSplitQue);
075          if (splitQue != null) {
076            newSplitQue = splitQue;
077          }
078          newSplitQue.add(numberedSplit);
079        }
080      }
081      for (Map.Entry<String, BlockingQueue<NumberedSplit<V>>> locSplit : locationToSplits.entrySet()) {
082        LOG.log(Level.FINE, locSplit.getKey() + ": " + locSplit.getValue().toString());
083      }
084    } catch (IOException e) {
085      throw new RuntimeException(
086          "Unable to get InputSplits using the specified InputFormat", e);
087    }
088  }
089
090  /**
091   * Get an input split to be assigned to this
092   * evaluator
093   * <p/>
094   * Allocates one if its not already allocated
095   *
096   * @param evaluatorId
097   * @return
098   */
099  public NumberedSplit<V> getInputSplit(final String hostName, final String evaluatorId) {
100    synchronized (evaluatorToSplits) {
101      if (evaluatorToSplits.containsKey(evaluatorId)) {
102        LOG.log(Level.FINE, "Found an already allocated partition");
103        LOG.log(Level.FINE, evaluatorToSplits.toString());
104        return evaluatorToSplits.get(evaluatorId);
105      }
106    }
107    LOG.log(Level.FINE, "allocated partition not found");
108    if (locationToSplits.containsKey(hostName)) {
109      LOG.log(Level.FINE, "Found partitions possibly hosted for " + evaluatorId + " at " + hostName);
110      final NumberedSplit<V> split = allocateSplit(evaluatorId, locationToSplits.get(hostName));
111      LOG.log(Level.FINE, evaluatorToSplits.toString());
112      if (split != null) {
113        return split;
114      }
115    }
116    //pick random
117    LOG.log(
118        Level.FINE,
119        hostName
120            + " does not host any partitions or someone else took partitions hosted here. Picking a random one");
121    final NumberedSplit<V> split = allocateSplit(evaluatorId, unallocatedSplits);
122    LOG.log(Level.FINE, evaluatorToSplits.toString());
123    if (split != null) {
124      return split;
125    }
126    throw new RuntimeException("Unable to find an input partition to evaluator " + evaluatorId);
127  }
128
129  private NumberedSplit<V> allocateSplit(final String evaluatorId,
130                                         final BlockingQueue<NumberedSplit<V>> value) {
131    if (value == null) {
132      LOG.log(Level.FINE, "Queue of splits can't be empty. Returning null");
133      return null;
134    }
135    while (true) {
136      final NumberedSplit<V> split = value.poll();
137      if (split == null)
138        return null;
139      if (value == unallocatedSplits || unallocatedSplits.remove(split)) {
140        LOG.log(Level.FINE, "Found split-" + split.getIndex() + " in the queue");
141        final NumberedSplit<V> old = evaluatorToSplits.putIfAbsent(evaluatorId, split);
142        if (old != null) {
143          final String msg = "Trying to assign different partitions to the same evaluator " +
144              "is not supported";
145          LOG.severe(msg);
146          throw new RuntimeException(msg);
147        } else {
148          LOG.log(Level.FINE, "Returning " + split.getIndex());
149          return split;
150        }
151      }
152    }
153  }
154}