PARQUET-970: Add Lz4 and Zstd compression codecs
Both codecs were already added to parquet-format in [apache/parquet-format#70](https://github.com/apache/parquet-format/pull/70).
Author: Xianjin YE <advancedxy@gmail.com>
Closes #419 from advancedxy/PARQUET-970 and squashes the following commits:
3501659 [Xianjin YE] PARQUET-970: Add Lz4 and Zstd compression codec
48ad74e [Xianjin YE] PARQUET-970: Add Lz4 and Zstd compression codec
diff --git a/CMakeLists.txt b/CMakeLists.txt
index c524ceb..0183852 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -562,7 +562,9 @@
NOT DEFINED ENV{BROTLI_STATIC_LIB_DEC} OR
NOT DEFINED ENV{BROTLI_STATIC_LIB_COMMON} OR
NOT DEFINED ENV{SNAPPY_STATIC_LIB} OR
- NOT DEFINED ENV{ZLIB_STATIC_LIB})
+ NOT DEFINED ENV{ZLIB_STATIC_LIB} OR
+ NOT DEFINED ENV{LZ4_STATIC_LIB} OR
+ NOT DEFINED ENV{ZSTD_STATIC_LIB})
message(FATAL_ERROR "Missing transitive dependencies for Arrow static linking")
endif()
@@ -571,6 +573,8 @@
set(BROTLI_STATIC_LIB_COMMON "$ENV{BROTLI_STATIC_LIB_COMMON}")
set(SNAPPY_STATIC_LIB "$ENV{SNAPPY_STATIC_LIB}")
set(ZLIB_STATIC_LIB "$ENV{ZLIB_STATIC_LIB}")
+ set(LZ4_STATIC_LIB "$ENV{LZ4_STATIC_LIB}")
+ set(ZSTD_STATIC_LIB "$ENV{ZSTD_STATIC_LIB}")
add_library(brotli_enc STATIC IMPORTED)
set_target_properties(brotli_enc PROPERTIES IMPORTED_LOCATION ${BROTLI_STATIC_LIB_ENC})
@@ -582,6 +586,10 @@
set_target_properties(snappy PROPERTIES IMPORTED_LOCATION ${SNAPPY_STATIC_LIB})
add_library(zlib STATIC IMPORTED)
set_target_properties(zlib PROPERTIES IMPORTED_LOCATION ${ZLIB_STATIC_LIB})
+ add_library(lz4 STATIC IMPORTED)
+ set_target_properties(lz4 PROPERTIES IMPORTED_LOCATION ${LZ4_STATIC_LIB})
+ add_library(zstd STATIC IMPORTED)
+ set_target_properties(zstd PROPERTIES IMPORTED_LOCATION ${ZSTD_STATIC_LIB})
set(TRANSITIVE_LINK_LIBS
snappy
@@ -589,6 +597,8 @@
brotli_enc
brotli_dec
brotli_common
+ lz4
+ zstd
)
set(ARROW_LINK_LIBS
diff --git a/ci/msvc-build.bat b/ci/msvc-build.bat
index 67df565..29d8b83 100644
--- a/ci/msvc-build.bat
+++ b/ci/msvc-build.bat
@@ -28,7 +28,9 @@
)
if "%CONFIGURATION%" == "Toolchain" (
- conda install -y boost-cpp=1.63 brotli=0.6.0 zlib=1.2.11 snappy=1.1.6 thrift-cpp=0.10.0 -c conda-forge
+ conda install -y boost-cpp=1.63 thrift-cpp=0.10.0 ^
+ brotli=0.6.0 zlib=1.2.11 snappy=1.1.6 lz4-c=1.7.5 zstd=1.2.0 ^
+ -c conda-forge
set ARROW_BUILD_TOOLCHAIN=%MINICONDA%/Library
set PARQUET_BUILD_TOOLCHAIN=%MINICONDA%/Library
diff --git a/ci/travis_script_static.sh b/ci/travis_script_static.sh
index 29331e9..6da7a33 100755
--- a/ci/travis_script_static.sh
+++ b/ci/travis_script_static.sh
@@ -62,6 +62,8 @@
export BROTLI_STATIC_LIB_DEC=$BROTLI_EP/libbrotlidec.a
export BROTLI_STATIC_LIB_COMMON=$BROTLI_EP/libbrotlicommon.a
export ZLIB_STATIC_LIB=$ARROW_EP/zlib_ep/src/zlib_ep-install/lib/libz.a
+export LZ4_STATIC_LIB=$ARROW_EP/lz4_ep-prefix/src/lz4_ep/lib/liblz4.a
+export ZSTD_STATIC_LIB=$ARROW_EP/zstd_ep-prefix/src/zstd_ep/lib/libzstd.a
cmake -DPARQUET_CXXFLAGS="$PARQUET_CXXFLAGS" \
-DPARQUET_TEST_MEMCHECK=ON \
diff --git a/cmake_modules/ThirdpartyToolchain.cmake b/cmake_modules/ThirdpartyToolchain.cmake
index 53630e6..13ece43 100644
--- a/cmake_modules/ThirdpartyToolchain.cmake
+++ b/cmake_modules/ThirdpartyToolchain.cmake
@@ -359,8 +359,8 @@
-DCMAKE_INSTALL_LIBDIR=${ARROW_LIB_DIR}
-DARROW_JEMALLOC=OFF
-DARROW_IPC=OFF
- -DARROW_WITH_LZ4=OFF
- -DARROW_WITH_ZSTD=OFF
+ -DARROW_WITH_LZ4=ON
+ -DARROW_WITH_ZSTD=ON
-DARROW_BUILD_SHARED=${PARQUET_BUILD_SHARED}
-DARROW_BOOST_USE_SHARED=${PARQUET_BOOST_USE_SHARED}
-DARROW_BUILD_TESTS=OFF)
diff --git a/src/parquet/column-io-benchmark.cc b/src/parquet/column-io-benchmark.cc
index c20d6e2..ec7b52e 100644
--- a/src/parquet/column-io-benchmark.cc
+++ b/src/parquet/column-io-benchmark.cc
@@ -56,14 +56,16 @@
state.SetBytesProcessed(state.iterations() * state.range(0) * sizeof(int16_t));
}
-template <Repetition::type repetition>
+template <Repetition::type repetition,
+ Compression::type codec = Compression::UNCOMPRESSED>
static void BM_WriteInt64Column(::benchmark::State& state) {
format::ColumnChunk thrift_metadata;
std::vector<int64_t> values(state.range(0), 128);
std::vector<int16_t> definition_levels(state.range(0), 1);
std::vector<int16_t> repetition_levels(state.range(0), 0);
std::shared_ptr<ColumnDescriptor> schema = Int64Schema(repetition);
- std::shared_ptr<WriterProperties> properties = default_writer_properties();
+ WriterProperties::Builder builder;
+ std::shared_ptr<WriterProperties> properties = builder.compression(codec)->build();
auto metadata = ColumnChunkMetaDataBuilder::Make(
properties, schema.get(), reinterpret_cast<uint8_t*>(&thrift_metadata));
@@ -84,6 +86,27 @@
BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::REPEATED)->Range(1024, 65536);
+BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::REQUIRED, Compression::SNAPPY)
+ ->Range(1024, 65536);
+BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::OPTIONAL, Compression::SNAPPY)
+ ->Range(1024, 65536);
+BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::REPEATED, Compression::SNAPPY)
+ ->Range(1024, 65536);
+
+BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::REQUIRED, Compression::LZ4)
+ ->Range(1024, 65536);
+BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::OPTIONAL, Compression::LZ4)
+ ->Range(1024, 65536);
+BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::REPEATED, Compression::LZ4)
+ ->Range(1024, 65536);
+
+BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::REQUIRED, Compression::ZSTD)
+ ->Range(1024, 65536);
+BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::OPTIONAL, Compression::ZSTD)
+ ->Range(1024, 65536);
+BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::REPEATED, Compression::ZSTD)
+ ->Range(1024, 65536);
+
std::unique_ptr<Int64Reader> BuildReader(std::shared_ptr<Buffer>& buffer,
int64_t num_values, ColumnDescriptor* schema) {
std::unique_ptr<InMemoryInputStream> source(new InMemoryInputStream(buffer));
@@ -92,14 +115,16 @@
return std::unique_ptr<Int64Reader>(new Int64Reader(schema, std::move(page_reader)));
}
-template <Repetition::type repetition>
+template <Repetition::type repetition,
+ Compression::type codec = Compression::UNCOMPRESSED>
static void BM_ReadInt64Column(::benchmark::State& state) {
format::ColumnChunk thrift_metadata;
std::vector<int64_t> values(state.range(0), 128);
std::vector<int16_t> definition_levels(state.range(0), 1);
std::vector<int16_t> repetition_levels(state.range(0), 0);
std::shared_ptr<ColumnDescriptor> schema = Int64Schema(repetition);
- std::shared_ptr<WriterProperties> properties = default_writer_properties();
+ WriterProperties::Builder builder;
+ std::shared_ptr<WriterProperties> properties = builder.compression(codec)->build();
auto metadata = ColumnChunkMetaDataBuilder::Make(
properties, schema.get(), reinterpret_cast<uint8_t*>(&thrift_metadata));
@@ -134,6 +159,27 @@
BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::REPEATED)
->RangePair(1024, 65536, 1, 1024);
+BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::REQUIRED, Compression::SNAPPY)
+ ->RangePair(1024, 65536, 1, 1024);
+BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::OPTIONAL, Compression::SNAPPY)
+ ->RangePair(1024, 65536, 1, 1024);
+BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::REPEATED, Compression::SNAPPY)
+ ->RangePair(1024, 65536, 1, 1024);
+
+BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::REQUIRED, Compression::LZ4)
+ ->RangePair(1024, 65536, 1, 1024);
+BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::OPTIONAL, Compression::LZ4)
+ ->RangePair(1024, 65536, 1, 1024);
+BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::REPEATED, Compression::LZ4)
+ ->RangePair(1024, 65536, 1, 1024);
+
+BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::REQUIRED, Compression::ZSTD)
+ ->RangePair(1024, 65536, 1, 1024);
+BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::OPTIONAL, Compression::ZSTD)
+ ->RangePair(1024, 65536, 1, 1024);
+BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::REPEATED, Compression::ZSTD)
+ ->RangePair(1024, 65536, 1, 1024);
+
static void BM_RleEncoding(::benchmark::State& state) {
std::vector<int16_t> levels(state.range(0), 0);
int64_t n = 0;
diff --git a/src/parquet/column_writer-test.cc b/src/parquet/column_writer-test.cc
index 3e4c04f..681f022 100644
--- a/src/parquet/column_writer-test.cc
+++ b/src/parquet/column_writer-test.cc
@@ -307,6 +307,16 @@
LARGE_SIZE);
}
+TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithLz4Compression) {
+ this->TestRequiredWithSettings(Encoding::PLAIN, Compression::LZ4, false, false,
+ LARGE_SIZE);
+}
+
+TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithZstdCompression) {
+ this->TestRequiredWithSettings(Encoding::PLAIN, Compression::ZSTD, false, false,
+ LARGE_SIZE);
+}
+
TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStats) {
this->TestRequiredWithSettings(Encoding::PLAIN, Compression::UNCOMPRESSED, false, true,
LARGE_SIZE);
@@ -327,6 +337,16 @@
LARGE_SIZE);
}
+TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStatsAndLz4Compression) {
+ this->TestRequiredWithSettings(Encoding::PLAIN, Compression::LZ4, false, true,
+ LARGE_SIZE);
+}
+
+TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStatsAndZstdCompression) {
+ this->TestRequiredWithSettings(Encoding::PLAIN, Compression::ZSTD, false, true,
+ LARGE_SIZE);
+}
+
TYPED_TEST(TestPrimitiveWriter, Optional) {
// Optional and non-repeated, with definition levels
// but no repetition levels
diff --git a/src/parquet/file/file-deserialize-test.cc b/src/parquet/file/file-deserialize-test.cc
index 39ea1d9..0cab75f 100644
--- a/src/parquet/file/file-deserialize-test.cc
+++ b/src/parquet/file/file-deserialize-test.cc
@@ -178,8 +178,9 @@
}
TEST_F(TestPageSerde, Compression) {
- Compression::type codec_types[3] = {Compression::GZIP, Compression::SNAPPY,
- Compression::BROTLI};
+ Compression::type codec_types[5] = {Compression::GZIP, Compression::SNAPPY,
+ Compression::BROTLI, Compression::LZ4,
+ Compression::ZSTD};
const int32_t num_rows = 32; // dummy value
data_page_header_.num_values = num_rows;
diff --git a/src/parquet/file/file-serialize-test.cc b/src/parquet/file/file-serialize-test.cc
index 75f3fbd..f9f12be 100644
--- a/src/parquet/file/file-serialize-test.cc
+++ b/src/parquet/file/file-serialize-test.cc
@@ -209,7 +209,17 @@
this->FileSerializeTest(Compression::BROTLI);
}
-TYPED_TEST(TestSerialize, SmallFileGzip) { this->FileSerializeTest(Compression::GZIP); }
+TYPED_TEST(TestSerialize, SmallFileGzip) {
+ this->FileSerializeTest(Compression::GZIP);
+}
+
+TYPED_TEST(TestSerialize, SmallFileLz4) {
+ this->FileSerializeTest(Compression::LZ4);
+}
+
+TYPED_TEST(TestSerialize, SmallFileZstd) {
+ this->FileSerializeTest(Compression::ZSTD);
+}
} // namespace test
diff --git a/src/parquet/parquet.thrift b/src/parquet/parquet.thrift
index f774d49..a72ef2c 100644
--- a/src/parquet/parquet.thrift
+++ b/src/parquet/parquet.thrift
@@ -363,6 +363,8 @@
GZIP = 2;
LZO = 3;
BROTLI = 4;
+ LZ4 = 5;
+ ZSTD = 6;
}
enum PageType {
diff --git a/src/parquet/properties-test.cc b/src/parquet/properties-test.cc
index c48fc34..4a063c1 100644
--- a/src/parquet/properties-test.cc
+++ b/src/parquet/properties-test.cc
@@ -46,12 +46,14 @@
TEST(TestWriterProperties, AdvancedHandling) {
WriterProperties::Builder builder;
builder.compression("gzip", Compression::GZIP);
+ builder.compression("zstd", Compression::ZSTD);
builder.compression(Compression::SNAPPY);
builder.encoding(Encoding::DELTA_BINARY_PACKED);
builder.encoding("delta-length", Encoding::DELTA_LENGTH_BYTE_ARRAY);
std::shared_ptr<WriterProperties> props = builder.build();
ASSERT_EQ(Compression::GZIP, props->compression(ColumnPath::FromDotString("gzip")));
+ ASSERT_EQ(Compression::ZSTD, props->compression(ColumnPath::FromDotString("zstd")));
ASSERT_EQ(Compression::SNAPPY,
props->compression(ColumnPath::FromDotString("delta-length")));
ASSERT_EQ(Encoding::DELTA_BINARY_PACKED,
diff --git a/src/parquet/types.cc b/src/parquet/types.cc
index 0652c6a..8ec3f3b 100644
--- a/src/parquet/types.cc
+++ b/src/parquet/types.cc
@@ -108,6 +108,12 @@
case Compression::LZO:
return "LZO";
break;
+ case Compression::LZ4:
+ return "LZ4";
+ break;
+ case Compression::ZSTD:
+ return "ZSTD";
+ break;
default:
return "UNKNOWN";
break;
diff --git a/src/parquet/types.h b/src/parquet/types.h
index 53b33d5..a810944 100644
--- a/src/parquet/types.h
+++ b/src/parquet/types.h
@@ -109,7 +109,7 @@
// Compression, mirrors parquet::CompressionCodec
struct Compression {
- enum type { UNCOMPRESSED, SNAPPY, GZIP, LZO, BROTLI };
+ enum type { UNCOMPRESSED, SNAPPY, GZIP, LZO, BROTLI, LZ4, ZSTD };
};
// parquet::PageType
diff --git a/src/parquet/util/memory.h b/src/parquet/util/memory.h
index 94b86c1..a28917b 100644
--- a/src/parquet/util/memory.h
+++ b/src/parquet/util/memory.h
@@ -57,6 +57,12 @@
case Compression::BROTLI:
PARQUET_THROW_NOT_OK(::arrow::Codec::Create(::arrow::Compression::BROTLI, &result));
break;
+ case Compression::LZ4:
+ PARQUET_THROW_NOT_OK(::arrow::Codec::Create(::arrow::Compression::LZ4, &result));
+ break;
+ case Compression::ZSTD:
+ PARQUET_THROW_NOT_OK(::arrow::Codec::Create(::arrow::Compression::ZSTD, &result));
+ break;
default:
break;
}