PARQUET-1784: Column-wise configuration (#754)
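This change allows writer properties to be configured per column in addition to the global default. A minimal sketch of the two new entry points (the column name "my.binary_col" is a hypothetical example):

    // Via the ParquetWriter / ParquetProperties builder API:
    builder.withDictionaryEncoding(true)                     // default for all columns
           .withDictionaryEncoding("my.binary_col", false);  // per-column override

    // Via Hadoop Configuration, using the new <key>#<column.path> convention:
    conf.setBoolean("parquet.enable.dictionary", true);
    conf.setBoolean("parquet.enable.dictionary#my.binary_col", false);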
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ColumnProperty.java b/parquet-column/src/main/java/org/apache/parquet/column/ColumnProperty.java
new file mode 100644
index 0000000..e9a21d6
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/ColumnProperty.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.column;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+
+/**
+ * Represents a Parquet property that may have different values for different columns.
+ */
+abstract class ColumnProperty<T> {
+ private static class DefaultColumnProperty<T> extends ColumnProperty<T> {
+ private final T defaultValue;
+
+ private DefaultColumnProperty(T defaultValue) {
+ this.defaultValue = defaultValue;
+ }
+
+ @Override
+ public T getDefaultValue() {
+ return defaultValue;
+ }
+
+ @Override
+ public T getValue(ColumnPath columnPath) {
+ return getDefaultValue();
+ }
+
+ @Override
+ public String toString() {
+ return Objects.toString(getDefaultValue());
+ }
+ }
+
+ private static class MultipleColumnProperty<T> extends DefaultColumnProperty<T> {
+ private final Map<ColumnPath, T> values;
+
+ private MultipleColumnProperty(T defaultValue, Map<ColumnPath, T> values) {
+ super(defaultValue);
+ assert !values.isEmpty();
+ this.values = new HashMap<>(values);
+ }
+
+ @Override
+ public T getValue(ColumnPath columnPath) {
+ T value = values.get(columnPath);
+ if (value != null) {
+ return value;
+ }
+ return getDefaultValue();
+ }
+
+ @Override
+ public String toString() {
+ return Objects.toString(getDefaultValue()) + ' ' + values.toString();
+ }
+ }
+
+ static class Builder<T> {
+ private T defaultValue;
+ private final Map<ColumnPath, T> values = new HashMap<>();
+
+ private Builder() {
+ }
+
+ public Builder<T> withDefaultValue(T defaultValue) {
+ this.defaultValue = defaultValue;
+ return this;
+ }
+
+ public Builder<T> withValue(ColumnPath columnPath, T value) {
+ values.put(columnPath, value);
+ return this;
+ }
+
+ public Builder<T> withValue(String columnPath, T value) {
+ return withValue(ColumnPath.fromDotString(columnPath), value);
+ }
+
+ public Builder<T> withValue(ColumnDescriptor columnDescriptor, T value) {
+ return withValue(ColumnPath.get(columnDescriptor.getPath()), value);
+ }
+
+ public ColumnProperty<T> build() {
+ if (values.isEmpty()) {
+ return new DefaultColumnProperty<>(defaultValue);
+ } else {
+ return new MultipleColumnProperty<>(defaultValue, values);
+ }
+ }
+ }
+
+ public static <T> Builder<T> builder() {
+ return new Builder<>();
+ }
+
+ public static <T> Builder<T> builder(ColumnProperty<T> toCopy) {
+ Builder<T> builder = new Builder<>();
+ builder.withDefaultValue(((DefaultColumnProperty<T>) toCopy).defaultValue);
+ if (toCopy instanceof MultipleColumnProperty) {
+ builder.values.putAll(((MultipleColumnProperty<T>) toCopy).values);
+ }
+ return builder;
+ }
+
+ public abstract T getDefaultValue();
+
+ public abstract T getValue(ColumnPath columnPath);
+
+ public T getValue(String columnPath) {
+ return getValue(ColumnPath.fromDotString(columnPath));
+ }
+
+ public T getValue(ColumnDescriptor columnDescriptor) {
+ return getValue(ColumnPath.get(columnDescriptor.getPath()));
+ }
+}
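ColumnProperty is package-private plumbing; a minimal sketch of the intended internal usage (the column names are hypothetical):

    ColumnProperty<Boolean> dictionaryEnabled = ColumnProperty.<Boolean>builder()
        .withDefaultValue(true)             // applies to every column...
        .withValue("my.binary_col", false)  // ...unless overridden per column
        .build();
    dictionaryEnabled.getValue("my.binary_col"); // false (per-column override)
    dictionaryEnabled.getValue("other_col");     // true (falls back to the default)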
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
index 1110d7c..4595723 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -88,7 +88,7 @@
private final int pageSizeThreshold;
private final int dictionaryPageSizeThreshold;
private final WriterVersion writerVersion;
- private final boolean enableDictionary;
+ private final ColumnProperty<Boolean> dictionaryEnabled;
private final int minRowCountForPageSizeCheck;
private final int maxRowCountForPageSizeCheck;
private final boolean estimateNextSizeCheck;
@@ -100,27 +100,24 @@
private final boolean pageWriteChecksumEnabled;
private final boolean enableByteStreamSplit;
- private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPageSize, boolean enableDict, int minRowCountForPageSizeCheck,
- int maxRowCountForPageSizeCheck, boolean estimateNextSizeCheck, ByteBufferAllocator allocator,
- ValuesWriterFactory writerFactory, int columnIndexMinMaxTruncateLength, int pageRowCountLimit,
- boolean pageWriteChecksumEnabled, int statisticsTruncateLength, boolean enableByteStreamSplit) {
- this.pageSizeThreshold = pageSize;
+ private ParquetProperties(Builder builder) {
+ this.pageSizeThreshold = builder.pageSize;
this.initialSlabSize = CapacityByteArrayOutputStream
.initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSizeThreshold, 10);
- this.dictionaryPageSizeThreshold = dictPageSize;
- this.writerVersion = writerVersion;
- this.enableDictionary = enableDict;
- this.minRowCountForPageSizeCheck = minRowCountForPageSizeCheck;
- this.maxRowCountForPageSizeCheck = maxRowCountForPageSizeCheck;
- this.estimateNextSizeCheck = estimateNextSizeCheck;
- this.allocator = allocator;
+ this.dictionaryPageSizeThreshold = builder.dictPageSize;
+ this.writerVersion = builder.writerVersion;
+ this.dictionaryEnabled = builder.enableDict.build();
+ this.minRowCountForPageSizeCheck = builder.minRowCountForPageSizeCheck;
+ this.maxRowCountForPageSizeCheck = builder.maxRowCountForPageSizeCheck;
+ this.estimateNextSizeCheck = builder.estimateNextSizeCheck;
+ this.allocator = builder.allocator;
- this.valuesWriterFactory = writerFactory;
- this.columnIndexTruncateLength = columnIndexMinMaxTruncateLength;
- this.statisticsTruncateLength = statisticsTruncateLength;
- this.pageRowCountLimit = pageRowCountLimit;
- this.pageWriteChecksumEnabled = pageWriteChecksumEnabled;
- this.enableByteStreamSplit = enableByteStreamSplit;
+ this.valuesWriterFactory = builder.valuesWriterFactory;
+ this.columnIndexTruncateLength = builder.columnIndexTruncateLength;
+ this.statisticsTruncateLength = builder.statisticsTruncateLength;
+ this.pageRowCountLimit = builder.pageRowCountLimit;
+ this.pageWriteChecksumEnabled = builder.pageWriteChecksumEnabled;
+ this.enableByteStreamSplit = builder.enableByteStreamSplit;
}
public ValuesWriter newRepetitionLevelWriter(ColumnDescriptor path) {
@@ -173,8 +170,13 @@
return writerVersion;
}
+ @Deprecated
public boolean isEnableDictionary() {
- return enableDictionary;
+ return dictionaryEnabled.getDefaultValue();
+ }
+
+ public boolean isDictionaryEnabled(ColumnDescriptor column) {
+ return dictionaryEnabled.getValue(column);
}
public boolean isByteStreamSplitEnabled() {
@@ -237,10 +239,25 @@
return new Builder(toCopy);
}
+ @Override
+ public String toString() {
+ return "Parquet page size to " + getPageSizeThreshold() + '\n'
+ + "Parquet dictionary page size to " + getDictionaryPageSizeThreshold() + '\n'
+ + "Dictionary is " + dictionaryEnabled + '\n'
+ + "Writer version is: " + getWriterVersion() + '\n'
+ + "Page size checking is: " + (estimateNextSizeCheck() ? "estimated" : "constant") + '\n'
+ + "Min row count for page size check is: " + getMinRowCountForPageSizeCheck() + '\n'
+ + "Max row count for page size check is: " + getMaxRowCountForPageSizeCheck() + '\n'
+ + "Truncate length for column indexes is: " + getColumnIndexTruncateLength() + '\n'
+ + "Truncate length for statistics min/max is: " + getStatisticsTruncateLength() + '\n'
+ + "Page row count limit to " + getPageRowCountLimit() + '\n'
+ + "Writing page checksums is: " + (getPageWriteChecksumEnabled() ? "on" : "off");
+ }
+
public static class Builder {
private int pageSize = DEFAULT_PAGE_SIZE;
private int dictPageSize = DEFAULT_DICTIONARY_PAGE_SIZE;
- private boolean enableDict = DEFAULT_IS_DICTIONARY_ENABLED;
+ private final ColumnProperty.Builder<Boolean> enableDict;
private WriterVersion writerVersion = DEFAULT_WRITER_VERSION;
private int minRowCountForPageSizeCheck = DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK;
private int maxRowCountForPageSizeCheck = DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK;
@@ -254,11 +271,12 @@
private boolean enableByteStreamSplit = DEFAULT_IS_BYTE_STREAM_SPLIT_ENABLED;
private Builder() {
+ enableDict = ColumnProperty.<Boolean>builder().withDefaultValue(DEFAULT_IS_DICTIONARY_ENABLED);
}
private Builder(ParquetProperties toCopy) {
this.pageSize = toCopy.pageSizeThreshold;
- this.enableDict = toCopy.enableDictionary;
+ this.enableDict = ColumnProperty.<Boolean>builder(toCopy.dictionaryEnabled);
this.dictPageSize = toCopy.dictionaryPageSizeThreshold;
this.writerVersion = toCopy.writerVersion;
this.minRowCountForPageSizeCheck = toCopy.minRowCountForPageSizeCheck;
@@ -291,7 +309,19 @@
* @return this builder for method chaining.
*/
public Builder withDictionaryEncoding(boolean enableDictionary) {
- this.enableDict = enableDictionary;
+ this.enableDict.withDefaultValue(enableDictionary);
+ return this;
+ }
+
+ /**
+ * Enable or disable dictionary encoding for the specified column.
+ *
+ * @param columnPath the path of the column (dot-string)
+ * @param enableDictionary whether dictionary encoding should be enabled
+ * @return this builder for method chaining.
+ */
+ public Builder withDictionaryEncoding(String columnPath, boolean enableDictionary) {
+ this.enableDict.withValue(columnPath, enableDictionary);
return this;
}
@@ -378,11 +408,7 @@
}
public ParquetProperties build() {
- ParquetProperties properties =
- new ParquetProperties(writerVersion, pageSize, dictPageSize,
- enableDict, minRowCountForPageSizeCheck, maxRowCountForPageSizeCheck,
- estimateNextSizeCheck, allocator, valuesWriterFactory, columnIndexTruncateLength,
- pageRowCountLimit, pageWriteChecksumEnabled, statisticsTruncateLength, enableByteStreamSplit);
+ ParquetProperties properties = new ParquetProperties(this);
// we pass a constructed but uninitialized factory to ParquetProperties above as currently
// creation of ValuesWriters is invoked from within ParquetProperties. In the future
// we'd like to decouple that and won't need to pass an object to properties and then pass the
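With the Builder now carrying a ColumnProperty, dictionary encoding becomes tunable per column; a minimal usage sketch (the column name and the ColumnDescriptor variable `column` are hypothetical):

    ParquetProperties props = ParquetProperties.builder()
        .withDictionaryEncoding(true)                    // default for all columns
        .withDictionaryEncoding("my.binary_col", false)  // override for one column
        .build();
    props.isDictionaryEnabled(column); // consults the per-column setting
    props.isEnableDictionary();        // deprecated: returns only the default value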
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultValuesWriterFactory.java b/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultValuesWriterFactory.java
index 6584894..9d66d8d 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultValuesWriterFactory.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultValuesWriterFactory.java
@@ -76,7 +76,7 @@
}
static ValuesWriter dictWriterWithFallBack(ColumnDescriptor path, ParquetProperties parquetProperties, Encoding dictPageEncoding, Encoding dataPageEncoding, ValuesWriter writerToFallBackTo) {
- if (parquetProperties.isEnableDictionary()) {
+ if (parquetProperties.isDictionaryEnabled(path)) {
return FallbackValuesWriter.of(
dictionaryWriter(path, parquetProperties, dictPageEncoding, dataPageEncoding),
writerToFallBackTo);
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/factory/DefaultValuesWriterFactoryTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/factory/DefaultValuesWriterFactoryTest.java
index 53bcaef..c1b7f9f 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/factory/DefaultValuesWriterFactoryTest.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/factory/DefaultValuesWriterFactoryTest.java
@@ -18,6 +18,13 @@
*/
package org.apache.parquet.column.values.factory;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
+import static org.apache.parquet.schema.Types.required;
+import static org.junit.Assert.assertTrue;
+
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.ParquetProperties.WriterVersion;
@@ -26,7 +33,12 @@
import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriterForLong;
import org.apache.parquet.column.values.deltastrings.DeltaByteArrayWriter;
import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter;
-import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.*;
+import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.PlainBinaryDictionaryValuesWriter;
+import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.PlainDoubleDictionaryValuesWriter;
+import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.PlainFixedLenArrayDictionaryValuesWriter;
+import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.PlainFloatDictionaryValuesWriter;
+import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.PlainIntegerDictionaryValuesWriter;
+import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.PlainLongDictionaryValuesWriter;
import org.apache.parquet.column.values.fallback.FallbackValuesWriter;
import org.apache.parquet.column.values.plain.BooleanPlainValuesWriter;
import org.apache.parquet.column.values.plain.FixedLenByteArrayPlainValuesWriter;
@@ -34,13 +46,8 @@
import org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesWriter;
import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
-
import org.junit.Test;
-import static junit.framework.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
public class DefaultValuesWriterFactoryTest {
@Test
@@ -412,8 +419,71 @@
PlainDoubleDictionaryValuesWriter.class, ByteStreamSplitValuesWriter.class);
}
+
+ @Test
+ public void testColumnWiseDictionaryWithFalseDefault() {
+ ValuesWriterFactory factory = getDefaultFactory(WriterVersion.PARQUET_2_0, false,
+ "binary_dict",
+ "boolean_dict",
+ "float_dict",
+ "int32_dict");
+ validateFactory(factory, BINARY, "binary_dict",
+ PlainBinaryDictionaryValuesWriter.class, DeltaByteArrayWriter.class);
+ validateFactory(factory, BINARY, "binary_no_dict",
+ DeltaByteArrayWriter.class);
+ validateFactory(factory, BOOLEAN, "boolean_dict",
+ RunLengthBitPackingHybridValuesWriter.class);
+ validateFactory(factory, BOOLEAN, "boolean_no_dict",
+ RunLengthBitPackingHybridValuesWriter.class);
+ validateFactory(factory, FLOAT, "float_dict",
+ PlainFloatDictionaryValuesWriter.class, PlainValuesWriter.class);
+ validateFactory(factory, FLOAT, "float_no_dict",
+ PlainValuesWriter.class);
+ validateFactory(factory, INT32, "int32_dict",
+ PlainIntegerDictionaryValuesWriter.class, DeltaBinaryPackingValuesWriter.class);
+ validateFactory(factory, INT32, "int32_no_dict",
+ DeltaBinaryPackingValuesWriter.class);
+ }
+
+ @Test
+ public void testColumnWiseDictionaryWithTrueDefault() {
+ ValuesWriterFactory factory = getDefaultFactory(WriterVersion.PARQUET_2_0, true,
+ "binary_no_dict",
+ "boolean_no_dict",
+ "float_no_dict",
+ "int32_no_dict");
+ validateFactory(factory, BINARY, "binary_dict",
+ PlainBinaryDictionaryValuesWriter.class, DeltaByteArrayWriter.class);
+ validateFactory(factory, BINARY, "binary_no_dict",
+ DeltaByteArrayWriter.class);
+ validateFactory(factory, BOOLEAN, "boolean_dict",
+ RunLengthBitPackingHybridValuesWriter.class);
+ validateFactory(factory, BOOLEAN, "boolean_no_dict",
+ RunLengthBitPackingHybridValuesWriter.class);
+ validateFactory(factory, FLOAT, "float_dict",
+ PlainFloatDictionaryValuesWriter.class, PlainValuesWriter.class);
+ validateFactory(factory, FLOAT, "float_no_dict",
+ PlainValuesWriter.class);
+ validateFactory(factory, INT32, "int32_dict",
+ PlainIntegerDictionaryValuesWriter.class, DeltaBinaryPackingValuesWriter.class);
+ validateFactory(factory, INT32, "int32_no_dict",
+ DeltaBinaryPackingValuesWriter.class);
+ }
+
+ private void validateFactory(ValuesWriterFactory factory, PrimitiveTypeName typeName, String colName,
+ Class<? extends ValuesWriter> initialWriterClass, Class<? extends ValuesWriter> fallbackWriterClass) {
+ ColumnDescriptor column = createColumnDescriptor(typeName, colName);
+ ValuesWriter writer = factory.newValuesWriter(column);
+ validateFallbackWriter(writer, initialWriterClass, fallbackWriterClass);
+ }
+
+ private void validateFactory(ValuesWriterFactory factory, PrimitiveTypeName typeName, String colName,
+ Class<? extends ValuesWriter> expectedWriterClass) {
+ ColumnDescriptor column = createColumnDescriptor(typeName, colName);
+ ValuesWriter writer = factory.newValuesWriter(column);
+ validateWriterType(writer, expectedWriterClass);
+ }
+
private void doTestValueWriter(PrimitiveTypeName typeName, WriterVersion version, boolean enableDictionary, boolean enableByteStreamSplit, Class<? extends ValuesWriter> expectedValueWriterClass) {
- ColumnDescriptor mockPath = getMockColumn(typeName);
+ ColumnDescriptor mockPath = createColumnDescriptor(typeName);
ValuesWriterFactory factory = getDefaultFactory(version, enableDictionary, enableByteStreamSplit);
ValuesWriter writer = factory.newValuesWriter(mockPath);
@@ -421,17 +491,19 @@
}
private void doTestValueWriter(PrimitiveTypeName typeName, WriterVersion version, boolean enableDictionary, boolean enableByteStreamSplit, Class<? extends ValuesWriter> initialValueWriterClass, Class<? extends ValuesWriter> fallbackValueWriterClass) {
- ColumnDescriptor mockPath = getMockColumn(typeName);
+ ColumnDescriptor mockPath = createColumnDescriptor(typeName);
ValuesWriterFactory factory = getDefaultFactory(version, enableDictionary, enableByteStreamSplit);
ValuesWriter writer = factory.newValuesWriter(mockPath);
validateFallbackWriter(writer, initialValueWriterClass, fallbackValueWriterClass);
}
- private ColumnDescriptor getMockColumn(PrimitiveTypeName typeName) {
- ColumnDescriptor mockPath = mock(ColumnDescriptor.class);
- when(mockPath.getType()).thenReturn(typeName);
- return mockPath;
+ private ColumnDescriptor createColumnDescriptor(PrimitiveTypeName typeName) {
+ return createColumnDescriptor(typeName, "fake_" + typeName.name().toLowerCase() + "_col");
+ }
+
+ private ColumnDescriptor createColumnDescriptor(PrimitiveTypeName typeName, String name) {
+ return new ColumnDescriptor(new String[] { name }, required(typeName).length(1).named(name), 0, 0);
}
private ValuesWriterFactory getDefaultFactory(WriterVersion writerVersion, boolean enableDictionary, boolean enableByteStreamSplit) {
@@ -446,6 +518,20 @@
return factory;
}
+ private ValuesWriterFactory getDefaultFactory(WriterVersion writerVersion, boolean dictEnabledDefault, String... dictInverseColumns) {
+ ValuesWriterFactory factory = new DefaultValuesWriterFactory();
+ ParquetProperties.Builder builder = ParquetProperties.builder()
+ .withDictionaryEncoding(dictEnabledDefault)
+ .withWriterVersion(writerVersion)
+ .withValuesWriterFactory(factory);
+ for (String column : dictInverseColumns) {
+ builder.withDictionaryEncoding(column, !dictEnabledDefault);
+ }
+ builder.build();
+
+ return factory;
+ }
+
private void validateWriterType(ValuesWriter writer, Class<? extends ValuesWriter> valuesWriterClass) {
assertTrue("Not instance of: " + valuesWriterClass.getName(), valuesWriterClass.isInstance(writer));
}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnConfigParser.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnConfigParser.java
new file mode 100644
index 0000000..884d3ea
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnConfigParser.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.hadoop;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Parses per-column configuration values, specified with keys of the form {@code root.key#column.path}, from a {@link Configuration} object.
+ */
+class ColumnConfigParser {
+
+ private static class ConfigHelper<T> {
+ private final String prefix;
+ private final Function<String, T> function;
+ private final BiConsumer<String, T> consumer;
+
+ public ConfigHelper(String prefix, Function<String, T> function, BiConsumer<String, T> consumer) {
+ this.prefix = prefix;
+ this.function = function;
+ this.consumer = consumer;
+ }
+
+ public void processKey(String key) {
+ if (key.startsWith(prefix)) {
+ String columnPath = key.substring(prefix.length());
+ T value = function.apply(key);
+ consumer.accept(columnPath, value);
+ }
+ }
+ }
+
+ private final List<ConfigHelper<?>> helpers = new ArrayList<>();
+
+ public <T> ColumnConfigParser withColumnConfig(String rootKey, Function<String, T> function,
+ BiConsumer<String, T> consumer) {
+ helpers.add(new ConfigHelper<T>(rootKey + '#', function, consumer));
+ return this;
+ }
+
+ public void parseConfig(Configuration conf) {
+ for (Map.Entry<String, String> entry : conf) {
+ for (ConfigHelper<?> helper : helpers) {
+ // We let the function retrieve the value from the Configuration instead of parsing the raw string here,
+ // so that Configuration's exact parsing implementations are used
+ helper.processKey(entry.getKey());
+ }
+ }
+ }
+}
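A minimal sketch of how the parser is wired up, mirroring the ParquetOutputFormat change below (the column name is hypothetical):

    Configuration conf = new Configuration();
    conf.setBoolean("parquet.enable.dictionary#my.binary_col", false);

    new ColumnConfigParser()
        .withColumnConfig("parquet.enable.dictionary",
            key -> conf.getBoolean(key, false),   // parse with Configuration's own logic
            propsBuilder::withDictionaryEncoding) // apply each per-column value to the builder
        .parseConfig(conf); // invokes withDictionaryEncoding("my.binary_col", false)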
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
index 3f4b93c..1528ffb 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
@@ -425,7 +425,7 @@
throws IOException, InterruptedException {
final WriteSupport<T> writeSupport = getWriteSupport(conf);
- ParquetProperties props = ParquetProperties.builder()
+ ParquetProperties.Builder propsBuilder = ParquetProperties.builder()
.withPageSize(getPageSize(conf))
.withDictionaryPageSize(getDictionaryPageSize(conf))
.withDictionaryEncoding(getEnableDictionary(conf))
@@ -436,8 +436,12 @@
.withColumnIndexTruncateLength(getColumnIndexTruncateLength(conf))
.withStatisticsTruncateLength(getStatisticsTruncateLength(conf))
.withPageRowCountLimit(getPageRowCountLimit(conf))
- .withPageWriteChecksumEnabled(getPageWriteChecksumEnabled(conf))
- .build();
+ .withPageWriteChecksumEnabled(getPageWriteChecksumEnabled(conf));
+ new ColumnConfigParser()
+ .withColumnConfig(ENABLE_DICTIONARY, key -> conf.getBoolean(key, false), propsBuilder::withDictionaryEncoding)
+ .parseConfig(conf);
+
+ ParquetProperties props = propsBuilder.build();
long blockSize = getLongBlockSize(conf);
int maxPaddingSize = getMaxPaddingSize(conf);
@@ -445,19 +449,9 @@
if (LOG.isInfoEnabled()) {
LOG.info("Parquet block size to {}", blockSize);
- LOG.info("Parquet page size to {}", props.getPageSizeThreshold());
- LOG.info("Parquet dictionary page size to {}", props.getDictionaryPageSizeThreshold());
- LOG.info("Dictionary is {}", (props.isEnableDictionary() ? "on" : "off"));
LOG.info("Validation is {}", (validating ? "on" : "off"));
- LOG.info("Writer version is: {}", props.getWriterVersion());
LOG.info("Maximum row group padding size is {} bytes", maxPaddingSize);
- LOG.info("Page size checking is: {}", (props.estimateNextSizeCheck() ? "estimated" : "constant"));
- LOG.info("Min row count for page size check is: {}", props.getMinRowCountForPageSizeCheck());
- LOG.info("Max row count for page size check is: {}", props.getMaxRowCountForPageSizeCheck());
- LOG.info("Truncate length for column indexes is: {}", props.getColumnIndexTruncateLength());
- LOG.info("Truncate length for statistics min/max is: {}", props.getStatisticsTruncateLength());
- LOG.info("Page row count limit to {}", props.getPageRowCountLimit());
- LOG.info("Writing page checksums is: {}", props.getPageWriteChecksumEnabled() ? "on" : "off");
+ LOG.info("Parquet properties are:\n{}", props);
}
WriteContext init = writeSupport.init(conf);
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
index 13c0419..8e698f5 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
@@ -490,6 +490,18 @@
}
/**
+ * Enable or disable dictionary encoding of the specified column for the constructed writer.
+ *
+ * @param columnPath the path of the column (dot-string)
+ * @param enableDictionary whether dictionary encoding should be enabled
+ * @return this builder for method chaining.
+ */
+ public SELF withDictionaryEncoding(String columnPath, boolean enableDictionary) {
+ encodingPropsBuilder.withDictionaryEncoding(columnPath, enableDictionary);
+ return self();
+ }
+
+ /**
* Enables validation for the constructed writer.
*
* @return this builder for method chaining.