blob: cc1897762a806586faac324044ba2939629d79ab [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.infra.tableflattener;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.kitesdk.data.Dataset;
import org.kitesdk.data.DatasetDescriptor;
import org.kitesdk.data.Datasets;
import org.kitesdk.data.Formats;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
/**
 * Converts a nested Avro schema into a set of flat schemas, each backed by a Kite
 * dataset. Nested records are inlined into their parent (field names joined with the
 * dataset's name separator); arrays and maps are split into child datasets linked to
 * the parent by a generated id field. Only schemas are created here -- no data is
 * migrated.
 */
public class SchemaFlattener {
  // The dir to write the flat datasets to. The dir should either not exist or be
  // empty. The URI can either point to a local dir or an HDFS dir.
  private final URI outputDir_;

  public SchemaFlattener(URI outputDir) { outputDir_ = outputDir; }

  /**
   * Creates a flattened schema but does not migrate any data.
   *
   * @param srcSchema the source schema; must be of type RECORD.
   * @return the root of the tree of flattened datasets created under outputDir_.
   */
  public FlattenedSchema flatten(Schema srcSchema) {
    Preconditions.checkState(srcSchema.getType() == Type.RECORD);
    FlattenedSchema dstDataset = new FlattenedSchema(srcSchema.getName());
    LinkedList<Field> fields = Lists.newLinkedList();
    addRecordFields(srcSchema, dstDataset, fields, "");
    finishCreatingDataset(fields, dstDataset);
    return dstDataset;
  }

  /**
   * Recursively appends flattened versions of 'srcSchema's fields (srcSchema must be
   * a RECORD) to 'dstSchemaFields'. Simple fields are copied with 'fieldNamePrefix'
   * prepended to their name; nullable complex fields additionally get a BOOLEAN
   * is-null marker field; arrays and maps are moved into child datasets.
   */
  private void addRecordFields(Schema srcSchema, FlattenedSchema dstDataset,
      LinkedList<Field> dstSchemaFields, String fieldNamePrefix) {
    Preconditions.checkState(srcSchema.getType() == Type.RECORD);
    for (Field field : srcSchema.getFields()) {
      Schema fieldSchema = field.schema();
      if (SchemaUtil.isSimpleType(fieldSchema)) {
        dstSchemaFields.add(SchemaUtil.createField(fieldNamePrefix + field.name(),
            fieldSchema, field.doc(), field.defaultValue()));
        continue;
      }
      if (SchemaUtil.isNullable(fieldSchema)) {
        // Record nullness in a separate BOOLEAN column, then flatten the non-null
        // branch of the union.
        dstSchemaFields.add(SchemaUtil.createField(
            fieldNamePrefix + dstDataset.getIsNullFieldName(field.name()), Type.BOOLEAN));
        fieldSchema = SchemaUtil.reduceUnionToNonNull(fieldSchema);
      }
      if (SchemaUtil.requiresChildDataset(fieldSchema)) {
        createChildDataset(dstDataset.getChildOfRecordName(field.name()), fieldSchema,
            dstSchemaFields, dstDataset);
      } else {
        // Nested record: inline its fields with an extended name prefix.
        addRecordFields(fieldSchema, dstDataset, dstSchemaFields,
            fieldNamePrefix + field.name() + dstDataset.getNameSeparator());
      }
    }
  }

  /**
   * Creates a child dataset for an ARRAY or MAP schema. Adds an id field to the
   * parent (if not already present) so child rows can reference their parent row, and
   * adds an array-index or map-key field to the child. Collection values are either
   * stored directly (simple types), flattened inline (records), or recursively split
   * into further child datasets (nested collections).
   */
  private void createChildDataset(String name, Schema srcSchema,
      LinkedList<Field> parentFields, FlattenedSchema parentDataset) {
    // Ensure that the parent schema has an id field so the child can reference the
    // parent. A single id field is sufficient.
    if (parentFields.isEmpty()
        || !parentFields.getFirst().name().equals(parentDataset.getIdFieldName())) {
      parentFields.addFirst(SchemaUtil.createField(
          parentDataset.getIdFieldName(), Type.LONG));
    }
    FlattenedSchema childDataset = new FlattenedSchema(name, parentDataset);
    LinkedList<Field> fields = Lists.newLinkedList();
    String parentIdFieldName = parentDataset.getName() + childDataset.getNameSeparator()
        + childDataset.getIdFieldName();
    Field parentIdField = SchemaUtil.createField(parentIdFieldName, Type.LONG);
    childDataset.setParentIdField(parentIdField);
    fields.add(parentIdField);
    Schema valueSchema;
    if (srcSchema.getType() == Type.ARRAY) {
      fields.add(
          SchemaUtil.createField(childDataset.getArrayIdxFieldName(), Type.LONG));
      valueSchema = srcSchema.getElementType();
    } else {
      Preconditions.checkState(srcSchema.getType() == Type.MAP);
      fields.add(
          SchemaUtil.createField(childDataset.getMapKeyFieldName(), Type.STRING));
      valueSchema = srcSchema.getValueType();
    }
    if (SchemaUtil.isSimpleType(valueSchema)) {
      fields.add(SchemaUtil.createField(
          childDataset.getCollectionValueFieldName(), valueSchema));
    } else {
      if (SchemaUtil.isNullable(valueSchema)) {
        fields.add(SchemaUtil.createField(childDataset.getIsNullFieldName(
            childDataset.getCollectionValueFieldName()), Type.BOOLEAN));
        valueSchema = SchemaUtil.reduceUnionToNonNull(valueSchema);
      }
      if (SchemaUtil.requiresChildDataset(valueSchema)) {
        createChildDataset(childDataset.getChildOfCollectionName(), valueSchema, fields,
            childDataset);
      } else {
        addRecordFields(valueSchema, childDataset, fields,
            childDataset.getCollectionValueFieldName() + childDataset.getNameSeparator());
      }
    }
    finishCreatingDataset(fields, childDataset);
  }

  /**
   * Builds the final Avro record schema from 'fields' and creates the backing
   * Parquet-format Kite dataset under outputDir_. All fields must be flat at this
   * point.
   */
  private void finishCreatingDataset(List<Field> fields, FlattenedSchema dataset) {
    Schema childSchema = Schema.createRecord(dataset.getName(), null, null, false);
    for (Field field : fields) {
      // Sanity check: flattening should have eliminated all nesting.
      Preconditions.checkState(!SchemaUtil.schemaHasNesting(field.schema()));
    }
    childSchema.setFields(fields);
    DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
        .format(Formats.PARQUET)
        .schema(childSchema)
        .build();
    // Datasets.create() returns a raw Dataset; the cast is safe because the dataset
    // holds GenericRecords conforming to 'childSchema'.
    @SuppressWarnings("unchecked")
    Dataset<GenericRecord> created = (Dataset<GenericRecord>) Datasets.create(
        "dataset:" + createDir(dataset.getName()), descriptor);
    dataset.setDataset(created);
  }

  /**
   * Creates a dir named 'name' under outputDir_ (local file system or HDFS, chosen by
   * the URI scheme) and returns its URI.
   *
   * @throws RuntimeException if the dir cannot be created or on I/O errors.
   * @throws NotImplementedException if the output dir scheme is not supported.
   */
  private URI createDir(String name) {
    try {
      // Use Locale.ROOT so scheme matching is not broken by locale-sensitive casing
      // (e.g. "file" uppercases to "FİLE" under the Turkish locale).
      switch (outputDir_.getScheme().toUpperCase(Locale.ROOT)) {
        case "FILE": {
          Path datasetPath = Paths.get(outputDir_).resolve(name);
          // mkdirs() returns false on failure (or if the dir already exists); fail
          // fast rather than hitting an obscure error during dataset creation.
          if (!datasetPath.toFile().mkdirs() && !datasetPath.toFile().isDirectory()) {
            throw new RuntimeException(String.format(
                "Unable to create local dir: %s", datasetPath));
          }
          return datasetPath.toUri();
        }
        case "HDFS": {
          org.apache.hadoop.fs.Path outputDirPath
              = new org.apache.hadoop.fs.Path(outputDir_);
          org.apache.hadoop.fs.Path datasetPath
              = new org.apache.hadoop.fs.Path(outputDirPath, name);
          if (!outputDirPath.getFileSystem(new Configuration()).mkdirs(datasetPath)) {
            throw new RuntimeException(String.format(
                "Unable to create HDFS dir: %s", datasetPath));
          }
          return datasetPath.toUri();
        }
        default:
          throw new NotImplementedException(String.format(
              "Unexpected output dir scheme: %s", outputDir_.getScheme()));
      }
    } catch (IOException ex) {
      throw new RuntimeException(ex);
    }
  }
}