Merge remote-tracking branch 'apache/master' into bloom-filter
Conflicts:
parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
pom.xml
diff --git a/.travis.yml b/.travis.yml
index fae25f8..77b16d9 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,8 +1,11 @@
language: java
-jdk: openjdk8
before_install:
- bash dev/travis-before_install.sh
+jdk:
+ - openjdk8
+ - openjdk11
+
env:
- HADOOP_PROFILE=default TEST_CODECS=uncompressed,brotli
- HADOOP_PROFILE=default TEST_CODECS=gzip,snappy
diff --git a/CHANGES.md b/CHANGES.md
index 6dae12b..12c1483 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -25,14 +25,15 @@
#### Bug
-* [PARQUET-1364](https://issues.apache.org/jira/browse/PARQUET-1364) - Column Indexes: Invalid row indexes for pages starting with nulls
* [PARQUET-138](https://issues.apache.org/jira/browse/PARQUET-138) - Parquet should allow a merge between required and optional schemas
* [PARQUET-952](https://issues.apache.org/jira/browse/PARQUET-952) - Avro union with single type fails with 'is not a group'
* [PARQUET-1128](https://issues.apache.org/jira/browse/PARQUET-1128) - \[Java\] Upgrade the Apache Arrow version to 0.8.0 for SchemaConverter
+* [PARQUET-1281](https://issues.apache.org/jira/browse/PARQUET-1281) - Jackson dependency
* [PARQUET-1285](https://issues.apache.org/jira/browse/PARQUET-1285) - \[Java\] SchemaConverter should not convert from TimeUnit.SECOND AND TimeUnit.NANOSECOND of Arrow
* [PARQUET-1293](https://issues.apache.org/jira/browse/PARQUET-1293) - Build failure when using Java 8 lambda expressions
* [PARQUET-1296](https://issues.apache.org/jira/browse/PARQUET-1296) - Travis kills build after 10 minutes, because "no output was received"
* [PARQUET-1297](https://issues.apache.org/jira/browse/PARQUET-1297) - \[Java\] SchemaConverter should not convert from Timestamp(TimeUnit.SECOND) and Timestamp(TimeUnit.NANOSECOND) of Arrow
+* [PARQUET-1303](https://issues.apache.org/jira/browse/PARQUET-1303) - Avro reflect @Stringable field write error if field not instanceof CharSequence
* [PARQUET-1304](https://issues.apache.org/jira/browse/PARQUET-1304) - Release 1.10 contains breaking changes for Hive
* [PARQUET-1305](https://issues.apache.org/jira/browse/PARQUET-1305) - Backward incompatible change introduced in 1.8
* [PARQUET-1309](https://issues.apache.org/jira/browse/PARQUET-1309) - Parquet Java uses incorrect stats and dictionary filter properties
@@ -40,13 +41,13 @@
* [PARQUET-1317](https://issues.apache.org/jira/browse/PARQUET-1317) - ParquetMetadataConverter throw NPE
* [PARQUET-1341](https://issues.apache.org/jira/browse/PARQUET-1341) - Null count is suppressed when columns have no min or max and use unsigned sort order
* [PARQUET-1344](https://issues.apache.org/jira/browse/PARQUET-1344) - Type builders don't honor new logical types
-* [PARQUET-1351](https://issues.apache.org/jira/browse/PARQUET-1351) - Travis builds fail for parquet-format
* [PARQUET-1368](https://issues.apache.org/jira/browse/PARQUET-1368) - ParquetFileReader should close its input stream for the failure in constructor
* [PARQUET-1371](https://issues.apache.org/jira/browse/PARQUET-1371) - Time/Timestamp UTC normalization parameter doesn't work
* [PARQUET-1407](https://issues.apache.org/jira/browse/PARQUET-1407) - Data loss on duplicate values with AvroParquetWriter/Reader
* [PARQUET-1417](https://issues.apache.org/jira/browse/PARQUET-1417) - BINARY\_AS\_SIGNED\_INTEGER\_COMPARATOR fails with IOBE for the same arrays with the different length
* [PARQUET-1421](https://issues.apache.org/jira/browse/PARQUET-1421) - InternalParquetRecordWriter logs debug messages at the INFO level
* [PARQUET-1440](https://issues.apache.org/jira/browse/PARQUET-1440) - Parquet-tools: Decimal values stored in an int32 or int64 in the parquet file aren't displayed with their proper scale
+* [PARQUET-1441](https://issues.apache.org/jira/browse/PARQUET-1441) - SchemaParseException: Can't redefine: list in AvroIndexedRecordConverter
* [PARQUET-1456](https://issues.apache.org/jira/browse/PARQUET-1456) - Use page index, ParquetFileReader throw ArrayIndexOutOfBoundsException
* [PARQUET-1460](https://issues.apache.org/jira/browse/PARQUET-1460) - Fix javadoc errors and include javadoc checking in Travis checks
* [PARQUET-1461](https://issues.apache.org/jira/browse/PARQUET-1461) - Third party code does not compile after parquet-mr minor version update
@@ -56,6 +57,9 @@
* [PARQUET-1478](https://issues.apache.org/jira/browse/PARQUET-1478) - Can't read spec compliant, 3-level lists via parquet-proto
* [PARQUET-1480](https://issues.apache.org/jira/browse/PARQUET-1480) - INT96 to avro not yet implemented error should mention deprecation
* [PARQUET-1485](https://issues.apache.org/jira/browse/PARQUET-1485) - Snappy Decompressor/Compressor may cause direct memory leak
+* [PARQUET-1488](https://issues.apache.org/jira/browse/PARQUET-1488) - UserDefinedPredicate throw NPE
+* [PARQUET-1496](https://issues.apache.org/jira/browse/PARQUET-1496) - \[Java\] Update Scala for JDK 11 compatibility
+* [PARQUET-1497](https://issues.apache.org/jira/browse/PARQUET-1497) - \[Java\] javax annotations dependency missing for Java 11
* [PARQUET-1498](https://issues.apache.org/jira/browse/PARQUET-1498) - \[Java\] Add instructions to install thrift via homebrew
* [PARQUET-1510](https://issues.apache.org/jira/browse/PARQUET-1510) - Dictionary filter skips null values when evaluating not-equals.
* [PARQUET-1514](https://issues.apache.org/jira/browse/PARQUET-1514) - ParquetFileWriter Records Compressed Bytes instead of Uncompressed Bytes
@@ -63,7 +67,16 @@
* [PARQUET-1529](https://issues.apache.org/jira/browse/PARQUET-1529) - Shade fastutil in all modules where used
* [PARQUET-1531](https://issues.apache.org/jira/browse/PARQUET-1531) - Page row count limit causes empty pages to be written from MessageColumnIO
* [PARQUET-1533](https://issues.apache.org/jira/browse/PARQUET-1533) - TestSnappy() throws OOM exception with Parquet-1485 change
+* [PARQUET-1534](https://issues.apache.org/jira/browse/PARQUET-1534) - \[parquet-cli\] Argument error: Illegal character in opaque part at index 2 on Windows
* [PARQUET-1544](https://issues.apache.org/jira/browse/PARQUET-1544) - Possible over-shading of modules
+* [PARQUET-1550](https://issues.apache.org/jira/browse/PARQUET-1550) - CleanUtil does not work in Java 11
+* [PARQUET-1555](https://issues.apache.org/jira/browse/PARQUET-1555) - Bump snappy-java to 1.1.7.3
+* [PARQUET-1596](https://issues.apache.org/jira/browse/PARQUET-1596) - PARQUET-1375 broke parquet-cli's to-avro command
+* [PARQUET-1600](https://issues.apache.org/jira/browse/PARQUET-1600) - Fix shebang in parquet-benchmarks/run.sh
+* [PARQUET-1615](https://issues.apache.org/jira/browse/PARQUET-1615) - getRecordWriter shouldn't hardcode CREAT mode when new ParquetFileWriter
+* [PARQUET-1637](https://issues.apache.org/jira/browse/PARQUET-1637) - Builds are failing because default jdk changed to openjdk11 on Travis
+* [PARQUET-1644](https://issues.apache.org/jira/browse/PARQUET-1644) - Clean up some benchmark code and docs.
+* [PARQUET-1691](https://issues.apache.org/jira/browse/PARQUET-1691) - Build fails due to missing hadoop-lzo
#### New Feature
@@ -73,11 +86,13 @@
#### Improvement
+* [PARQUET-1135](https://issues.apache.org/jira/browse/PARQUET-1135) - upgrade thrift and protobuf dependencies
* [PARQUET-1280](https://issues.apache.org/jira/browse/PARQUET-1280) - \[parquet-protobuf\] Use maven protoc plugin
* [PARQUET-1321](https://issues.apache.org/jira/browse/PARQUET-1321) - LogicalTypeAnnotation.LogicalTypeAnnotationVisitor#visit methods should have a return value
* [PARQUET-1335](https://issues.apache.org/jira/browse/PARQUET-1335) - Logical type names in parquet-mr are not consistent with parquet-format
* [PARQUET-1336](https://issues.apache.org/jira/browse/PARQUET-1336) - PrimitiveComparator should implements Serializable
* [PARQUET-1365](https://issues.apache.org/jira/browse/PARQUET-1365) - Don't write page level statistics
+* [PARQUET-1375](https://issues.apache.org/jira/browse/PARQUET-1375) - Upgrade to supported version of Jackson
* [PARQUET-1383](https://issues.apache.org/jira/browse/PARQUET-1383) - Parquet tools should indicate UTC parameter for time/timestamp types
* [PARQUET-1390](https://issues.apache.org/jira/browse/PARQUET-1390) - \[Java\] Upgrade to Arrow 0.10.0
* [PARQUET-1399](https://issues.apache.org/jira/browse/PARQUET-1399) - Move parquet-mr related code from parquet-format
@@ -85,6 +100,8 @@
* [PARQUET-1414](https://issues.apache.org/jira/browse/PARQUET-1414) - Limit page size based on maximum row count
* [PARQUET-1418](https://issues.apache.org/jira/browse/PARQUET-1418) - Run integration tests in Travis
* [PARQUET-1435](https://issues.apache.org/jira/browse/PARQUET-1435) - Benchmark filtering column-indexes
+* [PARQUET-1444](https://issues.apache.org/jira/browse/PARQUET-1444) - Prefer ArrayList over LinkedList
+* [PARQUET-1445](https://issues.apache.org/jira/browse/PARQUET-1445) - Remove Files.java
* [PARQUET-1462](https://issues.apache.org/jira/browse/PARQUET-1462) - Allow specifying new development version in prepare-release.sh
* [PARQUET-1466](https://issues.apache.org/jira/browse/PARQUET-1466) - Upgrade to the latest guava 27.0-jre
* [PARQUET-1474](https://issues.apache.org/jira/browse/PARQUET-1474) - Less verbose and lower level logging for missing column/offset indexes
@@ -93,8 +110,9 @@
* [PARQUET-1489](https://issues.apache.org/jira/browse/PARQUET-1489) - Insufficient documentation for UserDefinedPredicate.keep(T)
* [PARQUET-1490](https://issues.apache.org/jira/browse/PARQUET-1490) - Add branch-specific Travis steps
* [PARQUET-1492](https://issues.apache.org/jira/browse/PARQUET-1492) - Remove protobuf install in travis build
+* [PARQUET-1499](https://issues.apache.org/jira/browse/PARQUET-1499) - \[parquet-mr\] Add Java 11 to Travis
* [PARQUET-1500](https://issues.apache.org/jira/browse/PARQUET-1500) - Remove the Closables
-* [PARQUET-1502](https://issues.apache.org/jira/browse/PARQUET-1502) - Convert FIXED\_LEN\_BYTE\_ARRAY to arrow type in
+* [PARQUET-1502](https://issues.apache.org/jira/browse/PARQUET-1502) - Convert FIXED\_LEN\_BYTE\_ARRAY to arrow type in logicalTypeAnnotation if it is not null
* [PARQUET-1503](https://issues.apache.org/jira/browse/PARQUET-1503) - Remove Ints Utility Class
* [PARQUET-1504](https://issues.apache.org/jira/browse/PARQUET-1504) - Add an option to convert Parquet Int96 to Arrow Timestamp
* [PARQUET-1505](https://issues.apache.org/jira/browse/PARQUET-1505) - Use Java 7 NIO StandardCharsets
@@ -103,6 +121,40 @@
* [PARQUET-1509](https://issues.apache.org/jira/browse/PARQUET-1509) - Update Docs for Hive Deprecation
* [PARQUET-1513](https://issues.apache.org/jira/browse/PARQUET-1513) - HiddenFileFilter Streamline
* [PARQUET-1518](https://issues.apache.org/jira/browse/PARQUET-1518) - Bump Jackson2 version of parquet-cli
+* [PARQUET-1530](https://issues.apache.org/jira/browse/PARQUET-1530) - Remove Dependency on commons-codec
+* [PARQUET-1542](https://issues.apache.org/jira/browse/PARQUET-1542) - Merge multiple I/O to one time I/O when read footer
+* [PARQUET-1557](https://issues.apache.org/jira/browse/PARQUET-1557) - Replace deprecated Apache Avro methods
+* [PARQUET-1558](https://issues.apache.org/jira/browse/PARQUET-1558) - Use try-with-resource in Apache Avro tests
+* [PARQUET-1576](https://issues.apache.org/jira/browse/PARQUET-1576) - Upgrade to Avro 1.9.0
+* [PARQUET-1577](https://issues.apache.org/jira/browse/PARQUET-1577) - Remove duplicate license
+* [PARQUET-1578](https://issues.apache.org/jira/browse/PARQUET-1578) - Introduce Lambdas
+* [PARQUET-1579](https://issues.apache.org/jira/browse/PARQUET-1579) - Add Github PR template
+* [PARQUET-1580](https://issues.apache.org/jira/browse/PARQUET-1580) - Page-level CRC checksum verification for DataPageV1
+* [PARQUET-1601](https://issues.apache.org/jira/browse/PARQUET-1601) - Add zstd support to parquet-cli to-avro
+* [PARQUET-1604](https://issues.apache.org/jira/browse/PARQUET-1604) - Bump fastutil from 7.0.13 to 8.2.3
+* [PARQUET-1605](https://issues.apache.org/jira/browse/PARQUET-1605) - Bump maven-javadoc-plugin from 2.9 to 3.1.0
+* [PARQUET-1606](https://issues.apache.org/jira/browse/PARQUET-1606) - Fix invalid tests scope
+* [PARQUET-1607](https://issues.apache.org/jira/browse/PARQUET-1607) - Remove duplicate maven-enforcer-plugin
+* [PARQUET-1616](https://issues.apache.org/jira/browse/PARQUET-1616) - Enable Maven batch mode
+* [PARQUET-1650](https://issues.apache.org/jira/browse/PARQUET-1650) - Implement unit test to validate column/offset indexes
+* [PARQUET-1654](https://issues.apache.org/jira/browse/PARQUET-1654) - Remove unnecessary options when building thrift
+* [PARQUET-1661](https://issues.apache.org/jira/browse/PARQUET-1661) - Upgrade to Avro 1.9.1
+* [PARQUET-1662](https://issues.apache.org/jira/browse/PARQUET-1662) - Upgrade Jackson to version 2.9.10
+* [PARQUET-1665](https://issues.apache.org/jira/browse/PARQUET-1665) - Upgrade zstd-jni to 1.4.0-1
+* [PARQUET-1669](https://issues.apache.org/jira/browse/PARQUET-1669) - Disable compiling all libraries when building thrift
+* [PARQUET-1671](https://issues.apache.org/jira/browse/PARQUET-1671) - Upgrade Yetus to 0.11.0
+* [PARQUET-1682](https://issues.apache.org/jira/browse/PARQUET-1682) - Maintain forward compatibility for TIME/TIMESTAMP
+* [PARQUET-1683](https://issues.apache.org/jira/browse/PARQUET-1683) - Remove unnecessary string converting in readFooter method
+* [PARQUET-1685](https://issues.apache.org/jira/browse/PARQUET-1685) - Truncate the stored min and max for String statistics to reduce the footer size
+
+#### Test
+
+* [PARQUET-1536](https://issues.apache.org/jira/browse/PARQUET-1536) - \[parquet-cli\] Add simple tests for each command
+
+#### Wish
+
+* [PARQUET-1552](https://issues.apache.org/jira/browse/PARQUET-1552) - upgrade protoc-jar-maven-plugin to 3.8.0
+* [PARQUET-1673](https://issues.apache.org/jira/browse/PARQUET-1673) - Upgrade parquet-mr format version to 2.7.0
#### Task
@@ -111,6 +163,12 @@
* [PARQUET-1434](https://issues.apache.org/jira/browse/PARQUET-1434) - Release parquet-mr 1.11.0
* [PARQUET-1436](https://issues.apache.org/jira/browse/PARQUET-1436) - TimestampMicrosStringifier shows wrong microseconds for timestamps before 1970
* [PARQUET-1452](https://issues.apache.org/jira/browse/PARQUET-1452) - Deprecate old logical types API
+* [PARQUET-1551](https://issues.apache.org/jira/browse/PARQUET-1551) - Support Java 11 - top-level JIRA
+* [PARQUET-1570](https://issues.apache.org/jira/browse/PARQUET-1570) - Publish 1.11.0 to maven central
+* [PARQUET-1585](https://issues.apache.org/jira/browse/PARQUET-1585) - Update old external links in the code base
+* [PARQUET-1645](https://issues.apache.org/jira/browse/PARQUET-1645) - Bump Apache Avro to 1.9.1
+* [PARQUET-1649](https://issues.apache.org/jira/browse/PARQUET-1649) - Bump Jackson Databind to 2.9.9.3
+* [PARQUET-1687](https://issues.apache.org/jira/browse/PARQUET-1687) - Update release process
### Version 1.10.1 ###
diff --git a/README.md b/README.md
index f670e41..bda45e2 100644
--- a/README.md
+++ b/README.md
@@ -39,7 +39,7 @@
tar xzf thrift-0.12.0.tar.gz
cd thrift-0.12.0
chmod +x ./configure
-./configure --disable-gen-erl --disable-gen-hs --without-ruby --without-haskell --without-erlang --without-php --without-nodejs
+./configure --disable-libs
sudo make install
```
diff --git a/dev/finalize-release b/dev/finalize-release
new file mode 100755
index 0000000..e06524a
--- /dev/null
+++ b/dev/finalize-release
@@ -0,0 +1,42 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+set -e
+
+if [ -z "$3" ]; then
+ cat <<EOF
+Usage: $0 <release-version> <rc-num> <new-development-version-without-SNAPSHOT-suffix>
+Example: $0 1.11.0 7 1.12.0
+EOF
+ exit 1
+fi
+
+release_version="$1"
+release_tag="apache-parquet-$release_version"
+rc_tag="$release_tag-rc$2"
+new_development_version="$3-SNAPSHOT"
+
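+# Create the final release tag at the commit of the approved release candidate.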
+git tag -am "Release Apache Parquet $release_version" "$release_tag" "$rc_tag"
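+# Bump all POMs to the next development (SNAPSHOT) version.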
+mvn --batch-mode release:update-versions -DdevelopmentVersion="$new_development_version"
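+# Record the version just released in the root POM's "previous.version" property.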
+mvn -pl . versions:set-property -Dproperty=previous.version -DnewVersion="$release_version"
+git commit -am 'Prepare for next development iteration'
+
+echo
+echo "Verify the release tag and the current development version then push the changes by running: git push --follow-tags"
diff --git a/dev/prepare-release.sh b/dev/prepare-release.sh
index 04a79a1..94ce96d 100755
--- a/dev/prepare-release.sh
+++ b/dev/prepare-release.sh
@@ -20,18 +20,23 @@
set -e
-if [ -z "$2" ]; then
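+# Validate the argument count and formats before doing anything.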
+[[ $# != 2 ]] && err="Incorrect number of arguments: $#"
+[[ -z $err ]] && ! [[ $1 =~ ^[0-9]+\.[0-9]+\.[0-9]+$ ]] && err="Invalid release version: \"$1\""
+[[ -z $err ]] && ! [[ $2 =~ ^[0-9]+$ ]] && err="Invalid rc number: \"$2\""
+
+if [[ -n $err ]]; then
cat <<EOF
-Usage: $0 <release-version> <new-development-version-without-SNAPSHOT-suffix>
-Example: $0 1.6.0 1.7.0
+$err
+Usage: $0 <release-version> <rc-num>
+Example: $0 1.11.0 7
EOF
exit 1
fi
release_version="$1"
-new_development_version="$2-SNAPSHOT"
+new_development_version="$release_version-SNAPSHOT"
-tag="apache-parquet-$release_version"
+tag="apache-parquet-$release_version-rc$2"
mvn release:clean
mvn release:prepare -Dtag="$tag" "-DreleaseVersion=$release_version" -DdevelopmentVersion="$new_development_version"
diff --git a/dev/source-release.sh b/dev/source-release.sh
old mode 100644
new mode 100755
index 28c414f..5ff06a1
--- a/dev/source-release.sh
+++ b/dev/source-release.sh
@@ -41,10 +41,10 @@
echo "Preparing source for $tagrc"
-release_hash=`git rev-list $tag 2> /dev/null | head -n 1 `
+release_hash=`git rev-list "$tagrc" 2> /dev/null | head -n 1 `
if [ -z "$release_hash" ]; then
- echo "Cannot continue: unknown git tag: $tag"
+ echo "Cannot continue: unknown git tag: $tagrc"
exit
fi
diff --git a/dev/travis-before_install.sh b/dev/travis-before_install.sh
old mode 100644
new mode 100755
index c87c8b5..14b8ca2
--- a/dev/travis-before_install.sh
+++ b/dev/travis-before_install.sh
@@ -1,3 +1,4 @@
+#!/usr/bin/env bash
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -24,16 +25,15 @@
set -e
date
sudo apt-get update -qq
-sudo apt-get install -qq build-essential pv autoconf automake libtool curl make \
+sudo apt-get install -qq --no-install-recommends build-essential pv autoconf automake libtool curl make \
g++ unzip libboost-dev libboost-test-dev libboost-program-options-dev \
libevent-dev automake libtool flex bison pkg-config g++ libssl-dev xmlstarlet
date
pwd
-wget -nv https://archive.apache.org/dist/thrift/${THIFT_VERSION}/thrift-${THIFT_VERSION}.tar.gz
-tar zxf thrift-${THIFT_VERSION}.tar.gz
+wget -qO- https://archive.apache.org/dist/thrift/0.12.0/thrift-0.12.0.tar.gz | tar zxf -
cd thrift-${THIFT_VERSION}
chmod +x ./configure
-./configure --disable-gen-erl --disable-gen-hs --without-ruby --without-haskell --without-erlang --without-php --without-nodejs
+./configure --disable-libs
sudo make install
cd ..
branch_specific_script="dev/travis-before_install-${TRAVIS_BRANCH}.sh"
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroConverters.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroConverters.java
index cc49cc2..b9c07cf 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroConverters.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroConverters.java
@@ -273,13 +273,8 @@
public Object convert(Binary binary) {
try {
return ctor.newInstance(binary.toStringUsingUTF8());
- } catch (InstantiationException e) {
- throw new ParquetDecodingException(
- "Cannot convert binary to " + stringableName, e);
- } catch (IllegalAccessException e) {
- throw new ParquetDecodingException(
- "Cannot convert binary to " + stringableName, e);
- } catch (InvocationTargetException e) {
+ } catch (InstantiationException | IllegalAccessException
+ | InvocationTargetException e) {
throw new ParquetDecodingException(
"Cannot convert binary to " + stringableName, e);
}
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java
index 4bbf58a..aba562f 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java
@@ -150,35 +150,38 @@
ParentValueContainer parent = ParentValueContainer
.getConversionContainer(setter, conversion, schema);
- if (schema.getType().equals(Schema.Type.BOOLEAN)) {
- return new AvroConverters.FieldBooleanConverter(parent);
- } else if (schema.getType().equals(Schema.Type.INT)) {
- return new AvroConverters.FieldIntegerConverter(parent);
- } else if (schema.getType().equals(Schema.Type.LONG)) {
- return new AvroConverters.FieldLongConverter(parent);
- } else if (schema.getType().equals(Schema.Type.FLOAT)) {
- return new AvroConverters.FieldFloatConverter(parent);
- } else if (schema.getType().equals(Schema.Type.DOUBLE)) {
- return new AvroConverters.FieldDoubleConverter(parent);
- } else if (schema.getType().equals(Schema.Type.BYTES)) {
- return new AvroConverters.FieldByteBufferConverter(parent);
- } else if (schema.getType().equals(Schema.Type.STRING)) {
- return new AvroConverters.FieldStringConverter(parent);
- } else if (schema.getType().equals(Schema.Type.RECORD)) {
- return new AvroIndexedRecordConverter(parent, type.asGroupType(), schema, model);
- } else if (schema.getType().equals(Schema.Type.ENUM)) {
- return new FieldEnumConverter(parent, schema, model);
- } else if (schema.getType().equals(Schema.Type.ARRAY)) {
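+ // Dispatch on the Avro schema type; NULL and any unrecognized type are rejected below.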
+ switch (schema.getType()) {
+ case ARRAY:
return new AvroArrayConverter(parent, type.asGroupType(), schema, model);
- } else if (schema.getType().equals(Schema.Type.MAP)) {
- return new MapConverter(parent, type.asGroupType(), schema, model);
- } else if (schema.getType().equals(Schema.Type.UNION)) {
- return new AvroUnionConverter(parent, type, schema, model);
- } else if (schema.getType().equals(Schema.Type.FIXED)) {
+ case BOOLEAN:
+ return new AvroConverters.FieldBooleanConverter(parent);
+ case BYTES:
+ return new AvroConverters.FieldByteBufferConverter(parent);
+ case DOUBLE:
+ return new AvroConverters.FieldDoubleConverter(parent);
+ case ENUM:
+ return new FieldEnumConverter(parent, schema, model);
+ case FIXED:
return new FieldFixedConverter(parent, schema, model);
+ case FLOAT:
+ return new AvroConverters.FieldFloatConverter(parent);
+ case INT:
+ return new AvroConverters.FieldIntegerConverter(parent);
+ case LONG:
+ return new AvroConverters.FieldLongConverter(parent);
+ case MAP:
+ return new MapConverter(parent, type.asGroupType(), schema, model);
+ case RECORD:
+ return new AvroIndexedRecordConverter(parent, type.asGroupType(), schema, model);
+ case STRING:
+ return new AvroConverters.FieldStringConverter(parent);
+ case UNION:
+ return new AvroUnionConverter(parent, type, schema, model);
+ case NULL: // fall through
+ default:
+ throw new UnsupportedOperationException(String.format("Cannot convert Avro type: %s" +
+ " (Parquet type: %s) ", schema, type));
}
- throw new UnsupportedOperationException(String.format("Cannot convert Avro type: %s" +
- " (Parquet type: %s) ", schema, type));
}
private void set(int index, Object value) {
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
index 93bde65..743d6aa 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
@@ -381,9 +381,7 @@
try {
return (Class<T>) getClassMethod.invoke(schema);
- } catch (IllegalAccessException e) {
- return null;
- } catch (InvocationTargetException e) {
+ } catch (IllegalAccessException | InvocationTargetException e) {
return null;
}
}
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestArrayCompatibility.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestArrayCompatibility.java
index 1f47ba7..76f296e 100644
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestArrayCompatibility.java
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestArrayCompatibility.java
@@ -34,7 +34,6 @@
import org.junit.Ignore;
import org.junit.Test;
import org.apache.parquet.DirectWriterTest;
-import org.apache.parquet.io.api.RecordConsumer;
import static org.apache.parquet.avro.AvroTestUtil.array;
import static org.apache.parquet.avro.AvroTestUtil.field;
@@ -64,20 +63,17 @@
"message UnannotatedListOfPrimitives {" +
" repeated int32 list_of_ints;" +
"}",
- new DirectWriter() {
- @Override
- public void write(RecordConsumer rc) {
- rc.startMessage();
- rc.startField("list_of_ints", 0);
+ rc -> {
+ rc.startMessage();
+ rc.startField("list_of_ints", 0);
- rc.addInteger(34);
- rc.addInteger(35);
- rc.addInteger(36);
+ rc.addInteger(34);
+ rc.addInteger(35);
+ rc.addInteger(36);
- rc.endField("list_of_ints", 0);
- rc.endMessage();
- }
- });
+ rc.endField("list_of_ints", 0);
+ rc.endMessage();
+ });
Schema expectedSchema = record("OldPrimitiveInList",
field("list_of_ints", array(primitive(Schema.Type.INT))));
@@ -100,34 +96,31 @@
" required float y;" +
" }" +
"}",
- new DirectWriter() {
- @Override
- public void write(RecordConsumer rc) {
- rc.startMessage();
- rc.startField("list_of_points", 0);
+ rc -> {
+ rc.startMessage();
+ rc.startField("list_of_points", 0);
- rc.startGroup();
- rc.startField("x", 0);
- rc.addFloat(1.0f);
- rc.endField("x", 0);
- rc.startField("y", 1);
- rc.addFloat(1.0f);
- rc.endField("y", 1);
- rc.endGroup();
+ rc.startGroup();
+ rc.startField("x", 0);
+ rc.addFloat(1.0f);
+ rc.endField("x", 0);
+ rc.startField("y", 1);
+ rc.addFloat(1.0f);
+ rc.endField("y", 1);
+ rc.endGroup();
- rc.startGroup();
- rc.startField("x", 0);
- rc.addFloat(2.0f);
- rc.endField("x", 0);
- rc.startField("y", 1);
- rc.addFloat(2.0f);
- rc.endField("y", 1);
- rc.endGroup();
+ rc.startGroup();
+ rc.startField("x", 0);
+ rc.addFloat(2.0f);
+ rc.endField("x", 0);
+ rc.startField("y", 1);
+ rc.addFloat(2.0f);
+ rc.endField("y", 1);
+ rc.endGroup();
- rc.endField("list_of_points", 0);
- rc.endMessage();
- }
- });
+ rc.endField("list_of_points", 0);
+ rc.endMessage();
+ });
Schema point = record("?",
field("x", primitive(Schema.Type.FLOAT)),
@@ -153,26 +146,23 @@
" repeated int32 array;" +
" }" +
"}",
- new DirectWriter() {
- @Override
- public void write(RecordConsumer rc) {
- rc.startMessage();
- rc.startField("list_of_ints", 0);
+ rc -> {
+ rc.startMessage();
+ rc.startField("list_of_ints", 0);
- rc.startGroup();
- rc.startField("array", 0);
+ rc.startGroup();
+ rc.startField("array", 0);
- rc.addInteger(34);
- rc.addInteger(35);
- rc.addInteger(36);
+ rc.addInteger(34);
+ rc.addInteger(35);
+ rc.addInteger(36);
- rc.endField("array", 0);
- rc.endGroup();
+ rc.endField("array", 0);
+ rc.endGroup();
- rc.endField("list_of_ints", 0);
- rc.endMessage();
- }
- });
+ rc.endField("list_of_ints", 0);
+ rc.endMessage();
+ });
Schema expectedSchema = record("RepeatedPrimitiveInList",
field("list_of_ints", array(Schema.create(Schema.Type.INT))));
@@ -197,40 +187,37 @@
" }" +
" }" +
"}",
- new DirectWriter() {
- @Override
- public void write(RecordConsumer rc) {
- rc.startMessage();
- rc.startField("locations", 0);
+ rc -> {
+ rc.startMessage();
+ rc.startField("locations", 0);
- rc.startGroup();
- rc.startField("element", 0);
+ rc.startGroup();
+ rc.startField("element", 0);
- rc.startGroup();
- rc.startField("latitude", 0);
- rc.addDouble(0.0);
- rc.endField("latitude", 0);
- rc.startField("longitude", 1);
- rc.addDouble(0.0);
- rc.endField("longitude", 1);
- rc.endGroup();
+ rc.startGroup();
+ rc.startField("latitude", 0);
+ rc.addDouble(0.0);
+ rc.endField("latitude", 0);
+ rc.startField("longitude", 1);
+ rc.addDouble(0.0);
+ rc.endField("longitude", 1);
+ rc.endGroup();
- rc.startGroup();
- rc.startField("latitude", 0);
- rc.addDouble(0.0);
- rc.endField("latitude", 0);
- rc.startField("longitude", 1);
- rc.addDouble(180.0);
- rc.endField("longitude", 1);
- rc.endGroup();
+ rc.startGroup();
+ rc.startField("latitude", 0);
+ rc.addDouble(0.0);
+ rc.endField("latitude", 0);
+ rc.startField("longitude", 1);
+ rc.addDouble(180.0);
+ rc.endField("longitude", 1);
+ rc.endGroup();
- rc.endField("element", 0);
- rc.endGroup();
+ rc.endField("element", 0);
+ rc.endGroup();
- rc.endField("locations", 0);
- rc.endMessage();
- }
- });
+ rc.endField("locations", 0);
+ rc.endMessage();
+ });
Schema location = record("element",
field("latitude", primitive(Schema.Type.DOUBLE)),
@@ -259,34 +246,31 @@
" }" +
" }" +
"}",
- new DirectWriter() {
- @Override
- public void write(RecordConsumer rc) {
- rc.startMessage();
- rc.startField("single_element_groups", 0);
+ rc -> {
+ rc.startMessage();
+ rc.startField("single_element_groups", 0);
- rc.startGroup();
- rc.startField("single_element_group", 0); // start writing array contents
+ rc.startGroup();
+ rc.startField("single_element_group", 0); // start writing array contents
- rc.startGroup();
- rc.startField("count", 0);
- rc.addLong(1234L);
- rc.endField("count", 0);
- rc.endGroup();
+ rc.startGroup();
+ rc.startField("count", 0);
+ rc.addLong(1234L);
+ rc.endField("count", 0);
+ rc.endGroup();
- rc.startGroup();
- rc.startField("count", 0);
- rc.addLong(2345L);
- rc.endField("count", 0);
- rc.endGroup();
+ rc.startGroup();
+ rc.startField("count", 0);
+ rc.addLong(2345L);
+ rc.endField("count", 0);
+ rc.endGroup();
- rc.endField("single_element_group", 0); // finished writing array contents
- rc.endGroup();
+ rc.endField("single_element_group", 0); // finished writing array contents
+ rc.endGroup();
- rc.endField("single_element_groups", 0);
- rc.endMessage();
- }
- });
+ rc.endField("single_element_groups", 0);
+ rc.endMessage();
+ });
// can't tell from storage whether this should be a list of single-field
// records or if the single_field_group layer is synthetic.
@@ -335,34 +319,31 @@
" }" +
" }" +
"}",
- new DirectWriter() {
- @Override
- public void write(RecordConsumer rc) {
- rc.startMessage();
- rc.startField("single_element_groups", 0);
+ rc -> {
+ rc.startMessage();
+ rc.startField("single_element_groups", 0);
- rc.startGroup();
- rc.startField("single_element_group", 0); // start writing array contents
+ rc.startGroup();
+ rc.startField("single_element_group", 0); // start writing array contents
- rc.startGroup();
- rc.startField("count", 0);
- rc.addLong(1234L);
- rc.endField("count", 0);
- rc.endGroup();
+ rc.startGroup();
+ rc.startField("count", 0);
+ rc.addLong(1234L);
+ rc.endField("count", 0);
+ rc.endGroup();
- rc.startGroup();
- rc.startField("count", 0);
- rc.addLong(2345L);
- rc.endField("count", 0);
- rc.endGroup();
+ rc.startGroup();
+ rc.startField("count", 0);
+ rc.addLong(2345L);
+ rc.endField("count", 0);
+ rc.endGroup();
- rc.endField("single_element_group", 0); // finished writing array contents
- rc.endGroup();
+ rc.endField("single_element_group", 0); // finished writing array contents
+ rc.endGroup();
- rc.endField("single_element_groups", 0);
- rc.endMessage();
- }
- },
+ rc.endField("single_element_groups", 0);
+ rc.endMessage();
+ },
metadata);
GenericRecord expectedRecord = instance(expectedSchema,
@@ -388,58 +369,55 @@
" }" +
" }" +
"}",
- new DirectWriter() {
- @Override
- public void write(RecordConsumer rc) {
- rc.startMessage();
- rc.startField("locations", 0);
+ rc -> {
+ rc.startMessage();
+ rc.startField("locations", 0);
- rc.startGroup();
- rc.startField("list", 0); // start writing array contents
+ rc.startGroup();
+ rc.startField("list", 0); // start writing array contents
- // write a non-null element
- rc.startGroup(); // array level
- rc.startField("element", 0);
+ // write a non-null element
+ rc.startGroup(); // array level
+ rc.startField("element", 0);
- rc.startGroup();
- rc.startField("latitude", 0);
- rc.addDouble(0.0);
- rc.endField("latitude", 0);
- rc.startField("longitude", 1);
- rc.addDouble(0.0);
- rc.endField("longitude", 1);
- rc.endGroup();
+ rc.startGroup();
+ rc.startField("latitude", 0);
+ rc.addDouble(0.0);
+ rc.endField("latitude", 0);
+ rc.startField("longitude", 1);
+ rc.addDouble(0.0);
+ rc.endField("longitude", 1);
+ rc.endGroup();
- rc.endField("element", 0);
- rc.endGroup(); // array level
+ rc.endField("element", 0);
+ rc.endGroup(); // array level
- // write a null element (element field is omitted)
- rc.startGroup(); // array level
- rc.endGroup(); // array level
+ // write a null element (element field is omitted)
+ rc.startGroup(); // array level
+ rc.endGroup(); // array level
- // write a second non-null element
- rc.startGroup(); // array level
- rc.startField("element", 0);
+ // write a second non-null element
+ rc.startGroup(); // array level
+ rc.startField("element", 0);
- rc.startGroup();
- rc.startField("latitude", 0);
- rc.addDouble(0.0);
- rc.endField("latitude", 0);
- rc.startField("longitude", 1);
- rc.addDouble(180.0);
- rc.endField("longitude", 1);
- rc.endGroup();
+ rc.startGroup();
+ rc.startField("latitude", 0);
+ rc.addDouble(0.0);
+ rc.endField("latitude", 0);
+ rc.startField("longitude", 1);
+ rc.addDouble(180.0);
+ rc.endField("longitude", 1);
+ rc.endGroup();
- rc.endField("element", 0);
- rc.endGroup(); // array level
+ rc.endField("element", 0);
+ rc.endGroup(); // array level
- rc.endField("list", 0); // finished writing array contents
- rc.endGroup();
+ rc.endField("list", 0); // finished writing array contents
+ rc.endGroup();
- rc.endField("locations", 0);
- rc.endMessage();
- }
- });
+ rc.endField("locations", 0);
+ rc.endMessage();
+ });
Schema location = record("element",
field("latitude", primitive(Schema.Type.DOUBLE)),
@@ -484,54 +462,51 @@
" }" +
" }" +
"}",
- new DirectWriter() {
- @Override
- public void write(RecordConsumer rc) {
- rc.startMessage();
- rc.startField("locations", 0);
+ rc -> {
+ rc.startMessage();
+ rc.startField("locations", 0);
- rc.startGroup();
- rc.startField("list", 0); // start writing array contents
+ rc.startGroup();
+ rc.startField("list", 0); // start writing array contents
- // write a non-null element
- rc.startGroup(); // array level
- rc.startField("element", 0);
+ // write a non-null element
+ rc.startGroup(); // array level
+ rc.startField("element", 0);
- rc.startGroup();
- rc.startField("latitude", 0);
- rc.addDouble(0.0);
- rc.endField("latitude", 0);
- rc.startField("longitude", 1);
- rc.addDouble(180.0);
- rc.endField("longitude", 1);
- rc.endGroup();
+ rc.startGroup();
+ rc.startField("latitude", 0);
+ rc.addDouble(0.0);
+ rc.endField("latitude", 0);
+ rc.startField("longitude", 1);
+ rc.addDouble(180.0);
+ rc.endField("longitude", 1);
+ rc.endGroup();
- rc.endField("element", 0);
- rc.endGroup(); // array level
+ rc.endField("element", 0);
+ rc.endGroup(); // array level
- // write a second non-null element
- rc.startGroup(); // array level
- rc.startField("element", 0);
+ // write a second non-null element
+ rc.startGroup(); // array level
+ rc.startField("element", 0);
- rc.startGroup();
- rc.startField("latitude", 0);
- rc.addDouble(0.0);
- rc.endField("latitude", 0);
- rc.startField("longitude", 1);
- rc.addDouble(0.0);
- rc.endField("longitude", 1);
- rc.endGroup();
+ rc.startGroup();
+ rc.startField("latitude", 0);
+ rc.addDouble(0.0);
+ rc.endField("latitude", 0);
+ rc.startField("longitude", 1);
+ rc.addDouble(0.0);
+ rc.endField("longitude", 1);
+ rc.endGroup();
- rc.endField("element", 0);
- rc.endGroup(); // array level
+ rc.endField("element", 0);
+ rc.endGroup(); // array level
- rc.endField("list", 0); // finished writing array contents
- rc.endGroup();
+ rc.endField("list", 0); // finished writing array contents
+ rc.endGroup();
- rc.endField("locations", 0);
- rc.endMessage();
- }
- });
+ rc.endField("locations", 0);
+ rc.endMessage();
+ });
Schema location = record("element",
field("latitude", primitive(Schema.Type.DOUBLE)),
@@ -574,54 +549,51 @@
" }" +
" }" +
"}",
- new DirectWriter() {
- @Override
- public void write(RecordConsumer rc) {
- rc.startMessage();
- rc.startField("locations", 0);
+ rc -> {
+ rc.startMessage();
+ rc.startField("locations", 0);
- rc.startGroup();
- rc.startField("array", 0); // start writing array contents
+ rc.startGroup();
+ rc.startField("array", 0); // start writing array contents
- // write a non-null element
- rc.startGroup(); // array level
- rc.startField("element", 0);
+ // write a non-null element
+ rc.startGroup(); // array level
+ rc.startField("element", 0);
- rc.startGroup();
- rc.startField("latitude", 0);
- rc.addDouble(0.0);
- rc.endField("latitude", 0);
- rc.startField("longitude", 1);
- rc.addDouble(180.0);
- rc.endField("longitude", 1);
- rc.endGroup();
+ rc.startGroup();
+ rc.startField("latitude", 0);
+ rc.addDouble(0.0);
+ rc.endField("latitude", 0);
+ rc.startField("longitude", 1);
+ rc.addDouble(180.0);
+ rc.endField("longitude", 1);
+ rc.endGroup();
- rc.endField("element", 0);
- rc.endGroup(); // array level
+ rc.endField("element", 0);
+ rc.endGroup(); // array level
- // write a second non-null element
- rc.startGroup(); // array level
- rc.startField("element", 0);
+ // write a second non-null element
+ rc.startGroup(); // array level
+ rc.startField("element", 0);
- rc.startGroup();
- rc.startField("latitude", 0);
- rc.addDouble(0.0);
- rc.endField("latitude", 0);
- rc.startField("longitude", 1);
- rc.addDouble(0.0);
- rc.endField("longitude", 1);
- rc.endGroup();
+ rc.startGroup();
+ rc.startField("latitude", 0);
+ rc.addDouble(0.0);
+ rc.endField("latitude", 0);
+ rc.startField("longitude", 1);
+ rc.addDouble(0.0);
+ rc.endField("longitude", 1);
+ rc.endGroup();
- rc.endField("element", 0);
- rc.endGroup(); // array level
+ rc.endField("element", 0);
+ rc.endGroup(); // array level
- rc.endField("array", 0); // finished writing array contents
- rc.endGroup();
+ rc.endField("array", 0); // finished writing array contents
+ rc.endGroup();
- rc.endField("locations", 0);
- rc.endMessage();
- }
- });
+ rc.endField("locations", 0);
+ rc.endMessage();
+ });
Schema location = record("element",
field("latitude", primitive(Schema.Type.DOUBLE)),
@@ -656,54 +628,51 @@
" }" +
" }" +
"}",
- new DirectWriter() {
- @Override
- public void write(RecordConsumer rc) {
- rc.startMessage();
- rc.startField("locations", 0);
+ rc -> {
+ rc.startMessage();
+ rc.startField("locations", 0);
- rc.startGroup();
- rc.startField("array", 0); // start writing array contents
+ rc.startGroup();
+ rc.startField("array", 0); // start writing array contents
- // write a non-null element
- rc.startGroup(); // array level
- rc.startField("element", 0);
+ // write a non-null element
+ rc.startGroup(); // array level
+ rc.startField("element", 0);
- rc.startGroup();
- rc.startField("latitude", 0);
- rc.addDouble(0.0);
- rc.endField("latitude", 0);
- rc.startField("longitude", 1);
- rc.addDouble(180.0);
- rc.endField("longitude", 1);
- rc.endGroup();
+ rc.startGroup();
+ rc.startField("latitude", 0);
+ rc.addDouble(0.0);
+ rc.endField("latitude", 0);
+ rc.startField("longitude", 1);
+ rc.addDouble(180.0);
+ rc.endField("longitude", 1);
+ rc.endGroup();
- rc.endField("element", 0);
- rc.endGroup(); // array level
+ rc.endField("element", 0);
+ rc.endGroup(); // array level
- // write a second non-null element
- rc.startGroup(); // array level
- rc.startField("element", 0);
+ // write a second non-null element
+ rc.startGroup(); // array level
+ rc.startField("element", 0);
- rc.startGroup();
- rc.startField("latitude", 0);
- rc.addDouble(0.0);
- rc.endField("latitude", 0);
- rc.startField("longitude", 1);
- rc.addDouble(0.0);
- rc.endField("longitude", 1);
- rc.endGroup();
+ rc.startGroup();
+ rc.startField("latitude", 0);
+ rc.addDouble(0.0);
+ rc.endField("latitude", 0);
+ rc.startField("longitude", 1);
+ rc.addDouble(0.0);
+ rc.endField("longitude", 1);
+ rc.endGroup();
- rc.endField("element", 0);
- rc.endGroup(); // array level
+ rc.endField("element", 0);
+ rc.endGroup(); // array level
- rc.endField("array", 0); // finished writing array contents
- rc.endGroup();
+ rc.endField("array", 0); // finished writing array contents
+ rc.endGroup();
- rc.endField("locations", 0);
- rc.endMessage();
- }
- });
+ rc.endField("locations", 0);
+ rc.endMessage();
+ });
Schema location = record("element",
field("latitude", primitive(Schema.Type.DOUBLE)),
@@ -742,48 +711,45 @@
" }" +
" }" +
"}",
- new DirectWriter() {
- @Override
- public void write(RecordConsumer rc) {
- rc.startMessage();
- rc.startField("locations", 0);
+ rc -> {
+ rc.startMessage();
+ rc.startField("locations", 0);
- rc.startGroup();
- rc.startField("array", 0); // start writing array contents
+ rc.startGroup();
+ rc.startField("array", 0); // start writing array contents
- rc.startGroup();
- rc.startField("array", 0); // start writing inner array contents
+ rc.startGroup();
+ rc.startField("array", 0); // start writing inner array contents
- // write [34, 35, 36]
- rc.addInteger(34);
- rc.addInteger(35);
- rc.addInteger(36);
+ // write [34, 35, 36]
+ rc.addInteger(34);
+ rc.addInteger(35);
+ rc.addInteger(36);
- rc.endField("array", 0); // finished writing inner array contents
- rc.endGroup();
+ rc.endField("array", 0); // finished writing inner array contents
+ rc.endGroup();
- // write an empty list
- rc.startGroup();
- rc.endGroup();
+ // write an empty list
+ rc.startGroup();
+ rc.endGroup();
- rc.startGroup();
- rc.startField("array", 0); // start writing inner array contents
+ rc.startGroup();
+ rc.startField("array", 0); // start writing inner array contents
- // write [32, 33, 34]
- rc.addInteger(32);
- rc.addInteger(33);
- rc.addInteger(34);
+ // write [32, 33, 34]
+ rc.addInteger(32);
+ rc.addInteger(33);
+ rc.addInteger(34);
- rc.endField("array", 0); // finished writing inner array contents
- rc.endGroup();
+ rc.endField("array", 0); // finished writing inner array contents
+ rc.endGroup();
- rc.endField("array", 0); // finished writing array contents
- rc.endGroup();
+ rc.endField("array", 0); // finished writing array contents
+ rc.endGroup();
- rc.endField("locations", 0);
- rc.endMessage();
- }
- });
+ rc.endField("locations", 0);
+ rc.endMessage();
+ });
Schema listOfLists = array(array(primitive(Schema.Type.INT)));
Schema oldSchema = record("AvroCompatListInList",
@@ -810,48 +776,45 @@
" }" +
" }" +
"}",
- new DirectWriter() {
- @Override
- public void write(RecordConsumer rc) {
- rc.startMessage();
- rc.startField("locations", 0);
+ rc -> {
+ rc.startMessage();
+ rc.startField("locations", 0);
- rc.startGroup();
- rc.startField("listOfLists_tuple", 0); // start writing array contents
+ rc.startGroup();
+ rc.startField("listOfLists_tuple", 0); // start writing array contents
- rc.startGroup();
- rc.startField("listOfLists_tuple_tuple", 0); // start writing inner array contents
+ rc.startGroup();
+ rc.startField("listOfLists_tuple_tuple", 0); // start writing inner array contents
- // write [34, 35, 36]
- rc.addInteger(34);
- rc.addInteger(35);
- rc.addInteger(36);
+ // write [34, 35, 36]
+ rc.addInteger(34);
+ rc.addInteger(35);
+ rc.addInteger(36);
- rc.endField("listOfLists_tuple_tuple", 0); // finished writing inner array contents
- rc.endGroup();
+ rc.endField("listOfLists_tuple_tuple", 0); // finished writing inner array contents
+ rc.endGroup();
- // write an empty list
- rc.startGroup();
- rc.endGroup();
+ // write an empty list
+ rc.startGroup();
+ rc.endGroup();
- rc.startGroup();
- rc.startField("listOfLists_tuple_tuple", 0); // start writing inner array contents
+ rc.startGroup();
+ rc.startField("listOfLists_tuple_tuple", 0); // start writing inner array contents
- // write [32, 33, 34]
- rc.addInteger(32);
- rc.addInteger(33);
- rc.addInteger(34);
+ // write [32, 33, 34]
+ rc.addInteger(32);
+ rc.addInteger(33);
+ rc.addInteger(34);
- rc.endField("listOfLists_tuple_tuple", 0); // finished writing inner array contents
- rc.endGroup();
+ rc.endField("listOfLists_tuple_tuple", 0); // finished writing inner array contents
+ rc.endGroup();
- rc.endField("listOfLists_tuple", 0); // finished writing array contents
- rc.endGroup();
+ rc.endField("listOfLists_tuple", 0); // finished writing array contents
+ rc.endGroup();
- rc.endField("locations", 0);
- rc.endMessage();
- }
- });
+ rc.endField("locations", 0);
+ rc.endMessage();
+ });
Schema listOfLists = array(array(primitive(Schema.Type.INT)));
Schema oldSchema = record("ThriftCompatListInList",
@@ -881,54 +844,51 @@
" }" +
" }" +
"}",
- new DirectWriter() {
- @Override
- public void write(RecordConsumer rc) {
- rc.startMessage();
- rc.startField("locations", 0);
+ rc -> {
+ rc.startMessage();
+ rc.startField("locations", 0);
- rc.startGroup();
- rc.startField("locations_tuple", 0); // start writing array contents
+ rc.startGroup();
+ rc.startField("locations_tuple", 0); // start writing array contents
- // write a non-null element
- rc.startGroup(); // array level
- rc.startField("element", 0);
+ // write a non-null element
+ rc.startGroup(); // array level
+ rc.startField("element", 0);
- rc.startGroup();
- rc.startField("latitude", 0);
- rc.addDouble(0.0);
- rc.endField("latitude", 0);
- rc.startField("longitude", 1);
- rc.addDouble(180.0);
- rc.endField("longitude", 1);
- rc.endGroup();
+ rc.startGroup();
+ rc.startField("latitude", 0);
+ rc.addDouble(0.0);
+ rc.endField("latitude", 0);
+ rc.startField("longitude", 1);
+ rc.addDouble(180.0);
+ rc.endField("longitude", 1);
+ rc.endGroup();
- rc.endField("element", 0);
- rc.endGroup(); // array level
+ rc.endField("element", 0);
+ rc.endGroup(); // array level
- // write a second non-null element
- rc.startGroup(); // array level
- rc.startField("element", 0);
+ // write a second non-null element
+ rc.startGroup(); // array level
+ rc.startField("element", 0);
- rc.startGroup();
- rc.startField("latitude", 0);
- rc.addDouble(0.0);
- rc.endField("latitude", 0);
- rc.startField("longitude", 1);
- rc.addDouble(0.0);
- rc.endField("longitude", 1);
- rc.endGroup();
+ rc.startGroup();
+ rc.startField("latitude", 0);
+ rc.addDouble(0.0);
+ rc.endField("latitude", 0);
+ rc.startField("longitude", 1);
+ rc.addDouble(0.0);
+ rc.endField("longitude", 1);
+ rc.endGroup();
- rc.endField("element", 0);
- rc.endGroup(); // array level
+ rc.endField("element", 0);
+ rc.endGroup(); // array level
- rc.endField("locations_tuple", 0); // finished writing array contents
- rc.endGroup();
+ rc.endField("locations_tuple", 0); // finished writing array contents
+ rc.endGroup();
- rc.endField("locations", 0);
- rc.endMessage();
- }
- });
+ rc.endField("locations", 0);
+ rc.endMessage();
+ });
Schema location = record("element",
field("latitude", primitive(Schema.Type.DOUBLE)),
@@ -963,54 +923,51 @@
" }" +
" }" +
"}",
- new DirectWriter() {
- @Override
- public void write(RecordConsumer rc) {
- rc.startMessage();
- rc.startField("locations", 0);
+ rc -> {
+ rc.startMessage();
+ rc.startField("locations", 0);
- rc.startGroup();
- rc.startField("bag", 0); // start writing array contents
+ rc.startGroup();
+ rc.startField("bag", 0); // start writing array contents
- // write a non-null element
- rc.startGroup(); // array level
- rc.startField("element", 0);
+ // write a non-null element
+ rc.startGroup(); // array level
+ rc.startField("element", 0);
- rc.startGroup();
- rc.startField("latitude", 0);
- rc.addDouble(0.0);
- rc.endField("latitude", 0);
- rc.startField("longitude", 1);
- rc.addDouble(180.0);
- rc.endField("longitude", 1);
- rc.endGroup();
+ rc.startGroup();
+ rc.startField("latitude", 0);
+ rc.addDouble(0.0);
+ rc.endField("latitude", 0);
+ rc.startField("longitude", 1);
+ rc.addDouble(180.0);
+ rc.endField("longitude", 1);
+ rc.endGroup();
- rc.endField("element", 0);
- rc.endGroup(); // array level
+ rc.endField("element", 0);
+ rc.endGroup(); // array level
- // write a second non-null element
- rc.startGroup(); // array level
- rc.startField("element", 0);
+ // write a second non-null element
+ rc.startGroup(); // array level
+ rc.startField("element", 0);
- rc.startGroup();
- rc.startField("latitude", 0);
- rc.addDouble(0.0);
- rc.endField("latitude", 0);
- rc.startField("longitude", 1);
- rc.addDouble(0.0);
- rc.endField("longitude", 1);
- rc.endGroup();
+ rc.startGroup();
+ rc.startField("latitude", 0);
+ rc.addDouble(0.0);
+ rc.endField("latitude", 0);
+ rc.startField("longitude", 1);
+ rc.addDouble(0.0);
+ rc.endField("longitude", 1);
+ rc.endGroup();
- rc.endField("element", 0);
- rc.endGroup(); // array level
+ rc.endField("element", 0);
+ rc.endGroup(); // array level
- rc.endField("bag", 0); // finished writing array contents
- rc.endGroup();
+ rc.endField("bag", 0); // finished writing array contents
+ rc.endGroup();
- rc.endField("locations", 0);
- rc.endMessage();
- }
- });
+ rc.endField("locations", 0);
+ rc.endMessage();
+ });
Schema location = record("element",
field("latitude", primitive(Schema.Type.DOUBLE)),
@@ -1053,50 +1010,47 @@
" }" +
" }" +
"}",
- new DirectWriter() {
- @Override
- public void write(RecordConsumer rc) {
- rc.startMessage();
- rc.startField("list_of_structs", 0);
+ rc -> {
+ rc.startMessage();
+ rc.startField("list_of_structs", 0);
- rc.startGroup();
- rc.startField("list", 0); // start writing array contents
+ rc.startGroup();
+ rc.startField("list", 0); // start writing array contents
- // write a non-null element
- rc.startGroup(); // array level
- rc.startField("element", 0);
+ // write a non-null element
+ rc.startGroup(); // array level
+ rc.startField("element", 0);
- // the inner element field
- rc.startGroup();
- rc.startField("element", 0);
- rc.addFloat(33.0F);
- rc.endField("element", 0);
- rc.endGroup();
+ // the inner element field
+ rc.startGroup();
+ rc.startField("element", 0);
+ rc.addFloat(33.0F);
+ rc.endField("element", 0);
+ rc.endGroup();
- rc.endField("element", 0);
- rc.endGroup(); // array level
+ rc.endField("element", 0);
+ rc.endGroup(); // array level
- // write a second non-null element
- rc.startGroup(); // array level
- rc.startField("element", 0);
+ // write a second non-null element
+ rc.startGroup(); // array level
+ rc.startField("element", 0);
- // the inner element field
- rc.startGroup();
- rc.startField("element", 0);
- rc.addFloat(34.0F);
- rc.endField("element", 0);
- rc.endGroup();
+ // the inner element field
+ rc.startGroup();
+ rc.startField("element", 0);
+ rc.addFloat(34.0F);
+ rc.endField("element", 0);
+ rc.endGroup();
- rc.endField("element", 0);
- rc.endGroup(); // array level
+ rc.endField("element", 0);
+ rc.endGroup(); // array level
- rc.endField("list", 0); // finished writing array contents
- rc.endGroup();
+ rc.endField("list", 0); // finished writing array contents
+ rc.endGroup();
- rc.endField("list_of_structs", 0);
- rc.endMessage();
- }
- });
+ rc.endField("list_of_structs", 0);
+ rc.endMessage();
+ });
Schema structWithElementField = record("element",
field("element", primitive(Schema.Type.FLOAT)));
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
index 36bd7ba..f5b348b 100644
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
@@ -603,12 +603,7 @@
}
assertThrows("Should not allow TIME_MICROS with " + primitive,
- IllegalArgumentException.class, new Runnable() {
- @Override
- public void run() {
- new AvroSchemaConverter().convert(message(type));
- }
- });
+ IllegalArgumentException.class, () -> new AvroSchemaConverter().convert(message(type)));
}
}
@@ -633,12 +628,7 @@
}
assertThrows("Should not allow TIME_MICROS with " + primitive,
- IllegalArgumentException.class, new Runnable() {
- @Override
- public void run() {
- new AvroSchemaConverter().convert(message(type));
- }
- });
+ IllegalArgumentException.class, () -> new AvroSchemaConverter().convert(message(type)));
}
}
@@ -663,12 +653,7 @@
}
assertThrows("Should not allow TIME_MICROS with " + primitive,
- IllegalArgumentException.class, new Runnable() {
- @Override
- public void run() {
- new AvroSchemaConverter().convert(message(type));
- }
- });
+ IllegalArgumentException.class, () -> new AvroSchemaConverter().convert(message(type)));
}
}
@@ -693,12 +678,7 @@
}
assertThrows("Should not allow TIMESTAMP_MILLIS with " + primitive,
- IllegalArgumentException.class, new Runnable() {
- @Override
- public void run() {
- new AvroSchemaConverter().convert(message(type));
- }
- });
+ IllegalArgumentException.class, () -> new AvroSchemaConverter().convert(message(type)));
}
}
@@ -723,12 +703,7 @@
}
assertThrows("Should not allow TIMESTAMP_MICROS with " + primitive,
- IllegalArgumentException.class, new Runnable() {
- @Override
- public void run() {
- new AvroSchemaConverter().convert(message(type));
- }
- });
+ IllegalArgumentException.class, () -> new AvroSchemaConverter().convert(message(type)));
}
}
diff --git a/parquet-benchmarks/README.md b/parquet-benchmarks/README.md
index 8da067b..63101bd 100644
--- a/parquet-benchmarks/README.md
+++ b/parquet-benchmarks/README.md
@@ -17,22 +17,42 @@
~ under the License.
-->
-##Running Parquet Benchmarks
+# Running Parquet Benchmarks
-First, build the ``parquet-benchmarks`` module
+The Parquet benchmarks in this module are run using the
+[OpenJDK Java Microbenchmark Harness (JMH)](http://openjdk.java.net/projects/code-tools/jmh/).
+
+First, build the `parquet-benchmarks` module. This creates an uber-jar bundling the Parquet
+classes with all dependencies, plus a main class that launches the JMH tool.
```
mvn --projects parquet-benchmarks -amd -DskipTests -Denforcer.skip=true clean package
```
-Then, you can run all the benchmarks with the following command
+JMH doesn't have the notion of "benchmark suites", but there are certain benchmarks that
+make sense to group together or to run in isolation during development. The
+`./parquet-benchmarks/run.sh` script can be used to launch all or some benchmarks:
```
-./parquet-benchmarks/run.sh -wi 5 -i 5 -f 3 -bm all
+# More information about the run script and the available arguments.
+./parquet-benchmarks/run.sh
+
+# More information on the JMH options available.
+./parquet-benchmarks/run.sh all -help
+
+# Run every benchmark once (~20 minutes).
+./parquet-benchmarks/run.sh all -wi 0 -i 1 -f 1
+
+# A more rigorous run of all benchmarks, saving a report for comparison.
+./parquet-benchmarks/run.sh all -wi 5 -i 5 -f 3 -rff /tmp/benchmark1.json
+
+# Run a benchmark "suite" built into the script, with JMH defaults (about 30 minutes).
+./parquet-benchmarks/run.sh checksum
+
+# Run one specific benchmark using a regex.
+./parquet-benchmarks/run.sh all org.apache.parquet.benchmarks.NestedNullWritingBenchmarks
+
+# Manually clean up any state left behind from a previous run.
+./parquet-benchmarks/run.sh clean
```
-To understand what each command line argument means and for more arguments please see
-
-```
-java -jar parquet-benchmarks/target/parquet-benchmarks.jar -help
-```
\ No newline at end of file
diff --git a/parquet-benchmarks/pom.xml b/parquet-benchmarks/pom.xml
index fe72f3b..5e33057 100644
--- a/parquet-benchmarks/pom.xml
+++ b/parquet-benchmarks/pom.xml
@@ -83,11 +83,6 @@
<build>
<plugins>
- <!-- This module disables semver checks because it is not a public API.
- <plugin>
- <artifactId>maven-enforcer-plugin</artifactId>
- </plugin>
- -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
diff --git a/parquet-benchmarks/run.sh b/parquet-benchmarks/run.sh
index 8aa1e69..ba40766 100755
--- a/parquet-benchmarks/run.sh
+++ b/parquet-benchmarks/run.sh
@@ -20,11 +20,91 @@
SCRIPT_PATH=$( cd "$(dirname "$0")" ; pwd -P )
-echo "Starting WRITE benchmarks"
-java -jar ${SCRIPT_PATH}/target/parquet-benchmarks.jar p*Write* "$@"
-echo "Generating test data"
-java -cp ${SCRIPT_PATH}/target/parquet-benchmarks.jar org.apache.parquet.benchmarks.DataGenerator generate
-echo "Data generated, starting READ benchmarks"
-java -jar ${SCRIPT_PATH}/target/parquet-benchmarks.jar p*Read* "$@"
-echo "Cleaning up generated data"
-java -cp ${SCRIPT_PATH}/target/parquet-benchmarks.jar org.apache.parquet.benchmarks.DataGenerator cleanup
+BENCHMARK=$1; shift
+JMH_OPTIONS="$@"
+
+if [ -z "$BENCHMARK" ]; then
+
+ # Print usage if run without arguments.
+ cat << EOF
+Runs Parquet JMH-based benchmarks.
+
+Usage:
+ run.sh <BENCHMARK> [JMH_OPTIONS]
+
+Information on the JMH_OPTIONS can be found by running: run.sh all -help
+
+<BENCHMARK> | Description
+----------- | ----------
+all | Runs all benchmarks in the module (listed here and others).
+build | (No benchmark run, shortcut to rebuild the JMH uber-jar).
+clean | (No benchmark run, shortcut to clean up any temporary files).
+read | Reading files with different compression, page and block sizes.
+write | Writing files.
+checksum | Reading and writing with and without CRC checksums.
+filter | Filtering using column indexes.
+
+Examples:
+
+# More information about the run script and the available arguments.
+./parquet-benchmarks/run.sh
+
+# More information on the JMH options available.
+./parquet-benchmarks/run.sh all -help
+
+# Run every benchmark once (~20 minutes).
+./parquet-benchmarks/run.sh all -wi 0 -i 1 -f 1
+
+# A more rigorous run of all benchmarks, saving a report for comparison.
+./parquet-benchmarks/run.sh all -wi 5 -i 5 -f 3 -rff /tmp/benchmark1.json
+
+# Run a benchmark "suite" built into the script, with JMH defaults (about 30 minutes)
+./parquet-benchmarks/run.sh checksum
+
+# Run one specific benchmark using a regex.
+./parquet-benchmarks/run.sh all org.apache.parquet.benchmarks.NestedNullWritingBenchmarks
+
+EOF
+
+elif [ "$BENCHMARK" == "build" ]; then
+
+ # Shortcut utility to rebuild the benchmark module only.
+ ( cd $SCRIPT_PATH && mvn -amd -DskipTests -Denforcer.skip=true clean package )
+
+elif [ "$BENCHMARK" == "clean" ]; then
+
+ # Shortcut utility to clean any state left behind from any previous run.
+ java -cp ${SCRIPT_PATH}/target/parquet-benchmarks.jar org.apache.parquet.benchmarks.DataGenerator cleanup
+
+else
+
+ # Actually run a benchmark in the JMH harness.
+
+ # Build the benchmark uberjar if it doesn't already exist.
+ if [ ! -f ${SCRIPT_PATH}/target/parquet-benchmarks.jar ]; then
+ ${SCRIPT_PATH}/run.sh build
+ fi
+
+ # Pick a regex if specified.
+ BENCHMARK_REGEX=""
+ case "$BENCHMARK" in
+ "read")
+ BENCHMARK_REGEX="org.apache.parquet.benchmarks.ReadBenchmarks"
+ ;;
+ "write")
+ BENCHMARK_REGEX="org.apache.parquet.benchmarks.WriteBenchmarks"
+ ;;
+ "checksum")
+ BENCHMARK_REGEX="org.apache.parquet.benchmarks.PageChecksum.*"
+ ;;
+ "filter")
+ BENCHMARK_REGEX="org.apache.parquet.benchmarks.FilteringBenchmarks"
+ ;;
+ esac
+
+ echo JMH command: java -jar ${SCRIPT_PATH}/target/parquet-benchmarks.jar $BENCHMARK_REGEX $JMH_OPTIONS
+ java -jar ${SCRIPT_PATH}/target/parquet-benchmarks.jar $BENCHMARK_REGEX $JMH_OPTIONS
+
+ # Clean any data files generated by the benchmarks.
+ ${SCRIPT_PATH}/run.sh clean
+fi
diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/BenchmarkFiles.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/BenchmarkFiles.java
index f039403..24da822 100644
--- a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/BenchmarkFiles.java
+++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/BenchmarkFiles.java
@@ -25,6 +25,8 @@
public static final Configuration configuration = new Configuration();
public static final String TARGET_DIR = "target/tests/ParquetBenchmarks";
+ public static final Path targetDir = new Path(TARGET_DIR);
+
public static final Path file_1M = new Path(TARGET_DIR + "/PARQUET-1M");
//different block and page sizes
diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/DataGenerator.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/DataGenerator.java
index 42d9953..3b5db68 100644
--- a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/DataGenerator.java
+++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/DataGenerator.java
@@ -115,14 +115,7 @@
public void cleanup()
{
- deleteIfExists(configuration, file_1M);
- deleteIfExists(configuration, file_1M_BS256M_PS4M);
- deleteIfExists(configuration, file_1M_BS256M_PS8M);
- deleteIfExists(configuration, file_1M_BS512M_PS4M);
- deleteIfExists(configuration, file_1M_BS512M_PS8M);
-// deleteIfExists(configuration, parquetFile_1M_LZO);
- deleteIfExists(configuration, file_1M_SNAPPY);
- deleteIfExists(configuration, file_1M_GZIP);
+ deleteIfExists(configuration, targetDir);
}
public static void main(String[] args) {
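The cleanup above now issues a single recursive delete of the whole target directory instead
of deleting each generated file by name. A minimal sketch of what `deleteIfExists` is assumed
to do when handed a directory (the helper's body is not shown in this patch):

```
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public final class BenchmarkCleanup {
  // Assumed behavior: recursive delete removes the directory and every file under it.
  public static void deleteIfExists(Configuration conf, Path path) {
    try {
      FileSystem fs = path.getFileSystem(conf);
      if (fs.exists(path)) {
        fs.delete(path, true); // true = recursive
      }
    } catch (IOException e) {
      throw new RuntimeException("Could not delete " + path, e);
    }
  }
}
```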
diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/PageChecksumDataGenerator.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/PageChecksumDataGenerator.java
index 6c62cc6..49ebdce 100644
--- a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/PageChecksumDataGenerator.java
+++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/PageChecksumDataGenerator.java
@@ -40,7 +40,7 @@
import static org.apache.parquet.benchmarks.BenchmarkUtils.exists;
import static org.apache.parquet.hadoop.metadata.CompressionCodecName.*;
-public class PageChecksumDataGenerator {
+public class PageChecksumDataGenerator extends DataGenerator {
private final MessageType SCHEMA = MessageTypeParser.parseMessageType(
"message m {" +
@@ -103,25 +103,4 @@
throw new RuntimeException(e);
}
}
-
- public void cleanup() {
- deleteIfExists(configuration, file_100K_NOCHECKSUMS_UNCOMPRESSED);
- deleteIfExists(configuration, file_100K_CHECKSUMS_UNCOMPRESSED);
- deleteIfExists(configuration, file_100K_NOCHECKSUMS_GZIP);
- deleteIfExists(configuration, file_100K_CHECKSUMS_GZIP);
- deleteIfExists(configuration, file_100K_NOCHECKSUMS_SNAPPY);
- deleteIfExists(configuration, file_100K_CHECKSUMS_SNAPPY);
- deleteIfExists(configuration, file_1M_NOCHECKSUMS_UNCOMPRESSED);
- deleteIfExists(configuration, file_1M_CHECKSUMS_UNCOMPRESSED);
- deleteIfExists(configuration, file_1M_NOCHECKSUMS_GZIP);
- deleteIfExists(configuration, file_1M_CHECKSUMS_GZIP);
- deleteIfExists(configuration, file_1M_NOCHECKSUMS_SNAPPY);
- deleteIfExists(configuration, file_1M_CHECKSUMS_SNAPPY);
- deleteIfExists(configuration, file_10M_NOCHECKSUMS_UNCOMPRESSED);
- deleteIfExists(configuration, file_10M_CHECKSUMS_UNCOMPRESSED);
- deleteIfExists(configuration, file_10M_NOCHECKSUMS_GZIP);
- deleteIfExists(configuration, file_10M_CHECKSUMS_GZIP);
- deleteIfExists(configuration, file_10M_NOCHECKSUMS_SNAPPY);
- deleteIfExists(configuration, file_10M_CHECKSUMS_SNAPPY);
- }
}
diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/PageChecksumReadBenchmarks.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/PageChecksumReadBenchmarks.java
index db23eeb..be2ebe4 100644
--- a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/PageChecksumReadBenchmarks.java
+++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/PageChecksumReadBenchmarks.java
@@ -51,16 +51,15 @@
private PageChecksumDataGenerator pageChecksumDataGenerator = new PageChecksumDataGenerator();
+ /**
+ * This needs to be done exactly once. To avoid needlessly regenerating the files for reading, they aren't cleaned
+ * as part of the benchmark. If the files exist, a message will be printed and they will not be regenerated.
+ */
@Setup(Level.Trial)
public void setup() {
pageChecksumDataGenerator.generateAll();
}
- @Setup(Level.Trial)
- public void cleanup() {
- pageChecksumDataGenerator.cleanup();
- }
-
private void readFile(Path file, int nRows, boolean verifyChecksums, Blackhole blackhole)
throws IOException {
try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file)
@@ -82,96 +81,114 @@
// 100k rows, uncompressed, GZIP, Snappy
- @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ @Benchmark
+ @BenchmarkMode(Mode.SingleShotTime)
public void read100KRowsUncompressedWithoutVerification(Blackhole blackhole) throws IOException {
readFile(file_100K_CHECKSUMS_UNCOMPRESSED, 100 * ONE_K, false, blackhole);
}
- @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ @Benchmark
+ @BenchmarkMode(Mode.SingleShotTime)
public void read100KRowsUncompressedWithVerification(Blackhole blackhole) throws IOException {
readFile(file_100K_CHECKSUMS_UNCOMPRESSED, 100 * ONE_K, true, blackhole);
}
- @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ @Benchmark
+ @BenchmarkMode(Mode.SingleShotTime)
public void read100KRowsGzipWithoutVerification(Blackhole blackhole) throws IOException {
readFile(file_100K_CHECKSUMS_GZIP, 100 * ONE_K, false, blackhole);
}
- @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ @Benchmark
+ @BenchmarkMode(Mode.SingleShotTime)
public void read100KRowsGzipWithVerification(Blackhole blackhole) throws IOException {
readFile(file_100K_CHECKSUMS_GZIP, 100 * ONE_K, true, blackhole);
}
- @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ @Benchmark
+ @BenchmarkMode(Mode.SingleShotTime)
public void read100KRowsSnappyWithoutVerification(Blackhole blackhole) throws IOException {
readFile(file_100K_CHECKSUMS_SNAPPY, 100 * ONE_K, false, blackhole);
}
- @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ @Benchmark
+ @BenchmarkMode(Mode.SingleShotTime)
public void read100KRowsSnappyWithVerification(Blackhole blackhole) throws IOException {
readFile(file_100K_CHECKSUMS_SNAPPY, 100 * ONE_K, true, blackhole);
}
// 1M rows, uncompressed, GZIP, Snappy
- @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ @Benchmark
+ @BenchmarkMode(Mode.SingleShotTime)
public void read1MRowsUncompressedWithoutVerification(Blackhole blackhole) throws IOException {
readFile(file_1M_CHECKSUMS_UNCOMPRESSED, ONE_MILLION, false, blackhole);
}
- @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ @Benchmark
+ @BenchmarkMode(Mode.SingleShotTime)
public void read1MRowsUncompressedWithVerification(Blackhole blackhole) throws IOException {
readFile(file_1M_CHECKSUMS_UNCOMPRESSED, ONE_MILLION, true, blackhole);
}
- @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ @Benchmark
+ @BenchmarkMode(Mode.SingleShotTime)
public void read1MRowsGzipWithoutVerification(Blackhole blackhole) throws IOException {
readFile(file_1M_CHECKSUMS_GZIP, ONE_MILLION, false, blackhole);
}
- @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ @Benchmark
+ @BenchmarkMode(Mode.SingleShotTime)
public void read1MRowsGzipWithVerification(Blackhole blackhole) throws IOException {
readFile(file_1M_CHECKSUMS_GZIP, ONE_MILLION, true, blackhole);
}
- @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ @Benchmark
+ @BenchmarkMode(Mode.SingleShotTime)
public void read1MRowsSnappyWithoutVerification(Blackhole blackhole) throws IOException {
readFile(file_1M_CHECKSUMS_SNAPPY, ONE_MILLION, false, blackhole);
}
- @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ @Benchmark
+ @BenchmarkMode(Mode.SingleShotTime)
public void read1MRowsSnappyWithVerification(Blackhole blackhole) throws IOException {
readFile(file_1M_CHECKSUMS_SNAPPY, ONE_MILLION, true, blackhole);
}
// 10M rows, uncompressed, GZIP, Snappy
- @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ @Benchmark
+ @BenchmarkMode(Mode.SingleShotTime)
public void read10MRowsUncompressedWithoutVerification(Blackhole blackhole) throws IOException {
readFile(file_10M_CHECKSUMS_UNCOMPRESSED, 10 * ONE_MILLION, false, blackhole);
}
- @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ @Benchmark
+ @BenchmarkMode(Mode.SingleShotTime)
public void read10MRowsUncompressedWithVerification(Blackhole blackhole) throws IOException {
readFile(file_10M_CHECKSUMS_UNCOMPRESSED, 10 * ONE_MILLION, true, blackhole);
}
- @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ @Benchmark
+ @BenchmarkMode(Mode.SingleShotTime)
public void read10MRowsGzipWithoutVerification(Blackhole blackhole) throws IOException {
readFile(file_10M_CHECKSUMS_GZIP, 10 * ONE_MILLION, false, blackhole);
}
- @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ @Benchmark
+ @BenchmarkMode(Mode.SingleShotTime)
public void read10MRowsGzipWithVerification(Blackhole blackhole) throws IOException {
readFile(file_10M_CHECKSUMS_GZIP, 10 * ONE_MILLION, true, blackhole);
}
- @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ @Benchmark
+ @BenchmarkMode(Mode.SingleShotTime)
public void read10MRowsSnappyWithoutVerification(Blackhole blackhole) throws IOException {
readFile(file_10M_CHECKSUMS_SNAPPY, 10 * ONE_MILLION, false, blackhole);
}
- @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ @Benchmark
+ @BenchmarkMode(Mode.SingleShotTime)
public void read10MRowsSnappyWithVerification(Blackhole blackhole) throws IOException {
readFile(file_10M_CHECKSUMS_SNAPPY, 10 * ONE_MILLION, true, blackhole);
}
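Per the Javadoc added above, the input files for the read benchmarks are generated once per
trial and never cleaned. A hypothetical sketch of the existence guard that behavior implies,
reusing the `exists` helper already imported from `BenchmarkUtils` (the generator's actual
body is not shown in this patch):

```
// Hypothetical guard inside a data generator: regenerate only when the file is missing.
void generateIfMissing(Path outFile) throws IOException {
  if (exists(configuration, outFile)) {
    System.out.println("File already exists: " + outFile); // the message the Javadoc mentions
    return; // read benchmarks reuse the file across trials
  }
  // ... write the rows to outFile ...
}
```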
diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/PageChecksumWriteBenchmarks.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/PageChecksumWriteBenchmarks.java
index c743dde..e892d53 100644
--- a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/PageChecksumWriteBenchmarks.java
+++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/PageChecksumWriteBenchmarks.java
@@ -57,102 +57,120 @@
private PageChecksumDataGenerator pageChecksumDataGenerator = new PageChecksumDataGenerator();
@Setup(Level.Iteration)
- public void cleanup() {
+ public void setup() {
pageChecksumDataGenerator.cleanup();
}
// 100k rows, uncompressed, GZIP, Snappy
- @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ @Benchmark
+ @BenchmarkMode(Mode.SingleShotTime)
public void write100KRowsUncompressedWithoutChecksums() throws IOException {
pageChecksumDataGenerator.generateData(file_100K_NOCHECKSUMS_UNCOMPRESSED, 100 * ONE_K, false, UNCOMPRESSED);
}
- @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ @Benchmark
+ @BenchmarkMode(Mode.SingleShotTime)
public void write100KRowsUncompressedWithChecksums() throws IOException {
pageChecksumDataGenerator.generateData(file_100K_CHECKSUMS_UNCOMPRESSED, 100 * ONE_K, true, UNCOMPRESSED);
}
- @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ @Benchmark
+ @BenchmarkMode(Mode.SingleShotTime)
public void write100KRowsGzipWithoutChecksums() throws IOException {
pageChecksumDataGenerator.generateData(file_100K_NOCHECKSUMS_GZIP, 100 * ONE_K, false, GZIP);
}
- @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ @Benchmark
+ @BenchmarkMode(Mode.SingleShotTime)
public void write100KRowsGzipWithChecksums() throws IOException {
pageChecksumDataGenerator.generateData(file_100K_CHECKSUMS_GZIP, 100 * ONE_K, true, GZIP);
}
- @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ @Benchmark
+ @BenchmarkMode(Mode.SingleShotTime)
public void write100KRowsSnappyWithoutChecksums() throws IOException {
pageChecksumDataGenerator.generateData(file_100K_NOCHECKSUMS_SNAPPY, 100 * ONE_K, false, SNAPPY);
}
- @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ @Benchmark
+ @BenchmarkMode(Mode.SingleShotTime)
public void write100KRowsSnappyWithChecksums() throws IOException {
pageChecksumDataGenerator.generateData(file_100K_CHECKSUMS_SNAPPY, 100 * ONE_K, true, SNAPPY);
}
// 1M rows, uncompressed, GZIP, Snappy
- @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ @Benchmark
+ @BenchmarkMode(Mode.SingleShotTime)
public void write1MRowsUncompressedWithoutChecksums() throws IOException {
pageChecksumDataGenerator.generateData(file_1M_NOCHECKSUMS_UNCOMPRESSED, ONE_MILLION, false, UNCOMPRESSED);
}
- @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ @Benchmark
+ @BenchmarkMode(Mode.SingleShotTime)
public void write1MRowsUncompressedWithChecksums() throws IOException {
pageChecksumDataGenerator.generateData(file_1M_CHECKSUMS_UNCOMPRESSED, ONE_MILLION, true, UNCOMPRESSED);
}
- @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ @Benchmark
+ @BenchmarkMode(Mode.SingleShotTime)
public void write1MRowsGzipWithoutChecksums() throws IOException {
pageChecksumDataGenerator.generateData(file_1M_NOCHECKSUMS_GZIP, ONE_MILLION, false, GZIP);
}
- @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ @Benchmark
+ @BenchmarkMode(Mode.SingleShotTime)
public void write1MRowsGzipWithChecksums() throws IOException {
pageChecksumDataGenerator.generateData(file_1M_CHECKSUMS_GZIP, ONE_MILLION, true, GZIP);
}
- @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ @Benchmark
+ @BenchmarkMode(Mode.SingleShotTime)
public void write1MRowsSnappyWithoutChecksums() throws IOException {
pageChecksumDataGenerator.generateData(file_1M_NOCHECKSUMS_SNAPPY, ONE_MILLION, false, SNAPPY);
}
- @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ @Benchmark
+ @BenchmarkMode(Mode.SingleShotTime)
public void write1MRowsSnappyWithChecksums() throws IOException {
pageChecksumDataGenerator.generateData(file_1M_CHECKSUMS_SNAPPY, ONE_MILLION, true, SNAPPY);
}
// 10M rows, uncompressed, GZIP, Snappy
- @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ @Benchmark
+ @BenchmarkMode(Mode.SingleShotTime)
public void write10MRowsUncompressedWithoutChecksums() throws IOException {
pageChecksumDataGenerator.generateData(file_10M_NOCHECKSUMS_UNCOMPRESSED, 10 * ONE_MILLION, false, UNCOMPRESSED);
}
- @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ @Benchmark
+ @BenchmarkMode(Mode.SingleShotTime)
public void write10MRowsUncompressedWithChecksums() throws IOException {
pageChecksumDataGenerator.generateData(file_10M_CHECKSUMS_UNCOMPRESSED, 10 * ONE_MILLION, true, UNCOMPRESSED);
}
- @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ @Benchmark
+ @BenchmarkMode(Mode.SingleShotTime)
public void write10MRowsGzipWithoutChecksums() throws IOException {
pageChecksumDataGenerator.generateData(file_10M_NOCHECKSUMS_GZIP, 10 * ONE_MILLION, false, GZIP);
}
- @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ @Benchmark
+ @BenchmarkMode(Mode.SingleShotTime)
public void write10MRowsGzipWithChecksums() throws IOException {
pageChecksumDataGenerator.generateData(file_10M_CHECKSUMS_GZIP, 10 * ONE_MILLION, true, GZIP);
}
- @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ @Benchmark
+ @BenchmarkMode(Mode.SingleShotTime)
public void write10MRowsSnappyWithoutChecksums() throws IOException {
pageChecksumDataGenerator.generateData(file_10M_NOCHECKSUMS_SNAPPY, 10 * ONE_MILLION, false, SNAPPY);
}
- @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ @Benchmark
+ @BenchmarkMode(Mode.SingleShotTime)
public void write10MRowsSnappyWithChecksums() throws IOException {
pageChecksumDataGenerator.generateData(file_10M_CHECKSUMS_SNAPPY, 10 * ONE_MILLION, true, SNAPPY);
}
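The write benchmarks clean their output before every iteration, while the read benchmarks
above generate their input once per trial. A minimal sketch contrasting the two JMH setup
scopes this patch relies on:

```
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;

@State(Scope.Benchmark)
public class SetupScopes {

  @Setup(Level.Trial)
  public void oncePerTrial() {
    // Runs once before the whole benchmark: generate input files for reads.
  }

  @Setup(Level.Iteration)
  public void beforeEveryIteration() {
    // Runs before each iteration: delete outputs so every write starts clean.
  }
}
```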
diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/ReadBenchmarks.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/ReadBenchmarks.java
index dba5544..e74204a 100644
--- a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/ReadBenchmarks.java
+++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/ReadBenchmarks.java
@@ -20,6 +20,13 @@
import org.apache.hadoop.fs.Path;
import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.infra.Blackhole;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetReader;
@@ -29,7 +36,9 @@
import java.io.IOException;
+@State(Scope.Benchmark)
public class ReadBenchmarks {
+
private void read(Path parquetFile, int nRows, Blackhole blackhole) throws IOException
{
ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), parquetFile).withConf(configuration).build();
@@ -47,7 +56,17 @@
reader.close();
}
+ /**
+ * This needs to be done exactly once. To avoid needlessly regenerating the files for reading, they aren't cleaned
+ * as part of the benchmark. If the files exist, a message will be printed and they will not be regenerated.
+ */
+ @Setup(Level.Trial)
+ public void generateFilesForRead() {
+ new DataGenerator().generateAll();
+ }
+
@Benchmark
+ @BenchmarkMode(Mode.SingleShotTime)
public void read1MRowsDefaultBlockAndPageSizeUncompressed(Blackhole blackhole)
throws IOException
{
@@ -55,6 +74,7 @@
}
@Benchmark
+ @BenchmarkMode(Mode.SingleShotTime)
public void read1MRowsBS256MPS4MUncompressed(Blackhole blackhole)
throws IOException
{
@@ -62,6 +82,7 @@
}
@Benchmark
+ @BenchmarkMode(Mode.SingleShotTime)
public void read1MRowsBS256MPS8MUncompressed(Blackhole blackhole)
throws IOException
{
@@ -69,6 +90,7 @@
}
@Benchmark
+ @BenchmarkMode(Mode.SingleShotTime)
public void read1MRowsBS512MPS4MUncompressed(Blackhole blackhole)
throws IOException
{
@@ -76,6 +98,7 @@
}
@Benchmark
+ @BenchmarkMode(Mode.SingleShotTime)
public void read1MRowsBS512MPS8MUncompressed(Blackhole blackhole)
throws IOException
{
@@ -91,6 +114,7 @@
// }
@Benchmark
+ @BenchmarkMode(Mode.SingleShotTime)
public void read1MRowsDefaultBlockAndPageSizeSNAPPY(Blackhole blackhole)
throws IOException
{
@@ -98,6 +122,7 @@
}
@Benchmark
+ @BenchmarkMode(Mode.SingleShotTime)
public void read1MRowsDefaultBlockAndPageSizeGZIP(Blackhole blackhole)
throws IOException
{
diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/WriteBenchmarks.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/WriteBenchmarks.java
index 5c26a84..0a2d2c0 100644
--- a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/WriteBenchmarks.java
+++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/WriteBenchmarks.java
@@ -19,7 +19,9 @@
package org.apache.parquet.benchmarks;
import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
@@ -39,12 +41,13 @@
private DataGenerator dataGenerator = new DataGenerator();
@Setup(Level.Iteration)
- public void cleanup() {
+ public void setup() {
//clean existing test data at the beginning of each iteration
dataGenerator.cleanup();
}
@Benchmark
+ @BenchmarkMode(Mode.SingleShotTime)
public void write1MRowsDefaultBlockAndPageSizeUncompressed()
throws IOException
{
@@ -59,6 +62,7 @@
}
@Benchmark
+ @BenchmarkMode(Mode.SingleShotTime)
public void write1MRowsBS256MPS4MUncompressed()
throws IOException
{
@@ -73,6 +77,7 @@
}
@Benchmark
+ @BenchmarkMode(Mode.SingleShotTime)
public void write1MRowsBS256MPS8MUncompressed()
throws IOException
{
@@ -87,6 +92,7 @@
}
@Benchmark
+ @BenchmarkMode(Mode.SingleShotTime)
public void write1MRowsBS512MPS4MUncompressed()
throws IOException
{
@@ -101,6 +107,7 @@
}
@Benchmark
+ @BenchmarkMode(Mode.SingleShotTime)
public void write1MRowsBS512MPS8MUncompressed()
throws IOException
{
@@ -130,6 +137,7 @@
// }
@Benchmark
+ @BenchmarkMode(Mode.SingleShotTime)
public void write1MRowsDefaultBlockAndPageSizeSNAPPY()
throws IOException
{
@@ -144,6 +152,7 @@
}
@Benchmark
+ @BenchmarkMode(Mode.SingleShotTime)
public void write1MRowsDefaultBlockAndPageSizeGZIP()
throws IOException
{
diff --git a/parquet-benchmarks/run_checksums.sh b/parquet-benchmarks/src/main/resources/log4j.properties
old mode 100755
new mode 100644
similarity index 67%
rename from parquet-benchmarks/run_checksums.sh
rename to parquet-benchmarks/src/main/resources/log4j.properties
index e798488..f4737c8
--- a/parquet-benchmarks/run_checksums.sh
+++ b/parquet-benchmarks/src/main/resources/log4j.properties
@@ -1,4 +1,3 @@
-#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -17,12 +16,9 @@
# under the License.
#
-# !/usr/bin/env bash
+log4j.rootLogger=INFO, stdout
-SCRIPT_PATH=$( cd "$(dirname "$0")" ; pwd -P )
-
-echo "Page level CRC checksum benchmarks"
-echo "Running write benchmarks"
-java -jar ${SCRIPT_PATH}/target/parquet-benchmarks.jar p*PageChecksumWriteBenchmarks -bm ss "$@"
-echo "Running read benchmarks"
-java -jar ${SCRIPT_PATH}/target/parquet-benchmarks.jar p*PageChecksumReadBenchmarks -bm ss "$@"
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p :: %m [%C]%n
diff --git a/parquet-cli/pom.xml b/parquet-cli/pom.xml
index 84bf966..809b1f7 100644
--- a/parquet-cli/pom.xml
+++ b/parquet-cli/pom.xml
@@ -44,6 +44,11 @@
<version>${avro.version}</version>
</dependency>
<dependency>
+ <groupId>com.github.luben</groupId>
+ <artifactId>zstd-jni</artifactId>
+ <version>${zstd-jni.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
@@ -61,7 +66,7 @@
<dependency>
<groupId>${jackson.groupId}</groupId>
<artifactId>jackson-databind</artifactId>
- <version>${jackson.version}</version>
+ <version>${jackson-databind.version}</version>
</dependency>
<dependency>
<groupId>com.beust</groupId>
@@ -90,11 +95,6 @@
<build>
<plugins>
- <!-- This module disables semver checks because it is not a public API.
- <plugin>
- <artifactId>maven-enforcer-plugin</artifactId>
- </plugin>
- -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java
index 1bd74bd..cdef53d 100644
--- a/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java
@@ -226,7 +226,9 @@
@Override
public Configuration getConf() {
- return conf;
+ // If conf is null, fall back to an empty Configuration;
+ // this can happen on a local development machine.
+ return null != conf ? conf : new Configuration();
}
/**
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ToAvroCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ToAvroCommand.java
index ceb11cf..d659109 100644
--- a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ToAvroCommand.java
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ToAvroCommand.java
@@ -86,30 +86,30 @@
CodecFactory codecFactory = Codecs.avroCodec(compressionCodecName);
- Schema schema;
+ final Schema schema;
if (avroSchemaFile != null) {
schema = Schemas.fromAvsc(open(avroSchemaFile));
} else {
schema = getAvroSchema(source);
}
- Schema projection = filterSchema(schema, columns);
+ final Schema projection = filterSchema(schema, columns);
Path outPath = qualifiedPath(outputPath);
- FileSystem outFS = outPath.getFileSystem(getConf());
- if (overwrite && outFS.exists(outPath)) {
- console.debug("Deleting output file {} (already exists)", outPath);
- outFS.delete(outPath);
+ try (FileSystem outFS = outPath.getFileSystem(getConf())) {
+ if (overwrite && outFS.exists(outPath)) {
+ console.debug("Deleting output file {} (already exists)", outPath);
+ outFS.delete(outPath);
+ }
}
Iterable<Record> reader = openDataFile(source, projection);
boolean threw = true;
long count = 0;
- try {
- DatumWriter<Record> datumWriter = new GenericDatumWriter<>(schema);
- DataFileWriter<Record> w = new DataFileWriter<>(datumWriter);
- w.setCodec(codecFactory);
- try (DataFileWriter<Record> writer = w.create(projection, create(outputPath))) {
+ DatumWriter<Record> datumWriter = new GenericDatumWriter<>(schema);
+ try (DataFileWriter<Record> fileWriter = new DataFileWriter<>(datumWriter)) {
+ fileWriter.setCodec(codecFactory);
+ try (DataFileWriter<Record> writer = fileWriter.create(projection, create(outputPath))) {
for (Record record : reader) {
writer.append(record);
count += 1;
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/csv/RecordBuilder.java b/parquet-cli/src/main/java/org/apache/parquet/cli/csv/RecordBuilder.java
index 9adf22e..b26e64b 100644
--- a/parquet-cli/src/main/java/org/apache/parquet/cli/csv/RecordBuilder.java
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/csv/RecordBuilder.java
@@ -39,7 +39,7 @@
this.recordClass = recordClass;
// initialize the index and field arrays
- fields = schema.getFields().toArray(new Schema.Field[schema.getFields().size()]);
+ fields = schema.getFields().toArray(new Schema.Field[0]);
indexes = new int[fields.length];
if (header != null) {
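This patch repeatedly replaces `toArray(new T[size])` with `toArray(new T[0])`. A minimal
sketch of the idiom: the collection allocates a correctly sized result either way, and on
modern JVMs the zero-length form is generally as fast or faster while avoiding the extra
size() call:

```
import java.util.Arrays;
import java.util.List;

public class ToArrayIdiom {
  public static void main(String[] args) {
    List<String> names = Arrays.asList("id", "name");
    String[] presized = names.toArray(new String[names.size()]); // old style
    String[] zeroSized = names.toArray(new String[0]);           // style used in this patch
    System.out.println(presized.length == zeroSized.length);     // true
  }
}
```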
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/json/AvroJson.java b/parquet-cli/src/main/java/org/apache/parquet/cli/json/AvroJson.java
index 9f4756d..4b989d7 100644
--- a/parquet-cli/src/main/java/org/apache/parquet/cli/json/AvroJson.java
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/json/AvroJson.java
@@ -64,7 +64,9 @@
private static final JsonFactory FACTORY = new JsonFactory(MAPPER);
public static Iterator<JsonNode> parser(final InputStream stream) {
- try(JsonParser parser = FACTORY.createParser(stream)) {
+ try {
+      // Don't close the parser: the returned iterator reads from it lazily until fully consumed.
+ JsonParser parser = FACTORY.createParser(stream);
return parser.readValuesAs(JsonNode.class);
} catch (IOException e) {
throw new RuntimeIOException("Cannot read from stream", e);
@@ -78,9 +80,7 @@
public static <T> T parse(String json, Class<T> returnType) {
try {
return MAPPER.readValue(json, returnType);
- } catch (JsonParseException e) {
- throw new IllegalArgumentException("Invalid JSON", e);
- } catch (JsonMappingException e) {
+ } catch (JsonParseException | JsonMappingException e) {
throw new IllegalArgumentException("Invalid JSON", e);
} catch (IOException e) {
throw new RuntimeIOException("Cannot initialize JSON parser", e);
@@ -94,9 +94,7 @@
public static <T> T parse(InputStream json, Class<T> returnType) {
try {
return MAPPER.readValue(json, returnType);
- } catch (JsonParseException e) {
- throw new IllegalArgumentException("Invalid JSON stream", e);
- } catch (JsonMappingException e) {
+ } catch (JsonParseException | JsonMappingException e) {
throw new IllegalArgumentException("Invalid JSON stream", e);
} catch (IOException e) {
throw new RuntimeIOException("Cannot initialize JSON parser", e);
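The first hunk above deliberately stops closing the parser, because `readValuesAs` returns a
lazy iterator that keeps reading from it. A minimal sketch (not from this patch) showing why
the parser must outlive the call:

```
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class LazyJsonIteration {
  public static void main(String[] args) throws Exception {
    InputStream stream = new ByteArrayInputStream(
        "{\"id\":1}\n{\"id\":2}".getBytes(StandardCharsets.UTF_8));
    JsonParser parser = new JsonFactory(new ObjectMapper()).createParser(stream);
    Iterator<JsonNode> records = parser.readValuesAs(JsonNode.class);
    // Each next() pulls from the parser; closing it first would break iteration.
    while (records.hasNext()) {
      System.out.println(records.next());
    }
    parser.close(); // safe only after the iterator is exhausted
  }
}
```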
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/util/Codecs.java b/parquet-cli/src/main/java/org/apache/parquet/cli/util/Codecs.java
index 06f12fd..ee79ee6 100644
--- a/parquet-cli/src/main/java/org/apache/parquet/cli/util/Codecs.java
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/util/Codecs.java
@@ -42,6 +42,8 @@
return CodecFactory.snappyCodec();
case GZIP:
return CodecFactory.deflateCodec(9);
+ case ZSTD:
+ return CodecFactory.zstandardCodec(CodecFactory.DEFAULT_ZSTANDARD_LEVEL);
default:
throw new IllegalArgumentException(
"Codec incompatible with Avro: " + codec);
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/util/GetClassLoader.java b/parquet-cli/src/main/java/org/apache/parquet/cli/util/GetClassLoader.java
index 1cacbd5..9000a3a 100644
--- a/parquet-cli/src/main/java/org/apache/parquet/cli/util/GetClassLoader.java
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/util/GetClassLoader.java
@@ -28,7 +28,7 @@
private final URL[] urls;
public GetClassLoader(List<URL> urls) {
- this.urls = urls.toArray(new URL[urls.size()]);
+ this.urls = urls.toArray(new URL[0]);
}
@Override
diff --git a/parquet-cli/src/test/java/org/apache/parquet/cli/commands/AvroFileTest.java b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/AvroFileTest.java
index dd57cd8..40c05c7 100644
--- a/parquet-cli/src/test/java/org/apache/parquet/cli/commands/AvroFileTest.java
+++ b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/AvroFileTest.java
@@ -27,10 +27,15 @@
public class AvroFileTest extends ParquetFileTest {
protected File toAvro(File parquetFile) throws IOException {
+ return toAvro(parquetFile, "GZIP");
+ }
+
+ protected File toAvro(File parquetFile, String compressionCodecName) throws IOException {
ToAvroCommand command = new ToAvroCommand(createLogger());
command.targets = Arrays.asList(parquetFile.getAbsolutePath());
File output = new File(getTempFolder(), getClass().getSimpleName() + ".avro");
command.outputPath = output.getAbsolutePath();
+ command.compressionCodecName = compressionCodecName;
command.setConf(new Configuration());
int exitCode = command.run();
assert(exitCode == 0);
diff --git a/parquet-cli/src/test/java/org/apache/parquet/cli/commands/ToAvroCommandTest.java b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/ToAvroCommandTest.java
index ca7fda2..9344a78 100644
--- a/parquet-cli/src/test/java/org/apache/parquet/cli/commands/ToAvroCommandTest.java
+++ b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/ToAvroCommandTest.java
@@ -16,18 +16,79 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.parquet.cli.commands;
+import com.beust.jcommander.JCommander;
import org.junit.Assert;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import java.io.BufferedWriter;
import java.io.File;
+import java.io.FileWriter;
import java.io.IOException;
public class ToAvroCommandTest extends AvroFileTest {
+ @Rule
+ public TemporaryFolder folder = new TemporaryFolder();
+
@Test
- public void testToAvroCommand() throws IOException {
+ public void testToAvroCommandFromParquet() throws IOException {
File avroFile = toAvro(parquetFile());
Assert.assertTrue(avroFile.exists());
}
+
+ @Test
+ public void testToAvroCommandFromJson() throws IOException {
+ final File jsonInputFile = folder.newFile("sample.json");
+ final File avroOutputFile = folder.newFile("sample.avro");
+
+ // Write the json to the file, so we can read it again.
+ final String inputJson = "{\"id\": 1, \"name\": \"Alice\"}\n" +
+ "{\"id\": 2, \"name\": \"Bob\"}\n" +
+ "{\"id\": 3, \"name\": \"Carol\"}\n" +
+ "{\"id\": 4, \"name\": \"Dave\"}";
+
+ try (BufferedWriter writer = new BufferedWriter(new FileWriter(jsonInputFile))) {
+ writer.write(inputJson);
+ }
+
+ ToAvroCommand cmd = new ToAvroCommand(null);
+
+ JCommander
+ .newBuilder()
+ .addObject(cmd)
+ .build()
+ .parse(
+ jsonInputFile.getAbsolutePath(),
+ "--output",
+ avroOutputFile.getAbsolutePath()
+ );
+
+    Assert.assertEquals(0, cmd.run());
+ }
+
+  @Test
+  public void testToAvroCommandWithGzipCompression() throws IOException {
+ File avroFile = toAvro(parquetFile(), "GZIP");
+ Assert.assertTrue(avroFile.exists());
+ }
+
+ @Test
+ public void testToAvroCommandWithSnappyCompression() throws IOException {
+ File avroFile = toAvro(parquetFile(), "SNAPPY");
+ Assert.assertTrue(avroFile.exists());
+ }
+
+ @Test
+ public void testToAvroCommandWithZstdCompression() throws IOException {
+ File avroFile = toAvro(parquetFile(), "ZSTD");
+ Assert.assertTrue(avroFile.exists());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testToAvroCommandWithInvalidCompression() throws IOException {
+ toAvro(parquetFile(), "FOO");
+ }
}
diff --git a/parquet-column/pom.xml b/parquet-column/pom.xml
index bce3cf1..322fa28 100644
--- a/parquet-column/pom.xml
+++ b/parquet-column/pom.xml
@@ -106,28 +106,6 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
- <executions>
- <execution>
- <phase>package</phase>
- <goals>
- <goal>shade</goal>
- </goals>
- <configuration>
- <minimizeJar>true</minimizeJar>
- <artifactSet>
- <includes>
- <include>com.google.guava:guava</include>
- </includes>
- </artifactSet>
- <relocations>
- <relocation>
- <pattern>com.google.common</pattern>
- <shadedPattern>org.apache.parquet.com.google.common</shadedPattern>
- </relocation>
- </relocations>
- </configuration>
- </execution>
- </executions>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
diff --git a/parquet-column/src/main/java/org/apache/parquet/CorruptDeltaByteArrays.java b/parquet-column/src/main/java/org/apache/parquet/CorruptDeltaByteArrays.java
index d19e489..221a876 100644
--- a/parquet-column/src/main/java/org/apache/parquet/CorruptDeltaByteArrays.java
+++ b/parquet-column/src/main/java/org/apache/parquet/CorruptDeltaByteArrays.java
@@ -84,10 +84,7 @@
try {
return requiresSequentialReads(VersionParser.parse(createdBy), encoding);
- } catch (RuntimeException e) {
- warnParseError(createdBy, e);
- return true;
- } catch (VersionParser.VersionParseException e) {
+ } catch (RuntimeException | VersionParser.VersionParseException e) {
warnParseError(createdBy, e);
return true;
}
diff --git a/parquet-column/src/main/java/org/apache/parquet/CorruptStatistics.java b/parquet-column/src/main/java/org/apache/parquet/CorruptStatistics.java
index 5724602..1f93735 100644
--- a/parquet-column/src/main/java/org/apache/parquet/CorruptStatistics.java
+++ b/parquet-column/src/main/java/org/apache/parquet/CorruptStatistics.java
@@ -94,17 +94,7 @@
// this file was created after the fix
return false;
- } catch (RuntimeException e) {
- // couldn't parse the created_by field, log what went wrong, don't trust the stats,
- // but don't make this fatal.
- warnParseErrorOnce(createdBy, e);
- return true;
- } catch (SemanticVersionParseException e) {
- // couldn't parse the created_by field, log what went wrong, don't trust the stats,
- // but don't make this fatal.
- warnParseErrorOnce(createdBy, e);
- return true;
- } catch (VersionParseException e) {
+ } catch (RuntimeException | SemanticVersionParseException | VersionParseException e) {
// couldn't parse the created_by field, log what went wrong, don't trust the stats,
// but don't make this fatal.
warnParseErrorOnce(createdBy, e);
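This patch collapses duplicated catch blocks into Java 7 multi-catch throughout the code
base. A minimal sketch of the pattern:

```
public class MultiCatch {
  static int parseOrDefault(String s) {
    try {
      return Integer.parseInt(s.trim());
    } catch (NumberFormatException | NullPointerException e) {
      // Identical handling for both failure modes, written once.
      return -1;
    }
  }

  public static void main(String[] args) {
    System.out.println(parseOrDefault("42")); // 42
    System.out.println(parseOrDefault(null)); // -1
  }
}
```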
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/EncodingStats.java b/parquet-column/src/main/java/org/apache/parquet/column/EncodingStats.java
index a8b95f8..b9f9b67 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/EncodingStats.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/EncodingStats.java
@@ -57,19 +57,13 @@
}
public int getNumDictionaryPagesEncodedAs(Encoding enc) {
- if (dictStats.containsKey(enc)) {
- return dictStats.get(enc);
- } else {
- return 0;
- }
+ final Integer i = dictStats.get(enc);
+ return (i == null) ? 0 : i.intValue();
}
public int getNumDataPagesEncodedAs(Encoding enc) {
- if (dataStats.containsKey(enc)) {
- return dataStats.get(enc);
- } else {
- return 0;
- }
+ final Integer i = dataStats.get(enc);
+ return (i == null) ? 0 : i.intValue();
}
public boolean hasDictionaryPages() {
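Both getters above now do a single `get` plus a null check instead of `containsKey` followed
by `get`, halving the map lookups. A minimal sketch; `Map.getOrDefault` is an equivalent
single-lookup alternative on Java 8+:

```
import java.util.HashMap;
import java.util.Map;

public class SingleLookup {
  public static void main(String[] args) {
    Map<String, Integer> stats = new HashMap<>();
    stats.put("PLAIN", 3);
    Integer i = stats.get("RLE");                     // one lookup
    System.out.println(i == null ? 0 : i.intValue()); // 0
    System.out.println(stats.getOrDefault("RLE", 0)); // same result
  }
}
```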
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
index 519c0f3..ad09223 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
@@ -54,6 +54,7 @@
public static final int DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK = 100;
public static final int DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000;
public static final int DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH = 64;
+ public static final int DEFAULT_STATISTICS_TRUNCATE_LENGTH = Integer.MAX_VALUE;
public static final int DEFAULT_PAGE_ROW_COUNT_LIMIT = 20_000;
public static final int DEFAULT_MAX_BLOOM_FILTER_BYTES = 1024 * 1024;
@@ -95,6 +96,7 @@
private final ByteBufferAllocator allocator;
private final ValuesWriterFactory valuesWriterFactory;
private final int columnIndexTruncateLength;
+ private final int statisticsTruncateLength;
// The key-value pair represents the column name and its expected distinct number of values in a row group.
private final Map<String, Long> bloomFilterExpectedDistinctNumbers;
@@ -106,8 +108,9 @@
private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPageSize, boolean enableDict, int minRowCountForPageSizeCheck,
int maxRowCountForPageSizeCheck, boolean estimateNextSizeCheck, ByteBufferAllocator allocator,
ValuesWriterFactory writerFactory, int columnIndexMinMaxTruncateLength, int pageRowCountLimit,
- boolean pageWriteChecksumEnabled, Map<String, Long> bloomFilterExpectedDistinctNumber,
- Set<String> bloomFilterColumns, int maxBloomFilterBytes) {
+ boolean pageWriteChecksumEnabled, int statisticsTruncateLength,
+ Map<String, Long> bloomFilterExpectedDistinctNumber, Set<String> bloomFilterColumns,
+ int maxBloomFilterBytes) {
this.pageSizeThreshold = pageSize;
this.initialSlabSize = CapacityByteArrayOutputStream
.initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSizeThreshold, 10);
@@ -121,6 +124,7 @@
this.valuesWriterFactory = writerFactory;
this.columnIndexTruncateLength = columnIndexMinMaxTruncateLength;
+ this.statisticsTruncateLength = statisticsTruncateLength;
this.bloomFilterExpectedDistinctNumbers = bloomFilterExpectedDistinctNumber;
this.bloomFilterColumns = bloomFilterColumns;
this.maxBloomFilterBytes = maxBloomFilterBytes;
@@ -187,6 +191,18 @@
}
public ColumnWriteStore newColumnWriteStore(MessageType schema,
+ PageWriteStore pageStore) {
+ switch (writerVersion) {
+ case PARQUET_1_0:
+ return new ColumnWriteStoreV1(schema, pageStore, this);
+ case PARQUET_2_0:
+ return new ColumnWriteStoreV2(schema, pageStore, this);
+ default:
+ throw new IllegalArgumentException("unknown version " + writerVersion);
+ }
+ }
+
+ public ColumnWriteStore newColumnWriteStore(MessageType schema,
PageWriteStore pageStore,
BloomFilterWriteStore bloomFilterWriteStore) {
switch (writerVersion) {
@@ -215,6 +231,10 @@
return columnIndexTruncateLength;
}
+ public int getStatisticsTruncateLength() {
+ return statisticsTruncateLength;
+ }
+
public boolean estimateNextSizeCheck() {
return estimateNextSizeCheck;
}
@@ -258,6 +278,7 @@
private ByteBufferAllocator allocator = new HeapByteBufferAllocator();
private ValuesWriterFactory valuesWriterFactory = DEFAULT_VALUES_WRITER_FACTORY;
private int columnIndexTruncateLength = DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH;
+ private int statisticsTruncateLength = DEFAULT_STATISTICS_TRUNCATE_LENGTH;
private Map<String, Long> bloomFilterColumnExpectedNDVs = new HashMap<>();
private int maxBloomFilterBytes = DEFAULT_MAX_BLOOM_FILTER_BYTES;
private Set<String> bloomFilterColumns = new HashSet<>();
@@ -365,11 +386,17 @@
}
public Builder withColumnIndexTruncateLength(int length) {
- Preconditions.checkArgument(length > 0, "Invalid column index min/max truncate length (negative) : %s", length);
+ Preconditions.checkArgument(length > 0, "Invalid column index min/max truncate length (negative or zero) : %s", length);
this.columnIndexTruncateLength = length;
return this;
}
+ public Builder withStatisticsTruncateLength(int length) {
+ Preconditions.checkArgument(length > 0, "Invalid statistics min/max truncate length (negative or zero) : %s", length);
+ this.statisticsTruncateLength = length;
+ return this;
+ }
+
/**
* Set max Bloom filter bytes for related columns.
*
@@ -419,8 +446,8 @@
new ParquetProperties(writerVersion, pageSize, dictPageSize,
enableDict, minRowCountForPageSizeCheck, maxRowCountForPageSizeCheck,
estimateNextSizeCheck, allocator, valuesWriterFactory, columnIndexTruncateLength,
- pageRowCountLimit, pageWriteChecksumEnabled, bloomFilterColumnExpectedNDVs,
- bloomFilterColumns, maxBloomFilterBytes);
+ pageRowCountLimit, pageWriteChecksumEnabled, statisticsTruncateLength,
+ bloomFilterColumnExpectedNDVs, bloomFilterColumns, maxBloomFilterBytes);
// we pass a constructed but uninitialized factory to ParquetProperties above as currently
// creation of ValuesWriters is invoked from within ParquetProperties. In the future
// we'd like to decouple that and won't need to pass an object to properties and then pass the
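A minimal usage sketch of the new statistics truncate length, assuming the static
`ParquetProperties.builder()` factory that produces the `Builder` shown above:

```
import org.apache.parquet.column.ParquetProperties;

public class StatisticsTruncationConfig {
  public static void main(String[] args) {
    ParquetProperties props = ParquetProperties.builder()
        .withColumnIndexTruncateLength(64) // existing: truncate column index min/max
        .withStatisticsTruncateLength(64)  // new in this patch: truncate statistics min/max
        .build();
    System.out.println(props.getStatisticsTruncateLength()); // 64
  }
}
```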
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReadStoreImpl.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReadStoreImpl.java
index 755985d..a39e5e1 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReadStoreImpl.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReadStoreImpl.java
@@ -65,9 +65,7 @@
ParsedVersion version;
try {
version = VersionParser.parse(createdBy);
- } catch (RuntimeException e) {
- version = null;
- } catch (VersionParseException e) {
+ } catch (RuntimeException | VersionParseException e) {
version = null;
}
this.writerVersion = version;
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/statistics/BinaryStatistics.java b/parquet-column/src/main/java/org/apache/parquet/column/statistics/BinaryStatistics.java
index 6746729..6cd5395 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/statistics/BinaryStatistics.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/statistics/BinaryStatistics.java
@@ -103,6 +103,17 @@
return !hasNonNullValue() || ((min.length() + max.length()) < size);
}
+ public boolean isSmallerThanWithTruncation(long size, int truncationLength) {
+ if (!hasNonNullValue()) {
+ return true;
+ }
+
+ int minTruncateLength = Math.min(min.length(), truncationLength);
+ int maxTruncateLength = Math.min(max.length(), truncationLength);
+
+ return minTruncateLength + maxTruncateLength < size;
+ }
+
/**
* @param min_value a min binary
* @param max_value a max binary
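A worked sketch of the size check in `isSmallerThanWithTruncation`: with 100-byte min and max
values and a truncation length of 64, the truncated footprint is 64 + 64 = 128 bytes, so the
statistics qualify for any size threshold above that:

```
public class TruncationCheck {
  public static void main(String[] args) {
    int minLen = 100, maxLen = 100, truncationLength = 64;
    long size = 256; // hypothetical threshold
    int minTruncateLength = Math.min(minLen, truncationLength); // 64
    int maxTruncateLength = Math.min(maxLen, truncationLength); // 64
    System.out.println(minTruncateLength + maxTruncateLength < size); // true
  }
}
```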
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BinaryPlainValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BinaryPlainValuesReader.java
index 6411325..9cbedc5 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BinaryPlainValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BinaryPlainValuesReader.java
@@ -38,9 +38,7 @@
try {
int length = BytesUtils.readIntLittleEndian(in);
return Binary.fromConstantByteBuffer(in.slice(length));
- } catch (IOException e) {
- throw new ParquetDecodingException("could not read bytes at offset " + in.position(), e);
- } catch (RuntimeException e) {
+ } catch (IOException | RuntimeException e) {
throw new ParquetDecodingException("could not read bytes at offset " + in.position(), e);
}
}
@@ -50,9 +48,7 @@
try {
int length = BytesUtils.readIntLittleEndian(in);
in.skipFully(length);
- } catch (IOException e) {
- throw new ParquetDecodingException("could not skip bytes at offset " + in.position(), e);
- } catch (RuntimeException e) {
+ } catch (IOException | RuntimeException e) {
throw new ParquetDecodingException("could not skip bytes at offset " + in.position(), e);
}
}
diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java
index eca0f67..f39a5d3 100644
--- a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java
+++ b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java
@@ -402,9 +402,7 @@
public U getUserDefinedPredicate() {
try {
return udpClass.newInstance();
- } catch (InstantiationException e) {
- throw new RuntimeException(String.format(INSTANTIATION_ERROR_MESSAGE, udpClass), e);
- } catch (IllegalAccessException e) {
+ } catch (InstantiationException | IllegalAccessException e) {
throw new RuntimeException(String.format(INSTANTIATION_ERROR_MESSAGE, udpClass), e);
}
}
diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/FilteringGroupConverter.java b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/FilteringGroupConverter.java
index a76b5ee..87f7cf1 100644
--- a/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/FilteringGroupConverter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/FilteringGroupConverter.java
@@ -96,11 +96,8 @@
private ValueInspector[] getValueInspectors(ColumnPath columnPath) {
List<ValueInspector> inspectorsList = valueInspectorsByColumn.get(columnPath);
- if (inspectorsList == null) {
- return new ValueInspector[] {};
- } else {
- return inspectorsList.toArray(new ValueInspector[inspectorsList.size()]);
- }
+ return inspectorsList == null ? new ValueInspector[0]
+ : inspectorsList.toArray(new ValueInspector[0]);
}
@Override
diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/FilteringRecordMaterializer.java b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/FilteringRecordMaterializer.java
index d8fa677..d9d8e1d 100644
--- a/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/FilteringRecordMaterializer.java
+++ b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/FilteringRecordMaterializer.java
@@ -19,10 +19,12 @@
package org.apache.parquet.filter2.recordlevel;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
import org.apache.parquet.hadoop.metadata.ColumnPath;
import org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.ValueInspector;
@@ -61,31 +63,20 @@
this.delegate = checkNotNull(delegate, "delegate");
// keep track of which path of indices leads to which primitive column
- Map<List<Integer>, PrimitiveColumnIO> columnIOsByIndexFieldPath = new HashMap<List<Integer>, PrimitiveColumnIO>();
+ Map<List<Integer>, PrimitiveColumnIO> columnIOsByIndexFieldPath = new HashMap<>();
for (PrimitiveColumnIO c : columnIOs) {
- columnIOsByIndexFieldPath.put(getIndexFieldPathList(c), c);
+ List<Integer> indexFieldPath = Arrays.stream(c.getIndexFieldPath())
+ .boxed().collect(Collectors.toList());
+ columnIOsByIndexFieldPath.put(indexFieldPath, c);
}
// create a proxy for the delegate's root converter
this.rootConverter = new FilteringGroupConverter(
- delegate.getRootConverter(), Collections.<Integer>emptyList(), valueInspectorsByColumn, columnIOsByIndexFieldPath);
+ delegate.getRootConverter(), Collections.emptyList(),
+ valueInspectorsByColumn, columnIOsByIndexFieldPath);
}
- public static List<Integer> getIndexFieldPathList(PrimitiveColumnIO c) {
- return intArrayToList(c.getIndexFieldPath());
- }
-
- public static List<Integer> intArrayToList(int[] arr) {
- List<Integer> list = new ArrayList<Integer>(arr.length);
- for (int i : arr) {
- list.add(i);
- }
- return list;
- }
-
-
-
@Override
public T getCurrentRecord() {
@@ -95,12 +86,8 @@
// reset the stateful predicate no matter what
IncrementallyUpdatedFilterPredicateResetter.reset(filterPredicate);
- if (keep) {
- return delegate.getCurrentRecord();
- } else {
- // signals a skip
- return null;
- }
+ // null signals a skip
+ return (keep) ? delegate.getCurrentRecord() : null;
}
@Override
@@ -112,4 +99,19 @@
public GroupConverter getRootConverter() {
return rootConverter;
}
+
+ // The following two methods are kept for backward compatibility
+ @Deprecated
+ public static List<Integer> getIndexFieldPathList(PrimitiveColumnIO c) {
+ return intArrayToList(c.getIndexFieldPath());
+ }
+
+ @Deprecated
+ public static List<Integer> intArrayToList(int[] arr) {
+ List<Integer> list = new ArrayList<>(arr.length);
+ for (int i : arr) {
+ list.add(i);
+ }
+ return list;
+ }
}
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BinaryColumnIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BinaryColumnIndexBuilder.java
index 490cc3e..c5aa8a7 100644
--- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BinaryColumnIndexBuilder.java
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BinaryColumnIndexBuilder.java
@@ -112,8 +112,8 @@
@Override
ColumnIndexBase<Binary> createColumnIndex(PrimitiveType type) {
BinaryColumnIndex columnIndex = new BinaryColumnIndex(type);
- columnIndex.minValues = minValues.toArray(new Binary[minValues.size()]);
- columnIndex.maxValues = maxValues.toArray(new Binary[maxValues.size()]);
+ columnIndex.minValues = minValues.toArray(new Binary[0]);
+ columnIndex.maxValues = maxValues.toArray(new Binary[0]);
return columnIndex;
}
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BinaryTruncator.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BinaryTruncator.java
index a301f67..8a6f007 100644
--- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BinaryTruncator.java
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BinaryTruncator.java
@@ -34,7 +34,7 @@
/**
* Class for truncating min/max values for binary types.
*/
-abstract class BinaryTruncator {
+public abstract class BinaryTruncator {
enum Validity {
VALID, MALFORMED, UNMAPPABLE;
}
@@ -69,12 +69,12 @@
private static final BinaryTruncator NO_OP_TRUNCATOR = new BinaryTruncator() {
@Override
- Binary truncateMin(Binary minValue, int length) {
+ public Binary truncateMin(Binary minValue, int length) {
return minValue;
}
@Override
- Binary truncateMax(Binary maxValue, int length) {
+ public Binary truncateMax(Binary maxValue, int length) {
return maxValue;
}
};
@@ -83,7 +83,7 @@
private final CharsetValidator validator = new CharsetValidator(StandardCharsets.UTF_8);
@Override
- Binary truncateMin(Binary minValue, int length) {
+ public Binary truncateMin(Binary minValue, int length) {
if (minValue.length() <= length) {
return minValue;
}
@@ -98,7 +98,7 @@
}
@Override
- Binary truncateMax(Binary maxValue, int length) {
+ public Binary truncateMax(Binary maxValue, int length) {
if (maxValue.length() <= length) {
return maxValue;
}
@@ -176,7 +176,7 @@
}
};
- static BinaryTruncator getTruncator(PrimitiveType type) {
+ public static BinaryTruncator getTruncator(PrimitiveType type) {
if (type == null) {
return NO_OP_TRUNCATOR;
}
@@ -215,7 +215,7 @@
}
}
- abstract Binary truncateMin(Binary minValue, int length);
+ public abstract Binary truncateMin(Binary minValue, int length);
- abstract Binary truncateMax(Binary maxValue, int length);
+ public abstract Binary truncateMax(Binary maxValue, int length);
}
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/PrimitiveColumnIO.java b/parquet-column/src/main/java/org/apache/parquet/io/PrimitiveColumnIO.java
index 9bb3ee4..268a078 100644
--- a/parquet-column/src/main/java/org/apache/parquet/io/PrimitiveColumnIO.java
+++ b/parquet-column/src/main/java/org/apache/parquet/io/PrimitiveColumnIO.java
@@ -52,7 +52,7 @@
type,
getRepetitionLevel(),
getDefinitionLevel());
- this.path = path.toArray(new ColumnIO[path.size()]);
+ this.path = path.toArray(new ColumnIO[0]);
}
@Override
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/RecordReaderImplementation.java b/parquet-column/src/main/java/org/apache/parquet/io/RecordReaderImplementation.java
index b4ac363..9bca6e8 100644
--- a/parquet-column/src/main/java/org/apache/parquet/io/RecordReaderImplementation.java
+++ b/parquet-column/src/main/java/org/apache/parquet/io/RecordReaderImplementation.java
@@ -249,7 +249,7 @@
public RecordReaderImplementation(MessageColumnIO root, RecordMaterializer<T> recordMaterializer, boolean validating, ColumnReadStoreImpl columnStore) {
this.recordMaterializer = recordMaterializer;
this.recordRootConverter = recordMaterializer.getRootConverter(); // TODO: validator(wrap(recordMaterializer), validating, root.getType());
- PrimitiveColumnIO[] leaves = root.getLeaves().toArray(new PrimitiveColumnIO[root.getLeaves().size()]);
+ PrimitiveColumnIO[] leaves = root.getLeaves().toArray(new PrimitiveColumnIO[0]);
columnReaders = new ColumnReader[leaves.length];
int[][] nextColumnIdxForRepLevel = new int[leaves.length][];
int[][] levelToClose = new int[leaves.length][];
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java b/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java
index 021d171..a6f0145 100644
--- a/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java
+++ b/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java
@@ -232,12 +232,7 @@
}
private static final ThreadLocal<CharsetEncoder> ENCODER =
- new ThreadLocal<CharsetEncoder>() {
- @Override
- protected CharsetEncoder initialValue() {
- return StandardCharsets.UTF_8.newEncoder();
- }
- };
+ ThreadLocal.withInitial(StandardCharsets.UTF_8::newEncoder);
private static ByteBuffer encodeUTF8(CharSequence value) {
try {
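The hunk above swaps an anonymous `ThreadLocal` subclass for the Java 8 `withInitial`
factory. A minimal sketch of the shorthand:

```
import java.nio.charset.CharsetEncoder;
import java.nio.charset.StandardCharsets;

public class ThreadLocalEncoder {
  private static final ThreadLocal<CharsetEncoder> ENCODER =
      ThreadLocal.withInitial(StandardCharsets.UTF_8::newEncoder);

  public static void main(String[] args) {
    // Each thread lazily gets its own encoder on first access.
    System.out.println(ENCODER.get().charset()); // UTF-8
  }
}
```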
diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/GroupType.java b/parquet-column/src/main/java/org/apache/parquet/schema/GroupType.java
index 52184e1..897fdf8 100644
--- a/parquet-column/src/main/java/org/apache/parquet/schema/GroupType.java
+++ b/parquet-column/src/main/java/org/apache/parquet/schema/GroupType.java
@@ -171,10 +171,11 @@
* @return the index of the field with that name
*/
public int getFieldIndex(String name) {
- if (!indexByName.containsKey(name)) {
+ Integer i = indexByName.get(name);
+ if (i == null) {
throw new InvalidRecordException(name + " not found in " + this);
}
- return indexByName.get(name);
+ return i.intValue();
}
/**
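
The getFieldIndex rewrite trades two hash lookups (containsKey plus get) for a single get with a null check. The same pattern reappears in ColumnChunkPageReadStore.getPageReader below; in isolation, with a hypothetical map:

    import java.util.HashMap;
    import java.util.Map;

    Map<String, Integer> indexByName = new HashMap<>();   // hypothetical index
    Integer i = indexByName.get("field");
    if (i == null) {
      throw new IllegalArgumentException("field not found");
    }
    int index = i.intValue();
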
diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/LogicalTypeAnnotation.java b/parquet-column/src/main/java/org/apache/parquet/schema/LogicalTypeAnnotation.java
index 4472376..5f61ed6 100644
--- a/parquet-column/src/main/java/org/apache/parquet/schema/LogicalTypeAnnotation.java
+++ b/parquet-column/src/main/java/org/apache/parquet/schema/LogicalTypeAnnotation.java
@@ -555,9 +555,6 @@
@Override
@InterfaceAudience.Private
public OriginalType toOriginalType() {
- if (!isAdjustedToUTC) {
- return null;
- }
switch (unit) {
case MILLIS:
return OriginalType.TIME_MILLIS;
@@ -637,9 +634,6 @@
@Override
@InterfaceAudience.Private
public OriginalType toOriginalType() {
- if (!isAdjustedToUTC) {
- return null;
- }
switch (unit) {
case MILLIS:
return OriginalType.TIMESTAMP_MILLIS;
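
Removing the isAdjustedToUTC guard means local (non-UTC) TIME and TIMESTAMP annotations also map to the backward-compatible OriginalType instead of returning null; the matching converter-side change appears in ParquetMetadataConverter below. A hedged sketch of the observable effect:

    import org.apache.parquet.schema.LogicalTypeAnnotation;
    import org.apache.parquet.schema.OriginalType;

    LogicalTypeAnnotation local =
        LogicalTypeAnnotation.timeType(false, LogicalTypeAnnotation.TimeUnit.MILLIS);
    // previously null because isAdjustedToUTC == false; now TIME_MILLIS
    OriginalType ot = local.toOriginalType();
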
diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/Types.java b/parquet-column/src/main/java/org/apache/parquet/schema/Types.java
index a1cd736..456bf14 100644
--- a/parquet-column/src/main/java/org/apache/parquet/schema/Types.java
+++ b/parquet-column/src/main/java/org/apache/parquet/schema/Types.java
@@ -39,7 +39,7 @@
* Types.optional(INT32).named("number");
* </pre>
* <p>
- * The required(PrimitiveTypeName) factory method produces a primitive
+ * The required({@link PrimitiveTypeName}) factory method produces a primitive
* type builder, and the {@link PrimitiveBuilder#named(String)} builds the
* {@link PrimitiveType}. Between {@code required} and {@code named}, other
* builder methods can be used to add type annotations or other type metadata:
@@ -48,7 +48,7 @@
* Types.optional(FIXED_LEN_BYTE_ARRAY).length(20).named("sha1");
* </pre>
* <p>
- * Optional types are built using optional(PrimitiveTypeName) to get
+ * Optional types are built using {@link #optional(PrimitiveTypeName)} to get
* the builder.
* <p>
* Groups are built similarly, using {@code requiredGroup()} (or the optional
@@ -94,7 +94,7 @@
* .named("User")
* </pre>
* <p>
- * Maps are built similarly, using {@code requiredMap()} (or the optionalMap()
+ * Maps are built similarly, using {@code requiredMap()} (or the {@link #optionalMap()}
* version) to return a map builder. Map builders provide {@code key} to add
* a primitive as key or a {@code groupKey} to add a group as key. {@code key()}
* returns a MapKey builder, which extends a primitive builder. On the other hand,
diff --git a/parquet-column/src/test/java/org/apache/parquet/schema/TestTypeBuilders.java b/parquet-column/src/test/java/org/apache/parquet/schema/TestTypeBuilders.java
index 82e58ef..d0254b8 100644
--- a/parquet-column/src/test/java/org/apache/parquet/schema/TestTypeBuilders.java
+++ b/parquet-column/src/test/java/org/apache/parquet/schema/TestTypeBuilders.java
@@ -282,92 +282,52 @@
@Test
public void testDecimalAnnotationMissingPrecision() {
assertThrows("Should reject decimal annotation without precision",
- IllegalArgumentException.class, new Callable<Type>() {
- @Override
- public Type call() throws Exception {
- return Types.buildMessage()
- .required(INT32).as(DECIMAL).scale(2)
- .named("aDecimal")
- .named("DecimalMessage");
- }
- });
- assertThrows("Should reject decimal annotation without precision",
- IllegalArgumentException.class, new Callable<Type>() {
- @Override
- public Type call() throws Exception {
- return Types.buildMessage()
- .required(INT64).as(DECIMAL).scale(2)
- .named("aDecimal")
- .named("DecimalMessage");
- }
- });
- assertThrows("Should reject decimal annotation without precision",
- IllegalArgumentException.class, new Callable<Type>() {
- @Override
- public Type call() throws Exception {
- return Types.buildMessage()
- .required(BINARY).as(DECIMAL).scale(2)
- .named("aDecimal")
- .named("DecimalMessage");
- }
- });
- assertThrows("Should reject decimal annotation without precision",
- IllegalArgumentException.class, new Callable<Type>() {
- @Override
- public Type call() throws Exception {
- return Types.buildMessage()
- .required(FIXED_LEN_BYTE_ARRAY).length(7)
- .as(DECIMAL).scale(2)
+ IllegalArgumentException.class, (Callable<Type>) () -> Types.buildMessage()
+ .required(INT32).as(DECIMAL).scale(2)
.named("aDecimal")
- .named("DecimalMessage");
- }
- }
+ .named("DecimalMessage"));
+ assertThrows("Should reject decimal annotation without precision",
+ IllegalArgumentException.class, (Callable<Type>) () -> Types.buildMessage()
+ .required(INT64).as(DECIMAL).scale(2)
+ .named("aDecimal")
+ .named("DecimalMessage"));
+ assertThrows("Should reject decimal annotation without precision",
+ IllegalArgumentException.class, (Callable<Type>) () -> Types.buildMessage()
+ .required(BINARY).as(DECIMAL).scale(2)
+ .named("aDecimal")
+ .named("DecimalMessage"));
+ assertThrows("Should reject decimal annotation without precision",
+ IllegalArgumentException.class, (Callable<Type>) () -> Types.buildMessage()
+ .required(FIXED_LEN_BYTE_ARRAY).length(7)
+ .as(DECIMAL).scale(2)
+ .named("aDecimal")
+ .named("DecimalMessage")
);
}
@Test
public void testDecimalAnnotationPrecisionScaleBound() {
assertThrows("Should reject scale greater than precision",
- IllegalArgumentException.class, new Callable<Type>() {
- @Override
- public Type call() throws Exception {
- return Types.buildMessage()
- .required(INT32).as(DECIMAL).precision(3).scale(4)
- .named("aDecimal")
- .named("DecimalMessage");
- }
- });
- assertThrows("Should reject scale greater than precision",
- IllegalArgumentException.class, new Callable<Type>() {
- @Override
- public Type call() throws Exception {
- return Types.buildMessage()
- .required(INT64).as(DECIMAL).precision(3).scale(4)
- .named("aDecimal")
- .named("DecimalMessage");
- }
- });
- assertThrows("Should reject scale greater than precision",
- IllegalArgumentException.class, new Callable<Type>() {
- @Override
- public Type call() throws Exception {
- return Types.buildMessage()
- .required(BINARY).as(DECIMAL).precision(3).scale(4)
- .named("aDecimal")
- .named("DecimalMessage");
- }
- });
- assertThrows("Should reject scale greater than precision",
- IllegalArgumentException.class, new Callable<Type>() {
- @Override
- public Type call() throws Exception {
- return Types.buildMessage()
- .required(FIXED_LEN_BYTE_ARRAY).length(7)
- .as(DECIMAL).precision(3).scale(4)
+ IllegalArgumentException.class, (Callable<Type>) () -> Types.buildMessage()
+ .required(INT32).as(DECIMAL).precision(3).scale(4)
.named("aDecimal")
- .named("DecimalMessage");
- }
- }
+ .named("DecimalMessage"));
+ assertThrows("Should reject scale greater than precision",
+ IllegalArgumentException.class, (Callable<Type>) () -> Types.buildMessage()
+ .required(INT64).as(DECIMAL).precision(3).scale(4)
+ .named("aDecimal")
+ .named("DecimalMessage"));
+ assertThrows("Should reject scale greater than precision",
+ IllegalArgumentException.class, (Callable<Type>) () -> Types.buildMessage()
+ .required(BINARY).as(DECIMAL).precision(3).scale(4)
+ .named("aDecimal")
+ .named("DecimalMessage"));
+ assertThrows("Should reject scale greater than precision",
+ IllegalArgumentException.class, (Callable<Type>) () -> Types.buildMessage()
+ .required(FIXED_LEN_BYTE_ARRAY).length(7)
+ .as(DECIMAL).precision(3).scale(4)
+ .named("aDecimal")
+ .named("DecimalMessage")
);
}
@@ -375,42 +335,22 @@
public void testDecimalAnnotationLengthCheck() {
// maximum precision for 4 bytes is 9
assertThrows("should reject precision 10 with length 4",
- IllegalStateException.class, new Callable<Type>() {
- @Override
- public Type call() throws Exception {
- return Types.required(FIXED_LEN_BYTE_ARRAY).length(4)
- .as(DECIMAL).precision(10).scale(2)
- .named("aDecimal");
- }
- });
+ IllegalStateException.class, (Callable<Type>) () -> Types.required(FIXED_LEN_BYTE_ARRAY).length(4)
+ .as(DECIMAL).precision(10).scale(2)
+ .named("aDecimal"));
assertThrows("should reject precision 10 with length 4",
- IllegalStateException.class, new Callable<Type>() {
- @Override
- public Type call() throws Exception {
- return Types.required(INT32)
- .as(DECIMAL).precision(10).scale(2)
- .named("aDecimal");
- }
- });
+ IllegalStateException.class, (Callable<Type>) () -> Types.required(INT32)
+ .as(DECIMAL).precision(10).scale(2)
+ .named("aDecimal"));
// maximum precision for 8 bytes is 19
assertThrows("should reject precision 19 with length 8",
- IllegalStateException.class, new Callable<Type>() {
- @Override
- public Type call() throws Exception {
- return Types.required(FIXED_LEN_BYTE_ARRAY).length(8)
- .as(DECIMAL).precision(19).scale(4)
- .named("aDecimal");
- }
- });
+ IllegalStateException.class, (Callable<Type>) () -> Types.required(FIXED_LEN_BYTE_ARRAY).length(8)
+ .as(DECIMAL).precision(19).scale(4)
+ .named("aDecimal"));
assertThrows("should reject precision 19 with length 8",
- IllegalStateException.class, new Callable<Type>() {
- @Override
- public Type call() throws Exception {
- return Types.required(INT64).length(8)
- .as(DECIMAL).precision(19).scale(4)
- .named("aDecimal");
- }
- }
+ IllegalStateException.class, (Callable<Type>) () -> Types.required(INT64).length(8)
+ .as(DECIMAL).precision(19).scale(4)
+ .named("aDecimal")
);
}
@@ -421,14 +361,9 @@
};
for (final PrimitiveTypeName type : unsupported) {
assertThrows("Should reject non-binary type: " + type,
- IllegalStateException.class, new Callable<Type>() {
- @Override
- public Type call() throws Exception {
- return Types.required(type)
- .as(DECIMAL).precision(9).scale(2)
- .named("d");
- }
- });
+ IllegalStateException.class, (Callable<Type>) () -> Types.required(type)
+ .as(DECIMAL).precision(9).scale(2)
+ .named("d"));
}
}
@@ -453,21 +388,11 @@
};
for (final PrimitiveTypeName type : nonBinary) {
assertThrows("Should reject non-binary type: " + type,
- IllegalStateException.class, new Callable<Type>() {
- @Override
- public Type call() throws Exception {
- return Types.required(type).as(logicalType).named("col");
- }
- });
+ IllegalStateException.class, (Callable<Type>) () -> Types.required(type).as(logicalType).named("col"));
}
assertThrows("Should reject non-binary type: FIXED_LEN_BYTE_ARRAY",
- IllegalStateException.class, new Callable<Type>() {
- @Override
- public Type call() throws Exception {
- return Types.required(FIXED_LEN_BYTE_ARRAY).length(1)
- .as(logicalType).named("col");
- }
- });
+ IllegalStateException.class, (Callable<Type>) () -> Types.required(FIXED_LEN_BYTE_ARRAY).length(1)
+ .as(logicalType).named("col"));
}
}
@@ -492,21 +417,11 @@
};
for (final PrimitiveTypeName type : nonInt32) {
assertThrows("Should reject non-int32 type: " + type,
- IllegalStateException.class, new Callable<Type>() {
- @Override
- public Type call() throws Exception {
- return Types.required(type).as(logicalType).named("col");
- }
- });
+ IllegalStateException.class, (Callable<Type>) () -> Types.required(type).as(logicalType).named("col"));
}
assertThrows("Should reject non-int32 type: FIXED_LEN_BYTE_ARRAY",
- IllegalStateException.class, new Callable<Type>() {
- @Override
- public Type call() throws Exception {
- return Types.required(FIXED_LEN_BYTE_ARRAY).length(1)
- .as(logicalType).named("col");
- }
- });
+ IllegalStateException.class, (Callable<Type>) () -> Types.required(FIXED_LEN_BYTE_ARRAY).length(1)
+ .as(logicalType).named("col"));
}
}
@@ -531,21 +446,11 @@
};
for (final PrimitiveTypeName type : nonInt64) {
assertThrows("Should reject non-int64 type: " + type,
- IllegalStateException.class, new Callable<Type>() {
- @Override
- public Type call() throws Exception {
- return Types.required(type).as(logicalType).named("col");
- }
- });
+ IllegalStateException.class, (Callable<Type>) () -> Types.required(type).as(logicalType).named("col"));
}
assertThrows("Should reject non-int64 type: FIXED_LEN_BYTE_ARRAY",
- IllegalStateException.class, new Callable<Type>() {
- @Override
- public Type call() throws Exception {
- return Types.required(FIXED_LEN_BYTE_ARRAY).length(1)
- .as(logicalType).named("col");
- }
- });
+ IllegalStateException.class, (Callable<Type>) () -> Types.required(FIXED_LEN_BYTE_ARRAY).length(1)
+ .as(logicalType).named("col"));
}
}
@@ -564,25 +469,15 @@
};
for (final PrimitiveTypeName type : nonFixed) {
assertThrows("Should reject non-fixed type: " + type,
- IllegalStateException.class, new Callable<Type>() {
- @Override
- public Type call() throws Exception {
- return Types.required(type).as(INTERVAL).named("interval");
- }
- });
+ IllegalStateException.class, (Callable<Type>) () -> Types.required(type).as(INTERVAL).named("interval"));
}
}
@Test
public void testIntervalAnnotationRejectsNonFixed12() {
assertThrows("Should reject fixed with length != 12: " + 11,
- IllegalStateException.class, new Callable<Type>() {
- @Override
- public Type call() throws Exception {
- return Types.required(FIXED_LEN_BYTE_ARRAY).length(11)
- .as(INTERVAL).named("interval");
- }
- });
+ IllegalStateException.class, (Callable<Type>) () -> Types.required(FIXED_LEN_BYTE_ARRAY).length(11)
+ .as(INTERVAL).named("interval"));
}
@Test
@@ -1384,19 +1279,11 @@
@Test
public void testTypeConstructionWithUnsupportedColumnOrder() {
- assertThrows(null, IllegalArgumentException.class, new Callable<PrimitiveType>() {
- @Override
- public PrimitiveType call() {
- return Types.optional(INT96).columnOrder(ColumnOrder.typeDefined()).named("int96_unsupported");
- }
- });
- assertThrows(null, IllegalArgumentException.class, new Callable<PrimitiveType>() {
- @Override
- public PrimitiveType call() {
- return Types.optional(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY).length(12).as(INTERVAL)
- .columnOrder(ColumnOrder.typeDefined()).named("interval_unsupported");
- }
- });
+ assertThrows(null, IllegalArgumentException.class,
+ (Callable<PrimitiveType>) () -> Types.optional(INT96).columnOrder(ColumnOrder.typeDefined()).named("int96_unsupported"));
+ assertThrows(null, IllegalArgumentException.class,
+ (Callable<PrimitiveType>) () -> Types.optional(FIXED_LEN_BYTE_ARRAY).length(12).as(INTERVAL)
+ .columnOrder(ColumnOrder.typeDefined()).named("interval_unsupported"));
}
@Test
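
Every conversion in this file (and in the test files below) follows the same shape: the anonymous Callable becomes a casted lambda. The cast matters where the assertThrows helper is overloaded for both Callable and Runnable, since an expression lambda over a method call is compatible with either. One case restated:

    assertThrows("Should reject decimal annotation without precision",
        IllegalArgumentException.class, (Callable<Type>) () -> Types.buildMessage()
            .required(INT32).as(DECIMAL).scale(2)   // precision intentionally omitted
            .named("aDecimal")
            .named("DecimalMessage"));
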
diff --git a/parquet-column/src/test/java/org/apache/parquet/schema/TestTypeUtil.java b/parquet-column/src/test/java/org/apache/parquet/schema/TestTypeUtil.java
index 12ee3e1..d941543 100644
--- a/parquet-column/src/test/java/org/apache/parquet/schema/TestTypeUtil.java
+++ b/parquet-column/src/test/java/org/apache/parquet/schema/TestTypeUtil.java
@@ -37,13 +37,10 @@
TestTypeBuilders.assertThrows("Should complain about empty MessageType",
InvalidSchemaException.class,
- new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- TypeUtil.checkValidWriteSchema(new MessageType("invalid_schema"));
- return null;
- }
- });
+ (Callable<Void>) () -> {
+ TypeUtil.checkValidWriteSchema(new MessageType("invalid_schema"));
+ return null;
+ });
}
@Test
@@ -55,14 +52,11 @@
TestTypeBuilders.assertThrows("Should complain about empty GroupType",
InvalidSchemaException.class,
- new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- TypeUtil.checkValidWriteSchema(
- new GroupType(REPEATED, "invalid_group"));
- return null;
- }
- });
+ (Callable<Void>) () -> {
+ TypeUtil.checkValidWriteSchema(
+ new GroupType(REPEATED, "invalid_group"));
+ return null;
+ });
}
@Test
@@ -76,14 +70,11 @@
TestTypeBuilders.assertThrows("Should complain about empty GroupType",
InvalidSchemaException.class,
- new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- TypeUtil.checkValidWriteSchema(Types.buildMessage()
- .addField(new GroupType(REPEATED, "invalid_group"))
- .named("invalid_message"));
- return null;
- }
- });
+ (Callable<Void>) () -> {
+ TypeUtil.checkValidWriteSchema(Types.buildMessage()
+ .addField(new GroupType(REPEATED, "invalid_group"))
+ .named("invalid_message"));
+ return null;
+ });
}
}
diff --git a/parquet-common/pom.xml b/parquet-common/pom.xml
index c5b0779..7935751 100644
--- a/parquet-common/pom.xml
+++ b/parquet-common/pom.xml
@@ -65,7 +65,7 @@
<dependency>
<groupId>org.apache.yetus</groupId>
<artifactId>audience-annotations</artifactId>
- <version>0.7.0</version>
+ <version>0.11.0</version>
</dependency>
</dependencies>
diff --git a/parquet-common/src/main/java/org/apache/parquet/Strings.java b/parquet-common/src/main/java/org/apache/parquet/Strings.java
index 17a0456..1a9fcee 100644
--- a/parquet-common/src/main/java/org/apache/parquet/Strings.java
+++ b/parquet-common/src/main/java/org/apache/parquet/Strings.java
@@ -38,7 +38,9 @@
* @param s an iterable of strings
* @param on the delimiter
* @return a single joined string
+ * @deprecated Use {@link String#join(CharSequence, Iterable)}
*/
+ @Deprecated
public static String join(Iterable<String> s, String on) {
return join(s.iterator(), on);
}
@@ -51,7 +53,9 @@
* @param iter an iterator of strings
* @param on the delimiter
* @return a single joined string
+ * @deprecated Use {@link String#join(CharSequence, Iterable)}
*/
+ @Deprecated
public static String join(Iterator<String> iter, String on) {
StringBuilder sb = new StringBuilder();
while (iter.hasNext()) {
@@ -71,7 +75,9 @@
* @param s an iterable of strings
* @param on the delimiter
* @return a single joined string
+ * @deprecated Use {@link String#join(CharSequence, CharSequence...)}
*/
+ @Deprecated
public static String join(String[] s, String on) {
return join(Arrays.asList(s), on);
}
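
The deprecations point callers at java.lang.String.join, which covers all three overloads; ColumnPath.toDotString below is migrated the same way. A sketch:

    import java.util.Arrays;

    String dotted = String.join(".", "a", "b", "c");              // varargs / array form
    String joined = String.join(", ", Arrays.asList("x", "y"));   // any Iterable<? extends CharSequence>
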
diff --git a/parquet-common/src/main/java/org/apache/parquet/VersionParser.java b/parquet-common/src/main/java/org/apache/parquet/VersionParser.java
index fc4ff6c..88b39a6 100644
--- a/parquet-common/src/main/java/org/apache/parquet/VersionParser.java
+++ b/parquet-common/src/main/java/org/apache/parquet/VersionParser.java
@@ -53,10 +53,7 @@
try {
sv = SemanticVersion.parse(version);
hasSemver = true;
- } catch (RuntimeException e) {
- sv = null;
- hasSemver = false;
- } catch (SemanticVersionParseException e) {
+ } catch (RuntimeException | SemanticVersionParseException e) {
sv = null;
hasSemver = false;
}
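
This and the later hunks in DynConstructors, DynMethods, Packer, and Consumers collapse identical catch bodies with Java 7 multi-catch; the caught variable is implicitly final, and a bare rethrow (as in newInstanceChecked below) stays precise. The pattern in isolation, with a hypothetical method:

    try {
      mightFailEitherWay();                          // hypothetical call
    } catch (IllegalStateException | UnsupportedOperationException e) {
      throw new RuntimeException(e);                 // one body instead of two copies
    }
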
diff --git a/parquet-common/src/main/java/org/apache/parquet/hadoop/metadata/ColumnPath.java b/parquet-common/src/main/java/org/apache/parquet/hadoop/metadata/ColumnPath.java
index 4383b0f..0b3f365 100644
--- a/parquet-common/src/main/java/org/apache/parquet/hadoop/metadata/ColumnPath.java
+++ b/parquet-common/src/main/java/org/apache/parquet/hadoop/metadata/ColumnPath.java
@@ -22,8 +22,6 @@
import java.util.Arrays;
import java.util.Iterator;
-import org.apache.parquet.Strings;
-
import static org.apache.parquet.Preconditions.checkNotNull;
public final class ColumnPath implements Iterable<String>, Serializable {
@@ -68,7 +66,7 @@
}
public String toDotString() {
- return Strings.join(p, ".");
+ return String.join(".", p);
}
@Override
diff --git a/parquet-common/src/main/java/org/apache/parquet/util/DynConstructors.java b/parquet-common/src/main/java/org/apache/parquet/util/DynConstructors.java
index e1dddf1..19a1093 100644
--- a/parquet-common/src/main/java/org/apache/parquet/util/DynConstructors.java
+++ b/parquet-common/src/main/java/org/apache/parquet/util/DynConstructors.java
@@ -47,9 +47,7 @@
public C newInstanceChecked(Object... args) throws Exception {
try {
return ctor.newInstance(args);
- } catch (InstantiationException e) {
- throw e;
- } catch (IllegalAccessException e) {
+ } catch (InstantiationException | IllegalAccessException e) {
throw e;
} catch (InvocationTargetException e) {
throwIfInstance(e.getCause(), Exception.class);
@@ -136,13 +134,11 @@
try {
Class<?> targetClass = Class.forName(className, true, loader);
impl(targetClass, types);
- } catch (NoClassDefFoundError e) {
+ } catch (NoClassDefFoundError | ClassNotFoundException e) {
// cannot load this implementation
problems.put(className, e);
- } catch (ClassNotFoundException e) {
- // not the right implementation
- problems.put(className, e);
}
+
return this;
}
@@ -176,12 +172,9 @@
try {
Class targetClass = Class.forName(className, true, loader);
hiddenImpl(targetClass, types);
- } catch (NoClassDefFoundError e) {
+ } catch (NoClassDefFoundError | ClassNotFoundException e) {
// cannot load this implementation
problems.put(className, e);
- } catch (ClassNotFoundException e) {
- // not the right implementation
- problems.put(className, e);
}
return this;
}
@@ -196,11 +189,8 @@
Constructor<T> hidden = targetClass.getDeclaredConstructor(types);
AccessController.doPrivileged(new MakeAccessible(hidden));
ctor = new Ctor<T>(hidden, targetClass);
- } catch (SecurityException e) {
- // unusable
- problems.put(methodName(targetClass, types), e);
- } catch (NoSuchMethodException e) {
- // not the right implementation
+ } catch (NoSuchMethodException | SecurityException e) {
+ // unusable or not the right implementation
problems.put(methodName(targetClass, types), e);
}
return this;
diff --git a/parquet-common/src/main/java/org/apache/parquet/util/DynMethods.java b/parquet-common/src/main/java/org/apache/parquet/util/DynMethods.java
index e4f025d..c2de44a 100644
--- a/parquet-common/src/main/java/org/apache/parquet/util/DynMethods.java
+++ b/parquet-common/src/main/java/org/apache/parquet/util/DynMethods.java
@@ -386,10 +386,8 @@
Method hidden = targetClass.getDeclaredMethod(methodName, argClasses);
AccessController.doPrivileged(new MakeAccessible(hidden));
this.method = new UnboundMethod(hidden, name);
- } catch (SecurityException e) {
- // unusable
- } catch (NoSuchMethodException e) {
- // not the right implementation
+ } catch (SecurityException | NoSuchMethodException e) {
+ // unusable or not the right implementation
}
return this;
}
diff --git a/parquet-common/src/test/java/org/apache/parquet/bytes/TestByteBufferInputStreams.java b/parquet-common/src/test/java/org/apache/parquet/bytes/TestByteBufferInputStreams.java
index 6151c0a..0dc565f 100644
--- a/parquet-common/src/test/java/org/apache/parquet/bytes/TestByteBufferInputStreams.java
+++ b/parquet-common/src/test/java/org/apache/parquet/bytes/TestByteBufferInputStreams.java
@@ -176,12 +176,7 @@
}
assertThrows("Should throw EOFException at end of stream",
- EOFException.class, new Callable<Integer>() {
- @Override
- public Integer call() throws IOException {
- return stream.read();
- }
- });
+ EOFException.class, (Callable<Integer>) stream::read);
checkOriginalData();
}
@@ -233,12 +228,7 @@
Assert.assertEquals("Should consume all buffers", length, stream.position());
assertThrows("Should throw EOFException when empty",
- EOFException.class, new Callable<List<ByteBuffer>>() {
- @Override
- public List<ByteBuffer> call() throws Exception {
- return stream.sliceBuffers(length);
- }
- });
+ EOFException.class, (Callable<List<ByteBuffer>>) () -> stream.sliceBuffers(length));
ByteBufferInputStream copy = ByteBufferInputStream.wrap(buffers);
for (int i = 0; i < length; i += 1) {
@@ -365,12 +355,9 @@
final int length = stream2.available();
assertThrows("Should throw when out of bytes",
- EOFException.class, new Callable() {
- @Override
- public Object call() throws Exception {
- stream2.skipFully(length + 10);
- return null;
- }
+ EOFException.class, () -> {
+ stream2.skipFully(length + 10);
+ return null;
});
}
@@ -499,12 +486,9 @@
final ByteBufferInputStream stream = newStream();
assertThrows("Should throw an error for reset() without mark()",
- IOException.class, new Callable() {
- @Override
- public Object call() throws Exception {
- stream.reset();
- return null;
- }
+ IOException.class, () -> {
+ stream.reset();
+ return null;
});
}
@@ -549,12 +533,9 @@
Assert.assertEquals("Should read 6 bytes", 6, stream.read(new byte[6]));
assertThrows("Should throw an error for reset() after limit",
- IOException.class, new Callable() {
- @Override
- public Object call() throws Exception {
- stream.reset();
- return null;
- }
+ IOException.class, () -> {
+ stream.reset();
+ return null;
});
}
@@ -568,12 +549,9 @@
stream.reset();
assertThrows("Should throw an error for double reset()",
- IOException.class, new Callable() {
- @Override
- public Object call() throws Exception {
- stream.reset();
- return null;
- }
+ IOException.class, () -> {
+ stream.reset();
+ return null;
});
}
diff --git a/parquet-common/src/test/java/org/apache/parquet/io/TestDelegatingSeekableInputStream.java b/parquet-common/src/test/java/org/apache/parquet/io/TestDelegatingSeekableInputStream.java
index 078bc8f..a9150dd 100644
--- a/parquet-common/src/test/java/org/apache/parquet/io/TestDelegatingSeekableInputStream.java
+++ b/parquet-common/src/test/java/org/apache/parquet/io/TestDelegatingSeekableInputStream.java
@@ -68,12 +68,9 @@
Assert.assertEquals("Stream position should reflect bytes read", 10, stream.getPos());
TestUtils.assertThrows("Should throw EOFException if no more bytes left",
- EOFException.class, new Callable<Void>() {
- @Override
- public Void call() throws IOException {
- DelegatingSeekableInputStream.readFully(stream, buffer, 0, 1);
- return null;
- }
+ EOFException.class, (Callable<Void>) () -> {
+ DelegatingSeekableInputStream.readFully(stream, buffer, 0, 1);
+ return null;
});
}
@@ -84,12 +81,9 @@
final MockInputStream stream = new MockInputStream(2, 3, 3);
TestUtils.assertThrows("Should throw EOFException if no more bytes left",
- EOFException.class, new Callable<Void>() {
- @Override
- public Void call() throws IOException {
- DelegatingSeekableInputStream.readFully(stream, buffer, 0, buffer.length);
- return null;
- }
+ EOFException.class, (Callable<Void>) () -> {
+ DelegatingSeekableInputStream.readFully(stream, buffer, 0, buffer.length);
+ return null;
});
Assert.assertArrayEquals("Should have consumed bytes",
@@ -131,12 +125,7 @@
Assert.assertEquals("Stream position should reflect bytes read", 5, stream.getPos());
}
- private static final ThreadLocal<byte[]> TEMP = new ThreadLocal<byte[]>() {
- @Override
- protected byte[] initialValue() {
- return new byte[8192];
- }
- };
+ private static final ThreadLocal<byte[]> TEMP = ThreadLocal.withInitial(() -> new byte[8192]);
@Test
public void testHeapRead() throws Exception {
@@ -523,12 +512,9 @@
final MockInputStream stream = new MockInputStream();
TestUtils.assertThrows("Should throw EOFException",
- EOFException.class, new Callable() {
- @Override
- public Object call() throws Exception {
- DelegatingSeekableInputStream.readFullyHeapBuffer(stream, readBuffer);
- return null;
- }
+ EOFException.class, () -> {
+ DelegatingSeekableInputStream.readFullyHeapBuffer(stream, readBuffer);
+ return null;
});
Assert.assertEquals(0, readBuffer.position());
@@ -684,12 +670,9 @@
final MockInputStream stream = new MockInputStream();
TestUtils.assertThrows("Should throw EOFException",
- EOFException.class, new Callable() {
- @Override
- public Object call() throws Exception {
- DelegatingSeekableInputStream.readFullyDirectBuffer(stream, readBuffer, TEMP.get());
- return null;
- }
+ EOFException.class, () -> {
+ DelegatingSeekableInputStream.readFullyDirectBuffer(stream, readBuffer, TEMP.get());
+ return null;
});
// NOTE: This behavior differs from readFullyHeapBuffer because direct uses
diff --git a/parquet-common/src/test/java/org/apache/parquet/util/TestDynConstructors.java b/parquet-common/src/test/java/org/apache/parquet/util/TestDynConstructors.java
index 1ab9582..2a8a58f 100644
--- a/parquet-common/src/test/java/org/apache/parquet/util/TestDynConstructors.java
+++ b/parquet-common/src/test/java/org/apache/parquet/util/TestDynConstructors.java
@@ -31,20 +31,10 @@
final DynConstructors.Builder builder = new DynConstructors.Builder();
TestUtils.assertThrows("Checked build should throw NoSuchMethodException",
- NoSuchMethodException.class, new Callable() {
- @Override
- public Object call() throws NoSuchMethodException {
- return builder.buildChecked();
- }
- });
+ NoSuchMethodException.class, (Callable) builder::buildChecked);
TestUtils.assertThrows("Normal build should throw RuntimeException",
- RuntimeException.class, new Runnable() {
- @Override
- public void run() {
- builder.build();
- }
- });
+ RuntimeException.class, (Runnable) builder::build);
}
@Test
@@ -53,20 +43,10 @@
.impl("not.a.RealClass");
TestUtils.assertThrows("Checked build should throw NoSuchMethodException",
- NoSuchMethodException.class, new Callable() {
- @Override
- public Object call() throws NoSuchMethodException {
- return builder.buildChecked();
- }
- });
+ NoSuchMethodException.class, (Callable) builder::buildChecked);
TestUtils.assertThrows("Normal build should throw RuntimeException",
- RuntimeException.class, new Runnable() {
- @Override
- public void run() {
- builder.build();
- }
- });
+ RuntimeException.class, (Callable) builder::build);
}
@Test
@@ -75,20 +55,10 @@
.impl(Concatenator.class, String.class, String.class);
TestUtils.assertThrows("Checked build should throw NoSuchMethodException",
- NoSuchMethodException.class, new Callable() {
- @Override
- public Object call() throws NoSuchMethodException {
- return builder.buildChecked();
- }
- });
+ NoSuchMethodException.class, (Callable) builder::buildChecked);
TestUtils.assertThrows("Normal build should throw RuntimeException",
- RuntimeException.class, new Runnable() {
- @Override
- public void run() {
- builder.build();
- }
- });
+ RuntimeException.class, (Callable) builder::build);
}
@Test
@@ -104,20 +74,10 @@
"a-b", dashCat.concat("a", "b"));
TestUtils.assertThrows("Should complain about extra arguments",
- IllegalArgumentException.class, new Callable() {
- @Override
- public Object call() throws Exception {
- return sepCtor.newInstanceChecked("/", "-");
- }
- });
+ IllegalArgumentException.class, () -> sepCtor.newInstanceChecked("/", "-"));
TestUtils.assertThrows("Should complain about extra arguments",
- IllegalArgumentException.class, new Callable() {
- @Override
- public Object call() throws Exception {
- return sepCtor.newInstance("/", "-");
- }
- });
+ IllegalArgumentException.class, () -> sepCtor.newInstance("/", "-"));
DynConstructors.Ctor<Concatenator> defaultCtor = new DynConstructors.Builder()
.impl("not.a.RealClass", String.class)
@@ -139,20 +99,10 @@
.buildChecked();
TestUtils.assertThrows("Should re-throw the exception",
- SomeCheckedException.class, new Callable() {
- @Override
- public Object call() throws Exception {
- return sepCtor.newInstanceChecked(exc);
- }
- });
+ SomeCheckedException.class, () -> sepCtor.newInstanceChecked(exc));
TestUtils.assertThrows("Should wrap the exception in RuntimeException",
- RuntimeException.class, new Callable() {
- @Override
- public Object call() throws Exception {
- return sepCtor.newInstance(exc);
- }
- });
+ RuntimeException.class, () -> sepCtor.newInstance(exc));
}
@Test
@@ -167,14 +117,9 @@
@Test
public void testHiddenMethod() throws Exception {
TestUtils.assertThrows("Should fail to find hidden method",
- NoSuchMethodException.class, new Callable() {
- @Override
- public Object call() throws NoSuchMethodException {
- return new DynMethods.Builder("setSeparator")
- .impl(Concatenator.class, char.class)
- .buildChecked();
- }
- });
+ NoSuchMethodException.class, () -> new DynMethods.Builder("setSeparator")
+ .impl(Concatenator.class, char.class)
+ .buildChecked());
final DynConstructors.Ctor<Concatenator> sepCtor = new DynConstructors.Builder()
.hiddenImpl(Concatenator.class.getName(), char.class)
@@ -197,12 +142,7 @@
Assert.assertTrue("Should always be static", ctor.isStatic());
TestUtils.assertThrows("Should complain that method is static",
- IllegalStateException.class, new Callable() {
- @Override
- public Object call() throws Exception {
- return ctor.bind(null);
- }
- });
+ IllegalStateException.class, () -> ctor.bind(null));
}
@Test
@@ -212,20 +152,10 @@
.buildChecked();
TestUtils.assertThrows("Should complain that target must be null",
- IllegalArgumentException.class, new Callable() {
- @Override
- public Object call() throws Exception {
- return ctor.invokeChecked("a");
- }
- });
+ IllegalArgumentException.class, () -> ctor.invokeChecked("a"));
TestUtils.assertThrows("Should complain that target must be null",
- IllegalArgumentException.class, new Callable() {
- @Override
- public Object call() throws Exception {
- return ctor.invoke("a");
- }
- });
+ IllegalArgumentException.class, () -> ctor.invoke("a"));
Assert.assertNotNull("Should allow invokeChecked(null, ...)",
ctor.invokeChecked(null));
diff --git a/parquet-common/src/test/java/org/apache/parquet/util/TestDynMethods.java b/parquet-common/src/test/java/org/apache/parquet/util/TestDynMethods.java
index 7017c67..f2e832f 100644
--- a/parquet-common/src/test/java/org/apache/parquet/util/TestDynMethods.java
+++ b/parquet-common/src/test/java/org/apache/parquet/util/TestDynMethods.java
@@ -31,20 +31,10 @@
final DynMethods.Builder builder = new DynMethods.Builder("concat");
TestUtils.assertThrows("Checked build should throw NoSuchMethodException",
- NoSuchMethodException.class, new Callable() {
- @Override
- public Object call() throws NoSuchMethodException {
- return builder.buildChecked();
- }
- });
+ NoSuchMethodException.class, (Callable) builder::buildChecked);
TestUtils.assertThrows("Normal build should throw RuntimeException",
- RuntimeException.class, new Runnable() {
- @Override
- public void run() {
- builder.build();
- }
- });
+ RuntimeException.class, (Callable) builder::build);
}
@Test
@@ -53,20 +43,10 @@
.impl("not.a.RealClass", String.class, String.class);
TestUtils.assertThrows("Checked build should throw NoSuchMethodException",
- NoSuchMethodException.class, new Callable() {
- @Override
- public Object call() throws NoSuchMethodException {
- return builder.buildChecked();
- }
- });
+ NoSuchMethodException.class, (Callable) builder::buildChecked);
TestUtils.assertThrows("Normal build should throw RuntimeException",
- RuntimeException.class, new Runnable() {
- @Override
- public void run() {
- builder.build();
- }
- });
+ RuntimeException.class, (Runnable) builder::build);
}
@Test
@@ -75,20 +55,10 @@
.impl(Concatenator.class, "cat2strings", String.class, String.class);
TestUtils.assertThrows("Checked build should throw NoSuchMethodException",
- NoSuchMethodException.class, new Callable() {
- @Override
- public Object call() throws NoSuchMethodException {
- return builder.buildChecked();
- }
- });
+ NoSuchMethodException.class, (Callable) builder::buildChecked);
TestUtils.assertThrows("Normal build should throw RuntimeException",
- RuntimeException.class, new Runnable() {
- @Override
- public void run() {
- builder.build();
- }
- });
+ RuntimeException.class, (Runnable) builder::build);
}
@Test
@@ -144,20 +114,10 @@
.buildChecked();
TestUtils.assertThrows("Should fail if non-string arguments are passed",
- IllegalArgumentException.class, new Callable() {
- @Override
- public Object call() throws Exception {
- return cat.invoke(obj, 3, 4);
- }
- });
+ IllegalArgumentException.class, () -> cat.invoke(obj, 3, 4));
TestUtils.assertThrows("Should fail if non-string arguments are passed",
- IllegalArgumentException.class, new Callable() {
- @Override
- public Object call() throws Exception {
- return cat.invokeChecked(obj, 3, 4);
- }
- });
+ IllegalArgumentException.class, () -> cat.invokeChecked(obj, 3, 4));
}
@Test
@@ -170,20 +130,10 @@
.buildChecked();
TestUtils.assertThrows("Should re-throw the exception",
- SomeCheckedException.class, new Callable() {
- @Override
- public Object call() throws Exception {
- return cat.invokeChecked(obj, exc);
- }
- });
+ SomeCheckedException.class, () -> cat.invokeChecked(obj, exc));
TestUtils.assertThrows("Should wrap the exception in RuntimeException",
- RuntimeException.class, new Callable() {
- @Override
- public Object call() throws Exception {
- return cat.invoke(obj, exc);
- }
- });
+ RuntimeException.class, () -> cat.invoke(obj, exc));
}
@Test
@@ -213,14 +163,9 @@
Concatenator obj = new Concatenator("-");
TestUtils.assertThrows("Should fail to find hidden method",
- NoSuchMethodException.class, new Callable() {
- @Override
- public Object call() throws NoSuchMethodException {
- return new DynMethods.Builder("setSeparator")
- .impl(Concatenator.class, String.class)
- .buildChecked();
- }
- });
+ NoSuchMethodException.class, () -> new DynMethods.Builder("setSeparator")
+ .impl(Concatenator.class, String.class)
+ .buildChecked());
DynMethods.UnboundMethod changeSep = new DynMethods.Builder("setSeparator")
.hiddenImpl(Concatenator.class, String.class)
@@ -264,31 +209,16 @@
.impl(Concatenator.class, String[].class);
TestUtils.assertThrows("Should complain that method is static",
- IllegalStateException.class, new Callable() {
- @Override
- public Object call() throws Exception {
- return builder.buildChecked(new Concatenator());
- }
- });
+ IllegalStateException.class, () -> builder.buildChecked(new Concatenator()));
TestUtils.assertThrows("Should complain that method is static",
- IllegalStateException.class, new Callable() {
- @Override
- public Object call() throws Exception {
- return builder.build(new Concatenator());
- }
- });
+ IllegalStateException.class, () -> builder.build(new Concatenator()));
final DynMethods.UnboundMethod staticCat = builder.buildChecked();
Assert.assertTrue("Should be static", staticCat.isStatic());
TestUtils.assertThrows("Should complain that method is static",
- IllegalStateException.class, new Callable() {
- @Override
- public Object call() throws Exception {
- return staticCat.bind(new Concatenator());
- }
- });
+ IllegalStateException.class, () -> staticCat.bind(new Concatenator()));
}
@Test
@@ -308,32 +238,17 @@
.impl(Concatenator.class, String.class, String.class);
TestUtils.assertThrows("Should complain that method is not static",
- IllegalStateException.class, new Callable() {
- @Override
- public Object call() throws Exception {
- return builder.buildStatic();
- }
- });
+ IllegalStateException.class, builder::buildStatic);
TestUtils.assertThrows("Should complain that method is not static",
- IllegalStateException.class, new Callable() {
- @Override
- public Object call() throws Exception {
- return builder.buildStaticChecked();
- }
- });
+ IllegalStateException.class, builder::buildStaticChecked);
final DynMethods.UnboundMethod cat2 = builder.buildChecked();
Assert.assertFalse("concat(String,String) should not be static",
cat2.isStatic());
TestUtils.assertThrows("Should complain that method is not static",
- IllegalStateException.class, new Callable() {
- @Override
- public Object call() throws Exception {
- return cat2.asStatic();
- }
- });
+ IllegalStateException.class, cat2::asStatic);
}
@Test
@@ -352,19 +267,9 @@
// constructors cannot be bound
TestUtils.assertThrows("Should complain that ctor method is static",
- IllegalStateException.class, new Callable() {
- @Override
- public Object call() throws Exception {
- return builder.buildChecked(new Concatenator());
- }
- });
+ IllegalStateException.class, () -> builder.buildChecked(new Concatenator()));
TestUtils.assertThrows("Should complain that ctor method is static",
- IllegalStateException.class, new Callable() {
- @Override
- public Object call() throws Exception {
- return builder.build(new Concatenator());
- }
- });
+ IllegalStateException.class, () -> builder.build(new Concatenator()));
Concatenator concatenator = newConcatenator.asStatic().invoke("*");
Assert.assertEquals("Should function as a concatenator",
diff --git a/parquet-encoding/src/main/java/org/apache/parquet/column/values/bitpacking/Packer.java b/parquet-encoding/src/main/java/org/apache/parquet/column/values/bitpacking/Packer.java
index f345596..177b45f 100644
--- a/parquet-encoding/src/main/java/org/apache/parquet/column/values/bitpacking/Packer.java
+++ b/parquet-encoding/src/main/java/org/apache/parquet/column/values/bitpacking/Packer.java
@@ -76,15 +76,8 @@
private static Object getStaticField(String className, String fieldName) {
try {
return Class.forName(className).getField(fieldName).get(null);
- } catch (IllegalArgumentException e) {
- throw new RuntimeException(e);
- } catch (IllegalAccessException e) {
- throw new RuntimeException(e);
- } catch (NoSuchFieldException e) {
- throw new RuntimeException(e);
- } catch (SecurityException e) {
- throw new RuntimeException(e);
- } catch (ClassNotFoundException e) {
+ } catch (IllegalArgumentException | IllegalAccessException
+ | NoSuchFieldException | SecurityException | ClassNotFoundException e) {
throw new RuntimeException(e);
}
}
diff --git a/parquet-format-structures/pom.xml b/parquet-format-structures/pom.xml
index a05cd2b..0ccf2ae 100644
--- a/parquet-format-structures/pom.xml
+++ b/parquet-format-structures/pom.xml
@@ -126,9 +126,11 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
+ <version>${maven-javadoc-plugin.version}</version>
<configuration>
<!-- We have to turn off the javadoc check because thrift generates improper comments -->
- <additionalparam>-Xdoclint:none</additionalparam>
+ <doclint>none</doclint>
+ <source>8</source>
</configuration>
</plugin>
</plugins>
@@ -139,9 +141,11 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
+ <version>${maven-javadoc-plugin.version}</version>
<configuration>
<!-- We have to turn off the javadoc check because thrift generates improper comments -->
- <additionalparam>-Xdoclint:none</additionalparam>
+ <doclint>none</doclint>
+ <source>8</source>
</configuration>
</plugin>
</plugins>
diff --git a/parquet-format-structures/src/main/java/org/apache/parquet/format/event/Consumers.java b/parquet-format-structures/src/main/java/org/apache/parquet/format/event/Consumers.java
index ef87997..ae6e908 100644
--- a/parquet-format-structures/src/main/java/org/apache/parquet/format/event/Consumers.java
+++ b/parquet-format-structures/src/main/java/org/apache/parquet/format/event/Consumers.java
@@ -183,9 +183,7 @@
protected T newObject() {
try {
return c.newInstance();
- } catch (InstantiationException e) {
- throw new RuntimeException(c.getName(), e);
- } catch (IllegalAccessException e) {
+ } catch (InstantiationException | IllegalAccessException e) {
throw new RuntimeException(c.getName(), e);
}
}
diff --git a/parquet-hadoop/pom.xml b/parquet-hadoop/pom.xml
index d4fbf05..c493941 100644
--- a/parquet-hadoop/pom.xml
+++ b/parquet-hadoop/pom.xml
@@ -71,7 +71,7 @@
<dependency>
<groupId>${jackson.groupId}</groupId>
<artifactId>jackson-databind</artifactId>
- <version>${jackson.version}</version>
+ <version>${jackson-databind.version}</version>
</dependency>
<dependency>
<groupId>org.xerial.snappy</groupId>
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java
index f059023..5e4bd09 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java
@@ -29,6 +29,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
@@ -134,11 +135,9 @@
}
public boolean isEnabled(String property, boolean defaultValue) {
- if (properties.containsKey(property)) {
- return Boolean.valueOf(properties.get(property));
- } else {
- return defaultValue;
- }
+ Optional<String> propValue = Optional.ofNullable(properties.get(property));
+ return propValue.isPresent() ? Boolean.valueOf(propValue.get())
+ : defaultValue;
}
public static Builder builder() {
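
A side note on isEnabled: the isPresent/get pair works, but Optional also supports a map/orElse chain with the same behavior. A hedged alternative sketch, not what the change above uses:

    return Optional.ofNullable(properties.get(property))
        .map(Boolean::valueOf)
        .orElse(defaultValue);
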
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
index 5041d7d..625bbc3 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
@@ -44,6 +44,8 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.CorruptStatistics;
import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.statistics.BinaryStatistics;
import org.apache.parquet.format.BsonType;
import org.apache.parquet.format.CompressionCodec;
import org.apache.parquet.format.DateType;
@@ -91,10 +93,12 @@
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.column.EncodingStats;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.internal.column.columnindex.BinaryTruncator;
import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder;
import org.apache.parquet.internal.column.columnindex.OffsetIndexBuilder;
import org.apache.parquet.internal.hadoop.metadata.IndexReference;
import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.ColumnOrder.ColumnOrderName;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
@@ -120,13 +124,17 @@
private static final Logger LOG = LoggerFactory.getLogger(ParquetMetadataConverter.class);
private static final LogicalTypeConverterVisitor LOGICAL_TYPE_ANNOTATION_VISITOR = new LogicalTypeConverterVisitor();
private static final ConvertedTypeConverterVisitor CONVERTED_TYPE_CONVERTER_VISITOR = new ConvertedTypeConverterVisitor();
-
+ private final int statisticsTruncateLength;
private final boolean useSignedStringMinMax;
public ParquetMetadataConverter() {
this(false);
}
+ public ParquetMetadataConverter(int statisticsTruncateLength) {
+ this(false, statisticsTruncateLength);
+ }
+
/**
* @param conf a configuration
* @deprecated will be removed in 2.0.0; use {@code ParquetMetadataConverter(ParquetReadOptions)}
@@ -141,7 +149,15 @@
}
private ParquetMetadataConverter(boolean useSignedStringMinMax) {
+ this(useSignedStringMinMax, ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH);
+ }
+
+ private ParquetMetadataConverter(boolean useSignedStringMinMax, int statisticsTruncateLength) {
+ if (statisticsTruncateLength <= 0) {
+ throw new IllegalArgumentException("Truncate length should be greater than 0");
+ }
this.useSignedStringMinMax = useSignedStringMinMax;
+ this.statisticsTruncateLength = statisticsTruncateLength;
}
// NOTE: this cache is for memory savings, not cpu savings, and is used to de-duplicate
@@ -309,9 +325,6 @@
@Override
public Optional<ConvertedType> visit(LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeLogicalType) {
- if (!timeLogicalType.isAdjustedToUTC()) {
- return empty();
- }
switch (timeLogicalType.getUnit()) {
case MILLIS:
return of(ConvertedType.TIME_MILLIS);
@@ -326,9 +339,6 @@
@Override
public Optional<ConvertedType> visit(LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) {
- if (!timestampLogicalType.isAdjustedToUTC()) {
- return empty();
- }
switch (timestampLogicalType.getUnit()) {
case MICROS:
return of(ConvertedType.TIMESTAMP_MICROS);
@@ -465,7 +475,7 @@
columnChunk.meta_data.dictionary_page_offset = columnMetaData.getDictionaryPageOffset();
columnChunk.meta_data.setBloom_filter_offset(columnMetaData.getBloomFilterOffset());
if (!columnMetaData.getStatistics().isEmpty()) {
- columnChunk.meta_data.setStatistics(toParquetStatistics(columnMetaData.getStatistics()));
+ columnChunk.meta_data.setStatistics(toParquetStatistics(columnMetaData.getStatistics(), this.statisticsTruncateLength));
}
if (columnMetaData.getEncodingStats() != null) {
columnChunk.meta_data.setEncoding_stats(convertEncodingStats(columnMetaData.getEncodingStats()));
@@ -583,18 +593,31 @@
}
public static Statistics toParquetStatistics(
- org.apache.parquet.column.statistics.Statistics stats) {
+ org.apache.parquet.column.statistics.Statistics stats) {
+ return toParquetStatistics(stats, ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH);
+ }
+
+ public static Statistics toParquetStatistics(
+ org.apache.parquet.column.statistics.Statistics stats, int truncateLength) {
Statistics formatStats = new Statistics();
// Don't write stats larger than the max size rather than truncating. The
// rationale is that some engines may use the minimum value in the page as
// the true minimum for aggregations and there is no way to mark that a
// value has been truncated and is a lower bound and not in the page.
- if (!stats.isEmpty() && stats.isSmallerThan(MAX_STATS_SIZE)) {
+ if (!stats.isEmpty() && withinLimit(stats, truncateLength)) {
formatStats.setNull_count(stats.getNumNulls());
if (stats.hasNonNullValue()) {
- byte[] min = stats.getMinBytes();
- byte[] max = stats.getMaxBytes();
+ byte[] min;
+ byte[] max;
+ if (stats instanceof BinaryStatistics && truncateLength != Integer.MAX_VALUE) {
+ BinaryTruncator truncator = BinaryTruncator.getTruncator(stats.type());
+ min = truncateMin(truncator, truncateLength, stats.getMinBytes());
+ max = truncateMax(truncator, truncateLength, stats.getMaxBytes());
+ } else {
+ min = stats.getMinBytes();
+ max = stats.getMaxBytes();
+ }
// Fill the former min-max statistics only if the comparison logic is
// signed so the logic of V1 and V2 stats are the same (which is
// trivially true for equal min-max values)
@@ -612,6 +635,27 @@
return formatStats;
}
+ private static boolean withinLimit(org.apache.parquet.column.statistics.Statistics stats, int truncateLength) {
+ if (stats.isSmallerThan(MAX_STATS_SIZE)) {
+ return true;
+ }
+
+ if (!(stats instanceof BinaryStatistics)) {
+ return false;
+ }
+
+ BinaryStatistics binaryStatistics = (BinaryStatistics) stats;
+ return binaryStatistics.isSmallerThanWithTruncation(MAX_STATS_SIZE, truncateLength);
+ }
+
+ private static byte[] truncateMin(BinaryTruncator truncator, int truncateLength, byte[] input) {
+ return truncator.truncateMin(Binary.fromConstantByteArray(input), truncateLength).getBytes();
+ }
+
+ private static byte[] truncateMax(BinaryTruncator truncator, int truncateLength, byte[] input) {
+ return truncator.truncateMax(Binary.fromConstantByteArray(input), truncateLength).getBytes();
+ }
+
private static boolean isMinMaxStatsSupported(PrimitiveType type) {
return type.columnOrder().getColumnOrderName() == ColumnOrderName.TYPE_DEFINED_ORDER;
}
@@ -1234,7 +1278,7 @@
}
private static ColumnPath getPath(ColumnMetaData metaData) {
- String[] path = metaData.path_in_schema.toArray(new String[metaData.path_in_schema.size()]);
+ String[] path = metaData.path_in_schema.toArray(new String[0]);
return ColumnPath.get(path);
}
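
Putting the new pieces together: the int constructor caps binary min/max statistics when footers are written, and the two-argument toParquetStatistics applies the cap directly. A hedged sketch, assuming createStats/updateStats from the parquet-column statistics API, imports as in the surrounding file, and a hypothetical 16-byte cap:

    // converter that truncates binary statistics to around 16 bytes
    ParquetMetadataConverter converter = new ParquetMetadataConverter(16);

    PrimitiveType type = Types.required(BINARY).named("s");          // hypothetical column
    org.apache.parquet.column.statistics.Statistics<?> stats =
        org.apache.parquet.column.statistics.Statistics.createStats(type);
    stats.updateStats(Binary.fromString("a-string-longer-than-sixteen-bytes"));
    // pass the cap explicitly; Integer.MAX_VALUE disables truncation
    Statistics formatStats = ParquetMetadataConverter.toParquetStatistics(stats, 16);
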
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
index 2e646e7..6f21fa3 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
@@ -19,12 +19,14 @@
package org.apache.parquet.hadoop;
import java.io.IOException;
+import java.util.ArrayDeque;
import java.util.HashMap;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.PrimitiveIterator;
+import java.util.Queue;
+
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.DataPage;
@@ -61,7 +63,7 @@
private final BytesInputDecompressor decompressor;
private final long valueCount;
- private final List<DataPage> compressedPages;
+ private final Queue<DataPage> compressedPages;
private final DictionaryPage compressedDictionaryPage;
// null means no page synchronization is required; firstRowIndex will not be returned by the pages
private final OffsetIndex offsetIndex;
@@ -71,7 +73,7 @@
ColumnChunkPageReader(BytesInputDecompressor decompressor, List<DataPage> compressedPages,
DictionaryPage compressedDictionaryPage, OffsetIndex offsetIndex, long rowCount) {
this.decompressor = decompressor;
- this.compressedPages = new LinkedList<DataPage>(compressedPages);
+ this.compressedPages = new ArrayDeque<DataPage>(compressedPages);
this.compressedDictionaryPage = compressedDictionaryPage;
long count = 0;
for (DataPage p : compressedPages) {
@@ -89,10 +91,10 @@
@Override
public DataPage readPage() {
- if (compressedPages.isEmpty()) {
+ final DataPage compressedPage = compressedPages.poll();
+ if (compressedPage == null) {
return null;
}
- DataPage compressedPage = compressedPages.remove(0);
final int currentPageIndex = pageIndex++;
return compressedPage.accept(new DataPage.Visitor<DataPage>() {
@Override
@@ -225,10 +227,11 @@
@Override
public PageReader getPageReader(ColumnDescriptor path) {
- if (!readers.containsKey(path)) {
+ final PageReader pageReader = readers.get(path);
+ if (pageReader == null) {
throw new IllegalArgumentException(path + " is not in the store: " + readers.keySet() + " " + rowCount);
}
- return readers.get(path);
+ return pageReader;
}
@Override
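
The readPage rewrite above pairs two changes: ArrayDeque instead of LinkedList (no per-node allocation), and poll() instead of isEmpty() plus remove(0) (one operation whose null return signals exhaustion). The idiom in isolation, with a hypothetical queue:

    import java.util.ArrayDeque;
    import java.util.Arrays;
    import java.util.Queue;

    Queue<String> pages = new ArrayDeque<>(Arrays.asList("p1", "p2"));   // hypothetical pages
    String next;
    while ((next = pages.poll()) != null) {   // null means drained, no isEmpty() pre-check
      consume(next);                          // hypothetical consumer
    }
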
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnIndexValidator.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnIndexValidator.java
new file mode 100644
index 0000000..b9cb4ab
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnIndexValidator.java
@@ -0,0 +1,613 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.hadoop;
+
+import static org.apache.parquet.hadoop.ColumnIndexValidator.Contract.MAX_ASCENDING;
+import static org.apache.parquet.hadoop.ColumnIndexValidator.Contract.MAX_DESCENDING;
+import static org.apache.parquet.hadoop.ColumnIndexValidator.Contract.MAX_GTEQ_VALUE;
+import static org.apache.parquet.hadoop.ColumnIndexValidator.Contract.MIN_ASCENDING;
+import static org.apache.parquet.hadoop.ColumnIndexValidator.Contract.MIN_DESCENDING;
+import static org.apache.parquet.hadoop.ColumnIndexValidator.Contract.MIN_LTEQ_VALUE;
+import static org.apache.parquet.hadoop.ColumnIndexValidator.Contract.NULL_COUNT_CORRECT;
+import static org.apache.parquet.hadoop.ColumnIndexValidator.Contract.NULL_PAGE_HAS_NO_MAX;
+import static org.apache.parquet.hadoop.ColumnIndexValidator.Contract.NULL_PAGE_HAS_NO_MIN;
+import static org.apache.parquet.hadoop.ColumnIndexValidator.Contract.NULL_PAGE_HAS_NO_VALUES;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Supplier;
+
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.ColumnReadStore;
+import org.apache.parquet.column.ColumnReader;
+import org.apache.parquet.column.impl.ColumnReadStoreImpl;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.example.DummyRecordConverter;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.internal.column.columnindex.BoundaryOrder;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveComparator;
+import org.apache.parquet.schema.PrimitiveStringifier;
+import org.apache.parquet.schema.PrimitiveType;
+
+public class ColumnIndexValidator {
+
+ public enum Contract {
+ MIN_LTEQ_VALUE(
+ "The min value stored in the index for the page must be less than or equal to all values in the page.\n"
+ + "Actual value in the page: %s\n"
+ + "Min value in the index: %s\n"),
+ MAX_GTEQ_VALUE(
+ "The max value stored in the index for the page must be greater than or equal to all values in the page.\n"
+ + "Actual value in the page: %s\n"
+ + "Max value in the index: %s\n"),
+ NULL_COUNT_CORRECT(
+ "The null count stored in the index for the page must be equal to the number of nulls in the page.\n"
+ + "Actual null count: %s\n"
+ + "Null count in the index: %s\n"),
+ NULL_PAGE_HAS_NO_VALUES("Only pages consisting entirely of NULL-s can be marked as a null page in the index.\n"
+ + "Actual non-null value in the page: %s"),
+ NULL_PAGE_HAS_NO_MIN("A null page shall not have a min value in the index\n"
+ + "Min value in the index: %s\n"),
+ NULL_PAGE_HAS_NO_MAX("A null page shall not have a max value in the index\n"
+ + "Max value in the index: %s\n"),
+ MIN_ASCENDING(
+ "According to the ASCENDING boundary order, the min value for a page must be greater than or equal to the min value of the previous page.\n"
+ + "Min value for the page: %s\n"
+ + "Min value for the previous page: %s\n"),
+ MAX_ASCENDING(
+ "According to the ASCENDING boundary order, the max value for a page must be greater than or equal to the max value of the previous page.\n"
+ + "Max value for the page: %s\n"
+ + "Max value for the previous page: %s\n"),
+ MIN_DESCENDING(
+ "According to the DESCENDING boundary order, the min value for a page must be less than or equal to the min value of the previous page.\n"
+ + "Min value for the page: %s\n"
+ + "Min value for the previous page: %s\n"),
+ MAX_DESCENDING(
+ "According to the DESCENDING boundary order, the max value for a page must be less than or equal to the max value of the previous page.\n"
+ + "Max value for the page: %s\n"
+ + "Max value for the previous page: %s\n");
+
+ public final String description;
+
+ Contract(String description) {
+ this.description = description;
+ }
+ }
+
+ public static class ContractViolation {
+ public ContractViolation(Contract violatedContract, String referenceValue, String offendingValue,
+ int rowGroupNumber, int columnNumber, ColumnPath columnPath, int pageNumber) {
+ this.violatedContract = violatedContract;
+ this.referenceValue = referenceValue;
+ this.offendingValue = offendingValue;
+ this.rowGroupNumber = rowGroupNumber;
+ this.columnNumber = columnNumber;
+ this.columnPath = columnPath;
+ this.pageNumber = pageNumber;
+ }
+
+ private final Contract violatedContract;
+ private final String referenceValue;
+ private final String offendingValue;
+ private final int rowGroupNumber;
+ private final int columnNumber;
+ private final ColumnPath columnPath;
+ private final int pageNumber;
+
+ @Override
+ public String toString() {
+ return String.format(
+ "Contract violation\nLocation: row group %d, column %d (\"%s\"), page %d\nViolated contract: "
+ + violatedContract.description,
+ rowGroupNumber, columnNumber, columnPath.toDotString(), pageNumber,
+ referenceValue,
+ offendingValue);
+ }
+ }
+
+ interface StatValue extends Comparable<StatValue> {
+ int compareToValue(ColumnReader reader);
+
+ abstract class Builder {
+ final PrimitiveComparator<Binary> comparator;
+ final PrimitiveStringifier stringifier;
+
+ Builder(PrimitiveType type) {
+ comparator = type.comparator();
+ stringifier = type.stringifier();
+ }
+
+ abstract StatValue build(ByteBuffer value);
+
+ abstract String stringifyValue(ColumnReader reader);
+ }
+ }
+
+ static StatValue.Builder getBuilder(PrimitiveType type) {
+ switch (type.getPrimitiveTypeName()) {
+ case BINARY:
+ case FIXED_LEN_BYTE_ARRAY:
+ case INT96:
+ return new BinaryStatValueBuilder(type);
+ case BOOLEAN:
+ return new BooleanStatValueBuilder(type);
+ case DOUBLE:
+ return new DoubleStatValueBuilder(type);
+ case FLOAT:
+ return new FloatStatValueBuilder(type);
+ case INT32:
+ return new IntStatValueBuilder(type);
+ case INT64:
+ return new LongStatValueBuilder(type);
+ default:
+ throw new IllegalArgumentException("Unsupported type: " + type);
+ }
+ }
+
+ private static class BinaryStatValueBuilder extends StatValue.Builder {
+ private class Value implements StatValue {
+ final Binary value;
+
+ private Value(Binary value) {
+ this.value = value;
+ }
+
+ @Override
+ public int compareTo(StatValue o) {
+ return comparator.compare(value, ((Value) o).value);
+ }
+
+ @Override
+ public int compareToValue(ColumnReader reader) {
+ return comparator.compare(value, reader.getBinary());
+ }
+
+ @Override
+ public String toString() {
+ return stringifier.stringify(value);
+ }
+ }
+
+ private BinaryStatValueBuilder(PrimitiveType type) {
+ super(type);
+ }
+
+ @Override
+ StatValue build(ByteBuffer value) {
+ return new Value(Binary.fromConstantByteBuffer(value));
+ }
+
+ @Override
+ String stringifyValue(ColumnReader reader) {
+ return stringifier.stringify(reader.getBinary());
+ }
+ }
+
+ private static class BooleanStatValueBuilder extends StatValue.Builder {
+ private class Value implements StatValue {
+ final boolean value;
+
+ private Value(boolean value) {
+ this.value = value;
+ }
+
+ @Override
+ public int compareTo(StatValue o) {
+ return comparator.compare(value, ((Value) o).value);
+ }
+
+ @Override
+ public int compareToValue(ColumnReader reader) {
+ return comparator.compare(value, reader.getBoolean());
+ }
+
+ @Override
+ public String toString() {
+ return stringifier.stringify(value);
+ }
+ }
+
+ private BooleanStatValueBuilder(PrimitiveType type) {
+ super(type);
+ }
+
+ @Override
+ StatValue build(ByteBuffer value) {
+ return new Value(value.get(0) != 0);
+ }
+
+ @Override
+ String stringifyValue(ColumnReader reader) {
+ return stringifier.stringify(reader.getBoolean());
+ }
+ }
+
+ private static class DoubleStatValueBuilder extends StatValue.Builder {
+ private class Value implements StatValue {
+ final double value;
+
+ private Value(double value) {
+ this.value = value;
+ }
+
+ @Override
+ public int compareTo(StatValue o) {
+ return comparator.compare(value, ((Value) o).value);
+ }
+
+ @Override
+ public int compareToValue(ColumnReader reader) {
+ return comparator.compare(value, reader.getDouble());
+ }
+
+ @Override
+ public String toString() {
+ return stringifier.stringify(value);
+ }
+ }
+
+ private DoubleStatValueBuilder(PrimitiveType type) {
+ super(type);
+ }
+
+ @Override
+ StatValue build(ByteBuffer value) {
+ return new Value(value.getDouble(0));
+ }
+
+ @Override
+ String stringifyValue(ColumnReader reader) {
+ return stringifier.stringify(reader.getDouble());
+ }
+ }
+
+ private static class FloatStatValueBuilder extends StatValue.Builder {
+ private class Value implements StatValue {
+ final float value;
+
+ private Value(float value) {
+ this.value = value;
+ }
+
+ @Override
+ public int compareTo(StatValue o) {
+ return comparator.compare(value, ((Value) o).value);
+ }
+
+ @Override
+ public int compareToValue(ColumnReader reader) {
+ return comparator.compare(value, reader.getFloat());
+ }
+
+ @Override
+ public String toString() {
+ return stringifier.stringify(value);
+ }
+ }
+
+ private FloatStatValueBuilder(PrimitiveType type) {
+ super(type);
+ }
+
+ @Override
+ StatValue build(ByteBuffer value) {
+ return new Value(value.getFloat(0));
+ }
+
+ @Override
+ String stringifyValue(ColumnReader reader) {
+ return stringifier.stringify(reader.getFloat());
+ }
+ }
+
+ private static class IntStatValueBuilder extends StatValue.Builder {
+ private class Value implements StatValue {
+ final int value;
+
+ private Value(int value) {
+ this.value = value;
+ }
+
+ @Override
+ public int compareTo(StatValue o) {
+ return comparator.compare(value, ((Value) o).value);
+ }
+
+ @Override
+ public int compareToValue(ColumnReader reader) {
+ return comparator.compare(value, reader.getInteger());
+ }
+
+ @Override
+ public String toString() {
+ return stringifier.stringify(value);
+ }
+ }
+
+ private IntStatValueBuilder(PrimitiveType type) {
+ super(type);
+ }
+
+ @Override
+ StatValue build(ByteBuffer value) {
+ return new Value(value.getInt(0));
+ }
+
+ @Override
+ String stringifyValue(ColumnReader reader) {
+ return stringifier.stringify(reader.getInteger());
+ }
+ }
+
+ private static class LongStatValueBuilder extends StatValue.Builder {
+ private class Value implements StatValue {
+ final long value;
+
+ private Value(long value) {
+ this.value = value;
+ }
+
+ @Override
+ public int compareTo(StatValue o) {
+ return comparator.compare(value, ((Value) o).value);
+ }
+
+ @Override
+ public int compareToValue(ColumnReader reader) {
+ return comparator.compare(value, reader.getLong());
+ }
+
+ @Override
+ public String toString() {
+ return stringifier.stringify(value);
+ }
+ }
+
+ private LongStatValueBuilder(PrimitiveType type) {
+ super(type);
+ }
+
+ @Override
+ StatValue build(ByteBuffer value) {
+ return new Value(value.getLong(0));
+ }
+
+ @Override
+ String stringifyValue(ColumnReader reader) {
+ return stringifier.stringify(reader.getLong());
+ }
+ }
+
+ private static class PageValidator {
+ private final int rowGroupNumber;
+ private final int columnNumber;
+ private final ColumnPath columnPath;
+ private final int pageNumber;
+ private final int maxDefinitionLevel;
+ private final long nullCountInIndex;
+ private long nullCountActual;
+ private final boolean isNullPage;
+ private final ColumnReader columnReader;
+ private final List<ContractViolation> violations;
+ private final Set<Contract> pageViolations = EnumSet.noneOf(Contract.class);
+ private final StatValue minValue;
+ private final StatValue maxValue;
+ private final StatValue.Builder statValueBuilder;
+
+ PageValidator(
+ PrimitiveType type,
+ int rowGroupNumber,
+ int columnNumber,
+ ColumnPath columnPath,
+ int pageNumber,
+ List<ContractViolation> violations,
+ ColumnReader columnReader,
+ ByteBuffer minValue,
+ ByteBuffer maxValue,
+ ByteBuffer prevMinValue,
+ ByteBuffer prevMaxValue,
+ BoundaryOrder boundaryOrder,
+ long nullCount,
+ boolean isNullPage) {
+ this.columnReader = columnReader;
+ this.rowGroupNumber = rowGroupNumber;
+ this.columnNumber = columnNumber;
+ this.columnPath = columnPath;
+ this.pageNumber = pageNumber;
+ this.nullCountInIndex = nullCount;
+ this.nullCountActual = 0;
+ this.isNullPage = isNullPage;
+ this.maxDefinitionLevel = columnReader.getDescriptor().getMaxDefinitionLevel();
+ this.violations = violations;
+ this.statValueBuilder = getBuilder(type);
+ this.minValue = isNullPage ? null : statValueBuilder.build(minValue);
+ this.maxValue = isNullPage ? null : statValueBuilder.build(maxValue);
+
+ if (isNullPage) {
+ // By specification, null pages have empty byte arrays as min/max values
+ validateContract(!minValue.hasRemaining(),
+ NULL_PAGE_HAS_NO_MIN,
+ () -> statValueBuilder.build(minValue).toString());
+ validateContract(!maxValue.hasRemaining(),
+ NULL_PAGE_HAS_NO_MAX,
+ () -> statValueBuilder.build(maxValue).toString());
+ } else if (prevMinValue != null) {
+ validateBoundaryOrder(statValueBuilder.build(prevMinValue), statValueBuilder.build(prevMaxValue),
+ boundaryOrder);
+ }
+ }
+
+ void validateValuesBelongingToRow() {
+ do {
+ if (columnReader.getCurrentDefinitionLevel() == maxDefinitionLevel) {
+ validateValue();
+ } else {
+ ++nullCountActual;
+ }
+ columnReader.consume();
+ } while (columnReader.getCurrentRepetitionLevel() != 0);
+ }
+
+ void finishPage() {
+ validateContract(nullCountInIndex == nullCountActual,
+ NULL_COUNT_CORRECT,
+ () -> Long.toString(nullCountActual),
+ () -> Long.toString(nullCountInIndex));
+ }
+
+ void validateContract(boolean contractCondition,
+ Contract type,
+ Supplier<String> value1) {
+ validateContract(contractCondition, type, value1, () -> "N/A");
+ }
+
+ void validateContract(boolean contractCondition,
+ Contract type,
+ Supplier<String> value1,
+ Supplier<String> value2) {
+ if (!contractCondition && !pageViolations.contains(type)) {
+ violations.add(
+ new ContractViolation(type, value1.get(), value2.get(), rowGroupNumber,
+ columnNumber, columnPath, pageNumber));
+ pageViolations.add(type);
+ }
+ }
+
+ private void validateValue() {
+ validateContract(!isNullPage,
+ NULL_PAGE_HAS_NO_VALUES,
+ () -> statValueBuilder.stringifyValue(columnReader));
+ validateContract(minValue.compareToValue(columnReader) <= 0,
+ MIN_LTEQ_VALUE,
+ () -> statValueBuilder.stringifyValue(columnReader),
+ minValue::toString);
+ validateContract(maxValue.compareToValue(columnReader) >= 0,
+ MAX_GTEQ_VALUE,
+ () -> statValueBuilder.stringifyValue(columnReader),
+ maxValue::toString);
+ }
+
+ private void validateBoundaryOrder(StatValue prevMinValue, StatValue prevMaxValue, BoundaryOrder boundaryOrder) {
+ switch (boundaryOrder) {
+ case ASCENDING:
+ validateContract(minValue.compareTo(prevMinValue) >= 0,
+ MIN_ASCENDING,
+ minValue::toString,
+ prevMinValue::toString);
+ validateContract(maxValue.compareTo(prevMaxValue) >= 0,
+ MAX_ASCENDING,
+ maxValue::toString,
+ prevMaxValue::toString);
+ break;
+ case DESCENDING:
+ validateContract(minValue.compareTo(prevMinValue) <= 0,
+ MIN_DESCENDING,
+ minValue::toString,
+ prevMinValue::toString);
+ validateContract(maxValue.compareTo(prevMaxValue) <= 0,
+ MAX_DESCENDING,
+ maxValue::toString,
+ prevMaxValue::toString);
+ break;
+ case UNORDERED:
+ // No checks necessary.
+ }
+ }
+ }
+
+ public static List<ContractViolation> checkContractViolations(InputFile file) throws IOException {
+ List<ContractViolation> violations = new ArrayList<>();
+ try (ParquetFileReader reader = ParquetFileReader.open(file)) {
+ FileMetaData meta = reader.getFooter().getFileMetaData();
+ MessageType schema = meta.getSchema();
+ List<ColumnDescriptor> columns = schema.getColumns();
+
+ List<BlockMetaData> blocks = reader.getFooter().getBlocks();
+ int rowGroupNumber = 0;
+ PageReadStore rowGroup = reader.readNextRowGroup();
+ while (rowGroup != null) {
+ ColumnReadStore columnReadStore = new ColumnReadStoreImpl(rowGroup,
+ new DummyRecordConverter(schema).getRootConverter(), schema, null);
+ List<ColumnChunkMetaData> columnChunks = blocks.get(rowGroupNumber).getColumns();
+ assert (columnChunks.size() == columns.size());
+ for (int columnNumber = 0; columnNumber < columns.size(); ++columnNumber) {
+ ColumnDescriptor column = columns.get(columnNumber);
+ ColumnChunkMetaData columnChunk = columnChunks.get(columnNumber);
+ ColumnIndex columnIndex = reader.readColumnIndex(columnChunk);
+ if (columnIndex == null) {
+ continue;
+ }
+ ColumnPath columnPath = columnChunk.getPath();
+ OffsetIndex offsetIndex = reader.readOffsetIndex(columnChunk);
+ List<ByteBuffer> minValues = columnIndex.getMinValues();
+ List<ByteBuffer> maxValues = columnIndex.getMaxValues();
+ BoundaryOrder boundaryOrder = columnIndex.getBoundaryOrder();
+ List<Long> nullCounts = columnIndex.getNullCounts();
+ List<Boolean> nullPages = columnIndex.getNullPages();
+ long rowNumber = 0;
+ ColumnReader columnReader = columnReadStore.getColumnReader(column);
+ ByteBuffer prevMinValue = null;
+ ByteBuffer prevMaxValue = null;
+ for (int pageNumber = 0; pageNumber < offsetIndex.getPageCount(); ++pageNumber) {
+ boolean isNullPage = nullPages.get(pageNumber);
+ ByteBuffer minValue = minValues.get(pageNumber);
+ ByteBuffer maxValue = maxValues.get(pageNumber);
+ PageValidator pageValidator = new PageValidator(
+ column.getPrimitiveType(),
+ rowGroupNumber, columnNumber, columnPath, pageNumber,
+ violations, columnReader,
+ minValue,
+ maxValue,
+ prevMinValue,
+ prevMaxValue,
+ boundaryOrder,
+ nullCounts.get(pageNumber),
+ isNullPage);
+ if (!isNullPage) {
+ prevMinValue = minValue;
+ prevMaxValue = maxValue;
+ }
+ long lastRowNumberInPage = offsetIndex.getLastRowIndex(pageNumber, rowGroup.getRowCount());
+ while (rowNumber <= lastRowNumberInPage) {
+ pageValidator.validateValuesBelongingToRow();
+ ++rowNumber;
+ }
+ pageValidator.finishPage();
+ }
+ }
+ rowGroup = reader.readNextRowGroup();
+ rowGroupNumber++;
+ }
+ }
+ return violations;
+ }
+}
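
For orientation, a minimal usage sketch for the new validator, assuming the HadoopInputFile bridge from parquet-hadoop; the input path handling and printing here are illustrative only, not part of this patch:

import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ColumnIndexValidator;
import org.apache.parquet.hadoop.ColumnIndexValidator.ContractViolation;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.InputFile;

public class ValidateColumnIndexes {
  public static void main(String[] args) throws Exception {
    // Wrap a Hadoop path as the InputFile that checkContractViolations expects.
    InputFile file = HadoopInputFile.fromPath(new Path(args[0]), new Configuration());
    List<ContractViolation> violations = ColumnIndexValidator.checkContractViolations(file);
    if (violations.isEmpty()) {
      System.out.println("All column-index contracts hold.");
    } else {
      violations.forEach(System.out::println);
    }
  }
}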
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DictionaryPageReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DictionaryPageReader.java
index 2be7ffe..99643ed 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DictionaryPageReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DictionaryPageReader.java
@@ -18,7 +18,6 @@
*/
package org.apache.parquet.hadoop;
-import org.apache.parquet.Strings;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.Encoding;
@@ -31,7 +30,10 @@
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import static org.apache.parquet.column.Encoding.PLAIN_DICTIONARY;
import static org.apache.parquet.column.Encoding.RLE_DICTIONARY;
@@ -48,12 +50,23 @@
private final ParquetFileReader reader;
private final Map<String, ColumnChunkMetaData> columns;
- private final Map<String, DictionaryPage> cache = new HashMap<String, DictionaryPage>();
+ private final Map<String, Optional<DictionaryPage>> dictionaryPageCache;
private ColumnChunkPageReadStore rowGroup = null;
+ /**
+ * Instantiate a new DictionaryPageReader.
+ *
+ * @param reader The target ParquetFileReader
+ * @param block The target BlockMetaData
+ *
+ * @throws NullPointerException if {@code reader} or {@code block} is
+ * {@code null}
+ */
DictionaryPageReader(ParquetFileReader reader, BlockMetaData block) {
- this.reader = reader;
- this.columns = new HashMap<String, ColumnChunkMetaData>();
+ this.reader = Objects.requireNonNull(reader);
+ this.columns = new HashMap<>();
+ this.dictionaryPageCache = new ConcurrentHashMap<>();
+
for (ColumnChunkMetaData column : block.getColumns()) {
columns.put(column.getPath().toDotString(), column);
}
@@ -78,47 +91,32 @@
return rowGroup.readDictionaryPage(descriptor);
}
- String dotPath = Strings.join(descriptor.getPath(), ".");
+ String dotPath = String.join(".", descriptor.getPath());
ColumnChunkMetaData column = columns.get(dotPath);
if (column == null) {
throw new ParquetDecodingException(
- "Cannot load dictionary, unknown column: " + dotPath);
+ "Failed to load dictionary, unknown column: " + dotPath);
}
- if (cache.containsKey(dotPath)) {
- return cache.get(dotPath);
- }
+ return dictionaryPageCache.computeIfAbsent(dotPath, key -> {
+ try {
+ final DictionaryPage dict =
+ hasDictionaryPage(column) ? reader.readDictionary(column) : null;
- try {
- synchronized (cache) {
- // check the cache again in case this thread waited on another reading the same page
- if (!cache.containsKey(dotPath)) {
- DictionaryPage dict = hasDictionaryPage(column) ? reader.readDictionary(column) : null;
- // copy the dictionary to ensure it can be reused if it is returned
- // more than once. this can happen when a DictionaryFilter has two or
- // more predicates for the same column.
- cache.put(dotPath, reusableCopy(dict));
- }
+ // Copy the dictionary to ensure it can be reused if it is returned
+ // more than once. This can happen when a DictionaryFilter has two or
+ // more predicates for the same column. Misses (columns without a dictionary) are cached as well.
+ return (dict != null) ? Optional.of(reusableCopy(dict)) : Optional.empty();
+ } catch (IOException e) {
+ throw new ParquetDecodingException("Failed to read dictionary", e);
}
-
- return cache.get(dotPath);
- } catch (IOException e) {
- throw new ParquetDecodingException(
- "Failed to read dictionary", e);
- }
+ }).orElse(null);
}
- private static DictionaryPage reusableCopy(DictionaryPage dict) {
- if (dict == null) {
- return null;
- }
- try {
- return new DictionaryPage(
- BytesInput.from(dict.getBytes().toByteArray()),
- dict.getDictionarySize(), dict.getEncoding());
- } catch (IOException e) {
- throw new ParquetDecodingException("Cannot read dictionary", e);
- }
+ private static DictionaryPage reusableCopy(DictionaryPage dict)
+ throws IOException {
+ return new DictionaryPage(BytesInput.from(dict.getBytes().toByteArray()),
+ dict.getDictionarySize(), dict.getEncoding());
}
private boolean hasDictionaryPage(ColumnChunkMetaData column) {
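
A standalone sketch of the caching pattern adopted above: ConcurrentHashMap rejects null values, so the absence of a dictionary is cached as Optional.empty(), and computeIfAbsent replaces the old synchronized double-checked lookup. The NullableValueCache name is illustrative, not part of this patch:

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

class NullableValueCache<K, V> {
  private final Map<K, Optional<V>> cache = new ConcurrentHashMap<>();

  // The loader may return null; the miss is cached too, so it is computed only once.
  V get(K key, Function<K, V> loader) {
    return cache.computeIfAbsent(key, k -> Optional.ofNullable(loader.apply(k)))
        .orElse(null);
  }
}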
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java
index d201fc8..007c374 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java
@@ -65,10 +65,8 @@
tempClass = Class.forName("org.apache.hadoop.io.compress.DirectDecompressionCodec");
tempCreateMethod = tempClass.getMethod("createDirectDecompressor");
tempDecompressMethod = tempClass.getMethod("decompress", ByteBuffer.class, ByteBuffer.class);
- } catch (ClassNotFoundException e) {
+ } catch (ClassNotFoundException | NoSuchMethodException e) {
// do nothing, the class and method will just be left null
- } catch (NoSuchMethodException e) {
- // do nothing, the method will just be assigned null
}
DIRECT_DECOMPRESSION_CODEC_CLASS = tempClass;
CREATE_DIRECT_DECOMPRESSOR_METHOD = tempCreateMethod;
@@ -212,9 +210,7 @@
output.clear();
try {
DECOMPRESS_METHOD.invoke(decompressor, (ByteBuffer) input.limit(compressedSize), (ByteBuffer) output.limit(uncompressedSize));
- } catch (IllegalAccessException e) {
- throw new DirectCodecPool.ParquetCompressionCodecException(e);
- } catch (InvocationTargetException e) {
+ } catch (IllegalAccessException | InvocationTargetException e) {
throw new DirectCodecPool.ParquetCompressionCodecException(e);
}
output.position(uncompressedSize);
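
The same exception-handling cleanup recurs throughout this patch; a minimal sketch of the Java 7 multi-catch idiom it relies on (the class and method here are illustrative):

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

class ReflectiveCall {
  static Object invokeOrFail(Method method, Object target, Object... args) {
    try {
      return method.invoke(target, args);
    } catch (IllegalAccessException | InvocationTargetException e) {
      // One handler replaces two duplicated catch blocks.
      throw new IllegalArgumentException("Can't invoke method " + method.getName(), e);
    }
  }
}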
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/MemoryManager.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/MemoryManager.java
index dc5c31d..d6fabb2 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/MemoryManager.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/MemoryManager.java
@@ -92,9 +92,7 @@
* @param writer the writer that has been closed
*/
synchronized void removeWriter(InternalParquetRecordWriter writer) {
- if (writerList.containsKey(writer)) {
- writerList.remove(writer);
- }
+ writerList.remove(writer);
if (!writerList.isEmpty()) {
updateAllocation();
}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 4cd846c..52cacb0 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -30,6 +30,7 @@
import static org.apache.parquet.hadoop.ParquetFileWriter.PARQUET_METADATA_FILE;
import java.io.Closeable;
+import java.io.InputStream;
import java.io.IOException;
import java.io.SequenceInputStream;
import java.nio.ByteBuffer;
@@ -163,30 +164,27 @@
// read corresponding summary files if they exist
List<Callable<Map<Path, Footer>>> summaries = new ArrayList<Callable<Map<Path, Footer>>>();
for (final Path path : parents) {
- summaries.add(new Callable<Map<Path, Footer>>() {
- @Override
- public Map<Path, Footer> call() throws Exception {
- ParquetMetadata mergedMetadata = readSummaryMetadata(configuration, path, skipRowGroups);
- if (mergedMetadata != null) {
- final List<Footer> footers;
- if (skipRowGroups) {
- footers = new ArrayList<Footer>();
- for (FileStatus f : partFiles) {
- footers.add(new Footer(f.getPath(), mergedMetadata));
- }
- } else {
- footers = footersFromSummaryFile(path, mergedMetadata);
+ summaries.add(() -> {
+ ParquetMetadata mergedMetadata = readSummaryMetadata(configuration, path, skipRowGroups);
+ if (mergedMetadata != null) {
+ final List<Footer> footers;
+ if (skipRowGroups) {
+ footers = new ArrayList<Footer>();
+ for (FileStatus f : partFiles) {
+ footers.add(new Footer(f.getPath(), mergedMetadata));
}
- Map<Path, Footer> map = new HashMap<Path, Footer>();
- for (Footer footer : footers) {
- // the folder may have been moved
- footer = new Footer(new Path(path, footer.getFile().getName()), footer.getParquetMetadata());
- map.put(footer.getFile(), footer);
- }
- return map;
} else {
- return Collections.emptyMap();
+ footers = footersFromSummaryFile(path, mergedMetadata);
}
+ Map<Path, Footer> map = new HashMap<Path, Footer>();
+ for (Footer footer : footers) {
+ // the folder may have been moved
+ footer = new Footer(new Path(path, footer.getFile().getName()), footer.getParquetMetadata());
+ map.put(footer.getFile(), footer);
+ }
+ return map;
+ } else {
+ return Collections.emptyMap();
}
});
}
@@ -271,14 +269,11 @@
public static List<Footer> readAllFootersInParallel(final Configuration configuration, List<FileStatus> partFiles, final boolean skipRowGroups) throws IOException {
List<Callable<Footer>> footers = new ArrayList<Callable<Footer>>();
for (final FileStatus currentFile : partFiles) {
- footers.add(new Callable<Footer>() {
- @Override
- public Footer call() throws Exception {
- try {
- return new Footer(currentFile.getPath(), readFooter(configuration, currentFile, filter(skipRowGroups)));
- } catch (IOException e) {
- throw new IOException("Could not read footer for file " + currentFile, e);
- }
+ footers.add(() -> {
+ try {
+ return new Footer(currentFile.getPath(), readFooter(configuration, currentFile, filter(skipRowGroups)));
+ } catch (IOException e) {
+ throw new IOException("Could not read footer for file " + currentFile, e);
}
});
}
@@ -524,11 +519,10 @@
private static final ParquetMetadata readFooter(InputFile file, ParquetReadOptions options, SeekableInputStream f, ParquetMetadataConverter converter) throws IOException {
long fileLen = file.getLength();
- String filePath = file.toString();
LOG.debug("File length {}", fileLen);
int FOOTER_LENGTH_SIZE = 4;
if (fileLen < MAGIC.length + FOOTER_LENGTH_SIZE + MAGIC.length) { // MAGIC + data + footer + footerIndex + MAGIC
- throw new RuntimeException(filePath + " is not a Parquet file (too small length: " + fileLen + ")");
+ throw new RuntimeException(file.toString() + " is not a Parquet file (too small length: " + fileLen + ")");
}
long footerLengthIndex = fileLen - FOOTER_LENGTH_SIZE - MAGIC.length;
LOG.debug("reading footer index at {}", footerLengthIndex);
@@ -538,7 +532,7 @@
byte[] magic = new byte[MAGIC.length];
f.readFully(magic);
if (!Arrays.equals(MAGIC, magic)) {
- throw new RuntimeException(filePath + " is not a Parquet file. expected magic number at tail " + Arrays.toString(MAGIC) + " but found " + Arrays.toString(magic));
+ throw new RuntimeException(file.toString() + " is not a Parquet file. expected magic number at tail " + Arrays.toString(MAGIC) + " but found " + Arrays.toString(magic));
}
long footerIndex = footerLengthIndex - footerLength;
LOG.debug("read footer length: {}, footer index: {}", footerLength, footerIndex);
@@ -546,7 +540,14 @@
throw new RuntimeException("corrupted file: the footer index is not within the file: " + footerIndex);
}
f.seek(footerIndex);
- return converter.readParquetMetadata(f, options.getMetadataFilter());
+ // Read all the footer bytes at once to avoid multiple read operations,
+ // since each individual read can be expensive on HDFS.
+ ByteBuffer footerBytesBuffer = ByteBuffer.allocate(footerLength);
+ f.readFully(footerBytesBuffer);
+ LOG.debug("Finished to read all footer bytes.");
+ footerBytesBuffer.flip();
+ InputStream footerBytesStream = ByteBufferInputStream.wrap(footerBytesBuffer);
+ return converter.readParquetMetadata(footerBytesStream, options.getMetadataFilter());
}
/**
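
The change above replaces piecemeal footer parsing with one bulk read that is then parsed from memory; a self-contained sketch of that buffer-then-parse pattern, with illustrative names (the real code wraps the buffer with ByteBufferInputStream and hands it to readParquetMetadata):

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;

class FooterSlurp {
  // Read a region of known length in a single bulk call, then expose it as a
  // stream so existing parsing code can consume it from memory.
  static InputStream readRegionAtOnce(DataInputStream in, int length) throws IOException {
    byte[] buf = new byte[length];
    in.readFully(buf); // one read instead of many small ones, which matters on HDFS
    return new ByteArrayInputStream(buf);
  }
}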
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index 4dae4de..63ef7bc 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -41,7 +41,6 @@
import org.apache.hadoop.fs.Path;
import org.apache.parquet.Preconditions;
-import org.apache.parquet.Strings;
import org.apache.parquet.Version;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.bytes.BytesUtils;
@@ -87,7 +86,7 @@
public class ParquetFileWriter {
private static final Logger LOG = LoggerFactory.getLogger(ParquetFileWriter.class);
- private static ParquetMetadataConverter metadataConverter = new ParquetMetadataConverter();
+ private final ParquetMetadataConverter metadataConverter;
public static final String PARQUET_METADATA_FILE = "_metadata";
public static final String MAGIC_STR = "PAR1";
@@ -266,8 +265,10 @@
throws IOException {
this(file, schema, mode, rowGroupSize, maxPaddingSize,
ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH,
+ ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH,
ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED);
}
+
/**
* @param file OutputFile to create or overwrite
* @param schema the schema of the data
@@ -275,13 +276,14 @@
* @param rowGroupSize the row group size
* @param maxPaddingSize the maximum padding
* @param columnIndexTruncateLength the length to which the min/max values in column indexes will be truncated
+ * @param statisticsTruncateLength the length to which the min/max values in row-group statistics will be truncated
* @param pageWriteChecksumEnabled whether to write out page level checksums
* @throws IOException if the file can not be created
*/
public ParquetFileWriter(OutputFile file, MessageType schema, Mode mode,
long rowGroupSize, int maxPaddingSize, int columnIndexTruncateLength,
- boolean pageWriteChecksumEnabled)
- throws IOException {
+ int statisticsTruncateLength, boolean pageWriteChecksumEnabled)
+ throws IOException {
TypeUtil.checkValidWriteSchema(schema);
this.schema = schema;
@@ -304,6 +306,8 @@
this.columnIndexTruncateLength = columnIndexTruncateLength;
this.pageWriteChecksumEnabled = pageWriteChecksumEnabled;
this.crc = pageWriteChecksumEnabled ? new CRC32() : null;
+
+ this.metadataConverter = new ParquetMetadataConverter(statisticsTruncateLength);
}
/**
@@ -330,6 +334,7 @@
this.columnIndexTruncateLength = Integer.MAX_VALUE;
this.pageWriteChecksumEnabled = ParquetOutputFormat.getPageWriteChecksumEnabled(configuration);
this.crc = pageWriteChecksumEnabled ? new CRC32() : null;
+ this.metadataConverter = new ParquetMetadataConverter(ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH);
}
/**
* start the file
@@ -782,7 +787,7 @@
if (!dropColumns && !columnsToCopy.isEmpty()) {
throw new IllegalArgumentException(String.format(
"Columns cannot be copied (missing from target schema): %s",
- Strings.join(columnsToCopy.keySet(), ", ")));
+ String.join(", ", columnsToCopy.keySet())));
}
// copy the data for all chunks
@@ -838,13 +843,7 @@
}
// Buffers for the copy function.
- private static final ThreadLocal<byte[]> COPY_BUFFER =
- new ThreadLocal<byte[]>() {
- @Override
- protected byte[] initialValue() {
- return new byte[8192];
- }
- };
+ private static final ThreadLocal<byte[]> COPY_BUFFER = ThreadLocal.withInitial(() -> new byte[8192]);
/**
* Copy from a FS input stream to an output stream. Thread-safe
@@ -958,6 +957,7 @@
private static void serializeFooter(ParquetMetadata footer, PositionOutputStream out) throws IOException {
long footerIndex = out.getPos();
+ ParquetMetadataConverter metadataConverter = new ParquetMetadataConverter();
org.apache.parquet.format.FileMetaData parquetMetadata = metadataConverter.toParquetMetadata(CURRENT_VERSION, footer);
writeFileMetaData(parquetMetadata, out);
LOG.debug("{}: footer length = {}" , out.getPos(), (out.getPos() - footerIndex));
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
index fede210..62f21c8 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
@@ -195,10 +195,9 @@
}
return unboundRecordFilter;
- } catch (InstantiationException e) {
- throw new BadConfigurationException("could not instantiate unbound record filter class", e);
- } catch (IllegalAccessException e) {
- throw new BadConfigurationException("could not instantiate unbound record filter class", e);
+ } catch (InstantiationException | IllegalAccessException e) {
+ throw new BadConfigurationException(
+ "could not instantiate unbound record filter class", e);
}
}
@@ -312,9 +311,7 @@
Class<? extends ReadSupport<T>> readSupportClass){
try {
return readSupportClass.newInstance();
- } catch (InstantiationException e) {
- throw new BadConfigurationException("could not instantiate read support class", e);
- } catch (IllegalAccessException e) {
+ } catch (InstantiationException | IllegalAccessException e) {
throw new BadConfigurationException("could not instantiate read support class", e);
}
}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
index 4a72134..fd79d7d 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
@@ -149,6 +149,7 @@
public static final String MAX_ROW_COUNT_FOR_PAGE_SIZE_CHECK = "parquet.page.size.row.check.max";
public static final String ESTIMATE_PAGE_SIZE_CHECK = "parquet.page.size.check.estimate";
public static final String COLUMN_INDEX_TRUNCATE_LENGTH = "parquet.columnindex.truncate.length";
+ public static final String STATISTICS_TRUNCATE_LENGTH = "parquet.statistics.truncate.length";
public static final String BLOOM_FILTER_COLUMN_NAMES = "parquet.bloom.filter.column.names";
public static final String BLOOM_FILTER_EXPECTED_NDV = "parquet.bloom.filter.expected.ndv";
public static final String BLOOM_FILTER_MAX_BYTES = "parquet.bloom.filter.max.bytes";
@@ -372,6 +373,18 @@
return conf.getInt(COLUMN_INDEX_TRUNCATE_LENGTH, ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH);
}
+ public static void setStatisticsTruncateLength(JobContext jobContext, int length) {
+ setStatisticsTruncateLength(getConfiguration(jobContext), length);
+ }
+
+ private static void setStatisticsTruncateLength(Configuration conf, int length) {
+ conf.setInt(STATISTICS_TRUNCATE_LENGTH, length);
+ }
+
+ private static int getStatisticsTruncateLength(Configuration conf) {
+ return conf.getInt(STATISTICS_TRUNCATE_LENGTH, ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH);
+ }
+
public static void setPageRowCountLimit(JobContext jobContext, int rowCount) {
setPageRowCountLimit(getConfiguration(jobContext), rowCount);
}
@@ -461,14 +474,15 @@
.withPageSize(getPageSize(conf))
.withDictionaryPageSize(getDictionaryPageSize(conf))
.withDictionaryEncoding(getEnableDictionary(conf))
- .withBloomFilterColumnNames(getBloomFilterColumns(conf))
- .withMaxBloomFilterBytes(getBloomFilterMaxBytes(conf))
- .withBloomFilterColumnNdvs(getBloomFilterColumnExpectedNDVs(conf))
.withWriterVersion(getWriterVersion(conf))
.estimateRowCountForPageSizeCheck(getEstimatePageSizeCheck(conf))
.withMinRowCountForPageSizeCheck(getMinRowCountForPageSizeCheck(conf))
.withMaxRowCountForPageSizeCheck(getMaxRowCountForPageSizeCheck(conf))
.withColumnIndexTruncateLength(getColumnIndexTruncateLength(conf))
+ .withStatisticsTruncateLength(getStatisticsTruncateLength(conf))
+ .withBloomFilterColumnNames(getBloomFilterColumns(conf))
+ .withMaxBloomFilterBytes(getBloomFilterMaxBytes(conf))
+ .withBloomFilterColumnNdvs(getBloomFilterColumnExpectedNDVs(conf))
.withPageRowCountLimit(getPageRowCountLimit(conf))
.withPageWriteChecksumEnabled(getPageWriteChecksumEnabled(conf))
.build();
@@ -489,8 +503,9 @@
LOG.info("Min row count for page size check is: {}", props.getMinRowCountForPageSizeCheck());
LOG.info("Max row count for page size check is: {}", props.getMaxRowCountForPageSizeCheck());
LOG.info("Truncate length for column indexes is: {}", props.getColumnIndexTruncateLength());
- LOG.info("Max Bloom filter size for a column is {}", props.getMaxBloomFilterBytes());
+ LOG.info("Truncate length for statistics min/max is: {}", props.getStatisticsTruncateLength());
LOG.info("Bloom filter enabled column names are: {}", props.getBloomFilterColumns());
+ LOG.info("Max Bloom filter size for a column is {}", props.getMaxBloomFilterBytes());
LOG.info("Bloom filter enabled column expected number of distinct values are: {}",
props.getBloomFilterColumnExpectedNDVs().values());
LOG.info("Page row count limit to {}", props.getPageRowCountLimit());
@@ -500,7 +515,7 @@
WriteContext init = writeSupport.init(conf);
ParquetFileWriter w = new ParquetFileWriter(HadoopOutputFile.fromPath(file, conf),
init.getSchema(), mode, blockSize, maxPaddingSize, props.getColumnIndexTruncateLength(),
- props.getPageWriteChecksumEnabled());
+ props.getStatisticsTruncateLength(), props.getPageWriteChecksumEnabled());
w.start();
float maxLoad = conf.getFloat(ParquetOutputFormat.MEMORY_POOL_RATIO,
@@ -540,9 +555,7 @@
Class<?> writeSupportClass = getWriteSupportClass(configuration);
try {
return (WriteSupport<T>)checkNotNull(writeSupportClass, "writeSupportClass").newInstance();
- } catch (InstantiationException e) {
- throw new BadConfigurationException("could not instantiate write support class: " + writeSupportClass, e);
- } catch (IllegalAccessException e) {
+ } catch (InstantiationException | IllegalAccessException e) {
throw new BadConfigurationException("could not instantiate write support class: " + writeSupportClass, e);
}
}
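
Configuring the new statistics truncation knob from job-setup code; a short sketch (the truncation length of 64 is illustrative):

import org.apache.hadoop.mapreduce.Job;
import org.apache.parquet.hadoop.ParquetOutputFormat;

class JobSetup {
  static void configure(Job job) {
    // Backed by the "parquet.statistics.truncate.length" property added above.
    ParquetOutputFormat.setStatisticsTruncateLength(job, 64);
  }
}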
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
index 638d4e7..0ebd7d3 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
@@ -281,7 +281,8 @@
ParquetFileWriter fileWriter = new ParquetFileWriter(
file, schema, mode, rowGroupSize, maxPaddingSize,
- encodingProps.getColumnIndexTruncateLength(), encodingProps.getPageWriteChecksumEnabled());
+ encodingProps.getColumnIndexTruncateLength(), encodingProps.getStatisticsTruncateLength(),
+ encodingProps.getPageWriteChecksumEnabled());
fileWriter.start();
this.codecFactory = new CodecFactory(conf, encodingProps.getPageSizeThreshold());
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/PrintFooter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/PrintFooter.java
index e04ea62..b73b873 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/PrintFooter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/PrintFooter.java
@@ -92,15 +92,11 @@
long t0 = System.currentTimeMillis();
Deque<Future<ParquetMetadata>> footers = new LinkedBlockingDeque<Future<ParquetMetadata>>();
for (final FileStatus currentFile : statuses) {
- footers.add(threadPool.submit(new Callable<ParquetMetadata>() {
- @Override
- public ParquetMetadata call() throws Exception {
- try {
- ParquetMetadata footer = ParquetFileReader.readFooter(configuration, currentFile, NO_FILTER);
- return footer;
- } catch (Exception e) {
- throw new ParquetDecodingException("could not read footer", e);
- }
+ footers.add(threadPool.submit(() -> {
+ try {
+ return ParquetFileReader.readFooter(configuration, currentFile, NO_FILTER);
+ } catch (Exception e) {
+ throw new ParquetDecodingException("could not read footer", e);
}
}));
}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CleanUtil.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CleanUtil.java
index 82b1414..fed2999 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CleanUtil.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CleanUtil.java
@@ -108,5 +108,4 @@
}
}
}
-
}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ParquetMetadata.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ParquetMetadata.java
index 8c7a33a..00d02a4 100755
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ParquetMetadata.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ParquetMetadata.java
@@ -69,10 +69,6 @@
} else {
objectMapper.writeValue(stringWriter, parquetMetaData);
}
- } catch (JsonGenerationException e) {
- throw new RuntimeException(e);
- } catch (JsonMappingException e) {
- throw new RuntimeException(e);
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -87,10 +83,6 @@
public static ParquetMetadata fromJSON(String json) {
try {
return objectMapper.readValue(new StringReader(json), ParquetMetadata.class);
- } catch (JsonParseException e) {
- throw new RuntimeException(e);
- } catch (JsonMappingException e) {
- throw new RuntimeException(e);
} catch (IOException e) {
throw new RuntimeException(e);
}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ContextUtil.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ContextUtil.java
index 1114ed3..7c5f93e 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ContextUtil.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ContextUtil.java
@@ -206,11 +206,8 @@
try {
return (JobContext)
JOB_CONTEXT_CONSTRUCTOR.newInstance(conf, jobId);
- } catch (InstantiationException e) {
- throw new IllegalArgumentException("Can't instantiate JobContext", e);
- } catch (IllegalAccessException e) {
- throw new IllegalArgumentException("Can't instantiate JobContext", e);
- } catch (InvocationTargetException e) {
+ } catch (InstantiationException | IllegalAccessException
+ | InvocationTargetException e) {
throw new IllegalArgumentException("Can't instantiate JobContext", e);
}
}
@@ -228,12 +225,10 @@
try {
return (TaskAttemptContext)
TASK_CONTEXT_CONSTRUCTOR.newInstance(conf, taskAttemptId);
- } catch (InstantiationException e) {
- throw new IllegalArgumentException("Can't instantiate TaskAttemptContext", e);
- } catch (IllegalAccessException e) {
- throw new IllegalArgumentException("Can't instantiate TaskAttemptContext", e);
- } catch (InvocationTargetException e) {
- throw new IllegalArgumentException("Can't instantiate TaskAttemptContext", e);
+ } catch (InstantiationException | IllegalAccessException
+ | InvocationTargetException e) {
+ throw new IllegalArgumentException("Can't instantiate TaskAttemptContext",
+ e);
}
}
@@ -248,11 +243,8 @@
try {
return (Counter)
GENERIC_COUNTER_CONSTRUCTOR.newInstance(name, displayName, value);
- } catch (InstantiationException e) {
- throw new IllegalArgumentException("Can't instantiate Counter", e);
- } catch (IllegalAccessException e) {
- throw new IllegalArgumentException("Can't instantiate Counter", e);
- } catch (InvocationTargetException e) {
+ } catch (InstantiationException | IllegalAccessException
+ | InvocationTargetException e) {
throw new IllegalArgumentException("Can't instantiate Counter", e);
}
}
@@ -267,9 +259,7 @@
public static Configuration getConfiguration(JobContext context) {
try {
return (Configuration) GET_CONFIGURATION_METHOD.invoke(context);
- } catch (IllegalAccessException e) {
- throw new IllegalArgumentException("Can't invoke method", e);
- } catch (InvocationTargetException e) {
+ } catch (IllegalAccessException | InvocationTargetException e) {
throw new IllegalArgumentException("Can't invoke method", e);
}
}
@@ -314,9 +304,7 @@
private static Object invoke(Method method, Object obj, Object... args) {
try {
return method.invoke(obj, args);
- } catch (IllegalAccessException e) {
- throw new IllegalArgumentException("Can't invoke method " + method.getName(), e);
- } catch (InvocationTargetException e) {
+ } catch (IllegalAccessException | InvocationTargetException e) {
throw new IllegalArgumentException("Can't invoke method " + method.getName(), e);
}
}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java
index c35e98f..37e285d 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java
@@ -54,10 +54,7 @@
byteBufferReadableClass.isInstance(stream.getWrappedStream())) {
try {
return h2SeekableConstructor.newInstance(stream);
- } catch (InstantiationException e) {
- LOG.warn("Could not instantiate H2SeekableInputStream, falling back to byte array reads", e);
- return new H1SeekableInputStream(stream);
- } catch (IllegalAccessException e) {
+ } catch (InstantiationException | IllegalAccessException e) {
LOG.warn("Could not instantiate H2SeekableInputStream, falling back to byte array reads", e);
return new H1SeekableInputStream(stream);
} catch (InvocationTargetException e) {
@@ -72,9 +69,7 @@
private static Class<?> getReadableClass() {
try {
return Class.forName("org.apache.hadoop.fs.ByteBufferReadable");
- } catch (ClassNotFoundException e) {
- return null;
- } catch (NoClassDefFoundError e) {
+ } catch (ClassNotFoundException | NoClassDefFoundError e) {
return null;
}
}
@@ -84,9 +79,7 @@
try {
return (Class<SeekableInputStream>) Class.forName(
"org.apache.parquet.hadoop.util.H2SeekableInputStream");
- } catch (ClassNotFoundException e) {
- return null;
- } catch (NoClassDefFoundError e) {
+ } catch (ClassNotFoundException | NoClassDefFoundError e) {
return null;
}
}
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java b/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java
index 65244f4..e510b60 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java
@@ -53,6 +53,8 @@
import java.io.IOException;
import java.math.BigInteger;
import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -65,9 +67,11 @@
import com.google.common.collect.Sets;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.Version;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.statistics.BinaryStatistics;
import org.apache.parquet.column.statistics.BooleanStatistics;
import org.apache.parquet.column.statistics.DoubleStatistics;
@@ -113,6 +117,11 @@
import com.google.common.collect.Lists;
public class TestParquetMetadataConverter {
+ private static SecureRandom random = new SecureRandom();
+ private static final String CHAR_LOWER = "abcdefghijklmnopqrstuvwxyz";
+ private static final String CHAR_UPPER = CHAR_LOWER.toUpperCase();
+ private static final String NUMBER = "0123456789";
+ private static final String DATA_FOR_RANDOM_STRING = CHAR_LOWER + CHAR_UPPER + NUMBER;
@Test
public void testPageHeader() throws IOException {
@@ -269,15 +278,15 @@
assertEquals(ConvertedType.TIMESTAMP_MILLIS, parquetMetadataConverter.convertToConvertedType(timestampType(true, MILLIS)));
assertEquals(ConvertedType.TIMESTAMP_MICROS, parquetMetadataConverter.convertToConvertedType(timestampType(true, MICROS)));
assertNull(parquetMetadataConverter.convertToConvertedType(timestampType(true, NANOS)));
- assertNull(parquetMetadataConverter.convertToConvertedType(timestampType(false, MILLIS)));
- assertNull(parquetMetadataConverter.convertToConvertedType(timestampType(false, MICROS)));
+ assertEquals(ConvertedType.TIMESTAMP_MILLIS, parquetMetadataConverter.convertToConvertedType(timestampType(false, MILLIS)));
+ assertEquals(ConvertedType.TIMESTAMP_MICROS, parquetMetadataConverter.convertToConvertedType(timestampType(false, MICROS)));
assertNull(parquetMetadataConverter.convertToConvertedType(timestampType(false, NANOS)));
assertEquals(ConvertedType.TIME_MILLIS, parquetMetadataConverter.convertToConvertedType(timeType(true, MILLIS)));
assertEquals(ConvertedType.TIME_MICROS, parquetMetadataConverter.convertToConvertedType(timeType(true, MICROS)));
assertNull(parquetMetadataConverter.convertToConvertedType(timeType(true, NANOS)));
- assertNull(parquetMetadataConverter.convertToConvertedType(timeType(false, MILLIS)));
- assertNull(parquetMetadataConverter.convertToConvertedType(timeType(false, MICROS)));
+ assertEquals(ConvertedType.TIME_MILLIS, parquetMetadataConverter.convertToConvertedType(timeType(false, MILLIS)));
+ assertEquals(ConvertedType.TIME_MICROS, parquetMetadataConverter.convertToConvertedType(timeType(false, MICROS)));
assertNull(parquetMetadataConverter.convertToConvertedType(timeType(false, NANOS)));
assertEquals(ConvertedType.DATE, parquetMetadataConverter.convertToConvertedType(dateType()));
@@ -581,6 +590,63 @@
}
@Test
+ public void testBinaryStatsWithTruncation() {
+ int defaultTruncLen = ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH;
+ int[] validLengths = {1, 2, 16, 64, defaultTruncLen - 1};
+ for (int len : validLengths) {
+ testBinaryStatsWithTruncation(len, 60, 70);
+ testBinaryStatsWithTruncation(len, (int) ParquetMetadataConverter.MAX_STATS_SIZE, 190);
+ testBinaryStatsWithTruncation(len, 280, (int) ParquetMetadataConverter.MAX_STATS_SIZE);
+ testBinaryStatsWithTruncation(len, (int) ParquetMetadataConverter.MAX_STATS_SIZE, (int) ParquetMetadataConverter.MAX_STATS_SIZE);
+ }
+
+ int[] invalidLengths = {-1, 0, Integer.MIN_VALUE}; // note: Integer.MAX_VALUE + 1 would silently overflow to MIN_VALUE
+ for (int len : invalidLengths) {
+ try {
+ testBinaryStatsWithTruncation(len, 80, 20);
+ Assert.fail("Expected IllegalArgumentException but didn't happen");
+ } catch (IllegalArgumentException e) {
+ // expected, nothing to do
+ }
+ }
+ }
+
+ // The values of minLen and maxLen shouldn't matter because the comparison is controlled by the prefix
+ private void testBinaryStatsWithTruncation(int truncateLen, int minLen, int maxLen) {
+ BinaryStatistics stats = new BinaryStatistics();
+ byte[] min = generateRandomString("a", minLen).getBytes();
+ byte[] max = generateRandomString("b", maxLen).getBytes();
+ stats.updateStats(Binary.fromConstantByteArray(min));
+ stats.updateStats(Binary.fromConstantByteArray(max));
+ ParquetMetadataConverter metadataConverter = new ParquetMetadataConverter(truncateLen);
+ org.apache.parquet.format.Statistics formatStats = metadataConverter.toParquetStatistics(stats);
+
+ if (minLen + maxLen >= ParquetMetadataConverter.MAX_STATS_SIZE) {
+ assertNull(formatStats.getMin_value());
+ assertNull(formatStats.getMax_value());
+ } else {
+ String minString = new String(min, Charset.forName("UTF-8"));
+ String minStatString = new String(formatStats.getMin_value(), Charset.forName("UTF-8"));
+ assertTrue(minStatString.compareTo(minString) <= 0);
+ String maxString = new String(max, Charset.forName("UTF-8"));
+ String maxStatString = new String(formatStats.getMax_value(), Charset.forName("UTF-8"));
+ assertTrue(maxStatString.compareTo(maxString) >= 0);
+ }
+ }
+
+ private static String generateRandomString(String prefix, int length) {
+ assertTrue(prefix.length() <= length);
+ StringBuilder sb = new StringBuilder(length);
+ sb.append(prefix);
+ for (int i = 0; i < length - prefix.length(); i++) {
+ int rndCharAt = random.nextInt(DATA_FOR_RANDOM_STRING.length());
+ char rndChar = DATA_FOR_RANDOM_STRING.charAt(rndCharAt);
+ sb.append(rndChar);
+ }
+ return sb.toString();
+ }
+
+ @Test
public void testIntegerStatsV1() {
testIntegerStats(StatsHelper.V1);
}
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestMemoryManager.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestMemoryManager.java
index 115a6c1..836ca6d 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestMemoryManager.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestMemoryManager.java
@@ -134,12 +134,7 @@
Assert.assertTrue("Pool should not hold 3 full row groups",
poolSize < (3 * rowGroupSize));
- Runnable callback = new Runnable() {
- @Override
- public void run() {
- counter++;
- }
- };
+ Runnable callback = () -> counter++;
// first-time registration should succeed
ParquetOutputFormat.getMemoryManager()
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
index 343b1fa..9b01bb5 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
@@ -162,16 +162,13 @@
file.delete();
TestUtils.assertThrows("Should reject a schema with an empty group",
- InvalidSchemaException.class, new Callable<Void>() {
- @Override
- public Void call() throws IOException {
- ExampleParquetWriter.builder(new Path(file.toString()))
- .withType(Types.buildMessage()
- .addField(new GroupType(REQUIRED, "invalid_group"))
- .named("invalid_message"))
- .build();
- return null;
- }
+ InvalidSchemaException.class, (Callable<Void>) () -> {
+ ExampleParquetWriter.builder(new Path(file.toString()))
+ .withType(Types.buildMessage()
+ .addField(new GroupType(REQUIRED, "invalid_group"))
+ .named("invalid_message"))
+ .build();
+ return null;
});
Assert.assertFalse("Should not create a file when schema is rejected",
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterAppendBlocks.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterAppendBlocks.java
index ae37f63..bda5333 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterAppendBlocks.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterAppendBlocks.java
@@ -269,13 +269,10 @@
TestUtils.assertThrows("Should complain that id column is dropped",
IllegalArgumentException.class,
- new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- writer.appendRowGroups(incoming, footer.getBlocks(), false);
- return null;
- }
- });
+ (Callable<Void>) () -> {
+ writer.appendRowGroups(incoming, footer.getBlocks(), false);
+ return null;
+ });
}
@Test
@@ -293,13 +290,10 @@
TestUtils.assertThrows("Should complain that value column is missing",
IllegalArgumentException.class,
- new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- writer.appendFile(CONF, file1);
- return null;
- }
- });
+ (Callable<Void>) () -> {
+ writer.appendFile(CONF, file1);
+ return null;
+ });
}
private Path newTemp() throws IOException {
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/example/TestInputOutputFormat.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/example/TestInputOutputFormat.java
index 0a38303..47645ac 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/example/TestInputOutputFormat.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/example/TestInputOutputFormat.java
@@ -48,7 +48,6 @@
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.parquet.Strings;
import org.apache.parquet.filter2.predicate.FilterApi;
import org.junit.Before;
import org.junit.Test;
@@ -307,7 +306,7 @@
sbFound.deleteCharAt(sbFound.length() - 1);
- assertEquals(Strings.join(expected, "\n"), sbFound.toString());
+ assertEquals(String.join("\n", expected), sbFound.toString());
}
@Test
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop2ByteBufferReads.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop2ByteBufferReads.java
index 68c9b3b..1b1e373 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop2ByteBufferReads.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop2ByteBufferReads.java
@@ -83,12 +83,9 @@
final MockBufferReader reader = new MockBufferReader(hadoopStream);
TestUtils.assertThrows("Should throw EOFException",
- EOFException.class, new Callable() {
- @Override
- public Object call() throws Exception {
- H2SeekableInputStream.readFully(reader, readBuffer);
- return null;
- }
+ EOFException.class, () -> {
+ H2SeekableInputStream.readFully(reader, readBuffer);
+ return null;
});
// NOTE: This behavior differs from readFullyHeapBuffer because direct uses
@@ -257,12 +254,9 @@
final MockBufferReader reader = new MockBufferReader(hadoopStream);
TestUtils.assertThrows("Should throw EOFException",
- EOFException.class, new Callable() {
- @Override
- public Object call() throws Exception {
- H2SeekableInputStream.readFully(reader, readBuffer);
- return null;
- }
+ EOFException.class, () -> {
+ H2SeekableInputStream.readFully(reader, readBuffer);
+ return null;
});
// NOTE: This behavior differs from readFullyHeapBuffer because direct uses
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/statistics/RandomValues.java b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/RandomValues.java
index 16db5cb..152f6ec 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/statistics/RandomValues.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/RandomValues.java
@@ -19,14 +19,20 @@
package org.apache.parquet.statistics;
-import org.apache.parquet.io.api.Binary;
import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
import java.util.Random;
+import java.util.function.Supplier;
+
+import org.apache.parquet.io.api.Binary;
public class RandomValues {
private static final String ALPHABET = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz1234567890";
- static abstract class RandomValueGenerator<T extends Comparable<T>> {
+ static abstract class RandomValueGenerator<T extends Comparable<T>> implements Supplier<T> {
private final Random random;
protected RandomValueGenerator(long seed) {
@@ -80,6 +86,11 @@
}
public abstract T nextValue();
+
+ @Override
+ public T get() {
+ return nextValue();
+ }
}
static abstract class RandomBinaryBase<T extends Comparable<T>> extends RandomValueGenerator<T> {
@@ -277,7 +288,6 @@
return asReusedBinary(nextValue().getBytes());
}
}
-
public static class BinaryGenerator extends RandomBinaryBase<Binary> {
private static final int MAX_STRING_LENGTH = 16;
public BinaryGenerator(long seed) {
@@ -339,4 +349,34 @@
public T minimum() { return this.minimum; }
public T maximum() { return this.maximum; }
}
+
+ public static Supplier<Binary> binaryStringGenerator(long seed) {
+ final StringGenerator generator = new StringGenerator(seed);
+ return generator::nextBinaryValue;
+ }
+
+ public static Supplier<Binary> int96Generator(long seed) {
+ final Int96Generator generator = new Int96Generator(seed);
+ return generator::nextBinaryValue;
+ }
+
+ public static <T extends Comparable<T>> Supplier<T> wrapSorted(Supplier<T> supplier,
+ int recordCount, boolean ascending) {
+ return wrapSorted(supplier, recordCount, ascending, Comparator.naturalOrder());
+ }
+
+ public static <T> Supplier<T> wrapSorted(Supplier<T> supplier, int recordCount, boolean ascending,
+ Comparator<T> cmp) {
+ List<T> values = new ArrayList<>(recordCount);
+ for (int i = 0; i < recordCount; ++i) {
+ values.add(supplier.get());
+ }
+ if (ascending) {
+ values.sort(cmp);
+ } else {
+ values.sort((a, b) -> cmp.compare(b, a));
+ }
+ final Iterator<T> it = values.iterator();
+ return it::next;
+ }
}
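A quick usage sketch of the new `wrapSorted` helper (not part of this patch; the seed and count are made up): it eagerly draws `recordCount` values from the wrapped supplier, sorts them once, and then serves them back one at a time through the returned `Supplier`.

```java
import java.util.function.Supplier;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.statistics.RandomValues;

class WrapSortedDemo {
  public static void main(String[] args) {
    Supplier<Binary> random = RandomValues.binaryStringGenerator(42L);
    Supplier<Binary> ascending =
        RandomValues.wrapSorted(random, 1_000, true, Binary::compareTo);
    for (int i = 0; i < 1_000; i++) {
      Binary value = ascending.get(); // values arrive in ascending order
    }
    // The backing iterator is exhausted after recordCount calls;
    // one further get() would throw NoSuchElementException.
  }
}
```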
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestColumnIndexes.java b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestColumnIndexes.java
new file mode 100644
index 0000000..aac8e43
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestColumnIndexes.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.statistics;
+
+import static org.apache.parquet.schema.LogicalTypeAnnotation.bsonType;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.dateType;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.decimalType;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.enumType;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.intType;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.jsonType;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.timeType;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.timestampType;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT96;
+import static org.apache.parquet.schema.Type.Repetition.OPTIONAL;
+import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Random;
+import java.util.function.Supplier;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.GroupFactory;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.ColumnIndexValidator;
+import org.apache.parquet.hadoop.ColumnIndexValidator.ContractViolation;
+import org.apache.parquet.hadoop.ParquetOutputFormat;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.apache.parquet.schema.Types;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.io.Files;
+
+@RunWith(Parameterized.class)
+public class TestColumnIndexes {
+ private static final int MAX_TOTAL_ROWS = 100_000;
+ private static final MessageType SCHEMA = new MessageType("schema",
+ new PrimitiveType(OPTIONAL, INT32, "i32"),
+ new PrimitiveType(OPTIONAL, INT64, "i64"),
+ new PrimitiveType(OPTIONAL, INT96, "i96"),
+ new PrimitiveType(OPTIONAL, FLOAT, "sngl"),
+ new PrimitiveType(OPTIONAL, DOUBLE, "dbl"),
+ new PrimitiveType(OPTIONAL, BINARY, "strings"),
+ new PrimitiveType(OPTIONAL, BINARY, "binary"),
+ new PrimitiveType(OPTIONAL, FIXED_LEN_BYTE_ARRAY, 17, "fixed-binary"),
+ new PrimitiveType(REQUIRED, INT32, "unconstrained-i32"),
+ new PrimitiveType(REQUIRED, INT64, "unconstrained-i64"),
+ new PrimitiveType(REQUIRED, FLOAT, "unconstrained-sngl"),
+ new PrimitiveType(REQUIRED, DOUBLE, "unconstrained-dbl"),
+ Types.optional(INT32).as(intType(8, true)).named("int8"),
+ Types.optional(INT32).as(intType(8, false)).named("uint8"),
+ Types.optional(INT32).as(intType(16, true)).named("int16"),
+ Types.optional(INT32).as(intType(16, false)).named("uint16"),
+ Types.optional(INT32).as(intType(32, true)).named("int32"),
+ Types.optional(INT32).as(intType(32, false)).named("uint32"),
+ Types.optional(INT64).as(intType(64, true)).named("int64"),
+ Types.optional(INT64).as(intType(64, false)).named("uint64"),
+ Types.optional(INT32).as(decimalType(2, 9)).named("decimal-int32"),
+ Types.optional(INT64).as(decimalType(4, 18)).named("decimal-int64"),
+ Types.optional(FIXED_LEN_BYTE_ARRAY).length(19).as(decimalType(25, 45)).named("decimal-fixed"),
+ Types.optional(BINARY).as(decimalType(20, 38)).named("decimal-binary"),
+ Types.optional(BINARY).as(stringType()).named("utf8"),
+ Types.optional(BINARY).as(enumType()).named("enum"),
+ Types.optional(BINARY).as(jsonType()).named("json"),
+ Types.optional(BINARY).as(bsonType()).named("bson"),
+ Types.optional(INT32).as(dateType()).named("date"),
+ Types.optional(INT32).as(timeType(true, TimeUnit.MILLIS)).named("time-millis"),
+ Types.optional(INT64).as(timeType(false, TimeUnit.MICROS)).named("time-micros"),
+ Types.optional(INT64).as(timestampType(true, TimeUnit.MILLIS)).named("timestamp-millis"),
+ Types.optional(INT64).as(timestampType(false, TimeUnit.NANOS)).named("timestamp-nanos"),
+ Types.optional(FIXED_LEN_BYTE_ARRAY).length(12).as(OriginalType.INTERVAL).named("interval"),
+ Types.optional(BINARY).as(stringType()).named("always-null"));
+
+ private static List<Supplier<?>> buildGenerators(int recordCount, Random random) {
+ int fieldIndex = 0;
+ return Arrays.<Supplier<?>>asList(
+ sortedOrRandom(new RandomValues.IntGenerator(random.nextLong()), random, recordCount, fieldIndex++),
+ sortedOrRandom(new RandomValues.LongGenerator(random.nextLong()), random, recordCount, fieldIndex++),
+ sortedOrRandom(RandomValues.int96Generator(random.nextLong()), random, recordCount, fieldIndex++),
+ sortedOrRandom(new RandomValues.FloatGenerator(random.nextLong()), random, recordCount, fieldIndex++),
+ sortedOrRandom(new RandomValues.DoubleGenerator(random.nextLong()), random, recordCount, fieldIndex++),
+ sortedOrRandom(RandomValues.binaryStringGenerator(random.nextLong()), random, recordCount, fieldIndex++),
+ sortedOrRandom(new RandomValues.BinaryGenerator(random.nextLong()), random, recordCount, fieldIndex++),
+ sortedOrRandom(new RandomValues.FixedGenerator(random.nextLong(), 17), random, recordCount, fieldIndex++),
+ sortedOrRandom(new RandomValues.UnconstrainedIntGenerator(random.nextLong()), random, recordCount,
+ fieldIndex++),
+ sortedOrRandom(new RandomValues.UnconstrainedLongGenerator(random.nextLong()), random, recordCount,
+ fieldIndex++),
+ sortedOrRandom(new RandomValues.UnconstrainedFloatGenerator(random.nextLong()), random, recordCount,
+ fieldIndex++),
+ sortedOrRandom(new RandomValues.UnconstrainedDoubleGenerator(random.nextLong()), random, recordCount,
+ fieldIndex++),
+ sortedOrRandom(new RandomValues.IntGenerator(random.nextLong(), Byte.MIN_VALUE, Byte.MAX_VALUE), random,
+ recordCount, fieldIndex++),
+ sortedOrRandom(new RandomValues.UIntGenerator(random.nextLong(), Byte.MIN_VALUE, Byte.MAX_VALUE), random,
+ recordCount, fieldIndex++),
+ sortedOrRandom(new RandomValues.IntGenerator(random.nextLong(), Short.MIN_VALUE, Short.MAX_VALUE), random,
+ recordCount, fieldIndex++),
+ sortedOrRandom(new RandomValues.UIntGenerator(random.nextLong(), Short.MIN_VALUE, Short.MAX_VALUE), random,
+ recordCount, fieldIndex++),
+ sortedOrRandom(new RandomValues.UnconstrainedIntGenerator(random.nextLong()), random, recordCount,
+ fieldIndex++),
+ sortedOrRandom(new RandomValues.UnconstrainedIntGenerator(random.nextLong()), random, recordCount,
+ fieldIndex++),
+ sortedOrRandom(new RandomValues.UnconstrainedLongGenerator(random.nextLong()), random, recordCount,
+ fieldIndex++),
+ sortedOrRandom(new RandomValues.UnconstrainedLongGenerator(random.nextLong()), random, recordCount,
+ fieldIndex++),
+ sortedOrRandom(new RandomValues.UnconstrainedIntGenerator(random.nextLong()), random, recordCount,
+ fieldIndex++),
+ sortedOrRandom(new RandomValues.UnconstrainedLongGenerator(random.nextLong()), random, recordCount,
+ fieldIndex++),
+ sortedOrRandom(new RandomValues.FixedGenerator(random.nextLong(), 19), random, recordCount, fieldIndex++),
+ sortedOrRandom(new RandomValues.BinaryGenerator(random.nextLong()), random, recordCount, fieldIndex++),
+ sortedOrRandom(RandomValues.binaryStringGenerator(random.nextLong()), random, recordCount, fieldIndex++),
+ sortedOrRandom(RandomValues.binaryStringGenerator(random.nextLong()), random, recordCount, fieldIndex++),
+ sortedOrRandom(RandomValues.binaryStringGenerator(random.nextLong()), random, recordCount, fieldIndex++),
+ sortedOrRandom(new RandomValues.BinaryGenerator(random.nextLong()), random, recordCount, fieldIndex++),
+ sortedOrRandom(new RandomValues.IntGenerator(random.nextLong()), random, recordCount, fieldIndex++),
+ sortedOrRandom(new RandomValues.IntGenerator(random.nextLong()), random, recordCount, fieldIndex++),
+ sortedOrRandom(new RandomValues.LongGenerator(random.nextLong()), random, recordCount, fieldIndex++),
+ sortedOrRandom(new RandomValues.LongGenerator(random.nextLong()), random, recordCount, fieldIndex++),
+ sortedOrRandom(new RandomValues.LongGenerator(random.nextLong()), random, recordCount, fieldIndex++),
+ sortedOrRandom(new RandomValues.FixedGenerator(random.nextLong(), 12), random, recordCount, fieldIndex++),
+ null); // no generator: the "always-null" column is left empty
+ }
+
+ private static <T> Supplier<T> sortedOrRandom(Supplier<T> generator, Random random, int recordCount, int fieldIndex) {
+ Comparator<T> cmp = SCHEMA.getType(fieldIndex).asPrimitiveType().comparator();
+
+ // 20% chance for ascending, 20% for descending, 60% to remain random
+ switch (random.nextInt(5)) {
+ case 1:
+ return RandomValues.wrapSorted(generator, recordCount, true, cmp);
+ case 2:
+ return RandomValues.wrapSorted(generator, recordCount, false, cmp);
+ default:
+ return generator;
+ }
+ }
+
+ public static class WriteContext {
+ private static final GroupFactory FACTORY = new SimpleGroupFactory(SCHEMA);
+ private final long seed;
+ private final int pageRowCountLimit;
+ private final int columnIndexTruncateLength;
+
+ private WriteContext(long seed, int pageRowCountLimit, int columnIndexTruncateLength) {
+ this.seed = seed;
+ this.pageRowCountLimit = pageRowCountLimit;
+ this.columnIndexTruncateLength = columnIndexTruncateLength;
+ }
+
+ public Path write(Path directory) throws IOException {
+ Path file = new Path(directory, "testColumnIndexes_" + this + ".parquet");
+ Random random = new Random(seed);
+ int recordCount = random.nextInt(MAX_TOTAL_ROWS) + 1;
+ List<Supplier<?>> generators = buildGenerators(recordCount, random);
+ Configuration conf = new Configuration();
+ ParquetOutputFormat.setColumnIndexTruncateLength(conf, columnIndexTruncateLength);
+ try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(file)
+ .withType(SCHEMA)
+ .withPageRowCountLimit(pageRowCountLimit)
+ .withConf(conf)
+ .build()) {
+ for (int i = 0; i < recordCount; i++) {
+ writer.write(createGroup(generators, random));
+ }
+ }
+ return file;
+ }
+
+ private Group createGroup(List<Supplier<?>> generators, Random random) {
+ Group group = FACTORY.newGroup();
+ for (int column = 0, columnCnt = SCHEMA.getFieldCount(); column < columnCnt; ++column) {
+ Type type = SCHEMA.getType(column);
+ Supplier<?> generator = generators.get(column);
+ // 2% chance of null value for an optional column
+ if (generator == null || (type.isRepetition(OPTIONAL) && random.nextInt(50) == 0)) {
+ continue;
+ }
+ switch (type.asPrimitiveType().getPrimitiveTypeName()) {
+ case BINARY:
+ case FIXED_LEN_BYTE_ARRAY:
+ case INT96:
+ group.append(type.getName(), (Binary) generator.get());
+ break;
+ case INT32:
+ group.append(type.getName(), (Integer) generator.get());
+ break;
+ case INT64:
+ group.append(type.getName(), (Long) generator.get());
+ break;
+ case FLOAT:
+ group.append(type.getName(), (Float) generator.get());
+ break;
+ case DOUBLE:
+ group.append(type.getName(), (Double) generator.get());
+ break;
+ case BOOLEAN:
+ group.append(type.getName(), (Boolean) generator.get());
+ break;
+ }
+ }
+ return group;
+ }
+
+ @Override
+ public String toString() {
+ return "seed=" + seed
+ + ",pageRowCountLimit=" + pageRowCountLimit
+ + ",columnIndexTruncateLength=" + columnIndexTruncateLength;
+ }
+ }
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(TestColumnIndexes.class);
+
+ @Rule
+ public TemporaryFolder tmp = new TemporaryFolder();
+
+ @Parameters
+ public static Collection<WriteContext> getContexts() {
+ return Arrays.asList(
+ new WriteContext(System.nanoTime(), 1000, 8),
+ new WriteContext(System.nanoTime(), 20000, 64),
+ new WriteContext(System.nanoTime(), 50000, 10));
+ }
+
+ public TestColumnIndexes(WriteContext context) {
+ this.context = context;
+ }
+
+ private final WriteContext context;
+
+ @Test
+ public void testColumnIndexes() throws IOException {
+ LOGGER.info("Starting test with context: {}", context);
+
+ Path file = null;
+ try {
+ file = context.write(new Path(tmp.getRoot().getAbsolutePath()));
+ LOGGER.info("Parquet file \"{}\" is successfully created for the context: {}", file, context);
+
+ List<ContractViolation> violations = ColumnIndexValidator
+ .checkContractViolations(HadoopInputFile.fromPath(file, new Configuration()));
+ assertTrue(violations.toString(), violations.isEmpty());
+ } finally {
+ if (file != null) {
+ file.getFileSystem(new Configuration()).delete(file, false);
+ }
+ }
+ }
+}
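Since every `WriteContext` is seeded from `System.nanoTime()` and the seed is printed via `toString()`, a failing run can be replayed by substituting the logged seed into `getContexts()`. A sketch (the literal seed below is made up):

```java
@Parameters
public static Collection<WriteContext> getContexts() {
  // Replay a run logged as "seed=123456789,pageRowCountLimit=1000,columnIndexTruncateLength=8":
  return Arrays.asList(new WriteContext(123456789L, 1000, 8));
}
```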
diff --git a/parquet-jackson/pom.xml b/parquet-jackson/pom.xml
index a44c67c..3e8e209 100644
--- a/parquet-jackson/pom.xml
+++ b/parquet-jackson/pom.xml
@@ -41,7 +41,7 @@
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
- <version>${jackson.version}</version>
+ <version>${jackson-databind.version}</version>
</dependency>
</dependencies>
@@ -66,7 +66,7 @@
</goals>
<configuration combine.self="override">
<minimizeJar>false</minimizeJar>
- <createSourcesJar>${shade.createSourcesJar}</createSourcesJar>
+ <createSourcesJar>${shade.createSourcesJar}</createSourcesJar>
<artifactSet>
<includes>
<include>${jackson.groupId}:*</include>
@@ -91,7 +91,7 @@
</executions>
</plugin>
</plugins>
- </build>
+ </build>
<profiles>
<profile>
<id>sonatype-oss-release</id>
diff --git a/parquet-pig/pom.xml b/parquet-pig/pom.xml
index b5f7ec2..a8ab606 100644
--- a/parquet-pig/pom.xml
+++ b/parquet-pig/pom.xml
@@ -77,7 +77,7 @@
<dependency>
<groupId>${jackson.groupId}</groupId>
<artifactId>jackson-databind</artifactId>
- <version>${jackson.version}</version>
+ <version>${jackson-databind.version}</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
diff --git a/parquet-pig/src/main/java/org/apache/parquet/pig/ParquetLoader.java b/parquet-pig/src/main/java/org/apache/parquet/pig/ParquetLoader.java
index 566dbee..e13bc64 100644
--- a/parquet-pig/src/main/java/org/apache/parquet/pig/ParquetLoader.java
+++ b/parquet-pig/src/main/java/org/apache/parquet/pig/ParquetLoader.java
@@ -256,7 +256,7 @@
return null;
}
} catch (InterruptedException e) {
- Thread.interrupted();
+ Thread.currentThread().interrupt();
throw new ParquetDecodingException("Interrupted", e);
}
}
@@ -331,7 +331,8 @@
length += split.getLength();
}
} catch (InterruptedException e) {
- LOG.warn("Interrupted: ", e);
+ LOG.warn("Interrupted", e);
+ Thread.currentThread().interrupt();
return null;
}
ResourceStatistics stats = new ResourceStatistics();
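The two hunks above replace `Thread.interrupted()`, which tests *and clears* the interrupt flag, with `Thread.currentThread().interrupt()`, which re-asserts it so callers further up the stack can still observe the interruption. A minimal, self-contained illustration:

```java
public class InterruptDemo {
  public static void main(String[] args) throws InterruptedException {
    Thread worker = new Thread(() -> {
      try {
        Thread.sleep(10_000);
      } catch (InterruptedException e) {
        // Re-assert the flag instead of swallowing it with Thread.interrupted().
        Thread.currentThread().interrupt();
      }
      // The interruption is still visible after the catch block:
      System.out.println("interrupted? " + Thread.currentThread().isInterrupted()); // true
    });
    worker.start();
    worker.interrupt();
    worker.join();
  }
}
```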
diff --git a/parquet-pig/src/main/java/org/apache/parquet/pig/TupleWriteSupport.java b/parquet-pig/src/main/java/org/apache/parquet/pig/TupleWriteSupport.java
index 2cf676c..68a7d7d 100644
--- a/parquet-pig/src/main/java/org/apache/parquet/pig/TupleWriteSupport.java
+++ b/parquet-pig/src/main/java/org/apache/parquet/pig/TupleWriteSupport.java
@@ -97,9 +97,7 @@
recordConsumer.startMessage();
writeTuple(rootSchema, rootPigSchema, t);
recordConsumer.endMessage();
- } catch (ExecException e) {
- throw new RuntimeException(e);
- } catch (FrontendException e) {
+ } catch (ExecException | FrontendException e) {
throw new RuntimeException(e);
}
}
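This hunk, like several below, collapses duplicate `catch` blocks into a Java 7 multi-catch; the handlers were identical, so behavior is unchanged. Illustrated with standard-library exceptions:

```java
import java.io.IOException;
import java.net.URISyntaxException;

class MultiCatchDemo {
  static void mightThrow() throws IOException, URISyntaxException {
    throw new IOException("boom");
  }

  public static void main(String[] args) {
    try {
      mightThrow();
    } catch (IOException | URISyntaxException e) {
      // One handler for both checked types; e is implicitly final.
      throw new RuntimeException(e);
    }
  }
}
```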
diff --git a/parquet-pig/src/main/java/org/apache/parquet/pig/convert/TupleConverter.java b/parquet-pig/src/main/java/org/apache/parquet/pig/convert/TupleConverter.java
index 48bb753..7b7fadb 100644
--- a/parquet-pig/src/main/java/org/apache/parquet/pig/convert/TupleConverter.java
+++ b/parquet-pig/src/main/java/org/apache/parquet/pig/convert/TupleConverter.java
@@ -142,10 +142,9 @@
default:
throw new TupleConversionException("unsupported pig type: " + pigField);
}
- } catch (FrontendException e) {
- throw new TupleConversionException("error while preparing converter for:\n" + pigField + "\n" + type, e);
- } catch (RuntimeException e) {
- throw new TupleConversionException("error while preparing converter for:\n" + pigField + "\n" + type, e);
+ } catch (FrontendException | RuntimeException e) {
+ throw new TupleConversionException(
+ "error while preparing converter for:\n" + pigField + "\n" + type, e);
}
}
diff --git a/parquet-pig/src/main/java/org/apache/parquet/pig/summary/SummaryData.java b/parquet-pig/src/main/java/org/apache/parquet/pig/summary/SummaryData.java
index e392740..2d97dbb 100644
--- a/parquet-pig/src/main/java/org/apache/parquet/pig/summary/SummaryData.java
+++ b/parquet-pig/src/main/java/org/apache/parquet/pig/summary/SummaryData.java
@@ -58,10 +58,6 @@
StringWriter stringWriter = new StringWriter();
try {
mapper.writeValue(stringWriter, summaryData);
- } catch (JsonGenerationException e) {
- throw new RuntimeException(e);
- } catch (JsonMappingException e) {
- throw new RuntimeException(e);
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -71,10 +67,6 @@
public static <T extends SummaryData> T fromJSON(String json, Class<T> clazz) {
try {
return objectMapper.readValue(new StringReader(json), clazz);
- } catch (JsonParseException e) {
- throw new RuntimeException(e);
- } catch (JsonMappingException e) {
- throw new RuntimeException(e);
} catch (IOException e) {
throw new RuntimeException(e);
}
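The dropped catch clauses here are redundant rather than lost: `JsonGenerationException`, `JsonMappingException`, and `JsonParseException` all extend `JsonProcessingException`, which itself extends `IOException`, so the remaining `IOException` handler already covers them. A sketch of the hierarchy:

```java
import java.io.IOException;
import com.fasterxml.jackson.core.JsonProcessingException;

class JacksonHierarchyDemo {
  // JsonProcessingException IS-A IOException, so this assignment compiles,
  // and an IOException catch block subsumes all the Jackson subclasses.
  static RuntimeException wrap(JsonProcessingException e) {
    IOException asIo = e;
    return new RuntimeException(asIo);
  }
}
```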
diff --git a/parquet-protobuf/pom.xml b/parquet-protobuf/pom.xml
index a25a3b3..6870762 100644
--- a/parquet-protobuf/pom.xml
+++ b/parquet-protobuf/pom.xml
@@ -69,6 +69,13 @@
<groupId>com.twitter.elephantbird</groupId>
<artifactId>elephant-bird-core</artifactId>
<version>${elephant-bird.version}</version>
+ <exclusions>
+ <!-- hadoop-lzo is not required for parquet build/tests and there are issues downloading it -->
+ <exclusion>
+ <groupId>com.hadoop.gplcompression</groupId>
+ <artifactId>hadoop-lzo</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoSchemaConverter.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoSchemaConverter.java
index db5be14..b8c2545 100644
--- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoSchemaConverter.java
+++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoSchemaConverter.java
@@ -54,7 +54,7 @@
}
/**
- * Instanciate a schema converter to get the parquet schema corresponding to protobuf classes.
+ * Instantiate a schema converter to get the parquet schema corresponding to protobuf classes.
* @param parquetSpecsCompliant If set to false, the parquet schema generated will be using the old
* schema style (prior to PARQUET-968) to provide backward-compatibility
* but which does not use LIST and MAP wrappers around collections as required
diff --git a/parquet-scrooge/pom.xml b/parquet-scrooge/pom.xml
index 6da3c0a..3040888 100644
--- a/parquet-scrooge/pom.xml
+++ b/parquet-scrooge/pom.xml
@@ -38,7 +38,7 @@
<url>https://conjars.org/repo</url>
</repository>
</repositories>
-
+
<dependencies>
<dependency>
<groupId>org.apache.parquet</groupId>
@@ -88,7 +88,7 @@
<dependency>
<groupId>com.twitter</groupId>
<artifactId>scrooge-core_${scala.binary.version}</artifactId>
- <version>4.7.0</version>
+ <version>${scrooge.version}</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
@@ -193,7 +193,7 @@
<plugin>
<groupId>com.twitter</groupId>
<artifactId>scrooge-maven-plugin</artifactId>
- <version>3.17.0</version>
+ <version>${scrooge.version}</version>
<configuration>
<outputDirectory>${project.build.directory}/generated-test-sources/scrooge</outputDirectory>
<thriftNamespaceMappings>
@@ -202,20 +202,18 @@
<to>org.apache.parquet.scrooge.test</to>
</thriftNamespaceMapping>
<thriftNamespaceMapping>
- <from>org.apache.parquet.thrift.test.compat</from>
- <to>org.apache.parquet.scrooge.test.compat</to>
+ <from>org.apache.parquet.thrift.test.compat</from>
+ <to>org.apache.parquet.scrooge.test.compat</to>
+ </thriftNamespaceMapping>
+ <thriftNamespaceMapping>
+ <from>org.apache.parquet.thrift.test.binary</from>
+ <to>org.apache.parquet.scrooge.test.binary</to>
</thriftNamespaceMapping>
</thriftNamespaceMappings>
+ <buildExtractedThrift>false</buildExtractedThrift>
</configuration>
<executions>
<execution>
- <id>thrift-sources</id>
- <phase>generate-sources</phase>
- <goals>
- <goal>compile</goal>
- </goals>
- </execution>
- <execution>
<id>thrift-test-sources</id>
<phase>generate-test-sources</phase>
<goals>
diff --git a/parquet-scrooge/src/test/thrift/test.thrift b/parquet-scrooge/src/test/thrift/test.thrift
index 6db7dc1..a80dbd9 100644
--- a/parquet-scrooge/src/test/thrift/test.thrift
+++ b/parquet-scrooge/src/test/thrift/test.thrift
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -28,7 +28,7 @@
}
struct Address {
- 1: required string street,
+ 1: string street,
2: required string zip
}
@@ -38,7 +38,7 @@
}
struct Phone {
- 1: required string mobile
+ 1: string mobile
2: required string work
}
@@ -284,7 +284,7 @@
3: ABool aNewBool
}
-struct StructWithUnionV2 {
+struct StructWithUnionV2 {
1: required string name,
2: required UnionV2 aUnion
}
@@ -295,7 +295,7 @@
3: optional ABool aNewBool
}
-struct StructWithAStructThatLooksLikeUnionV2 {
+struct StructWithAStructThatLooksLikeUnionV2 {
1: required string name,
2: required AStructThatLooksLikeUnionV2 aNotQuiteUnion
}
diff --git a/parquet-thrift/pom.xml b/parquet-thrift/pom.xml
index 13b9da5..ba843ed 100644
--- a/parquet-thrift/pom.xml
+++ b/parquet-thrift/pom.xml
@@ -63,6 +63,13 @@
<groupId>com.twitter.elephantbird</groupId>
<artifactId>elephant-bird-core</artifactId>
<version>${elephant-bird.version}</version>
+ <exclusions>
+ <!-- hadoop-lzo is not required for parquet build/tests and there are issues downloading it -->
+ <exclusion>
+ <groupId>com.hadoop.gplcompression</groupId>
+ <artifactId>hadoop-lzo</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>com.twitter.elephantbird</groupId>
@@ -82,7 +89,7 @@
<dependency>
<groupId>${jackson.groupId}</groupId>
<artifactId>jackson-databind</artifactId>
- <version>${jackson.version}</version>
+ <version>${jackson-databind.version}</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
diff --git a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftBytesWriteSupport.java b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftBytesWriteSupport.java
index 6f5d50d..78fe2d7 100644
--- a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftBytesWriteSupport.java
+++ b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftBytesWriteSupport.java
@@ -106,9 +106,7 @@
if (this.protocolFactory == null) {
try {
this.protocolFactory = getTProtocolFactoryClass(configuration).newInstance();
- } catch (InstantiationException e) {
- throw new RuntimeException(e);
- } catch (IllegalAccessException e) {
+ } catch (InstantiationException | IllegalAccessException e) {
throw new RuntimeException(e);
}
}
diff --git a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftReadSupport.java b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftReadSupport.java
index 1a3a2c0..9b6881e 100644
--- a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftReadSupport.java
+++ b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftReadSupport.java
@@ -264,18 +264,14 @@
Constructor<ThriftRecordConverter<T>> constructor =
converterClass.getConstructor(Class.class, MessageType.class, StructType.class, Configuration.class);
return constructor.newInstance(thriftClass, requestedSchema, descriptor, conf);
- } catch (IllegalAccessException e) {
+ } catch (IllegalAccessException | NoSuchMethodException e) {
// try the other constructor pattern
- } catch (NoSuchMethodException e) {
- // try to find the other constructor pattern
}
Constructor<ThriftRecordConverter<T>> constructor =
converterClass.getConstructor(Class.class, MessageType.class, StructType.class);
return constructor.newInstance(thriftClass, requestedSchema, descriptor);
- } catch (InstantiationException e) {
- throw new RuntimeException("Failed to construct Thrift converter class: " + converterClassName, e);
- } catch (InvocationTargetException e) {
+ } catch (InstantiationException | InvocationTargetException e) {
throw new RuntimeException("Failed to construct Thrift converter class: " + converterClassName, e);
} catch (IllegalAccessException e) {
throw new RuntimeException("Cannot access constructor for Thrift converter class: " + converterClassName, e);
diff --git a/parquet-thrift/src/main/java/org/apache/parquet/thrift/BufferedProtocolReadToWrite.java b/parquet-thrift/src/main/java/org/apache/parquet/thrift/BufferedProtocolReadToWrite.java
index 54aac6d..2e76642 100644
--- a/parquet-thrift/src/main/java/org/apache/parquet/thrift/BufferedProtocolReadToWrite.java
+++ b/parquet-thrift/src/main/java/org/apache/parquet/thrift/BufferedProtocolReadToWrite.java
@@ -19,7 +19,7 @@
package org.apache.parquet.thrift;
import java.nio.ByteBuffer;
-import java.util.LinkedList;
+import java.util.ArrayList;
import java.util.List;
import org.apache.thrift.TException;
@@ -140,7 +140,7 @@
*/
@Override
public void readOne(TProtocol in, TProtocol out) throws TException {
- List<Action> buffer = new LinkedList<Action>();
+ List<Action> buffer = new ArrayList<Action>(1);
try{
boolean hasFieldsIgnored = readOneStruct(in, buffer, thriftType);
if (hasFieldsIgnored) {
diff --git a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ParquetReadProtocol.java b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ParquetReadProtocol.java
index b72c85c..c85b13b 100644
--- a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ParquetReadProtocol.java
+++ b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ParquetReadProtocol.java
@@ -20,9 +20,9 @@
import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;
-import java.util.LinkedList;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TField;
@@ -43,7 +43,7 @@
super("read");
}
- private Deque<TProtocol> events = new LinkedList<TProtocol>();
+ private Deque<TProtocol> events = new ArrayDeque<TProtocol>();
public void add(TProtocol p) {
events.addLast(p);
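`ArrayDeque` keeps the `Deque` contract while avoiding `LinkedList`'s per-node allocation, and is generally the faster choice for pure queue/stack use. One behavioral difference worth noting (assuming, as appears to be the case here, that no null protocol events are ever enqueued): `ArrayDeque` rejects null elements.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class DequeDemo {
  public static void main(String[] args) {
    Deque<String> events = new ArrayDeque<>();
    events.addLast("first");
    events.addLast("second");
    System.out.println(events.pollFirst()); // first
    // Unlike LinkedList, ArrayDeque rejects nulls:
    // events.addLast(null); // would throw NullPointerException
  }
}
```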
diff --git a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftSchemaConvertVisitor.java b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftSchemaConvertVisitor.java
index 7bfcdb1..4cd3cf5 100644
--- a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftSchemaConvertVisitor.java
+++ b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftSchemaConvertVisitor.java
@@ -314,7 +314,7 @@
@Override
public ConvertedField visit(I16Type i16Type, State state) {
- return visitPrimitiveType(INT32, state);
+ return visitPrimitiveType(INT32, LogicalTypeAnnotation.intType(16, true), state);
}
@Override
diff --git a/parquet-thrift/src/main/java/org/apache/parquet/thrift/struct/CompatibilityChecker.java b/parquet-thrift/src/main/java/org/apache/parquet/thrift/struct/CompatibilityChecker.java
index f5e215c..7091711 100644
--- a/parquet-thrift/src/main/java/org/apache/parquet/thrift/struct/CompatibilityChecker.java
+++ b/parquet-thrift/src/main/java/org/apache/parquet/thrift/struct/CompatibilityChecker.java
@@ -34,7 +34,6 @@
import org.apache.parquet.thrift.struct.ThriftType.I32Type;
import org.apache.parquet.thrift.struct.ThriftType.I64Type;
import org.apache.parquet.thrift.struct.ThriftType.StringType;
-import org.apache.parquet.Strings;
/**
* A checker for thrift struct to enforce its backward compatibility, returns compatibility report based on following rules:
@@ -81,8 +80,7 @@
}
public String prettyMessages() {
-
- return Strings.join(messages, "\n");
+ return String.join("\n", messages);
}
@Override
diff --git a/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestThriftRecordConverter.java b/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestThriftRecordConverter.java
index 255515a..06c8cef 100644
--- a/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestThriftRecordConverter.java
+++ b/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestThriftRecordConverter.java
@@ -24,7 +24,6 @@
import java.util.ArrayList;
import java.util.Arrays;
-import org.apache.parquet.Strings;
import org.apache.parquet.io.ParquetDecodingException;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.thrift.ThriftRecordConverter.FieldEnumConverter;
@@ -78,10 +77,10 @@
@Test
public void constructorDoesNotRequireStructOrUnionTypeMeta() throws Exception {
- String jsonWithNoStructOrUnionMeta = Strings.join(
+ String jsonWithNoStructOrUnionMeta = String.join("\n",
Files.readAllLines(
new File("src/test/resources/org/apache/parquet/thrift/StructWithUnionV1NoStructOrUnionMeta.json").toPath(),
- StandardCharsets.UTF_8), "\n");
+ StandardCharsets.UTF_8));
StructType noStructOrUnionMeta = (StructType) ThriftType.fromJSON(jsonWithNoStructOrUnionMeta);
diff --git a/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestThriftSchemaConverter.java b/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestThriftSchemaConverter.java
index 2cde15b..5a7b209 100644
--- a/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestThriftSchemaConverter.java
+++ b/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestThriftSchemaConverter.java
@@ -28,6 +28,7 @@
import org.apache.parquet.thrift.struct.ThriftType.StructType;
import org.apache.parquet.thrift.test.compat.MapStructV2;
import org.apache.parquet.thrift.test.compat.SetStructV2;
+import org.apache.parquet.thrift.test.TestLogicalType;
import org.apache.thrift.TBase;
import org.junit.Test;
@@ -337,4 +338,16 @@
final ThriftType fromJSON = StructType.fromJSON(json);
assertEquals(json, fromJSON.toJSON());
}
+
+ @Test
+ public void testLogicalTypeConversion() throws Exception {
+ String expected =
+ "message ParquetSchema {\n" +
+ " required int32 test_i16 (INTEGER(16,true)) = 1;" +
+ "}";
+ ThriftSchemaConverter schemaConverter = new ThriftSchemaConverter();
+ final MessageType converted = schemaConverter.convert(TestLogicalType.class);
+ assertEquals(MessageTypeParser.parseMessageType(expected), converted);
+ }
+
}
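For reference, the schema asserted by the new test corresponds to the following parquet-mr `Types` builder call, a sketch of what the converter now emits for a thrift `i16` (field id 1):

```java
import static org.apache.parquet.schema.LogicalTypeAnnotation.intType;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;

import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Types;

class I16SchemaDemo {
  static final MessageType SCHEMA = Types.buildMessage()
      .required(INT32).as(intType(16, true)).id(1).named("test_i16")
      .named("ParquetSchema");
}
```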
diff --git a/parquet-thrift/src/test/thrift/test.thrift b/parquet-thrift/src/test/thrift/test.thrift
index d25e540..9b3ac85 100644
--- a/parquet-thrift/src/test/thrift/test.thrift
+++ b/parquet-thrift/src/test/thrift/test.thrift
@@ -35,7 +35,7 @@
struct Phone {
1: string mobile
- 2: string work
+ 2: required string work
}
struct TestPerson {
@@ -95,3 +95,7 @@
3: required Phone extraPhone,
6: required Phone phone
}
+
+struct TestLogicalType {
+ 1: required i16 test_i16
+}
diff --git a/parquet-tools/README.md b/parquet-tools/README.md
index 9e3deb4..22fdb57 100644
--- a/parquet-tools/README.md
+++ b/parquet-tools/README.md
@@ -43,7 +43,7 @@
The resulting jar is target/parquet-tools-<Version>.jar, you can copy it to the place where you
want to use it
-#Run from hadoop
+### Run from hadoop
See Commands Usage for command to use
@@ -51,7 +51,7 @@
hadoop jar ./parquet-tools-<VERSION>.jar <command> my_parquet_file.lzo.parquet
```
-#Run locally
+### Run locally
See Commands Usage for command to use
diff --git a/parquet-tools/pom.xml b/parquet-tools/pom.xml
index 196f8c2..3da1c65 100644
--- a/parquet-tools/pom.xml
+++ b/parquet-tools/pom.xml
@@ -81,7 +81,6 @@
<build>
<plugins>
- <!--We do not turn on semver checking for parquet-tools, since it's not considered as an API-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
diff --git a/parquet-tools/src/main/java/org/apache/parquet/tools/command/MetadataUtils.java b/parquet-tools/src/main/java/org/apache/parquet/tools/command/MetadataUtils.java
index 0bade37..3e68c86 100644
--- a/parquet-tools/src/main/java/org/apache/parquet/tools/command/MetadataUtils.java
+++ b/parquet-tools/src/main/java/org/apache/parquet/tools/command/MetadataUtils.java
@@ -188,7 +188,7 @@
if (container != null) {
cpath.add(type.getName());
- String[] paths = cpath.toArray(new String[cpath.size()]);
+ String[] paths = cpath.toArray(new String[0]);
cpath.remove(cpath.size() - 1);
ColumnDescriptor desc = container.getColumnDescription(paths);
diff --git a/parquet-tools/src/main/java/org/apache/parquet/tools/util/MetadataUtils.java b/parquet-tools/src/main/java/org/apache/parquet/tools/util/MetadataUtils.java
index 206028a..b5b37aa 100644
--- a/parquet-tools/src/main/java/org/apache/parquet/tools/util/MetadataUtils.java
+++ b/parquet-tools/src/main/java/org/apache/parquet/tools/util/MetadataUtils.java
@@ -210,7 +210,7 @@
if (container != null) {
cpath.add(type.getName());
- String[] paths = cpath.toArray(new String[cpath.size()]);
+ String[] paths = cpath.toArray(new String[0]);
cpath.remove(cpath.size() - 1);
ColumnDescriptor desc = container.getColumnDescription(paths);
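Both MetadataUtils hunks adopt the `toArray(new String[0])` idiom: on modern JVMs the zero-length-array form is at least as fast as pre-sizing (the collection allocates a correctly sized array internally) and avoids reading `size()` separately from the copy. Sketch:

```java
import java.util.Arrays;
import java.util.List;

class ToArrayDemo {
  public static void main(String[] args) {
    List<String> cpath = Arrays.asList("a", "b", "c");
    String[] paths = cpath.toArray(new String[0]); // sized internally to 3
    System.out.println(paths.length); // 3
  }
}
```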
diff --git a/pom.xml b/pom.xml
index 6ea28ba..77e942a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -75,20 +75,22 @@
<maven-jar-plugin.version>2.4</maven-jar-plugin.version>
<jackson.groupId>com.fasterxml.jackson.core</jackson.groupId>
<jackson.package>com.fasterxml.jackson</jackson.package>
- <jackson.version>2.9.9</jackson.version>
+ <jackson.version>2.9.10</jackson.version>
+ <jackson-databind.version>2.9.10</jackson-databind.version>
+ <japicmp.version>0.14.2</japicmp.version>
<shade.prefix>shaded.parquet</shade.prefix>
<hadoop.version>2.7.3</hadoop.version>
- <hadoop1.version>1.2.1</hadoop1.version>
<cascading.version>2.7.1</cascading.version>
<cascading3.version>3.1.2</cascading3.version>
<parquet.format.version>2.7.0</parquet.format.version>
- <previous.version>1.7.0</previous.version>
+ <previous.version>1.11.0</previous.version>
<thrift.executable>thrift</thrift.executable>
<format.thrift.executable>thrift</format.thrift.executable>
- <scala.version>2.10.6</scala.version>
+ <scala.version>2.12.8</scala.version>
<!-- scala.binary.version is used for projects that fetch dependencies that are in scala -->
- <scala.binary.version>2.10</scala.binary.version>
+ <scala.binary.version>2.12</scala.binary.version>
<scala.maven.test.skip>false</scala.maven.test.skip>
+ <scrooge.version>19.10.0</scrooge.version>
<pig.version>0.16.0</pig.version>
<pig.classifier>h2</pig.classifier>
<thrift-maven-plugin.version>0.10.0</thrift-maven-plugin.version>
@@ -97,14 +99,16 @@
<fastutil.version>8.2.3</fastutil.version>
<semver.api.version>0.9.33</semver.api.version>
<slf4j.version>1.7.22</slf4j.version>
- <avro.version>1.9.0</avro.version>
+ <avro.version>1.9.1</avro.version>
<guava.version>27.0.1-jre</guava.version>
<brotli-codec.version>0.1.1</brotli-codec.version>
<mockito.version>1.10.19</mockito.version>
+ <maven-javadoc-plugin.version>3.1.0</maven-javadoc-plugin.version>
<!-- parquet-cli dependencies -->
<opencsv.version>2.3</opencsv.version>
- <jcommander.version>1.35</jcommander.version>
+ <jcommander.version>1.72</jcommander.version>
+ <zstd-jni.version>1.4.0-1</zstd-jni.version>
</properties>
<modules>
@@ -146,13 +150,6 @@
<version>3.4</version>
<scope>test</scope>
</dependency>
- <!-- hadoop-1 requires the old httpclient for testing -->
- <dependency>
- <groupId>commons-httpclient</groupId>
- <artifactId>commons-httpclient</artifactId>
- <version>3.1</version>
- <scope>test</scope>
- </dependency>
</dependencies>
<reporting>
@@ -160,7 +157,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
- <version>2.9</version>
+ <version>${maven-javadoc-plugin.version}</version>
<reportSets>
<reportSet><!-- by default, id = "default" -->
<reports><!-- select non-aggregate reports -->
@@ -180,6 +177,7 @@
<sourceFileExcludes>
<sourceFileExclude>**/generated-sources/**/*.java</sourceFileExclude>
</sourceFileExcludes>
+ <source>8</source>
</configuration>
</plugin>
<plugin>
@@ -238,50 +236,6 @@
</dependencies>
<executions>
<execution>
- <id>check</id>
- <phase>verify</phase>
- <goals>
- <goal>enforce</goal>
- </goals>
- <configuration>
- <rules>
- <requireBackwardCompatibility implementation="org.semver.enforcer.RequireSemanticVersioningConformance">
- <dumpDetails>true</dumpDetails>
- <previousVersion>${previous.version}</previousVersion>
- <excludes>
- <exclude>org/apache/parquet/hadoop/util/**</exclude>
- <exclude>org/apache/parquet/thrift/projection/**</exclude>
- <exclude>org/apache/parquet/thrift/ThriftSchemaConverter</exclude>
- <exclude>org/apache/parquet/filter2/**</exclude>
- <exclude>org/apache/parquet/column/**</exclude>
- <exclude>org/apache/parquet/hadoop/ParquetInputSplit</exclude>
- <exclude>org/apache/parquet/hadoop/CodecFactory**</exclude>
- <exclude>shaded/**</exclude> <!-- shaded by parquet -->
- <exclude>org/apache/parquet/it/unimi/dsi/fastutil/**</exclude> <!-- Another shaded dependency from parquet-column -->
- <exclude>org/apache/parquet/benchmarks/**</exclude>
- <exclude>org/openjdk/**</exclude>
- <!-- temporary exclusions for false-positives -->
- <exclude>org/apache/parquet/Version</exclude>
- <exclude>org/apache/parquet/schema/**</exclude> <!-- methods moved to new superclass -->
- <exclude>org/apache/parquet/thrift/ThriftSchemaConvertVisitor</exclude> <!-- not public -->
- <exclude>org/apache/parquet/avro/AvroParquetReader</exclude> <!-- returns subclass of old return class -->
- <exclude>org/apache/parquet/avro/SpecificDataSupplier</exclude> <!-- made public -->
- <exclude>org/apache/parquet/io/ColumnIOFactory$ColumnIOCreatorVisitor</exclude> <!-- removed non-API class -->
- <exclude>org/apache/parquet/io/ColumnIOFactory/**</exclude> <!-- removed non-API class and methods-->
- <exclude>org/apache/parquet/hadoop/codec/SnappyCompressor</exclude> <!-- added synchronized modifier -->
- <exclude>org/apache/parquet/bytes/BytesInput</exclude> <!-- moved to parquet-common -->
- <exclude>org/apache/parquet/bytes/CapacityByteArrayOutputStream</exclude> <!-- moved to parquet-common -->
- <exclude>org/apache/parquet/bytes/ConcatenatingByteArrayCollector</exclude> <!-- moved to parquet-common -->
- <exclude>org/apache/parquet/bytes/LittleEndianDataInputStream</exclude> <!-- moved to parquet-common -->
- <exclude>org/apache/parquet/bytes/LittleEndianDataOutputStream</exclude> <!-- moved to parquet-common -->
- <exclude>org/apache/parquet/hadoop/metadata/CompressionCodecName</exclude> <!-- moved to parquet-common -->
- <exclude>org/apache/parquet/hadoop/codec/CompressionCodecNotSupportedException</exclude> <!-- moved to parquet-common -->
- </excludes>
- </requireBackwardCompatibility>
- </rules>
- </configuration>
- </execution>
- <execution>
<id>enforce-banned-dependencies</id>
<goals>
<goal>enforce</goal>
@@ -315,7 +269,6 @@
<includes>
<include>${jackson.groupId}:*</include>
<include>it.unimi.dsi:fastutil</include>
- <include>com.google.guava:guava</include>
</includes>
</artifactSet>
<!-- Shade jackson but do not include any class. Let parquet-jackson handle this -->
@@ -337,8 +290,8 @@
<shadedPattern>${shade.prefix}.it.unimi.dsi</shadedPattern>
</relocation>
<relocation>
- <pattern>com.google.guava</pattern>
- <shadedPattern>${shade.prefix}.com.google.guava</shadedPattern>
+ <pattern>com.google.common</pattern>
+ <shadedPattern>${shade.prefix}.com.google.common</shadedPattern>
</relocation>
</relocations>
</configuration>
@@ -368,6 +321,17 @@
</pluginManagement>
<plugins>
<plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <version>${maven-javadoc-plugin.version}</version>
+ <configuration>
+ <sourceFileExcludes>
+ <sourceFileExclude>**/generated-sources/**/*.java</sourceFileExclude>
+ </sourceFileExcludes>
+ <source>8</source>
+ </configuration>
+ </plugin>
+ <plugin>
<groupId>com.mycila.maven-license-plugin</groupId>
<artifactId>maven-license-plugin</artifactId>
<version>1.10.b1</version>
@@ -500,6 +464,54 @@
</excludes>
</configuration>
</plugin>
+
+ <plugin>
+ <groupId>com.github.siom79.japicmp</groupId>
+ <artifactId>japicmp-maven-plugin</artifactId>
+ <version>${japicmp.version}</version>
+ <configuration>
+ <parameter>
+ <oldVersionPattern>${previous.version}</oldVersionPattern>
+ <breakBuildBasedOnSemanticVersioning>true</breakBuildBasedOnSemanticVersioning>
+ <onlyModified>true</onlyModified>
+ <overrideCompatibilityChangeParameters>
+ <!-- Adding a new method with default implementation to an interface should be a compatible change.
+ That's why they invented it. -->
+ <overrideCompatibilityChangeParameter>
+ <compatibilityChange>METHOD_NEW_DEFAULT</compatibilityChange>
+ <semanticVersionLevel>MINOR</semanticVersionLevel>
+ <binaryCompatible>true</binaryCompatible>
+ <sourceCompatible>true</sourceCompatible>
+ </overrideCompatibilityChangeParameter>
+ </overrideCompatibilityChangeParameters>
+ <excludeModules>
+ <!-- Excluding the following modules because they are not part of the parquet public API -->
+ <excludeModule>parquet-benchmarks</excludeModule>
+ <excludeModule>parquet-cli</excludeModule>
+ <excludeModule>parquet-tools</excludeModule>
+ <excludeModule>parquet-format-structures</excludeModule>
+
+ <!-- Excluding the following modules because bundles do not contain any java classes while they still fail the
+ compatibility check because of missing dependencies -->
+ <excludeModule>parquet-hadoop-bundle</excludeModule>
+ <excludeModule>parquet-hive-bundle</excludeModule>
+ <excludeModule>parquet-pig-bundle</excludeModule>
+ </excludeModules>
+ <excludes>
+ <exclude>${shade.prefix}</exclude>
+ </excludes>
+ </parameter>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>verify</phase>
+ <goals>
+ <goal>cmp</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
</plugins>
</build>
@@ -569,21 +581,6 @@
</profile>
<profile>
- <id>hadoop-1</id>
- <activation>
- <property>
- <name>hadoop.profile</name>
- <value>hadoop1</value>
- </property>
- </activation>
- <properties>
- <!-- test hadoop-1 with the same jars that were produced for default profile -->
- <maven.main.skip>true</maven.main.skip>
- <hadoop.version>${hadoop1.version}</hadoop.version>
- <pig.classifier />
- </properties>
- </profile>
- <profile>
<id>thrift9</id>
<properties>
<thrift.version>0.9.0</thrift.version>