PARQUET-1915: Add nullify column (#819)

diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/Main.java b/parquet-cli/src/main/java/org/apache/parquet/cli/Main.java
index 3e95ab6..f29ff13 100644
--- a/parquet-cli/src/main/java/org/apache/parquet/cli/Main.java
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/Main.java
@@ -28,6 +28,7 @@
 import org.apache.parquet.cli.commands.CSVSchemaCommand;
 import org.apache.parquet.cli.commands.CatCommand;
 import org.apache.parquet.cli.commands.CheckParquet251Command;
+import org.apache.parquet.cli.commands.ColumnMaskingCommand;
 import org.apache.parquet.cli.commands.ColumnSizeCommand;
 import org.apache.parquet.cli.commands.ConvertCSVCommand;
 import org.apache.parquet.cli.commands.ConvertCommand;
@@ -46,6 +47,7 @@
 import org.apache.log4j.Level;
 import org.apache.log4j.PropertyConfigurator;
 import org.apache.parquet.cli.commands.TransCompressionCommand;
+import org.apache.parquet.hadoop.util.ColumnMasker;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.util.Set;
@@ -93,6 +95,7 @@
     jc.addCommand("column-index", new ShowColumnIndexCommand(console));
     jc.addCommand("column-size", new ColumnSizeCommand(console));
     jc.addCommand("trans-compression", new TransCompressionCommand(console));
+    jc.addCommand("masking", new ColumnMaskingCommand(console));
   }
 
   @Override
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ColumnMaskingCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ColumnMaskingCommand.java
new file mode 100644
index 0000000..a0b37a1
--- /dev/null
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ColumnMaskingCommand.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.cli.commands;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.cli.BaseCommand;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.ColumnMasker;
+import org.apache.parquet.hadoop.util.ColumnMasker.MaskMode;
+import org.apache.parquet.hadoop.util.CompressionConverter.TransParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+
+@Parameters(commandDescription="Replace columns with masked values and write to a new Parquet file")
+public class ColumnMaskingCommand extends BaseCommand {
+
+  private ColumnMasker masker;
+
+  public ColumnMaskingCommand(Logger console) {
+    super(console);
+    masker = new ColumnMasker();
+  }
+
+  @Parameter(
+    names = {"-m", "--mode"},
+    description = "<mask mode: nullify>")
+  String mode;
+
+  @Parameter(
+    names = {"-i", "--input"},
+    description = "<input parquet file path>")
+  String input;
+
+  @Parameter(
+    names = {"-o", "--output"},
+    description = "<output parquet file path>")
+  String output;
+
+  @Parameter(
+    names = {"-c", "--columns"},
+    description = "<columns to be replaced with masked value>")
+  List<String> cols;
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public int run() throws IOException {
+    Preconditions.checkArgument(mode != null && (mode.equals("nullify")),
+      "mask mode cannot be null and can be only nullify");
+
+    Preconditions.checkArgument(input != null && output != null,
+      "Both input and output parquet file paths are required.");
+
+    Preconditions.checkArgument(cols != null && cols.size() > 0,
+      "columns cannot be null or empty");
+
+    MaskMode maskMode = MaskMode.fromString(mode);
+    Path inPath = new Path(input);
+    Path outPath = new Path(output);
+
+    ParquetMetadata metaData = ParquetFileReader.readFooter(getConf(), inPath, NO_FILTER);
+    MessageType schema = metaData.getFileMetaData().getSchema();
+    ParquetFileWriter writer = new ParquetFileWriter(getConf(), schema, outPath, ParquetFileWriter.Mode.CREATE);
+    writer.start();
+
+    try (TransParquetFileReader reader = new TransParquetFileReader(HadoopInputFile.fromPath(inPath, getConf()), HadoopReadOptions.builder(getConf()).build())) {
+      masker.processBlocks(reader, writer, metaData, schema, cols, maskMode);
+    } finally {
+      writer.end(metaData.getFileMetaData().getKeyValueMetaData());
+    }
+    return 0;
+  }
+
+  @Override
+  public List<String> getExamples() {
+    return Lists.newArrayList(
+        "# Replace columns with masked values and write to a new Parquet file",
+        " -m nullify -i input.parquet -o output.parquet -c col1_name"
+    );
+  }
+}
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ColumnWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/ColumnWriter.java
index f85bfcc..b0a1fa8 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/ColumnWriter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/ColumnWriter.java
@@ -18,6 +18,7 @@
  */
 package org.apache.parquet.column;
 
