// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.impala.infra.tableflattener;

import org.apache.avro.generic.GenericRecord;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.FilenameUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.kitesdk.data.CompressionType;
import org.kitesdk.data.Dataset;
import org.kitesdk.data.DatasetDescriptor;
import org.kitesdk.data.Datasets;
import org.kitesdk.data.Format;
import org.kitesdk.data.Formats;
import parquet.avro.AvroSchemaConverter;
import parquet.hadoop.ParquetFileReader;
import parquet.hadoop.metadata.ParquetMetadata;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
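
// Command-line tool that flattens an input data file into flat datasets: parses the
// arguments describing the input file, flattens its schema, and migrates the data into
// datasets under the given output directory.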
public class Main {
  // Command-line options accepted by this tool.
  Options cliOptions_;

  // Describes the location, format, compression and schema of the input data file.
  DatasetDescriptor datasetDescr_;

  // The dir to write the flat datasets to. The dir should either not exist or be
  // empty. The URI can either point to a local dir or an HDFS dir.
  URI outputDir_;

  // The parsed command line.
  CommandLine commandLine_;
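
  // Parses the command-line options and the two positional arguments (input file URI
  // and output directory URI), building the descriptor of the input dataset.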
  @SuppressWarnings("static-access")
  void parseArgs(String[] args) throws ParseException, IOException {
    cliOptions_ = new Options();
    cliOptions_.addOption(OptionBuilder.withLongOpt("help").create("h"));
    cliOptions_.addOption(OptionBuilder
        .hasArg()
        .withLongOpt("input-data-format")
        .withDescription("The format of the input file. Ex, avro")
        .create("f"));
    cliOptions_.addOption(OptionBuilder
        .hasArg()
        .withLongOpt("input-data-compression")
        .withDescription("The compression type of the input file. Ex, snappy")
        .create("c"));
    cliOptions_.addOption(OptionBuilder
        .hasArg()
        .withLongOpt("input-schema-uri")
        .withDescription("The URI of the input file's schema. Ex, file://foo.avsc")
        .create("s"));
    CommandLineParser parser = new PosixParser();
    commandLine_ = parser.parse(cliOptions_, args);
    if (commandLine_.hasOption("h")) printHelp();
    DatasetDescriptor.Builder datasetDescrBuilder = new DatasetDescriptor.Builder();
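    // The two positional arguments are the URIs of the input file and the output dir.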
    String[] dataArgs = commandLine_.getArgs();
    if (dataArgs.length != 2) {
      printHelp("Exactly two arguments are required");
    }
    URI dataFile = URI.create(dataArgs[0]);
    outputDir_ = URI.create(dataArgs[1]);
    datasetDescrBuilder.location(dataFile);
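    // Determine the input format, either from the -f option or from the data file's
    // extension.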
    Format inputFormat;
    if (commandLine_.hasOption("f")) {
      inputFormat = Formats.fromString(commandLine_.getOptionValue("f"));
    } else {
      String dataFilePath = dataFile.getPath();
      if (dataFilePath == null || dataFilePath.isEmpty()) {
        printHelp("Data file URI is missing a path component: " + dataFile.toString());
      }
      String ext = FilenameUtils.getExtension(dataFilePath);
      if (ext.isEmpty()) {
        printHelp("The file format (-f) must be specified");
      }
      inputFormat = Formats.fromString(ext);
    }
    datasetDescrBuilder.format(inputFormat);
    if (commandLine_.hasOption("c")) {
      datasetDescrBuilder.compressionType(
          CompressionType.forName(commandLine_.getOptionValue("c")));
    }
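    // Determine the schema: from the -s option if given, otherwise from the Avro file
    // header or the Parquet file footer.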
    if (commandLine_.hasOption("s")) {
      datasetDescrBuilder.schemaUri(commandLine_.getOptionValue("s"));
    } else if (inputFormat == Formats.AVRO) {
      datasetDescrBuilder.schemaFromAvroDataFile(dataFile);
    } else if (inputFormat == Formats.PARQUET) {
      ParquetMetadata parquetMetadata = ParquetFileReader.readFooter(
          new Configuration(), new org.apache.hadoop.fs.Path(dataFile));
      datasetDescrBuilder.schema(new AvroSchemaConverter().convert(
          parquetMetadata.getFileMetaData().getSchema()));
    } else {
      printHelp("A schema (-s) is required for data format " + inputFormat.getName());
    }
    datasetDescr_ = datasetDescrBuilder.build();
  }
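
  // Prints usage information and exits. A non-empty error message is written to stderr
  // and results in a non-zero exit status.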
  void printHelp() { printHelp(""); }

  void printHelp(String errorMessage) {
    PrintWriter printer = new PrintWriter(
        errorMessage.isEmpty() ? System.out : System.err);
    if (!errorMessage.isEmpty()) printer.println("Error: " + errorMessage + "\n");
    printer.println("Usage: [options] <input uri> <output uri>\n\n" +
        "input uri    The URI to the input file.\n" +
        "             Ex, file:///foo.avro or hdfs://localhost:20500/foo.avro\n" +
        "output uri   The URI to the output directory. The dir must either not\n" +
        "             exist or it must be empty.\n" +
        "             Ex, file:///bar or hdfs://localhost:20500/bar\n\n" +
        "Options:");
    new HelpFormatter().printOptions(printer, 80, cliOptions_, 1, 3);
    printer.close();
    System.exit(errorMessage.isEmpty() ? 0 : 1);
  }
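
  // Flattens the schema of the input file and migrates its data into flat datasets
  // under outputDir_.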
  void exec(String[] args) throws ParseException, IOException {
    Logger.getRootLogger().setLevel(Level.OFF);
    parseArgs(args);
    SchemaFlattener schemaFlattener = new SchemaFlattener(outputDir_);
    FlattenedSchema rootDataset =
        schemaFlattener.flatten(datasetDescr_.getSchema());
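    // Create a throwaway Kite dataset in a temp dir whose descriptor points at the
    // input file, then migrate it into the flattened datasets under outputDir_.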
    Path tempDatasetPath = Files.createTempDirectory(null);
    try {
      Dataset<GenericRecord> srcDataset = Datasets.create(
          "dataset:file:" + tempDatasetPath.toString(), datasetDescr_);
      FileMigrator migrator = new FileMigrator();
      migrator.migrate(srcDataset, rootDataset);
    } finally {
      FileUtils.deleteDirectory(tempDatasetPath.toFile());
    }
  }

  public static void main(String[] args) throws Exception {
    new Main().exec(args);
  }
}