/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.parquet.tools.command;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroup;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
import org.junit.Test;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.List;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
import static org.apache.parquet.schema.Type.Repetition.OPTIONAL;
import static org.apache.parquet.schema.Type.Repetition.REPEATED;
import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
import static org.junit.Assert.assertEquals;
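
/**
 * Unit tests for {@link PruneColumnsCommand}. Each test writes a small Parquet
 * file, prunes one or more columns through the command line, and verifies that
 * the schema and data of the remaining columns are unchanged. The fixture
 * schema (DocId, Name, Gender, Links.Backward, Links.Forward) is loosely
 * modeled on the Document example from the Dremel paper.
 *
 * <p>Roughly the equivalent command-line invocation (assuming the standard
 * parquet-tools driver):
 * <pre>
 *   parquet-tools prune input.parquet output.parquet Gender
 * </pre>
 */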
public class TestPruneColumnsCommand {
  private static final int NUM_RECORDS = 1000;

  private final PruneColumnsCommand command = new PruneColumnsCommand();
  private final Configuration conf = new Configuration();

  @Test
  public void testPruneOneColumn() throws Exception {
    // Create the input Parquet file and a destination path for the pruned copy
    String inputFile = createParquetFile("input");
    String outputFile = createTempFile("output");

    // Remove a single top-level column
    String[] cargs = {inputFile, outputFile, "Gender"};
    executeCommandLine(cargs);

    // Verify that the schema of the remaining columns is unchanged
    ParquetMetadata pmd = ParquetFileReader.readFooter(conf, new Path(outputFile), ParquetMetadataConverter.NO_FILTER);
    MessageType schema = pmd.getFileMetaData().getSchema();
    List<Type> fields = schema.getFields();
    assertEquals(3, fields.size());
    assertEquals("DocId", fields.get(0).getName());
    assertEquals("Name", fields.get(1).getName());
    assertEquals("Links", fields.get(2).getName());
    List<Type> subFields = fields.get(2).asGroupType().getFields();
    assertEquals(2, subFields.size());
    assertEquals("Backward", subFields.get(0).getName());
    assertEquals("Forward", subFields.get(1).getName());

    // Verify that the data of the remaining columns is unchanged
    List<String> prunePaths = Arrays.asList("Gender");
    validateColumns(outputFile, prunePaths);
  }

  @Test
  public void testPruneMultiColumns() throws Exception {
    // Create the input Parquet file and a destination path for the pruned copy
    String inputFile = createParquetFile("input");
    String outputFile = createTempFile("output");

    // Remove two top-level columns
    String[] cargs = {inputFile, outputFile, "Name", "Gender"};
    executeCommandLine(cargs);

    // Verify that the schema of the remaining columns is unchanged
    ParquetMetadata pmd = ParquetFileReader.readFooter(conf, new Path(outputFile), ParquetMetadataConverter.NO_FILTER);
    MessageType schema = pmd.getFileMetaData().getSchema();
    List<Type> fields = schema.getFields();
    assertEquals(2, fields.size());
    assertEquals("DocId", fields.get(0).getName());
    assertEquals("Links", fields.get(1).getName());
    List<Type> subFields = fields.get(1).asGroupType().getFields();
    assertEquals(2, subFields.size());
    assertEquals("Backward", subFields.get(0).getName());
    assertEquals("Forward", subFields.get(1).getName());

    // Verify that the data of the remaining columns is unchanged
    List<String> prunePaths = Arrays.asList("Name", "Gender");
    validateColumns(outputFile, prunePaths);
  }

  @Test
  public void testNotExistsColumn() throws Exception {
    // Pruning a column that does not exist must complete without throwing
    String inputFile = createParquetFile("input");
    String outputFile = createTempFile("output");

    String[] cargs = {inputFile, outputFile, "no_exist"};
    executeCommandLine(cargs);
  }

  @Test
  public void testPruneNestedColumn() throws Exception {
    // Create the input Parquet file and a destination path for the pruned copy
    String inputFile = createParquetFile("input");
    String outputFile = createTempFile("output");

    // Remove a nested column
    String[] cargs = {inputFile, outputFile, "Links.Backward"};
    executeCommandLine(cargs);

    // Verify that the schema of the remaining columns is unchanged
    ParquetMetadata pmd = ParquetFileReader.readFooter(conf, new Path(outputFile), ParquetMetadataConverter.NO_FILTER);
    MessageType schema = pmd.getFileMetaData().getSchema();
    List<Type> fields = schema.getFields();
    assertEquals(4, fields.size());
    assertEquals("DocId", fields.get(0).getName());
    assertEquals("Name", fields.get(1).getName());
    assertEquals("Gender", fields.get(2).getName());
    assertEquals("Links", fields.get(3).getName());
    List<Type> subFields = fields.get(3).asGroupType().getFields();
    assertEquals(1, subFields.size());
    assertEquals("Forward", subFields.get(0).getName());

    // Verify that the data of the remaining columns is unchanged
    List<String> prunePaths = Arrays.asList("Links.Backward");
    validateColumns(outputFile, prunePaths);
  }

  @Test
  public void testPruneNestedParentColumn() throws Exception {
    // Create the input Parquet file and a destination path for the pruned copy
    String inputFile = createParquetFile("input");
    String outputFile = createTempFile("output");

    // Remove a parent column; all of its children are removed with it
    String[] cargs = {inputFile, outputFile, "Links"};
    executeCommandLine(cargs);

    // Verify that the schema of the remaining columns is unchanged
    ParquetMetadata pmd = ParquetFileReader.readFooter(conf, new Path(outputFile), ParquetMetadataConverter.NO_FILTER);
    MessageType schema = pmd.getFileMetaData().getSchema();
    List<Type> fields = schema.getFields();
    assertEquals(3, fields.size());
    assertEquals("DocId", fields.get(0).getName());
    assertEquals("Name", fields.get(1).getName());
    assertEquals("Gender", fields.get(2).getName());

    // Verify that the data of the remaining columns is unchanged
    List<String> prunePaths = Arrays.asList("Links");
    validateColumns(outputFile, prunePaths);
  }

  @Test
  public void testNotExistsNestedColumn() throws Exception {
    // Pruning a nested column that does not exist must complete without throwing
    String inputFile = createParquetFile("input");
    String outputFile = createTempFile("output");

    String[] cargs = {inputFile, outputFile, "Links.Not_exists"};
    executeCommandLine(cargs);
  }

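  /**
   * Feeds the raw arguments through a commons-cli {@link PosixParser} and
   * executes the prune command. Arguments are: input path, output path, then
   * one or more dot-separated column paths to remove.
   */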
  private void executeCommandLine(String[] cargs) throws Exception {
    CommandLineParser parser = new PosixParser();
    CommandLine cmd = parser.parse(new Options(), cargs, command.supportsExtraArgs());
    command.execute(cmd);
  }

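  /**
   * Reads the given file back and checks the fixture value of every column
   * that was not pruned; column paths listed in {@code prunePaths} are
   * skipped.
   */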
  private void validateColumns(String file, List<String> prunePaths) throws IOException {
    try (ParquetReader<Group> reader =
        ParquetReader.builder(new GroupReadSupport(), new Path(file)).withConf(conf).build()) {
      for (int i = 0; i < NUM_RECORDS; i++) {
        Group group = reader.read();
        if (!prunePaths.contains("DocId")) {
          assertEquals(1L, group.getLong("DocId", 0));
        }
        if (!prunePaths.contains("Name")) {
          assertEquals("foo", group.getBinary("Name", 0).toStringUsingUTF8());
        }
        if (!prunePaths.contains("Gender")) {
          assertEquals("male", group.getBinary("Gender", 0).toStringUsingUTF8());
        }
        if (!prunePaths.contains("Links")) {
          Group subGroup = group.getGroup("Links", 0);
          if (!prunePaths.contains("Links.Backward")) {
            assertEquals(2L, subGroup.getLong("Backward", 0));
          }
          if (!prunePaths.contains("Links.Forward")) {
            assertEquals(3L, subGroup.getLong("Forward", 0));
          }
        }
      }
    }
  }

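  /**
   * Writes {@code NUM_RECORDS} identical records with the fixture schema and
   * returns the file path. The schema string is published via
   * {@code GroupWriteSupport.PARQUET_EXAMPLE_SCHEMA} so that
   * {@link ExampleParquetWriter} can pick it up from the configuration.
   */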
  private String createParquetFile(String prefix) throws IOException {
    MessageType schema = new MessageType("schema",
        new PrimitiveType(REQUIRED, INT64, "DocId"),
        new PrimitiveType(REQUIRED, BINARY, "Name"),
        new PrimitiveType(REQUIRED, BINARY, "Gender"),
        new GroupType(OPTIONAL, "Links",
            new PrimitiveType(REPEATED, INT64, "Backward"),
            new PrimitiveType(REPEATED, INT64, "Forward")));
    conf.set(GroupWriteSupport.PARQUET_EXAMPLE_SCHEMA, schema.toString());

    String file = createTempFile(prefix);
    ExampleParquetWriter.Builder builder = ExampleParquetWriter.builder(new Path(file)).withConf(conf);
    try (ParquetWriter<Group> writer = builder.build()) {
      for (int i = 0; i < NUM_RECORDS; i++) {
        SimpleGroup g = new SimpleGroup(schema);
        g.add("DocId", 1L);
        g.add("Name", "foo");
        g.add("Gender", "male");
        Group links = g.addGroup("Links");
        links.add("Backward", 2L);
        links.add("Forward", 3L);
        writer.write(g);
      }
    }
    return file;
  }

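  /** Returns a path to a not-yet-created file inside a fresh temporary directory. */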
  private static String createTempFile(String prefix) {
    try {
      // Only the enclosing directory is created here; the file itself is written later.
      return Files.createTempDirectory(prefix).toAbsolutePath().toString() + "/test.parquet";
    } catch (IOException e) {
      throw new AssertionError("Unable to create temporary directory", e);
    }
  }
}