blob: cafb8167b6fc51cede85a192f92a7001786151f1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.upgrade;
import org.apache.hudi.client.common.HoodieEngineContext;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.io.InputStream;
import java.util.Date;
import java.util.Properties;
/**
* Helper class to assist in upgrading/downgrading Hoodie when there is a version change.
*/
public abstract class AbstractUpgradeDowngrade {

  private static final Logger LOG = LogManager.getLogger(AbstractUpgradeDowngrade.class);

  /** Name of the temporary properties file that is written first and then renamed over hoodie.properties. */
  public static final String HOODIE_UPDATED_PROPERTY_FILE = "hoodie.properties.updated";

  private final HoodieTableMetaClient metaClient;
  protected HoodieWriteConfig config;
  protected HoodieEngineContext context;
  private transient FileSystem fs;
  // Location of the temporary "hoodie.properties.updated" file under the table's meta path.
  private final Path updatedPropsFilePath;
  // Location of the "hoodie.properties" file under the table's meta path.
  private final Path propsFilePath;

  /**
   * Perform Upgrade or Downgrade steps if required and update the table version if need be.
   * <p>
   * Starting from version 0.6.0, this upgrade/downgrade step will be added in all write paths.
   *
   * Essentially, if a dataset was created using any pre 0.6.0 (for eg 0.5.3), and Hoodie version was upgraded to 0.6.0,
   * Hoodie table version gets bumped to 1 and there are some upgrade steps that need to be executed before doing any writes.
   * Similarly, if a dataset was created using Hoodie version 0.6.0 or Hoodie table version 1 and then hoodie was downgraded
   * to pre 0.6.0 or to Hoodie table version 0, then some downgrade steps need to be executed before proceeding w/ any writes.
   *
   * On a high level, these are the steps performed
   *
   * Step1 : Understand current hoodie table version and table version from hoodie.properties file
   * Step2 : Delete any left over .updated from previous upgrade/downgrade
   * Step3 : If versions are different, perform upgrade/downgrade.
   * Step4 : Copy hoodie.properties -> hoodie.properties.updated with the version updated
   * Step5 : Rename hoodie.properties.updated to hoodie.properties
   * </p>
   *
   * @param metaClient instance of {@link HoodieTableMetaClient} to use
   * @param toVersion version to which upgrade or downgrade has to be done.
   * @param config instance of {@link HoodieWriteConfig} to use.
   * @param context instance of {@link HoodieEngineContext} to use.
   * @param instantTime current instant time that should not be touched.
   */
  public abstract void run(HoodieTableMetaClient metaClient, HoodieTableVersion toVersion, HoodieWriteConfig config,
                           HoodieEngineContext context, String instantTime);

  /**
   * Captures the collaborators and resolves the two property file paths under the table's meta path.
   *
   * @param metaClient instance of {@link HoodieTableMetaClient}; also supplies the {@link FileSystem} used here.
   * @param config instance of {@link HoodieWriteConfig} made available to subclasses.
   * @param context instance of {@link HoodieEngineContext} made available to subclasses.
   */
  protected AbstractUpgradeDowngrade(HoodieTableMetaClient metaClient, HoodieWriteConfig config, HoodieEngineContext context) {
    this.metaClient = metaClient;
    this.config = config;
    this.context = context;
    this.fs = metaClient.getFs();
    this.updatedPropsFilePath = new Path(metaClient.getMetaPath(), HOODIE_UPDATED_PROPERTY_FILE);
    this.propsFilePath = new Path(metaClient.getMetaPath(), HoodieTableConfig.HOODIE_PROPERTIES_FILE);
  }

