[SPARK-45481][SQL] Introduce a mapper for parquet compression codecs
### What changes were proposed in this pull request?
Currently, Spark supports all the Parquet compression codecs, but the set of codecs supported by Parquet and the set supported by Spark do not map one-to-one, because Spark introduces a pseudo compression codec `none`.
In addition, there are many magic strings copied from the Parquet compression codec names, so developers have to keep them consistent by hand. This is error-prone and reduces development efficiency.
For Parquet's `CompressionCodecName`, refer to: https://github.com/apache/parquet-mr/blob/master/parquet-common/src/main/java/org/apache/parquet/hadoop/metadata/CompressionCodecName.java
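For illustration, a minimal sketch of how callers could use the new enum instead of hard-coded strings. The `ParquetCompressionCodec` enum with its `fromString` and `getCompressionCodec` members comes from the patch below; the surrounding snippet and the `userValue` variable are hypothetical:

```scala
import java.util.Locale

import org.apache.parquet.hadoop.metadata.CompressionCodecName

import org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodec

// Resolve a user-facing codec name (e.g. the value of
// spark.sql.parquet.compression.codec) to the Parquet-level codec,
// instead of maintaining a hand-written Map of magic strings.
val userValue = "snappy"
val codec: CompressionCodecName =
  ParquetCompressionCodec.fromString(userValue).getCompressionCodec

// Derive the lower-cased short names Spark accepts, including the
// pseudo codec "none" that Parquet itself does not define.
val shortNames: Seq[String] =
  ParquetCompressionCodec.values().map(_.name().toLowerCase(Locale.ROOT)).toSeq
```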
### Why are the changes needed?
Make it easier for developers to use Parquet compression codecs consistently.
### Does this PR introduce _any_ user-facing change?
'No'.
This PR only introduces a new class.
### How was this patch tested?
Existing test cases.
### Was this patch authored or co-authored using generative AI tooling?
'No'.
Closes #43308 from beliefer/SPARK-45481.
Authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Jiaan Geng <beliefer@163.com>
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodec.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodec.java
new file mode 100644
index 0000000..1a37c7a
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodec.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+
+/**
+ * A mapper class from Spark supported parquet compression codecs to parquet compression codecs.
+ */
+public enum ParquetCompressionCodec {
+ NONE(CompressionCodecName.UNCOMPRESSED),
+ UNCOMPRESSED(CompressionCodecName.UNCOMPRESSED),
+ SNAPPY(CompressionCodecName.SNAPPY),
+ GZIP(CompressionCodecName.GZIP),
+ LZO(CompressionCodecName.LZO),
+ BROTLI(CompressionCodecName.BROTLI),
+ LZ4(CompressionCodecName.LZ4),
+ LZ4_RAW(CompressionCodecName.LZ4_RAW),
+ ZSTD(CompressionCodecName.ZSTD);
+
+ private final CompressionCodecName compressionCodec;
+
+ ParquetCompressionCodec(CompressionCodecName compressionCodec) {
+ this.compressionCodec = compressionCodec;
+ }
+
+ public CompressionCodecName getCompressionCodec() {
+ return this.compressionCodec;
+ }
+
+ public static ParquetCompressionCodec fromString(String s) {
+ return ParquetCompressionCodec.valueOf(s.toUpperCase(Locale.ROOT));
+ }
+
+ public static final List<ParquetCompressionCodec> availableCodecs =
+ Arrays.asList(
+ ParquetCompressionCodec.UNCOMPRESSED,
+ ParquetCompressionCodec.SNAPPY,
+ ParquetCompressionCodec.GZIP,
+ ParquetCompressionCodec.ZSTD,
+ ParquetCompressionCodec.LZ4,
+ ParquetCompressionCodec.LZ4_RAW);
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
index 559a994..ae110fd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
@@ -20,7 +20,6 @@
import java.util.Locale
import org.apache.parquet.hadoop.ParquetOutputFormat
-import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.spark.sql.catalyst.{DataSourceOptions, FileSourceOptions}
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
@@ -88,16 +87,10 @@
object ParquetOptions extends DataSourceOptions {
// The parquet compression short names
- private val shortParquetCompressionCodecNames = Map(
- "none" -> CompressionCodecName.UNCOMPRESSED,
- "uncompressed" -> CompressionCodecName.UNCOMPRESSED,
- "snappy" -> CompressionCodecName.SNAPPY,
- "gzip" -> CompressionCodecName.GZIP,
- "lzo" -> CompressionCodecName.LZO,
- "brotli" -> CompressionCodecName.BROTLI,
- "lz4" -> CompressionCodecName.LZ4,
- "lz4_raw" -> CompressionCodecName.LZ4_RAW,
- "zstd" -> CompressionCodecName.ZSTD)
+ private val shortParquetCompressionCodecNames =
+ ParquetCompressionCodec.values().map {
+ codec => codec.name().toLowerCase(Locale.ROOT) -> codec.getCompressionCodec
+ }.toMap
def getParquetCompressionCodecName(name: String): String = {
shortParquetCompressionCodecNames(name).name()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BuiltInDataSourceWriteBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BuiltInDataSourceWriteBenchmark.scala
index 4752787..ba32288 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BuiltInDataSourceWriteBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BuiltInDataSourceWriteBenchmark.scala
@@ -16,9 +16,12 @@
*/
package org.apache.spark.sql.execution.benchmark
+import java.util.Locale
+
import org.apache.parquet.column.ParquetProperties
import org.apache.parquet.hadoop.ParquetOutputFormat
+import org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodec
import org.apache.spark.sql.internal.SQLConf
/**
@@ -51,7 +54,8 @@
mainArgs
}
- spark.conf.set(SQLConf.PARQUET_COMPRESSION.key, "snappy")
+ spark.conf.set(SQLConf.PARQUET_COMPRESSION.key,
+ ParquetCompressionCodec.SNAPPY.name().toLowerCase(Locale.ROOT))
spark.conf.set(SQLConf.ORC_COMPRESSION.key, "snappy")
formats.foreach { format =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
index 771f944..a8736c0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.execution.benchmark
import java.io.File
+import java.util.Locale
import scala.jdk.CollectionConverters._
import scala.util.Random
@@ -28,7 +29,7 @@
import org.apache.spark.benchmark.Benchmark
import org.apache.spark.sql.{DataFrame, DataFrameWriter, Row, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetCompressionCodec, VectorizedParquetRecordReader}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnVector
@@ -99,15 +100,17 @@
spark.read.json(dir).createOrReplaceTempView("jsonTable")
}
+ val parquetCodec = ParquetCompressionCodec.SNAPPY.name().toLowerCase(Locale.ROOT)
+
private def saveAsParquetV1Table(df: DataFrameWriter[Row], dir: String): Unit = {
- df.mode("overwrite").option("compression", "snappy").parquet(dir)
+ df.mode("overwrite").option("compression", parquetCodec).parquet(dir)
spark.read.parquet(dir).createOrReplaceTempView("parquetV1Table")
}
private def saveAsParquetV2Table(df: DataFrameWriter[Row], dir: String): Unit = {
withSQLConf(ParquetOutputFormat.WRITER_VERSION ->
ParquetProperties.WriterVersion.PARQUET_2_0.toString) {
- df.mode("overwrite").option("compression", "snappy").parquet(dir)
+ df.mode("overwrite").option("compression", parquetCodec).parquet(dir)
spark.read.parquet(dir).createOrReplaceTempView("parquetV2Table")
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala
index 4862571..10781ec 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala
@@ -18,12 +18,14 @@
package org.apache.spark.sql.execution.benchmark
import java.io.File
+import java.util.Locale
import scala.util.Random
import org.apache.spark.SparkConf
import org.apache.spark.benchmark.Benchmark
import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodec
import org.apache.spark.sql.functions.{monotonically_increasing_id, timestamp_seconds}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType
@@ -50,7 +52,8 @@
.setIfMissing("spark.driver.memory", "3g")
.setIfMissing("spark.executor.memory", "3g")
.setIfMissing("orc.compression", "snappy")
- .setIfMissing("spark.sql.parquet.compression.codec", "snappy")
+ .setIfMissing("spark.sql.parquet.compression.codec",
+ ParquetCompressionCodec.SNAPPY.name().toLowerCase(Locale.ROOT))
SparkSession.builder().config(conf).getOrCreate()
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala
index c26272d..f01cfea 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.execution.benchmark
+import java.util.Locale
+
import scala.util.Try
import org.apache.spark.SparkConf
@@ -29,6 +31,7 @@
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_SECOND
import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
@@ -51,7 +54,8 @@
val conf = new SparkConf()
.setMaster(System.getProperty("spark.sql.test.master", "local[1]"))
.setAppName("test-sql-context")
- .set("spark.sql.parquet.compression.codec", "snappy")
+ .set("spark.sql.parquet.compression.codec",
+ ParquetCompressionCodec.SNAPPY.name().toLowerCase(Locale.ROOT))
.set("spark.sql.shuffle.partitions", System.getProperty("spark.sql.shuffle.partitions", "4"))
.set("spark.driver.memory", "3g")
.set("spark.executor.memory", "3g")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala
index 11e9f46..1f1805a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala
@@ -17,7 +17,12 @@
package org.apache.spark.sql.execution.datasources
+import java.util.Locale
+
+import scala.jdk.CollectionConverters._
+
import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
@@ -58,9 +63,10 @@
// Exclude "lzo" because it is GPL-licenced so not included in Hadoop.
// Exclude "brotli" because the com.github.rdblue:brotli-codec dependency is not available
// on Maven Central.
- override protected def availableCodecs: Seq[String] = {
- Seq("none", "uncompressed", "snappy", "gzip", "zstd", "lz4", "lz4_raw")
- }
+ override protected def availableCodecs: Seq[String] =
+ (ParquetCompressionCodec.NONE +:
+ ParquetCompressionCodec.availableCodecs.asScala.iterator.to(Seq))
+ .map(_.name().toLowerCase(Locale.ROOT)).iterator.to(Seq)
}
class OrcCodecSuite extends FileSourceCodecSuite {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala
index 1a387b7d..28ea430 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution.datasources.parquet
import java.io.File
+import java.util.Locale
import scala.jdk.CollectionConverters._
@@ -29,18 +30,9 @@
class ParquetCompressionCodecPrecedenceSuite extends ParquetTest with SharedSparkSession {
test("Test `spark.sql.parquet.compression.codec` config") {
- Seq(
- "NONE",
- "UNCOMPRESSED",
- "SNAPPY",
- "GZIP",
- "LZO",
- "LZ4",
- "BROTLI",
- "ZSTD",
- "LZ4_RAW").foreach { c =>
- withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> c) {
- val expected = if (c == "NONE") "UNCOMPRESSED" else c
+ ParquetCompressionCodec.values().foreach { codec =>
+ withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> codec.name()) {
+ val expected = codec.getCompressionCodec.name()
val option = new ParquetOptions(Map.empty[String, String], spark.sessionState.conf)
assert(option.compressionCodecClassName == expected)
}
@@ -49,25 +41,32 @@
test("[SPARK-21786] Test Acquiring 'compressionCodecClassName' for parquet in right order.") {
// When "compression" is configured, it should be the first choice.
- withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") {
- val props = Map("compression" -> "uncompressed", ParquetOutputFormat.COMPRESSION -> "gzip")
+ withSQLConf(SQLConf.PARQUET_COMPRESSION.key ->
+ ParquetCompressionCodec.SNAPPY.name.toLowerCase(Locale.ROOT)) {
+ val props = Map(
+ "compression" -> ParquetCompressionCodec.UNCOMPRESSED.name.toLowerCase(Locale.ROOT),
+ ParquetOutputFormat.COMPRESSION ->
+ ParquetCompressionCodec.GZIP.name.toLowerCase(Locale.ROOT))
val option = new ParquetOptions(props, spark.sessionState.conf)
- assert(option.compressionCodecClassName == "UNCOMPRESSED")
+ assert(option.compressionCodecClassName == ParquetCompressionCodec.UNCOMPRESSED.name)
}
// When "compression" is not configured, "parquet.compression" should be the preferred choice.
- withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") {
- val props = Map(ParquetOutputFormat.COMPRESSION -> "gzip")
+ withSQLConf(SQLConf.PARQUET_COMPRESSION.key ->
+ ParquetCompressionCodec.SNAPPY.name.toLowerCase(Locale.ROOT)) {
+ val props = Map(ParquetOutputFormat.COMPRESSION ->
+ ParquetCompressionCodec.GZIP.name.toLowerCase(Locale.ROOT))
val option = new ParquetOptions(props, spark.sessionState.conf)
- assert(option.compressionCodecClassName == "GZIP")
+ assert(option.compressionCodecClassName == ParquetCompressionCodec.GZIP.name)
}
// When both "compression" and "parquet.compression" are not configured,
// spark.sql.parquet.compression.codec should be the right choice.
- withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") {
+ withSQLConf(SQLConf.PARQUET_COMPRESSION.key ->
+ ParquetCompressionCodec.SNAPPY.name.toLowerCase(Locale.ROOT)) {
val props = Map.empty[String, String]
val option = new ParquetOptions(props, spark.sessionState.conf)
- assert(option.compressionCodecClassName == "SNAPPY")
+ assert(option.compressionCodecClassName == ParquetCompressionCodec.SNAPPY.name)
}
}
@@ -113,8 +112,8 @@
}
test("Create parquet table with compression") {
+ val codecs = ParquetCompressionCodec.availableCodecs.asScala.map(_.name())
Seq(true, false).foreach { isPartitioned =>
- val codecs = Seq("UNCOMPRESSED", "SNAPPY", "GZIP", "ZSTD", "LZ4", "LZ4_RAW")
codecs.foreach { compressionCodec =>
checkCompressionCodec(compressionCodec, isPartitioned)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 95a45e5..a5d5f8c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -34,8 +34,6 @@
import org.apache.parquet.example.data.simple.{SimpleGroup, SimpleGroupFactory}
import org.apache.parquet.hadoop._
import org.apache.parquet.hadoop.example.ExampleParquetWriter
-import org.apache.parquet.hadoop.metadata.CompressionCodecName
-import org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP
import org.apache.parquet.io.api.Binary
import org.apache.parquet.schema.{MessageType, MessageTypeParser}
@@ -845,7 +843,7 @@
val data = (0 until 10).map(i => (i, i.toString))
- def checkCompressionCodec(codec: CompressionCodecName): Unit = {
+ def checkCompressionCodec(codec: ParquetCompressionCodec): Unit = {
withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> codec.name()) {
withParquetFile(data) { path =>
assertResult(spark.conf.get(SQLConf.PARQUET_COMPRESSION).toUpperCase(Locale.ROOT)) {
@@ -857,12 +855,9 @@
// Checks default compression codec
checkCompressionCodec(
- CompressionCodecName.fromConf(spark.conf.get(SQLConf.PARQUET_COMPRESSION)))
+ ParquetCompressionCodec.fromString(spark.conf.get(SQLConf.PARQUET_COMPRESSION)))
- checkCompressionCodec(CompressionCodecName.UNCOMPRESSED)
- checkCompressionCodec(CompressionCodecName.GZIP)
- checkCompressionCodec(CompressionCodecName.SNAPPY)
- checkCompressionCodec(CompressionCodecName.ZSTD)
+ ParquetCompressionCodec.availableCodecs.asScala.foreach(checkCompressionCodec(_))
}
private def createParquetWriter(
@@ -878,7 +873,7 @@
.withDictionaryEncoding(dictionaryEnabled)
.withType(schema)
.withWriterVersion(PARQUET_1_0)
- .withCompressionCodec(GZIP)
+ .withCompressionCodec(ParquetCompressionCodec.GZIP.getCompressionCodec)
.withRowGroupSize(1024 * 1024)
.withPageSize(pageSize)
.withDictionaryPageSize(dictionaryPageSize)
@@ -1507,9 +1502,12 @@
}
test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") {
- withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") {
- val option = new ParquetOptions(Map("Compression" -> "uncompressed"), spark.sessionState.conf)
- assert(option.compressionCodecClassName == "UNCOMPRESSED")
+ withSQLConf(SQLConf.PARQUET_COMPRESSION.key ->
+ ParquetCompressionCodec.SNAPPY.name.toLowerCase(Locale.ROOT)) {
+ val option = new ParquetOptions(
+ Map("Compression" -> ParquetCompressionCodec.UNCOMPRESSED.name.toLowerCase(Locale.ROOT)),
+ spark.sessionState.conf)
+ assert(option.compressionCodecClassName == ParquetCompressionCodec.UNCOMPRESSED.name)
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala
index a5d11f6..df28e7b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala
@@ -29,7 +29,7 @@
import org.scalatest.BeforeAndAfterAll
import org.apache.spark.sql.execution.datasources.orc.OrcOptions
-import org.apache.spark.sql.execution.datasources.parquet.{ParquetOptions, ParquetTest}
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetCompressionCodec, ParquetOptions, ParquetTest}
import org.apache.spark.sql.hive.orc.OrcFileOperator
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
@@ -289,8 +289,14 @@
test("both table-level and session-level compression are set") {
checkForTableWithCompressProp("parquet",
- tableCompressCodecs = List("UNCOMPRESSED", "SNAPPY", "GZIP"),
- sessionCompressCodecs = List("SNAPPY", "GZIP", "SNAPPY"))
+ tableCompressCodecs = List(
+ ParquetCompressionCodec.UNCOMPRESSED.name,
+ ParquetCompressionCodec.SNAPPY.name,
+ ParquetCompressionCodec.GZIP.name),
+ sessionCompressCodecs = List(
+ ParquetCompressionCodec.SNAPPY.name,
+ ParquetCompressionCodec.GZIP.name,
+ ParquetCompressionCodec.SNAPPY.name))
checkForTableWithCompressProp("orc",
tableCompressCodecs =
List(CompressionKind.NONE.name, CompressionKind.SNAPPY.name, CompressionKind.ZLIB.name),
@@ -301,7 +307,10 @@
test("table-level compression is not set but session-level compressions is set ") {
checkForTableWithCompressProp("parquet",
tableCompressCodecs = List.empty,
- sessionCompressCodecs = List("UNCOMPRESSED", "SNAPPY", "GZIP"))
+ sessionCompressCodecs = List(
+ ParquetCompressionCodec.UNCOMPRESSED.name,
+ ParquetCompressionCodec.SNAPPY.name,
+ ParquetCompressionCodec.GZIP.name))
checkForTableWithCompressProp("orc",
tableCompressCodecs = List.empty,
sessionCompressCodecs =
@@ -339,7 +348,11 @@
}
test("test table containing mixed compression codec") {
- checkTableWriteWithCompressionCodecs("parquet", List("UNCOMPRESSED", "SNAPPY", "GZIP"))
+ checkTableWriteWithCompressionCodecs("parquet",
+ List(
+ ParquetCompressionCodec.UNCOMPRESSED.name,
+ ParquetCompressionCodec.SNAPPY.name,
+ ParquetCompressionCodec.GZIP.name))
checkTableWriteWithCompressionCodecs(
"orc",
List(CompressionKind.NONE.name, CompressionKind.SNAPPY.name, CompressionKind.ZLIB.name))
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
index 2a3c77a..45dd8da 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
@@ -19,9 +19,10 @@
import java.time.{Duration, Period}
import java.time.temporal.ChronoUnit
+import java.util.Locale
import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
-import org.apache.spark.sql.execution.datasources.parquet.ParquetTest
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetCompressionCodec, ParquetTest}
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
@@ -157,7 +158,8 @@
test("SPARK-37098: Alter table properties should invalidate cache") {
// specify the compression in case we change it in future
- withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") {
+ withSQLConf(SQLConf.PARQUET_COMPRESSION.key ->
+ ParquetCompressionCodec.SNAPPY.name().toLowerCase(Locale.ROOT)) {
withTempPath { dir =>
withTable("t") {
sql(s"CREATE TABLE t (c int) STORED AS PARQUET LOCATION '${dir.getCanonicalPath}'")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 05d2ca1..78365d2 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -35,7 +35,7 @@
import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_OWNER
import org.apache.spark.sql.execution.command.{DDLSuite, DDLUtils}
-import org.apache.spark.sql.execution.datasources.parquet.ParquetFooterReader
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetCompressionCodec, ParquetFooterReader}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils}
import org.apache.spark.sql.hive.HiveUtils.{CONVERT_METASTORE_ORC, CONVERT_METASTORE_PARQUET}
@@ -2709,7 +2709,9 @@
assert(compression === actualCompression)
}
- Seq(("orc", "ZLIB"), ("parquet", "GZIP")).foreach { case (fileFormat, compression) =>
+ Seq(
+ ("orc", "ZLIB"),
+ ("parquet", ParquetCompressionCodec.GZIP.name)).foreach { case (fileFormat, compression) =>
test(s"SPARK-22158 convertMetastore should not ignore table property - $fileFormat") {
withSQLConf(CONVERT_METASTORE_ORC.key -> "true", CONVERT_METASTORE_PARQUET.key -> "true") {
withTable("t") {
@@ -2804,14 +2806,14 @@
assert(DDLUtils.isHiveTable(table))
assert(table.storage.serde.get.contains("parquet"))
val properties = table.properties
- assert(properties.get("parquet.compression") == Some("GZIP"))
+ assert(properties.get("parquet.compression") == Some(ParquetCompressionCodec.GZIP.name))
assert(spark.table("t").collect().isEmpty)
sql("INSERT INTO t SELECT 1")
checkAnswer(spark.table("t"), Row(1))
val maybeFile = path.listFiles().find(_.getName.startsWith("part"))
- assertCompression(maybeFile, "parquet", "GZIP")
+ assertCompression(maybeFile, "parquet", ParquetCompressionCodec.GZIP.name)
}
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
index 18e8401..84ee19e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
@@ -26,6 +26,7 @@
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog.CatalogUtils
import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
+import org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@@ -199,7 +200,7 @@
}
test("SPARK-13543: Support for specifying compression codec for Parquet via option()") {
- withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "UNCOMPRESSED") {
+ withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> ParquetCompressionCodec.UNCOMPRESSED.name) {
withTempPath { dir =>
val path = s"${dir.getCanonicalPath}/table1"
val df = (1 to 5).map(i => (i, (i % 2).toString)).toDF("a", "b")