PARQUET-1784: Column-wise configuration (#754)
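
This change makes writer properties configurable on a per-column basis. As a
first step, dictionary encoding can be enabled or disabled for individual
columns, either through the writer builders or through Hadoop configuration
keys of the form <property>#<column.path>.

A sketch of the two entry points (the file, schema and column names are
illustrative; ExampleParquetWriter is the example writer of parquet-hadoop):

    // Builder API: a per-column setting overrides the default
    ParquetWriter<Group> writer = ExampleParquetWriter.builder(file)
        .withDictionaryEncoding(false)            // default for all columns
        .withDictionaryEncoding("user.id", true)  // enable for this column only
        .build();

    // Hadoop configuration: the same effect when writing through
    // ParquetOutputFormat
    conf.setBoolean("parquet.enable.dictionary", false);
    conf.setBoolean("parquet.enable.dictionary#user.id", true);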

diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ColumnProperty.java b/parquet-column/src/main/java/org/apache/parquet/column/ColumnProperty.java
new file mode 100644
index 0000000..e9a21d6
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/ColumnProperty.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.column;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+
+/**
+ * Represents a Parquet property that may have different values for different columns.
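+ * <p>
+ * A minimal usage sketch (the column path is illustrative):
+ * <pre>{@code
+ *   ColumnProperty<Boolean> prop = ColumnProperty.<Boolean>builder()
+ *       .withDefaultValue(true)
+ *       .withValue("a.b.c", false)
+ *       .build();
+ *   prop.getValue("a.b.c"); // false: the column-specific value
+ *   prop.getValue("x.y");   // true: falls back to the default
+ * }</pre>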
+ */
+abstract class ColumnProperty<T> {
+  private static class DefaultColumnProperty<T> extends ColumnProperty<T> {
+    private final T defaultValue;
+
+    private DefaultColumnProperty(T defaultValue) {
+      this.defaultValue = defaultValue;
+    }
+
+    @Override
+    public T getDefaultValue() {
+      return defaultValue;
+    }
+
+    @Override
+    public T getValue(ColumnPath columnPath) {
+      return getDefaultValue();
+    }
+
+    @Override
+    public String toString() {
+      return Objects.toString(getDefaultValue());
+    }
+  }
+
+  private static class MultipleColumnProperty<T> extends DefaultColumnProperty<T> {
+    private final Map<ColumnPath, T> values;
+
+    private MultipleColumnProperty(T defaultValue, Map<ColumnPath, T> values) {
+      super(defaultValue);
+      assert !values.isEmpty();
+      this.values = new HashMap<>(values);
+    }
+
+    @Override
+    public T getValue(ColumnPath columnPath) {
+      T value = values.get(columnPath);
+      if (value != null) {
+        return value;
+      }
+      return getDefaultValue();
+    }
+
+    @Override
+    public String toString() {
+      return Objects.toString(getDefaultValue()) + ' ' + values.toString();
+    }
+  }
+
+  static class Builder<T> {
+    private T defaultValue;
+    private final Map<ColumnPath, T> values = new HashMap<>();
+
+    private Builder() {
+    }
+
+    public Builder<T> withDefaultValue(T defaultValue) {
+      this.defaultValue = defaultValue;
+      return this;
+    }
+
+    public Builder<T> withValue(ColumnPath columnPath, T value) {
+      values.put(columnPath, value);
+      return this;
+    }
+
+    public Builder<T> withValue(String columnPath, T value) {
+      return withValue(ColumnPath.fromDotString(columnPath), value);
+    }
+
+    public Builder<T> withValue(ColumnDescriptor columnDescriptor, T value) {
+      return withValue(ColumnPath.get(columnDescriptor.getPath()), value);
+    }
+
+    public ColumnProperty<T> build() {
+      if (values.isEmpty()) {
+        return new DefaultColumnProperty<>(defaultValue);
+      } else {
+        return new MultipleColumnProperty<>(defaultValue, values);
+      }
+    }
+  }
+
+  public static <T> Builder<T> builder() {
+    return new Builder<>();
+  }
+
+  public static <T> Builder<T> builder(ColumnProperty<T> toCopy) {
+    Builder<T> builder = new Builder<>();
+    builder.withDefaultValue(((DefaultColumnProperty<T>) toCopy).defaultValue);
+    if (toCopy instanceof MultipleColumnProperty) {
+      builder.values.putAll(((MultipleColumnProperty<T>) toCopy).values);
+    }
+    return builder;
+  }
+
+  public abstract T getDefaultValue();
+
+  public abstract T getValue(ColumnPath columnPath);
+
+  public T getValue(String columnPath) {
+    return getValue(ColumnPath.fromDotString(columnPath));
+  }
+
+  public T getValue(ColumnDescriptor columnDescriptor) {
+    return getValue(ColumnPath.get(columnDescriptor.getPath()));
+  }
+}
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
index 1110d7c..4595723 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -88,7 +88,7 @@
   private final int pageSizeThreshold;
   private final int dictionaryPageSizeThreshold;
   private final WriterVersion writerVersion;
-  private final boolean enableDictionary;
+  private final ColumnProperty<Boolean> dictionaryEnabled;
   private final int minRowCountForPageSizeCheck;
   private final int maxRowCountForPageSizeCheck;
   private final boolean estimateNextSizeCheck;
@@ -100,27 +100,24 @@
   private final boolean pageWriteChecksumEnabled;
   private final boolean enableByteStreamSplit;
 
-  private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPageSize, boolean enableDict, int minRowCountForPageSizeCheck,
-                            int maxRowCountForPageSizeCheck, boolean estimateNextSizeCheck, ByteBufferAllocator allocator,
-                            ValuesWriterFactory writerFactory, int columnIndexMinMaxTruncateLength, int pageRowCountLimit,
-                            boolean pageWriteChecksumEnabled, int statisticsTruncateLength, boolean enableByteStreamSplit) {
-    this.pageSizeThreshold = pageSize;
+  private ParquetProperties(Builder builder) {
+    this.pageSizeThreshold = builder.pageSize;
     this.initialSlabSize = CapacityByteArrayOutputStream
       .initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSizeThreshold, 10);
-    this.dictionaryPageSizeThreshold = dictPageSize;
-    this.writerVersion = writerVersion;
-    this.enableDictionary = enableDict;
-    this.minRowCountForPageSizeCheck = minRowCountForPageSizeCheck;
-    this.maxRowCountForPageSizeCheck = maxRowCountForPageSizeCheck;
-    this.estimateNextSizeCheck = estimateNextSizeCheck;
-    this.allocator = allocator;
+    this.dictionaryPageSizeThreshold = builder.dictPageSize;
+    this.writerVersion = builder.writerVersion;
+    this.dictionaryEnabled = builder.enableDict.build();
+    this.minRowCountForPageSizeCheck = builder.minRowCountForPageSizeCheck;
+    this.maxRowCountForPageSizeCheck = builder.maxRowCountForPageSizeCheck;
+    this.estimateNextSizeCheck = builder.estimateNextSizeCheck;
+    this.allocator = builder.allocator;
 
-    this.valuesWriterFactory = writerFactory;
-    this.columnIndexTruncateLength = columnIndexMinMaxTruncateLength;
-    this.statisticsTruncateLength = statisticsTruncateLength;
-    this.pageRowCountLimit = pageRowCountLimit;
-    this.pageWriteChecksumEnabled = pageWriteChecksumEnabled;
-    this.enableByteStreamSplit = enableByteStreamSplit;
+    this.valuesWriterFactory = builder.valuesWriterFactory;
+    this.columnIndexTruncateLength = builder.columnIndexTruncateLength;
+    this.statisticsTruncateLength = builder.statisticsTruncateLength;
+    this.pageRowCountLimit = builder.pageRowCountLimit;
+    this.pageWriteChecksumEnabled = builder.pageWriteChecksumEnabled;
+    this.enableByteStreamSplit = builder.enableByteStreamSplit;
   }
 
   public ValuesWriter newRepetitionLevelWriter(ColumnDescriptor path) {
@@ -173,8 +170,13 @@
     return writerVersion;
   }
 
+  @Deprecated
   public boolean isEnableDictionary() {
-    return enableDictionary;
+    return dictionaryEnabled.getDefaultValue();
+  }
+
+  public boolean isDictionaryEnabled(ColumnDescriptor column) {
+    return dictionaryEnabled.getValue(column);
   }
 
   public boolean isByteStreamSplitEnabled() {
@@ -237,10 +239,25 @@
     return new Builder(toCopy);
   }
 
+  @Override
+  public String toString() {
+    return "Parquet page size to " + getPageSizeThreshold() + '\n'
+        + "Parquet dictionary page size to " + getDictionaryPageSizeThreshold() + '\n'
+        + "Dictionary is " + dictionaryEnabled + '\n'
+        + "Writer version is: " + getWriterVersion() + '\n'
+        + "Page size checking is: " + (estimateNextSizeCheck() ? "estimated" : "constant") + '\n'
+        + "Min row count for page size check is: " + getMinRowCountForPageSizeCheck() + '\n'
+        + "Max row count for page size check is: " + getMaxRowCountForPageSizeCheck() + '\n'
+        + "Truncate length for column indexes is: " + getColumnIndexTruncateLength() + '\n'
+        + "Truncate length for statistics min/max  is: " + getStatisticsTruncateLength() + '\n'
+        + "Page row count limit to " + getPageRowCountLimit() + '\n'
+        + "Writing page checksums is: " + (getPageWriteChecksumEnabled() ? "on" : "off");
+  }
+
   public static class Builder {
     private int pageSize = DEFAULT_PAGE_SIZE;
     private int dictPageSize = DEFAULT_DICTIONARY_PAGE_SIZE;
-    private boolean enableDict = DEFAULT_IS_DICTIONARY_ENABLED;
+    private final ColumnProperty.Builder<Boolean> enableDict;
     private WriterVersion writerVersion = DEFAULT_WRITER_VERSION;
     private int minRowCountForPageSizeCheck = DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK;
     private int maxRowCountForPageSizeCheck = DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK;
@@ -254,11 +271,12 @@
     private boolean enableByteStreamSplit = DEFAULT_IS_BYTE_STREAM_SPLIT_ENABLED;
 
     private Builder() {
+      enableDict = ColumnProperty.<Boolean>builder().withDefaultValue(DEFAULT_IS_DICTIONARY_ENABLED);
     }
 
     private Builder(ParquetProperties toCopy) {
       this.pageSize = toCopy.pageSizeThreshold;
-      this.enableDict = toCopy.enableDictionary;
+      this.enableDict = ColumnProperty.<Boolean>builder(toCopy.dictionaryEnabled);
       this.dictPageSize = toCopy.dictionaryPageSizeThreshold;
       this.writerVersion = toCopy.writerVersion;
       this.minRowCountForPageSizeCheck = toCopy.minRowCountForPageSizeCheck;
@@ -291,7 +309,27 @@
      * @return this builder for method chaining.
      */
     public Builder withDictionaryEncoding(boolean enableDictionary) {
-      this.enableDict = enableDictionary;
+      this.enableDict.withDefaultValue(enableDictionary);
+      return this;
+    }
+
+    /**
+     * Enable or disable dictionary encoding for the specified column.
+     *
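+     * For example (the column path {@code a.b.c} is illustrative):
+     * <pre>{@code
+     *   ParquetProperties props = ParquetProperties.builder()
+     *       .withDictionaryEncoding(false)          // default for all columns
+     *       .withDictionaryEncoding("a.b.c", true)  // override for this column
+     *       .build();
+     * }</pre>
+     *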
+     * @param columnPath the path of the column (dot-string)
+     * @param enableDictionary whether dictionary encoding should be enabled
+     * @return this builder for method chaining.
+     */
+    public Builder withDictionaryEncoding(String columnPath, boolean enableDictionary) {
+      this.enableDict.withValue(columnPath, enableDictionary);
       return this;
     }
 
@@ -378,11 +408,7 @@
     }
 
     public ParquetProperties build() {
-      ParquetProperties properties =
-        new ParquetProperties(writerVersion, pageSize, dictPageSize,
-          enableDict, minRowCountForPageSizeCheck, maxRowCountForPageSizeCheck,
-          estimateNextSizeCheck, allocator, valuesWriterFactory, columnIndexTruncateLength,
-          pageRowCountLimit, pageWriteChecksumEnabled, statisticsTruncateLength, enableByteStreamSplit);
+      ParquetProperties properties = new ParquetProperties(this);
       // we pass a constructed but uninitialized factory to ParquetProperties above as currently
       // creation of ValuesWriters is invoked from within ParquetProperties. In the future
       // we'd like to decouple that and won't need to pass an object to properties and then pass the
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultValuesWriterFactory.java b/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultValuesWriterFactory.java
index 6584894..9d66d8d 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultValuesWriterFactory.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultValuesWriterFactory.java
@@ -76,7 +76,7 @@
   }
 
   static ValuesWriter dictWriterWithFallBack(ColumnDescriptor path, ParquetProperties parquetProperties, Encoding dictPageEncoding, Encoding dataPageEncoding, ValuesWriter writerToFallBackTo) {
-    if (parquetProperties.isEnableDictionary()) {
+    if (parquetProperties.isDictionaryEnabled(path)) {
       return FallbackValuesWriter.of(
         dictionaryWriter(path, parquetProperties, dictPageEncoding, dataPageEncoding),
         writerToFallBackTo);
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/factory/DefaultValuesWriterFactoryTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/factory/DefaultValuesWriterFactoryTest.java
index 53bcaef..c1b7f9f 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/factory/DefaultValuesWriterFactoryTest.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/factory/DefaultValuesWriterFactoryTest.java
@@ -18,6 +18,13 @@
  */
 package org.apache.parquet.column.values.factory;
 
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
+import static org.apache.parquet.schema.Types.required;
+import static org.junit.Assert.assertTrue;
+
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.column.ParquetProperties.WriterVersion;
@@ -26,7 +33,12 @@
 import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriterForLong;
 import org.apache.parquet.column.values.deltastrings.DeltaByteArrayWriter;
 import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter;
-import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.*;
+import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.PlainBinaryDictionaryValuesWriter;
+import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.PlainDoubleDictionaryValuesWriter;
+import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.PlainFixedLenArrayDictionaryValuesWriter;
+import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.PlainFloatDictionaryValuesWriter;
+import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.PlainIntegerDictionaryValuesWriter;
+import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.PlainLongDictionaryValuesWriter;
 import org.apache.parquet.column.values.fallback.FallbackValuesWriter;
 import org.apache.parquet.column.values.plain.BooleanPlainValuesWriter;
 import org.apache.parquet.column.values.plain.FixedLenByteArrayPlainValuesWriter;
@@ -34,13 +46,8 @@
 import org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesWriter;
 import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter;
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
-
 import org.junit.Test;
 
-import static junit.framework.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
 public class DefaultValuesWriterFactoryTest {
 
   @Test
@@ -412,8 +419,72 @@
       PlainDoubleDictionaryValuesWriter.class, ByteStreamSplitValuesWriter.class);
   }
 
+  public void testColumnWiseDictionaryWithFalseDefault() {
+    ValuesWriterFactory factory = getDefaultFactory(WriterVersion.PARQUET_2_0, false,
+        "binary_dict",
+        "boolean_dict",
+        "float_dict",
+        "int32_dict");
+    validateFactory(factory, BINARY, "binary_dict",
+        PlainBinaryDictionaryValuesWriter.class, DeltaByteArrayWriter.class);
+    validateFactory(factory, BINARY, "binary_no_dict",
+        DeltaByteArrayWriter.class);
+    validateFactory(factory, BOOLEAN, "boolean_dict",
+        RunLengthBitPackingHybridValuesWriter.class);
+    validateFactory(factory, BOOLEAN, "boolean_no_dict",
+        RunLengthBitPackingHybridValuesWriter.class);
+    validateFactory(factory, FLOAT, "float_dict",
+        PlainFloatDictionaryValuesWriter.class, PlainValuesWriter.class);
+    validateFactory(factory, FLOAT, "float_no_dict",
+        PlainValuesWriter.class);
+    validateFactory(factory, INT32, "int32_dict",
+        PlainIntegerDictionaryValuesWriter.class, DeltaBinaryPackingValuesWriter.class);
+    validateFactory(factory, INT32, "int32_no_dict",
+        DeltaBinaryPackingValuesWriter.class);
+  }
+
+  @Test
+  public void testColumnWiseDictionaryWithTrueDefault() {
+    ValuesWriterFactory factory = getDefaultFactory(WriterVersion.PARQUET_2_0, true,
+        "binary_no_dict",
+        "boolean_no_dict",
+        "float_no_dict",
+        "int32_no_dict");
+    validateFactory(factory, BINARY, "binary_dict",
+        PlainBinaryDictionaryValuesWriter.class, DeltaByteArrayWriter.class);
+    validateFactory(factory, BINARY, "binary_no_dict",
+        DeltaByteArrayWriter.class);
+    validateFactory(factory, BOOLEAN, "boolean_dict",
+        RunLengthBitPackingHybridValuesWriter.class);
+    validateFactory(factory, BOOLEAN, "boolean_no_dict",
+        RunLengthBitPackingHybridValuesWriter.class);
+    validateFactory(factory, FLOAT, "float_dict",
+        PlainFloatDictionaryValuesWriter.class, PlainValuesWriter.class);
+    validateFactory(factory, FLOAT, "float_no_dict",
+        PlainValuesWriter.class);
+    validateFactory(factory, INT32, "int32_dict",
+        PlainIntegerDictionaryValuesWriter.class, DeltaBinaryPackingValuesWriter.class);
+    validateFactory(factory, INT32, "int32_no_dict",
+        DeltaBinaryPackingValuesWriter.class);
+  }
+
+  private void validateFactory(ValuesWriterFactory factory, PrimitiveTypeName typeName, String colName,
+      Class<? extends ValuesWriter> initialWriterClass, Class<? extends ValuesWriter> fallbackWriterClass) {
+    ColumnDescriptor column = createColumnDescriptor(typeName, colName);
+    ValuesWriter writer = factory.newValuesWriter(column);
+    validateFallbackWriter(writer, initialWriterClass, fallbackWriterClass);
+  }
+
+  private void validateFactory(ValuesWriterFactory factory, PrimitiveTypeName typeName, String colName,
+      Class<? extends ValuesWriter> expectedWriterClass) {
+    ColumnDescriptor column = createColumnDescriptor(typeName, colName);
+    ValuesWriter writer = factory.newValuesWriter(column);
+    validateWriterType(writer, expectedWriterClass);
+  }
+
   private void doTestValueWriter(PrimitiveTypeName typeName, WriterVersion version, boolean enableDictionary, boolean enableByteStreamSplit, Class<? extends ValuesWriter> expectedValueWriterClass) {
-    ColumnDescriptor mockPath = getMockColumn(typeName);
+    ColumnDescriptor mockPath = createColumnDescriptor(typeName);
     ValuesWriterFactory factory = getDefaultFactory(version, enableDictionary, enableByteStreamSplit);
     ValuesWriter writer = factory.newValuesWriter(mockPath);
 
@@ -421,17 +491,19 @@
   }
 
   private void doTestValueWriter(PrimitiveTypeName typeName, WriterVersion version, boolean enableDictionary, boolean enableByteStreamSplit, Class<? extends ValuesWriter> initialValueWriterClass, Class<? extends ValuesWriter> fallbackValueWriterClass) {
-    ColumnDescriptor mockPath = getMockColumn(typeName);
+    ColumnDescriptor mockPath = createColumnDescriptor(typeName);
     ValuesWriterFactory factory = getDefaultFactory(version, enableDictionary, enableByteStreamSplit);
     ValuesWriter writer = factory.newValuesWriter(mockPath);
 
     validateFallbackWriter(writer, initialValueWriterClass, fallbackValueWriterClass);
   }
 
-  private ColumnDescriptor getMockColumn(PrimitiveTypeName typeName) {
-    ColumnDescriptor mockPath = mock(ColumnDescriptor.class);
-    when(mockPath.getType()).thenReturn(typeName);
-    return mockPath;
+  private ColumnDescriptor createColumnDescriptor(PrimitiveTypeName typeName) {
+    return createColumnDescriptor(typeName, "fake_" + typeName.name().toLowerCase() + "_col");
+  }
+
+  private ColumnDescriptor createColumnDescriptor(PrimitiveTypeName typeName, String name) {
+    return new ColumnDescriptor(new String[] { name }, required(typeName).length(1).named(name), 0, 0);
   }
 
   private ValuesWriterFactory getDefaultFactory(WriterVersion writerVersion, boolean enableDictionary, boolean enableByteStreamSplit) {
@@ -446,6 +518,20 @@
     return factory;
   }
 
+  private ValuesWriterFactory getDefaultFactory(WriterVersion writerVersion, boolean dictEnabledDefault, String... dictInverseColumns) {
+    ValuesWriterFactory factory = new DefaultValuesWriterFactory();
+    ParquetProperties.Builder builder = ParquetProperties.builder()
+        .withDictionaryEncoding(dictEnabledDefault)
+        .withWriterVersion(writerVersion)
+        .withValuesWriterFactory(factory);
+    for (String column : dictInverseColumns) {
+      builder.withDictionaryEncoding(column, !dictEnabledDefault);
+    }
+    builder.build();
+
+    return factory;
+  }
+
   private void validateWriterType(ValuesWriter writer, Class<? extends ValuesWriter> valuesWriterClass) {
     assertTrue("Not instance of: " + valuesWriterClass.getName(), valuesWriterClass.isInstance(writer));
   }
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnConfigParser.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnConfigParser.java
new file mode 100644
index 0000000..884d3ea
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnConfigParser.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.hadoop;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Parses the specified key-value pairs, whose keys have the format {@code root.key#column.path}, from a {@link Configuration} object.
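+ * <p>
+ * For example, the following entry (the column path is illustrative) enables dictionary encoding
+ * for the column {@code a.b.c} only, while the plain {@code parquet.enable.dictionary} key still
+ * controls the default:
+ * <pre>{@code
+ *   conf.setBoolean("parquet.enable.dictionary#a.b.c", true);
+ * }</pre>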
+ */
+class ColumnConfigParser {
+
+  private static class ConfigHelper<T> {
+    private final String prefix;
+    private final Function<String, T> function;
+    private final BiConsumer<String, T> consumer;
+
+    public ConfigHelper(String prefix, Function<String, T> function, BiConsumer<String, T> consumer) {
+      this.prefix = prefix;
+      this.function = function;
+      this.consumer = consumer;
+    }
+
+    public void processKey(String key) {
+      if (key.startsWith(prefix)) {
+        String columnPath = key.substring(prefix.length());
+        T value = function.apply(key);
+        consumer.accept(columnPath, value);
+      }
+    }
+  }
+
+  private final List<ConfigHelper<?>> helpers = new ArrayList<>();
+
+  public <T> ColumnConfigParser withColumnConfig(String rootKey, Function<String, T> function,
+      BiConsumer<String, T> consumer) {
+    helpers.add(new ConfigHelper<T>(rootKey + '#', function, consumer));
+    return this;
+  }
+
+  public void parseConfig(Configuration conf) {
+    for (Map.Entry<String, String> entry : conf) {
+      for (ConfigHelper<?> helper : helpers) {
+        // We retrieve the value via the function instead of parsing entry.getValue() ourselves so that the
+        // exact parsing implementations in Configuration (e.g. getBoolean) are used
+        helper.processKey(entry.getKey());
+      }
+    }
+  }
+}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
index 3f4b93c..1528ffb 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
@@ -425,7 +425,7 @@
         throws IOException, InterruptedException {
     final WriteSupport<T> writeSupport = getWriteSupport(conf);
 
-    ParquetProperties props = ParquetProperties.builder()
+    ParquetProperties.Builder propsBuilder = ParquetProperties.builder()
         .withPageSize(getPageSize(conf))
         .withDictionaryPageSize(getDictionaryPageSize(conf))
         .withDictionaryEncoding(getEnableDictionary(conf))
@@ -436,8 +436,12 @@
         .withColumnIndexTruncateLength(getColumnIndexTruncateLength(conf))
         .withStatisticsTruncateLength(getStatisticsTruncateLength(conf))
         .withPageRowCountLimit(getPageRowCountLimit(conf))
-        .withPageWriteChecksumEnabled(getPageWriteChecksumEnabled(conf))
-        .build();
+        .withPageWriteChecksumEnabled(getPageWriteChecksumEnabled(conf));
+    new ColumnConfigParser()
+        .withColumnConfig(ENABLE_DICTIONARY, key -> conf.getBoolean(key, false), propsBuilder::withDictionaryEncoding)
+        .parseConfig(conf);
+
+    ParquetProperties props = propsBuilder.build();
 
     long blockSize = getLongBlockSize(conf);
     int maxPaddingSize = getMaxPaddingSize(conf);
@@ -445,19 +449,9 @@
 
     if (LOG.isInfoEnabled()) {
       LOG.info("Parquet block size to {}", blockSize);
-      LOG.info("Parquet page size to {}", props.getPageSizeThreshold());
-      LOG.info("Parquet dictionary page size to {}", props.getDictionaryPageSizeThreshold());
-      LOG.info("Dictionary is {}", (props.isEnableDictionary() ? "on" : "off"));
       LOG.info("Validation is {}", (validating ? "on" : "off"));
-      LOG.info("Writer version is: {}", props.getWriterVersion());
       LOG.info("Maximum row group padding size is {} bytes", maxPaddingSize);
-      LOG.info("Page size checking is: {}", (props.estimateNextSizeCheck() ? "estimated" : "constant"));
-      LOG.info("Min row count for page size check is: {}", props.getMinRowCountForPageSizeCheck());
-      LOG.info("Max row count for page size check is: {}", props.getMaxRowCountForPageSizeCheck());
-      LOG.info("Truncate length for column indexes is: {}", props.getColumnIndexTruncateLength());
-      LOG.info("Truncate length for statistics min/max  is: {}", props.getStatisticsTruncateLength());
-      LOG.info("Page row count limit to {}", props.getPageRowCountLimit());
-      LOG.info("Writing page checksums is: {}", props.getPageWriteChecksumEnabled() ? "on" : "off");
+      LOG.info("Parquet properties are:\n{}", props);
     }
 
     WriteContext init = writeSupport.init(conf);
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
index 13c0419..8e698f5 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
@@ -490,6 +490,18 @@
     }
 
     /**
+     * Enable or disable dictionary encoding of the specified column for the constructed writer.
+     *
+     * @param columnPath the path of the column (dot-string)
+     * @param enableDictionary whether dictionary encoding should be enabled
+     * @return this builder for method chaining.
+     */
+    public SELF withDictionaryEncoding(String columnPath, boolean enableDictionary) {
+      encodingPropsBuilder.withDictionaryEncoding(columnPath, enableDictionary);
+      return self();
+    }
+
+    /**
      * Enables validation for the constructed writer.
      *
      * @return this builder for method chaining.