/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gobblin.iceberg.writer;
import java.io.IOException;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.specific.SpecificData;
import org.apache.gobblin.hive.writer.MetadataWriterKeys;
import org.apache.gobblin.source.extractor.extract.LongWatermark;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFiles;
import org.apache.iceberg.ExpireSnapshots;
import org.apache.iceberg.FindFiles;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.UpdateProperties;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.hive.HiveCatalogs;
import org.apache.iceberg.types.Types;
import org.joda.time.DateTime;
import org.joda.time.format.PeriodFormatter;
import org.joda.time.format.PeriodFormatterBuilder;
import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
import com.google.common.io.Closer;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.completeness.verifier.KafkaAuditCountVerifier;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.data.management.copy.hive.WhitelistBlacklist;
import org.apache.gobblin.hive.AutoCloseableHiveLock;
import org.apache.gobblin.hive.HiveLock;
import org.apache.gobblin.hive.HivePartition;
import org.apache.gobblin.hive.metastore.HiveMetaStoreUtils;
import org.apache.gobblin.hive.spec.HiveSpec;
import org.apache.gobblin.hive.writer.MetadataWriter;
import org.apache.gobblin.iceberg.Utils.IcebergUtils;
import org.apache.gobblin.metadata.GobblinMetadataChangeEvent;
import org.apache.gobblin.metadata.OperationType;
import org.apache.gobblin.metrics.GobblinMetricsRegistry;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.GobblinEventBuilder;
import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistry;
import org.apache.gobblin.metrics.kafka.SchemaRegistryException;
import org.apache.gobblin.stream.RecordEnvelope;
import org.apache.gobblin.time.TimeIterator;
import org.apache.gobblin.util.AvroUtils;
import org.apache.gobblin.util.ClustersNames;
import org.apache.gobblin.util.HadoopUtils;
import org.apache.gobblin.util.ParallelRunner;
import org.apache.gobblin.util.WriterUtils;
import static org.apache.gobblin.iceberg.writer.IcebergMetadataWriterConfigKeys.*;
/**
* This writer computes Iceberg metadata from GMCE and registers it with Iceberg.
* The Iceberg metadata here includes:
* 1. Data files contained by the table and the metrics of those data files
* 2. Properties of the table (original table properties, data offset range, high watermark of the GMCE and schema creation time)
* 3. Latest schema of the table
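*
* A simplified usage sketch (assuming the caller drives the GMCE stream and the flush cadence):
* <pre>{@code
* IcebergMetadataWriter writer = new IcebergMetadataWriter(state);
* writer.writeEnvelope(recordEnvelope, newSpecsMap, oldSpecsMap, tableSpec); // once per GMCE record
* writer.flush(dbName, tableName); // once per flush interval, publishing a single snapshot per table
* }</pre>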
*/
@Slf4j
public class IcebergMetadataWriter implements MetadataWriter {
// Critical when there's dataset-level ACL enforced for both data and Iceberg metadata
public static final String USE_DATA_PATH_AS_TABLE_LOCATION = "use.data.path.as.table.location";
public static final String TABLE_LOCATION_SUFFIX = "/_iceberg_metadata/%s";
public static final String GMCE_HIGH_WATERMARK_KEY = "gmce.high.watermark.%s";
public static final String GMCE_LOW_WATERMARK_KEY = "gmce.low.watermark.%s";
private final static String EXPIRE_SNAPSHOTS_LOOKBACK_TIME = "gobblin.iceberg.dataset.expire.snapshots.lookBackTime";
private final static String DEFAULT_EXPIRE_SNAPSHOTS_LOOKBACK_TIME = "3d";
private static final String ICEBERG_REGISTRATION_BLACKLIST = "iceberg.registration.blacklist";
private static final String ICEBERG_REGISTRATION_WHITELIST = "iceberg.registration.whitelist";
private static final String ICEBERG_REGISTRATION_AUDIT_COUNT_BLACKLIST = "iceberg.registration.audit.count.blacklist";
private static final String ICEBERG_REGISTRATION_AUDIT_COUNT_WHITELIST = "iceberg.registration.audit.count.whitelist";
private static final String ICEBERG_METADATA_FILE_PERMISSION = "iceberg.metadata.file.permission";
private static final String CREATE_TABLE_TIME = "iceberg.create.table.time";
private static final String SCHEMA_CREATION_TIME_KEY = "schema.creation.time";
private static final String ADDED_FILES_CACHE_EXPIRING_TIME = "added.files.cache.expiring.time";
private static final int DEFAULT_ADDED_FILES_CACHE_EXPIRING_TIME = 1;
private static final String OFFSET_RANGE_KEY_PREFIX = "offset.range.";
private static final String OFFSET_RANGE_KEY_FORMAT = OFFSET_RANGE_KEY_PREFIX + "%s";
private static final String DEFAULT_CREATION_TIME = "0";
private static final String SNAPSHOT_EXPIRE_THREADS = "snapshot.expire.threads";
private static final long DEFAULT_WATERMARK = -1L;
/* One of the fields in a DataFile entry, describing the location URI of a data file (with FS scheme) */
private static final String ICEBERG_FILE_PATH_COLUMN = DataFile.FILE_PATH.name();
private final boolean completenessEnabled;
private final WhitelistBlacklist completenessWhitelistBlacklist;
private final String timeZone;
private final DateTimeFormatter HOURLY_DATEPARTITION_FORMAT;
private final String newPartitionColumn;
private final String newPartitionColumnType;
private final boolean newPartitionEnabled;
private final WhitelistBlacklist newPartitionTableWhitelistBlacklist;
private Optional<KafkaAuditCountVerifier> auditCountVerifier;
private final String auditCheckGranularity;
protected final MetricContext metricContext;
protected EventSubmitter eventSubmitter;
private final WhitelistBlacklist whitelistBlacklist;
private final WhitelistBlacklist auditWhitelistBlacklist;
private final Closer closer = Closer.create();
// Mapping between table-id and currently processed watermark
private final Map<TableIdentifier, Long> tableCurrentWatermarkMap;
// Used to store the relationship between table and the gmce topicPartition
private final Map<TableIdentifier, String> tableTopicPartitionMap;
@Getter
private final KafkaSchemaRegistry schemaRegistry;
private final Map<TableIdentifier, TableMetadata> tableMetadataMap;
@Setter
protected HiveCatalog catalog;
protected final Configuration conf;
protected final ReadWriteLock readWriteLock;
private final HiveLock locks;
private final boolean useDataLocationAsTableLocation;
private final ParallelRunner parallelRunner;
private FsPermission permission;
protected State state;
public IcebergMetadataWriter(State state) throws IOException {
this.state = state;
this.schemaRegistry = KafkaSchemaRegistry.get(state.getProperties());
conf = HadoopUtils.getConfFromState(state);
initializeCatalog();
tableTopicPartitionMap = new HashMap<>();
tableMetadataMap = new HashMap<>();
tableCurrentWatermarkMap = new HashMap<>();
List<Tag<?>> tags = Lists.newArrayList();
String clusterIdentifier = ClustersNames.getInstance().getClusterName();
tags.add(new Tag<>(MetadataWriterKeys.CLUSTER_IDENTIFIER_KEY_NAME, clusterIdentifier));
metricContext = closer.register(
GobblinMetricsRegistry.getInstance().getMetricContext(state, IcebergMetadataWriter.class, tags));
this.eventSubmitter =
new EventSubmitter.Builder(this.metricContext, MetadataWriterKeys.METRICS_NAMESPACE_ICEBERG_WRITER).build();
this.whitelistBlacklist = new WhitelistBlacklist(state.getProp(ICEBERG_REGISTRATION_WHITELIST, ""),
state.getProp(ICEBERG_REGISTRATION_BLACKLIST, ""));
this.auditWhitelistBlacklist = new WhitelistBlacklist(state.getProp(ICEBERG_REGISTRATION_AUDIT_COUNT_WHITELIST, ""),
state.getProp(ICEBERG_REGISTRATION_AUDIT_COUNT_BLACKLIST, ""));
// Use a rw-lock to make it thread-safe when flush and write (which essentially aggregates and reads metadata)
// are called from separate threads.
readWriteLock = new ReentrantReadWriteLock();
this.locks = new HiveLock(state.getProperties());
parallelRunner = closer.register(new ParallelRunner(state.getPropAsInt(SNAPSHOT_EXPIRE_THREADS, 20),
FileSystem.get(HadoopUtils.getConfFromState(state))));
useDataLocationAsTableLocation = state.getPropAsBoolean(USE_DATA_PATH_AS_TABLE_LOCATION, false);
if (useDataLocationAsTableLocation) {
permission =
HadoopUtils.deserializeFsPermission(state, ICEBERG_METADATA_FILE_PERMISSION,
FsPermission.getDefault());
}
this.completenessEnabled = state.getPropAsBoolean(ICEBERG_COMPLETENESS_ENABLED, DEFAULT_ICEBERG_COMPLETENESS);
this.completenessWhitelistBlacklist = new WhitelistBlacklist(state.getProp(ICEBERG_COMPLETENESS_WHITELIST, ""),
state.getProp(ICEBERG_COMPLETENESS_BLACKLIST, ""));
this.timeZone = state.getProp(TIME_ZONE_KEY, DEFAULT_TIME_ZONE);
this.HOURLY_DATEPARTITION_FORMAT = DateTimeFormatter.ofPattern(DATEPARTITION_FORMAT)
.withZone(ZoneId.of(this.timeZone));
this.auditCountVerifier = Optional.fromNullable(this.completenessEnabled ? new KafkaAuditCountVerifier(state) : null);
this.newPartitionColumn = state.getProp(NEW_PARTITION_KEY, DEFAULT_NEW_PARTITION);
this.newPartitionColumnType = state.getProp(NEW_PARTITION_TYPE_KEY, DEFAULT_PARTITION_COLUMN_TYPE);
this.newPartitionEnabled = state.getPropAsBoolean(ICEBERG_NEW_PARTITION_ENABLED, DEFAULT_ICEBERG_NEW_PARTITION_ENABLED);
this.newPartitionTableWhitelistBlacklist = new WhitelistBlacklist(state.getProp(ICEBERG_NEW_PARTITION_WHITELIST, ""),
state.getProp(ICEBERG_NEW_PARTITION_BLACKLIST, ""));
this.auditCheckGranularity = state.getProp(AUDIT_CHECK_GRANULARITY, DEFAULT_AUDIT_CHECK_GRANULARITY);
}
@VisibleForTesting
protected void setAuditCountVerifier(KafkaAuditCountVerifier verifier) {
this.auditCountVerifier = Optional.of(verifier);
}
protected void initializeCatalog() {
catalog = HiveCatalogs.loadCatalog(conf);
}
private org.apache.iceberg.Table getIcebergTable(TableIdentifier tid) throws NoSuchTableException {
TableMetadata tableMetadata = tableMetadataMap.computeIfAbsent(tid, t -> new TableMetadata());
if (!tableMetadata.table.isPresent()) {
tableMetadata.table = Optional.of(catalog.loadTable(tid));
}
return tableMetadata.table.get();
}
/**
* Get the current watermark of the GMCE topic partition for a table, reading it from the in-memory
* map when present and falling back to the table properties otherwise.
*
* The watermark property name includes the topicPartition so that it stays valid even if the GMCE topic name changes.
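*
* For example (illustrative topic name), the high-watermark property key looks like
* {@code gmce.high.watermark.GobblinMetadataChangeEvent_audit-3}.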
*/
private Long getAndPersistCurrentWatermark(TableIdentifier tid, String topicPartition) {
if (tableCurrentWatermarkMap.containsKey(tid)) {
return tableCurrentWatermarkMap.get(tid);
}
org.apache.iceberg.Table icebergTable;
Long currentWatermark = DEFAULT_WATERMARK;
try {
icebergTable = getIcebergTable(tid);
} catch (NoSuchTableException e) {
return currentWatermark;
}
currentWatermark =
icebergTable.properties().containsKey(String.format(GMCE_HIGH_WATERMARK_KEY, topicPartition)) ? Long.parseLong(
icebergTable.properties().get(String.format(GMCE_HIGH_WATERMARK_KEY, topicPartition))) : DEFAULT_WATERMARK;
return currentWatermark;
}
/**
* The write method is responsible for processing a GMCE and aggregating its metadata.
* The logic of this method is:
* 1. Check whether the table exists; if not, create the Iceberg table
*    - If completeness is enabled, add the new partition column {#NEW_PARTITION_KEY} to the table
* 2. Compute the schema from the GMCE and update the cache of candidate schemas
* 3. Perform the operation required by the GMCE, i.e. addFiles, rewriteFiles, dropFiles or change_property.
*
* Note: this method only aggregates metadata in the cache without committing;
* the actual commit happens in the flush method (except for rewrite and drop operations, where preserving older
* file information would increase the memory footprint, so they are committed eagerly).
*/
public void write(GobblinMetadataChangeEvent gmce, Map<String, Collection<HiveSpec>> newSpecsMap,
Map<String, Collection<HiveSpec>> oldSpecsMap, HiveSpec tableSpec) throws IOException {
TableIdentifier tid = TableIdentifier.of(tableSpec.getTable().getDbName(), tableSpec.getTable().getTableName());
TableMetadata tableMetadata = tableMetadataMap.computeIfAbsent(tid, t -> new TableMetadata());
Table table;
try {
table = getIcebergTable(tid);
} catch (NoSuchTableException e) {
try {
if (gmce.getOperationType() == OperationType.drop_files ||
gmce.getOperationType() == OperationType.change_property) {
log.warn("Table {} does not exist, skip processing this {} event", tid.toString(), gmce.getOperationType());
return;
}
table = createTable(gmce, tableSpec);
tableMetadata.table = Optional.of(table);
} catch (Exception e1) {
log.error("skip processing {} for table {}.{} due to error when creating table", gmce.toString(),
tableSpec.getTable().getDbName(), tableSpec.getTable().getTableName());
log.debug(e1.toString());
return;
}
}
if(tableMetadata.completenessEnabled) {
tableMetadata.completionWatermark = Long.parseLong(table.properties().getOrDefault(COMPLETION_WATERMARK_KEY,
String.valueOf(DEFAULT_COMPLETION_WATERMARK)));
}
computeCandidateSchema(gmce, tid, tableSpec);
tableMetadata.ensureTxnInit();
tableMetadata.lowestGMCEEmittedTime = Long.min(tableMetadata.lowestGMCEEmittedTime, gmce.getGMCEmittedTime());
switch (gmce.getOperationType()) {
case add_files: {
updateTableProperty(tableSpec, tid);
addFiles(gmce, newSpecsMap, table, tableMetadata);
if (gmce.getAuditCountMap() != null && auditWhitelistBlacklist.acceptTable(tableSpec.getTable().getDbName(),
tableSpec.getTable().getTableName())) {
tableMetadata.serializedAuditCountMaps.add(gmce.getAuditCountMap());
}
if (gmce.getTopicPartitionOffsetsRange() != null) {
mergeOffsets(gmce, tid);
}
break;
}
case rewrite_files: {
updateTableProperty(tableSpec, tid);
rewriteFiles(gmce, newSpecsMap, oldSpecsMap, table, tableMetadata);
break;
}
case drop_files: {
dropFiles(gmce, oldSpecsMap, table, tableMetadata, tid);
break;
}
case change_property: {
updateTableProperty(tableSpec, tid);
if (gmce.getTopicPartitionOffsetsRange() != null) {
mergeOffsets(gmce, tid);
}
log.info("No file operation need to be performed by Iceberg Metadata Writer at this point.");
break;
}
default: {
log.error("unsupported operation {}", gmce.getOperationType().toString());
return;
}
}
}
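/**
* Rebuild the per-topic-partition offset ranges from the table's last committed properties.
* For example (illustrative values, assuming the default ',' list delimiter and '-' range delimiter),
* a property "offset.range.someTopic-3" with value "0-100,200-300" becomes the ranges [(0, 100], (200, 300]].
*/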
private HashMap<String, List<Range>> getLastOffset(TableMetadata tableMetadata) {
HashMap<String, List<Range>> offsets = new HashMap<>();
if (tableMetadata.lastProperties.isPresent()) {
for (Map.Entry<String, String> entry : tableMetadata.lastProperties.get().entrySet()) {
if (entry.getKey().startsWith(OFFSET_RANGE_KEY_PREFIX)) {
List<Range> ranges = Arrays.asList(entry.getValue().split(ConfigurationKeys.LIST_DELIMITER_KEY))
.stream()
.map(s -> {
List<String> rangePair = Splitter.on(ConfigurationKeys.RANGE_DELIMITER_KEY).splitToList(s);
return Range.openClosed(Long.parseLong(rangePair.get(0)), Long.parseLong(rangePair.get(1)));})
.collect(Collectors.toList());
offsets.put(entry.getKey().substring(OFFSET_RANGE_KEY_PREFIX.length()), ranges);
}
}
}
return offsets;
}
/**
* The side effect of this method is to update the offset-range of the table identified by
* the given {@link TableIdentifier} with the input {@link GobblinMetadataChangeEvent}
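*
* For example (illustrative values): merging an incoming range (100, 200] into an existing list [(0, 100]]
* for the same topic partition yields [(0, 200]], while a disjoint incoming range (300, 400] is kept as a
* separate entry, with the resulting list sorted by lower endpoint.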
*/
private void mergeOffsets(GobblinMetadataChangeEvent gmce, TableIdentifier tid) {
TableMetadata tableMetadata = tableMetadataMap.computeIfAbsent(tid, t -> new TableMetadata());
tableMetadata.dataOffsetRange = Optional.of(tableMetadata.dataOffsetRange.or(() -> getLastOffset(tableMetadata)));
Map<String, List<Range>> offsets = tableMetadata.dataOffsetRange.get();
for (Map.Entry<String, String> entry : gmce.getTopicPartitionOffsetsRange().entrySet()) {
List<String> rangePair = Splitter.on(ConfigurationKeys.RANGE_DELIMITER_KEY).splitToList(entry.getValue());
Range range = Range.openClosed(Long.parseLong(rangePair.get(0)), Long.parseLong(rangePair.get(1)));
if (range.lowerEndpoint().equals(range.upperEndpoint())) {
//Ignore this case
continue;
}
List<Range> existRanges = offsets.getOrDefault(entry.getKey(), new ArrayList<>());
List<Range> newRanges = new ArrayList<>();
for (Range r : existRanges) {
if (range.isConnected(r)) {
range = range.span(r);
} else {
newRanges.add(r);
}
}
newRanges.add(range);
Collections.sort(newRanges, new Comparator<Range>() {
@Override
public int compare(Range o1, Range o2) {
return o1.lowerEndpoint().compareTo(o2.lowerEndpoint());
}
});
offsets.put(entry.getKey(), newRanges);
}
}
private void updateTableProperty(HiveSpec tableSpec, TableIdentifier tid) {
org.apache.hadoop.hive.metastore.api.Table table = HiveMetaStoreUtils.getTable(tableSpec.getTable());
TableMetadata tableMetadata = tableMetadataMap.computeIfAbsent(tid, t -> new TableMetadata());
tableMetadata.newProperties = Optional.of(IcebergUtils.getTableProperties(table));
String nativeName = tableMetadata.datasetName;
String topic = nativeName.substring(nativeName.lastIndexOf("/") + 1);
tableMetadata.newProperties.get().put(TOPIC_NAME_KEY, topic);
}
/**
* Compute the candidate schema from the GMCE.
* If the schema source is the schema registry, the schema creation time is used as the schema version to compute
* the candidate schema and determine the latest schema.
* If the schema does not contain a creation time, it is treated the same as when the schema source is the event.
* If the schema source is the event, the schema is stored under the default creation time; during flush, if there is
* only one candidate with the default creation time, that candidate is used to update the schema.
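*
* For example (illustrative timestamp): a registry schema whose Avro metadata records creation time
* "1601000000000" is cached under that key, and at flush time it is applied only if the registry's latest
* schema for the topic reports the same creation time.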
* @param gmce
* @param tid
* @param spec
*/
private void computeCandidateSchema(GobblinMetadataChangeEvent gmce, TableIdentifier tid, HiveSpec spec) {
Table table = getIcebergTable(tid);
TableMetadata tableMetadata = tableMetadataMap.computeIfAbsent(tid, t -> new TableMetadata());
org.apache.hadoop.hive.metastore.api.Table hiveTable = HiveMetaStoreUtils.getTable(spec.getTable());
tableMetadata.lastProperties = Optional.of(tableMetadata.lastProperties.or(() -> table.properties()));
Map<String, String> props = tableMetadata.lastProperties.get();
tableMetadata.lastSchemaVersion = Optional.of(
tableMetadata.lastSchemaVersion.or(() -> props.getOrDefault(SCHEMA_CREATION_TIME_KEY, DEFAULT_CREATION_TIME)));
String lastSchemaVersion = tableMetadata.lastSchemaVersion.get();
tableMetadata.candidateSchemas = Optional.of(tableMetadata.candidateSchemas.or(() -> CacheBuilder.newBuilder()
.expireAfterAccess(conf.getInt(MetadataWriter.CACHE_EXPIRING_TIME,
MetadataWriter.DEFAULT_CACHE_EXPIRING_TIME), TimeUnit.HOURS)
.build()));
Cache<String, Schema> candidate = tableMetadata.candidateSchemas.get();
try {
switch (gmce.getSchemaSource()) {
case SCHEMAREGISTRY: {
org.apache.avro.Schema schema = new org.apache.avro.Schema.Parser().parse(gmce.getTableSchema());
String createdOn = AvroUtils.getSchemaCreationTime(schema);
if (createdOn == null) {
candidate.put(DEFAULT_CREATION_TIME,
IcebergUtils.getIcebergSchema(gmce.getTableSchema(), hiveTable).tableSchema);
} else if (!createdOn.equals(lastSchemaVersion)) {
candidate.put(createdOn, IcebergUtils.getIcebergSchema(gmce.getTableSchema(), hiveTable).tableSchema);
}
break;
}
case EVENT: {
candidate.put(DEFAULT_CREATION_TIME,
IcebergUtils.getIcebergSchema(gmce.getTableSchema(), hiveTable).tableSchema);
break;
}
case NONE: {
log.debug("Schema source set to be none, will ignore the schema");
break;
}
default: {
throw new IOException(String.format("unsupported schema source %s", gmce.getSchemaSource()));
}
}
} catch (Exception e) {
log.error("Cannot get candidate schema from event due to", e);
}
}
/**
* Add a partition column to the schema and partition spec
* @param table incoming iceberg table
* @param fieldName name of partition column
* @param type datatype of partition column
* @return table with updated schema and partition spec
*/
private Table addPartitionToIcebergTable(Table table, String fieldName, String type) {
if(!table.schema().columns().stream().anyMatch(x -> x.name().equalsIgnoreCase(fieldName))) {
table.updateSchema().addColumn(fieldName, Types.fromPrimitiveString(type)).commit();
}
if(!table.spec().fields().stream().anyMatch(x -> x.name().equalsIgnoreCase(fieldName))) {
table.updateSpec().addField(fieldName).commit();
}
table.refresh();
return table;
}
protected Table createTable(GobblinMetadataChangeEvent gmce, HiveSpec spec) throws IOException {
String schema = gmce.getTableSchema();
org.apache.hadoop.hive.metastore.api.Table table = HiveMetaStoreUtils.getTable(spec.getTable());
IcebergUtils.IcebergDataAndPartitionSchema schemas = IcebergUtils.getIcebergSchema(schema, table);
TableIdentifier tid = TableIdentifier.of(table.getDbName(), table.getTableName());
Schema tableSchema = schemas.tableSchema;
Preconditions.checkState(tableSchema != null, "Table schema cannot be null when creating a table");
PartitionSpec partitionSpec = IcebergUtils.getPartitionSpec(tableSchema, schemas.partitionSchema);
Table icebergTable = null;
String tableLocation = null;
if (useDataLocationAsTableLocation) {
tableLocation = gmce.getDatasetIdentifier().getNativeName() + String.format(TABLE_LOCATION_SUFFIX, table.getDbName());
//Set the path permission
Path tablePath = new Path(tableLocation);
WriterUtils.mkdirsWithRecursivePermission(tablePath.getFileSystem(conf), tablePath, permission);
}
try (Timer.Context context = metricContext.timer(CREATE_TABLE_TIME).time()) {
icebergTable =
catalog.createTable(tid, tableSchema, partitionSpec, tableLocation, IcebergUtils.getTableProperties(table));
log.info("Created table {}, schema: {} partition spec: {}", tid, tableSchema, partitionSpec);
} catch (AlreadyExistsException e) {
log.warn("table {} already exist, there may be some other process try to create table concurrently", tid);
}
return icebergTable;
}
protected void rewriteFiles(GobblinMetadataChangeEvent gmce, Map<String, Collection<HiveSpec>> newSpecsMap,
Map<String, Collection<HiveSpec>> oldSpecsMap, Table table, TableMetadata tableMetadata) throws IOException {
PartitionSpec partitionSpec = table.spec();
tableMetadata.ensureTxnInit();
Set<DataFile> newDataFiles = new HashSet<>();
getIcebergDataFilesToBeAddedHelper(gmce, table, newSpecsMap, tableMetadata)
.forEach(dataFile -> {
newDataFiles.add(dataFile);
tableMetadata.addedFiles.put(dataFile.path(), "");
});
Set<DataFile> oldDataFiles = getIcebergDataFilesToBeDeleted(gmce, table, newSpecsMap, oldSpecsMap, partitionSpec);
// Deal with the case where the old files don't exist, in which the event is either converted into a no-op or an AppendFiles operation.
if (oldDataFiles.isEmpty() && !newDataFiles.isEmpty()) {
//We check whether one (arbitrary) new data file already exists in the table to avoid reprocessing re-write events
DataFile dataFile = newDataFiles.iterator().next();
Expression exp = Expressions.startsWith(ICEBERG_FILE_PATH_COLUMN, dataFile.path().toString());
if (FindFiles.in(table).withMetadataMatching(exp).collect().iterator().hasNext()) {
//This means this re-write event duplicates one we already handled, so no-op.
return;
}
// This is the case where the files to be deleted do not exist in the table,
// so we directly use the addFiles interface to add the new files to the table.
// Note that this AppendFiles won't be committed here, in contrast to a real rewrite operation
// whose commit is issued at once to reduce the memory footprint.
AppendFiles appendFiles = tableMetadata.getOrInitAppendFiles();
newDataFiles.forEach(appendFiles::appendFile);
return;
}
tableMetadata.transaction.get().newRewrite().rewriteFiles(oldDataFiles, newDataFiles).commit();
}
/**
* Given the GMCE, get the Iceberg schema with the origin ID specified by the data pipeline, which
* corresponds to the file metrics index.
* @param gmce GMCE emitted by data pipeline
* @return iceberg schema with the origin ID
*/
private Schema getSchemaWithOriginId(GobblinMetadataChangeEvent gmce) {
Schema schemaWithOriginId = null;
if (gmce.getAvroSchemaWithIcebergSchemaID() != null) {
org.apache.iceberg.shaded.org.apache.avro.Schema avroSchema =
new org.apache.iceberg.shaded.org.apache.avro.Schema.Parser().parse(gmce.getAvroSchemaWithIcebergSchemaID());
schemaWithOriginId = AvroSchemaUtil.toIceberg(avroSchema);
}
return schemaWithOriginId;
}
/**
* Deal with both regular file deletions manifested by the GMCE (aggregation but no commit)
* and the expiration of older snapshots (commit).
*/
protected void dropFiles(GobblinMetadataChangeEvent gmce, Map<String, Collection<HiveSpec>> oldSpecsMap, Table table,
TableMetadata tableMetadata, TableIdentifier tid) throws IOException {
PartitionSpec partitionSpec = table.spec();
// Update DeleteFiles in tableMetadata: This is regular file deletion
DeleteFiles deleteFiles = tableMetadata.getOrInitDeleteFiles();
Set<DataFile> oldDataFiles =
getIcebergDataFilesToBeDeleted(gmce, table, new HashMap<>(), oldSpecsMap, partitionSpec);
oldDataFiles.forEach(deleteFiles::deleteFile);
// Update ExpireSnapshots and commit the updates at once: This is for expiring snapshots that are
// beyond look-back allowance for time-travel.
parallelRunner.submitCallable(new Callable<Void>() {
@Override
public Void call() throws Exception {
try {
long olderThan = getExpireSnapshotTime();
long start = System.currentTimeMillis();
ExpireSnapshots expireSnapshots = table.expireSnapshots();
final Table tmpTable = table;
expireSnapshots.deleteWith(new Consumer<String>() {
@Override
public void accept(String file) {
if (file.startsWith(tmpTable.location())) {
tmpTable.io().deleteFile(file);
}
}
}).expireOlderThan(olderThan).commit();
//TODO: emit these metrics to Ingraphs, in addition to metrics for publishing new snapshots and other Iceberg metadata operations.
log.info("Spent {} ms to expire snapshots older than {} ({}) in table {}", System.currentTimeMillis() - start,
new DateTime(olderThan).toString(), olderThan, tid.toString());
} catch (Exception e) {
log.error(String.format("Fail to expire snapshots for table %s due to exception ", tid.toString()), e);
}
return null;
}
}, tid.toString());
}
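/**
* Compute the cut-off timestamp for snapshot expiration from the configured look-back period.
* For example, the default "3d" expires snapshots older than three days; the period string may combine
* units such as "1y2M3d4h5m" (years, months, days, hours, minutes).
*/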
private long getExpireSnapshotTime() {
PeriodFormatter periodFormatter = new PeriodFormatterBuilder().appendYears()
.appendSuffix("y")
.appendMonths()
.appendSuffix("M")
.appendDays()
.appendSuffix("d")
.appendHours()
.appendSuffix("h")
.appendMinutes()
.appendSuffix("m")
.toFormatter();
return DateTime.now()
.minus(periodFormatter.parsePeriod(
conf.get(EXPIRE_SNAPSHOTS_LOOKBACK_TIME, DEFAULT_EXPIRE_SNAPSHOTS_LOOKBACK_TIME)))
.getMillis();
}
protected void addFiles(GobblinMetadataChangeEvent gmce, Map<String, Collection<HiveSpec>> newSpecsMap, Table table,
TableMetadata tableMetadata) {
AppendFiles appendFiles = tableMetadata.getOrInitAppendFiles();
getIcebergDataFilesToBeAddedHelper(gmce, table, newSpecsMap, tableMetadata)
.forEach(dataFile -> {
appendFiles.appendFile(dataFile);
tableMetadata.addedFiles.put(dataFile.path(), "");
});
}
private Stream<DataFile> getIcebergDataFilesToBeAddedHelper(GobblinMetadataChangeEvent gmce, Table table,
Map<String, Collection<HiveSpec>> newSpecsMap,
TableMetadata tableMetadata) {
return getIcebergDataFilesToBeAdded(table, tableMetadata, gmce, gmce.getNewFiles(), table.spec(), newSpecsMap,
IcebergUtils.getSchemaIdMap(getSchemaWithOriginId(gmce), table.schema())).stream()
.filter(dataFile -> tableMetadata.addedFiles.getIfPresent(dataFile.path()) == null);
}
/**
* Method to get a {@link DataFile} collection without metrics information.
* This method is used to get files to be deleted from Iceberg.
* If oldFilePrefixes is specified in the gmce, this method uses those prefixes to find the old files in Iceberg;
* otherwise it calls {IcebergUtils.getIcebergDataFileWithoutMetric} to get the DataFile for each specific file path.
*/
private Set<DataFile> getIcebergDataFilesToBeDeleted(GobblinMetadataChangeEvent gmce, Table table,
Map<String, Collection<HiveSpec>> newSpecsMap, Map<String, Collection<HiveSpec>> oldSpecsMap,
PartitionSpec partitionSpec) throws IOException {
Set<DataFile> oldDataFiles = new HashSet<>();
if (gmce.getOldFilePrefixes() != null) {
Expression exp = Expressions.alwaysFalse();
for (String prefix : gmce.getOldFilePrefixes()) {
// Use both full path and raw path to filter old files
exp = Expressions.or(exp, Expressions.startsWith(ICEBERG_FILE_PATH_COLUMN, prefix));
String rawPathPrefix = new Path(prefix).toUri().getRawPath();
exp = Expressions.or(exp, Expressions.startsWith(ICEBERG_FILE_PATH_COLUMN, rawPathPrefix));
}
long start = System.currentTimeMillis();
oldDataFiles.addAll(Sets.newHashSet(FindFiles.in(table).withMetadataMatching(exp).collect().iterator()));
//Use INFO level log here to get a better estimate.
//This shouldn't overwhelm the log since we receive a limited number of rewrite_files GMCEs for one table in a day
log.info("Spent {}ms to query all old files in iceberg.", System.currentTimeMillis() - start);
} else {
for (String file : gmce.getOldFiles()) {
String specPath = new Path(file).getParent().toString();
// For the re-compaction use case, the old path may contain the /daily path; in this case, we find the spec in newSpecsMap
StructLike partitionVal = getIcebergPartitionVal(
oldSpecsMap.containsKey(specPath) ? oldSpecsMap.get(specPath) : newSpecsMap.get(specPath), file,
partitionSpec);
oldDataFiles.add(IcebergUtils.getIcebergDataFileWithoutMetric(file, partitionSpec, partitionVal));
}
}
return oldDataFiles;
}
/**
* Method to get dataFiles with metrics information.
* This method is used to get files to be added to Iceberg.
* If completeness is enabled, a new field ("late") is added to the table schema and partition spec,
* computed based on the datepartition and the completion watermark.
* This method calls {IcebergUtils.getIcebergDataFileWithMetric} to get the DataFile for a specific file path.
*/
private Set<DataFile> getIcebergDataFilesToBeAdded(Table table, TableMetadata tableMetadata, GobblinMetadataChangeEvent gmce, List<org.apache.gobblin.metadata.DataFile> files,
PartitionSpec partitionSpec, Map<String, Collection<HiveSpec>> newSpecsMap, Map<Integer, Integer> schemaIdMap) {
Set<DataFile> dataFiles = new HashSet<>();
for (org.apache.gobblin.metadata.DataFile file : files) {
try {
Collection<HiveSpec> hiveSpecs = newSpecsMap.get(new Path(file.getFilePath()).getParent().toString());
StructLike partition = getIcebergPartitionVal(hiveSpecs, file.getFilePath(), partitionSpec);
if(tableMetadata.newPartitionColumnEnabled && gmce.getOperationType() == OperationType.add_files) {
// Assumes the first partition value is the date partition
// TODO Find better way to determine a partition value
String datepartition = partition.get(0, null);
partition = addLatePartitionValueToIcebergTable(table, tableMetadata,
hiveSpecs.iterator().next().getPartition().get(), datepartition);
tableMetadata.datePartitions.add(getDateTimeFromDatepartitionString(datepartition));
}
dataFiles.add(IcebergUtils.getIcebergDataFileWithMetric(file, table.spec(), partition, conf, schemaIdMap));
} catch (Exception e) {
log.warn("Cannot get DataFile for {} dur to {}", file.getFilePath(), e);
}
}
return dataFiles;
}
/**
* 1. Add "late" partition column to iceberg table if not exists
* 2. compute "late" partition value based on datepartition and completion watermark
* 3. Default to late=0 if completion watermark check is disabled
* @param table
* @param tableMetadata
* @param hivePartition
* @param datepartition
* @return new iceberg partition value for file
*/
private StructLike addLatePartitionValueToIcebergTable(Table table, TableMetadata tableMetadata, HivePartition hivePartition, String datepartition) {
table = addPartitionToIcebergTable(table, newPartitionColumn, newPartitionColumnType);
PartitionSpec partitionSpec = table.spec();
int late = !tableMetadata.completenessEnabled ? 0 : isLate(datepartition, tableMetadata.completionWatermark);
List<String> partitionValues = new ArrayList<>(hivePartition.getValues());
partitionValues.add(String.valueOf(late));
return IcebergUtils.getPartition(partitionSpec.partitionType(), partitionValues);
}
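/**
* Classify how late a datepartition is relative to the previous completion watermark:
* 0 = on time (partition time at or after the watermark), 1 = late but on the same calendar date
* (in the configured time zone) as the watermark, 2 = late and on an earlier date.
*/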
private int isLate(String datepartition, long previousWatermark) {
ZonedDateTime partitionDateTime = ZonedDateTime.parse(datepartition, HOURLY_DATEPARTITION_FORMAT);
long partitionEpochTime = partitionDateTime.toInstant().toEpochMilli();
if(partitionEpochTime >= previousWatermark) {
return 0;
} else if(partitionEpochTime < previousWatermark
&& partitionDateTime.toLocalDate().equals(getDateFromEpochMillis(previousWatermark))) {
return 1;
} else {
return 2;
}
}
private LocalDate getDateFromEpochMillis(long epochMillis) {
return ZonedDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), ZoneId.of(timeZone)).toLocalDate();
}
private ZonedDateTime getDateTimeFromDatepartitionString(String datepartition) {
return ZonedDateTime.parse(datepartition, HOURLY_DATEPARTITION_FORMAT);
}
/**
* Obtain Iceberg partition value with a collection of {@link HiveSpec}.
* @param specs A collection of {@link HiveSpec}s.
* @param filePath URI of file, used for logging purpose in this method.
* @param partitionSpec The scheme of partition.
* @return The value of partition based on the given {@link PartitionSpec}.
* @throws IOException
*/
private StructLike getIcebergPartitionVal(Collection<HiveSpec> specs, String filePath, PartitionSpec partitionSpec)
throws IOException {
if (specs == null || specs.isEmpty()) {
throw new IOException("Cannot get hive spec for " + filePath);
}
HivePartition hivePartition = specs.iterator().next().getPartition().orNull();
StructLike partitionVal = hivePartition == null ? null
: IcebergUtils.getPartition(partitionSpec.partitionType(), hivePartition.getValues());
return partitionVal;
}
/**
* For the flush of each table, we apply the following logic:
* 1. Commit the appendFiles if they exist
* 2. Update the new table properties: high watermark of the GMCE, data offset range, schema versions
* 3. Update the schema
* 4. Commit the transaction
* 5. Reset the tableMetadata
* @param dbName
* @param tableName
*/
@Override
public void flush(String dbName, String tableName) throws IOException {
Lock writeLock = readWriteLock.writeLock();
writeLock.lock();
try {
TableIdentifier tid = TableIdentifier.of(dbName, tableName);
TableMetadata tableMetadata = tableMetadataMap.getOrDefault(tid, new TableMetadata());
if (tableMetadata.transaction.isPresent()) {
Transaction transaction = tableMetadata.transaction.get();
Map<String, String> props = tableMetadata.newProperties.or(
Maps.newHashMap(tableMetadata.lastProperties.or(getIcebergTable(tid).properties())));
String topic = props.get(TOPIC_NAME_KEY);
if (tableMetadata.appendFiles.isPresent()) {
tableMetadata.appendFiles.get().commit();
sendAuditCounts(topic, tableMetadata.serializedAuditCountMaps);
if (tableMetadata.completenessEnabled) {
checkAndUpdateCompletenessWatermark(tableMetadata, topic, tableMetadata.datePartitions, props);
}
}
if (tableMetadata.deleteFiles.isPresent()) {
tableMetadata.deleteFiles.get().commit();
}
// Check and update the completion watermark when there are no files to be registered, typically for quiet topics.
// The logic is to check the next window after the previous completion watermark and to update the watermark
// if the audit check passes for that window (e.g. there are no audit counts for a quiet topic).
if(!tableMetadata.appendFiles.isPresent() && !tableMetadata.deleteFiles.isPresent()
&& tableMetadata.completenessEnabled) {
if (tableMetadata.completionWatermark > DEFAULT_COMPLETION_WATERMARK) {
log.info(String.format("Checking kafka audit for %s on change_property ", topic));
SortedSet<ZonedDateTime> timestamps = new TreeSet<>();
ZonedDateTime prevWatermarkDT =
Instant.ofEpochMilli(tableMetadata.completionWatermark).atZone(ZoneId.of(this.timeZone));
timestamps.add(TimeIterator.inc(prevWatermarkDT, TimeIterator.Granularity.valueOf(this.auditCheckGranularity), 1));
checkAndUpdateCompletenessWatermark(tableMetadata, topic, timestamps, props);
} else {
log.info(String.format("Need valid watermark, current watermark is %s, Not checking kafka audit for %s",
tableMetadata.completionWatermark, topic));
}
}
//Set high watermark
Long highWatermark = tableCurrentWatermarkMap.get(tid);
props.put(String.format(GMCE_HIGH_WATERMARK_KEY, tableTopicPartitionMap.get(tid)), highWatermark.toString());
//Set low watermark
props.put(String.format(GMCE_LOW_WATERMARK_KEY, tableTopicPartitionMap.get(tid)),
tableMetadata.lowWatermark.get().toString());
//Set whether to delete metadata files after commit
props.put(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, Boolean.toString(
conf.getBoolean(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED,
TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT)));
props.put(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX, Integer.toString(
conf.getInt(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX,
TableProperties.METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT)));
//Set data offset range
boolean containOffsetRange = setDatasetOffsetRange(tableMetadata, props);
String topicName = tableName;
if (containOffsetRange) {
String topicPartitionString = tableMetadata.dataOffsetRange.get().keySet().iterator().next();
//In case the topic name is not the table name or the topic name contains '-'
topicName = topicPartitionString.substring(0, topicPartitionString.lastIndexOf('-'));
}
//Update schema(commit)
updateSchema(tableMetadata, props, topicName);
//Update properties
UpdateProperties updateProperties = transaction.updateProperties();
props.forEach(updateProperties::set);
updateProperties.commit();
try (AutoCloseableHiveLock lock = this.locks.getTableLock(dbName, tableName)) {
transaction.commitTransaction();
}
// Emit GTE for snapshot commits
Snapshot snapshot = tableMetadata.table.get().currentSnapshot();
Map<String, String> currentProps = tableMetadata.table.get().properties();
submitSnapshotCommitEvent(snapshot, tableMetadata, dbName, tableName, currentProps, highWatermark);
//Reset the table metadata for next accumulation period
tableMetadata.reset(currentProps, highWatermark);
log.info(String.format("Finish commit of new snapshot %s for table %s", snapshot.snapshotId(), tid.toString()));
} else {
log.info("There's no transaction initiated for the table {}", tid.toString());
}
} catch (RuntimeException e) {
throw new RuntimeException(String.format("Fail to flush table %s %s", dbName, tableName), e);
} catch (Exception e) {
throw new IOException(String.format("Fail to flush table %s %s", dbName, tableName), e);
} finally {
writeLock.unlock();
}
}
@Override
public void reset(String dbName, String tableName) throws IOException {
this.tableMetadataMap.remove(TableIdentifier.of(dbName, tableName));
}
/**
* Update TableMetadata with the new completion watermark upon a successful audit check
* @param tableMetadata metadata of table
* @param topic topic name
* @param timestamps Sorted set in reverse order of timestamps to check audit counts for
* @param props table properties map
*/
private void checkAndUpdateCompletenessWatermark(TableMetadata tableMetadata, String topic, SortedSet<ZonedDateTime> timestamps,
Map<String, String> props) {
if (topic == null) {
log.error(String.format("Not performing audit check. %s is null. Please set as table property of %s",
TOPIC_NAME_KEY, tableMetadata.table.get().name()));
}
long newCompletenessWatermark =
computeCompletenessWatermark(topic, timestamps, tableMetadata.completionWatermark);
if (newCompletenessWatermark > tableMetadata.completionWatermark) {
log.info(String.format("Updating %s for %s to %s", COMPLETION_WATERMARK_KEY, tableMetadata.table.get().name(),
newCompletenessWatermark));
props.put(COMPLETION_WATERMARK_KEY, String.valueOf(newCompletenessWatermark));
props.put(COMPLETION_WATERMARK_TIMEZONE_KEY, this.timeZone);
tableMetadata.completionWatermark = newCompletenessWatermark;
}
}
/**
* NOTE: the completion watermark for a window [t1, t2] is marked as t2 if the audit counts match
* for that window (i.e. it is set to the beginning of the next window).
* For each timestamp in the sorted collection of timestamps, in descending order:
*   if the timestamp is greater than previousWatermark
*   and hour(now) > hour(prevWatermark):
*     check the audit counts for completeness between a source and a reference tier
*     for the window [timestamp - 1 unit of granularity, timestamp];
*     if the audit counts match, update the watermark to the timestamp and break,
*     else continue;
*   else break.
* Uses a {@link TimeIterator} that steps over a range of time one unit at a time,
* given the start, end and granularity.
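*
* For example (illustrative, hourly granularity): with a previous watermark of 10:00 and candidate
* timestamps {12:00, 11:00}, the window (11:00, 12:00] is checked first; if its audit counts match,
* the watermark advances to 12:00 and the check stops, otherwise the window (10:00, 11:00] is tried next.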
* @param table
* @param timestamps a sorted set of timestamps in decreasing order
* @param previousWatermark previous completion watermark for the table
* @return updated completion watermark
*/
private long computeCompletenessWatermark(String table, SortedSet<ZonedDateTime> timestamps, long previousWatermark) {
log.info(String.format("Compute completion watermark for %s and timestamps %s with previous watermark %s", table, timestamps, previousWatermark));
long completionWatermark = previousWatermark;
ZonedDateTime now = ZonedDateTime.now(ZoneId.of(this.timeZone));
try {
if(timestamps == null || timestamps.size() <= 0) {
log.error("Cannot create time iterator. Empty for null timestamps");
return previousWatermark;
}
TimeIterator.Granularity granularity = TimeIterator.Granularity.valueOf(this.auditCheckGranularity);
ZonedDateTime prevWatermarkDT = Instant.ofEpochMilli(previousWatermark)
.atZone(ZoneId.of(this.timeZone));
ZonedDateTime startDT = timestamps.first();
ZonedDateTime endDT = timestamps.last();
TimeIterator iterator = new TimeIterator(startDT, endDT, granularity, true);
while (iterator.hasNext()) {
ZonedDateTime timestampDT = iterator.next();
if (timestampDT.isAfter(prevWatermarkDT)
&& TimeIterator.durationBetween(prevWatermarkDT, now, granularity) > 0) {
long timestampMillis = timestampDT.toInstant().toEpochMilli();
if (auditCountVerifier.get().isComplete(table,
TimeIterator.dec(timestampDT, granularity, 1).toInstant().toEpochMilli(), timestampMillis)) {
completionWatermark = timestampMillis;
break;
}
} else {
break;
}
}
} catch (IOException e) {
log.warn("Exception during audit count check: ", e);
}
return completionWatermark;
}
private void submitSnapshotCommitEvent(Snapshot snapshot, TableMetadata tableMetadata, String dbName,
String tableName, Map<String, String> props, Long highWaterMark) {
GobblinEventBuilder gobblinTrackingEvent =
new GobblinEventBuilder(MetadataWriterKeys.ICEBERG_COMMIT_EVENT_NAME);
long currentSnapshotID = snapshot.snapshotId();
long endToEndLag = System.currentTimeMillis() - tableMetadata.lowestGMCEEmittedTime;
TableIdentifier tid = TableIdentifier.of(dbName, tableName);
String gmceTopicPartition = tableTopicPartitionMap.get(tid);
//Add information to automatically trigger a repair job when data loss happens
gobblinTrackingEvent.addMetadata(MetadataWriterKeys.GMCE_TOPIC_NAME, gmceTopicPartition.split("-")[0]);
gobblinTrackingEvent.addMetadata(MetadataWriterKeys.GMCE_TOPIC_PARTITION, gmceTopicPartition.split("-")[1]);
gobblinTrackingEvent.addMetadata(MetadataWriterKeys.GMCE_HIGH_WATERMARK, highWaterMark.toString());
gobblinTrackingEvent.addMetadata(MetadataWriterKeys.GMCE_LOW_WATERMARK,
tableMetadata.lowWatermark.get().toString());
//Add information for lag monitoring
gobblinTrackingEvent.addMetadata(MetadataWriterKeys.LAG_KEY_NAME, Long.toString(endToEndLag));
gobblinTrackingEvent.addMetadata(MetadataWriterKeys.SNAPSHOT_KEY_NAME, Long.toString(currentSnapshotID));
gobblinTrackingEvent.addMetadata(MetadataWriterKeys.MANIFEST_LOCATION, snapshot.manifestListLocation());
gobblinTrackingEvent.addMetadata(MetadataWriterKeys.SNAPSHOT_INFORMATION_KEY_NAME,
Joiner.on(",").withKeyValueSeparator("=").join(snapshot.summary()));
gobblinTrackingEvent.addMetadata(MetadataWriterKeys.ICEBERG_TABLE_KEY_NAME, tableName);
gobblinTrackingEvent.addMetadata(MetadataWriterKeys.ICEBERG_DATABASE_KEY_NAME, dbName);
gobblinTrackingEvent.addMetadata(MetadataWriterKeys.DATASET_HDFS_PATH, tableMetadata.datasetName);
for (Map.Entry<String, String> entry : props.entrySet()) {
if (entry.getKey().startsWith(OFFSET_RANGE_KEY_PREFIX)) {
gobblinTrackingEvent.addMetadata(entry.getKey(), entry.getValue());
}
}
eventSubmitter.submit(gobblinTrackingEvent);
}
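/**
* Serialize the aggregated per-topic-partition offset ranges into table properties (one property per
* topic partition, the inverse of {@link #getLastOffset(TableMetadata)}) and report whether any range was written.
*/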
private boolean setDatasetOffsetRange(TableMetadata tableMetadata, Map<String, String> props) {
if (tableMetadata.dataOffsetRange.isPresent() && !tableMetadata.dataOffsetRange.get().isEmpty()) {
for (Map.Entry<String, List<Range>> offsets : tableMetadata.dataOffsetRange.get().entrySet()) {
List<Range> ranges = offsets.getValue();
String rangeString = ranges.stream()
.map(r -> Joiner.on(ConfigurationKeys.RANGE_DELIMITER_KEY).join(r.lowerEndpoint(), r.upperEndpoint()))
.collect(Collectors.joining(ConfigurationKeys.LIST_DELIMITER_KEY));
props.put(String.format(OFFSET_RANGE_KEY_FORMAT, offsets.getKey()), rangeString);
}
return true;
}
return false;
}
private void updateSchema(TableMetadata tableMetadata, Map<String, String> props, String topicName) {
//Set default schema versions
props.put(SCHEMA_CREATION_TIME_KEY, tableMetadata.lastSchemaVersion.or(DEFAULT_CREATION_TIME));
// Update Schema
try {
if (tableMetadata.candidateSchemas.isPresent() && tableMetadata.candidateSchemas.get().size() > 0) {
Cache candidates = tableMetadata.candidateSchemas.get();
//Only the default schema is present, meaning either the schema was computed from the event or it has no creation time; update it directly
if (candidates.size() == 1 && candidates.getIfPresent(DEFAULT_CREATION_TIME) != null) {
updateSchemaHelper(DEFAULT_CREATION_TIME, (Schema) candidates.getIfPresent(DEFAULT_CREATION_TIME), props,
tableMetadata.table.get());
} else {
//Update the schema if the candidates contain a schema with the same creation time as the latest registry schema
org.apache.avro.Schema latestSchema =
(org.apache.avro.Schema) schemaRegistry.getLatestSchemaByTopic(topicName);
String creationTime = AvroUtils.getSchemaCreationTime(latestSchema);
if (creationTime == null) {
log.warn(
"Schema from schema registry does not contain creation time, check config for schema registry class");
} else if (candidates.getIfPresent(creationTime) != null) {
updateSchemaHelper(creationTime, (Schema) candidates.getIfPresent(creationTime), props,
tableMetadata.table.get());
}
}
}
} catch (SchemaRegistryException e) {
log.error("Cannot get schema form schema registry, will not update this schema", e);
}
}
private void updateSchemaHelper(String schemaCreationTime, Schema schema, Map<String, String> props, Table table) {
try {
table.updateSchema().unionByNameWith(schema).commit();
props.put(SCHEMA_CREATION_TIME_KEY, schemaCreationTime);
} catch (Exception e) {
log.error("Cannot update schema to " + schema.toString() + "for table " + table.location(), e);
}
}
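/**
* Accept a single GMCE record: skip it when the table is not selected by the whitelist/blacklist or its
* offset is not beyond the current watermark; otherwise aggregate it via
* {@link #write(GobblinMetadataChangeEvent, Map, Map, HiveSpec)} and advance the in-memory watermark for the table.
*/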
@Override
public void writeEnvelope(RecordEnvelope<GenericRecord> recordEnvelope, Map<String, Collection<HiveSpec>> newSpecsMap,
Map<String, Collection<HiveSpec>> oldSpecsMap, HiveSpec tableSpec) throws IOException {
Lock readLock = readWriteLock.readLock();
readLock.lock();
try {
GenericRecord genericRecord = recordEnvelope.getRecord();
GobblinMetadataChangeEvent gmce =
(GobblinMetadataChangeEvent) SpecificData.get().deepCopy(genericRecord.getSchema(), genericRecord);
String dbName = tableSpec.getTable().getDbName();
String tableName = tableSpec.getTable().getTableName();
if (whitelistBlacklist.acceptTable(dbName, tableName)) {
TableIdentifier tid = TableIdentifier.of(dbName, tableName);
String topicPartition = tableTopicPartitionMap.computeIfAbsent(tid,
t -> recordEnvelope.getWatermark().getSource());
Long currentWatermark = getAndPersistCurrentWatermark(tid, topicPartition);
Long currentOffset = ((LongWatermark)recordEnvelope.getWatermark().getWatermark()).getValue();
if (currentOffset > currentWatermark) {
if (!tableMetadataMap.computeIfAbsent(tid, t -> new TableMetadata()).lowWatermark.isPresent()) {
//This means we haven't registered this table yet or hit some error before, so we need to reset the low watermark
tableMetadataMap.get(tid).lowWatermark = Optional.of(currentOffset - 1);
tableMetadataMap.get(tid).setDatasetName(gmce.getDatasetIdentifier().getNativeName());
if (this.newPartitionEnabled && this.newPartitionTableWhitelistBlacklist.acceptTable(dbName, tableName)) {
tableMetadataMap.get(tid).newPartitionColumnEnabled = true;
if (this.completenessEnabled && this.completenessWhitelistBlacklist.acceptTable(dbName, tableName)) {
tableMetadataMap.get(tid).completenessEnabled = true;
}
}
}
write(gmce, newSpecsMap, oldSpecsMap, tableSpec);
tableCurrentWatermarkMap.put(tid, currentOffset);
} else {
log.warn(String.format("Skip processing record %s since it has lower watermark", genericRecord.toString()));
}
} else {
log.info(String.format("Skip table %s.%s since it's not selected", tableSpec.getTable().getDbName(),
tableSpec.getTable().getTableName()));
}
} finally {
readLock.unlock();
}
}
/**
* Method to send audit counts given a topic name and a list of serialized audit count maps. Called only when new
* files are added. The default implementation is a no-op; override it in a subclass to emit audit counts.
*/
public void sendAuditCounts(String topicName, Collection<String> serializedAuditCountMaps) {
}
@Override
public void close() throws IOException {
this.closer.close();
}
/**
* A collection of Iceberg metadata including the {@link Table} itself as well as
* a set of buffered objects (reflecting the table's {@link org.apache.iceberg.PendingUpdate}s) within the flush
* interval that aggregate metadata such as the locations of arriving / deleted files and the schema,
* along with other table-level metadata like the watermark, offset ranges, etc.
*
* Also note the difference with {@link org.apache.iceberg.TableMetadata}.
*/
private class TableMetadata {
Optional<Table> table = Optional.absent();
/**
* The {@link Transaction} object holds a reference to a {@link org.apache.iceberg.BaseTransaction.TransactionTableOperations}
* that is shared by all individual operations (e.g. {@link AppendFiles}) to ensure atomicity even if the commit method
* is invoked from an individual operation.
*/
Optional<Transaction> transaction = Optional.absent();
private Optional<AppendFiles> appendFiles = Optional.absent();
private Optional<DeleteFiles> deleteFiles = Optional.absent();
Optional<Map<String, String>> lastProperties = Optional.absent();
Optional<Map<String, String>> newProperties = Optional.absent();
Optional<Cache<String, Schema>> candidateSchemas = Optional.absent();
Optional<Map<String, List<Range>>> dataOffsetRange = Optional.absent();
Optional<String> lastSchemaVersion = Optional.absent();
Optional<Long> lowWatermark = Optional.absent();
long completionWatermark = DEFAULT_COMPLETION_WATERMARK;
SortedSet<ZonedDateTime> datePartitions = new TreeSet<>(Collections.reverseOrder());
List<String> serializedAuditCountMaps = new ArrayList<>();
@Setter
String datasetName;
boolean completenessEnabled;
boolean newPartitionColumnEnabled;
Cache<CharSequence, String> addedFiles = CacheBuilder.newBuilder()
.expireAfterAccess(conf.getInt(ADDED_FILES_CACHE_EXPIRING_TIME, DEFAULT_ADDED_FILES_CACHE_EXPIRING_TIME),
TimeUnit.HOURS)
.build();
long lowestGMCEEmittedTime = Long.MAX_VALUE;
/**
* Always use this method to obtain the {@link AppendFiles} object within the flush interval
* if clients want the {@link AppendFiles} committed along with other updates in a txn.
*/
AppendFiles getOrInitAppendFiles() {
ensureTxnInit();
if (!this.appendFiles.isPresent()) {
this.appendFiles = Optional.of(this.transaction.get().newAppend());
}
return this.appendFiles.get();
}
DeleteFiles getOrInitDeleteFiles() {
ensureTxnInit();
if (!this.deleteFiles.isPresent()) {
this.deleteFiles = Optional.of(this.transaction.get().newDelete());
}
return this.deleteFiles.get();
}
/**
* Initializing {@link Transaction} object within {@link TableMetadata} when needed.
*/
void ensureTxnInit() {
if (!this.transaction.isPresent()) {
this.transaction = Optional.of(table.get().newTransaction());
}
}
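/**
* Clear all buffered pending updates after a successful flush, keeping only the just-committed
* properties and the new low watermark for the next accumulation period.
*/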
void reset(Map<String, String> props, Long lowWaterMark) {
this.lastProperties = Optional.of(props);
this.lastSchemaVersion = Optional.of(props.get(SCHEMA_CREATION_TIME_KEY));
this.transaction = Optional.absent();
this.deleteFiles = Optional.absent();
this.appendFiles = Optional.absent();
// Clean cache and reset to eagerly release unreferenced objects.
if (this.candidateSchemas.isPresent()) {
this.candidateSchemas.get().cleanUp();
}
this.candidateSchemas = Optional.absent();
this.dataOffsetRange = Optional.absent();
this.newProperties = Optional.absent();
this.lowestGMCEEmittedTime = Long.MAX_VALUE;
this.lowWatermark = Optional.of(lowWaterMark);
this.datePartitions.clear();
this.serializedAuditCountMaps.clear();
}
}
}