/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.sync.common;

import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.hadoop.CachingPath;
import org.apache.hudi.sync.common.model.Partition;
import org.apache.hudi.sync.common.model.PartitionEvent;
import org.apache.hudi.sync.common.model.PartitionValueExtractor;

import org.apache.hadoop.fs.Path;
import org.apache.parquet.schema.MessageType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_ASSUME_DATE_PARTITION;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_USE_FILE_LISTING_FROM_METADATA;

public abstract class HoodieSyncClient implements HoodieMetaSyncOperations, AutoCloseable {

  private static final Logger LOG = LoggerFactory.getLogger(HoodieSyncClient.class);

protected final HoodieSyncConfig config;
protected final PartitionValueExtractor partitionValueExtractor;
protected final HoodieTableMetaClient metaClient;
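
  /**
   * Instantiates the sync client: loads the configured {@link PartitionValueExtractor} via
   * reflection and creates a {@link HoodieTableMetaClient} for the table at the sync base path.
   *
   * @param config The sync configuration.
   */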
public HoodieSyncClient(HoodieSyncConfig config) {
this.config = config;
this.partitionValueExtractor = ReflectionUtils.loadClass(config.getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS));
this.metaClient = HoodieTableMetaClient.builder()
.setConf(config.getHadoopConf())
.setBasePath(config.getString(META_SYNC_BASE_PATH))
.setLoadActiveTimelineOnLoad(true)
.build();
}
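
  /**
   * Returns the completed commits from the table's active timeline.
   */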
public HoodieTimeline getActiveTimeline() {
return metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
}
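
  /**
   * Returns the table type, e.g. COPY_ON_WRITE or MERGE_ON_READ.
   */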
public HoodieTableType getTableType() {
return metaClient.getTableType();
}
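
  /**
   * Returns the base path of the Hudi table on storage.
   */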
public String getBasePath() {
return metaClient.getBasePathV2().toString();
}
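
  /**
   * Returns true if the table is bootstrapped, i.e. a bootstrap base path is present
   * in the table config.
   */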
public boolean isBootstrap() {
return metaClient.getTableConfig().getBootstrapBasePath().isPresent();
}

  /**
   * Gets the set of partitions dropped since the last synced commit.
   * If the last sync time is not known, only the active timeline is considered,
   * since scanning the archived timeline is costly and should be avoided unless
   * a start time is given.
   *
   * @param lastCommitTimeSynced The last commit time that was synced, if known.
   * @return The set of dropped partition paths.
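   * <p>
   * A minimal usage sketch (the instant time and variable names are illustrative):
   * <pre>{@code
   * // with a known last-synced instant, only commits after it are scanned
   * Set<String> dropped = syncClient.getDroppedPartitionsSince(Option.of("20230401123000000"));
   * // otherwise only the active timeline is considered
   * Set<String> droppedRecently = syncClient.getDroppedPartitionsSince(Option.empty());
   * }</pre>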
*/
public Set<String> getDroppedPartitionsSince(Option<String> lastCommitTimeSynced) {
HoodieTimeline timeline = lastCommitTimeSynced.isPresent()
? TimelineUtils.getCommitsTimelineAfter(metaClient, lastCommitTimeSynced.get())
: metaClient.getActiveTimeline();
return new HashSet<>(TimelineUtils.getDroppedPartitions(timeline));
}
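
  /**
   * Reads the table's latest schema from storage as a Parquet {@link MessageType}.
   */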
@Override
public MessageType getStorageSchema() {
try {
return new TableSchemaResolver(metaClient).getTableParquetSchema();
} catch (Exception e) {
throw new HoodieSyncException("Failed to read schema from storage.", e);
}
}
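
  /**
   * Reads the table's latest schema from storage as a Parquet {@link MessageType},
   * optionally including the Hudi metadata fields.
   *
   * @param includeMetadataField Whether to include the Hudi metadata fields in the schema.
   */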
@Override
public MessageType getStorageSchema(boolean includeMetadataField) {
try {
return new TableSchemaResolver(metaClient).getTableParquetSchema(includeMetadataField);
} catch (Exception e) {
throw new HoodieSyncException("Failed to read schema from storage.", e);
}
}

  /**
   * Gets all relative partition paths of the Hudi table on storage.
   *
   * @return All relative partition paths.
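   * <p>
   * For example, for a table partitioned by {@code datestr} this may return entries such as
   * {@code ["datestr=2023-04-01", "datestr=2023-04-02"]} (values illustrative).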
*/
public List<String> getAllPartitionPathsOnStorage() {
HoodieLocalEngineContext engineContext = new HoodieLocalEngineContext(metaClient.getHadoopConf());
return FSUtils.getAllPartitionPaths(engineContext,
config.getString(META_SYNC_BASE_PATH),
config.getBoolean(META_SYNC_USE_FILE_LISTING_FROM_METADATA),
config.getBoolean(META_SYNC_ASSUME_DATE_PARTITION));
}
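
  /**
   * Gets the relative paths of the partitions written since the last synced commit.
   * If the last sync time is not known, all partition paths on storage are listed instead.
   *
   * @param lastCommitTimeSynced The last commit time that was synced, if known.
   * @return Relative partition paths.
   */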
public List<String> getWrittenPartitionsSince(Option<String> lastCommitTimeSynced) {
if (!lastCommitTimeSynced.isPresent()) {
LOG.info("Last commit time synced is not known, listing all partitions in "
+ config.getString(META_SYNC_BASE_PATH)
+ ",FS :" + config.getHadoopFileSystem());
return getAllPartitionPathsOnStorage();
} else {
LOG.info("Last commit time synced is " + lastCommitTimeSynced.get() + ", Getting commits since then");
return TimelineUtils.getWrittenPartitions(
TimelineUtils.getCommitsTimelineAfter(metaClient, lastCommitTimeSynced.get()));
}
}

  /**
   * Gets the partition events for changed partitions.
   * <p>
   * This compares the list of all partitions of a table stored in the metastore
   * against the list on storage:
   * (1) Partitions that exist in the metastore but NOT on storage: drop them from the metastore;
   * (2) Partitions that exist on storage but NOT in the metastore: add them to the metastore;
   * (3) Partitions that exist in both but under a different partition path: update them in the metastore.
*
* @param allPartitionsInMetastore All partitions of a table stored in the metastore.
* @param allPartitionsOnStorage All partitions of a table stored on the storage.
* @return partition events for changed partitions.
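   * <p>
   * A minimal usage sketch (variable names are illustrative):
   * <pre>{@code
   * List<Partition> inMetastore = ...; // as fetched from the metastore
   * List<String> onStorage = syncClient.getAllPartitionPathsOnStorage();
   * List<PartitionEvent> events = syncClient.getPartitionEvents(inMetastore, onStorage);
   * }</pre>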
*/
public List<PartitionEvent> getPartitionEvents(List<Partition> allPartitionsInMetastore,
List<String> allPartitionsOnStorage) {
Map<String, String> paths = getPartitionValuesToPathMapping(allPartitionsInMetastore);
Set<String> partitionsToDrop = new HashSet<>(paths.keySet());
List<PartitionEvent> events = new ArrayList<>();
for (String storagePartition : allPartitionsOnStorage) {
Path storagePartitionPath = FSUtils.getPartitionPath(config.getString(META_SYNC_BASE_PATH), storagePartition);
String fullStoragePartitionPath = Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath();
      // Check whether the partition values and the storage path match the metastore entry
List<String> storagePartitionValues = partitionValueExtractor.extractPartitionValuesInPath(storagePartition);
if (!storagePartitionValues.isEmpty()) {
String storageValue = String.join(", ", storagePartitionValues);
        // Remove partitions that exist on storage from the `partitionsToDrop` set;
        // the partitions remaining in the set exist only in the metastore and should be dropped
partitionsToDrop.remove(storageValue);
if (!paths.containsKey(storageValue)) {
events.add(PartitionEvent.newPartitionAddEvent(storagePartition));
} else if (!paths.get(storageValue).equals(fullStoragePartitionPath)) {
events.add(PartitionEvent.newPartitionUpdateEvent(storagePartition));
}
}
}
partitionsToDrop.forEach(storageValue -> {
String storagePath = paths.get(storageValue);
try {
String relativePath = FSUtils.getRelativePartitionPath(
metaClient.getBasePathV2(), new CachingPath(storagePath));
events.add(PartitionEvent.newPartitionDropEvent(relativePath));
} catch (IllegalArgumentException e) {
LOG.error("Cannot parse the path stored in the metastore, ignoring it for "
+ "generating DROP partition event: \"" + storagePath + "\".", e);
}
});
return events;
}

  /**
   * Iterates over the partitions written on storage and determines whether each partition
   * needs to be added, updated, or dropped in the metastore, generating a list of
   * {@link PartitionEvent} for the required changes.
   *
   * @param partitionsInMetastore      Partitions currently registered in the metastore.
   * @param writtenPartitionsOnStorage Relative paths of partitions written on storage since the last sync.
   * @param droppedPartitionsOnStorage Partitions dropped on storage since the last sync.
   * @return Partition events for the required changes.
*/
public List<PartitionEvent> getPartitionEvents(List<Partition> partitionsInMetastore,
List<String> writtenPartitionsOnStorage,
Set<String> droppedPartitionsOnStorage) {
Map<String, String> paths = getPartitionValuesToPathMapping(partitionsInMetastore);
List<PartitionEvent> events = new ArrayList<>();
for (String storagePartition : writtenPartitionsOnStorage) {
Path storagePartitionPath = FSUtils.getPartitionPath(config.getString(META_SYNC_BASE_PATH), storagePartition);
String fullStoragePartitionPath = Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath();
      // Check whether the partition values and the storage path match the metastore entry
List<String> storagePartitionValues = partitionValueExtractor.extractPartitionValuesInPath(storagePartition);
if (droppedPartitionsOnStorage.contains(storagePartition)) {
events.add(PartitionEvent.newPartitionDropEvent(storagePartition));
} else {
if (!storagePartitionValues.isEmpty()) {
String storageValue = String.join(", ", storagePartitionValues);
if (!paths.containsKey(storageValue)) {
events.add(PartitionEvent.newPartitionAddEvent(storagePartition));
} else if (!paths.get(storageValue).equals(fullStoragePartitionPath)) {
events.add(PartitionEvent.newPartitionUpdateEvent(storagePartition));
}
}
}
}
return events;
}

  /**
   * Builds the mapping from a partition's values to its absolute path on storage, based on
   * the partition information from the metastore.
   *
   * @param partitionsInMetastore Partitions in the metastore.
   * @return Mapping from the comma-joined partition values to the absolute partition path.
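   * <p>
   * For example, a partition with values {@code ["2023", "04"]} stored at
   * {@code hdfs://nn/base/2023/04} maps as {@code "2023, 04" -> "/base/2023/04"}
   * (paths illustrative; scheme and authority are stripped).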
*/
private Map<String, String> getPartitionValuesToPathMapping(List<Partition> partitionsInMetastore) {
Map<String, String> paths = new HashMap<>();
for (Partition tablePartition : partitionsInMetastore) {
List<String> hivePartitionValues = tablePartition.getValues();
String fullTablePartitionPath =
Path.getPathWithoutSchemeAndAuthority(new Path(tablePartition.getStorageLocation())).toUri().getPath();
paths.put(String.join(", ", hivePartitionValues), fullTablePartitionPath);
}
return paths;
}
}