blob: 776fd7a2506c4a9d43d4fac04cd2b33e3f43d8ba [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.omid.tso.client;
import com.google.inject.Guice;
import com.google.inject.Injector;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.test.TestingServer;
import org.apache.curator.utils.CloseableUtils;
import org.apache.omid.TestUtils;
import org.apache.omid.committable.CommitTable;
import org.apache.omid.tls.X509KeyType;
import org.apache.omid.tls.X509TestContext;
import org.apache.omid.tso.HALeaseManagementModule;
import org.apache.omid.tso.TSOMockModule;
import org.apache.omid.tso.TSOServer;
import org.apache.omid.tso.TSOServerConfig;
import org.apache.omid.tso.VoidLeaseManagementModule;
import org.apache.omid.tso.TSOServerConfig.TIMESTAMP_TYPE;
import org.apache.statemachine.StateMachine.FsmImpl;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.omid.tls.KeyStoreFileType;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.io.File;
import java.nio.file.Files;
import java.security.Security;
import java.util.concurrent.ExecutionException;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
/**
 * Connectivity tests between a {@link TSOClient} and a {@link TSOServer}.
 *
 * <p>The tests cover three ways of reaching the server:
 * <ul>
 *   <li>directly through a configured host:port in plain text,</li>
 *   <li>directly through a configured host:port with several TLS setups
 *       (server-side only, client+server, mutual, restricted cipher suites, BCFKS stores),</li>
 *   <li>through the address the server publishes in ZooKeeper (HA lease module),
 *       including reconnection after the server is restarted.</li>
 * </ul>
 *
 * <p>Every test method gets a fresh ZooKeeper testing server and fresh ports (see
 * {@link #beforeMethod()}); {@link #afterMethod()} tears them down and also stops any TSO
 * server a failed test left running.
 */
public class TestTSOClientConnectionToTSO {

    private static final Logger LOG = LoggerFactory.getLogger(TestTSOClientConnectionToTSO.class);

    // Constants and variables for component connectivity
    private static final String TSO_HOST = "localhost";
    private static final String CURRENT_TSO_PATH = "/current_tso_path";
    private static final String TSO_LEASE_PATH = "/tso_lease_path";

    private int tsoPortForTest;
    private String zkClusterForTest;

    private Injector injector = null;

    private TestingServer zkServer;
    private CuratorFramework zkClient;
    // Non-null only while a TSO server started by the current test is (possibly) still running
    private TSOServer tsoServer;

    protected X509TestContext x509TestContext;

    /**
     * Picks free ports, starts a ZooKeeper testing server plus a connected client, and makes
     * sure the znode used to publish the current TSO does not exist.
     */
    @BeforeMethod
    public void beforeMethod() throws Exception {

        tsoPortForTest = TestUtils.getFreeLocalPort();

        int zkPortForTest = TestUtils.getFreeLocalPort();
        zkClusterForTest = TSO_HOST + ":" + zkPortForTest;
        LOG.info("Starting ZK Server in port {}", zkPortForTest);
        zkServer = TestUtils.provideTestingZKServer(zkPortForTest);
        LOG.info("ZK Server Started @ {}", zkServer.getConnectString());

        zkClient = TestUtils.provideConnectedZKClient(zkClusterForTest);

        try {
            zkClient.delete().forPath(CURRENT_TSO_PATH);
            Stat stat = zkClient.checkExists().forPath(CURRENT_TSO_PATH);
            assertNull(stat, CURRENT_TSO_PATH + " should not exist");
        } catch (NoNodeException e) {
            // Nothing to delete: the znode was not there in the first place
            LOG.info("{} ZNode did not exist", CURRENT_TSO_PATH);
        }

    }

    /**
     * Stops a TSO server left behind by a failed test (if any) and shuts down the ZooKeeper
     * client and testing server.
     */
    @AfterMethod
    public void afterMethod() {

        // Tests null out tsoServer after a clean stop, so this only does work when a test
        // aborted before reaching its own shutdown code.
        stopLeftoverTsoServer();

        if (zkClient != null) {
            zkClient.close();
        }

        CloseableUtils.closeQuietly(zkServer);
        zkServer = null;
        LOG.info("ZK Server Stopped");

    }

    /**
     * Requests a client with a default configuration and tolerates an
     * {@link IllegalArgumentException}.
     *
     * <p>NOTE(review): as written the test also passes when no exception is thrown at all.
     * If the exception is really mandatory with the default configuration, a {@code fail()}
     * after the {@code newInstance} call would enforce it - confirm against
     * {@code OmidClientConfiguration} defaults before tightening.
     */
    @Test(timeOut = 30_000)
    public void testUnsuccessfulConnectionToTSO() throws Exception {

        // When no HA node for TSOServer is found & no host:port config exists
        // we should get an exception when getting the client
        try {
            TSOClient.newInstance(new OmidClientConfiguration());
        } catch (IllegalArgumentException e) {
            // Expected
        }

    }

    /** Plain-text connection through the configured host:port. */
    @Test(timeOut = 30_000)
    public void testSuccessfulConnectionToTSOWithHostAndPort() throws Exception {

        // Launch a TSO WITHOUT publishing the address in HA...
        TSOServerConfig tsoConfig = newDirectTsoConfig();
        startTso(tsoConfig, 100);

        // When no HA node for TSOServer is found we should get a connection
        // to the TSO through the host:port configured...
        OmidClientConfiguration tsoClientConf = newDirectClientConf();

        // ... so we should get responses from the methods
        assertFirstStartTimestampAndClose(tsoClientConf);

        // Stop the TSO Server
        stopTsoServer();
        LOG.info("TSO Server Stopped");

    }

    /** Server has TLS enabled but still accepts plain text; the client connects without TLS. */
    @Test(timeOut = 30_000)
    public void testSuccessfulConnectionToTSOWithHostAndPortSSLServerside() throws Exception {

        // Launch a TSO WITHOUT publishing the address in HA...
        TSOServerConfig tsoConfig = newTlsTsoConfig(true, KeyStoreFileType.JKS);
        useTrustStore(tsoConfig);
        startTso(tsoConfig, 300);

        // When no HA node for TSOServer is found we should get a connection
        // to the TSO through the host:port configured...
        OmidClientConfiguration tsoClientConf = newDirectClientConf();
        tsoClientConf.setTlsEnabled(false);
        useKeyStore(tsoClientConf);
        useTrustStore(tsoClientConf);

        // ... so we should get responses from the methods
        assertFirstStartTimestampAndClose(tsoClientConf);

        // Stop the TSO Server
        stopTsoServer();
        LOG.info("TSO Server Stopped");

    }

    /** TLS on both sides (JKS stores); the client only validates the server certificate. */
    @Test(timeOut = 30_000)
    public void testSuccessfulConnectionToTSOWithHostAndPortSSLBoth() throws Exception {

        // Launch a TSO WITHOUT publishing the address in HA...
        TSOServerConfig tsoConfig = newTlsTsoConfig(false, KeyStoreFileType.JKS);
        tsoConfig.setSslCrlEnabled(true);
        startTso(tsoConfig, 300);

        // When no HA node for TSOServer is found we should get a connection
        // to the TSO through the host:port configured...
        OmidClientConfiguration tsoClientConf = newDirectClientConf();
        tsoClientConf.setTlsEnabled(true);
        useTrustStore(tsoClientConf);

        // ... so we should get responses from the methods
        assertFirstStartTimestampAndClose(tsoClientConf);

        // Stop the TSO Server
        stopTsoServer();
        LOG.info("TSO Server Stopped");

    }

    /** Same as the TLS-on-both-sides case, but with BCFKS key/trust stores. */
    @Test(timeOut = 30_000)
    public void testSuccessfulConnectionToTSOWithHostAndPortBCFKS() throws Exception {

        // Launch a TSO WITHOUT publishing the address in HA...
        TSOServerConfig tsoConfig = newTlsTsoConfig(false, KeyStoreFileType.BCFKS);
        tsoConfig.setSslCrlEnabled(true);
        startTso(tsoConfig, 300);

        // When no HA node for TSOServer is found we should get a connection
        // to the TSO through the host:port configured...
        OmidClientConfiguration tsoClientConf = newDirectClientConf();
        tsoClientConf.setTlsEnabled(true);
        useTrustStore(tsoClientConf);

        // ... so we should get responses from the methods
        assertFirstStartTimestampAndClose(tsoClientConf);

        // Stop the TSO Server
        stopTsoServer();
        LOG.info("TSO Server Stopped");

    }

    /** TLS on both sides with key and trust stores configured on server and client. */
    @Test(timeOut = 30_000)
    public void testSuccessfulConnectionToTSOWithHostAndPortSSLBothMutual() throws Exception {

        // Launch a TSO WITHOUT publishing the address in HA...
        TSOServerConfig tsoConfig = newTlsTsoConfig(false, KeyStoreFileType.JKS);
        useTrustStore(tsoConfig);
        tsoConfig.setSslCrlEnabled(true);
        startTso(tsoConfig, 300);

        // When no HA node for TSOServer is found we should get a connection
        // to the TSO through the host:port configured...
        OmidClientConfiguration tsoClientConf = newDirectClientConf();
        tsoClientConf.setTlsEnabled(true);
        useKeyStore(tsoClientConf);
        useTrustStore(tsoClientConf);

        // ... so we should get responses from the methods
        assertFirstStartTimestampAndClose(tsoClientConf);

        // Stop the TSO Server
        stopTsoServer();
        LOG.info("TSO Server Stopped");

    }

    /** TLS on both sides with the same two-entry cipher suite list on server and client. */
    @Test(timeOut = 30_000)
    public void testSuccessfulConnectionToTSOWithHostAndPortSSLCipherSuite() throws Exception {

        // Launch a TSO WITHOUT publishing the address in HA...
        TSOServerConfig tsoConfig = newTlsTsoConfig(false, KeyStoreFileType.JKS);
        tsoConfig.setCipherSuites("TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA,TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA384");
        startTso(tsoConfig, 300);

        // When no HA node for TSOServer is found we should get a connection
        // to the TSO through the host:port configured...
        OmidClientConfiguration tsoClientConf = newDirectClientConf();
        tsoClientConf.setTlsEnabled(true);
        useTrustStore(tsoClientConf);
        tsoClientConf.setCipherSuites("TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA,TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA384");

        // ... so we should get responses from the methods
        assertFirstStartTimestampAndClose(tsoClientConf);

        // Stop the TSO Server
        stopTsoServer();
        LOG.info("TSO Server Stopped");

    }

    /** TLS on both sides with a single cipher suite configured on server and client. */
    @Test(timeOut = 30_000)
    public void testSuccessfulConnectionToTSOWithHostAndPortSSLOneCipherSuite2() throws Exception {

        // Launch a TSO WITHOUT publishing the address in HA...
        TSOServerConfig tsoConfig = newTlsTsoConfig(false, KeyStoreFileType.JKS);
        tsoConfig.setCipherSuites("TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA");
        startTso(tsoConfig, 300);

        // When no HA node for TSOServer is found we should get a connection
        // to the TSO through the host:port configured...
        OmidClientConfiguration tsoClientConf = newDirectClientConf();
        tsoClientConf.setTlsEnabled(true);
        useTrustStore(tsoClientConf);
        tsoClientConf.setCipherSuites("TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA");

        // ... so we should get responses from the methods
        assertFirstStartTimestampAndClose(tsoClientConf);

        // Stop the TSO Server
        stopTsoServer();
        LOG.info("TSO Server Stopped");

    }

    /** Client discovers the TSO through the address the server publishes in ZooKeeper. */
    @Test(timeOut = 30_000)
    public void testSuccessfulConnectionToTSOThroughZK() throws Exception {

        // Launch a TSO publishing the address in HA...
        TSOServerConfig config = newHaTsoConfig();
        startTso(config, 100);

        waitTillTsoRegisters(injector.getInstance(CuratorFramework.class));

        // When a HA node for TSOServer is found we should get a connection
        TSOClient tsoClient = TSOClient.newInstance(newHaClientConf());

        // ... so we should get responses from the methods
        Long startTS = tsoClient.getNewStartTimestamp().get();
        LOG.info("Start TS {} ", startTS);
        assertTrue(startTS.longValue() >= CommitTable.MAX_CHECKPOINTS_PER_TXN);

        // Close the tsoClient connection and stop the TSO Server
        tsoClient.close().get();
        stopTsoServer();
        LOG.info("TSO Server Stopped");

    }

    /**
     * Checks that a client created against an HA-published TSO fails with a
     * {@link ConnectionException} once the server is stopped, and eventually gets timestamps
     * again after a new server instance is started with the same configuration.
     */
    @Test(timeOut = 30_000)
    public void testSuccessOfTSOClientReconnectionsToARestartedTSOWithZKPublishing() throws Exception {

        // Start a TSO with HA...
        TSOServerConfig config = newHaTsoConfig();

        injector = Guice.createInjector(new TSOMockModule(config));
        LOG.info("Starting Initial TSO");
        launchTsoServer(injector, 100);
        LOG.info("Finished loading TSO");

        waitTillTsoRegisters(injector.getInstance(CuratorFramework.class));

        // Then create the TSO Client under test...
        TSOClient tsoClient = TSOClient.newInstance(newHaClientConf());

        // ... and check that initially we get responses from the methods
        Long startTS = tsoClient.getNewStartTimestamp().get();
        LOG.info("Start TS {} ", startTS);
        assertTrue(startTS.longValue() >= CommitTable.MAX_CHECKPOINTS_PER_TXN);

        // Then stop the server...
        stopTsoServer();
        LOG.info("Initial TSO Server Stopped");

        Thread.sleep(1500); // ...allow the client to receive disconnection event...

        // ... and check that we get a conn exception when trying to access the client
        try {
            startTS = tsoClient.getNewStartTimestamp().get();
            fail();
        } catch (ExecutionException e) {
            LOG.info("Exception expected");
            // Internal accessor to fsm to do the required checkings
            FsmImpl fsm = (FsmImpl) tsoClient.fsm;
            assertEquals(e.getCause().getClass(), ConnectionException.class);
            assertTrue(fsm.getState().getClass().equals(TSOClient.ConnectionFailedState.class)
                               ||
                               fsm.getState().getClass().equals(TSOClient.DisconnectedState.class));
        }

        // After that, simulate that a new TSO has been launched...
        Injector newInjector = Guice.createInjector(new TSOMockModule(config));
        LOG.info("Re-Starting again the TSO");
        launchTsoServer(newInjector, 100);
        LOG.info("Finished loading restarted TSO");

        // Finally re-check that, eventually, we can get a new value from the new TSO...
        // (the test-level timeOut bounds this loop)
        boolean reconnectionActive = false;
        while (!reconnectionActive) {
            try {
                startTS = tsoClient.getNewStartTimestamp().get();
                reconnectionActive = true;
            } catch (ExecutionException e) {
                // Expected while the client is still reconnecting; pause briefly instead of
                // retrying in a tight loop
                Thread.sleep(100);
            }
        }
        assertNotNull(startTS);

        // ...and stop the server
        stopTsoServer();
        LOG.info("Restarted TSO Server Stopped");

    }

    // ----------------------------------------------------------------------------------------------------------------
    // Helpers
    // ----------------------------------------------------------------------------------------------------------------

    /** Builds the server config shared by all host:port tests (no HA lease management). */
    private TSOServerConfig newDirectTsoConfig() {
        TSOServerConfig tsoConfig = new TSOServerConfig();
        tsoConfig.setConflictMapSize(1000);
        tsoConfig.setPort(tsoPortForTest);
        tsoConfig.setTimestampType(TIMESTAMP_TYPE.INCREMENTAL.toString());
        tsoConfig.setLeaseModule(new VoidLeaseManagementModule());
        return tsoConfig;
    }

    /**
     * Builds a host:port server config with TLS enabled, creates a fresh {@link #x509TestContext}
     * and points the server's key store settings at it.
     *
     * @param supportPlainText value passed to {@code TSOServerConfig.setSupportPlainText}
     * @param storeFileType    file type used for both the key store and the trust store
     */
    private TSOServerConfig newTlsTsoConfig(boolean supportPlainText, KeyStoreFileType storeFileType)
            throws Exception {
        TSOServerConfig tsoConfig = newDirectTsoConfig();
        tsoConfig.setTlsEnabled(true);
        tsoConfig.setSupportPlainText(supportPlainText);

        createX509TestContext(storeFileType);

        tsoConfig.setKeyStoreLocation(x509TestContext.getTlsConfigKeystoreLocation());
        tsoConfig.setKeyStorePassword(x509TestContext.getTlsConfigKeystorePassword());
        tsoConfig.setKeyStoreType(x509TestContext.getTlsConfigKeystoreType());
        return tsoConfig;
    }

    /** Builds a server config that publishes the TSO address in ZooKeeper through the HA lease module. */
    private TSOServerConfig newHaTsoConfig() {
        TSOServerConfig config = new TSOServerConfig();
        config.setConflictMapSize(1000);
        config.setPort(tsoPortForTest);
        config.setLeaseModule(new HALeaseManagementModule(1000, TSO_LEASE_PATH, CURRENT_TSO_PATH, zkClusterForTest, "omid"));
        return config;
    }

    /**
     * Registers the BouncyCastle provider and (re)creates {@link #x509TestContext} with RSA keys
     * in a new temporary directory, using the given file type for both stores.
     */
    private void createX509TestContext(KeyStoreFileType storeFileType) throws Exception {
        Security.addProvider(new BouncyCastleProvider());
        File tempDir = Files.createTempDirectory("x509Tests").toFile();
        String keyPassword = "pa$$w0rd";

        x509TestContext = X509TestContext.newBuilder()
                .setTempDir(tempDir)
                .setKeyStorePassword(keyPassword)
                .setKeyStoreKeyType(X509KeyType.RSA)
                .setTrustStorePassword(keyPassword)
                .setTrustStoreKeyType(X509KeyType.RSA)
                .build();
        x509TestContext.setSystemProperties(storeFileType, storeFileType);

        // The value logged is the key store location, so label it accordingly
        LOG.info("keyStoreLocation :{}", x509TestContext.getTlsConfigKeystoreLocation());
    }

    /** Points the server's trust store settings at the current {@link #x509TestContext}. */
    private void useTrustStore(TSOServerConfig tsoConfig) {
        tsoConfig.setTrustStoreLocation(x509TestContext.getTlsConfigTrustLocation());
        tsoConfig.setTrustStorePassword(x509TestContext.getTlsConfigTrustPassword());
        tsoConfig.setTrustStoreType(x509TestContext.getTlsConfigTrustType());
    }

    /** Points the client's key store settings at the current {@link #x509TestContext}. */
    private void useKeyStore(OmidClientConfiguration tsoClientConf) {
        tsoClientConf.setKeyStoreLocation(x509TestContext.getTlsConfigKeystoreLocation());
        tsoClientConf.setKeyStorePassword(x509TestContext.getTlsConfigKeystorePassword());
        tsoClientConf.setKeyStoreType(x509TestContext.getTlsConfigKeystoreType());
    }

    /** Points the client's trust store settings at the current {@link #x509TestContext}. */
    private void useTrustStore(OmidClientConfiguration tsoClientConf) {
        tsoClientConf.setTrustStoreLocation(x509TestContext.getTlsConfigTrustLocation());
        tsoClientConf.setTrustStorePassword(x509TestContext.getTlsConfigTrustPassword());
        tsoClientConf.setTrustStoreType(x509TestContext.getTlsConfigTrustType());
    }

    /** Builds a client config that connects straight to the test TSO's host:port. */
    private OmidClientConfiguration newDirectClientConf() {
        OmidClientConfiguration tsoClientConf = new OmidClientConfiguration();
        tsoClientConf.setConnectionString(TSO_HOST + ":" + tsoPortForTest);
        tsoClientConf.setZkCurrentTsoPath(CURRENT_TSO_PATH);
        return tsoClientConf;
    }

    /** Builds a client config that locates the TSO through the test ZooKeeper cluster. */
    private OmidClientConfiguration newHaClientConf() {
        OmidClientConfiguration tsoClientConf = new OmidClientConfiguration();
        tsoClientConf.setConnectionType(OmidClientConfiguration.ConnType.HA);
        tsoClientConf.setConnectionString(zkClusterForTest);
        tsoClientConf.setZkCurrentTsoPath(CURRENT_TSO_PATH);
        return tsoClientConf;
    }

    /**
     * Creates the injector for {@code config} (stored in {@link #injector}) and starts the TSO
     * server obtained from it.
     *
     * @param socketWaitArg third argument handed to {@code TestUtils.waitForSocketListening}
     */
    private void startTso(TSOServerConfig config, int socketWaitArg) throws Exception {
        injector = Guice.createInjector(new TSOMockModule(config));
        LOG.info("Starting TSO");
        launchTsoServer(injector, socketWaitArg);
        LOG.info("Finished loading TSO");
    }

    /**
     * Gets the {@link TSOServer} from the injector, starts it, waits until it reports running
     * and until its port is listening. The instance is kept in {@link #tsoServer}.
     */
    private void launchTsoServer(Injector tsoInjector, int socketWaitArg) throws Exception {
        tsoServer = tsoInjector.getInstance(TSOServer.class);
        tsoServer.startAsync();
        tsoServer.awaitRunning();
        TestUtils.waitForSocketListening(TSO_HOST, tsoPortForTest, socketWaitArg);
    }

    /** Stops the running TSO server, clears {@link #tsoServer} and waits for its port to be released. */
    private void stopTsoServer() throws Exception {
        tsoServer.stopAsync();
        tsoServer.awaitTerminated();
        tsoServer = null;
        TestUtils.waitForSocketNotListening(TSO_HOST, tsoPortForTest, 1000);
    }

    /**
     * Best-effort shutdown used from teardown: stops the server if a test left one behind and
     * never lets a shutdown problem hide the original test failure.
     */
    private void stopLeftoverTsoServer() {
        if (tsoServer == null) {
            return;
        }
        try {
            LOG.info("Stopping TSO Server left running by the test");
            tsoServer.stopAsync();
            // NOTE(review): presumably a Guava-style Service, where awaiting termination of a
            // failed service throws an unchecked exception - hence the catch below.
            tsoServer.awaitTerminated();
        } catch (RuntimeException e) {
            LOG.warn("Could not cleanly stop TSO Server left running by the test", e);
        } finally {
            tsoServer = null;
        }
    }

    /**
     * Creates a client from the given config, requests one start timestamp, checks it equals
     * {@code CommitTable.MAX_CHECKPOINTS_PER_TXN} and closes the client.
     */
    private void assertFirstStartTimestampAndClose(OmidClientConfiguration tsoClientConf) throws Exception {
        TSOClient tsoClient = TSOClient.newInstance(tsoClientConf);

        Long startTS = tsoClient.getNewStartTimestamp().get();
        LOG.info("Start TS {} ", startTS);
        assertEquals(startTS.longValue(), CommitTable.MAX_CHECKPOINTS_PER_TXN);

        // Close the tsoClient connection
        tsoClient.close().get();
    }

    /**
     * Blocks until the znode at {@code CURRENT_TSO_PATH} exists, polling with a pause between
     * attempts. There is no internal deadline; callers rely on the test-level timeout.
     *
     * @param zkClient ZooKeeper client used for the existence checks
     * @throws InterruptedException if the waiting thread is interrupted
     */
    private void waitTillTsoRegisters(CuratorFramework zkClient) throws Exception {
        while (true) {
            long pauseMillis = 100; // short pause while the znode simply is not there yet
            try {
                Stat stat = zkClient.checkExists().forPath(CURRENT_TSO_PATH);
                if (stat != null) {
                    LOG.info("TSO registered in HA with path {}={}", CURRENT_TSO_PATH, stat.toString());
                    if (stat.toString().length() != 0) {
                        return;
                    }
                }
            } catch (InterruptedException e) {
                // Do not keep polling once interrupted; restore the flag and propagate
                Thread.currentThread().interrupt();
                throw e;
            } catch (Exception e) {
                LOG.debug("TSO still has not registered yet, sleeping...", e);
                pauseMillis = 500;
            }
            Thread.sleep(pauseMillis);
        }
    }

}