TAJO-2073: Upgrade parquet-mr to 1.8.1.
Closes #958
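Reviewer note: this patch replaces the three com.twitter artifacts (parquet-column, parquet-hadoop, parquet-format) with the single org.apache.parquet:parquet-hadoop-bundle:1.8.1 dependency, and every import moves from the parquet.* packages to org.apache.parquet.*. ParquetAppender now estimates its output size with ParquetWriter.getDataSize(), which takes over from the removed thirdparty writer's getEstimatedWrittenSize(). A minimal before/after sketch of the import rename, taken from ParquetAppender below:

    // before (parquet-mr 1.5.0, com.twitter coordinates)
    import parquet.hadoop.ParquetOutputFormat;
    import parquet.hadoop.metadata.CompressionCodecName;

    // after (parquet-mr 1.8.1, org.apache.parquet coordinates)
    import org.apache.parquet.hadoop.ParquetOutputFormat;
    import org.apache.parquet.hadoop.metadata.CompressionCodecName;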
diff --git a/CHANGES b/CHANGES
index a932c65..2f31255 100644
--- a/CHANGES
+++ b/CHANGES
@@ -8,6 +8,8 @@
IMPROVEMENT
+ TAJO-2073: Upgrade parquet-mr to 1.8.1. (jinho)
+
TAJO-2052: Upgrading ORC reader version. (Jongyoung Park via jaehwa)
TAJO-1940: Implement HBaseTablespace::getTableVolume() method. (hyunsik)
diff --git a/tajo-storage/tajo-storage-hdfs/pom.xml b/tajo-storage/tajo-storage-hdfs/pom.xml
index 6c10a88..50bfa15 100644
--- a/tajo-storage/tajo-storage-hdfs/pom.xml
+++ b/tajo-storage/tajo-storage-hdfs/pom.xml
@@ -34,8 +34,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
- <parquet.version>1.5.0</parquet.version>
- <parquet.format.version>2.1.0</parquet.format.version>
+ <parquet.version>1.8.1</parquet.version>
</properties>
<repositories>
@@ -337,21 +336,11 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>com.twitter</groupId>
- <artifactId>parquet-column</artifactId>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-hadoop-bundle</artifactId>
<version>${parquet.version}</version>
</dependency>
<dependency>
- <groupId>com.twitter</groupId>
- <artifactId>parquet-hadoop</artifactId>
- <version>${parquet.version}</version>
- </dependency>
- <dependency>
- <groupId>com.twitter</groupId>
- <artifactId>parquet-format</artifactId>
- <version>${parquet.format.version}</version>
- </dependency>
- <dependency>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
</dependency>
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
index e7b523a..423869d 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
@@ -29,8 +29,8 @@
import org.apache.tajo.storage.StorageConstants;
import org.apache.tajo.storage.TableStatistics;
import org.apache.tajo.storage.Tuple;
-import parquet.hadoop.ParquetOutputFormat;
-import parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.ParquetOutputFormat;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import java.io.IOException;
@@ -121,7 +121,7 @@
}
public long getEstimatedOutputSize() throws IOException {
- return writer.getEstimatedWrittenSize();
+ return writer.getDataSize();
}
/**
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java
index d7f753c..0c4749c 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java
@@ -21,9 +21,10 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.exception.NotImplementedException;
import org.apache.tajo.exception.TajoRuntimeException;
-import org.apache.tajo.exception.UnsupportedException;
import org.apache.tajo.plan.expr.EvalNode;
+import org.apache.tajo.storage.EmptyTuple;
import org.apache.tajo.storage.FileScanner;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.fragment.Fragment;
@@ -35,6 +36,10 @@
*/
public class ParquetScanner extends FileScanner {
private TajoParquetReader reader;
+ /** The number of records actually read */
+ private long currentRowCount;
+ private long totalRowCount;
+ private boolean closed;
/**
* Creates a new ParquetScanner.
@@ -58,7 +63,10 @@
if (targets == null) {
targets = schema.toArray();
}
- reader = new TajoParquetReader(fragment.getPath(), schema, new Schema(targets));
+ reader = new TajoParquetReader(conf, fragment.getPath(), schema, new Schema(targets));
+ totalRowCount = reader.getTotalRowCount();
+ currentRowCount = 0;
+ closed = false;
super.init();
}
@@ -70,6 +78,16 @@
*/
@Override
public Tuple next() throws IOException {
+ // If there is no required column, we just read the footer and then return an empty tuple per row
+ if (targets.length == 0) {
+ if(currentRowCount == totalRowCount) {
+ return null;
+ } else {
+ currentRowCount++;
+ return EmptyTuple.get();
+ }
+ }
+
return reader.read();
}
@@ -78,6 +96,7 @@
*/
@Override
public void reset() throws IOException {
+ throw new TajoRuntimeException(new NotImplementedException());
}
/**
@@ -88,6 +107,7 @@
if (reader != null) {
reader.close();
}
+ closed = true;
}
/**
@@ -112,7 +132,7 @@
@Override
public void setFilter(EvalNode filter) {
- throw new TajoRuntimeException(new UnsupportedException());
+ throw new TajoRuntimeException(new NotImplementedException());
}
/**
@@ -131,7 +151,11 @@
if (!inited) {
return super.getProgress();
} else {
- return reader.getProgress();
+ if (closed) {
+ return 1.0f;
+ } else {
+ return reader.getProgress();
+ }
}
}
}
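The hunk above adds a projection-free path: when no target column is requested, init() takes the row count from the Parquet footer and next() hands back EmptyTuple.get() once per row instead of materializing records. A hedged caller-side sketch, assuming the usual FileScanner constructor and Scanner.setTarget(Column[]) API (the COUNT(*) motivation is my reading, not stated in the patch):

    // illustrative only, not part of this patch
    ParquetScanner scanner = new ParquetScanner(conf, schema, meta, fragment);
    scanner.setTarget(new Column[0]);   // no required columns, e.g. SELECT COUNT(*)
    scanner.init();                     // totalRowCount comes from the footer
    long rows = 0;
    while (scanner.next() != null) {    // EmptyTuple.get() is returned for each row
      rows++;
    }
    scanner.close();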
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoParquetReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoParquetReader.java
index a765f48..8d70c48 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoParquetReader.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoParquetReader.java
@@ -18,11 +18,12 @@
package org.apache.tajo.storage.parquet;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.parquet.filter.UnboundRecordFilter;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.thirdparty.parquet.ParquetReader;
-import parquet.filter.UnboundRecordFilter;
import java.io.IOException;
@@ -32,54 +33,34 @@
* directly.
*/
public class TajoParquetReader extends ParquetReader<Tuple> {
- /**
- * Creates a new TajoParquetReader.
- *
- * @param file The file to read from.
- * @param readSchema Tajo schema of the table.
- */
- public TajoParquetReader(Path file, Schema readSchema) throws IOException {
- super(file, new TajoReadSupport(readSchema));
- }
/**
* Creates a new TajoParquetReader.
*
+ * @param conf the configuration
* @param file The file to read from.
* @param readSchema Tajo schema of the table.
* @param requestedSchema Tajo schema of the projection.
*/
- public TajoParquetReader(Path file, Schema readSchema,
+ public TajoParquetReader(Configuration conf, Path file, Schema readSchema,
Schema requestedSchema) throws IOException {
- super(file, new TajoReadSupport(readSchema, requestedSchema));
+ super(conf, file, new TajoReadSupport(readSchema, requestedSchema));
}
/**
* Creates a new TajoParquetReader.
*
- * @param file The file to read from.
- * @param readSchema Tajo schema of the table.
- * @param recordFilter Record filter.
- */
- public TajoParquetReader(Path file, Schema readSchema,
- UnboundRecordFilter recordFilter)
- throws IOException {
- super(file, new TajoReadSupport(readSchema), recordFilter);
- }
-
- /**
- * Creates a new TajoParquetReader.
- *
+ * @param conf the configuration
* @param file The file to read from.
* @param readSchema Tajo schema of the table.
* @param requestedSchema Tajo schema of the projection.
* @param recordFilter Record filter.
*/
- public TajoParquetReader(Path file, Schema readSchema,
+ public TajoParquetReader(Configuration conf, Path file, Schema readSchema,
Schema requestedSchema,
UnboundRecordFilter recordFilter)
throws IOException {
- super(file, new TajoReadSupport(readSchema, requestedSchema),
+ super(conf, file, new TajoReadSupport(readSchema, requestedSchema),
recordFilter);
}
}
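With the readSchema-only and no-Configuration constructors removed, callers must now pass a Hadoop Configuration. A minimal usage sketch mirroring the call ParquetScanner.init() makes above; the variable names are illustrative:

    Configuration conf = new Configuration();
    TajoParquetReader reader =
        new TajoParquetReader(conf, path, tableSchema, projectionSchema);
    try {
      Tuple tuple;
      while ((tuple = reader.read()) != null) {
        // consume the materialized tuple
      }
    } finally {
      reader.close();
    }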
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoParquetWriter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoParquetWriter.java
index 5f220c5..2217ed2 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoParquetWriter.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoParquetWriter.java
@@ -21,8 +21,8 @@
import org.apache.hadoop.fs.Path;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.thirdparty.parquet.ParquetWriter;
-import parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import java.io.IOException;
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoReadSupport.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoReadSupport.java
index a64e987..a3e45e3 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoReadSupport.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoReadSupport.java
@@ -19,13 +19,13 @@
package org.apache.tajo.storage.parquet;
import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.Log;
+import org.apache.parquet.hadoop.api.InitContext;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.io.api.RecordMaterializer;
+import org.apache.parquet.schema.MessageType;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.storage.Tuple;
-import parquet.Log;
-import parquet.hadoop.api.InitContext;
-import parquet.hadoop.api.ReadSupport;
-import parquet.io.api.RecordMaterializer;
-import parquet.schema.MessageType;
import java.util.Map;
@@ -69,7 +69,7 @@
* @return A ReadContext that defines how to read the file.
*/
@Override
- public ReadContext init(InitContext context) {
+ public ReadSupport.ReadContext init(InitContext context) {
if (requestedSchema == null) {
throw new RuntimeException("requestedSchema is null.");
}
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java
index 7d73021..d3bbed1 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java
@@ -27,12 +27,12 @@
import org.apache.tajo.datum.*;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.VTuple;
-import parquet.io.api.Binary;
-import parquet.io.api.Converter;
-import parquet.io.api.GroupConverter;
-import parquet.io.api.PrimitiveConverter;
-import parquet.schema.GroupType;
-import parquet.schema.Type;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.Converter;
+import org.apache.parquet.io.api.GroupConverter;
+import org.apache.parquet.io.api.PrimitiveConverter;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.Type;
import java.nio.ByteBuffer;
import java.util.Arrays;
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordMaterializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordMaterializer.java
index 25610fc..f5a84de 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordMaterializer.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordMaterializer.java
@@ -21,9 +21,9 @@
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.plan.util.PlannerUtil;
import org.apache.tajo.storage.Tuple;
-import parquet.io.api.GroupConverter;
-import parquet.io.api.RecordMaterializer;
-import parquet.schema.MessageType;
+import org.apache.parquet.io.api.GroupConverter;
+import org.apache.parquet.io.api.RecordMaterializer;
+import org.apache.parquet.schema.MessageType;
/**
* Materializes a Tajo Tuple from a stream of Parquet data.
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoSchemaConverter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoSchemaConverter.java
index 3d573f6..dfe6af8 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoSchemaConverter.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoSchemaConverter.java
@@ -18,14 +18,14 @@
package org.apache.tajo.storage.parquet;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.apache.parquet.schema.Type;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.common.TajoDataTypes;
-import parquet.schema.MessageType;
-import parquet.schema.OriginalType;
-import parquet.schema.PrimitiveType;
-import parquet.schema.PrimitiveType.PrimitiveTypeName;
-import parquet.schema.Type;
import java.util.ArrayList;
import java.util.List;
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java
index e5ad28c..e5d73f5 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java
@@ -24,12 +24,12 @@
import org.apache.tajo.common.TajoDataTypes;
import org.apache.tajo.exception.ValueTooLongForTypeCharactersException;
import org.apache.tajo.storage.Tuple;
-import parquet.hadoop.api.WriteSupport;
-import parquet.io.api.Binary;
-import parquet.io.api.RecordConsumer;
-import parquet.schema.GroupType;
-import parquet.schema.MessageType;
-import parquet.schema.Type;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.RecordConsumer;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
import java.util.HashMap;
import java.util.List;
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
index 12ab738..e112b0d 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
@@ -268,7 +268,7 @@
private final long endOffset;
/** The number of actual read records */
- private int recordCount = 0;
+ private long recordCount = 0;
private DelimitedLineReader reader;
private TextLineDeserializer deserializer;
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/CodecFactory.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/CodecFactory.java
deleted file mode 100644
index fcaf2f3..0000000
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/CodecFactory.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.thirdparty.parquet;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.compress.*;
-import org.apache.hadoop.util.ReflectionUtils;
-import parquet.bytes.BytesInput;
-import parquet.hadoop.BadConfigurationException;
-import parquet.hadoop.metadata.CompressionCodecName;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.HashMap;
-import java.util.Map;
-
-class CodecFactory {
-
- public static class BytesDecompressor {
-
- private final CompressionCodec codec;
- private final Decompressor decompressor;
-
- public BytesDecompressor(CompressionCodec codec) {
- this.codec = codec;
- if (codec != null) {
- decompressor = CodecPool.getDecompressor(codec);
- } else {
- decompressor = null;
- }
- }
-
- public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException {
- final BytesInput decompressed;
- if (codec != null) {
- decompressor.reset();
- InputStream is = codec.createInputStream(new ByteArrayInputStream(bytes.toByteArray()), decompressor);
- decompressed = BytesInput.from(is, uncompressedSize);
- } else {
- decompressed = bytes;
- }
- return decompressed;
- }
-
- private void release() {
- if (decompressor != null) {
- CodecPool.returnDecompressor(decompressor);
- }
- }
- }
-
- /**
- * Encapsulates the logic around hadoop compression
- *
- * @author Julien Le Dem
- *
- */
- public static class BytesCompressor {
-
- private final CompressionCodec codec;
- private final Compressor compressor;
- private final ByteArrayOutputStream compressedOutBuffer;
- private final CompressionCodecName codecName;
-
- public BytesCompressor(CompressionCodecName codecName, CompressionCodec codec, int pageSize) {
- this.codecName = codecName;
- this.codec = codec;
- if (codec != null) {
- this.compressor = CodecPool.getCompressor(codec);
- this.compressedOutBuffer = new ByteArrayOutputStream(pageSize);
- } else {
- this.compressor = null;
- this.compressedOutBuffer = null;
- }
- }
-
- public BytesInput compress(BytesInput bytes) throws IOException {
- final BytesInput compressedBytes;
- if (codec == null) {
- compressedBytes = bytes;
- } else {
- compressedOutBuffer.reset();
- if (compressor != null) {
- // null compressor for non-native gzip
- compressor.reset();
- }
- CompressionOutputStream cos = codec.createOutputStream(compressedOutBuffer, compressor);
- bytes.writeAllTo(cos);
- cos.finish();
- cos.close();
- compressedBytes = BytesInput.from(compressedOutBuffer);
- }
- return compressedBytes;
- }
-
- private void release() {
- if (compressor != null) {
- CodecPool.returnCompressor(compressor);
- }
- }
-
- public CompressionCodecName getCodecName() {
- return codecName;
- }
-
- }
-
- private final Map<CompressionCodecName, BytesCompressor> compressors = new HashMap<>();
- private final Map<CompressionCodecName, BytesDecompressor> decompressors = new HashMap<>();
- private final Map<String, CompressionCodec> codecByName = new HashMap<>();
- private final Configuration configuration;
-
- public CodecFactory(Configuration configuration) {
- this.configuration = configuration;
- }
-
- /**
- *
- * @param codecName the requested codec
- * @return the corresponding hadoop codec. null if UNCOMPRESSED
- */
- private CompressionCodec getCodec(CompressionCodecName codecName) {
- String codecClassName = codecName.getHadoopCompressionCodecClassName();
- if (codecClassName == null) {
- return null;
- }
- CompressionCodec codec = codecByName.get(codecClassName);
- if (codec != null) {
- return codec;
- }
-
- try {
- Class<?> codecClass = Class.forName(codecClassName);
- codec = (CompressionCodec)ReflectionUtils.newInstance(codecClass, configuration);
- codecByName.put(codecClassName, codec);
- return codec;
- } catch (ClassNotFoundException e) {
- throw new BadConfigurationException("Class " + codecClassName + " was not found", e);
- }
- }
-
- public BytesCompressor getCompressor(CompressionCodecName codecName, int pageSize) {
- BytesCompressor comp = compressors.get(codecName);
- if (comp == null) {
- CompressionCodec codec = getCodec(codecName);
- comp = new BytesCompressor(codecName, codec, pageSize);
- compressors.put(codecName, comp);
- }
- return comp;
- }
-
- public BytesDecompressor getDecompressor(CompressionCodecName codecName) {
- BytesDecompressor decomp = decompressors.get(codecName);
- if (decomp == null) {
- CompressionCodec codec = getCodec(codecName);
- decomp = new BytesDecompressor(codec);
- decompressors.put(codecName, decomp);
- }
- return decomp;
- }
-
- public void release() {
- for (BytesCompressor compressor : compressors.values()) {
- compressor.release();
- }
- compressors.clear();
- for (BytesDecompressor decompressor : decompressors.values()) {
- decompressor.release();
- }
- decompressors.clear();
- }
-}
\ No newline at end of file
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ColumnChunkPageWriteStore.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ColumnChunkPageWriteStore.java
deleted file mode 100644
index 6507b80..0000000
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ColumnChunkPageWriteStore.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.thirdparty.parquet;
-
-import parquet.Log;
-import parquet.bytes.BytesInput;
-import parquet.bytes.CapacityByteArrayOutputStream;
-import parquet.column.ColumnDescriptor;
-import parquet.column.Encoding;
-import parquet.column.page.DictionaryPage;
-import parquet.column.page.PageWriteStore;
-import parquet.column.page.PageWriter;
-import parquet.column.statistics.BooleanStatistics;
-import parquet.column.statistics.Statistics;
-import parquet.format.converter.ParquetMetadataConverter;
-import parquet.io.ParquetEncodingException;
-import parquet.schema.MessageType;
-
-import java.io.IOException;
-import java.util.*;
-
-import static org.apache.tajo.storage.thirdparty.parquet.CodecFactory.BytesCompressor;
-import static parquet.Log.DEBUG;
-
-class ColumnChunkPageWriteStore implements PageWriteStore {
- private static final Log LOG = Log.getLog(ColumnChunkPageWriteStore.class);
-
- private static ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
-
- private static final class ColumnChunkPageWriter implements PageWriter {
-
- private final ColumnDescriptor path;
- private final BytesCompressor compressor;
-
- private final CapacityByteArrayOutputStream buf;
- private DictionaryPage dictionaryPage;
-
- private long uncompressedLength;
- private long compressedLength;
- private long totalValueCount;
- private int pageCount;
-
- private Set<Encoding> encodings = new HashSet<>();
-
- private Statistics totalStatistics;
-
- private ColumnChunkPageWriter(ColumnDescriptor path, BytesCompressor compressor, int initialSize) {
- this.path = path;
- this.compressor = compressor;
- this.buf = new CapacityByteArrayOutputStream(initialSize);
- this.totalStatistics = Statistics.getStatsBasedOnType(this.path.getType());
- }
-
- @Deprecated
- @Override
- public void writePage(BytesInput bytes,
- int valueCount,
- Encoding rlEncoding,
- Encoding dlEncoding,
- Encoding valuesEncoding) throws IOException {
- long uncompressedSize = bytes.size();
- BytesInput compressedBytes = compressor.compress(bytes);
- long compressedSize = compressedBytes.size();
- BooleanStatistics statistics = new BooleanStatistics(); // dummy stats object
- parquetMetadataConverter.writeDataPageHeader(
- (int)uncompressedSize,
- (int)compressedSize,
- valueCount,
- statistics,
- rlEncoding,
- dlEncoding,
- valuesEncoding,
- buf);
- this.uncompressedLength += uncompressedSize;
- this.compressedLength += compressedSize;
- this.totalValueCount += valueCount;
- this.pageCount += 1;
- compressedBytes.writeAllTo(buf);
- encodings.add(rlEncoding);
- encodings.add(dlEncoding);
- encodings.add(valuesEncoding);
- }
-
- @Override
- public void writePage(BytesInput bytes,
- int valueCount,
- Statistics statistics,
- Encoding rlEncoding,
- Encoding dlEncoding,
- Encoding valuesEncoding) throws IOException {
- long uncompressedSize = bytes.size();
- BytesInput compressedBytes = compressor.compress(bytes);
- long compressedSize = compressedBytes.size();
- parquetMetadataConverter.writeDataPageHeader(
- (int)uncompressedSize,
- (int)compressedSize,
- valueCount,
- statistics,
- rlEncoding,
- dlEncoding,
- valuesEncoding,
- buf);
- this.uncompressedLength += uncompressedSize;
- this.compressedLength += compressedSize;
- this.totalValueCount += valueCount;
- this.pageCount += 1;
- this.totalStatistics.mergeStatistics(statistics);
- compressedBytes.writeAllTo(buf);
- encodings.add(rlEncoding);
- encodings.add(dlEncoding);
- encodings.add(valuesEncoding);
- }
-
- @Override
- public long getMemSize() {
- return buf.size();
- }
-
- public void writeToFileWriter(ParquetFileWriter writer) throws IOException {
- writer.startColumn(path, totalValueCount, compressor.getCodecName());
- if (dictionaryPage != null) {
- writer.writeDictionaryPage(dictionaryPage);
- encodings.add(dictionaryPage.getEncoding());
- }
- writer.writeDataPages(BytesInput.from(buf), uncompressedLength, compressedLength, totalStatistics, new ArrayList<>(encodings));
- writer.endColumn();
- if (DEBUG) {
- LOG.debug(
- String.format(
- "written %,dB for %s: %,d values, %,dB raw, %,dB comp, %d pages, encodings: %s",
- buf.size(), path, totalValueCount, uncompressedLength, compressedLength, pageCount, encodings)
- + (dictionaryPage != null ? String.format(
- ", dic { %,d entries, %,dB raw, %,dB comp}",
- dictionaryPage.getDictionarySize(), dictionaryPage.getUncompressedSize(), dictionaryPage.getDictionarySize())
- : ""));
- }
- encodings.clear();
- pageCount = 0;
- }
-
- @Override
- public long allocatedSize() {
- return buf.getCapacity();
- }
-
- @Override
- public void writeDictionaryPage(DictionaryPage dictionaryPage) throws IOException {
- if (this.dictionaryPage != null) {
- throw new ParquetEncodingException("Only one dictionary page is allowed");
- }
- BytesInput dictionaryBytes = dictionaryPage.getBytes();
- int uncompressedSize = (int)dictionaryBytes.size();
- BytesInput compressedBytes = compressor.compress(dictionaryBytes);
- this.dictionaryPage = new DictionaryPage(BytesInput.copy(compressedBytes), uncompressedSize, dictionaryPage.getDictionarySize(), dictionaryPage.getEncoding());
- }
-
- @Override
- public String memUsageString(String prefix) {
- return buf.memUsageString(prefix + " ColumnChunkPageWriter");
- }
- }
-
- private final Map<ColumnDescriptor, ColumnChunkPageWriter> writers = new HashMap<>();
- private final MessageType schema;
- private final BytesCompressor compressor;
- private final int initialSize;
-
- public ColumnChunkPageWriteStore(BytesCompressor compressor, MessageType schema, int initialSize) {
- this.compressor = compressor;
- this.schema = schema;
- this.initialSize = initialSize;
- }
-
- @Override
- public PageWriter getPageWriter(ColumnDescriptor path) {
- if (!writers.containsKey(path)) {
- writers.put(path, new ColumnChunkPageWriter(path, compressor, initialSize));
- }
- return writers.get(path);
- }
-
- public void flushToFileWriter(ParquetFileWriter writer) throws IOException {
- List<ColumnDescriptor> columns = schema.getColumns();
- for (ColumnDescriptor columnDescriptor : columns) {
- ColumnChunkPageWriter pageWriter = writers.get(columnDescriptor);
- pageWriter.writeToFileWriter(writer);
- }
- }
-
-}
\ No newline at end of file
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java
index 5beba14..1cc37dd 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java
@@ -20,33 +20,44 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import parquet.Log;
-import parquet.column.ColumnDescriptor;
-import parquet.column.page.PageReadStore;
-import parquet.filter.UnboundRecordFilter;
-import parquet.hadoop.ParquetFileReader;
-import parquet.hadoop.api.ReadSupport;
-import parquet.hadoop.metadata.BlockMetaData;
-import parquet.hadoop.util.counters.BenchmarkCounter;
-import parquet.io.ColumnIOFactory;
-import parquet.io.MessageColumnIO;
-import parquet.io.ParquetDecodingException;
-import parquet.io.api.RecordMaterializer;
-import parquet.schema.GroupType;
-import parquet.schema.MessageType;
-import parquet.schema.Type;
+import org.apache.parquet.Log;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.filter.UnboundRecordFilter;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.compat.FilterCompat.Filter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.UnmaterializableRecordCounter;
+import org.apache.parquet.hadoop.api.InitContext;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.hadoop.util.counters.BenchmarkCounter;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.io.api.RecordMaterializer;
+import org.apache.parquet.io.api.RecordMaterializer.RecordMaterializationException;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
import java.io.IOException;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import static java.lang.String.format;
-import static parquet.Log.DEBUG;
+import static org.apache.parquet.Log.DEBUG;
+import static org.apache.parquet.Preconditions.checkNotNull;
+import static org.apache.parquet.hadoop.ParquetInputFormat.STRICT_TYPE_CHECKING;
+/**
+ * This class is borrowed from parquet-mr 1.8.1, but it is modified in order to report read progress.
+ */
class InternalParquetRecordReader<T> {
private static final Log LOG = Log.getLog(InternalParquetRecordReader.class);
- private final ColumnIOFactory columnIOFactory = new ColumnIOFactory();
+ private ColumnIOFactory columnIOFactory = null;
+ private final Filter filter;
private MessageType requestedSchema;
private MessageType fileSchema;
@@ -57,11 +68,11 @@
private T currentValue;
private long total;
- private int current = 0;
+ private long current = 0;
private int currentBlock = -1;
private ParquetFileReader reader;
- private parquet.io.RecordReader<T> recordReader;
- private UnboundRecordFilter recordFilter;
+ private org.apache.parquet.io.RecordReader<T> recordReader;
+ private boolean strictTypeChecking;
private long totalTimeSpentReadingBytes;
private long totalTimeSpentProcessingRecords;
@@ -70,37 +81,50 @@
private long totalCountLoadedSoFar = 0;
private Path file;
+ private UnmaterializableRecordCounter unmaterializableRecordCounter;
+
+ /**
+ * @param readSupport Object which helps reads files of the given type, e.g. Thrift, Avro.
+ * @param filter for filtering individual records
+ */
+ public InternalParquetRecordReader(ReadSupport<T> readSupport, Filter filter) {
+ this.readSupport = readSupport;
+ this.filter = checkNotNull(filter, "filter");
+ }
/**
* @param readSupport Object which helps reads files of the given type, e.g. Thrift, Avro.
*/
public InternalParquetRecordReader(ReadSupport<T> readSupport) {
- this(readSupport, null);
+ this(readSupport, FilterCompat.NOOP);
}
/**
* @param readSupport Object which helps reads files of the given type, e.g. Thrift, Avro.
* @param filter Optional filter for only returning matching records.
+ * @deprecated use {@link #InternalParquetRecordReader(ReadSupport, Filter)}
*/
- public InternalParquetRecordReader(ReadSupport<T> readSupport, UnboundRecordFilter
- filter) {
- this.readSupport = readSupport;
- this.recordFilter = filter;
+ @Deprecated
+ public InternalParquetRecordReader(ReadSupport<T> readSupport, UnboundRecordFilter filter) {
+ this(readSupport, FilterCompat.get(filter));
}
private void checkRead() throws IOException {
if (current == totalCountLoadedSoFar) {
if (current != 0) {
- long timeAssembling = System.currentTimeMillis() - startedAssemblingCurrentBlockAt;
- totalTimeSpentProcessingRecords += timeAssembling;
- if (DEBUG) LOG.debug("Assembled and processed " + totalCountLoadedSoFar + " records from " + columnCount + " columns in " + totalTimeSpentProcessingRecords + " ms: " + ((float) totalCountLoadedSoFar / totalTimeSpentProcessingRecords) + " rec/ms, " + ((float) totalCountLoadedSoFar * columnCount / totalTimeSpentProcessingRecords) + " cell/ms");
- long totalTime = totalTimeSpentProcessingRecords + totalTimeSpentReadingBytes;
- long percentReading = 100 * totalTimeSpentReadingBytes / totalTime;
- long percentProcessing = 100 * totalTimeSpentProcessingRecords / totalTime;
- if (DEBUG) LOG.debug("time spent so far " + percentReading + "% reading ("+totalTimeSpentReadingBytes+" ms) and " + percentProcessing + "% processing ("+totalTimeSpentProcessingRecords+" ms)");
+ totalTimeSpentProcessingRecords += (System.currentTimeMillis() - startedAssemblingCurrentBlockAt);
+ if (Log.DEBUG) {
+ LOG.debug("Assembled and processed " + totalCountLoadedSoFar + " records from " + columnCount + " columns in " + totalTimeSpentProcessingRecords + " ms: "+((float)totalCountLoadedSoFar / totalTimeSpentProcessingRecords) + " rec/ms, " + ((float)totalCountLoadedSoFar * columnCount / totalTimeSpentProcessingRecords) + " cell/ms");
+ final long totalTime = totalTimeSpentProcessingRecords + totalTimeSpentReadingBytes;
+ if (totalTime != 0) {
+ final long percentReading = 100 * totalTimeSpentReadingBytes / totalTime;
+ final long percentProcessing = 100 * totalTimeSpentProcessingRecords / totalTime;
+ LOG.debug("time spent so far " + percentReading + "% reading ("+totalTimeSpentReadingBytes+" ms) and " + percentProcessing + "% processing ("+totalTimeSpentProcessingRecords+" ms)");
+ }
+ }
}
- if (DEBUG) LOG.debug("at row " + current + ". reading next block");
+ if (Log.DEBUG) LOG.debug("at row " + current + ". reading next block");
long t0 = System.currentTimeMillis();
PageReadStore pages = reader.readNextRowGroup();
if (pages == null) {
@@ -109,12 +133,10 @@
long timeSpentReading = System.currentTimeMillis() - t0;
totalTimeSpentReadingBytes += timeSpentReading;
BenchmarkCounter.incrementTime(timeSpentReading);
- if (DEBUG) {
- LOG.debug("block read in memory in " + timeSpentReading + " ms. row count = " + pages.getRowCount());
- LOG.debug("initializing Record assembly with requested schema " + requestedSchema);
- }
- MessageColumnIO columnIO = columnIOFactory.getColumnIO(requestedSchema, fileSchema);
- recordReader = columnIO.getRecordReader(pages, recordConverter, recordFilter);
+ if (Log.INFO) LOG.info("block read in memory in " + timeSpentReading + " ms. row count = " + pages.getRowCount());
+ if (Log.DEBUG) LOG.debug("initializing Record assembly with requested schema " + requestedSchema);
+ MessageColumnIO columnIO = columnIOFactory.getColumnIO(requestedSchema, fileSchema, strictTypeChecking);
+ recordReader = columnIO.getRecordReader(pages, recordConverter, filter);
startedAssemblingCurrentBlockAt = System.currentTimeMillis();
totalCountLoadedSoFar += pages.getRowCount();
++ currentBlock;
@@ -122,7 +144,9 @@
}
public void close() throws IOException {
- reader.close();
+ if (reader != null) {
+ reader.close();
+ }
}
public Void getCurrentKey() throws IOException, InterruptedException {
@@ -138,24 +162,28 @@
return (float) current / total;
}
- public void initialize(MessageType requestedSchema, MessageType fileSchema,
- Map<String, String> extraMetadata, Map<String, String> readSupportMetadata,
+ public void initialize(FileMetaData parquetFileMetadata,
Path file, List<BlockMetaData> blocks, Configuration configuration)
throws IOException {
- this.requestedSchema = requestedSchema;
- this.fileSchema = fileSchema;
+ // initialize a ReadContext for this file
+ Map<String, String> fileMetadata = parquetFileMetadata.getKeyValueMetaData();
+ ReadSupport.ReadContext readContext = readSupport.init(new InitContext(
+ configuration, toSetMultiMap(fileMetadata), fileSchema));
+ this.columnIOFactory = new ColumnIOFactory(parquetFileMetadata.getCreatedBy());
+ this.requestedSchema = readContext.getRequestedSchema();
+ this.fileSchema = parquetFileMetadata.getSchema();
this.file = file;
- this.columnCount = this.requestedSchema.getPaths().size();
+ this.columnCount = requestedSchema.getPaths().size();
this.recordConverter = readSupport.prepareForRead(
- configuration, extraMetadata, fileSchema,
- new ReadSupport.ReadContext(requestedSchema, readSupportMetadata));
-
+ configuration, fileMetadata, fileSchema, readContext);
+ this.strictTypeChecking = configuration.getBoolean(STRICT_TYPE_CHECKING, true);
List<ColumnDescriptor> columns = requestedSchema.getColumns();
- reader = new ParquetFileReader(configuration, file, blocks, columns);
+ reader = new ParquetFileReader(configuration, parquetFileMetadata, file, blocks, columns);
for (BlockMetaData block : blocks) {
total += block.getRowCount();
}
- if (DEBUG) LOG.debug("RecordReader initialized will read a total of " + total + " records.");
+ this.unmaterializableRecordCounter = new UnmaterializableRecordCounter(configuration, total);
+ LOG.info("RecordReader initialized will read a total of " + total + " records.");
}
private boolean contains(GroupType group, String[] path, int index) {
@@ -174,17 +202,55 @@
}
public boolean nextKeyValue() throws IOException, InterruptedException {
- if (current < total) {
+ boolean recordFound = false;
+
+ while (!recordFound) {
+ // no more records left
+ if (current >= total) { return false; }
+
try {
checkRead();
- currentValue = recordReader.read();
- if (DEBUG) LOG.debug("read value: " + currentValue);
current ++;
+
+ try {
+ currentValue = recordReader.read();
+ } catch (RecordMaterializationException e) {
+ // this might throw, but it's fatal if it does.
+ unmaterializableRecordCounter.incErrors(e);
+ if (DEBUG) LOG.debug("skipping a corrupt record");
+ continue;
+ }
+
+ if (recordReader.shouldSkipCurrentRecord()) {
+ // this record is being filtered via the filter2 package
+ if (DEBUG) LOG.debug("skipping record");
+ continue;
+ }
+
+ if (currentValue == null) {
+ // only happens with FilteredRecordReader at end of block
+ current = totalCountLoadedSoFar;
+ if (DEBUG) LOG.debug("filtered record reader reached end of block");
+ continue;
+ }
+
+ recordFound = true;
+
+ if (DEBUG) LOG.debug("read value: " + currentValue);
} catch (RuntimeException e) {
throw new ParquetDecodingException(format("Can not read value at %d in block %d in file %s", current, currentBlock, file), e);
}
- return true;
}
- return false;
+ return true;
+ }
+
+ private static <K, V> Map<K, Set<V>> toSetMultiMap(Map<K, V> map) {
+ Map<K, Set<V>> setMultiMap = new HashMap<K, Set<V>>();
+ for (Map.Entry<K, V> entry : map.entrySet()) {
+ Set<V> set = new HashSet<V>();
+ set.add(entry.getValue());
+ setMultiMap.put(entry.getKey(), Collections.unmodifiableSet(set));
+ }
+ return Collections.unmodifiableMap(setMultiMap);
}
}
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java
deleted file mode 100644
index da57745..0000000
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.thirdparty.parquet;
-
-import parquet.Log;
-import parquet.column.ParquetProperties.WriterVersion;
-import parquet.column.impl.ColumnWriteStoreImpl;
-import parquet.hadoop.api.WriteSupport;
-import parquet.io.ColumnIOFactory;
-import parquet.io.MessageColumnIO;
-import parquet.schema.MessageType;
-
-import java.io.IOException;
-import java.util.Map;
-
-import static java.lang.Math.max;
-import static java.lang.Math.min;
-import static java.lang.String.format;
-import static org.apache.tajo.storage.thirdparty.parquet.CodecFactory.BytesCompressor;
-import static parquet.Log.DEBUG;
-import static parquet.Preconditions.checkNotNull;
-
-class InternalParquetRecordWriter<T> {
- private static final Log LOG = Log.getLog(InternalParquetRecordWriter.class);
-
- private static final int MINIMUM_BUFFER_SIZE = 64 * 1024;
- private static final int MINIMUM_RECORD_COUNT_FOR_CHECK = 100;
- private static final int MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000;
-
- private final ParquetFileWriter w;
- private final WriteSupport<T> writeSupport;
- private final MessageType schema;
- private final Map<String, String> extraMetaData;
- private final int blockSize;
- private final int pageSize;
- private final BytesCompressor compressor;
- private final int dictionaryPageSize;
- private final boolean enableDictionary;
- private final boolean validating;
- private final WriterVersion writerVersion;
-
- private long recordCount = 0;
- private long recordCountForNextMemCheck = MINIMUM_RECORD_COUNT_FOR_CHECK;
-
- private ColumnWriteStoreImpl store;
- private ColumnChunkPageWriteStore pageStore;
-
- /**
- * @param w the file to write to
- * @param writeSupport the class to convert incoming records
- * @param schema the schema of the records
- * @param extraMetaData extra meta data to write in the footer of the file
- * @param blockSize the size of a block in the file (this will be approximate)
- * @param codec the codec used to compress
- */
- public InternalParquetRecordWriter(
- ParquetFileWriter w,
- WriteSupport<T> writeSupport,
- MessageType schema,
- Map<String, String> extraMetaData,
- int blockSize,
- int pageSize,
- BytesCompressor compressor,
- int dictionaryPageSize,
- boolean enableDictionary,
- boolean validating,
- WriterVersion writerVersion) {
- this.w = w;
- this.writeSupport = checkNotNull(writeSupport, "writeSupport");
- this.schema = schema;
- this.extraMetaData = extraMetaData;
- this.blockSize = blockSize;
- this.pageSize = pageSize;
- this.compressor = compressor;
- this.dictionaryPageSize = dictionaryPageSize;
- this.enableDictionary = enableDictionary;
- this.validating = validating;
- this.writerVersion = writerVersion;
- initStore();
- }
-
- private void initStore() {
- // we don't want this number to be too small
- // ideally we divide the block equally across the columns
- // it is unlikely all columns are going to be the same size.
- int initialBlockBufferSize = max(MINIMUM_BUFFER_SIZE, blockSize / schema.getColumns().size() / 5);
- pageStore = new ColumnChunkPageWriteStore(compressor, schema, initialBlockBufferSize);
- // we don't want this number to be too small either
- // ideally, slightly bigger than the page size, but not bigger than the block buffer
- int initialPageBufferSize = max(MINIMUM_BUFFER_SIZE, min(pageSize + pageSize / 10, initialBlockBufferSize));
- store = new ColumnWriteStoreImpl(pageStore, pageSize, initialPageBufferSize, dictionaryPageSize, enableDictionary, writerVersion);
- MessageColumnIO columnIO = new ColumnIOFactory(validating).getColumnIO(schema);
- writeSupport.prepareForWrite(columnIO.getRecordWriter(store));
- }
-
- public void close() throws IOException, InterruptedException {
- flushStore();
- w.end(extraMetaData);
- }
-
- public void write(T value) throws IOException, InterruptedException {
- writeSupport.write(value);
- ++ recordCount;
- checkBlockSizeReached();
- }
-
- private void checkBlockSizeReached() throws IOException {
- if (recordCount >= recordCountForNextMemCheck) { // checking the memory size is relatively expensive, so let's not do it for every record.
- long memSize = store.memSize();
- if (memSize > blockSize) {
- if (DEBUG) LOG.debug(format("mem size %,d > %,d: flushing %,d records to disk.", memSize, blockSize, recordCount));
- flushStore();
- initStore();
- recordCountForNextMemCheck = min(max(MINIMUM_RECORD_COUNT_FOR_CHECK, recordCount / 2), MAXIMUM_RECORD_COUNT_FOR_CHECK);
- } else {
- float recordSize = (float) memSize / recordCount;
- recordCountForNextMemCheck = min(
- max(MINIMUM_RECORD_COUNT_FOR_CHECK, (recordCount + (long)(blockSize / recordSize)) / 2), // will check halfway
- recordCount + MAXIMUM_RECORD_COUNT_FOR_CHECK // will not look more than max records ahead
- );
- if (DEBUG) LOG.debug(format("Checked mem at %,d will check again at: %,d ", recordCount, recordCountForNextMemCheck));
- }
- }
- }
-
- public long getEstimatedWrittenSize() throws IOException {
- return w.getPos() + store.memSize();
- }
-
- private void flushStore()
- throws IOException {
- if (DEBUG) LOG.debug(format("Flushing mem store to file. allocated memory: %,d", store.allocatedSize()));
- if (store.allocatedSize() > 3 * blockSize) {
- LOG.warn("Too much memory used: " + store.memUsageString());
- }
- w.startBlock(recordCount);
- store.flush();
- pageStore.flushToFileWriter(w);
- recordCount = 0;
- w.endBlock();
- store = null;
- pageStore = null;
- }
-}
\ No newline at end of file
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetFileWriter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetFileWriter.java
deleted file mode 100644
index 7a32329..0000000
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetFileWriter.java
+++ /dev/null
@@ -1,492 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.thirdparty.parquet;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import parquet.Log;
-import parquet.Version;
-import parquet.bytes.BytesInput;
-import parquet.bytes.BytesUtils;
-import parquet.column.ColumnDescriptor;
-import parquet.column.page.DictionaryPage;
-import parquet.column.statistics.Statistics;
-import parquet.format.converter.ParquetMetadataConverter;
-import parquet.hadoop.Footer;
-import parquet.hadoop.metadata.*;
-import parquet.io.ParquetEncodingException;
-import parquet.schema.MessageType;
-import parquet.schema.PrimitiveType.PrimitiveTypeName;
-
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.*;
-import java.util.Map.Entry;
-
-import static parquet.Log.DEBUG;
-import static parquet.format.Util.writeFileMetaData;
-
-/**
- * Internal implementation of the Parquet file writer as a block container
- *
- * @author Julien Le Dem
- *
- */
-public class ParquetFileWriter {
- private static final Log LOG = Log.getLog(ParquetFileWriter.class);
-
- public static final String PARQUET_METADATA_FILE = "_metadata";
- public static final byte[] MAGIC = "PAR1".getBytes(Charset.forName("ASCII"));
- public static final int CURRENT_VERSION = 1;
-
- private static final ParquetMetadataConverter metadataConverter = new ParquetMetadataConverter();
-
- private final MessageType schema;
- private final FSDataOutputStream out;
- private BlockMetaData currentBlock;
- private ColumnChunkMetaData currentColumn;
- private long currentRecordCount;
- private List<BlockMetaData> blocks = new ArrayList<>();
- private long uncompressedLength;
- private long compressedLength;
- private Set<parquet.column.Encoding> currentEncodings;
-
- private CompressionCodecName currentChunkCodec;
- private ColumnPath currentChunkPath;
- private PrimitiveTypeName currentChunkType;
- private long currentChunkFirstDataPage;
- private long currentChunkDictionaryPageOffset;
- private long currentChunkValueCount;
-
- private Statistics currentStatistics;
-
- /**
- * Captures the order in which methods should be called
- *
- * @author Julien Le Dem
- *
- */
- private enum STATE {
- NOT_STARTED {
- STATE start() {
- return STARTED;
- }
- },
- STARTED {
- STATE startBlock() {
- return BLOCK;
- }
- STATE end() {
- return ENDED;
- }
- },
- BLOCK {
- STATE startColumn() {
- return COLUMN;
- }
- STATE endBlock() {
- return STARTED;
- }
- },
- COLUMN {
- STATE endColumn() {
- return BLOCK;
- };
- STATE write() {
- return this;
- }
- },
- ENDED;
-
- STATE start() throws IOException { return error(); }
- STATE startBlock() throws IOException { return error(); }
- STATE startColumn() throws IOException { return error(); }
- STATE write() throws IOException { return error(); }
- STATE endColumn() throws IOException { return error(); }
- STATE endBlock() throws IOException { return error(); }
- STATE end() throws IOException { return error(); }
-
- private final STATE error() throws IOException {
- throw new IOException("The file being written is in an invalid state. Probably caused by an error thrown previously. Current state: " + this.name());
- }
- }
-
- private STATE state = STATE.NOT_STARTED;
-
- /**
- *
- * @param configuration Configuration
- * @param schema the schema of the data
- * @param file the file to write to
- * @throws java.io.IOException if the file can not be created
- */
- public ParquetFileWriter(Configuration configuration, MessageType schema, Path file) throws IOException {
- super();
- this.schema = schema;
- FileSystem fs = file.getFileSystem(configuration);
- this.out = fs.create(file, false);
- }
-
- /**
- * start the file
- * @throws java.io.IOException
- */
- public void start() throws IOException {
- state = state.start();
- if (DEBUG) LOG.debug(out.getPos() + ": start");
- out.write(MAGIC);
- }
-
- /**
- * start a block
- * @param recordCount the record count in this block
- * @throws java.io.IOException
- */
- public void startBlock(long recordCount) throws IOException {
- state = state.startBlock();
- if (DEBUG) LOG.debug(out.getPos() + ": start block");
-// out.write(MAGIC); // TODO: add a magic delimiter
- currentBlock = new BlockMetaData();
- currentRecordCount = recordCount;
- }
-
- /**
- * start a column inside a block
- * @param descriptor the column descriptor
- * @param valueCount the value count in this column
- * @param statistics the statistics in this column
- * @param compressionCodecName
- * @throws java.io.IOException
- */
- public void startColumn(ColumnDescriptor descriptor,
- long valueCount,
- CompressionCodecName compressionCodecName) throws IOException {
- state = state.startColumn();
- if (DEBUG) LOG.debug(out.getPos() + ": start column: " + descriptor + " count=" + valueCount);
- currentEncodings = new HashSet<>();
- currentChunkPath = ColumnPath.get(descriptor.getPath());
- currentChunkType = descriptor.getType();
- currentChunkCodec = compressionCodecName;
- currentChunkValueCount = valueCount;
- currentChunkFirstDataPage = out.getPos();
- compressedLength = 0;
- uncompressedLength = 0;
- // need to know what type of stats to initialize to
- // better way to do this?
- currentStatistics = Statistics.getStatsBasedOnType(currentChunkType);
- }
-
- /**
- * writes a dictionary page page
- * @param dictionaryPage the dictionary page
- */
- public void writeDictionaryPage(DictionaryPage dictionaryPage) throws IOException {
- state = state.write();
- if (DEBUG) LOG.debug(out.getPos() + ": write dictionary page: " + dictionaryPage.getDictionarySize() + " values");
- currentChunkDictionaryPageOffset = out.getPos();
- int uncompressedSize = dictionaryPage.getUncompressedSize();
- int compressedPageSize = (int)dictionaryPage.getBytes().size(); // TODO: fix casts
- metadataConverter.writeDictionaryPageHeader(
- uncompressedSize,
- compressedPageSize,
- dictionaryPage.getDictionarySize(),
- dictionaryPage.getEncoding(),
- out);
- long headerSize = out.getPos() - currentChunkDictionaryPageOffset;
- this.uncompressedLength += uncompressedSize + headerSize;
- this.compressedLength += compressedPageSize + headerSize;
- if (DEBUG) LOG.debug(out.getPos() + ": write dictionary page content " + compressedPageSize);
- dictionaryPage.getBytes().writeAllTo(out);
- currentEncodings.add(dictionaryPage.getEncoding());
- }
-
-
- /**
- * writes a single page
- * @param valueCount count of values
- * @param uncompressedPageSize the size of the data once uncompressed
- * @param bytes the compressed data for the page without header
- * @param rlEncoding encoding of the repetition level
- * @param dlEncoding encoding of the definition level
- * @param valuesEncoding encoding of values
- */
- @Deprecated
- public void writeDataPage(
- int valueCount, int uncompressedPageSize,
- BytesInput bytes,
- parquet.column.Encoding rlEncoding,
- parquet.column.Encoding dlEncoding,
- parquet.column.Encoding valuesEncoding) throws IOException {
- state = state.write();
- long beforeHeader = out.getPos();
- if (DEBUG) LOG.debug(beforeHeader + ": write data page: " + valueCount + " values");
- int compressedPageSize = (int)bytes.size();
- metadataConverter.writeDataPageHeader(
- uncompressedPageSize, compressedPageSize,
- valueCount,
- rlEncoding,
- dlEncoding,
- valuesEncoding,
- out);
- long headerSize = out.getPos() - beforeHeader;
- this.uncompressedLength += uncompressedPageSize + headerSize;
- this.compressedLength += compressedPageSize + headerSize;
- if (DEBUG) LOG.debug(out.getPos() + ": write data page content " + compressedPageSize);
- bytes.writeAllTo(out);
- currentEncodings.add(rlEncoding);
- currentEncodings.add(dlEncoding);
- currentEncodings.add(valuesEncoding);
- }
-
- /**
- * writes a single page
- * @param valueCount count of values
- * @param uncompressedPageSize the size of the data once uncompressed
- * @param bytes the compressed data for the page without header
- * @param rlEncoding encoding of the repetition level
- * @param dlEncoding encoding of the definition level
- * @param valuesEncoding encoding of values
- */
- public void writeDataPage(
- int valueCount, int uncompressedPageSize,
- BytesInput bytes,
- Statistics statistics,
- parquet.column.Encoding rlEncoding,
- parquet.column.Encoding dlEncoding,
- parquet.column.Encoding valuesEncoding) throws IOException {
- state = state.write();
- long beforeHeader = out.getPos();
- if (DEBUG) LOG.debug(beforeHeader + ": write data page: " + valueCount + " values");
- int compressedPageSize = (int)bytes.size();
- metadataConverter.writeDataPageHeader(
- uncompressedPageSize, compressedPageSize,
- valueCount,
- statistics,
- rlEncoding,
- dlEncoding,
- valuesEncoding,
- out);
- long headerSize = out.getPos() - beforeHeader;
- this.uncompressedLength += uncompressedPageSize + headerSize;
- this.compressedLength += compressedPageSize + headerSize;
- if (DEBUG) LOG.debug(out.getPos() + ": write data page content " + compressedPageSize);
- bytes.writeAllTo(out);
- currentStatistics.mergeStatistics(statistics);
- currentEncodings.add(rlEncoding);
- currentEncodings.add(dlEncoding);
- currentEncodings.add(valuesEncoding);
- }
-
- /**
- * writes a number of pages at once
- * @param bytes bytes to be written including page headers
- * @param uncompressedTotalPageSize total uncompressed size (without page headers)
- * @param compressedTotalPageSize total compressed size (without page headers)
- * @throws java.io.IOException
- */
- void writeDataPages(BytesInput bytes,
- long uncompressedTotalPageSize,
- long compressedTotalPageSize,
- Statistics totalStats,
- List<parquet.column.Encoding> encodings) throws IOException {
- state = state.write();
- if (DEBUG) LOG.debug(out.getPos() + ": write data pages");
- long headersSize = bytes.size() - compressedTotalPageSize;
- this.uncompressedLength += uncompressedTotalPageSize + headersSize;
- this.compressedLength += compressedTotalPageSize + headersSize;
- if (DEBUG) LOG.debug(out.getPos() + ": write data pages content");
- bytes.writeAllTo(out);
- currentEncodings.addAll(encodings);
- currentStatistics = totalStats;
- }
-
- /**
- * end a column (once all rep, def and data have been written)
- * @throws java.io.IOException
- */
- public void endColumn() throws IOException {
- state = state.endColumn();
- if (DEBUG) LOG.debug(out.getPos() + ": end column");
- currentBlock.addColumn(ColumnChunkMetaData.get(
- currentChunkPath,
- currentChunkType,
- currentChunkCodec,
- currentEncodings,
- currentStatistics,
- currentChunkFirstDataPage,
- currentChunkDictionaryPageOffset,
- currentChunkValueCount,
- compressedLength,
- uncompressedLength));
- if (DEBUG) LOG.info("ended Column chumk: " + currentColumn);
- currentColumn = null;
- this.currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + uncompressedLength);
- this.uncompressedLength = 0;
- this.compressedLength = 0;
- }
-
- /**
- * ends a block once all column chunks have been written
- * @throws java.io.IOException
- */
- public void endBlock() throws IOException {
- state = state.endBlock();
- if (DEBUG) LOG.debug(out.getPos() + ": end block");
- currentBlock.setRowCount(currentRecordCount);
- blocks.add(currentBlock);
- currentBlock = null;
- }
-
- /**
- * ends a file once all blocks have been written.
- * closes the file.
- * @param extraMetaData the extra meta data to write in the footer
- * @throws java.io.IOException
- */
- public void end(Map<String, String> extraMetaData) throws IOException {
- state = state.end();
- if (DEBUG) LOG.debug(out.getPos() + ": end");
- ParquetMetadata footer = new ParquetMetadata(new FileMetaData(schema, extraMetaData, Version.FULL_VERSION), blocks);
- serializeFooter(footer, out);
- out.close();
- }
-
- private static void serializeFooter(ParquetMetadata footer, FSDataOutputStream out) throws IOException {
- long footerIndex = out.getPos();
- parquet.format.FileMetaData parquetMetadata = new ParquetMetadataConverter().toParquetMetadata(CURRENT_VERSION, footer);
- writeFileMetaData(parquetMetadata, out);
- if (DEBUG) LOG.debug(out.getPos() + ": footer length = " + (out.getPos() - footerIndex));
- BytesUtils.writeIntLittleEndian(out, (int)(out.getPos() - footerIndex));
- out.write(MAGIC);
- }
-
- /**
- * writes a _metadata file
- * @param configuration the configuration to use to get the FileSystem
- * @param outputPath the directory to write the _metadata file to
- * @param footers the list of footers to merge
- * @throws java.io.IOException
- */
- public static void writeMetadataFile(Configuration configuration, Path outputPath, List<Footer> footers) throws IOException {
- Path metaDataPath = new Path(outputPath, PARQUET_METADATA_FILE);
- FileSystem fs = outputPath.getFileSystem(configuration);
- outputPath = outputPath.makeQualified(fs);
- FSDataOutputStream metadata = fs.create(metaDataPath);
- metadata.write(MAGIC);
- ParquetMetadata metadataFooter = mergeFooters(outputPath, footers);
- serializeFooter(metadataFooter, metadata);
- metadata.close();
- }
-
- private static ParquetMetadata mergeFooters(Path root, List<Footer> footers) {
- String rootPath = root.toString();
- GlobalMetaData fileMetaData = null;
- List<BlockMetaData> blocks = new ArrayList<>();
- for (Footer footer : footers) {
- String path = footer.getFile().toString();
- if (!path.startsWith(rootPath)) {
- throw new ParquetEncodingException(path + " invalid: all the files must be contained in the root " + root);
- }
- path = path.substring(rootPath.length());
- while (path.startsWith("/")) {
- path = path.substring(1);
- }
- fileMetaData = mergeInto(footer.getParquetMetadata().getFileMetaData(), fileMetaData);
- for (BlockMetaData block : footer.getParquetMetadata().getBlocks()) {
- block.setPath(path);
- blocks.add(block);
- }
- }
- return new ParquetMetadata(fileMetaData.merge(), blocks);
- }
-
- /**
- * @return the current position in the underlying file
- * @throws java.io.IOException
- */
- public long getPos() throws IOException {
- return out.getPos();
- }
-
- /**
- * Will merge the metadata of all the footers together
- * @param footers the list files footers to merge
- * @return the global meta data for all the footers
- */
- static GlobalMetaData getGlobalMetaData(List<Footer> footers) {
- GlobalMetaData fileMetaData = null;
- for (Footer footer : footers) {
- ParquetMetadata currentMetadata = footer.getParquetMetadata();
- fileMetaData = mergeInto(currentMetadata.getFileMetaData(), fileMetaData);
- }
- return fileMetaData;
- }
-
- /**
- * Will return the result of merging toMerge into mergedMetadata
- * @param toMerge the metadata toMerge
- * @param mergedMetadata the reference metadata to merge into
- * @return the result of the merge
- */
- static GlobalMetaData mergeInto(
- FileMetaData toMerge,
- GlobalMetaData mergedMetadata) {
- MessageType schema = null;
- Map<String, Set<String>> newKeyValues = new HashMap<>();
- Set<String> createdBy = new HashSet<>();
- if (mergedMetadata != null) {
- schema = mergedMetadata.getSchema();
- newKeyValues.putAll(mergedMetadata.getKeyValueMetaData());
- createdBy.addAll(mergedMetadata.getCreatedBy());
- }
- if ((schema == null && toMerge.getSchema() != null)
- || (schema != null && !schema.equals(toMerge.getSchema()))) {
- schema = mergeInto(toMerge.getSchema(), schema);
- }
- for (Entry<String, String> entry : toMerge.getKeyValueMetaData().entrySet()) {
- Set<String> values = newKeyValues.get(entry.getKey());
- if (values == null) {
- values = new HashSet<>();
- newKeyValues.put(entry.getKey(), values);
- }
- values.add(entry.getValue());
- }
- createdBy.add(toMerge.getCreatedBy());
- return new GlobalMetaData(
- schema,
- newKeyValues,
- createdBy);
- }
-
- /**
- * will return the result of merging toMerge into mergedSchema
- * @param toMerge the schema to merge into mergedSchema
- * @param mergedSchema the schema to append the fields to
- * @return the resulting schema
- */
- static MessageType mergeInto(MessageType toMerge, MessageType mergedSchema) {
- if (mergedSchema == null) {
- return toMerge;
- }
- return mergedSchema.union(toMerge);
- }
-
-}
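
For reference, here is a minimal sketch of the write lifecycle that the removed thirdparty copy implemented and that the upstream org.apache.parquet.hadoop.ParquetFileWriter in parquet-mr 1.8.1 is assumed to provide with the same signatures (start -> startBlock -> startColumn -> pages -> endColumn -> endBlock -> end). The class name, schema, file path, and page bytes below are illustrative only and are not taken from the Tajo sources.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.bytes.BytesInput;
    import org.apache.parquet.column.Encoding;
    import org.apache.parquet.hadoop.ParquetFileWriter;
    import org.apache.parquet.hadoop.metadata.CompressionCodecName;
    import org.apache.parquet.schema.MessageType;
    import org.apache.parquet.schema.MessageTypeParser;

    import java.io.IOException;
    import java.util.Collections;

    public class FileWriterLifecycleSketch {

      // Writes a single-column, single-row-group file. The already-encoded page bytes
      // and value count are supplied by the caller; producing real pages is out of scope.
      static void writeOneColumnFile(Path file, BytesInput pageBytes, int valueCount)
          throws IOException {
        MessageType schema = MessageTypeParser.parseMessageType(
            "message example { required int32 id; }");
        ParquetFileWriter writer = new ParquetFileWriter(new Configuration(), schema, file);

        writer.start();                                      // leading magic bytes
        writer.startBlock(valueCount);                       // one row group
        writer.startColumn(schema.getColumns().get(0),       // one column chunk
            valueCount, CompressionCodecName.UNCOMPRESSED);
        writer.writeDataPage(valueCount, (int) pageBytes.size(), pageBytes,
            Encoding.BIT_PACKED, Encoding.BIT_PACKED, Encoding.PLAIN);
        writer.endColumn();
        writer.endBlock();
        writer.end(Collections.<String, String>emptyMap());  // footer + trailing magic
      }
    }
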
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetReader.java
index c353a81..cb903b0 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetReader.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetReader.java
@@ -22,93 +22,112 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import parquet.filter.UnboundRecordFilter;
-import parquet.hadoop.Footer;
-import parquet.hadoop.ParquetFileReader;
-import parquet.hadoop.api.InitContext;
-import parquet.hadoop.api.ReadSupport;
-import parquet.hadoop.api.ReadSupport.ReadContext;
-import parquet.hadoop.metadata.BlockMetaData;
-import parquet.hadoop.metadata.GlobalMetaData;
-import parquet.schema.MessageType;
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.filter.UnboundRecordFilter;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.compat.FilterCompat.Filter;
+import org.apache.parquet.filter2.compat.RowGroupFilter;
+import org.apache.parquet.hadoop.Footer;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.util.HiddenFileFilter;
+import org.apache.parquet.schema.MessageType;
import java.io.Closeable;
import java.io.IOException;
-import java.util.*;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.apache.parquet.Preconditions.checkNotNull;
/**
+ * This class is borrowed from parquet-mr 1.8.1, but it has been modified to fix PARQUET-363 and to improve progress reporting.
+ *
* Read records from a Parquet file.
*/
public class ParquetReader<T> implements Closeable {
- private ReadSupport<T> readSupport;
- private UnboundRecordFilter filter;
- private Configuration conf;
- private ReadContext readContext;
- private Iterator<Footer> footersIterator;
+ private final ReadSupport<T> readSupport;
+ private final Configuration conf;
+ private final Iterator<Footer> footersIterator;
+ private final FilterCompat.Filter filter;
+
private InternalParquetRecordReader<T> reader;
- private GlobalMetaData globalMetaData;
+ private long totalRowCount;
/**
* @param file the file to read
* @param readSupport to materialize records
- * @throws java.io.IOException
+ * @throws IOException
+ * @deprecated use {@link #builder(ReadSupport, Path)}
*/
+ @Deprecated
public ParquetReader(Path file, ReadSupport<T> readSupport) throws IOException {
- this(file, readSupport, null);
+ this(new Configuration(), file, readSupport, FilterCompat.NOOP);
}
/**
* @param conf the configuration
* @param file the file to read
* @param readSupport to materialize records
- * @throws java.io.IOException
+ * @throws IOException
+ * @deprecated use {@link #builder(ReadSupport, Path)}
*/
+ @Deprecated
public ParquetReader(Configuration conf, Path file, ReadSupport<T> readSupport) throws IOException {
- this(conf, file, readSupport, null);
+ this(conf, file, readSupport, FilterCompat.NOOP);
}
/**
* @param file the file to read
* @param readSupport to materialize records
- * @param filter the filter to use to filter records
- * @throws java.io.IOException
+ * @param unboundRecordFilter the filter to use to filter records
+ * @throws IOException
+ * @deprecated use {@link #builder(ReadSupport, Path)}
*/
- public ParquetReader(Path file, ReadSupport<T> readSupport, UnboundRecordFilter filter) throws IOException {
- this(new Configuration(), file, readSupport, filter);
+ @Deprecated
+ public ParquetReader(Path file, ReadSupport<T> readSupport, UnboundRecordFilter unboundRecordFilter) throws IOException {
+ this(new Configuration(), file, readSupport, FilterCompat.get(unboundRecordFilter));
}
/**
* @param conf the configuration
* @param file the file to read
* @param readSupport to materialize records
- * @param filter the filter to use to filter records
- * @throws java.io.IOException
+ * @param unboundRecordFilter the filter to use to filter records
+ * @throws IOException
+ * @deprecated use {@link #builder(ReadSupport, Path)}
*/
- public ParquetReader(Configuration conf, Path file, ReadSupport<T> readSupport, UnboundRecordFilter filter) throws IOException {
+ @Deprecated
+ public ParquetReader(Configuration conf, Path file, ReadSupport<T> readSupport, UnboundRecordFilter unboundRecordFilter) throws IOException {
+ this(conf, file, readSupport, FilterCompat.get(unboundRecordFilter));
+ }
+
+ private ParquetReader(Configuration conf,
+ Path file,
+ ReadSupport<T> readSupport,
+ Filter filter) throws IOException {
this.readSupport = readSupport;
- this.filter = filter;
+ this.filter = checkNotNull(filter, "filter");
this.conf = conf;
FileSystem fs = file.getFileSystem(conf);
- List<FileStatus> statuses = Arrays.asList(fs.listStatus(file));
- List<Footer> footers = ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(conf, statuses);
+ List<FileStatus> statuses = Arrays.asList(fs.listStatus(file, HiddenFileFilter.INSTANCE));
+ List<Footer> footers = ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(conf, statuses, false);
this.footersIterator = footers.iterator();
- globalMetaData = ParquetFileWriter.getGlobalMetaData(footers);
- List<BlockMetaData> blocks = new ArrayList<>();
for (Footer footer : footers) {
- blocks.addAll(footer.getParquetMetadata().getBlocks());
+ for(BlockMetaData block : footer.getParquetMetadata().getBlocks()) {
+ totalRowCount += block.getRowCount();
+ }
}
-
- MessageType schema = globalMetaData.getSchema();
- Map<String, Set<String>> extraMetadata = globalMetaData.getKeyValueMetaData();
- readContext = readSupport.init(new InitContext(conf, extraMetadata, schema));
}
/**
* @return the next record or null if finished
- * @throws java.io.IOException
+ * @throws IOException
*/
public T read() throws IOException {
try {
@@ -130,10 +149,17 @@
}
if (footersIterator.hasNext()) {
Footer footer = footersIterator.next();
- reader = new InternalParquetRecordReader<>(readSupport, filter);
- reader.initialize(
- readContext.getRequestedSchema(), globalMetaData.getSchema(), footer.getParquetMetadata().getFileMetaData().getKeyValueMetaData(),
- readContext.getReadSupportMetadata(), footer.getFile(), footer.getParquetMetadata().getBlocks(), conf);
+
+ List<BlockMetaData> blocks = footer.getParquetMetadata().getBlocks();
+
+ MessageType fileSchema = footer.getParquetMetadata().getFileMetaData().getSchema();
+
+ List<BlockMetaData> filteredBlocks = RowGroupFilter.filterRowGroups(
+ filter, blocks, fileSchema);
+
+ reader = new InternalParquetRecordReader<T>(readSupport, filter);
+ reader.initialize(footer.getParquetMetadata().getFileMetaData(),
+ footer.getFile(), filteredBlocks, conf);
}
}
@@ -145,10 +171,64 @@
}
public float getProgress() {
- if (!footersIterator.hasNext()) {
- return 1.0f;
+ if (reader != null) {
+ return reader.getProgress();
} else {
- return reader != null ? reader.getProgress() : 0.0f;
+ if (!footersIterator.hasNext()) {
+ return 1.0f;
+ } else {
+ return 0.0f;
+ }
+ }
+ }
+
+ public long getTotalRowCount() {
+ return totalRowCount;
+ }
+
+ public static <T> Builder<T> builder(ReadSupport<T> readSupport, Path path) {
+ return new Builder<T>(readSupport, path);
+ }
+
+ public static class Builder<T> {
+ private final ReadSupport<T> readSupport;
+ private final Path file;
+ private Filter filter;
+ protected Configuration conf;
+
+ private Builder(ReadSupport<T> readSupport, Path path) {
+ this.readSupport = checkNotNull(readSupport, "readSupport");
+ this.file = checkNotNull(path, "path");
+ this.conf = new Configuration();
+ this.filter = FilterCompat.NOOP;
+ }
+
+ protected Builder(Path path) {
+ this.readSupport = null;
+ this.file = checkNotNull(path, "path");
+ this.conf = new Configuration();
+ this.filter = FilterCompat.NOOP;
+ }
+
+ public Builder<T> withConf(Configuration conf) {
+ this.conf = checkNotNull(conf, "conf");
+ return this;
+ }
+
+ public Builder<T> withFilter(Filter filter) {
+ this.filter = checkNotNull(filter, "filter");
+ return this;
+ }
+
+ protected ReadSupport<T> getReadSupport() {
+ // if readSupport is null, the protected constructor must have been used
+ Preconditions.checkArgument(readSupport != null,
+ "[BUG] Classes that extend Builder should override getReadSupport()");
+ return readSupport;
+ }
+
+ public ParquetReader<T> build() throws IOException {
+ return new ParquetReader<T>(conf, file, getReadSupport(), filter);
}
}
}
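
The borrowed reader now exposes the builder API from parquet-mr 1.8.x in place of the deprecated constructors. Below is a minimal usage sketch; the GroupReadSupport and Group types come from parquet's bundled example module and merely stand in for whatever ReadSupport a caller actually provides, and the file path is a placeholder.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.example.data.Group;
    import org.apache.parquet.filter2.compat.FilterCompat;
    import org.apache.parquet.hadoop.example.GroupReadSupport;
    import org.apache.tajo.storage.thirdparty.parquet.ParquetReader;

    import java.io.IOException;

    public class ParquetReaderBuilderSketch {
      public static void main(String[] args) throws IOException {
        Path file = new Path(args[0]);   // path to an existing Parquet file

        // Build the reader through the fluent API instead of the deprecated constructors.
        try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file)
            .withConf(new Configuration())
            .withFilter(FilterCompat.NOOP)            // no row-group filtering
            .build()) {

          long totalRows = reader.getTotalRowCount(); // summed over all footers up front
          Group record;
          while ((record = reader.read()) != null) {  // read() returns null at end of input
            System.out.printf("progress=%.2f of %d rows: %s%n",
                reader.getProgress(), totalRows, record);
          }
        }
      }
    }
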
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java
deleted file mode 100644
index 5843c2d..0000000
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.thirdparty.parquet;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import parquet.column.ParquetProperties;
-import parquet.hadoop.api.WriteSupport;
-import parquet.hadoop.metadata.CompressionCodecName;
-import parquet.schema.MessageType;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-public class ParquetWriter<T> implements Closeable {
-
- public static final int DEFAULT_BLOCK_SIZE = 128 * 1024 * 1024;
- public static final int DEFAULT_PAGE_SIZE = 1 * 1024 * 1024;
- public static final CompressionCodecName DEFAULT_COMPRESSION_CODEC_NAME =
- CompressionCodecName.UNCOMPRESSED;
- public static final boolean DEFAULT_IS_DICTIONARY_ENABLED = true;
- public static final boolean DEFAULT_IS_VALIDATING_ENABLED = false;
- public static final ParquetProperties.WriterVersion DEFAULT_WRITER_VERSION =
- ParquetProperties.WriterVersion.PARQUET_1_0;
-
- private final InternalParquetRecordWriter<T> writer;
-
- /**
- * Create a new ParquetWriter.
- * (with dictionary encoding enabled and validation off)
- *
- * @param file the file to create
- * @param writeSupport the implementation to write a record to a RecordConsumer
- * @param compressionCodecName the compression codec to use
- * @param blockSize the block size threshold
- * @param pageSize the page size threshold
- * @throws java.io.IOException
- * @see #ParquetWriter(org.apache.hadoop.fs.Path, parquet.hadoop.api.WriteSupport, parquet.hadoop.metadata.CompressionCodecName, int, int, boolean, boolean)
- */
- public ParquetWriter(Path file, WriteSupport<T> writeSupport, CompressionCodecName compressionCodecName, int blockSize, int pageSize) throws IOException {
- this(file, writeSupport, compressionCodecName, blockSize, pageSize,
- DEFAULT_IS_DICTIONARY_ENABLED, DEFAULT_IS_VALIDATING_ENABLED);
- }
-
- /**
- * Create a new ParquetWriter.
- *
- * @param file the file to create
- * @param writeSupport the implementation to write a record to a RecordConsumer
- * @param compressionCodecName the compression codec to use
- * @param blockSize the block size threshold
- * @param pageSize the page size threshold (both data and dictionary)
- * @param enableDictionary to turn dictionary encoding on
- * @param validating to turn on validation using the schema
- * @throws java.io.IOException
- * @see #ParquetWriter(org.apache.hadoop.fs.Path, parquet.hadoop.api.WriteSupport, parquet.hadoop.metadata.CompressionCodecName, int, int, int, boolean, boolean)
- */
- public ParquetWriter(
- Path file,
- WriteSupport<T> writeSupport,
- CompressionCodecName compressionCodecName,
- int blockSize,
- int pageSize,
- boolean enableDictionary,
- boolean validating) throws IOException {
- this(file, writeSupport, compressionCodecName, blockSize, pageSize, pageSize, enableDictionary, validating);
- }
-
- /**
- * Create a new ParquetWriter.
- *
- * @param file the file to create
- * @param writeSupport the implementation to write a record to a RecordConsumer
- * @param compressionCodecName the compression codec to use
- * @param blockSize the block size threshold
- * @param pageSize the page size threshold
- * @param dictionaryPageSize the page size threshold for the dictionary pages
- * @param enableDictionary to turn dictionary encoding on
- * @param validating to turn on validation using the schema
- * @throws java.io.IOException
- * @see #ParquetWriter(org.apache.hadoop.fs.Path, parquet.hadoop.api.WriteSupport, parquet.hadoop.metadata.CompressionCodecName, int, int, int, boolean, boolean, parquet.column.ParquetProperties.WriterVersion)
- */
- public ParquetWriter(
- Path file,
- WriteSupport<T> writeSupport,
- CompressionCodecName compressionCodecName,
- int blockSize,
- int pageSize,
- int dictionaryPageSize,
- boolean enableDictionary,
- boolean validating) throws IOException {
- this(file, writeSupport, compressionCodecName, blockSize, pageSize,
- dictionaryPageSize, enableDictionary, validating,
- DEFAULT_WRITER_VERSION);
- }
-
- /**
- * Create a new ParquetWriter.
- *
- * Directly instantiates a Hadoop {@link org.apache.hadoop.conf.Configuration} which reads
- * configuration from the classpath.
- *
- * @param file the file to create
- * @param writeSupport the implementation to write a record to a RecordConsumer
- * @param compressionCodecName the compression codec to use
- * @param blockSize the block size threshold
- * @param pageSize the page size threshold
- * @param dictionaryPageSize the page size threshold for the dictionary pages
- * @param enableDictionary to turn dictionary encoding on
- * @param validating to turn on validation using the schema
- * @param writerVersion version of parquetWriter from {@link parquet.column.ParquetProperties.WriterVersion}
- * @throws java.io.IOException
- * @see #ParquetWriter(org.apache.hadoop.fs.Path, parquet.hadoop.api.WriteSupport, parquet.hadoop.metadata.CompressionCodecName, int, int, int, boolean, boolean, parquet.column.ParquetProperties.WriterVersion, org.apache.hadoop.conf.Configuration)
- */
- public ParquetWriter(
- Path file,
- WriteSupport<T> writeSupport,
- CompressionCodecName compressionCodecName,
- int blockSize,
- int pageSize,
- int dictionaryPageSize,
- boolean enableDictionary,
- boolean validating,
- ParquetProperties.WriterVersion writerVersion) throws IOException {
- this(file, writeSupport, compressionCodecName, blockSize, pageSize, dictionaryPageSize, enableDictionary, validating, writerVersion, new Configuration());
- }
-
- /**
- * Create a new ParquetWriter.
- *
- * @param file the file to create
- * @param writeSupport the implementation to write a record to a RecordConsumer
- * @param compressionCodecName the compression codec to use
- * @param blockSize the block size threshold
- * @param pageSize the page size threshold
- * @param dictionaryPageSize the page size threshold for the dictionary pages
- * @param enableDictionary to turn dictionary encoding on
- * @param validating to turn on validation using the schema
- * @param writerVersion version of parquetWriter from {@link parquet.column.ParquetProperties.WriterVersion}
- * @param conf Hadoop configuration to use while accessing the filesystem
- * @throws java.io.IOException
- */
- public ParquetWriter(
- Path file,
- WriteSupport<T> writeSupport,
- CompressionCodecName compressionCodecName,
- int blockSize,
- int pageSize,
- int dictionaryPageSize,
- boolean enableDictionary,
- boolean validating,
- ParquetProperties.WriterVersion writerVersion,
- Configuration conf) throws IOException {
-
- WriteSupport.WriteContext writeContext = writeSupport.init(conf);
- MessageType schema = writeContext.getSchema();
-
- ParquetFileWriter fileWriter = new ParquetFileWriter(conf, schema, file);
- fileWriter.start();
-
- CodecFactory codecFactory = new CodecFactory(conf);
- CodecFactory.BytesCompressor compressor = codecFactory.getCompressor(compressionCodecName, 0);
- this.writer = new InternalParquetRecordWriter<>(
- fileWriter,
- writeSupport,
- schema,
- writeContext.getExtraMetaData(),
- blockSize,
- pageSize,
- compressor,
- dictionaryPageSize,
- enableDictionary,
- validating,
- writerVersion);
- }
-
- /**
- * Create a new ParquetWriter. The default block size is 50 MB.The default
- * page size is 1 MB. Default compression is no compression. Dictionary encoding is disabled.
- *
- * @param file the file to create
- * @param writeSupport the implementation to write a record to a RecordConsumer
- * @throws java.io.IOException
- */
- public ParquetWriter(Path file, WriteSupport<T> writeSupport) throws IOException {
- this(file, writeSupport, DEFAULT_COMPRESSION_CODEC_NAME, DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE);
- }
-
- public void write(T object) throws IOException {
- try {
- writer.write(object);
- } catch (InterruptedException e) {
- throw new IOException(e);
- }
- }
-
- public long getEstimatedWrittenSize() throws IOException {
- return this.writer.getEstimatedWrittenSize();
- }
-
- @Override
- public void close() throws IOException {
- try {
- writer.close();
- } catch (InterruptedException e) {
- throw new IOException(e);
- }
- }}
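
With the local copy removed, the equivalent entry point is assumed to be the upstream org.apache.parquet.hadoop.ParquetWriter, whose constructor the deleted class mirrored. The sketch below is illustration only: the GroupWriteSupport and SimpleGroupFactory classes come from parquet's bundled example module, and the schema, output path, and size thresholds are placeholders rather than Tajo's actual write path.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.column.ParquetProperties;
    import org.apache.parquet.example.data.Group;
    import org.apache.parquet.example.data.simple.SimpleGroupFactory;
    import org.apache.parquet.hadoop.ParquetWriter;
    import org.apache.parquet.hadoop.example.GroupWriteSupport;
    import org.apache.parquet.hadoop.metadata.CompressionCodecName;
    import org.apache.parquet.schema.MessageType;
    import org.apache.parquet.schema.MessageTypeParser;

    import java.io.IOException;

    public class ParquetWriterSketch {
      public static void main(String[] args) throws IOException {
        MessageType schema = MessageTypeParser.parseMessageType(
            "message example { required int32 id; required binary name (UTF8); }");

        Configuration conf = new Configuration();
        GroupWriteSupport.setSchema(schema, conf);    // GroupWriteSupport reads the schema from conf

        ParquetWriter<Group> writer = new ParquetWriter<Group>(
            new Path(args[0]),                        // output file
            new GroupWriteSupport(),
            CompressionCodecName.SNAPPY,
            128 * 1024 * 1024,                        // row group (block) size
            1024 * 1024,                              // page size
            1024 * 1024,                              // dictionary page size
            true,                                     // enable dictionary encoding
            false,                                    // no schema validation
            ParquetProperties.WriterVersion.PARQUET_1_0,
            conf);

        writer.write(new SimpleGroupFactory(schema).newGroup()
            .append("id", 1)
            .append("name", "tajo"));
        writer.close();
      }
    }
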
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
index a08dfb9..44d8fdc 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
@@ -98,6 +98,14 @@
" ]\n" +
"}\n";
+ private static String TEST_EMPTY_FIELD_AVRO_SCHEMA =
+ "{\n" +
+ " \"type\": \"record\",\n" +
+ " \"namespace\": \"org.apache.tajo\",\n" +
+ " \"name\": \"testEmptySchema\",\n" +
+ " \"fields\": []\n" +
+ "}\n";
+
private static String TEST_MAX_VALUE_AVRO_SCHEMA =
"{\n" +
" \"type\": \"record\",\n" +
@@ -1123,7 +1131,6 @@
public void testLessThanSchemaSize() throws IOException {
/* Internal storage must be same with schema size */
if (internalType || dataFormat.equalsIgnoreCase(BuiltinStorages.AVRO)
- || dataFormat.equalsIgnoreCase(BuiltinStorages.PARQUET)
|| dataFormat.equalsIgnoreCase(BuiltinStorages.ORC)) {
return;
}
@@ -1356,4 +1363,59 @@
scanner.close();
assertEquals(1.0f, scanner.getProgress(), 0.0f);
}
+
+ @Test
+ public void testEmptySchema() throws IOException {
+ if (internalType) return;
+
+ Schema schema = new Schema();
+ schema.addColumn("id", Type.INT4);
+ schema.addColumn("age", Type.INT8);
+ schema.addColumn("score", Type.FLOAT4);
+
+ TableMeta meta = CatalogUtil.newTableMeta(dataFormat);
+ meta.setPropertySet(CatalogUtil.newDefaultProperty(dataFormat));
+ if (dataFormat.equalsIgnoreCase(BuiltinStorages.AVRO)) {
+ meta.putProperty(StorageConstants.AVRO_SCHEMA_LITERAL,
+ TEST_PROJECTION_AVRO_SCHEMA);
+ }
+
+ Path tablePath = new Path(testDir, "testEmptySchema.data");
+ FileTablespace sm = TablespaceManager.getLocalFs();
+ Appender appender = sm.getAppender(meta, schema, tablePath);
+ appender.init();
+
+ Tuple expect = new VTuple(schema.size());
+ expect.put(new Datum[]{
+ DatumFactory.createInt4(Integer.MAX_VALUE),
+ DatumFactory.createInt8(Long.MAX_VALUE),
+ DatumFactory.createFloat4(Float.MAX_VALUE)
+ });
+
+ appender.addTuple(expect);
+ appender.flush();
+ appender.close();
+
+ assertTrue(fs.exists(tablePath));
+ FileStatus status = fs.getFileStatus(tablePath);
+
+ if (dataFormat.equalsIgnoreCase(BuiltinStorages.AVRO)) {
+ meta.putProperty(StorageConstants.AVRO_SCHEMA_LITERAL,
+ TEST_EMPTY_FIELD_AVRO_SCHEMA);
+ }
+
+ // e.g. select count(*) from table
+ Schema target = new Schema();
+ assertEquals(0, target.size());
+
+ FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
+ Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment, target);
+ scanner.init();
+
+ Tuple tuple = scanner.next();
+ assertNotNull(tuple);
+ assertEquals(0, tuple.size());
+ scanner.close();
+ }
}
\ No newline at end of file
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/parquet/TestReadWrite.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/parquet/TestReadWrite.java
index 35b9f07..ea223e7 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/parquet/TestReadWrite.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/parquet/TestReadWrite.java
@@ -25,6 +25,7 @@
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.datum.NullDatum;
import org.apache.tajo.storage.Tuple;
@@ -90,7 +91,7 @@
writer.write(tuple);
writer.close();
- TajoParquetReader reader = new TajoParquetReader(file, schema);
+ TajoParquetReader reader = new TajoParquetReader(new TajoConf(), file, schema, schema);
tuple = reader.read();
assertNotNull(tuple);
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/parquet/TestSchemaConverter.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/parquet/TestSchemaConverter.java
index 07451d5..ba3f72e 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/parquet/TestSchemaConverter.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/parquet/TestSchemaConverter.java
@@ -18,12 +18,12 @@
package org.apache.tajo.storage.parquet;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.common.TajoDataTypes.Type;
import org.junit.Test;
-import parquet.schema.MessageType;
-import parquet.schema.MessageTypeParser;
import java.util.ArrayList;
import java.util.List;