blob: b4058b3051a8102071bc45beb0f91ff0e8ea1eaf [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.hive.replication;
import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
import org.apache.hudi.hive.testutils.HiveTestCluster;
import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import static org.apache.hudi.hadoop.utils.HoodieHiveUtils.GLOBALLY_CONSISTENT_READ_TIMESTAMP;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_PASS;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_USER;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_USE_PRE_APACHE_INPUT_FORMAT;
import static org.apache.hudi.hive.replication.GlobalHiveSyncConfig.META_SYNC_GLOBAL_REPLICATE_TIMESTAMP;
import static org.apache.hudi.hive.replication.HiveSyncGlobalCommitParams.LOCAL_BASE_PATH;
import static org.apache.hudi.hive.replication.HiveSyncGlobalCommitParams.LOCAL_HIVE_SERVER_JDBC_URLS;
import static org.apache.hudi.hive.replication.HiveSyncGlobalCommitParams.LOCAL_HIVE_SITE_URI;
import static org.apache.hudi.hive.replication.HiveSyncGlobalCommitParams.REMOTE_BASE_PATH;
import static org.apache.hudi.hive.replication.HiveSyncGlobalCommitParams.REMOTE_HIVE_SERVER_JDBC_URLS;
import static org.apache.hudi.hive.replication.HiveSyncGlobalCommitParams.REMOTE_HIVE_SITE_URI;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_ASSUME_DATE_PARTITION;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_FIELDS;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_TABLE_NAME;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
 * Tests for {@link HiveSyncGlobalCommitTool}, using two {@link HiveTestCluster} instances:
 * one acting as the local cluster and one acting as the remote cluster.
 *
 * <p>Both clusters are static and registered as extensions, so they are shared by every
 * test in this class. Tests that alter cluster state (e.g. stopping HiveServer2) must
 * restore it even on failure.
 */
public class TestHiveSyncGlobalCommitTool {

  @RegisterExtension
  public static HiveTestCluster localCluster = new HiveTestCluster();
  @RegisterExtension
  public static HiveTestCluster remoteCluster = new HiveTestCluster();

  private static final String DB_NAME = "foo";
  private static final String TBL_NAME = "bar";

  /**
   * Builds global-commit parameters pointing at the local and remote test clusters.
   *
   * @param commitTime value stored under {@code META_SYNC_GLOBAL_REPLICATE_TIMESTAMP}
   * @return params wired to both clusters' hive-site, JDBC URL and table path
   */
  private HiveSyncGlobalCommitParams getGlobalCommitConfig(String commitTime) throws Exception {
    String localTablePath = localCluster.tablePath(DB_NAME, TBL_NAME);
    HiveSyncGlobalCommitParams params = new HiveSyncGlobalCommitParams();
    params.loadedProps.setProperty(LOCAL_HIVE_SITE_URI, localCluster.getHiveSiteXmlLocation());
    params.loadedProps.setProperty(REMOTE_HIVE_SITE_URI, remoteCluster.getHiveSiteXmlLocation());
    params.loadedProps.setProperty(LOCAL_HIVE_SERVER_JDBC_URLS, localCluster.getHiveJdBcUrl());
    params.loadedProps.setProperty(REMOTE_HIVE_SERVER_JDBC_URLS, remoteCluster.getHiveJdBcUrl());
    params.loadedProps.setProperty(LOCAL_BASE_PATH, localTablePath);
    params.loadedProps.setProperty(REMOTE_BASE_PATH, remoteCluster.tablePath(DB_NAME, TBL_NAME));
    params.loadedProps.setProperty(META_SYNC_GLOBAL_REPLICATE_TIMESTAMP.key(), commitTime);
    params.loadedProps.setProperty(HIVE_USER.key(), System.getProperty("user.name"));
    params.loadedProps.setProperty(HIVE_PASS.key(), "");
    params.loadedProps.setProperty(META_SYNC_DATABASE_NAME.key(), DB_NAME);
    params.loadedProps.setProperty(META_SYNC_TABLE_NAME.key(), TBL_NAME);
    // The sync base path is the local cluster's copy of the table.
    params.loadedProps.setProperty(META_SYNC_BASE_PATH.key(), localTablePath);
    params.loadedProps.setProperty(META_SYNC_ASSUME_DATE_PARTITION.key(), "true");
    params.loadedProps.setProperty(HIVE_USE_PRE_APACHE_INPUT_FORMAT.key(), "false");
    params.loadedProps.setProperty(META_SYNC_PARTITION_FIELDS.key(), "datestr");
    params.loadedProps.setProperty(META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), SlashEncodedDayPartitionValueExtractor.class.getName());
    return params;
  }

  /**
   * Asserts that the local and remote tables carry the same
   * {@code GLOBALLY_CONSISTENT_READ_TIMESTAMP}, and that it is the replicate timestamp
   * configured in {@code config}. The second check matters because comparing local to
   * remote alone would pass if neither table had the parameter set.
   *
   * @param config params whose {@code META_SYNC_GLOBAL_REPLICATE_TIMESTAMP} is expected
   */
  private void compareEqualLastReplicatedTimeStamp(HiveSyncGlobalCommitParams config) throws Exception {
    String expectedTimestamp = config.loadedProps.getProperty(META_SYNC_GLOBAL_REPLICATE_TIMESTAMP.key());
    String localTimestamp = localCluster.getHMSClient()
        .getTable(DB_NAME, TBL_NAME).getParameters()
        .get(GLOBALLY_CONSISTENT_READ_TIMESTAMP);
    String remoteTimestamp = remoteCluster.getHMSClient()
        .getTable(DB_NAME, TBL_NAME).getParameters()
        .get(GLOBALLY_CONSISTENT_READ_TIMESTAMP);
    assertEquals(localTimestamp, remoteTimestamp, "compare replicated timestamps");
    assertEquals(expectedTimestamp, localTimestamp, "replicated timestamp should match the configured commit time");
  }

  /** Recreates the test database on both clusters and deletes any leftover table directories. */
  @BeforeEach
  public void setUp() throws Exception {
    localCluster.forceCreateDb(DB_NAME);
    remoteCluster.forceCreateDb(DB_NAME);
    localCluster.dfsCluster.getFileSystem().delete(new Path(localCluster.tablePath(DB_NAME, TBL_NAME)), true);
    remoteCluster.dfsCluster.getFileSystem().delete(new Path(remoteCluster.tablePath(DB_NAME, TBL_NAME)), true);
  }

  /** Drops the test table from both clusters' metastores. */
  @AfterEach
  public void clear() throws Exception {
    localCluster.getHMSClient().dropTable(DB_NAME, TBL_NAME);
    remoteCluster.getHMSClient().dropTable(DB_NAME, TBL_NAME);
  }

  @Test
  public void testHiveConfigShouldMatchClusterConf() throws Exception {
    String commitTime = "100";
    localCluster.createCOWTable(commitTime, 5, DB_NAME, TBL_NAME);
    // simulate drs: create the same table data on the remote cluster
    remoteCluster.createCOWTable(commitTime, 5, DB_NAME, TBL_NAME);
    HiveSyncGlobalCommitParams params = getGlobalCommitConfig(commitTime);
    HiveSyncGlobalCommitTool tool = new HiveSyncGlobalCommitTool(params);
    ReplicationStateSync localReplicationStateSync = tool.getReplicatedState(false);
    ReplicationStateSync remoteReplicationStateSync = tool.getReplicatedState(true);
    // JUnit's assertEquals takes (expected, actual): the cluster conf is the expectation.
    assertEquals(localCluster.getHiveConf().get("hive.metastore.uris"),
        localReplicationStateSync.globalHiveSyncTool.config.getHiveConf().get("hive.metastore.uris"));
    assertEquals(remoteCluster.getHiveConf().get("hive.metastore.uris"),
        remoteReplicationStateSync.globalHiveSyncTool.config.getHiveConf().get("hive.metastore.uris"));
  }

  @Test
  public void testBasicGlobalCommit() throws Exception {
    String commitTime = "100";
    localCluster.createCOWTable(commitTime, 5, DB_NAME, TBL_NAME);
    // simulate drs: create the same table data on the remote cluster
    remoteCluster.createCOWTable(commitTime, 5, DB_NAME, TBL_NAME);
    HiveSyncGlobalCommitParams params = getGlobalCommitConfig(commitTime);
    HiveSyncGlobalCommitTool tool = new HiveSyncGlobalCommitTool(params);
    assertTrue(tool.commit());
    compareEqualLastReplicatedTimeStamp(params);
  }

  @Test
  public void testBasicRollback() throws Exception {
    String commitTime = "100";
    localCluster.createCOWTable(commitTime, 5, DB_NAME, TBL_NAME);
    // simulate drs: create the same table data on the remote cluster
    remoteCluster.createCOWTable(commitTime, 5, DB_NAME, TBL_NAME);
    HiveSyncGlobalCommitParams params = getGlobalCommitConfig(commitTime);
    HiveSyncGlobalCommitTool tool = new HiveSyncGlobalCommitTool(params);
    assertFalse(localCluster.getHMSClient().tableExists(DB_NAME, TBL_NAME));
    assertFalse(remoteCluster.getHMSClient().tableExists(DB_NAME, TBL_NAME));
    // stop the remote cluster hive server to simulate cluster going down
    remoteCluster.stopHiveServer2();
    try {
      // The commit reaches the local cluster but fails on the remote one.
      assertFalse(tool.commit());
      assertEquals(commitTime, localCluster.getHMSClient()
          .getTable(DB_NAME, TBL_NAME).getParameters()
          .get(GLOBALLY_CONSISTENT_READ_TIMESTAMP));
      assertTrue(tool.rollback()); // do a rollback
      assertNotEquals(commitTime, localCluster.getHMSClient()
          .getTable(DB_NAME, TBL_NAME).getParameters()
          .get(GLOBALLY_CONSISTENT_READ_TIMESTAMP));
      assertFalse(remoteCluster.getHMSClient().tableExists(DB_NAME, TBL_NAME));
    } finally {
      // The clusters are shared static fixtures: always bring the remote HiveServer2
      // back up, even if an assertion above failed, so later tests are not affected.
      remoteCluster.startHiveServer2();
    }
  }
}