+import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.io.api.Binary;
 
 /**
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
index f45e10d..fce085f 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
@@ -35,6 +35,7 @@
 import org.apache.parquet.io.api.Binary;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.parquet.bytes.BytesInput;
 
 /**
  * Base implementation for {@link ColumnWriter} to be extended to specialize for V1 and V2 pages.
diff --git a/parquet-column/src/test/java/org/apache/parquet/io/TestColumnIO.java b/parquet-column/src/test/java/org/apache/parquet/io/TestColumnIO.java
index 3ba36a5..19cdc57 100644
--- a/parquet-column/src/test/java/org/apache/parquet/io/TestColumnIO.java
+++ b/parquet-column/src/test/java/org/apache/parquet/io/TestColumnIO.java
@@ -38,6 +38,7 @@
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.column.ParquetProperties;
 import org.junit.Assert;
 import org.junit.Test;
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
index 134f3da..addfdf1 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
@@ -53,10 +53,12 @@
 import org.apache.parquet.io.ParquetEncodingException;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.bytes.ByteBufferAllocator;
+import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-class ColumnChunkPageWriteStore implements PageWriteStore, BloomFilterWriteStore {
+@InterfaceAudience.Private
+public class ColumnChunkPageWriteStore implements PageWriteStore, BloomFilterWriteStore {
   private static final Logger LOG = LoggerFactory.getLogger(ColumnChunkPageWriteStore.class);
 
   private static ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index a1017ca..98699cf 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -113,8 +113,9 @@
     OVERWRITE
   }
 
+  protected final PositionOutputStream out;
+
   private final MessageType schema;
-  private final PositionOutputStream out;
   private final AlignmentStrategy alignment;
   private final int columnIndexTruncateLength;
 
@@ -1008,6 +1009,43 @@
     endBlock();
   }
 
+  /**
+   * @param descriptor the descriptor for the target column
+   * @param from a file stream to read from
+   * @param chunk the column chunk to be copied
+   * @param bloomFilter the bloomFilter for this chunk
+   * @param columnIndex the column index for this chunk
+   * @param offsetIndex the offset index for this chunk
+   * @throws IOException
+   */
+  public void appendColumnChunk(ColumnDescriptor descriptor, SeekableInputStream from, ColumnChunkMetaData chunk,
+                                BloomFilter bloomFilter, ColumnIndex columnIndex, OffsetIndex offsetIndex) throws IOException {
+    long start = chunk.getStartingPos();
+    long length = chunk.getTotalSize();
+    long newChunkStart = out.getPos();
+
+    copy(from, out, start, length);
+
+    currentBloomFilters.put(String.join(".", descriptor.getPath()), bloomFilter);
+    currentColumnIndexes.add(columnIndex);
+    currentOffsetIndexes.add(offsetIndex);
+
+    currentBlock.addColumn(ColumnChunkMetaData.get(
+      chunk.getPath(),
+      chunk.getPrimitiveType(),
+      chunk.getCodec(),
+      chunk.getEncodingStats(),
+      chunk.getEncodings(),
+      chunk.getStatistics(),
+      newChunkStart,
+      newChunkStart,
+      chunk.getValueCount(),
+      chunk.getTotalSize(),
+      chunk.getTotalUncompressedSize()));
+
+    currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + chunk.getTotalUncompressedSize());
+  }
+
   // Buffers for the copy function.
   private static final ThreadLocal<byte[]> COPY_BUFFER = ThreadLocal.withInitial(() -> new byte[8192]);
 
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ColumnMasker.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ColumnMasker.java
new file mode 100644
index 0000000..278e3a7
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ColumnMasker.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.ColumnReader;
+import org.apache.parquet.column.ColumnWriteStore;
+import org.apache.parquet.column.ColumnWriter;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.ParquetProperties.WriterVersion;
+import org.apache.parquet.column.impl.ColumnReadStoreImpl;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.hadoop.CodecFactory;
+import org.apache.parquet.hadoop.ColumnChunkPageWriteStore;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.CompressionConverter.TransParquetFileReader;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.io.api.Converter;
+import org.apache.parquet.io.api.GroupConverter;
+import org.apache.parquet.io.api.PrimitiveConverter;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class ColumnMasker {
+  /**
+   *
+   * @param reader Reader of source file
+   * @param writer Writer of destination file
+   * @param meta Metadata of source file
+   * @param schema Schema of source file
+   * @param paths Column Paths need to be masked
+   * @param maskMode Mode to mask
+   * @throws IOException
+   */
+  public void processBlocks(TransParquetFileReader reader, ParquetFileWriter writer, ParquetMetadata meta,
+                            MessageType schema, List<String> paths, MaskMode maskMode) throws IOException {
+    Set<ColumnPath> nullifyColumns = convertToColumnPaths(paths);
+    int blockIndex = 0;
+    PageReadStore store = reader.readNextRowGroup();
+
+    while (store != null) {
+      writer.startBlock(store.getRowCount());
+      List<ColumnChunkMetaData> columnsInOrder = meta.getBlocks().get(blockIndex).getColumns();
+      Map<ColumnPath, ColumnDescriptor> descriptorsMap = schema.getColumns().stream().collect(
+        Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x));
+      ColumnReadStoreImpl crStore = new ColumnReadStoreImpl(store, new DummyGroupConverter(), schema,
+        meta.getFileMetaData().getCreatedBy());
+
+      for (int i = 0; i < columnsInOrder.size(); i += 1) {
+        ColumnChunkMetaData chunk = columnsInOrder.get(i);
+        ColumnDescriptor descriptor = descriptorsMap.get(chunk.getPath());
+        processChunk(descriptor, chunk, crStore, reader, writer, schema, nullifyColumns, maskMode);
+      }
+
+      writer.endBlock();
+      store = reader.readNextRowGroup();
+      blockIndex++;
+    }
+  }
+
+  private void processChunk(ColumnDescriptor descriptor, ColumnChunkMetaData chunk, ColumnReadStoreImpl crStore,
+                            TransParquetFileReader reader, ParquetFileWriter writer, MessageType schema,
+                            Set<ColumnPath> paths, MaskMode maskMode) throws IOException {
+    reader.setStreamPosition(chunk.getStartingPos());
+
+    if (paths.contains(chunk.getPath())) {
+      if (maskMode.equals(MaskMode.NULLIFY)) {
+        Type.Repetition repetition = descriptor.getPrimitiveType().getRepetition();
+        if (repetition.equals(Type.Repetition.REQUIRED)) {
+          throw new IOException("Required column [" + descriptor.getPrimitiveType().getName() + "] cannot be nullified");
+        }
+        nullifyColumn(descriptor, chunk, crStore, writer, schema);
+      } else {
+        throw new UnsupportedOperationException("Only nullify is supported for now");
+      }
+    } else {
+      BloomFilter bloomFilter = reader.readBloomFilter(chunk);
+      ColumnIndex columnIndex = reader.readColumnIndex(chunk);
+      OffsetIndex offsetIndex = reader.readOffsetIndex(chunk);
+      writer.appendColumnChunk(descriptor, reader.getStream(), chunk, bloomFilter, columnIndex, offsetIndex);
+    }
+  }
+
+  private void nullifyColumn(ColumnDescriptor descriptor, ColumnChunkMetaData chunk, ColumnReadStoreImpl crStore,
+                             ParquetFileWriter writer, MessageType schema) throws IOException {
+    long totalChunkValues = chunk.getValueCount();
+    int dMax = descriptor.getMaxDefinitionLevel();
+    ColumnReader cReader = crStore.getColumnReader(descriptor);
+
+    WriterVersion writerVersion = chunk.getEncodingStats().usesV2Pages() ? WriterVersion.PARQUET_2_0 : WriterVersion.PARQUET_1_0;
+    ParquetProperties props = ParquetProperties.builder()
+      .withWriterVersion(writerVersion)
+      .build();
+    CodecFactory codecFactory = new CodecFactory(new Configuration(), props.getPageSizeThreshold());
+    CodecFactory.BytesCompressor compressor =	codecFactory.getCompressor(chunk.getCodec());
+    
+    // Create new schema that only has the current column
+    MessageType newSchema = newSchema(schema, descriptor);
+    ColumnChunkPageWriteStore cPageStore = new ColumnChunkPageWriteStore(compressor, newSchema, props.getAllocator(), props.getColumnIndexTruncateLength());
+    ColumnWriteStore cStore = props.newColumnWriteStore(newSchema, cPageStore);
+    ColumnWriter cWriter = cStore.getColumnWriter(descriptor);
+
+    for (int i = 0; i < totalChunkValues; i++) {
+      int rlvl = cReader.getCurrentRepetitionLevel();
+      int dlvl = cReader.getCurrentDefinitionLevel();
+      if (dlvl == dMax) {
+        // since we checked ether optional or repeated, dlvl should be > 0
+        if (dlvl == 0) {
+          throw new IOException("definition level is detected to be 0 for column " + chunk.getPath().toDotString() + " to be nullified");
+        }
+        // we just write one null for the whole list at the top level, instead of nullify the elements in the list one by one
+        if (rlvl == 0) {
+          cWriter.writeNull(rlvl, dlvl - 1);
+        }
+      } else {
+        cWriter.writeNull(rlvl, dlvl);
+      }
+      cStore.endRecord();
+    }
+
+    cStore.flush();
+    cPageStore.flushToFileWriter(writer);
+
+    cStore.close();
+    cWriter.close();
+  }
+  
+  private MessageType newSchema(MessageType schema, ColumnDescriptor descriptor) {
+    String[] path = descriptor.getPath();
+    Type type = schema.getType(path);
+    if (path.length == 1) {
+      return new MessageType(schema.getName(), type);
+    }
+
+    for (Type field : schema.getFields()) {
+      if (!field.isPrimitive()) {
+        Type newType = extractField(field.asGroupType(), type);
+        if (newType != null) {
+          return new MessageType(schema.getName(), newType);
+        }
+      }
+    }
+
+    // We should never hit this because 'type' is returned by schema.getType().
+    throw new RuntimeException("No field is found");
+  }
+
+  private Type extractField(GroupType candidate, Type targetField) {
+    if (targetField.equals(candidate)) {
+      return targetField;
+    }
+
+    // In case 'type' is a descendants of candidate
+    for (Type field : candidate.asGroupType().getFields()) {
+      if (field.isPrimitive()) {
+        if (field.equals(targetField)) {
+          return new GroupType(candidate.getRepetition(), candidate.getName(), targetField);
+        }
+      } else {
+        Type tempField = extractField(field.asGroupType(), targetField);
+        if (tempField != null) {
+          return tempField;
+        }
+      }
+    }
+
+    return null;
+  }
+
+  public static Set<ColumnPath> convertToColumnPaths(List<String> cols) {
+    Set<ColumnPath> prunePaths = new HashSet<>();
+    for (String col : cols) {
+      prunePaths.add(ColumnPath.fromDotString(col));
+    }
+    return prunePaths;
+  }
+
+  public enum MaskMode {
+    NULLIFY("nullify"),
+    HASH("hash"),
+    REDACT("redact");
+
+    private String mode;
+
+    MaskMode(String text) {
+      this.mode = text;
+    }
+
+    public String getMode() {
+      return this.mode;
+    }
+
+    public static MaskMode fromString(String mode) {
+      for (MaskMode b : MaskMode.values()) {
+        if (b.mode.equalsIgnoreCase(mode)) {
+          return b;
+        }
+      }
+      return null;
+    }
+  }
+
+  private static final class DummyGroupConverter extends GroupConverter {
+    @Override public void start() {}
+    @Override public void end() {}
+    @Override public Converter getConverter(int fieldIndex) { return new DummyConverter(); }
+  }
+
+  private static final class DummyConverter extends PrimitiveConverter {
+    @Override public GroupConverter asGroupConverter() { return new DummyGroupConverter(); }
+  }
+}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/CompressionConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/CompressionConverter.java
index 922699f..c77674d 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/CompressionConverter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/CompressionConverter.java
@@ -44,6 +44,7 @@
 import org.apache.parquet.internal.column.columnindex.OffsetIndex;
 import org.apache.parquet.io.InputFile;
 import org.apache.parquet.io.ParquetEncodingException;
