001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.experimental.parquet; 020 021import java.io.ByteArrayOutputStream; 022import java.io.File; 023import java.io.IOException; 024import java.nio.ByteBuffer; 025import java.nio.ByteOrder; 026import java.util.logging.Level; 027import java.util.logging.Logger; 028 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.fs.Path; 031 032import org.apache.avro.Schema; 033import org.apache.avro.file.DataFileWriter; 034import org.apache.avro.generic.GenericDatumWriter; 035import org.apache.avro.generic.GenericRecord; 036import org.apache.avro.io.DatumWriter; 037import org.apache.avro.io.Encoder; 038import org.apache.avro.io.EncoderFactory; 039 040import org.apache.parquet.avro.AvroParquetReader; 041import org.apache.parquet.avro.AvroSchemaConverter; 042import org.apache.parquet.format.converter.ParquetMetadataConverter.MetadataFilter; 043import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER; 044import org.apache.parquet.hadoop.ParquetFileReader; 045import org.apache.parquet.hadoop.metadata.ParquetMetadata; 046import org.apache.parquet.schema.MessageType; 047import org.apache.reef.tang.annotations.Parameter; 048 049import javax.inject.Inject; 050 051/** 052 * A reader for Parquet files that can serialize data to local disk and return Avro schema and Avro reader. 053 * The intent is not to build a general parquet reader, but to consume data with table-like property. 054 */ 055public final class ParquetReader { 056 /** 057 * Standard java logger. 058 */ 059 private static final Logger LOG = Logger.getLogger(ParquetReader.class.getName()); 060 061 private Path parquetFilePath; 062 063 @Inject 064 private ParquetReader(@Parameter(PathString.class) final String path) throws IOException { 065 parquetFilePath = new Path(new File(path).getAbsolutePath()); 066 final Schema schema = createAvroSchema(); 067 if (schema.getType() != Schema.Type.RECORD) { 068 LOG.log(Level.SEVERE, "ParquetReader only support Avro record type that can be consumed as a table."); 069 throw new IOException("ParquetReader only support Avro record type that can be consumed as a table."); 070 } 071 for (final Schema.Field f : schema.getFields()) { 072 if (f.schema().getType() == Schema.Type.RECORD) { 073 LOG.log(Level.SEVERE, "ParquetReader doesn't support nested record type for its elements."); 074 throw new IOException("ParquetReader doesn't support nested record type for its elements."); 075 } 076 } 077 } 078 079 /** 080 * Retrieve avro schema from parquet file. 081 * @return avro schema from parquet file. 082 * @throws IOException if the Avro schema couldn't be parsed from the parquet file. 083 */ 084 public Schema createAvroSchema() throws IOException { 085 return createAvroSchema(new Configuration(true), NO_FILTER); 086 } 087 088 /** 089 * Retrieve avro schema from parquet file. 090 * @param configuration Hadoop configuration. 091 * @param filter Filter for Avro metadata. 092 * @return avro schema from parquet file. 093 * @throws IOException if the Avro schema couldn't be parsed from the parquet file. 094 */ 095 private Schema createAvroSchema(final Configuration configuration, final MetadataFilter filter) throws IOException { 096 final ParquetMetadata footer = ParquetFileReader.readFooter(configuration, parquetFilePath, filter); 097 final AvroSchemaConverter converter = new AvroSchemaConverter(); 098 final MessageType schema = footer.getFileMetaData().getSchema(); 099 return converter.convert(schema); 100 } 101 102 /** 103 * Construct an avro reader from parquet file. 104 * @return avro reader based on the provided parquet file. 105 * @throws IOException if the parquet file couldn't be parsed correctly. 106 */ 107 private AvroParquetReader<GenericRecord> createAvroReader() throws IOException { 108 return new AvroParquetReader<GenericRecord>(parquetFilePath); 109 } 110 111 /** 112 * Serialize Avro data to a local file. 113 * @param file Local destination file for serialization. 114 * @throws IOException if the parquet file couldn't be parsed correctly. 115 */ 116 public void serializeToDisk(final File file) throws IOException { 117 final DatumWriter datumWriter = new GenericDatumWriter<GenericRecord>(); 118 final DataFileWriter fileWriter = new DataFileWriter<GenericRecord>(datumWriter); 119 final AvroParquetReader<GenericRecord> reader = createAvroReader(); 120 fileWriter.create(createAvroSchema(), file); 121 122 GenericRecord record = reader.read(); 123 while (record != null) { 124 fileWriter.append(record); 125 record = reader.read(); 126 } 127 128 try { 129 reader.close(); 130 } catch (IOException ex){ 131 LOG.log(Level.SEVERE, ex.getMessage()); 132 throw ex; 133 } 134 135 try { 136 fileWriter.close(); 137 } catch (IOException ex){ 138 LOG.log(Level.SEVERE, ex.getMessage()); 139 throw ex; 140 } 141 } 142 143 /** 144 * Serialize Avro data to a in-memory ByteBuffer. 145 * @return A ByteBuffer that contains avro data. 146 * @throws IOException if the parquet file couldn't be parsed correctly. 147 */ 148 public ByteBuffer serializeToByteBuffer() throws IOException { 149 final ByteArrayOutputStream stream = new ByteArrayOutputStream(); 150 final Encoder encoder = EncoderFactory.get().binaryEncoder(stream, null); 151 final DatumWriter writer = new GenericDatumWriter<GenericRecord>(); 152 writer.setSchema(createAvroSchema()); 153 final AvroParquetReader<GenericRecord> reader = createAvroReader(); 154 155 GenericRecord record = reader.read(); 156 while (record != null) { 157 writer.write(record, encoder); 158 record = reader.read(); 159 } 160 161 try { 162 reader.close(); 163 } catch (IOException ex){ 164 LOG.log(Level.SEVERE, ex.getMessage()); 165 throw ex; 166 } 167 168 encoder.flush(); 169 final ByteBuffer buf = ByteBuffer.wrap(stream.toByteArray()); 170 buf.order(ByteOrder.LITTLE_ENDIAN); 171 return buf; 172 } 173}