blob: 0b4daf8d16e4a1ecfd3bbfebc05e4f5560846369 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hive.jdbc;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.curator.test.TestingServer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.registry.impl.ZkRegistryBase;
import org.apache.hive.http.security.PamAuthenticator;
import org.apache.hive.jdbc.miniHS2.MiniHS2;
import org.apache.hive.service.server.HS2ActivePassiveHARegistry;
import org.apache.hive.service.server.HS2ActivePassiveHARegistryClient;
import org.apache.hive.service.server.HiveServer2Instance;
import org.apache.hive.service.server.TestHS2HttpServerPam;
import org.apache.hive.service.servlet.HS2Peers;
import org.apache.http.Header;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpOptions;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.StatusLine;
import org.apache.http.util.EntityUtils;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.util.B64Code;
import org.eclipse.jetty.util.StringUtil;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import com.fasterxml.jackson.databind.ObjectMapper;
public class TestActivePassiveHA {
private MiniHS2 miniHS2_1 = null;
private MiniHS2 miniHS2_2 = null;
private static TestingServer zkServer;
private Connection hs2Conn = null;
private static String ADMIN_USER = "user1"; // user from TestPamAuthenticator
private static String ADMIN_PASSWORD = "1";
private static String serviceDiscoveryMode = "zooKeeperHA";
private static String zkHANamespace = "hs2ActivePassiveHATest";
private HiveConf hiveConf1;
private HiveConf hiveConf2;
private static Path kvDataFilePath;
@BeforeClass
public static void beforeTest() throws Exception {
MiniHS2.cleanupLocalDir();
zkServer = new TestingServer();
Class.forName(MiniHS2.getJdbcDriverName());
}
@AfterClass
public static void afterTest() throws Exception {
if (zkServer != null) {
zkServer.close();
zkServer = null;
}
MiniHS2.cleanupLocalDir();
}
@Before
public void setUp() throws Exception {
hiveConf1 = new HiveConf();
hiveConf1.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
// Set up zookeeper dynamic service discovery configs
setHAConfigs(hiveConf1);
miniHS2_1 = new MiniHS2.Builder().withConf(hiveConf1).cleanupLocalDirOnStartup(false).build();
hiveConf2 = new HiveConf();
hiveConf2.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
// Set up zookeeper dynamic service discovery configs
setHAConfigs(hiveConf2);
miniHS2_2 = new MiniHS2.Builder().withConf(hiveConf2).cleanupLocalDirOnStartup(false).build();
final String dataFileDir = hiveConf1.get("test.data.files").replace('\\', '/').replace("c:", "");
kvDataFilePath = new Path(dataFileDir, "kv1.txt");
}
@After
public void tearDown() throws Exception {
if (hs2Conn != null) {
hs2Conn.close();
}
if ((miniHS2_1 != null) && miniHS2_1.isStarted()) {
miniHS2_1.stop();
}
if ((miniHS2_2 != null) && miniHS2_2.isStarted()) {
miniHS2_2.stop();
}
}
private static void setHAConfigs(Configuration conf) {
conf.setBoolean(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY.varname, true);
conf.set(ConfVars.HIVE_ZOOKEEPER_QUORUM.varname, zkServer.getConnectString());
conf.setBoolean(ConfVars.HIVE_SERVER2_ACTIVE_PASSIVE_HA_ENABLE.varname, true);
conf.set(ConfVars.HIVE_SERVER2_ACTIVE_PASSIVE_HA_REGISTRY_NAMESPACE.varname, zkHANamespace);
conf.setTimeDuration(ConfVars.HIVE_ZOOKEEPER_CONNECTION_TIMEOUT.varname, 2, TimeUnit.SECONDS);
conf.setTimeDuration(ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME.varname, 100, TimeUnit.MILLISECONDS);
conf.setInt(ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES.varname, 1);
}
@Test(timeout = 60000)
public void testActivePassiveHA() throws Exception {
String instanceId1 = UUID.randomUUID().toString();
miniHS2_1.start(getConfOverlay(instanceId1));
String instanceId2 = UUID.randomUUID().toString();
miniHS2_2.start(getConfOverlay(instanceId2));
assertEquals(true, miniHS2_1.getIsLeaderTestFuture().get());
assertEquals(true, miniHS2_1.isLeader());
String url = "http://localhost:" + hiveConf1.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader";
assertEquals("true", sendGet(url));
assertEquals(false, miniHS2_2.isLeader());
url = "http://localhost:" + hiveConf2.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader";
assertEquals("false", sendGet(url));
url = "http://localhost:" + hiveConf1.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/peers";
String resp = sendGet(url);
ObjectMapper objectMapper = new ObjectMapper();
HS2Peers.HS2Instances hs2Peers = objectMapper.readValue(resp, HS2Peers.HS2Instances.class);
int port1 = Integer.parseInt(hiveConf1.get(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname));
assertEquals(2, hs2Peers.getHiveServer2Instances().size());
for (HiveServer2Instance hsi : hs2Peers.getHiveServer2Instances()) {
if (hsi.getRpcPort() == port1 && hsi.getWorkerIdentity().equals(instanceId1)) {
assertEquals(true, hsi.isLeader());
} else {
assertEquals(false, hsi.isLeader());
}
}
Configuration conf = new Configuration();
setHAConfigs(conf);
HS2ActivePassiveHARegistry client = HS2ActivePassiveHARegistryClient.getClient(conf);
List<HiveServer2Instance> hs2Instances = new ArrayList<>(client.getAll());
assertEquals(2, hs2Instances.size());
List<HiveServer2Instance> leaders = new ArrayList<>();
List<HiveServer2Instance> standby = new ArrayList<>();
for (HiveServer2Instance instance : hs2Instances) {
if (instance.isLeader()) {
leaders.add(instance);
} else {
standby.add(instance);
}
}
assertEquals(1, leaders.size());
assertEquals(1, standby.size());
miniHS2_1.stop();
assertEquals(true, miniHS2_2.getIsLeaderTestFuture().get());
assertEquals(true, miniHS2_2.isLeader());
url = "http://localhost:" + hiveConf2.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader";
assertEquals("true", sendGet(url));
while (client.getAll().size() != 1) {
Thread.sleep(100);
}
client = HS2ActivePassiveHARegistryClient.getClient(conf);
hs2Instances = new ArrayList<>(client.getAll());
assertEquals(1, hs2Instances.size());
leaders = new ArrayList<>();
standby = new ArrayList<>();
for (HiveServer2Instance instance : hs2Instances) {
if (instance.isLeader()) {
leaders.add(instance);
} else {
standby.add(instance);
}
}
assertEquals(1, leaders.size());
assertEquals(0, standby.size());
url = "http://localhost:" + hiveConf2.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/peers";
resp = sendGet(url);
objectMapper = new ObjectMapper();
hs2Peers = objectMapper.readValue(resp, HS2Peers.HS2Instances.class);
int port2 = Integer.parseInt(hiveConf2.get(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname));
assertEquals(1, hs2Peers.getHiveServer2Instances().size());
for (HiveServer2Instance hsi : hs2Peers.getHiveServer2Instances()) {
if (hsi.getRpcPort() == port2 && hsi.getWorkerIdentity().equals(instanceId2)) {
assertEquals(true, hsi.isLeader());
} else {
assertEquals(false, hsi.isLeader());
}
}
// start 1st server again
instanceId1 = UUID.randomUUID().toString();
miniHS2_1.start(getConfOverlay(instanceId1));
assertEquals(false, miniHS2_1.isLeader());
url = "http://localhost:" + hiveConf1.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader";
assertEquals("false", sendGet(url));
while (client.getAll().size() != 2) {
Thread.sleep(100);
}
client = HS2ActivePassiveHARegistryClient.getClient(conf);
hs2Instances = new ArrayList<>(client.getAll());
assertEquals(2, hs2Instances.size());
leaders = new ArrayList<>();
standby = new ArrayList<>();
for (HiveServer2Instance instance : hs2Instances) {
if (instance.isLeader()) {
leaders.add(instance);
} else {
standby.add(instance);
}
}
assertEquals(1, leaders.size());
assertEquals(1, standby.size());
url = "http://localhost:" + hiveConf1.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/peers";
resp = sendGet(url);
objectMapper = new ObjectMapper();
hs2Peers = objectMapper.readValue(resp, HS2Peers.HS2Instances.class);
port2 = Integer.parseInt(hiveConf2.get(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname));
assertEquals(2, hs2Peers.getHiveServer2Instances().size());
for (HiveServer2Instance hsi : hs2Peers.getHiveServer2Instances()) {
if (hsi.getRpcPort() == port2 && hsi.getWorkerIdentity().equals(instanceId2)) {
assertEquals(true, hsi.isLeader());
} else {
assertEquals(false, hsi.isLeader());
}
}
}
@Test(timeout = 60000)
public void testConnectionActivePassiveHAServiceDiscovery() throws Exception {
String instanceId1 = UUID.randomUUID().toString();
miniHS2_1.start(getConfOverlay(instanceId1));
String instanceId2 = UUID.randomUUID().toString();
Map<String, String> confOverlay = getConfOverlay(instanceId2);
confOverlay.put(ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname, "http");
confOverlay.put(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH.varname, "clidriverTest");
miniHS2_2.start(confOverlay);
assertEquals(true, miniHS2_1.getIsLeaderTestFuture().get());
assertEquals(true, miniHS2_1.isLeader());
String url = "http://localhost:" + hiveConf1.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader";
assertEquals("true", sendGet(url));
assertEquals(false, miniHS2_2.isLeader());
url = "http://localhost:" + hiveConf2.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader";
assertEquals("false", sendGet(url));
// miniHS2_1 will be leader
String zkConnectString = zkServer.getConnectString();
String zkJdbcUrl = miniHS2_1.getJdbcURL();
// getAllUrls will parse zkJdbcUrl and will plugin the active HS2's host:port
String parsedUrl = HiveConnection.getAllUrls(zkJdbcUrl).get(0).getJdbcUriString();
String hs2_1_directUrl = "jdbc:hive2://" + miniHS2_1.getHost() + ":" + miniHS2_1.getBinaryPort() +
"/default;serviceDiscoveryMode=" + serviceDiscoveryMode + ";zooKeeperNamespace=" + zkHANamespace + ";";
assertTrue(zkJdbcUrl.contains(zkConnectString));
assertEquals(hs2_1_directUrl, parsedUrl);
openConnectionAndRunQuery(zkJdbcUrl);
// miniHS2_2 will become leader
miniHS2_1.stop();
assertEquals(true, miniHS2_2.getIsLeaderTestFuture().get());
parsedUrl = HiveConnection.getAllUrls(zkJdbcUrl).get(0).getJdbcUriString();
String hs2_2_directUrl = "jdbc:hive2://" + miniHS2_2.getHost() + ":" + miniHS2_2.getHttpPort() +
"/default;serviceDiscoveryMode=" + serviceDiscoveryMode + ";zooKeeperNamespace=" + zkHANamespace + ";";
assertTrue(zkJdbcUrl.contains(zkConnectString));
assertEquals(hs2_2_directUrl, parsedUrl);
openConnectionAndRunQuery(zkJdbcUrl);
// miniHS2_2 will continue to be leader
instanceId1 = UUID.randomUUID().toString();
miniHS2_1.start(getConfOverlay(instanceId1));
parsedUrl = HiveConnection.getAllUrls(zkJdbcUrl).get(0).getJdbcUriString();
assertTrue(zkJdbcUrl.contains(zkConnectString));
assertEquals(hs2_2_directUrl, parsedUrl);
openConnectionAndRunQuery(zkJdbcUrl);
// miniHS2_1 will become leader
miniHS2_2.stop();
assertEquals(true, miniHS2_1.getIsLeaderTestFuture().get());
parsedUrl = HiveConnection.getAllUrls(zkJdbcUrl).get(0).getJdbcUriString();
hs2_1_directUrl = "jdbc:hive2://" + miniHS2_1.getHost() + ":" + miniHS2_1.getBinaryPort() +
"/default;serviceDiscoveryMode=" + serviceDiscoveryMode + ";zooKeeperNamespace=" + zkHANamespace + ";";
assertTrue(zkJdbcUrl.contains(zkConnectString));
assertEquals(hs2_1_directUrl, parsedUrl);
openConnectionAndRunQuery(zkJdbcUrl);
}
@Test(timeout = 60000)
public void testManualFailover() throws Exception {
hiveConf1.setBoolVar(ConfVars.HIVE_SERVER2_WEBUI_ENABLE_CORS, true);
hiveConf2.setBoolVar(ConfVars.HIVE_SERVER2_WEBUI_ENABLE_CORS, true);
setPamConfs(hiveConf1);
setPamConfs(hiveConf2);
PamAuthenticator pamAuthenticator1 = new TestHS2HttpServerPam.TestPamAuthenticator(hiveConf1);
PamAuthenticator pamAuthenticator2 = new TestHS2HttpServerPam.TestPamAuthenticator(hiveConf2);
try {
String instanceId1 = UUID.randomUUID().toString();
miniHS2_1.setPamAuthenticator(pamAuthenticator1);
miniHS2_1.start(getSecureConfOverlay(instanceId1));
String instanceId2 = UUID.randomUUID().toString();
Map<String, String> confOverlay = getSecureConfOverlay(instanceId2);
confOverlay.put(ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname, "http");
confOverlay.put(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH.varname, "clidriverTest");
miniHS2_2.setPamAuthenticator(pamAuthenticator2);
miniHS2_2.start(confOverlay);
String url1 = "http://localhost:" + hiveConf1.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader";
String url2 = "http://localhost:" + hiveConf2.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader";
// when we start miniHS2_1 will be leader (sequential start)
assertEquals(true, miniHS2_1.getIsLeaderTestFuture().get());
assertEquals(true, miniHS2_1.isLeader());
assertEquals("true", sendGet(url1, true, true));
// trigger failover on miniHS2_1
String resp = sendDelete(url1, true, true);
assertTrue(resp.contains("Failover successful!"));
// make sure miniHS2_1 is not leader
assertEquals(true, miniHS2_1.getNotLeaderTestFuture().get());
assertEquals(false, miniHS2_1.isLeader());
assertEquals("false", sendGet(url1, true, true));
// make sure miniHS2_2 is the new leader
assertEquals(true, miniHS2_2.getIsLeaderTestFuture().get());
assertEquals(true, miniHS2_2.isLeader());
assertEquals("true", sendGet(url2, true, true));
// send failover request again to miniHS2_1 and get a failure
resp = sendDelete(url1, true, true);
assertTrue(resp.contains("Cannot failover an instance that is not a leader"));
assertEquals(true, miniHS2_1.getNotLeaderTestFuture().get());
assertEquals(false, miniHS2_1.isLeader());
// send failover request to miniHS2_2 and make sure miniHS2_1 takes over (returning back to leader, test listeners)
resp = sendDelete(url2, true, true);
assertTrue(resp.contains("Failover successful!"));
assertEquals(true, miniHS2_1.getIsLeaderTestFuture().get());
assertEquals(true, miniHS2_1.isLeader());
assertEquals("true", sendGet(url1, true, true));
assertEquals(true, miniHS2_2.getNotLeaderTestFuture().get());
assertEquals("false", sendGet(url2, true, true));
assertEquals(false, miniHS2_2.isLeader());
} finally {
resetFailoverConfs();
}
}
@Test(timeout = 60000)
public void testManualFailoverUnauthorized() throws Exception {
setPamConfs(hiveConf1);
PamAuthenticator pamAuthenticator1 = new TestHS2HttpServerPam.TestPamAuthenticator(hiveConf1);
try {
String instanceId1 = UUID.randomUUID().toString();
miniHS2_1.setPamAuthenticator(pamAuthenticator1);
miniHS2_1.start(getSecureConfOverlay(instanceId1));
// dummy HS2 instance just to trigger failover
String instanceId2 = UUID.randomUUID().toString();
Map<String, String> confOverlay = getSecureConfOverlay(instanceId2);
confOverlay.put(ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname, "http");
confOverlay.put(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH.varname, "clidriverTest");
miniHS2_2.start(confOverlay);
String url1 = "http://localhost:" + hiveConf1.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader";
// when we start miniHS2_1 will be leader (sequential start)
assertEquals(true, miniHS2_1.getIsLeaderTestFuture().get());
assertEquals(true, miniHS2_1.isLeader());
assertEquals("true", sendGet(url1, true));
// trigger failover on miniHS2_1 without authorization header
assertTrue(sendDelete(url1, false).contains("Unauthorized"));
assertTrue(sendDelete(url1, true).contains("Failover successful!"));
assertEquals(true, miniHS2_1.getNotLeaderTestFuture().get());
assertEquals(false, miniHS2_1.isLeader());
assertEquals(true, miniHS2_2.getIsLeaderTestFuture().get());
assertEquals(true, miniHS2_2.isLeader());
} finally {
// revert configs to not affect other tests
unsetPamConfs(hiveConf1);
}
}
@Test(timeout = 60000)
public void testNoConnectionOnPassive() throws Exception {
hiveConf1.setBoolVar(ConfVars.HIVE_SERVER2_WEBUI_ENABLE_CORS, true);
hiveConf2.setBoolVar(ConfVars.HIVE_SERVER2_WEBUI_ENABLE_CORS, true);
setPamConfs(hiveConf1);
setPamConfs(hiveConf2);
try {
PamAuthenticator pamAuthenticator1 = new TestHS2HttpServerPam.TestPamAuthenticator(hiveConf1);
PamAuthenticator pamAuthenticator2 = new TestHS2HttpServerPam.TestPamAuthenticator(hiveConf2);
String instanceId1 = UUID.randomUUID().toString();
miniHS2_1.setPamAuthenticator(pamAuthenticator1);
miniHS2_1.start(getSecureConfOverlay(instanceId1));
String instanceId2 = UUID.randomUUID().toString();
Map<String, String> confOverlay = getSecureConfOverlay(instanceId2);
miniHS2_2.setPamAuthenticator(pamAuthenticator2);
miniHS2_2.start(confOverlay);
String url1 = "http://localhost:" + hiveConf1.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader";
assertEquals(true, miniHS2_1.getIsLeaderTestFuture().get());
assertEquals(true, miniHS2_1.isLeader());
// Don't get urls from ZK, it will actually be a service discovery URL that we don't want.
String hs1Url = "jdbc:hive2://" + miniHS2_1.getHost() + ":" + miniHS2_1.getBinaryPort();
Connection hs2Conn = getConnection(hs1Url, System.getProperty("user.name")); // Should work.
hs2Conn.close();
String resp = sendDelete(url1, true);
assertTrue(resp, resp.contains("Failover successful!"));
// wait for failover to close sessions
while (miniHS2_1.getOpenSessionsCount() != 0) {
Thread.sleep(100);
}
assertEquals(true, miniHS2_2.getIsLeaderTestFuture().get());
assertEquals(true, miniHS2_2.isLeader());
try {
hs2Conn = getConnection(hs1Url, System.getProperty("user.name"));
fail("Should throw");
} catch (Exception e) {
if (!e.getMessage().contains("Cannot open sessions on an inactive HS2")) {
throw e;
}
}
} finally {
resetFailoverConfs();
}
}
private void resetFailoverConfs() {
// revert configs to not affect other tests
unsetPamConfs(hiveConf1);
unsetPamConfs(hiveConf2);
hiveConf1.unset(ConfVars.HIVE_SERVER2_WEBUI_ENABLE_CORS.varname);
hiveConf2.unset(ConfVars.HIVE_SERVER2_WEBUI_ENABLE_CORS.varname);
}
@Test(timeout = 60000)
public void testClientConnectionsOnFailover() throws Exception {
setPamConfs(hiveConf1);
setPamConfs(hiveConf2);
PamAuthenticator pamAuthenticator1 = new TestHS2HttpServerPam.TestPamAuthenticator(hiveConf1);
PamAuthenticator pamAuthenticator2 = new TestHS2HttpServerPam.TestPamAuthenticator(hiveConf2);
try {
String instanceId1 = UUID.randomUUID().toString();
miniHS2_1.setPamAuthenticator(pamAuthenticator1);
miniHS2_1.start(getSecureConfOverlay(instanceId1));
String instanceId2 = UUID.randomUUID().toString();
Map<String, String> confOverlay = getSecureConfOverlay(instanceId2);
confOverlay.put(ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname, "http");
confOverlay.put(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH.varname, "clidriverTest");
miniHS2_2.setPamAuthenticator(pamAuthenticator2);
miniHS2_2.start(confOverlay);
String url1 = "http://localhost:" + hiveConf1.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader";
String url2 = "http://localhost:" + hiveConf2.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader";
String zkJdbcUrl = miniHS2_1.getJdbcURL();
String zkConnectString = zkServer.getConnectString();
assertTrue(zkJdbcUrl.contains(zkConnectString));
// when we start miniHS2_1 will be leader (sequential start)
assertEquals(true, miniHS2_1.getIsLeaderTestFuture().get());
assertEquals(true, miniHS2_1.isLeader());
assertEquals("true", sendGet(url1, true));
// before failover, check if we are getting connection from miniHS2_1
String hs2_1_directUrl = "jdbc:hive2://" + miniHS2_1.getHost() + ":" + miniHS2_1.getBinaryPort() +
"/default;serviceDiscoveryMode=" + serviceDiscoveryMode + ";zooKeeperNamespace=" + zkHANamespace + ";";
String parsedUrl = HiveConnection.getAllUrls(zkJdbcUrl).get(0).getJdbcUriString();
assertEquals(hs2_1_directUrl, parsedUrl);
hs2Conn = getConnection(zkJdbcUrl, System.getProperty("user.name"));
while (miniHS2_1.getOpenSessionsCount() != 1) {
Thread.sleep(100);
}
// trigger failover on miniHS2_1 and make sure the connections are closed
String resp = sendDelete(url1, true);
assertTrue(resp.contains("Failover successful!"));
// wait for failover to close sessions
while (miniHS2_1.getOpenSessionsCount() != 0) {
Thread.sleep(100);
}
// make sure miniHS2_1 is not leader
assertEquals(true, miniHS2_1.getNotLeaderTestFuture().get());
assertEquals(false, miniHS2_1.isLeader());
assertEquals("false", sendGet(url1, true));
// make sure miniHS2_2 is the new leader
assertEquals(true, miniHS2_2.getIsLeaderTestFuture().get());
assertEquals(true, miniHS2_2.isLeader());
assertEquals("true", sendGet(url2, true));
// when we make a new connection we should get it from miniHS2_2 this time
String hs2_2_directUrl = "jdbc:hive2://" + miniHS2_2.getHost() + ":" + miniHS2_2.getHttpPort() +
"/default;serviceDiscoveryMode=" + serviceDiscoveryMode + ";zooKeeperNamespace=" + zkHANamespace + ";";
parsedUrl = HiveConnection.getAllUrls(zkJdbcUrl).get(0).getJdbcUriString();
assertEquals(hs2_2_directUrl, parsedUrl);
hs2Conn = getConnection(zkJdbcUrl, System.getProperty("user.name"));
while (miniHS2_2.getOpenSessionsCount() != 1) {
Thread.sleep(100);
}
// send failover request again to miniHS2_1 and get a failure
resp = sendDelete(url1, true);
assertTrue(resp.contains("Cannot failover an instance that is not a leader"));
assertEquals(true, miniHS2_1.getNotLeaderTestFuture().get());
assertEquals(false, miniHS2_1.isLeader());
// send failover request to miniHS2_2 and make sure miniHS2_1 takes over (returning back to leader, test listeners)
resp = sendDelete(url2, true);
assertTrue(resp.contains("Failover successful!"));
assertEquals(true, miniHS2_1.getIsLeaderTestFuture().get());
assertEquals(true, miniHS2_1.isLeader());
assertEquals("true", sendGet(url1, true));
assertEquals(true, miniHS2_2.getNotLeaderTestFuture().get());
assertEquals("false", sendGet(url2, true));
assertEquals(false, miniHS2_2.isLeader());
// make sure miniHS2_2 closes all its connections
while (miniHS2_2.getOpenSessionsCount() != 0) {
Thread.sleep(100);
}
// new connections goes to miniHS2_1 now
hs2Conn = getConnection(zkJdbcUrl, System.getProperty("user.name"));
while (miniHS2_1.getOpenSessionsCount() != 1) {
Thread.sleep(100);
}
} finally {
// revert configs to not affect other tests
unsetPamConfs(hiveConf1);
unsetPamConfs(hiveConf2);
}
}
private Connection getConnection(String jdbcURL, String user) throws SQLException {
return DriverManager.getConnection(jdbcURL, user, "bar");
}
private void openConnectionAndRunQuery(String jdbcUrl) throws Exception {
hs2Conn = getConnection(jdbcUrl, System.getProperty("user.name"));
String tableName = "testTab1";
Statement stmt = hs2Conn.createStatement();
// create table
stmt.execute("DROP TABLE IF EXISTS " + tableName);
stmt.execute("CREATE TABLE " + tableName
+ " (under_col INT COMMENT 'the under column', value STRING) COMMENT ' test table'");
// load data
stmt.execute("load data local inpath '" + kvDataFilePath.toString() + "' into table "
+ tableName);
ResultSet res = stmt.executeQuery("SELECT * FROM " + tableName);
assertTrue(res.next());
assertEquals("val_238", res.getString(2));
res.close();
stmt.close();
}
private String sendGet(String url) throws Exception {
return sendGet(url, false);
}
private String sendGet(String url, boolean enableAuth) throws Exception {
return sendAuthMethod(new HttpGet(url), enableAuth, false);
}
private String sendGet(String url, boolean enableAuth, boolean enableCORS) throws Exception {
return sendAuthMethod(new HttpGet(url), enableAuth, enableCORS);
}
private String sendDelete(String url, boolean enableAuth) throws Exception {
return sendAuthMethod(new HttpDelete(url), enableAuth, false);
}
private String sendDelete(String url, boolean enableAuth, boolean enableCORS) throws Exception {
return sendAuthMethod(new HttpDelete(url), enableAuth, enableCORS);
}
private String sendAuthMethod(HttpRequestBase method, boolean enableAuth, boolean enableCORS) throws Exception {
CloseableHttpResponse httpResponse = null;
String response = null;
int statusCode = -1;
try (
CloseableHttpClient client = HttpClients.createDefault();
) {
// CORS check
if (enableCORS) {
String origin = "http://example.com";
HttpOptions optionsMethod = new HttpOptions(method.getURI().toString());
optionsMethod.addHeader("Origin", origin);
if (enableAuth) {
setupAuthHeaders(optionsMethod);
}
httpResponse = client.execute(optionsMethod);
if (httpResponse != null) {
StatusLine statusLine = httpResponse.getStatusLine();
if (statusLine != null) {
response = httpResponse.getStatusLine().getReasonPhrase();
statusCode = httpResponse.getStatusLine().getStatusCode();
if (statusCode == 200) {
Header originHeader = httpResponse.getFirstHeader("Access-Control-Allow-Origin");
assertNotNull(originHeader);
assertEquals(origin, originHeader.getValue());
} else {
fail("CORS returned: " + statusCode + " Error: " + response);
}
} else {
fail("Status line is null");
}
} else {
fail("No http Response");
}
}
if (enableAuth) {
setupAuthHeaders(method);
}
httpResponse = client.execute(method);
if (httpResponse != null) {
StatusLine statusLine = httpResponse.getStatusLine();
if (statusLine != null) {
response = httpResponse.getStatusLine().getReasonPhrase();
statusCode = httpResponse.getStatusLine().getStatusCode();
if (statusCode == 200) {
return EntityUtils.toString(httpResponse.getEntity());
}
}
}
return response;
} finally {
httpResponse.close();
}
}
private void setupAuthHeaders(final HttpRequestBase method) {
String authB64Code =
B64Code.encode(ADMIN_USER + ":" + ADMIN_PASSWORD, StringUtil.__ISO_8859_1);
method.setHeader(HttpHeader.AUTHORIZATION.asString(), "Basic " + authB64Code);
}
private Map<String, String> getConfOverlay(final String instanceId) {
Map<String, String> confOverlay = new HashMap<>();
confOverlay.put("hive.server2.zookeeper.publish.configs", "true");
confOverlay.put(ZkRegistryBase.UNIQUE_IDENTIFIER, instanceId);
return confOverlay;
}
private Map<String, String> getSecureConfOverlay(final String instanceId) {
Map<String, String> confOverlay = new HashMap<>();
confOverlay.put("hive.server2.zookeeper.publish.configs", "true");
confOverlay.put(ZkRegistryBase.UNIQUE_IDENTIFIER, instanceId);
confOverlay.put("hadoop.security.instrumentation.requires.admin", "true");
confOverlay.put("hadoop.security.authorization", "true");
confOverlay.put(ConfVars.USERS_IN_ADMIN_ROLE.varname, ADMIN_USER);
return confOverlay;
}
private void setPamConfs(final HiveConf hiveConf) {
hiveConf.setVar(HiveConf.ConfVars.HIVE_SERVER2_PAM_SERVICES, "sshd");
hiveConf.setBoolVar(ConfVars.HIVE_SERVER2_WEBUI_USE_PAM, true);
hiveConf.setBoolVar(ConfVars.HIVE_IN_TEST, true);
}
private void unsetPamConfs(final HiveConf hiveConf) {
hiveConf.setVar(HiveConf.ConfVars.HIVE_SERVER2_PAM_SERVICES, "");
hiveConf.setBoolVar(ConfVars.HIVE_SERVER2_WEBUI_USE_PAM, false);
hiveConf.setBoolVar(ConfVars.HIVE_IN_TEST, false);
}
// This is test for llap command AuthZ added in HIVE-19033 which require ZK access for it to pass
@Test(timeout = 60000)
public void testNoAuthZLlapClusterInfo() throws Exception {
String instanceId1 = UUID.randomUUID().toString();
miniHS2_1.start(getConfOverlay(instanceId1));
Connection hs2Conn = getConnection(miniHS2_1.getJdbcURL(), "user1");
boolean caughtException = false;
Statement stmt = hs2Conn.createStatement();
try {
stmt.execute("set hive.llap.daemon.service.hosts=@localhost");
stmt.execute("llap cluster -info");
} catch (SQLException e) {
caughtException = true;
} finally {
stmt.close();
hs2Conn.close();
}
assertEquals(false, caughtException);
}
}