/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gobblin.iceberg;

import azkaban.jobExecutor.AbstractJob;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.hive.HiveSerDeManager;
import org.apache.gobblin.hive.metastore.HiveMetaStoreBasedRegister;
import org.apache.gobblin.hive.policy.HiveRegistrationPolicyBase;
import org.apache.gobblin.iceberg.Utils.IcebergUtils;
import org.apache.gobblin.iceberg.publisher.GobblinMCEPublisher;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metadata.DataFile;
import org.apache.gobblin.metadata.DataMetrics;
import org.apache.gobblin.metadata.DataOrigin;
import org.apache.gobblin.metadata.DatasetIdentifier;
import org.apache.gobblin.metadata.GobblinMetadataChangeEvent;
import org.apache.gobblin.metadata.IntegerBytesPair;
import org.apache.gobblin.metadata.IntegerLongPair;
import org.apache.gobblin.metadata.OperationType;
import org.apache.gobblin.metadata.SchemaSource;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.util.ClustersNames;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.apache.gobblin.writer.PartitionedDataWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.Metrics;
import static org.apache.gobblin.iceberg.writer.GobblinMCEWriter.HIVE_PARTITION_NAME;
/**
* A class that runs alongside the data ingestion pipeline to emit GobblinMCEs (Gobblin Metadata Change Events).
* A GMCE captures file-level metadata changes, i.e., added or deleted files, together with the column min/max
* values of the added files. GMCEs are consumed by a separate metadata ingestion pipeline to register or
* de-register Hive/Iceberg metadata.
*
* This is an abstract class; a concrete implementation needs a subsystem such as Kafka, which supports
* at-least-once delivery, to emit the event.
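*
* A minimal usage sketch (the producer class name, file path, and SchemaSource value below are illustrative
* assumptions, not requirements of this class):
* <pre>{@code
*   State state = new State();
*   state.setProp(GobblinMCEProducer.GMCE_PRODUCER_CLASS, "com.example.KafkaGMCEProducer");
*   try (GobblinMCEProducer producer = GobblinMCEProducer.getGobblinMCEProducer(state)) {
*     Map<Path, Metrics> newFiles = new HashMap<>();
*     newFiles.put(new Path("/data/db/table/part-0.avro"), null); // null Metrics fall back to a placeholder record count
*     producer.sendGMCE(newFiles, null, null, null, OperationType.add_files, SchemaSource.EVENT);
*   }
* }</pre>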
*/
@Slf4j
public abstract class GobblinMCEProducer implements Closeable {
public static final String GMCE_PRODUCER_CLASS = "GobblinMCEProducer.class.name";
public static final String GMCE_CLUSTER_NAME = "GobblinMCE.cluster.name";
public static final String OLD_FILES_HIVE_REGISTRATION_KEY = "old.files.hive.registration.policy";
private static final String HDFS_PLATFORM_URN = "urn:li:dataPlatform:hdfs";
private static final String DATASET_ORIGIN_KEY = "dataset.origin";
private static final String DEFAULT_DATASET_ORIGIN = "PROD";
@Setter
protected State state;
protected MetricContext metricContext;
public GobblinMCEProducer(State state) {
this.state = state;
this.metricContext = Instrumented.getMetricContext(state, this.getClass());
}
public void sendGMCE(Map<Path, Metrics> newFiles, List<String> oldFiles, List<String> oldFilePrefixes,
Map<String, String> offsetRange, OperationType operationType, SchemaSource schemaSource) throws IOException {
sendGMCE(newFiles, oldFiles, oldFilePrefixes, offsetRange, operationType, schemaSource, null);
}
/**
* This method uses the given files to compute the table and dataset names; for each table it generates one GMCE
* and sends it to Kafka, so that the metadata ingestion pipeline can use the information to register metadata.
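*
* For example, a compaction run that replaces old files with rewritten ones might call (all values are
* illustrative assumptions, including the offset-range encoding):
* <pre>{@code
*   producer.sendGMCE(newFiles, oldFiles, null,
*       ImmutableMap.of("tracking.PageViewEvent-1", "1000-2000"), // topicPartition -> "startOffset-endOffset"
*       OperationType.rewrite_files, SchemaSource.NONE, null);
* }</pre>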
* @param newFiles the map from each new file's path to its {@link Metrics}
* @param oldFiles the list of old files to be dropped
* @param oldFilePrefixes the list of path prefixes identifying old files to be dropped, can be null
* @param offsetRange offset range of the new data, can be null
* @param operationType the operation type of the GMCE emitted by this method
* @param schemaSource the source of the table schema carried by the GMCE
* @param serializedAuditCountMap audit count map to be used by {@link org.apache.gobblin.iceberg.writer.IcebergMetadataWriter} to track iceberg
* registration counts
* @throws IOException if building or sending the event fails
*/
public void sendGMCE(Map<Path, Metrics> newFiles, List<String> oldFiles, List<String> oldFilePrefixes,
Map<String, String> offsetRange, OperationType operationType, SchemaSource schemaSource, String serializedAuditCountMap) throws IOException {
GobblinMetadataChangeEvent gmce =
getGobblinMetadataChangeEvent(newFiles, oldFiles, oldFilePrefixes, offsetRange, operationType, schemaSource, serializedAuditCountMap);
underlyingSendGMCE(gmce);
}
/**
* Use the producer to send the GMCE. The implementation needs to ensure at-least-once, in-order delivery
* (e.g., send the event with a Kafka producer configured for at-least-once delivery).
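*
* A minimal sketch of a conforming implementation (the topic name and the createKafkaProducer/serialize helpers
* are assumptions, not part of this class):
* <pre>{@code
*   public class KafkaGMCEProducer extends GobblinMCEProducer {
*     // configured with acks=all, retries, and bounded in-flight requests for at-least-once, in-order delivery
*     private final KafkaProducer<String, byte[]> kafkaProducer = createKafkaProducer();
*
*     public KafkaGMCEProducer(State state) { super(state); }
*
*     public void underlyingSendGMCE(GobblinMetadataChangeEvent gmce) {
*       kafkaProducer.send(new ProducerRecord<>("GobblinMetadataChangeEvent", serialize(gmce)));
*     }
*   }
* }</pre>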
* @param gmce GMCE that contains information of the metadata change
*/
public abstract void underlyingSendGMCE(GobblinMetadataChangeEvent gmce);
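/**
* Populate the GMCE fields that are derived from the job-level {@link State} rather than from the file changes
* themselves: dataset identifier, cluster, flow id, table schema, partition columns, and the Hive registration
* policy and properties consumed by the downstream metadata writers.
*/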
private void setBasicInformationForGMCE(GobblinMetadataChangeEvent.Builder gmceBuilder,
Map<String, String> offsetRange, SchemaSource schemaSource) {
String origin = state.getProp(DATASET_ORIGIN_KEY, DEFAULT_DATASET_ORIGIN);
gmceBuilder.setDatasetIdentifier(DatasetIdentifier.newBuilder()
.setDataPlatformUrn(HDFS_PLATFORM_URN)
.setDataOrigin(DataOrigin.valueOf(origin))
.setNativeName(state.getProp(ConfigurationKeys.DATA_PUBLISHER_DATASET_DIR))
.build());
gmceBuilder.setCluster(state.getProp(GMCE_CLUSTER_NAME, ClustersNames.getInstance().getClusterName()));
// retention jobs do not have job.id; fall back to the Azkaban flow id from the Hadoop configuration
gmceBuilder.setFlowId(
state.getProp(AbstractJob.JOB_ID, new Configuration().get(ConfigurationKeys.AZKABAN_FLOW_ID)));
gmceBuilder.setRegistrationPolicy(state.getProp(ConfigurationKeys.HIVE_REGISTRATION_POLICY));
gmceBuilder.setSchemaSource(schemaSource);
gmceBuilder.setPartitionColumns(Lists.newArrayList(state.getProp(HIVE_PARTITION_NAME, "")));
if (offsetRange != null) {
gmceBuilder.setTopicPartitionOffsetsRange(offsetRange);
}
String schemaString = state.getProp(PartitionedDataWriter.WRITER_LATEST_SCHEMA);
if (schemaString != null) {
gmceBuilder.setTableSchema(schemaString);
}
if (state.contains(GobblinMCEPublisher.AVRO_SCHEMA_WITH_ICEBERG_ID)) {
gmceBuilder.setAvroSchemaWithIcebergSchemaID(state.getProp(GobblinMCEPublisher.AVRO_SCHEMA_WITH_ICEBERG_ID));
}
if (state.contains(OLD_FILES_HIVE_REGISTRATION_KEY)) {
gmceBuilder.setRegistrationPolicyForOldData(state.getProp(OLD_FILES_HIVE_REGISTRATION_KEY));
} else {
log.warn(
"property {} is not set; for a rewrite/drop operation it may be impossible to determine the partition values of the old data",
OLD_FILES_HIVE_REGISTRATION_KEY);
}
Map<String, String> regProperties = new HashMap<>();
if (state.contains(HiveRegistrationPolicyBase.HIVE_DATABASE_NAME)) {
regProperties.put(HiveRegistrationPolicyBase.HIVE_DATABASE_NAME,
state.getProp(HiveRegistrationPolicyBase.HIVE_DATABASE_NAME));
}
if (state.contains(HiveRegistrationPolicyBase.ADDITIONAL_HIVE_DATABASE_NAMES)) {
regProperties.put(HiveRegistrationPolicyBase.ADDITIONAL_HIVE_DATABASE_NAMES,
state.getProp(HiveRegistrationPolicyBase.ADDITIONAL_HIVE_DATABASE_NAMES));
}
if (state.contains(HiveRegistrationPolicyBase.ADDITIONAL_HIVE_TABLE_NAMES)) {
regProperties.put(HiveRegistrationPolicyBase.ADDITIONAL_HIVE_TABLE_NAMES,
state.getProp(HiveRegistrationPolicyBase.ADDITIONAL_HIVE_TABLE_NAMES));
}
if (state.contains(HiveMetaStoreBasedRegister.SCHEMA_SOURCE_DB)) {
regProperties.put(HiveMetaStoreBasedRegister.SCHEMA_SOURCE_DB,
state.getProp(HiveMetaStoreBasedRegister.SCHEMA_SOURCE_DB));
}
if (state.contains(HiveSerDeManager.HIVE_ROW_FORMAT)) {
regProperties.put(HiveSerDeManager.HIVE_ROW_FORMAT,
state.getProp(HiveSerDeManager.HIVE_ROW_FORMAT));
}
if (!regProperties.isEmpty()) {
gmceBuilder.setRegistrationProperties(regProperties);
}
}
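/**
* Build a {@link GobblinMetadataChangeEvent} from the given file changes plus the job-level state.
* @return the constructed GMCE, or null if the inputs do not satisfy the requirements of the given operation type
*/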
public GobblinMetadataChangeEvent getGobblinMetadataChangeEvent(Map<Path, Metrics> newFiles, List<String> oldFiles,
List<String> oldFilePrefixes, Map<String, String> offsetRange, OperationType operationType,
SchemaSource schemaSource, String serializedAuditCountMap) {
if (!verifyInput(newFiles, oldFiles, oldFilePrefixes, operationType)) {
return null;
}
GobblinMetadataChangeEvent.Builder gmceBuilder = GobblinMetadataChangeEvent.newBuilder();
setBasicInformationForGMCE(gmceBuilder, offsetRange, schemaSource);
if (newFiles != null && !newFiles.isEmpty()) {
gmceBuilder.setNewFiles(toGobblinDataFileList(newFiles));
}
if (oldFiles != null && !oldFiles.isEmpty()) {
gmceBuilder.setOldFiles(oldFiles);
}
if (oldFilePrefixes != null && !oldFilePrefixes.isEmpty()) {
gmceBuilder.setOldFilePrefixes(oldFilePrefixes);
}
gmceBuilder.setOperationType(operationType);
gmceBuilder.setAuditCountMap(serializedAuditCountMap);
return gmceBuilder.build();
}
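/**
* Sanity-check the inputs against the operation type: rewrite_files requires both new files and old
* files/prefixes, add_files requires new files, drop_files requires old files or old file prefixes, and
* change_property requires no file changes at all.
*/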
private boolean verifyInput(Map<Path, Metrics> newFiles, List<String> oldFiles, List<String> oldFilePrefixes,
OperationType operationType) {
switch (operationType) {
case rewrite_files: {
if (newFiles == null || newFiles.isEmpty() || ((oldFiles == null || oldFiles.isEmpty())
&& (oldFilePrefixes == null || oldFilePrefixes.isEmpty()))) {
log.error("Rewrite files operation must contain newFiles to be added and oldFiles to be deleted");
return false;
}
break;
}
case add_files: {
if (newFiles == null || newFiles.isEmpty()) {
log.error("Add files operation must contain newFiles to be added");
return false;
}
break;
}
case drop_files: {
if ((oldFiles == null || oldFiles.isEmpty()) && (oldFilePrefixes == null || oldFilePrefixes.isEmpty())) {
log.error("Drop files operation must contain old files to be deleted");
return false;
}
break;
}
case change_property: {
if (oldFiles != null) {
log.warn("{} old files were provided, but a change_property operation performs no file alterations", oldFiles.size());
}
log.info("Emitting GMCE without any file changes.");
break;
}
default: {
//unsupported operation
log.error("Unsupported operation type {}", operationType);
return false;
}
}
return true;
}
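/**
* Convert the map of new file paths and Iceberg {@link Metrics} into the list of Avro {@link DataFile} records
* carried by the GMCE.
*/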
private List<DataFile> toGobblinDataFileList(Map<Path, Metrics> files) {
return Lists.newArrayList(Iterables.transform(files.entrySet(), file -> {
DataFile.Builder builder = createBuilderWithFilePath(file.getKey());
addMetricsToFileBuilder(builder, file.getValue());
return builder.build();
}));
}
private DataFile.Builder createBuilderWithFilePath(Path filePath) {
return DataFile.newBuilder()
.setFilePath(filePath.toString())
.setFileFormat(IcebergUtils.getIcebergFormat(state).toString());
}
private void addMetricsToFileBuilder(DataFile.Builder builder, Metrics metrics) {
// If metrics are null or lack a record count, set a placeholder FileMetrics with a zero record count
if (metrics == null || metrics.recordCount() == null) {
builder.setFileMetrics(DataMetrics.newBuilder().setRecordCount(0)
.build());
return;
}
// Otherwise, populate all metric fields
builder.setFileMetrics(DataMetrics.newBuilder()
.setRecordCount(metrics.recordCount())
.setColumnSizes(getIntegerLongPairsFromMap(metrics.columnSizes()))
.setValueCounts(getIntegerLongPairsFromMap(metrics.valueCounts()))
.setNullValueCounts(getIntegerLongPairsFromMap(metrics.nullValueCounts()))
.setLowerBounds(getIntegerBytesPairsFromMap(metrics.lowerBounds()))
.setUpperBounds(getIntegerBytesPairsFromMap(metrics.upperBounds()))
.build());
}
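/** Convert a fieldId-to-value map into a list of pairs; absent or empty metrics map to null rather than an empty list. */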
private List<IntegerLongPair> getIntegerLongPairsFromMap(Map<Integer, Long> map) {
if (map == null || map.isEmpty()) {
return null;
}
Iterable<Map.Entry<Integer, Long>> entries = map.entrySet();
Iterable<IntegerLongPair> pairs =
Iterables.transform(entries, entry -> new IntegerLongPair(entry.getKey(), entry.getValue()));
return Lists.newArrayList(pairs);
}
private List<IntegerBytesPair> getIntegerBytesPairsFromMap(Map<Integer, ByteBuffer> map) {
if (map == null || map.isEmpty()) {
return null;
}
Iterable<Map.Entry<Integer, ByteBuffer>> entries = map.entrySet();
Iterable<IntegerBytesPair> pairs =
Iterables.transform(entries, entry -> new IntegerBytesPair(entry.getKey(), entry.getValue()));
return Lists.newArrayList(pairs);
}
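/**
* Instantiate the concrete {@link GobblinMCEProducer} named by the {@link #GMCE_PRODUCER_CLASS} property,
* invoking its (State) constructor via reflection.
*/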
public static GobblinMCEProducer getGobblinMCEProducer(State state) {
return GobblinConstructorUtils.invokeConstructor(GobblinMCEProducer.class,
state.getProp(GMCE_PRODUCER_CLASS), state);
}
@Override
public void close() throws IOException {
// closing of the underlying producer is handled by the cache
this.metricContext.close();
}
}