Source code
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.reef.io.data.loading.impl;

import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.driver.context.ActiveContext;
import org.apache.reef.driver.context.ContextConfiguration;
import org.apache.reef.driver.context.ServiceConfiguration;
import org.apache.reef.driver.evaluator.AllocatedEvaluator;
import org.apache.reef.io.data.loading.api.DataLoadingRequestBuilder;
import org.apache.reef.io.data.loading.api.DataLoadingService;
import org.apache.reef.io.data.loading.api.DataSet;
import org.apache.reef.io.data.loading.api.EvaluatorToPartitionStrategy;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.Tang;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.tang.exceptions.BindException;

import javax.inject.Inject;

import java.util.Random;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * An implementation of {@link DataLoadingService}
 * that uses the Hadoop {@link org.apache.hadoop.mapred.InputFormat} to find
 * partitions of data and request resources.
 * <p>
 * The InputFormat is taken from the job configuration.
 * <p>
 * The {@link EvaluatorToPartitionStrategy} is injected via Tang
 * in order to support different ways of mapping evaluators to data.
 */
@DriverSide
public class InputFormatLoadingService<K, V> implements DataLoadingService {

  private static final Logger LOG = Logger.getLogger(InputFormatLoadingService.class.getName());

  private static final String DATA_LOAD_CONTEXT_PREFIX = "DataLoadContext-";

  private static final String COMPUTE_CONTEXT_PREFIX =
      "ComputeContext-" + new Random(3381).nextInt(1 << 20) + "-";

  private final EvaluatorToPartitionStrategy<InputSplit> evaluatorToPartitionStrategy;

  private final boolean inMemory;

  private final String inputFormatClass;

  @Inject
  public InputFormatLoadingService(
      final EvaluatorToPartitionStrategy<InputSplit> evaluatorToPartitionStrategy,
      @Parameter(DataLoadingRequestBuilder.LoadDataIntoMemory.class) final boolean inMemory,
      @Parameter(JobConfExternalConstructor.InputFormatClass.class) final String inputFormatClass) {
    this.inMemory = inMemory;
    this.inputFormatClass = inputFormatClass;
    this.evaluatorToPartitionStrategy = evaluatorToPartitionStrategy;
  }

  /**
   * This method actually returns the number of splits across all partitions of the data.
   * We should probably rename it in the future.
   */
  @Override
  public int getNumberOfPartitions() {
    return evaluatorToPartitionStrategy.getNumberOfSplits();
  }

  @Override
  public Configuration getContextConfiguration(final AllocatedEvaluator allocatedEvaluator) {

    final NumberedSplit<InputSplit> numberedSplit =
        this.evaluatorToPartitionStrategy.getInputSplit(
            allocatedEvaluator.getEvaluatorDescriptor().getNodeDescriptor(),
            allocatedEvaluator.getId());

    return ContextConfiguration.CONF
        .set(ContextConfiguration.IDENTIFIER, DATA_LOAD_CONTEXT_PREFIX + numberedSplit.getIndex())
        .build();
  }

  @Override
  public Configuration getServiceConfiguration(final AllocatedEvaluator allocatedEvaluator) {

    try {

      final NumberedSplit<InputSplit> numberedSplit =
          this.evaluatorToPartitionStrategy.getInputSplit(
              allocatedEvaluator.getEvaluatorDescriptor().getNodeDescriptor(),
              allocatedEvaluator.getId());

      final Configuration serviceConfiguration = ServiceConfiguration.CONF
          .set(ServiceConfiguration.SERVICES,
              this.inMemory ? InMemoryInputFormatDataSet.class : InputFormatDataSet.class)
          .build();

      return Tang.Factory.getTang().newConfigurationBuilder(serviceConfiguration)
          .bindImplementation(
              DataSet.class,
              this.inMemory ? InMemoryInputFormatDataSet.class : InputFormatDataSet.class)
          .bindNamedParameter(JobConfExternalConstructor.InputFormatClass.class, inputFormatClass)
          .bindNamedParameter(JobConfExternalConstructor.InputPath.class, numberedSplit.getPath())
          .bindNamedParameter(
              InputSplitExternalConstructor.SerializedInputSplit.class,
              WritableSerializer.serialize(numberedSplit.getEntry()))
          .bindConstructor(InputSplit.class, InputSplitExternalConstructor.class)
          .bindConstructor(JobConf.class, JobConfExternalConstructor.class)
          .build();

    } catch (final BindException ex) {
      final String evalId = allocatedEvaluator.getId();
      final String msg = "Unable to create configuration for evaluator " + evalId;
      LOG.log(Level.WARNING, msg, ex);
      throw new RuntimeException(msg, ex);
    }
  }

  @Override
  public String getComputeContextIdPrefix() {
    return COMPUTE_CONTEXT_PREFIX;
  }

  @Override
  public boolean isComputeContext(final ActiveContext context) {
    return context.getId().startsWith(COMPUTE_CONTEXT_PREFIX);
  }

  @Override
  public boolean isDataLoadedContext(final ActiveContext context) {
    return context.getId().startsWith(DATA_LOAD_CONTEXT_PREFIX);
  }
}
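
For context, the snippet below sketches how a driver-side program might request this service through DataLoadingRequestBuilder, which wires InputFormatLoadingService in as the DataLoadingService implementation. It is a minimal sketch modeled on REEF's line-counter data loading example: the DataLoadingLauncher class is hypothetical, and the builder methods shown (setMemoryMB, setInputFormatClass, setInputPath, setNumberOfDesiredSplits, setDriverConfigurationModule) are assumptions that should be checked against the REEF version in use.

package org.apache.reef.examples.data.loading; // hypothetical package for this sketch

import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.reef.client.DriverConfiguration;
import org.apache.reef.io.data.loading.api.DataLoadingRequestBuilder;
import org.apache.reef.tang.Configuration;
import org.apache.reef.util.EnvironmentUtils;

public final class DataLoadingLauncher {

  private DataLoadingLauncher() {
  }

  /**
   * Builds a driver Configuration that asks the data loading service to
   * split the input path with Hadoop's TextInputFormat and to load one
   * split per data loading evaluator.
   */
  public static Configuration getDataLoadConfiguration(final String inputPath) {
    return new DataLoadingRequestBuilder()
        .setMemoryMB(1024)                          // memory per data loading evaluator
        .setInputFormatClass(TextInputFormat.class) // the InputFormat used to compute splits
        .setInputPath(inputPath)
        .setNumberOfDesiredSplits(4)
        .setDriverConfigurationModule(DriverConfiguration.CONF
            .set(DriverConfiguration.GLOBAL_LIBRARIES,
                EnvironmentUtils.getClassLocation(DataLoadingLauncher.class))
            .set(DriverConfiguration.DRIVER_IDENTIFIER, "InputFormatLoadingExample"))
        .build();
  }
}

With a configuration along these lines, each data loading evaluator receives a context whose identifier carries the DataLoadContext- prefix and a service configuration that binds DataSet to either InMemoryInputFormatDataSet or InputFormatDataSet, as constructed by getContextConfiguration and getServiceConfiguration above.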