This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.io.data.loading.impl;
020
021import org.apache.hadoop.io.Writable;
022import org.apache.hadoop.io.WritableComparable;
023import org.apache.hadoop.mapred.Counters.Counter;
024import org.apache.hadoop.mapred.*;
025import org.apache.reef.annotations.audience.TaskSide;
026import org.apache.reef.io.data.loading.api.DataSet;
027import org.apache.reef.io.network.util.Pair;
028
import javax.inject.Inject;
import java.io.IOException;
import java.util.Iterator;
import java.util.NoSuchElementException;
032
033/**
034 * An implementation of {@link DataSet} that reads records using a RecordReader
035 * encoded inside an InputSplit.
036 * <p>
037 * The input split is injected through an external constructor by deserializing
038 * the input split assigned to this evaluator.
039 *
040 * @param <K>
041 * @param <V>
042 */
043@TaskSide
044public final class
045    InputFormatDataSet<K extends WritableComparable<K>, V extends Writable>
046    implements DataSet<K, V> {
047
048  private final DummyReporter dummyReporter = new DummyReporter();
049  private final JobConf jobConf;
050  private final InputFormat<K, V> inputFormat;
051  private final InputSplit split;
052  private RecordReader lastRecordReader = null;
053
054  @Inject
055  public InputFormatDataSet(final InputSplit split, final JobConf jobConf) {
056    this.jobConf = jobConf;
057    this.inputFormat = this.jobConf.getInputFormat();
058    this.split = split;
059  }
060
061  @Override
062  public Iterator<Pair<K, V>> iterator() {
063    try {
064
065      final RecordReader newRecordReader =
066          this.inputFormat.getRecordReader(this.split, this.jobConf, this.dummyReporter);
067
068      if (newRecordReader == this.lastRecordReader) {
069        throw new RuntimeException("Received the same record reader again. This isn't supported.");
070      }
071
072      this.lastRecordReader = newRecordReader;
073      return new RecordReaderIterator(newRecordReader);
074
075    } catch (final IOException ex) {
076      throw new RuntimeException("Can't instantiate iterator.", ex);
077    }
078  }
079
080  private final class RecordReaderIterator implements Iterator<Pair<K, V>> {
081
082    private final RecordReader<K, V> recordReader;
083    private Pair<K, V> recordPair;
084    private boolean hasNext;
085
086    RecordReaderIterator(final RecordReader<K, V> recordReader) {
087      this.recordReader = recordReader;
088      fetchRecord();
089    }
090
091    @Override
092    public boolean hasNext() {
093      return this.hasNext;
094    }
095
096    @Override
097    public Pair<K, V> next() {
098      final Pair<K, V> prevRecordPair = this.recordPair;
099      fetchRecord();
100      return prevRecordPair;
101    }
102
103    @Override
104    public void remove() {
105      throw new UnsupportedOperationException("Remove is not supported on RecordReader iterator");
106    }
107
108    private void fetchRecord() {
109      this.recordPair = new Pair<>(this.recordReader.createKey(), this.recordReader.createValue());
110      try {
111        this.hasNext = this.recordReader.next(this.recordPair.getFirst(), this.recordPair.getSecond());
112      } catch (final IOException ex) {
113        throw new RuntimeException("Unable to get InputSplits using the specified InputFormat", ex);
114      }
115    }
116  }
117
118  private final class DummyReporter implements Reporter {
119
120    @Override
121    public void progress() {
122    }
123
124    @Override
125    public Counter getCounter(final Enum<?> key) {
126      return null;
127    }
128
129    @Override
130    public Counter getCounter(final String group, final String name) {
131      return null;
132    }
133
134    @Override
135    public InputSplit getInputSplit() throws UnsupportedOperationException {
136      throw new UnsupportedOperationException("This is a Fake Reporter");
137    }
138
139    @Override
140    public float getProgress() {
141      return 0;
142    }
143
144    @Override
145    public void incrCounter(final Enum<?> key, final long amount) {
146    }
147
148    @Override
149    public void incrCounter(final String group, final String counter, final long amount) {
150    }
151
152    @Override
153    public void setStatus(final String status) {
154    }
155  }
156}