/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.reef.io.data.loading.impl;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapred.*;
import org.apache.reef.annotations.audience.TaskSide;
import org.apache.reef.io.data.loading.api.DataSet;
import org.apache.reef.io.network.util.Pair;

import javax.inject.Inject;
import java.io.IOException;
import java.util.Iterator;
import java.util.NoSuchElementException;

/**
 * An implementation of {@link DataSet} that reads records using a RecordReader
 * encoded inside an InputSplit.
 * <p>
 * The input split is injected through an external constructor by deserializing
 * the input split assigned to this evaluator.
 * <p>
 * Every call to {@link #iterator()} asks the configured {@link InputFormat} for
 * a new {@link RecordReader} over the split. The reader is closed by the
 * returned iterator once it reports that no more records are available.
 *
 * @param <K> the type of the record keys produced by the RecordReader
 * @param <V> the type of the record values produced by the RecordReader
 */
@TaskSide
public final class
    InputFormatDataSet<K extends WritableComparable<K>, V extends Writable>
    implements DataSet<K, V> {

  private final DummyReporter dummyReporter = new DummyReporter();
  private final JobConf jobConf;
  private final InputFormat<K, V> inputFormat;
  private final InputSplit split;

  /** The reader handed out by the most recent {@link #iterator()} call, kept for an identity check only. */
  private RecordReader<K, V> lastRecordReader = null;

  /**
   * Creates a data set over the given split, using the input format configured in the job configuration.
   *
   * @param split   the input split whose records this data set exposes
   * @param jobConf the job configuration; supplies the input format and is passed to it when creating readers
   */
  @Inject
  public InputFormatDataSet(final InputSplit split, final JobConf jobConf) {
    this.jobConf = jobConf;
    // JobConf.getInputFormat() is declared with the raw InputFormat type, so the
    // conversion to InputFormat<K, V> cannot be checked by the compiler.
    @SuppressWarnings("unchecked")
    final InputFormat<K, V> configuredInputFormat = this.jobConf.getInputFormat();
    this.inputFormat = configuredInputFormat;
    this.split = split;
  }

  /**
   * Creates a new iterator over the records of the split, backed by a new RecordReader.
   *
   * @return an iterator over the key/value pairs of the split
   * @throws RuntimeException if the input format returns the same RecordReader instance as on the
   *                          previous call, or if the RecordReader cannot be created or read
   */
  @Override
  public Iterator<Pair<K, V>> iterator() {
    try {

      final RecordReader<K, V> newRecordReader =
          this.inputFormat.getRecordReader(this.split, this.jobConf, this.dummyReporter);

      // A reused reader would already be (partially) consumed, so a second pass over it is not possible.
      if (newRecordReader == this.lastRecordReader) {
        throw new RuntimeException("Received the same record reader again. This isn't supported.");
      }

      this.lastRecordReader = newRecordReader;
      return new RecordReaderIterator(newRecordReader);

    } catch (final IOException ex) {
      throw new RuntimeException("Can't instantiate iterator.", ex);
    }
  }

  /**
   * Iterator that reads one record ahead of the caller, so that {@link #hasNext()} can be answered
   * without touching the RecordReader.
   */
  private final class RecordReaderIterator implements Iterator<Pair<K, V>> {

    private final RecordReader<K, V> recordReader;
    /** The prefetched record; only meaningful while {@link #hasNext} is true. */
    private Pair<K, V> recordPair;
    private boolean hasNext;

    /**
     * Wraps the given reader and prefetches its first record.
     *
     * @param recordReader the reader to pull records from
     */
    RecordReaderIterator(final RecordReader<K, V> recordReader) {
      this.recordReader = recordReader;
      fetchRecord();
    }

    @Override
    public boolean hasNext() {
      return this.hasNext;
    }

    /**
     * Returns the prefetched record and reads the following one.
     *
     * @return the next key/value pair
     * @throws NoSuchElementException if the reader has no more records
     */
    @Override
    public Pair<K, V> next() {
      if (!this.hasNext) {
        // Without this guard the caller would receive a pair of freshly created, unfilled key/value objects.
        throw new NoSuchElementException("No more records available from the RecordReader");
      }
      final Pair<K, V> prevRecordPair = this.recordPair;
      fetchRecord();
      return prevRecordPair;
    }

    @Override
    public void remove() {
      throw new UnsupportedOperationException("Remove is not supported on RecordReader iterator");
    }

    /**
     * Reads the next record into a newly created key/value pair and updates {@link #hasNext}.
     * Closes the reader once it reports that there are no more records.
     */
    private void fetchRecord() {
      // New key/value objects are created for every record so that pairs already returned are not overwritten.
      this.recordPair = new Pair<>(this.recordReader.createKey(), this.recordReader.createValue());
      try {
        this.hasNext = this.recordReader.next(this.recordPair.getFirst(), this.recordPair.getSecond());
      } catch (final IOException ex) {
        throw new RuntimeException("Unable to read the next record from the RecordReader", ex);
      }
      if (!this.hasNext) {
        // fetchRecord() is never invoked again after this point, so the reader is closed exactly once.
        closeReader();
      }
    }

    /**
     * Closes the underlying reader, converting an IOException into a RuntimeException.
     */
    private void closeReader() {
      try {
        this.recordReader.close();
      } catch (final IOException ex) {
        throw new RuntimeException("Unable to close the RecordReader", ex);
      }
    }
  }

  /**
   * A {@link Reporter} that ignores all progress, status and counter updates.
   * Counter lookups return null and {@link #getInputSplit()} is unsupported.
   */
  private static final class DummyReporter implements Reporter {

    @Override
    public void progress() {
    }

    @Override
    public Counter getCounter(final Enum<?> key) {
      return null;
    }

    @Override
    public Counter getCounter(final String group, final String name) {
      return null;
    }

    @Override
    public InputSplit getInputSplit() throws UnsupportedOperationException {
      throw new UnsupportedOperationException("This is a Fake Reporter");
    }

    @Override
    public float getProgress() {
      return 0;
    }

    @Override
    public void incrCounter(final Enum<?> key, final long amount) {
    }

    @Override
    public void incrCounter(final String group, final String counter, final long amount) {
    }

    @Override
    public void setStatus(final String status) {
    }
  }
}