+import org.apache.parquet.io.SeekableInputStream;
 import org.apache.parquet.io.api.Converter;
 import org.apache.parquet.io.api.GroupConverter;
 import org.apache.parquet.io.api.PrimitiveConverter;
@@ -69,7 +70,7 @@
   }
 
   public void processBlocks(TransParquetFileReader reader, ParquetFileWriter writer, ParquetMetadata meta, MessageType schema,
-                             String createdBy, CompressionCodecName codecName) throws IOException {
+                            String createdBy, CompressionCodecName codecName) throws IOException {
     int blockIndex = 0;
     PageReadStore store = reader.readNextRowGroup();
     while (store != null) {
@@ -267,5 +268,9 @@
     public long getPos() throws IOException {
       return f.getPos();
     }
+
+    public SeekableInputStream getStream() {
+      return f;
+    }
   }
 }
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/ColumnMaskerTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/ColumnMaskerTest.java
new file mode 100644
index 0000000..41f2c1b
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/ColumnMaskerTest.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.util;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroup;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.CompressionConverter.TransParquetFileReader;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
+import static org.apache.parquet.schema.Type.Repetition.OPTIONAL;
+import static org.apache.parquet.schema.Type.Repetition.REPEATED;
+import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
+import static org.junit.Assert.assertArrayEquals;
+
+public class ColumnMaskerTest {
+
+  private Configuration conf = new Configuration();
+  private Map<String, String> extraMeta = ImmutableMap.of("key1", "value1", "key2", "value2");
+  private ColumnMasker columnMasker = new ColumnMasker();
+  private Random rnd = new Random(5);
+  private final int numRecord = 1000;
+  private String inputFile = null;
+  private String outputFile = null;
+  private TestDocs testDocs = null;
+
+  @Before
+  public void testSetup() throws Exception {
+    testDocs = new TestDocs(numRecord);
+    inputFile = createParquetFile(conf, extraMeta, numRecord, "input", "GZIP",
+      ParquetProperties.WriterVersion.PARQUET_1_0, ParquetProperties.DEFAULT_PAGE_SIZE, testDocs);
+    outputFile = createTempFile("test");
+    nullifyColumns(conf, inputFile, outputFile);
+  }
+
+  @Test(expected = RuntimeException.class)
+  public void testNullColumns() throws IOException {
+    ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), new Path(outputFile)).withConf(conf).build();
+    Group group = reader.read();
+    group.getLong("DocId", 0);
+    reader.close();
+  }
+
+  @Test(expected = RuntimeException.class)
+  public void testNullNestedColumns() throws IOException {
+    ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), new Path(outputFile)).withConf(conf).build();
+    Group group = reader.read();
+    Group subGroup = group.getGroup("Links", 0);
+    subGroup.getBinary("Backward", 0).getBytes();
+    reader.close();
+  }
+
+  @Test
+  public void validateNonNuLLColumns() throws IOException {
+    ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), new Path(outputFile)).withConf(conf).build();
+    for (int i = 0; i < numRecord; i++) {
+      Group group = reader.read();
+      assertArrayEquals(group.getBinary("Name", 0).getBytes(), testDocs.name[i].getBytes());
+      Group subGroup = group.getGroup("Links", 0);
+      assertArrayEquals(subGroup.getBinary("Forward", 0).getBytes(), testDocs.linkForward[i].getBytes());
+    }
+    reader.close();
+  }
+
+  private void nullifyColumns(Configuration conf, String inputFile, String outputFile) throws IOException {
+    Path inPath = new Path(inputFile);
+    Path outPath = new Path(outputFile);
+
+    ParquetMetadata metaData = ParquetFileReader.readFooter(conf, inPath, NO_FILTER);
+    MessageType schema = metaData.getFileMetaData().getSchema();
+    ParquetFileWriter writer = new ParquetFileWriter(conf, schema, outPath, ParquetFileWriter.Mode.OVERWRITE);
+    writer.start();
+
+    List<String> paths = new ArrayList<>();
+    paths.add("DocId");
+    paths.add("Gender");
+    paths.add("Links.Backward");
+    try (TransParquetFileReader reader = new TransParquetFileReader(HadoopInputFile.fromPath(inPath, conf), HadoopReadOptions.builder(conf).build())) {
+      columnMasker.processBlocks(reader, writer, metaData, schema, paths, ColumnMasker.MaskMode.NULLIFY);
+    } finally {
+      writer.end(metaData.getFileMetaData().getKeyValueMetaData());
+    }
+  }
+
+  private String createParquetFile(Configuration conf, Map<String, String> extraMeta, int numRecord, String prefix, String codec,
+                                         ParquetProperties.WriterVersion writerVersion, int pageSize, TestDocs testDocs) throws IOException {
+    MessageType schema = new MessageType("schema",
+      new PrimitiveType(OPTIONAL, INT64, "DocId"),
+      new PrimitiveType(REQUIRED, BINARY, "Name"),
+      new PrimitiveType(OPTIONAL, BINARY, "Gender"),
+      new GroupType(OPTIONAL, "Links",
+        new PrimitiveType(REPEATED, BINARY, "Backward"),
+        new PrimitiveType(REPEATED, BINARY, "Forward")));
+
+    conf.set(GroupWriteSupport.PARQUET_EXAMPLE_SCHEMA, schema.toString());
+
+    String file = createTempFile(prefix);
+    ExampleParquetWriter.Builder builder = ExampleParquetWriter.builder(new Path(file))
+      .withConf(conf)
+      .withWriterVersion(writerVersion)
+      .withExtraMetaData(extraMeta)
+      .withDictionaryEncoding("DocId", true)
+      .withDictionaryEncoding("Name", true)
+      .withValidation(true)
+      .enablePageWriteChecksum()
+      .withPageSize(pageSize)
+      .withCompressionCodec(CompressionCodecName.valueOf(codec));
+    try (ParquetWriter writer = builder.build()) {
+      for (int i = 0; i < numRecord; i++) {
+        SimpleGroup g = new SimpleGroup(schema);
+        g.add("DocId", testDocs.docId[i]);
+        g.add("Name", testDocs.name[i]);
+        g.add("Gender", testDocs.gender[i]);
+        Group links = g.addGroup("Links");
+        links.add(0, testDocs.linkBackward[i]);
+        links.add(1, testDocs.linkForward[i]);
+        writer.write(g);
+      }
+    }
+
+    return file;
+  }
+
+  private static long getLong() {
+    return ThreadLocalRandom.current().nextLong(1000);
+  }
+
+  private String getString() {
+    char[] chars = {'a', 'b', 'c', 'd', 'e', 'f', 'g', 'x', 'z', 'y'};
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < 100; i++) {
+      sb.append(chars[rnd.nextInt(10)]);
+    }
+    return sb.toString();
+  }
+
+  private String createTempFile(String prefix) {
+    try {
+      return Files.createTempDirectory(prefix).toAbsolutePath().toString() + "/test.parquet";
+    } catch (IOException e) {
+      throw new AssertionError("Unable to create temporary file", e);
+    }
+  }
+
+  private  class TestDocs {
+    public long[] docId;
+    public String[] name;
+    public String[] gender;
+    public String[] linkBackward;
+    public String[] linkForward;
+
+    public TestDocs(int numRecord) {
+      docId = new long[numRecord];
+      for (int i = 0; i < numRecord; i++) {
+        docId[i] = getLong();
+      }
+
+      name = new String[numRecord];
+      for (int i = 0; i < numRecord; i++) {
+        name[i] = getString();
+      }
+
+      gender = new String[numRecord];
+      for (int i = 0; i < numRecord; i++) {
+        gender[i] = getString();
+      }
+
+      linkBackward = new String[numRecord];
+      for (int i = 0; i < numRecord; i++) {
+        linkBackward[i] = getString();
+      }
+
+      linkForward = new String[numRecord];
+      for (int i = 0; i < numRecord; i++) {
+        linkForward[i] = getString();
+      }
+    }
+  }
+}
diff --git a/parquet-tools/src/main/java/org/apache/parquet/tools/command/ColumnMaskingCommand.java b/parquet-tools/src/main/java/org/apache/parquet/tools/command/ColumnMaskingCommand.java
new file mode 100644
index 0000000..aba0dc7
--- /dev/null
+++ b/parquet-tools/src/main/java/org/apache/parquet/tools/command/ColumnMaskingCommand.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.tools.command;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.ColumnMasker;
+import org.apache.parquet.hadoop.util.CompressionConverter.TransParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.schema.MessageType;
+
+import java.util.List;
+
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+import org.apache.parquet.hadoop.util.ColumnMasker.MaskMode;
+
+public class ColumnMaskingCommand extends ArgsOnlyCommand {
+
+  private static final int MAX_COL_NUM = 100;
+
+  public static final String[] USAGE = new String[] {
+    "<mask_mode> <input> <output> [<column> ...]",
+
+    "where <mask_mode> is mask mode: nullify, hash, redact" +
+    "    <input> is the source parquet file",
+    "    <output> is the destination parquet file," +
+    "    [<column> ...] are the columns in the case sensitive dot format"
+  };
+
+  private Configuration conf;
+  private ColumnMasker masker;
+
+  public ColumnMaskingCommand() {
+    super(4, MAX_COL_NUM + 3);
+    this.conf = new Configuration();
+    masker = new ColumnMasker();
+  }
+
+  public ColumnMaskingCommand(Configuration conf) {
+    super(4, MAX_COL_NUM + 3);
+    this.conf = conf;
+    masker = new ColumnMasker();
+  }
+
+  @Override
+  public String[] getUsageDescription() {
+    return USAGE;
+  }
+
+  @Override
+  public String getCommandDescription() {
+    return "Replace columns in a given Parquet file with masked values and write to a new Parquet file.";
+  }
+
+  @Override
+  public void execute(CommandLine options) throws Exception {
+    super.execute(options);
+    List<String> args = options.getArgList();
+    MaskMode mode = MaskMode.fromString(args.get(0));
+    Path inPath = new Path(args.get(1));
+    Path outPath = new Path(args.get(2));
+    List<String> cols = args.subList(3, args.size());
+
+    ParquetMetadata metaData = ParquetFileReader.readFooter(conf, inPath, NO_FILTER);
+    MessageType schema = metaData.getFileMetaData().getSchema();
+    ParquetFileWriter writer = new ParquetFileWriter(conf, schema, outPath, ParquetFileWriter.Mode.CREATE);
+    writer.start();
+
+    try (TransParquetFileReader reader = new TransParquetFileReader(HadoopInputFile.fromPath(inPath, conf), HadoopReadOptions.builder(conf).build())) {
+      masker.processBlocks(reader, writer, metaData, schema, cols, mode);
+    } finally {
+      writer.end(metaData.getFileMetaData().getKeyValueMetaData());
+    }
+  }
+}
diff --git a/parquet-tools/src/main/java/org/apache/parquet/tools/command/PruneColumnsCommand.java b/parquet-tools/src/main/java/org/apache/parquet/tools/command/PruneColumnsCommand.java
index bb45174..2378f37 100644
--- a/parquet-tools/src/main/java/org/apache/parquet/tools/command/PruneColumnsCommand.java
+++ b/parquet-tools/src/main/java/org/apache/parquet/tools/command/PruneColumnsCommand.java
@@ -48,7 +48,7 @@
 
     "where <input> is the source parquet file",
     "    <output> is the destination parquet file," +
-    "    [<column> ...] are the columns in the case senstive dot format" +
+    "    [<column> ...] are the columns in the case sensitive dot format" +
       " to be pruned, for example a.b.c"
   };
 
diff --git a/parquet-tools/src/main/java/org/apache/parquet/tools/command/Registry.java b/parquet-tools/src/main/java/org/apache/parquet/tools/command/Registry.java
index f521b0c..abbb1bd 100644
--- a/parquet-tools/src/main/java/org/apache/parquet/tools/command/Registry.java
+++ b/parquet-tools/src/main/java/org/apache/parquet/tools/command/Registry.java
@@ -38,6 +38,7 @@
     registry.put("prune", PruneColumnsCommand.class);
     registry.put("column-size", ColumnSizeCommand.class);
     registry.put("trans-compression", TransCompressionCommand.class);
+    registry.put("masking", ColumnMaskingCommand.class);
   }
 
   public static Map<String,Command> allCommands() {