blob: 03ed542bd609e36d50f2637a610827d60220a28f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.common.table;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieLogFormat.Reader;
import org.apache.hudi.common.table.log.block.HoodieDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieIncompatibleSchemaException;
import org.apache.hudi.exception.InvalidTableException;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.internal.schema.io.FileBasedInternalSchemaStorageManager;
import org.apache.hudi.internal.schema.utils.SerDeHelper;
import org.apache.hudi.io.storage.HoodieAvroHFileReader;
import org.apache.hudi.io.storage.HoodieAvroOrcReader;
import org.apache.hudi.util.Lazy;
import org.apache.avro.JsonProperties;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.schema.MessageType;
import javax.annotation.concurrent.ThreadSafe;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import static org.apache.hudi.avro.AvroSchemaUtils.appendFieldsToSchema;
import static org.apache.hudi.avro.AvroSchemaUtils.containsFieldInSchema;
import static org.apache.hudi.avro.AvroSchemaUtils.createNullableSchema;
/**
* Helper class to read schema from data files and log files and to convert it between different formats.
*/
@ThreadSafe
public class TableSchemaResolver {
private static final Logger LOG = LogManager.getLogger(TableSchemaResolver.class);
private final HoodieTableMetaClient metaClient;
/**
* Signals whether suite of the meta-fields should have additional field designating
* operation particular record was added by. Note, that determining whether this meta-field
* should be appended to the schema requires reading out the actual schema of some data file,
* since it's ultimately the source of truth whether this field has to be represented in
* the schema
*/
private final Lazy<Boolean> hasOperationField;
/**
* NOTE: {@link HoodieCommitMetadata} could be of non-trivial size for large tables (in 100s of Mbs)
* and therefore we'd want to limit amount of throw-away work being performed while fetching
* commits' metadata
*
* Please check out corresponding methods to fetch commonly used instances of {@link HoodieCommitMetadata}:
* {@link #getLatestCommitMetadataWithValidSchema()},
* {@link #getLatestCommitMetadataWithValidSchema()},
* {@link #getCachedCommitMetadata(HoodieInstant)}
*/
private final Lazy<ConcurrentHashMap<HoodieInstant, HoodieCommitMetadata>> commitMetadataCache;
private volatile HoodieInstant latestCommitWithValidSchema = null;
private volatile HoodieInstant latestCommitWithValidData = null;
public TableSchemaResolver(HoodieTableMetaClient metaClient) {
this.metaClient = metaClient;
this.commitMetadataCache = Lazy.lazily(() -> new ConcurrentHashMap<>(2));
this.hasOperationField = Lazy.lazily(this::hasOperationField);
}
public Schema getTableAvroSchemaFromDataFile() {
return convertParquetSchemaToAvro(getTableParquetSchemaFromDataFile());
}
/**
* Gets full schema (user + metadata) for a hoodie table in Avro format.
*
* @return Avro schema for this table
* @throws Exception
*/
public Schema getTableAvroSchema() throws Exception {
return getTableAvroSchema(metaClient.getTableConfig().populateMetaFields());
}
/**
* Gets schema for a hoodie table in Avro format, can choice if include metadata fields.
*
* @param includeMetadataFields choice if include metadata fields
* @return Avro schema for this table
* @throws Exception
*/
public Schema getTableAvroSchema(boolean includeMetadataFields) throws Exception {
return getTableAvroSchemaInternal(includeMetadataFields, Option.empty());
}
/**
* Fetches tables schema in Avro format as of the given instant
*
* @param timestamp as of which table's schema will be fetched
*/
public Schema getTableAvroSchema(String timestamp) throws Exception {
Option<HoodieInstant> instant = metaClient.getActiveTimeline().getCommitsTimeline()
.filterCompletedInstants()
.findInstantsBeforeOrEquals(timestamp)
.lastInstant();
return getTableAvroSchemaInternal(metaClient.getTableConfig().populateMetaFields(), instant);
}
/**
* Fetches tables schema in Avro format as of the given instant
*
* @param instant as of which table's schema will be fetched
*/
public Schema getTableAvroSchema(HoodieInstant instant, boolean includeMetadataFields) throws Exception {
return getTableAvroSchemaInternal(includeMetadataFields, Option.of(instant));
}
/**
* Gets full schema (user + metadata) for a hoodie table in Parquet format.
*
* @return Parquet schema for the table
*/
public MessageType getTableParquetSchema() throws Exception {
return convertAvroSchemaToParquet(getTableAvroSchema(true));
}
/**
* Gets users data schema for a hoodie table in Parquet format.
*
* @return Parquet schema for the table
*/
public MessageType getTableParquetSchema(boolean includeMetadataField) throws Exception {
return convertAvroSchemaToParquet(getTableAvroSchema(includeMetadataField));
}
/**
* Gets users data schema for a hoodie table in Avro format.
*
* @return Avro user data schema
* @throws Exception
*
* @deprecated use {@link #getTableAvroSchema(boolean)} instead
*/
@Deprecated
public Schema getTableAvroSchemaWithoutMetadataFields() throws Exception {
return getTableAvroSchema(false);
}
private Schema getTableAvroSchemaInternal(boolean includeMetadataFields, Option<HoodieInstant> instantOpt) {
Schema schema =
(instantOpt.isPresent()
? getTableSchemaFromCommitMetadata(instantOpt.get(), includeMetadataFields)
: getTableSchemaFromLatestCommitMetadata(includeMetadataFields))
.or(() ->
metaClient.getTableConfig().getTableCreateSchema()
.map(tableSchema ->
includeMetadataFields
? HoodieAvroUtils.addMetadataFields(tableSchema, hasOperationField.get())
: tableSchema)
)
.orElseGet(() -> {
Schema schemaFromDataFile = getTableAvroSchemaFromDataFile();
return includeMetadataFields
? schemaFromDataFile
: HoodieAvroUtils.removeMetadataFields(schemaFromDataFile);
});
// TODO partition columns have to be appended in all read-paths
if (metaClient.getTableConfig().shouldDropPartitionColumns()) {
return metaClient.getTableConfig().getPartitionFields()
.map(partitionFields -> appendPartitionColumns(schema, Option.ofNullable(partitionFields)))
.orElse(schema);
}
return schema;
}
private Option<Schema> getTableSchemaFromLatestCommitMetadata(boolean includeMetadataFields) {
Option<Pair<HoodieInstant, HoodieCommitMetadata>> instantAndCommitMetadata = getLatestCommitMetadataWithValidSchema();
if (instantAndCommitMetadata.isPresent()) {
HoodieCommitMetadata commitMetadata = instantAndCommitMetadata.get().getRight();
String schemaStr = commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY);
Schema schema = new Schema.Parser().parse(schemaStr);
if (includeMetadataFields) {
schema = HoodieAvroUtils.addMetadataFields(schema, hasOperationField.get());
}
return Option.of(schema);
} else {
return Option.empty();
}
}
private Option<Schema> getTableSchemaFromCommitMetadata(HoodieInstant instant, boolean includeMetadataFields) {
try {
HoodieCommitMetadata metadata = getCachedCommitMetadata(instant);
String existingSchemaStr = metadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY);
if (StringUtils.isNullOrEmpty(existingSchemaStr)) {
return Option.empty();
}
Schema schema = new Schema.Parser().parse(existingSchemaStr);
if (includeMetadataFields) {
schema = HoodieAvroUtils.addMetadataFields(schema, hasOperationField.get());
}
return Option.of(schema);
} catch (Exception e) {
throw new HoodieException("Failed to read schema from commit metadata", e);
}
}
/**
* Fetches the schema for a table from any the table's data files
*/
private MessageType getTableParquetSchemaFromDataFile() {
Option<Pair<HoodieInstant, HoodieCommitMetadata>> instantAndCommitMetadata = getLatestCommitMetadataWithValidData();
try {
switch (metaClient.getTableType()) {
case COPY_ON_WRITE:
case MERGE_ON_READ:
// For COW table, data could be written in either Parquet or Orc format currently;
// For MOR table, data could be written in either Parquet, Orc, Hfile or Delta-log format currently;
//
// Determine the file format based on the file name, and then extract schema from it.
if (instantAndCommitMetadata.isPresent()) {
HoodieCommitMetadata commitMetadata = instantAndCommitMetadata.get().getRight();
Iterator<String> filePaths = commitMetadata.getFileIdAndFullPaths(metaClient.getBasePathV2()).values().iterator();
return fetchSchemaFromFiles(filePaths);
} else {
throw new IllegalArgumentException("Could not find any data file written for commit, "
+ "so could not get schema for table " + metaClient.getBasePath());
}
default:
LOG.error("Unknown table type " + metaClient.getTableType());
throw new InvalidTableException(metaClient.getBasePath());
}
} catch (IOException e) {
throw new HoodieException("Failed to read data schema", e);
}
}
public static MessageType convertAvroSchemaToParquet(Schema schema, Configuration hadoopConf) {
AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter(hadoopConf);
return avroSchemaConverter.convert(schema);
}
private Schema convertParquetSchemaToAvro(MessageType parquetSchema) {
AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter(metaClient.getHadoopConf());
return avroSchemaConverter.convert(parquetSchema);
}
private MessageType convertAvroSchemaToParquet(Schema schema) {
AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter(metaClient.getHadoopConf());
return avroSchemaConverter.convert(schema);
}
/**
* Returns table's latest Avro {@link Schema} iff table is non-empty (ie there's at least
* a single commit)
*
* This method differs from {@link #getTableAvroSchema(boolean)} in that it won't fallback
* to use table's schema used at creation
*/
public Option<Schema> getTableAvroSchemaFromLatestCommit(boolean includeMetadataFields) throws Exception {
if (metaClient.isTimelineNonEmpty()) {
return Option.of(getTableAvroSchemaInternal(includeMetadataFields, Option.empty()));
}
return Option.empty();
}
private MessageType readSchemaFromParquetBaseFile(Path parquetFilePath) throws IOException {
LOG.info("Reading schema from " + parquetFilePath);
FileSystem fs = metaClient.getRawFs();
ParquetMetadata fileFooter =
ParquetFileReader.readFooter(fs.getConf(), parquetFilePath, ParquetMetadataConverter.NO_FILTER);
return fileFooter.getFileMetaData().getSchema();
}
private MessageType readSchemaFromHFileBaseFile(Path hFilePath) throws IOException {
LOG.info("Reading schema from " + hFilePath);
FileSystem fs = metaClient.getRawFs();
CacheConfig cacheConfig = new CacheConfig(fs.getConf());
HoodieAvroHFileReader hFileReader = new HoodieAvroHFileReader(fs.getConf(), hFilePath, cacheConfig);
return convertAvroSchemaToParquet(hFileReader.getSchema());
}
private MessageType readSchemaFromORCBaseFile(Path orcFilePath) throws IOException {
LOG.info("Reading schema from " + orcFilePath);
FileSystem fs = metaClient.getRawFs();
HoodieAvroOrcReader orcReader = new HoodieAvroOrcReader(fs.getConf(), orcFilePath);
return convertAvroSchemaToParquet(orcReader.getSchema());
}
/**
* Read schema from a data file from the last compaction commit done.
*
* @deprecated please use {@link #getTableAvroSchema(HoodieInstant, boolean)} instead
*/
public MessageType readSchemaFromLastCompaction(Option<HoodieInstant> lastCompactionCommitOpt) throws Exception {
HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
HoodieInstant lastCompactionCommit = lastCompactionCommitOpt.orElseThrow(() -> new Exception(
"Could not read schema from last compaction, no compaction commits found on path " + metaClient));
// Read from the compacted file wrote
HoodieCommitMetadata compactionMetadata = HoodieCommitMetadata
.fromBytes(activeTimeline.getInstantDetails(lastCompactionCommit).get(), HoodieCommitMetadata.class);
String filePath = compactionMetadata.getFileIdAndFullPaths(metaClient.getBasePathV2()).values().stream().findAny()
.orElseThrow(() -> new IllegalArgumentException("Could not find any data file written for compaction "
+ lastCompactionCommit + ", could not get schema for table " + metaClient.getBasePath()));
return readSchemaFromBaseFile(filePath);
}
private MessageType readSchemaFromLogFile(Path path) throws IOException {
return readSchemaFromLogFile(metaClient.getRawFs(), path);
}
/**
* Read the schema from the log file on path.
*
* @return
*/
public static MessageType readSchemaFromLogFile(FileSystem fs, Path path) throws IOException {
try (Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(path), null)) {
HoodieDataBlock lastBlock = null;
while (reader.hasNext()) {
HoodieLogBlock block = reader.next();
if (block instanceof HoodieDataBlock) {
lastBlock = (HoodieDataBlock) block;
}
}
return lastBlock != null ? new AvroSchemaConverter().convert(lastBlock.getSchema()) : null;
}
}
/**
* Gets the InternalSchema for a hoodie table from the HoodieCommitMetadata of the instant.
*
* @return InternalSchema for this table
*/
public Option<InternalSchema> getTableInternalSchemaFromCommitMetadata() {
HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
return timeline.lastInstant().flatMap(this::getTableInternalSchemaFromCommitMetadata);
}
/**
* Gets the InternalSchema for a hoodie table from the HoodieCommitMetadata of the instant.
*
* @return InternalSchema for this table
*/
public Option<InternalSchema> getTableInternalSchemaFromCommitMetadata(String timestamp) {
HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline()
.filterCompletedInstants()
.findInstantsBeforeOrEquals(timestamp);
return timeline.lastInstant().flatMap(this::getTableInternalSchemaFromCommitMetadata);
}
/**
* Gets the InternalSchema for a hoodie table from the HoodieCommitMetadata of the instant.
*
* @return InternalSchema for this table
*/
private Option<InternalSchema> getTableInternalSchemaFromCommitMetadata(HoodieInstant instant) {
try {
HoodieCommitMetadata metadata = getCachedCommitMetadata(instant);
String latestInternalSchemaStr = metadata.getMetadata(SerDeHelper.LATEST_SCHEMA);
if (latestInternalSchemaStr != null) {
return SerDeHelper.fromJson(latestInternalSchemaStr);
} else {
return Option.empty();
}
} catch (Exception e) {
throw new HoodieException("Failed to read schema from commit metadata", e);
}
}
/**
* Gets the history schemas as String for a hoodie table from the HoodieCommitMetadata of the instant.
*
* @return history schemas string for this table
*/
public Option<String> getTableHistorySchemaStrFromCommitMetadata() {
// now we only support FileBaseInternalSchemaManager
FileBasedInternalSchemaStorageManager manager = new FileBasedInternalSchemaStorageManager(metaClient);
String result = manager.getHistorySchemaStr();
return result.isEmpty() ? Option.empty() : Option.of(result);
}
/**
* NOTE: This method could only be used in tests
*
* @VisibleForTesting
*/
public boolean hasOperationField() {
try {
Schema tableAvroSchema = getTableAvroSchemaFromDataFile();
return tableAvroSchema.getField(HoodieRecord.OPERATION_METADATA_FIELD) != null;
} catch (Exception e) {
LOG.info(String.format("Failed to read operation field from avro schema (%s)", e.getMessage()));
return false;
}
}
private Option<Pair<HoodieInstant, HoodieCommitMetadata>> getLatestCommitMetadataWithValidSchema() {
if (latestCommitWithValidSchema == null) {
Option<Pair<HoodieInstant, HoodieCommitMetadata>> instantAndCommitMetadata =
metaClient.getActiveTimeline().getLastCommitMetadataWithValidSchema();
if (instantAndCommitMetadata.isPresent()) {
HoodieInstant instant = instantAndCommitMetadata.get().getLeft();
HoodieCommitMetadata metadata = instantAndCommitMetadata.get().getRight();
synchronized (this) {
if (latestCommitWithValidSchema == null) {
latestCommitWithValidSchema = instant;
}
commitMetadataCache.get().putIfAbsent(instant, metadata);
}
}
}
return Option.ofNullable(latestCommitWithValidSchema)
.map(instant -> Pair.of(instant, commitMetadataCache.get().get(instant)));
}
private Option<Pair<HoodieInstant, HoodieCommitMetadata>> getLatestCommitMetadataWithValidData() {
if (latestCommitWithValidData == null) {
Option<Pair<HoodieInstant, HoodieCommitMetadata>> instantAndCommitMetadata =
metaClient.getActiveTimeline().getLastCommitMetadataWithValidData();
if (instantAndCommitMetadata.isPresent()) {
HoodieInstant instant = instantAndCommitMetadata.get().getLeft();
HoodieCommitMetadata metadata = instantAndCommitMetadata.get().getRight();
synchronized (this) {
if (latestCommitWithValidData == null) {
latestCommitWithValidData = instant;
}
commitMetadataCache.get().putIfAbsent(instant, metadata);
}
}
}
return Option.ofNullable(latestCommitWithValidData)
.map(instant -> Pair.of(instant, commitMetadataCache.get().get(instant)));
}
private HoodieCommitMetadata getCachedCommitMetadata(HoodieInstant instant) {
return commitMetadataCache.get()
.computeIfAbsent(instant, (missingInstant) -> {
HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
byte[] data = timeline.getInstantDetails(missingInstant).get();
try {
return HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class);
} catch (IOException e) {
throw new HoodieIOException(String.format("Failed to fetch HoodieCommitMetadata for instant (%s)", missingInstant), e);
}
});
}
private MessageType fetchSchemaFromFiles(Iterator<String> filePaths) throws IOException {
MessageType type = null;
while (filePaths.hasNext() && type == null) {
String filePath = filePaths.next();
if (filePath.contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())) {
// this is a log file
type = readSchemaFromLogFile(new Path(filePath));
} else {
type = readSchemaFromBaseFile(filePath);
}
}
return type;
}
private MessageType readSchemaFromBaseFile(String filePath) throws IOException {
if (filePath.contains(HoodieFileFormat.PARQUET.getFileExtension())) {
return readSchemaFromParquetBaseFile(new Path(filePath));
} else if (filePath.contains(HoodieFileFormat.HFILE.getFileExtension())) {
return readSchemaFromHFileBaseFile(new Path(filePath));
} else if (filePath.contains(HoodieFileFormat.ORC.getFileExtension())) {
return readSchemaFromORCBaseFile(new Path(filePath));
} else {
throw new IllegalArgumentException("Unknown base file format :" + filePath);
}
}
public static Schema appendPartitionColumns(Schema dataSchema, Option<String[]> partitionFields) {
// In cases when {@link DROP_PARTITION_COLUMNS} config is set true, partition columns
// won't be persisted w/in the data files, and therefore we need to append such columns
// when schema is parsed from data files
//
// Here we append partition columns with {@code StringType} as the data type
if (!partitionFields.isPresent() || partitionFields.get().length == 0) {
return dataSchema;
}
boolean hasPartitionColNotInSchema = Arrays.stream(partitionFields.get()).anyMatch(pf -> !containsFieldInSchema(dataSchema, pf));
boolean hasPartitionColInSchema = Arrays.stream(partitionFields.get()).anyMatch(pf -> containsFieldInSchema(dataSchema, pf));
if (hasPartitionColNotInSchema && hasPartitionColInSchema) {
throw new HoodieIncompatibleSchemaException("Partition columns could not be partially contained w/in the data schema");
}
if (hasPartitionColNotInSchema) {
// when hasPartitionColNotInSchema is true and hasPartitionColInSchema is false, all partition columns
// are not in originSchema. So we create and add them.
List<Field> newFields = new ArrayList<>();
for (String partitionField: partitionFields.get()) {
newFields.add(new Schema.Field(
partitionField, createNullableSchema(Schema.Type.STRING), "", JsonProperties.NULL_VALUE));
}
return appendFieldsToSchema(dataSchema, newFields);
}
return dataSchema;
}
}