/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.ql.parse;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.shims.Utils;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTORE_AGGREGATE_STATS_CACHE_ENABLED;
import static org.apache.hadoop.hive.common.repl.ReplConst.SOURCE_OF_REPLICATION;
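/**
 * Tests Hive replication between warehouses whose data directories live in
 * HDFS encryption zones, covering both the case where source and target use
 * different encryption-zone keys and the case where they share the same key.
 */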
public class TestReplicationOnHDFSEncryptedZones {
private static String jksFile = System.getProperty("java.io.tmpdir") + "/test.jks";
private static String jksFile2 = System.getProperty("java.io.tmpdir") + "/test2.jks";
@Rule
public final TestName testName = new TestName();
  protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationOnHDFSEncryptedZones.class);
private static WarehouseInstance primary;
private static String primaryDbName, replicatedDbName;
private static Configuration conf;
private static MiniDFSCluster miniDFSCluster;
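  /**
   * Brings up a two-datanode MiniDFSCluster backed by a local JCEKS keystore,
   * creates the encryption key "test_key", and builds the primary warehouse on it.
   */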
@BeforeClass
public static void beforeClassSetup() throws Exception {
System.setProperty("jceks.key.serialFilter", "java.lang.Enum;java.security.KeyRep;" +
"java.security.KeyRep$Type;javax.crypto.spec.SecretKeySpec;" +
"org.apache.hadoop.crypto.key.JavaKeyStoreProvider$KeyMetadata;!*");
conf = new Configuration();
conf.set("dfs.client.use.datanode.hostname", "true");
conf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts", "*");
conf.set("hadoop.security.key.provider.path", "jceks://file" + jksFile);
conf.setBoolean("dfs.namenode.delegation.token.always-use", true);
conf.setLong(HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname, 1);
conf.setLong(HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES.varname, 0);
conf.setBoolean(METASTORE_AGGREGATE_STATS_CACHE_ENABLED.varname, false);
miniDFSCluster =
new MiniDFSCluster.Builder(conf).numDataNodes(2).format(true).build();
DFSTestUtil.createKey("test_key", miniDFSCluster, conf);
primary = new WarehouseInstance(LOG, miniDFSCluster, new HashMap<String, String>() {{
put(HiveConf.ConfVars.HIVE_IN_TEST.varname, "false");
put(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS.varname, "false");
put(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname, "true");
}}, "test_key");
}
@AfterClass
public static void classLevelTearDown() throws IOException {
primary.close();
FileUtils.deleteQuietly(new File(jksFile));
FileUtils.deleteQuietly(new File(jksFile2));
}
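  /** Creates a fresh, uniquely named source database marked as a replication source. */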
@Before
public void setup() throws Throwable {
    primaryDbName = testName.getMethodName() + "_" + System.currentTimeMillis();
replicatedDbName = "replicated_" + primaryDbName;
primary.run("create database " + primaryDbName + " WITH DBPROPERTIES ( '" +
SOURCE_OF_REPLICATION + "' = '1,2,3')");
}
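  /**
   * Source and target encryption zones use different keys, so the data cannot be
   * copied verbatim through the raw reserved namespace; the load instead copies
   * decrypted bytes and re-encrypts them under the target key.
   */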
@Test
public void targetAndSourceHaveDifferentEncryptionZoneKeys() throws Throwable {
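    // Stand up a replica cluster backed by its own keystore (jksFile2) so its
    // encryption-zone key differs from the source's.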
String replicaBaseDir = Files.createTempDirectory("replica").toFile().getAbsolutePath();
Configuration replicaConf = new Configuration();
replicaConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, replicaBaseDir);
replicaConf.set("dfs.client.use.datanode.hostname", "true");
replicaConf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts", "*");
replicaConf.set("hadoop.security.key.provider.path", "jceks://file" + jksFile2);
replicaConf.setBoolean("dfs.namenode.delegation.token.always-use", true);
MiniDFSCluster miniReplicaDFSCluster =
new MiniDFSCluster.Builder(replicaConf).numDataNodes(2).format(true).build();
replicaConf.setBoolean(METASTORE_AGGREGATE_STATS_CACHE_ENABLED.varname, false);
DFSTestUtil.createKey("test_key123", miniReplicaDFSCluster, replicaConf);
WarehouseInstance replica = new WarehouseInstance(LOG, miniReplicaDFSCluster,
new HashMap<String, String>() {{
put(HiveConf.ConfVars.HIVE_IN_TEST.varname, "false");
put(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS.varname, "false");
put(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname,
UserGroupInformation.getCurrentUser().getUserName());
put(HiveConf.ConfVars.REPLDIR.varname, primary.repldDir);
}}, "test_key123");
    // Read should pass even without raw-byte distcp: with different keys the data
    // is copied in decrypted form and re-encrypted under the target key.
List<String> dumpWithClause = Arrays.asList( "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='"
+ replica.externalTableWarehouseRoot + "'");
    primary.run("use " + primaryDbName)
.run("create external table encrypted_table2 (id int, value string)")
.run("insert into table encrypted_table2 values (1,'value1')")
.run("insert into table encrypted_table2 values (2,'value2')")
.dump(primaryDbName, dumpWithClause);
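    // copyfile.maxsize=0 forces distcp for every file, and CRC checks are skipped
    // because the re-encrypted target bytes differ from the source bytes.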
replica
.run("repl load " + primaryDbName + " into " + replicatedDbName
+ " with('hive.repl.replica.external.table.base.dir'='" + replica.externalTableWarehouseRoot + "', "
+ "'hive.exec.copyfile.maxsize'='0', 'distcp.options.skipcrccheck'='')")
.run("use " + replicatedDbName)
.run("repl status " + replicatedDbName)
.run("select value from encrypted_table2")
.verifyResults(new String[] { "value1", "value2" });
}
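  /**
   * Source and target encryption zones share the same key, so the dump and load
   * can copy encrypted bytes verbatim through the raw reserved namespace
   * (hive.repl.add.raw.reserved.namespace=true).
   */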
@Test
public void targetAndSourceHaveSameEncryptionZoneKeys() throws Throwable {
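    // Stand up a replica cluster that shares the primary's keystore (jksFile),
    // so both encryption zones use the same "test_key".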
String replicaBaseDir = Files.createTempDirectory("replica2").toFile().getAbsolutePath();
Configuration replicaConf = new Configuration();
replicaConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, replicaBaseDir);
replicaConf.set("dfs.client.use.datanode.hostname", "true");
replicaConf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts", "*");
replicaConf.set("hadoop.security.key.provider.path", "jceks://file" + jksFile);
replicaConf.setBoolean("dfs.namenode.delegation.token.always-use", true);
MiniDFSCluster miniReplicaDFSCluster =
new MiniDFSCluster.Builder(replicaConf).numDataNodes(2).format(true).build();
replicaConf.setBoolean(METASTORE_AGGREGATE_STATS_CACHE_ENABLED.varname, false);
WarehouseInstance replica = new WarehouseInstance(LOG, miniReplicaDFSCluster,
new HashMap<String, String>() {{
put(HiveConf.ConfVars.HIVE_IN_TEST.varname, "false");
put(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS.varname, "false");
put(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname,
UserGroupInformation.getCurrentUser().getUserName());
put(HiveConf.ConfVars.REPLDIR.varname, primary.repldDir);
}}, "test_key");
List<String> dumpWithClause = Arrays.asList(
"'hive.repl.add.raw.reserved.namespace'='true'",
"'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='"
+ replica.externalTableWarehouseRoot + "'",
"'distcp.options.skipcrccheck'=''",
"'" + HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS.varname + "'='false'",
"'" + HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname + "'='"
            + UserGroupInformation.getCurrentUser().getUserName() + "'");
WarehouseInstance.Tuple tuple =
primary.run("use " + primaryDbName)
.run("create table encrypted_table (id int, value string)")
.run("insert into table encrypted_table values (1,'value1')")
.run("insert into table encrypted_table values (2,'value2')")
.dump(primaryDbName, dumpWithClause);
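    // The load mirrors the dump settings so the encrypted bytes are written back
    // through the raw reserved namespace on the target.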
replica
.run("repl load " + primaryDbName + " into " + replicatedDbName
+ " with('hive.repl.add.raw.reserved.namespace'='true',"
+ "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='"
+ replica.externalTableWarehouseRoot + "')")
.run("use " + replicatedDbName)
.run("repl status " + replicatedDbName)
.verifyResult(tuple.lastReplicationId)
.run("select value from encrypted_table")
.verifyResults(new String[] { "value1", "value2" });
}
}