blob: f9ada2fa829a81770ec4d0fcddf1b8ee3cb3ddff [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sync.common;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.schema.MessageType;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.Map;
/**
 * Base class for Hudi sync clients.
 *
 * <p>It holds the {@link HoodieTableMetaClient}, table type and {@link FileSystem} of one Hudi table, declares the
 * table and partition operations that concrete clients must implement, and offers shared helpers for reading the
 * table's data schema and for finding the partitions written since a given commit time.
 */
public abstract class AbstractSyncHoodieClient {

  private static final Logger LOG = LogManager.getLogger(AbstractSyncHoodieClient.class);

  protected final HoodieTableMetaClient metaClient;
  protected final HoodieTableType tableType;
  protected final FileSystem fs;

  // Partition-listing settings; assigned once in the constructor and never changed afterwards.
  private final String basePath;
  private final boolean assumeDatePartitioning;
  private final boolean useFileListingFromMetadata;
  private final boolean verifyMetadataFileListing;

  /**
   * Builds the meta client for the table at {@code basePath} (loading the active timeline eagerly) and records the
   * settings later used when listing partitions.
   *
   * @param basePath                   Base path of the Hudi table.
   * @param assumeDatePartitioning     Forwarded to {@code FSUtils.getAllPartitionPaths} when listing all partitions.
   * @param useFileListingFromMetadata Forwarded to {@code FSUtils.getAllPartitionPaths} when listing all partitions.
   * @param verifyMetadataFileListing  Forwarded to {@code FSUtils.getAllPartitionPaths} when listing all partitions.
   * @param fs                         File system of the table; its configuration is used to build the meta client.
   */
  public AbstractSyncHoodieClient(String basePath, boolean assumeDatePartitioning, boolean useFileListingFromMetadata,
                                  boolean verifyMetadataFileListing, FileSystem fs) {
    this.metaClient = HoodieTableMetaClient.builder()
        .setConf(fs.getConf())
        .setBasePath(basePath)
        .setLoadActiveTimelineOnLoad(true)
        .build();
    this.tableType = metaClient.getTableType();
    this.basePath = basePath;
    this.assumeDatePartitioning = assumeDatePartitioning;
    this.useFileListingFromMetadata = useFileListingFromMetadata;
    this.verifyMetadataFileListing = verifyMetadataFileListing;
    this.fs = fs;
  }

  /**
   * Create the table.
   *
   * @param tableName         The table name.
   * @param storageSchema     The table schema.
   * @param inputFormatClass  The input format class of this table.
   * @param outputFormatClass The output format class of this table.
   * @param serdeClass        The serde class of this table.
   * @param serdeProperties   The serde properties of this table.
   * @param tableProperties   The table properties for this table.
   */
  public abstract void createTable(String tableName, MessageType storageSchema,
                                   String inputFormatClass, String outputFormatClass,
                                   String serdeClass, Map<String, String> serdeProperties,
                                   Map<String, String> tableProperties);

  /**
   * Checks whether the named table exists; the lookup itself is implementation-defined.
   *
   * @param tableName The table name.
   */
  public abstract boolean doesTableExist(String tableName);

  /**
   * Returns the last commit time recorded as synced for the named table, if any. The result has the shape expected
   * by {@link #getPartitionsWrittenToSince(Option)}.
   *
   * @param tableName The table name.
   */
  public abstract Option<String> getLastCommitTimeSynced(String tableName);

  /**
   * Updates the last-synced commit time stored for the named table; which value is stored is implementation-defined.
   *
   * @param tableName The table name.
   */
  public abstract void updateLastCommitTimeSynced(String tableName);

  /**
   * Adds the given partitions to the named table.
   *
   * @param tableName       The table name.
   * @param partitionsToAdd The partitions to add.
   */
  public abstract void addPartitionsToTable(String tableName, List<String> partitionsToAdd);

  /**
   * Updates the given, already known partitions of the named table.
   *
   * @param tableName         The table name.
   * @param changedPartitions The partitions that changed.
   */
  public abstract void updatePartitionsToTable(String tableName, List<String> changedPartitions);

  /**
   * Updates the properties of the named table. The default implementation does nothing; subclasses may override.
   *
   * @param tableName       The table name.
   * @param tableProperties The table properties to apply.
   */
  public void updateTableProperties(String tableName, Map<String, String> tableProperties) {}

  /**
   * Returns the schema of the named table as a string-to-string map; the exact key/value meaning is
   * implementation-defined (presumably column name to type -- confirm against implementations).
   *
   * @param tableName The table name.
   */
  public abstract Map<String, String> getTableSchema(String tableName);

  /**
   * @return the table type read from the meta client at construction time.
   */
  public HoodieTableType getTableType() {
    return tableType;
  }

  /**
   * @return the base path as reported by the meta client (not the raw constructor argument).
   */
  public String getBasePath() {
    return metaClient.getBasePath();
  }

  /**
   * @return the file system passed to the constructor.
   */
  public FileSystem getFs() {
    return fs;
  }

  /**
   * Closes the given JDBC resources, logging (not propagating) any {@link SQLException}.
   *
   * <p>The result set is closed before the statement that produced it. A failure while closing one resource does not
   * prevent the attempt to close the other.
   *
   * @param resultSet Result set to close; may be {@code null}.
   * @param stmt      Statement to close; may be {@code null}.
   */
  public void closeQuietly(ResultSet resultSet, Statement stmt) {
    // Dependent resource first: the result set belongs to the statement.
    try {
      if (resultSet != null) {
        resultSet.close();
      }
    } catch (SQLException e) {
      LOG.warn("Could not close the resultset opened ", e);
    }

    try {
      if (stmt != null) {
        stmt.close();
      }
    } catch (SQLException e) {
      LOG.warn("Could not close the statement opened ", e);
    }
  }

  /**
   * Gets the schema for a hoodie table. Depending on the type of table, try to read schema from commit metadata if
   * present, else fallback to reading from any file written in the latest commit. We will assume that the schema has
   * not changed within a single atomic write.
   *
   * <p>The lookup is delegated to {@link TableSchemaResolver#getTableParquetSchema()}.
   *
   * @return Parquet schema for this table
   * @throws HoodieSyncException if the resolver fails for any reason; the original exception is kept as the cause.
   */
  public MessageType getDataSchema() {
    try {
      return new TableSchemaResolver(metaClient).getTableParquetSchema();
    } catch (Exception e) {
      throw new HoodieSyncException("Failed to read data schema", e);
    }
  }

  /**
   * Returns the partitions to consider for syncing.
   *
   * <p>When {@code lastCommitTimeSynced} is empty, all partition paths under the base path are listed through
   * {@code FSUtils.getAllPartitionPaths}. Otherwise only the partitions written by instants of the active commits
   * timeline that come after the given commit time are returned.
   *
   * @param lastCommitTimeSynced Last commit time already synced, or empty when unknown.
   * @return the partitions as returned by the respective Hudi utility.
   */
  @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
  public List<String> getPartitionsWrittenToSince(Option<String> lastCommitTimeSynced) {
    if (!lastCommitTimeSynced.isPresent()) {
      LOG.info("Last commit time synced is not known, listing all partitions in " + basePath + ",FS :" + fs);
      HoodieLocalEngineContext engineContext = new HoodieLocalEngineContext(metaClient.getHadoopConf());
      return FSUtils.getAllPartitionPaths(engineContext, basePath, useFileListingFromMetadata, verifyMetadataFileListing,
          assumeDatePartitioning);
    } else {
      LOG.info("Last commit time synced is " + lastCommitTimeSynced.get() + ", Getting commits since then");
      return TimelineUtils.getPartitionsWritten(metaClient.getActiveTimeline().getCommitsTimeline()
          .findInstantsAfter(lastCommitTimeSynced.get(), Integer.MAX_VALUE));
    }
  }

  /**
   * Read the schema from the log file on path, falling back to the schema of the last compaction when the log file
   * yields no schema.
   *
   * <p>NOTE(review): this private helper is not referenced anywhere else in this class.
   *
   * @param lastCompactionCommitOpt Last compaction instant, used only for the fallback.
   * @param path                    Path of the log file to read.
   * @return the schema read from the log file, or the last-compaction schema when the log file has none.
   * @throws Exception if either schema read fails.
   */
  @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
  private MessageType readSchemaFromLogFile(Option<HoodieInstant> lastCompactionCommitOpt, Path path) throws Exception {
    MessageType messageType = TableSchemaResolver.readSchemaFromLogFile(fs, path);
    // Fall back to read the schema from last compaction
    if (messageType == null) {
      LOG.info("Falling back to read the schema from last compaction " + lastCompactionCommitOpt);
      return new TableSchemaResolver(this.metaClient).readSchemaFromLastCompaction(lastCompactionCommitOpt);
    }
    return messageType;
  }

  /**
   * Partition Event captures any partition that needs to be added or updated.
   */
  public static class PartitionEvent {

    /**
     * Kind of change a {@link PartitionEvent} describes.
     */
    public enum PartitionEventType {
      ADD, UPDATE
    }

    // Fields stay public and mutable to remain compatible with existing callers.
    public PartitionEventType eventType;
    public String storagePartition;

    PartitionEvent(PartitionEventType eventType, String storagePartition) {
      this.eventType = eventType;
      this.storagePartition = storagePartition;
    }

    /**
     * Creates an {@link PartitionEventType#ADD} event for the given storage partition.
     */
    public static PartitionEvent newPartitionAddEvent(String storagePartition) {
      return new PartitionEvent(PartitionEventType.ADD, storagePartition);
    }

    /**
     * Creates an {@link PartitionEventType#UPDATE} event for the given storage partition.
     */
    public static PartitionEvent newPartitionUpdateEvent(String storagePartition) {
      return new PartitionEvent(PartitionEventType.UPDATE, storagePartition);
    }
  }
}