PARQUET-1915: Add nullify column (#819)
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/Main.java b/parquet-cli/src/main/java/org/apache/parquet/cli/Main.java
index 3e95ab6..f29ff13 100644
--- a/parquet-cli/src/main/java/org/apache/parquet/cli/Main.java
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/Main.java
@@ -28,6 +28,7 @@
import org.apache.parquet.cli.commands.CSVSchemaCommand;
import org.apache.parquet.cli.commands.CatCommand;
import org.apache.parquet.cli.commands.CheckParquet251Command;
+import org.apache.parquet.cli.commands.ColumnMaskingCommand;
import org.apache.parquet.cli.commands.ColumnSizeCommand;
import org.apache.parquet.cli.commands.ConvertCSVCommand;
import org.apache.parquet.cli.commands.ConvertCommand;
@@ -46,6 +47,7 @@
import org.apache.log4j.Level;
import org.apache.log4j.PropertyConfigurator;
import org.apache.parquet.cli.commands.TransCompressionCommand;
+import org.apache.parquet.hadoop.util.ColumnMasker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Set;
@@ -93,6 +95,7 @@
jc.addCommand("column-index", new ShowColumnIndexCommand(console));
jc.addCommand("column-size", new ColumnSizeCommand(console));
jc.addCommand("trans-compression", new TransCompressionCommand(console));
+ jc.addCommand("masking", new ColumnMaskingCommand(console));
}
@Override
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ColumnMaskingCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ColumnMaskingCommand.java
new file mode 100644
index 0000000..a0b37a1
--- /dev/null
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ColumnMaskingCommand.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.cli.commands;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.cli.BaseCommand;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.ColumnMasker;
+import org.apache.parquet.hadoop.util.ColumnMasker.MaskMode;
+import org.apache.parquet.hadoop.util.CompressionConverter.TransParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+
+@Parameters(commandDescription="Replace columns with masked values and write to a new Parquet file")
+public class ColumnMaskingCommand extends BaseCommand {
+
+ private ColumnMasker masker;
+
+ public ColumnMaskingCommand(Logger console) {
+ super(console);
+ masker = new ColumnMasker();
+ }
+
+ @Parameter(
+ names = {"-m", "--mode"},
+ description = "<mask mode: nullify>")
+ String mode;
+
+ @Parameter(
+ names = {"-i", "--input"},
+ description = "<input parquet file path>")
+ String input;
+
+ @Parameter(
+ names = {"-o", "--output"},
+ description = "<output parquet file path>")
+ String output;
+
+ @Parameter(
+ names = {"-c", "--columns"},
+ description = "<columns to be replaced with masked value>")
+ List<String> cols;
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public int run() throws IOException {
+    Preconditions.checkArgument(mode != null && mode.equals("nullify"),
+      "mask mode is required and only 'nullify' is currently supported");
+
+    Preconditions.checkArgument(input != null && output != null,
+      "Both input and output parquet file paths are required.");
+
+    Preconditions.checkArgument(cols != null && cols.size() > 0,
+      "At least one column to mask is required.");
+
+ MaskMode maskMode = MaskMode.fromString(mode);
+ Path inPath = new Path(input);
+ Path outPath = new Path(output);
+
+ ParquetMetadata metaData = ParquetFileReader.readFooter(getConf(), inPath, NO_FILTER);
+ MessageType schema = metaData.getFileMetaData().getSchema();
+ ParquetFileWriter writer = new ParquetFileWriter(getConf(), schema, outPath, ParquetFileWriter.Mode.CREATE);
+ writer.start();
+
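+    // Masked columns are rewritten with nulls; all other column chunks are copied as raw bytes.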
+ try (TransParquetFileReader reader = new TransParquetFileReader(HadoopInputFile.fromPath(inPath, getConf()), HadoopReadOptions.builder(getConf()).build())) {
+ masker.processBlocks(reader, writer, metaData, schema, cols, maskMode);
+ } finally {
+ writer.end(metaData.getFileMetaData().getKeyValueMetaData());
+ }
+ return 0;
+ }
+
+ @Override
+ public List<String> getExamples() {
+ return Lists.newArrayList(
+ "# Replace columns with masked values and write to a new Parquet file",
+ " -m nullify -i input.parquet -o output.parquet -c col1_name"
+ );
+ }
+}
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ColumnWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/ColumnWriter.java
index f85bfcc..b0a1fa8 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/ColumnWriter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/ColumnWriter.java
@@ -18,6 +18,7 @@
*/
package org.apache.parquet.column;
+import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.io.api.Binary;
/**
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
index f45e10d..fce085f 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
@@ -35,6 +35,7 @@
import org.apache.parquet.io.api.Binary;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.parquet.bytes.BytesInput;
/**
* Base implementation for {@link ColumnWriter} to be extended to specialize for V1 and V2 pages.
diff --git a/parquet-column/src/test/java/org/apache/parquet/io/TestColumnIO.java b/parquet-column/src/test/java/org/apache/parquet/io/TestColumnIO.java
index 3ba36a5..19cdc57 100644
--- a/parquet-column/src/test/java/org/apache/parquet/io/TestColumnIO.java
+++ b/parquet-column/src/test/java/org/apache/parquet/io/TestColumnIO.java
@@ -38,6 +38,7 @@
import java.util.Iterator;
import java.util.List;
+import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.column.ParquetProperties;
import org.junit.Assert;
import org.junit.Test;
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
index 134f3da..addfdf1 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
@@ -53,10 +53,12 @@
import org.apache.parquet.io.ParquetEncodingException;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.bytes.ByteBufferAllocator;
+import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-class ColumnChunkPageWriteStore implements PageWriteStore, BloomFilterWriteStore {
+@InterfaceAudience.Private
+public class ColumnChunkPageWriteStore implements PageWriteStore, BloomFilterWriteStore {
private static final Logger LOG = LoggerFactory.getLogger(ColumnChunkPageWriteStore.class);
private static ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index a1017ca..98699cf 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -113,8 +113,9 @@
OVERWRITE
}
+ protected final PositionOutputStream out;
+
private final MessageType schema;
- private final PositionOutputStream out;
private final AlignmentStrategy alignment;
private final int columnIndexTruncateLength;
@@ -1008,6 +1009,43 @@
endBlock();
}
+ /**
+ * @param descriptor the descriptor for the target column
+ * @param from a file stream to read from
+ * @param chunk the column chunk to be copied
+ * @param bloomFilter the bloomFilter for this chunk
+ * @param columnIndex the column index for this chunk
+ * @param offsetIndex the offset index for this chunk
+   * @throws IOException if the column chunk cannot be copied to the output file
+ */
+ public void appendColumnChunk(ColumnDescriptor descriptor, SeekableInputStream from, ColumnChunkMetaData chunk,
+ BloomFilter bloomFilter, ColumnIndex columnIndex, OffsetIndex offsetIndex) throws IOException {
+ long start = chunk.getStartingPos();
+ long length = chunk.getTotalSize();
+ long newChunkStart = out.getPos();
+
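+    // Copy the chunk's raw compressed bytes; pages are not decoded or re-encoded.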
+ copy(from, out, start, length);
+
+ currentBloomFilters.put(String.join(".", descriptor.getPath()), bloomFilter);
+ currentColumnIndexes.add(columnIndex);
+ currentOffsetIndexes.add(offsetIndex);
+
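+    // Re-register the chunk's metadata with its new starting offset in the output file.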
+ currentBlock.addColumn(ColumnChunkMetaData.get(
+ chunk.getPath(),
+ chunk.getPrimitiveType(),
+ chunk.getCodec(),
+ chunk.getEncodingStats(),
+ chunk.getEncodings(),
+ chunk.getStatistics(),
+ newChunkStart,
+ newChunkStart,
+ chunk.getValueCount(),
+ chunk.getTotalSize(),
+ chunk.getTotalUncompressedSize()));
+
+ currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + chunk.getTotalUncompressedSize());
+ }
+
// Buffers for the copy function.
private static final ThreadLocal<byte[]> COPY_BUFFER = ThreadLocal.withInitial(() -> new byte[8192]);
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ColumnMasker.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ColumnMasker.java
new file mode 100644
index 0000000..278e3a7
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ColumnMasker.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.ColumnReader;
+import org.apache.parquet.column.ColumnWriteStore;
+import org.apache.parquet.column.ColumnWriter;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.ParquetProperties.WriterVersion;
+import org.apache.parquet.column.impl.ColumnReadStoreImpl;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.hadoop.CodecFactory;
+import org.apache.parquet.hadoop.ColumnChunkPageWriteStore;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.CompressionConverter.TransParquetFileReader;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.io.api.Converter;
+import org.apache.parquet.io.api.GroupConverter;
+import org.apache.parquet.io.api.PrimitiveConverter;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class ColumnMasker {
+ /**
+   * @param reader Reader of the source file
+   * @param writer Writer of the destination file
+   * @param meta Metadata of the source file
+   * @param schema Schema of the source file
+   * @param paths Column paths to be masked, in dot format
+   * @param maskMode Masking mode to apply
+   * @throws IOException if a row group cannot be read or written
+ */
+ public void processBlocks(TransParquetFileReader reader, ParquetFileWriter writer, ParquetMetadata meta,
+ MessageType schema, List<String> paths, MaskMode maskMode) throws IOException {
+ Set<ColumnPath> nullifyColumns = convertToColumnPaths(paths);
+ int blockIndex = 0;
+ PageReadStore store = reader.readNextRowGroup();
+
+ while (store != null) {
+ writer.startBlock(store.getRowCount());
+ List<ColumnChunkMetaData> columnsInOrder = meta.getBlocks().get(blockIndex).getColumns();
+ Map<ColumnPath, ColumnDescriptor> descriptorsMap = schema.getColumns().stream().collect(
+ Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x));
+ ColumnReadStoreImpl crStore = new ColumnReadStoreImpl(store, new DummyGroupConverter(), schema,
+ meta.getFileMetaData().getCreatedBy());
+
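+      // Process chunks in file order so the output row group keeps the original column layout.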
+ for (int i = 0; i < columnsInOrder.size(); i += 1) {
+ ColumnChunkMetaData chunk = columnsInOrder.get(i);
+ ColumnDescriptor descriptor = descriptorsMap.get(chunk.getPath());
+ processChunk(descriptor, chunk, crStore, reader, writer, schema, nullifyColumns, maskMode);
+ }
+
+ writer.endBlock();
+ store = reader.readNextRowGroup();
+ blockIndex++;
+ }
+ }
+
+ private void processChunk(ColumnDescriptor descriptor, ColumnChunkMetaData chunk, ColumnReadStoreImpl crStore,
+ TransParquetFileReader reader, ParquetFileWriter writer, MessageType schema,
+ Set<ColumnPath> paths, MaskMode maskMode) throws IOException {
+ reader.setStreamPosition(chunk.getStartingPos());
+
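+    // Columns selected for masking are rewritten; all other chunks are appended as raw copies below.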
+ if (paths.contains(chunk.getPath())) {
+ if (maskMode.equals(MaskMode.NULLIFY)) {
+ Type.Repetition repetition = descriptor.getPrimitiveType().getRepetition();
+ if (repetition.equals(Type.Repetition.REQUIRED)) {
+ throw new IOException("Required column [" + descriptor.getPrimitiveType().getName() + "] cannot be nullified");
+ }
+ nullifyColumn(descriptor, chunk, crStore, writer, schema);
+ } else {
+ throw new UnsupportedOperationException("Only nullify is supported for now");
+ }
+ } else {
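+      // Untouched column: read the bloom filter and page indexes so they can be re-attached to the copied chunk.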
+ BloomFilter bloomFilter = reader.readBloomFilter(chunk);
+ ColumnIndex columnIndex = reader.readColumnIndex(chunk);
+ OffsetIndex offsetIndex = reader.readOffsetIndex(chunk);
+ writer.appendColumnChunk(descriptor, reader.getStream(), chunk, bloomFilter, columnIndex, offsetIndex);
+ }
+ }
+
+ private void nullifyColumn(ColumnDescriptor descriptor, ColumnChunkMetaData chunk, ColumnReadStoreImpl crStore,
+ ParquetFileWriter writer, MessageType schema) throws IOException {
+ long totalChunkValues = chunk.getValueCount();
+ int dMax = descriptor.getMaxDefinitionLevel();
+ ColumnReader cReader = crStore.getColumnReader(descriptor);
+
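+    // Keep the original chunk's page format (V1 vs V2) so rewritten pages match the rest of the file.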
+ WriterVersion writerVersion = chunk.getEncodingStats().usesV2Pages() ? WriterVersion.PARQUET_2_0 : WriterVersion.PARQUET_1_0;
+ ParquetProperties props = ParquetProperties.builder()
+ .withWriterVersion(writerVersion)
+ .build();
+ CodecFactory codecFactory = new CodecFactory(new Configuration(), props.getPageSizeThreshold());
+ CodecFactory.BytesCompressor compressor = codecFactory.getCompressor(chunk.getCodec());
+
+ // Create new schema that only has the current column
+ MessageType newSchema = newSchema(schema, descriptor);
+ ColumnChunkPageWriteStore cPageStore = new ColumnChunkPageWriteStore(compressor, newSchema, props.getAllocator(), props.getColumnIndexTruncateLength());
+ ColumnWriteStore cStore = props.newColumnWriteStore(newSchema, cPageStore);
+ ColumnWriter cWriter = cStore.getColumnWriter(descriptor);
+
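+    // A value that is present (dlvl == dMax) is replaced by one null written a definition level up,
+    // emitted once per record (rlvl == 0); values that are already null keep their original levels.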
+ for (int i = 0; i < totalChunkValues; i++) {
+ int rlvl = cReader.getCurrentRepetitionLevel();
+ int dlvl = cReader.getCurrentDefinitionLevel();
+ if (dlvl == dMax) {
+          // since we checked either optional or repeated above, dlvl should be > 0
+ if (dlvl == 0) {
+ throw new IOException("definition level is detected to be 0 for column " + chunk.getPath().toDotString() + " to be nullified");
+ }
+          // write a single null for the whole list at the top level, instead of nullifying each element one by one
+ if (rlvl == 0) {
+ cWriter.writeNull(rlvl, dlvl - 1);
+ }
+ } else {
+ cWriter.writeNull(rlvl, dlvl);
+ }
+ cStore.endRecord();
+ }
+
+ cStore.flush();
+ cPageStore.flushToFileWriter(writer);
+
+ cStore.close();
+ cWriter.close();
+ }
+
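+  // Build a projection of the original schema that contains only the target column,
+  // preserving its enclosing group structure.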
+ private MessageType newSchema(MessageType schema, ColumnDescriptor descriptor) {
+ String[] path = descriptor.getPath();
+ Type type = schema.getType(path);
+ if (path.length == 1) {
+ return new MessageType(schema.getName(), type);
+ }
+
+ for (Type field : schema.getFields()) {
+ if (!field.isPrimitive()) {
+ Type newType = extractField(field.asGroupType(), type);
+ if (newType != null) {
+ return new MessageType(schema.getName(), newType);
+ }
+ }
+ }
+
+ // We should never hit this because 'type' is returned by schema.getType().
+ throw new RuntimeException("No field is found");
+ }
+
+ private Type extractField(GroupType candidate, Type targetField) {
+ if (targetField.equals(candidate)) {
+ return targetField;
+ }
+
+    // In case the target field is a deeper descendant of the candidate group
+ for (Type field : candidate.asGroupType().getFields()) {
+ if (field.isPrimitive()) {
+ if (field.equals(targetField)) {
+ return new GroupType(candidate.getRepetition(), candidate.getName(), targetField);
+ }
+ } else {
+ Type tempField = extractField(field.asGroupType(), targetField);
+ if (tempField != null) {
+          // Re-wrap with the current group so enclosing levels are preserved for deeply nested columns.
+          return new GroupType(candidate.getRepetition(), candidate.getName(), tempField);
+ }
+ }
+ }
+
+ return null;
+ }
+
+ public static Set<ColumnPath> convertToColumnPaths(List<String> cols) {
+ Set<ColumnPath> prunePaths = new HashSet<>();
+ for (String col : cols) {
+ prunePaths.add(ColumnPath.fromDotString(col));
+ }
+ return prunePaths;
+ }
+
+ public enum MaskMode {
+ NULLIFY("nullify"),
+ HASH("hash"),
+ REDACT("redact");
+
+ private String mode;
+
+ MaskMode(String text) {
+ this.mode = text;
+ }
+
+ public String getMode() {
+ return this.mode;
+ }
+
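+    // Case-insensitive lookup; returns null when the mode string does not match any mask mode.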
+ public static MaskMode fromString(String mode) {
+ for (MaskMode b : MaskMode.values()) {
+ if (b.mode.equalsIgnoreCase(mode)) {
+ return b;
+ }
+ }
+ return null;
+ }
+ }
+
+ private static final class DummyGroupConverter extends GroupConverter {
+ @Override public void start() {}
+ @Override public void end() {}
+ @Override public Converter getConverter(int fieldIndex) { return new DummyConverter(); }
+ }
+
+ private static final class DummyConverter extends PrimitiveConverter {
+ @Override public GroupConverter asGroupConverter() { return new DummyGroupConverter(); }
+ }
+}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/CompressionConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/CompressionConverter.java
index 922699f..c77674d 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/CompressionConverter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/CompressionConverter.java
@@ -44,6 +44,7 @@
import org.apache.parquet.internal.column.columnindex.OffsetIndex;
import org.apache.parquet.io.InputFile;
import org.apache.parquet.io.ParquetEncodingException;
+import org.apache.parquet.io.SeekableInputStream;
import org.apache.parquet.io.api.Converter;
import org.apache.parquet.io.api.GroupConverter;
import org.apache.parquet.io.api.PrimitiveConverter;
@@ -69,7 +70,7 @@
}
public void processBlocks(TransParquetFileReader reader, ParquetFileWriter writer, ParquetMetadata meta, MessageType schema,
- String createdBy, CompressionCodecName codecName) throws IOException {
+ String createdBy, CompressionCodecName codecName) throws IOException {
int blockIndex = 0;
PageReadStore store = reader.readNextRowGroup();
while (store != null) {
@@ -267,5 +268,9 @@
public long getPos() throws IOException {
return f.getPos();
}
+
+ public SeekableInputStream getStream() {
+ return f;
+ }
}
}
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/ColumnMaskerTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/ColumnMaskerTest.java
new file mode 100644
index 0000000..41f2c1b
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/ColumnMaskerTest.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.util;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroup;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.CompressionConverter.TransParquetFileReader;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
+import static org.apache.parquet.schema.Type.Repetition.OPTIONAL;
+import static org.apache.parquet.schema.Type.Repetition.REPEATED;
+import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
+import static org.junit.Assert.assertArrayEquals;
+
+public class ColumnMaskerTest {
+
+ private Configuration conf = new Configuration();
+ private Map<String, String> extraMeta = ImmutableMap.of("key1", "value1", "key2", "value2");
+ private ColumnMasker columnMasker = new ColumnMasker();
+ private Random rnd = new Random(5);
+ private final int numRecord = 1000;
+ private String inputFile = null;
+ private String outputFile = null;
+ private TestDocs testDocs = null;
+
+ @Before
+ public void testSetup() throws Exception {
+ testDocs = new TestDocs(numRecord);
+ inputFile = createParquetFile(conf, extraMeta, numRecord, "input", "GZIP",
+ ParquetProperties.WriterVersion.PARQUET_1_0, ParquetProperties.DEFAULT_PAGE_SIZE, testDocs);
+ outputFile = createTempFile("test");
+ nullifyColumns(conf, inputFile, outputFile);
+ }
+
+ @Test(expected = RuntimeException.class)
+ public void testNullColumns() throws IOException {
+    try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), new Path(outputFile)).withConf(conf).build()) {
+      Group group = reader.read();
+      group.getLong("DocId", 0);
+    }
+ }
+
+ @Test(expected = RuntimeException.class)
+ public void testNullNestedColumns() throws IOException {
+    try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), new Path(outputFile)).withConf(conf).build()) {
+      Group group = reader.read();
+      Group subGroup = group.getGroup("Links", 0);
+      subGroup.getBinary("Backward", 0).getBytes();
+    }
+ }
+
+ @Test
+  public void validateNonNullColumns() throws IOException {
+ ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), new Path(outputFile)).withConf(conf).build();
+ for (int i = 0; i < numRecord; i++) {
+ Group group = reader.read();
+      assertArrayEquals(testDocs.name[i].getBytes(), group.getBinary("Name", 0).getBytes());
+      Group subGroup = group.getGroup("Links", 0);
+      assertArrayEquals(testDocs.linkForward[i].getBytes(), subGroup.getBinary("Forward", 0).getBytes());
+ }
+ reader.close();
+ }
+
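+  // Nullifies two optional top-level columns and a repeated column nested in an optional group.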
+ private void nullifyColumns(Configuration conf, String inputFile, String outputFile) throws IOException {
+ Path inPath = new Path(inputFile);
+ Path outPath = new Path(outputFile);
+
+ ParquetMetadata metaData = ParquetFileReader.readFooter(conf, inPath, NO_FILTER);
+ MessageType schema = metaData.getFileMetaData().getSchema();
+ ParquetFileWriter writer = new ParquetFileWriter(conf, schema, outPath, ParquetFileWriter.Mode.OVERWRITE);
+ writer.start();
+
+ List<String> paths = new ArrayList<>();
+ paths.add("DocId");
+ paths.add("Gender");
+ paths.add("Links.Backward");
+ try (TransParquetFileReader reader = new TransParquetFileReader(HadoopInputFile.fromPath(inPath, conf), HadoopReadOptions.builder(conf).build())) {
+ columnMasker.processBlocks(reader, writer, metaData, schema, paths, ColumnMasker.MaskMode.NULLIFY);
+ } finally {
+ writer.end(metaData.getFileMetaData().getKeyValueMetaData());
+ }
+ }
+
+ private String createParquetFile(Configuration conf, Map<String, String> extraMeta, int numRecord, String prefix, String codec,
+ ParquetProperties.WriterVersion writerVersion, int pageSize, TestDocs testDocs) throws IOException {
+ MessageType schema = new MessageType("schema",
+ new PrimitiveType(OPTIONAL, INT64, "DocId"),
+ new PrimitiveType(REQUIRED, BINARY, "Name"),
+ new PrimitiveType(OPTIONAL, BINARY, "Gender"),
+ new GroupType(OPTIONAL, "Links",
+ new PrimitiveType(REPEATED, BINARY, "Backward"),
+ new PrimitiveType(REPEATED, BINARY, "Forward")));
+
+ conf.set(GroupWriteSupport.PARQUET_EXAMPLE_SCHEMA, schema.toString());
+
+ String file = createTempFile(prefix);
+ ExampleParquetWriter.Builder builder = ExampleParquetWriter.builder(new Path(file))
+ .withConf(conf)
+ .withWriterVersion(writerVersion)
+ .withExtraMetaData(extraMeta)
+ .withDictionaryEncoding("DocId", true)
+ .withDictionaryEncoding("Name", true)
+ .withValidation(true)
+ .enablePageWriteChecksum()
+ .withPageSize(pageSize)
+ .withCompressionCodec(CompressionCodecName.valueOf(codec));
+ try (ParquetWriter writer = builder.build()) {
+ for (int i = 0; i < numRecord; i++) {
+ SimpleGroup g = new SimpleGroup(schema);
+ g.add("DocId", testDocs.docId[i]);
+ g.add("Name", testDocs.name[i]);
+ g.add("Gender", testDocs.gender[i]);
+ Group links = g.addGroup("Links");
+ links.add(0, testDocs.linkBackward[i]);
+ links.add(1, testDocs.linkForward[i]);
+ writer.write(g);
+ }
+ }
+
+ return file;
+ }
+
+ private static long getLong() {
+ return ThreadLocalRandom.current().nextLong(1000);
+ }
+
+ private String getString() {
+ char[] chars = {'a', 'b', 'c', 'd', 'e', 'f', 'g', 'x', 'z', 'y'};
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < 100; i++) {
+ sb.append(chars[rnd.nextInt(10)]);
+ }
+ return sb.toString();
+ }
+
+ private String createTempFile(String prefix) {
+ try {
+ return Files.createTempDirectory(prefix).toAbsolutePath().toString() + "/test.parquet";
+ } catch (IOException e) {
+ throw new AssertionError("Unable to create temporary file", e);
+ }
+ }
+
+ private class TestDocs {
+ public long[] docId;
+ public String[] name;
+ public String[] gender;
+ public String[] linkBackward;
+ public String[] linkForward;
+
+ public TestDocs(int numRecord) {
+ docId = new long[numRecord];
+ for (int i = 0; i < numRecord; i++) {
+ docId[i] = getLong();
+ }
+
+ name = new String[numRecord];
+ for (int i = 0; i < numRecord; i++) {
+ name[i] = getString();
+ }
+
+ gender = new String[numRecord];
+ for (int i = 0; i < numRecord; i++) {
+ gender[i] = getString();
+ }
+
+ linkBackward = new String[numRecord];
+ for (int i = 0; i < numRecord; i++) {
+ linkBackward[i] = getString();
+ }
+
+ linkForward = new String[numRecord];
+ for (int i = 0; i < numRecord; i++) {
+ linkForward[i] = getString();
+ }
+ }
+ }
+}
diff --git a/parquet-tools/src/main/java/org/apache/parquet/tools/command/ColumnMaskingCommand.java b/parquet-tools/src/main/java/org/apache/parquet/tools/command/ColumnMaskingCommand.java
new file mode 100644
index 0000000..aba0dc7
--- /dev/null
+++ b/parquet-tools/src/main/java/org/apache/parquet/tools/command/ColumnMaskingCommand.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.tools.command;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.ColumnMasker;
+import org.apache.parquet.hadoop.util.CompressionConverter.TransParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.schema.MessageType;
+
+import java.util.List;
+
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+import org.apache.parquet.hadoop.util.ColumnMasker.MaskMode;
+
+public class ColumnMaskingCommand extends ArgsOnlyCommand {
+
+ private static final int MAX_COL_NUM = 100;
+
+ public static final String[] USAGE = new String[] {
+ "<mask_mode> <input> <output> [<column> ...]",
+
+ "where <mask_mode> is mask mode: nullify, hash, redact" +
+ " <input> is the source parquet file",
+ " <output> is the destination parquet file," +
+ " [<column> ...] are the columns in the case sensitive dot format"
+ };
+
+ private Configuration conf;
+ private ColumnMasker masker;
+
+ public ColumnMaskingCommand() {
+ super(4, MAX_COL_NUM + 3);
+ this.conf = new Configuration();
+ masker = new ColumnMasker();
+ }
+
+ public ColumnMaskingCommand(Configuration conf) {
+ super(4, MAX_COL_NUM + 3);
+ this.conf = conf;
+ masker = new ColumnMasker();
+ }
+
+ @Override
+ public String[] getUsageDescription() {
+ return USAGE;
+ }
+
+ @Override
+ public String getCommandDescription() {
+ return "Replace columns in a given Parquet file with masked values and write to a new Parquet file.";
+ }
+
+ @Override
+ public void execute(CommandLine options) throws Exception {
+ super.execute(options);
+ List<String> args = options.getArgList();
+ MaskMode mode = MaskMode.fromString(args.get(0));
+ Path inPath = new Path(args.get(1));
+ Path outPath = new Path(args.get(2));
+ List<String> cols = args.subList(3, args.size());
+
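+    // Mode.CREATE fails rather than overwrite an existing output file.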
+ ParquetMetadata metaData = ParquetFileReader.readFooter(conf, inPath, NO_FILTER);
+ MessageType schema = metaData.getFileMetaData().getSchema();
+ ParquetFileWriter writer = new ParquetFileWriter(conf, schema, outPath, ParquetFileWriter.Mode.CREATE);
+ writer.start();
+
+ try (TransParquetFileReader reader = new TransParquetFileReader(HadoopInputFile.fromPath(inPath, conf), HadoopReadOptions.builder(conf).build())) {
+ masker.processBlocks(reader, writer, metaData, schema, cols, mode);
+ } finally {
+ writer.end(metaData.getFileMetaData().getKeyValueMetaData());
+ }
+ }
+}
diff --git a/parquet-tools/src/main/java/org/apache/parquet/tools/command/PruneColumnsCommand.java b/parquet-tools/src/main/java/org/apache/parquet/tools/command/PruneColumnsCommand.java
index bb45174..2378f37 100644
--- a/parquet-tools/src/main/java/org/apache/parquet/tools/command/PruneColumnsCommand.java
+++ b/parquet-tools/src/main/java/org/apache/parquet/tools/command/PruneColumnsCommand.java
@@ -48,7 +48,7 @@
"where <input> is the source parquet file",
" <output> is the destination parquet file," +
- " [<column> ...] are the columns in the case senstive dot format" +
+ " [<column> ...] are the columns in the case sensitive dot format" +
" to be pruned, for example a.b.c"
};
diff --git a/parquet-tools/src/main/java/org/apache/parquet/tools/command/Registry.java b/parquet-tools/src/main/java/org/apache/parquet/tools/command/Registry.java
index f521b0c..abbb1bd 100644
--- a/parquet-tools/src/main/java/org/apache/parquet/tools/command/Registry.java
+++ b/parquet-tools/src/main/java/org/apache/parquet/tools/command/Registry.java
@@ -38,6 +38,7 @@
registry.put("prune", PruneColumnsCommand.class);
registry.put("column-size", ColumnSizeCommand.class);
registry.put("trans-compression", TransCompressionCommand.class);
+ registry.put("masking", ColumnMaskingCommand.class);
}
public static Map<String,Command> allCommands() {