  /**
   * Moves the table from the version recorded in its table config to {@code toVersion}, then rewrites
   * hoodie.properties with the new version. Does nothing when both versions already have the same version code.
   *
   * @param toVersion version to which upgrade or downgrade has to be done.
   * @param instantTime instant time handed through to {@link #upgrade} / {@link #downgrade}.
   * @throws IOException if any file system operation fails, including a rename that reports failure.
   */
  protected void run(HoodieTableVersion toVersion, String instantTime) throws IOException {
    // Fetch version from property file and current version
    HoodieTableVersion fromVersion = metaClient.getTableConfig().getTableVersion();
    if (toVersion.versionCode() == fromVersion.versionCode()) {
      return;
    }

    deleteLeftoverUpdatedFile();

    // Perform the actual upgrade/downgrade; this has to be idempotent, for now.
    LOG.info("Attempting to move table from version " + fromVersion + " to " + toVersion);
    if (fromVersion.versionCode() < toVersion.versionCode()) {
      // upgrade
      upgrade(fromVersion, toVersion, instantTime);
    } else {
      // downgrade
      downgrade(fromVersion, toVersion, instantTime);
    }

    // Write out the current version in hoodie.properties.updated file
    metaClient.getTableConfig().setTableVersion(toVersion);
    createUpdatedFile(metaClient.getTableConfig().getProperties());

    // fs.rename behaves differently across file systems when the destination already exists, such as:
    // a) for hdfs : fs.rename will not replace propsFilePath, but just return false
    // b) for localfs: fs.rename will replace propsFilePath, and return true
    // c) for aliyun ossfs: fs.rename will throw FileAlreadyExistsException
    // so we delete the old propsFilePath first. This is safe to retry since upgrade and downgrade are idempotent.
    if (fs.exists(propsFilePath)) {
      fs.delete(propsFilePath, false);
    }

    // Rename the .updated file to hoodie.properties. This is atomic in hdfs, but not in cloud stores.
    // But as long as this does not leave a partial hoodie.properties file, we are okay.
    // A false return means the rename did not happen; surface it instead of silently leaving the table
    // without a hoodie.properties file (the old one was deleted just above).
    if (!fs.rename(updatedPropsFilePath, propsFilePath)) {
      throw new IOException("Failed to rename " + updatedPropsFilePath + " to " + propsFilePath);
    }
  }

  /**
   * Removes a hoodie.properties.updated file left behind by an earlier attempt, logging its content first.
   *
   * @throws IOException if the file cannot be checked, read or deleted.
   */
  private void deleteLeftoverUpdatedFile() throws IOException {
    if (!fs.exists(updatedPropsFilePath)) {
      return;
    }
    // this can be left over .updated file from a failed attempt before. Many cases exist here.
    // a) We failed while writing the .updated file and its content is partial (e.g hdfs)
    // b) We failed without renaming the file to hoodie.properties. We will re-attempt everything now anyway
    // c) rename() is not atomic in cloud stores. so hoodie.properties is fine, but we failed before deleting the .updated file
    // All cases, it simply suffices to delete the file and proceed.
    // The stream is closed before the delete so the handle is not leaked.
    try (InputStream leftover = fs.open(updatedPropsFilePath)) {
      LOG.info("Deleting existing .updated file with content :" + FileIOUtils.readAsUTFString(leftover));
    }
    fs.delete(updatedPropsFilePath, false);
  }

  /**
   * Stores {@code props} into the hoodie.properties.updated file, with a timestamp comment as header.
   *
   * @param props properties to persist.
   * @throws IOException if the file cannot be created or written.
   */
  private void createUpdatedFile(Properties props) throws IOException {
    try (FSDataOutputStream outputStream = fs.create(updatedPropsFilePath)) {
      props.store(outputStream, "Properties saved on " + new Date());
    }
  }

  /**
   * Executes the upgrade steps; invoked when {@code fromVersion} has a lower version code than {@code toVersion}.
   *
   * @param fromVersion version currently recorded in the table config.
   * @param toVersion version to upgrade to.
   * @param instantTime instant time passed in by the caller of {@code run}.
   */
  protected abstract void upgrade(HoodieTableVersion fromVersion, HoodieTableVersion toVersion, String instantTime);

  /**
   * Executes the downgrade steps; invoked when {@code fromVersion} has a higher version code than {@code toVersion}.
   *
   * @param fromVersion version currently recorded in the table config.
   * @param toVersion version to downgrade to.
   * @param instantTime instant time passed in by the caller of {@code run}.
   */
  protected abstract void downgrade(HoodieTableVersion fromVersion, HoodieTableVersion toVersion, String instantTime);
}