This project has retired. For details, please refer to its Attic page.
Source code
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.reef.experimental.parquet;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;

import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.format.converter.ParquetMetadataConverter.MetadataFilter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.schema.MessageType;
import org.apache.reef.tang.annotations.Parameter;

import javax.inject.Inject;

import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
/**
 * A reader for Parquet files that can serialize their data to local disk
 * or to an in-memory ByteBuffer, and that exposes the file's Avro schema.
 * The intent is not to build a general Parquet reader, but to consume data
 * with table-like properties.
 */
public final class ParquetReader {
  /**
   * Standard Java logger.
   */
  private static final Logger LOG = Logger.getLogger(ParquetReader.class.getName());

  private final Path parquetFilePath;

  @Inject
  private ParquetReader(@Parameter(PathString.class) final String path) throws IOException {
    parquetFilePath = new Path(new File(path).getAbsolutePath());
    final Schema schema = createAvroSchema();
    if (schema.getType() != Schema.Type.RECORD) {
      final String message = "ParquetReader only supports Avro record types that can be consumed as a table.";
      LOG.log(Level.SEVERE, message);
      throw new IOException(message);
    }
    for (final Schema.Field field : schema.getFields()) {
      if (field.schema().getType() == Schema.Type.RECORD) {
        final String message = "ParquetReader does not support nested record types for its elements.";
        LOG.log(Level.SEVERE, message);
        throw new IOException(message);
      }
    }
  }

  /**
   * Retrieve the Avro schema from the Parquet file.
   * @return the Avro schema extracted from the Parquet file's footer metadata.
   * @throws IOException if the Avro schema couldn't be parsed from the Parquet file.
   */
  public Schema createAvroSchema() throws IOException {
    return createAvroSchema(new Configuration(true), NO_FILTER);
  }
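
  // A minimal usage sketch (hypothetical; "reader" stands for an injected
  // ParquetReader instance): since the constructor guarantees a flat record
  // type, the returned schema can be read like a table header.
  //
  //   for (final Schema.Field field : reader.createAvroSchema().getFields()) {
  //     System.out.println(field.name() + ": " + field.schema().getType());
  //   }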

  /**
   * Retrieve the Avro schema from the Parquet file.
   * @param configuration Hadoop configuration.
   * @param filter Filter for the Parquet footer metadata.
   * @return the Avro schema converted from the Parquet file schema.
   * @throws IOException if the Avro schema couldn't be parsed from the Parquet file.
   */
  private Schema createAvroSchema(final Configuration configuration, final MetadataFilter filter) throws IOException {
    final ParquetMetadata footer = ParquetFileReader.readFooter(configuration, parquetFilePath, filter);
    final AvroSchemaConverter converter = new AvroSchemaConverter();
    final MessageType schema = footer.getFileMetaData().getSchema();
    return converter.convert(schema);
  }

  /**
   * Construct an Avro reader for the Parquet file.
   * @return an Avro reader backed by the provided Parquet file.
   * @throws IOException if the Parquet file couldn't be parsed correctly.
   */
  private AvroParquetReader<GenericRecord> createAvroReader() throws IOException {
    return new AvroParquetReader<>(parquetFilePath);
  }

  /**
   * Serialize the Parquet data to a local file in the Avro container format.
   * @param file Local destination file for serialization.
   * @throws IOException if the Parquet file couldn't be parsed correctly.
   */
  public void serializeToDisk(final File file) throws IOException {
    final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>();
    // try-with-resources guarantees the reader and writer are closed
    // even if a read or append fails midway.
    try (AvroParquetReader<GenericRecord> reader = createAvroReader();
         DataFileWriter<GenericRecord> fileWriter = new DataFileWriter<>(datumWriter)) {
      fileWriter.create(createAvroSchema(), file);

      GenericRecord record = reader.read();
      while (record != null) {
        fileWriter.append(record);
        record = reader.read();
      }
    } catch (final IOException ex) {
      LOG.log(Level.SEVERE, ex.getMessage());
      throw ex;
    }
  }
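
  // A sketch (hypothetical file names) of reading the serialized output back
  // with Avro's standard container-file reader:
  //
  //   final File avroFile = new File("output.avro");
  //   reader.serializeToDisk(avroFile);
  //   try (DataFileReader<GenericRecord> in =
  //       new DataFileReader<>(avroFile, new GenericDatumReader<GenericRecord>())) {
  //     while (in.hasNext()) {
  //       final GenericRecord row = in.next();  // one table row per record
  //     }
  //   }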

  /**
   * Serialize the Parquet data to an in-memory ByteBuffer.
   * @return a ByteBuffer that contains the Avro-encoded data.
   * @throws IOException if the Parquet file couldn't be parsed correctly.
   */
  public ByteBuffer serializeToByteBuffer() throws IOException {
    final ByteArrayOutputStream stream = new ByteArrayOutputStream();
    final Encoder encoder = EncoderFactory.get().binaryEncoder(stream, null);
    final DatumWriter<GenericRecord> writer = new GenericDatumWriter<>();
    writer.setSchema(createAvroSchema());

    try (AvroParquetReader<GenericRecord> reader = createAvroReader()) {
      GenericRecord record = reader.read();
      while (record != null) {
        writer.write(record, encoder);
        record = reader.read();
      }
    } catch (final IOException ex) {
      LOG.log(Level.SEVERE, ex.getMessage());
      throw ex;
    }

    encoder.flush();
    final ByteBuffer buf = ByteBuffer.wrap(stream.toByteArray());
    // The byte order only affects multi-byte reads through this buffer;
    // the Avro binary payload itself is unaffected.
    buf.order(ByteOrder.LITTLE_ENDIAN);
    return buf;
  }
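
  // The buffer above holds raw Avro binary without an embedded schema, so a
  // consumer needs the schema from createAvroSchema() to decode it. A minimal
  // sketch, assuming an injected instance named "reader":
  //
  //   final Schema schema = reader.createAvroSchema();
  //   final ByteBuffer buf = reader.serializeToByteBuffer();
  //   final BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(buf.array(), null);
  //   final GenericDatumReader<GenericRecord> rows = new GenericDatumReader<>(schema);
  //   while (!decoder.isEnd()) {
  //     final GenericRecord row = rows.read(null, decoder);
  //   }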
}
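
Because the constructor is private and @Inject-annotated, a ParquetReader is obtained through Tang injection rather than direct construction. A minimal sketch, assuming PathString is the Tang named parameter referenced by the constructor (declared elsewhere in this package) and using placeholder file paths:

import java.io.File;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.Tang;

public final class ParquetReaderExample {
  public static void main(final String[] args) throws Exception {
    // Bind the input path (placeholder) to the PathString named parameter.
    final Configuration conf = Tang.Factory.getTang().newConfigurationBuilder()
        .bindNamedParameter(PathString.class, "input.parquet")
        .build();
    final ParquetReader reader =
        Tang.Factory.getTang().newInjector(conf).getInstance(ParquetReader.class);
    // Re-serialize the table to a placeholder Avro container file.
    reader.serializeToDisk(new File("output.avro"));
  }
}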