blob: 296edaa0bfe2f9b09d44083266460db9fa02062e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.state.server;
import org.apache.commons.io.FileUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.utils.DefaultZookeeperFactory;
import org.apache.curator.utils.ZookeeperFactory;
import org.apache.nifi.controller.cluster.SecureClientZooKeeperFactory;
import org.apache.nifi.controller.cluster.ZooKeeperClientConfig;
import org.apache.nifi.security.util.TemporaryKeyStoreBuilder;
import org.apache.nifi.security.util.TlsConfiguration;
import org.apache.nifi.util.NiFiProperties;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import static org.apache.nifi.leader.election.ITSecureClientZooKeeperFactory.createSecureClientProperties;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
// Testing setting up a ZooKeeperStateServer with TLS
public class ITZooKeeperStateServerTLS {
private static final String INSECURE_ZOOKEEPER_PROPS = getPath("insecure.zookeeper.properties");
private static final String PARTIAL_ZOOKEEPER_PROPS = getPath("partial.zookeeper.properties");
private static final String SECURE_ZOOKEEPER_PROPS = getPath("secure.zookeeper.properties");
private static final String ZOOKEEPER_PROPERTIES_FILE_KEY = "nifi.state.management.embedded.zookeeper.properties";
private static final String ZOOKEEPER_CNXN_FACTORY = "org.apache.zookeeper.server.NettyServerCnxnFactory";
private static final String QUORUM_CONNECT_STRING = "node0.apache.org:2281,node1.apache.org:2281";
private static final Map<String, String> SECURE_NIFI_PROPS = new HashMap<>();
private static final Map<String, String> SECURE_ZOOKEEPER_NIFI_PROPS = new HashMap<>();
private static TlsConfiguration tlsConfiguration;
@Rule
public ExpectedException expectedException = ExpectedException.none();
@BeforeClass
public static void setTlsConfiguration() {
tlsConfiguration = new TemporaryKeyStoreBuilder().build();
SECURE_NIFI_PROPS.put(NiFiProperties.STATE_MANAGEMENT_ZOOKEEPER_PROPERTIES, SECURE_ZOOKEEPER_PROPS);
SECURE_NIFI_PROPS.put(NiFiProperties.WEB_HTTPS_PORT, "8443");
SECURE_NIFI_PROPS.put(NiFiProperties.SECURITY_KEYSTORE, tlsConfiguration.getKeystorePath());
SECURE_NIFI_PROPS.put(NiFiProperties.SECURITY_KEYSTORE_TYPE, tlsConfiguration.getKeystoreType().getType());
SECURE_NIFI_PROPS.put(NiFiProperties.SECURITY_KEYSTORE_PASSWD, tlsConfiguration.getKeystorePassword());
SECURE_NIFI_PROPS.put(NiFiProperties.SECURITY_TRUSTSTORE, tlsConfiguration.getTruststorePath());
SECURE_NIFI_PROPS.put(NiFiProperties.SECURITY_TRUSTSTORE_TYPE, tlsConfiguration.getTruststoreType().getType());
SECURE_NIFI_PROPS.put(NiFiProperties.SECURITY_TRUSTSTORE_PASSWD, tlsConfiguration.getTruststorePassword());
SECURE_NIFI_PROPS.put(NiFiProperties.ZOOKEEPER_CLIENT_SECURE, "true");
SECURE_ZOOKEEPER_NIFI_PROPS.put(NiFiProperties.STATE_MANAGEMENT_ZOOKEEPER_PROPERTIES, SECURE_ZOOKEEPER_PROPS);
SECURE_ZOOKEEPER_NIFI_PROPS.put(NiFiProperties.WEB_HTTPS_PORT, "8443");
SECURE_ZOOKEEPER_NIFI_PROPS.put(NiFiProperties.ZOOKEEPER_SECURITY_KEYSTORE, tlsConfiguration.getKeystorePath());
SECURE_ZOOKEEPER_NIFI_PROPS.put(NiFiProperties.ZOOKEEPER_SECURITY_KEYSTORE_TYPE, tlsConfiguration.getKeystoreType().getType());
SECURE_ZOOKEEPER_NIFI_PROPS.put(NiFiProperties.ZOOKEEPER_SECURITY_KEYSTORE_PASSWD, tlsConfiguration.getKeystorePassword());
SECURE_ZOOKEEPER_NIFI_PROPS.put(NiFiProperties.ZOOKEEPER_SECURITY_TRUSTSTORE, tlsConfiguration.getTruststorePath());
SECURE_ZOOKEEPER_NIFI_PROPS.put(NiFiProperties.ZOOKEEPER_SECURITY_TRUSTSTORE_TYPE, tlsConfiguration.getTruststoreType().getType());
SECURE_ZOOKEEPER_NIFI_PROPS.put(NiFiProperties.ZOOKEEPER_SECURITY_TRUSTSTORE_PASSWD, tlsConfiguration.getTruststorePassword());
SECURE_ZOOKEEPER_NIFI_PROPS.put(NiFiProperties.ZOOKEEPER_CLIENT_SECURE, "true");
}
private static final Map<String, String> INSECURE_NIFI_PROPS = new HashMap<String, String>() {{
put(ZOOKEEPER_PROPERTIES_FILE_KEY, INSECURE_ZOOKEEPER_PROPS);
put(NiFiProperties.WEB_HTTP_HOST, "localhost");
put(NiFiProperties.WEB_HTTP_PORT, "8080");
put(NiFiProperties.ZOOKEEPER_CLIENT_SECURE, "false");
}};
private NiFiProperties niFiProps;
private static NiFiProperties clientProperties;
private QuorumPeerConfig quorumPeerConfig;
private Properties secureZooKeeperProps;
private Properties insecureZooKeeperProps;
private Properties partialZooKeeperProps;
private ZooKeeperStateServer server;
@Before
public void setupWithValidProperties() throws IOException, QuorumPeerConfig.ConfigException {
niFiProps = NiFiProperties.createBasicNiFiProperties(null, SECURE_NIFI_PROPS);
assertNotNull(niFiProps);
// This shows that a ZooKeeper server is created from valid NiFi properties:
final ZooKeeperStateServer zooKeeperStateServer = ZooKeeperStateServer.create(niFiProps);
assertNotNull(zooKeeperStateServer);
quorumPeerConfig = zooKeeperStateServer.getQuorumPeerConfig();
assertNotNull(quorumPeerConfig);
secureZooKeeperProps = new Properties();
secureZooKeeperProps.load(FileUtils.openInputStream(new File(SECURE_ZOOKEEPER_PROPS)));
assertNotNull(secureZooKeeperProps);
insecureZooKeeperProps = new Properties();
insecureZooKeeperProps.load(FileUtils.openInputStream(new File(INSECURE_ZOOKEEPER_PROPS)));
assertNotNull(insecureZooKeeperProps);
partialZooKeeperProps = new Properties();
partialZooKeeperProps.load(FileUtils.openInputStream(new File(PARTIAL_ZOOKEEPER_PROPS)));
assertNotNull(partialZooKeeperProps);
}
@After
public void clearConnectionProperties() {
Collections.unmodifiableSet(System.getProperties().stringPropertyNames()).stream()
.filter(name -> name.startsWith("zookeeper."))
.forEach(System::clearProperty);
}
// This test shows that a ZooKeeperStateServer cannot be created from empty NiFi properties.
@Test
public void testCreateFromEmptyNiFiProperties() throws IOException, QuorumPeerConfig.ConfigException {
final NiFiProperties emptyProps = NiFiProperties.createBasicNiFiProperties(null, new HashMap<>());
assertNotNull(emptyProps);
Assert.assertNull(ZooKeeperStateServer.create(emptyProps));
}
// This test shows that a ZooKeeperStateServer can be created from insecure NiFi properties.
@Test
public void testCreateFromValidInsecureNiFiProperties() throws IOException, QuorumPeerConfig.ConfigException {
final NiFiProperties insecureProps = NiFiProperties.createBasicNiFiProperties(null, INSECURE_NIFI_PROPS);
final ZooKeeperStateServer server = ZooKeeperStateServer.create(insecureProps);
assertNotNull(server);
assertNotNull(server.getQuorumPeerConfig().getClientPortAddress());
}
// This test shows that a ZK TLS config with some but not all values throws an exception:
@Test
public void testCreateFromPartialZooKeeperTlsProperties() {
final NiFiProperties partialProps = NiFiProperties.createBasicNiFiProperties(null, new HashMap<String, String>() {{
putAll(SECURE_NIFI_PROPS);
put(ZOOKEEPER_PROPERTIES_FILE_KEY, PARTIAL_ZOOKEEPER_PROPS);
}});
Assert.assertThrows(QuorumPeerConfig.ConfigException.class, () ->
ZooKeeperStateServer.create(partialProps));
}
// This test shows that a ZK TLS config with all values set is valid and a server can be created using it:
@Test
public void testCreateFromCompleteZooKeeperTlsProperties() throws IOException, QuorumPeerConfig.ConfigException {
final NiFiProperties completeProps = NiFiProperties.createBasicNiFiProperties(null, new HashMap<String, String>() {{
putAll(SECURE_NIFI_PROPS);
put(ZOOKEEPER_PROPERTIES_FILE_KEY, SECURE_ZOOKEEPER_PROPS);
}});
assertNotNull(ZooKeeperStateServer.create(completeProps));
}
// This test shows that the client can specify a secure port and that port is used:
@Test
public void testCreateWithSpecifiedSecureClientPort() throws IOException, QuorumPeerConfig.ConfigException {
final NiFiProperties secureProps = NiFiProperties.createBasicNiFiProperties(null, new HashMap<String, String>() {{
putAll(SECURE_NIFI_PROPS);
put(ZOOKEEPER_PROPERTIES_FILE_KEY, SECURE_ZOOKEEPER_PROPS);
}});
final ZooKeeperStateServer server = ZooKeeperStateServer.create(secureProps);
assertNotNull(server);
final QuorumPeerConfig config = server.getQuorumPeerConfig();
assertEquals(secureZooKeeperProps.getProperty("secureClientPort"), String.valueOf(config.getSecureClientPortAddress().getPort()));
}
// This shows that a secure NiFi with an insecure ZooKeeper will not have an insecure client address or port:
@Test
public void testCreateRemovesInsecureClientPort() {
assertNotNull(insecureZooKeeperProps.getProperty("clientPort"));
Assert.assertNotEquals(insecureZooKeeperProps.getProperty("clientPort"), "");
Assert.assertNull(quorumPeerConfig.getClientPortAddress());
}
// This test shows that a connection class is set when none is specified (QuorumPeerConfig::parseProperties sets the System property):
@Test
public void testCreateWithUnspecifiedConnectionClass() {
assertEquals(org.apache.zookeeper.server.NettyServerCnxnFactory.class.getName(), System.getProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY));
}
// This test shows that a specified connection class is honored (QuorumPeerConfig::parseProperties sets the System property):
@Test
public void testCreateWithSpecifiedConnectionClass() throws IOException, QuorumPeerConfig.ConfigException {
final NiFiProperties secureProps = NiFiProperties.createBasicNiFiProperties(null, new HashMap<String, String>() {{
putAll(SECURE_NIFI_PROPS);
put(ZOOKEEPER_PROPERTIES_FILE_KEY, SECURE_ZOOKEEPER_PROPS);
}});
assertNotNull(ZooKeeperStateServer.create(secureProps));
assertEquals(ZOOKEEPER_CNXN_FACTORY, System.getProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY));
}
@After
public void tearDown() throws Exception {
if(server != null) {
File stateDir = server.getQuorumPeerConfig().getDataDir().getParentFile();
deleteRecursively(stateDir);
server.shutdown();
server = null;
}
}
private void deleteRecursively(final File file) {
final File[] children = file.listFiles();
if (children != null) {
for (final File child : children) {
deleteRecursively(child);
}
}
file.delete();
}
// Connect to a secure ZooKeeperStateServer
@Test
public void testSecureClientQuorumConnectString() throws Exception {
final int actualPort = Integer.parseInt(secureZooKeeperProps.getProperty("secureClientPort", "0"));
final NiFiProperties validZkClientProps = NiFiProperties.createBasicNiFiProperties(null, new HashMap<String, String>() {{
putAll(SECURE_NIFI_PROPS);
put(ZOOKEEPER_PROPERTIES_FILE_KEY, SECURE_ZOOKEEPER_PROPS);
put(NiFiProperties.ZOOKEEPER_CONNECT_STRING, QUORUM_CONNECT_STRING);
}});
server = ZooKeeperStateServer.create(validZkClientProps);
assertNotNull(server);
final int serverPort = server.getQuorumPeerConfig().getSecureClientPortAddress().getPort();
assertEquals(actualPort, serverPort);
server.start();
// Set up a ZK client
CuratorFramework client = getSecureZooKeeperClient(serverPort);
client.start();
final String testPath = "/test";
final String createResult = client.create().forPath(testPath, new byte[0]);
final Stat checkExistsResult = client.checkExists().forPath(testPath);
assertEquals(createResult, testPath);
assertNotNull(checkExistsResult);
}
// Connect to an insecure ZooKeeperStateServer with an insecure client (ensure insecure setup still works)
@Test
public void testInsecureZooKeeperWithInsecureClient() throws Exception {
final String connect = "localhost:" + 2381;
final NiFiProperties validZkClientProps = NiFiProperties.createBasicNiFiProperties(null, new HashMap<String, String>() {{
putAll(INSECURE_NIFI_PROPS);
put(NiFiProperties.STATE_MANAGEMENT_ZOOKEEPER_PROPERTIES, INSECURE_ZOOKEEPER_PROPS);
put(NiFiProperties.ZOOKEEPER_CONNECT_STRING, connect);
}});
server = ZooKeeperStateServer.create(validZkClientProps);
assertNotNull(server);
server.start();
// Set up a ZK client
CuratorFramework client = getInsecureZooKeeperClient(INSECURE_NIFI_PROPS, connect);
client.start();
final String testPath = "/test";
final String createResult = client.create().forPath(testPath, new byte[0]);
final Stat checkExistsResult = client.checkExists().forPath(testPath);
assertEquals(createResult, testPath);
assertNotNull(checkExistsResult);
}
// Fail to connect to a secure ZooKeeperStateServer with insecure client configuration
@Test(expected = KeeperException.ConnectionLossException.class)
public void testSecureZooKeeperStateServerWithInsecureClient() throws Exception {
final int actualPort = Integer.parseInt(secureZooKeeperProps.getProperty("secureClientPort", "0"));
final String connect = "localhost:" + actualPort;
final NiFiProperties validZkClientProps = NiFiProperties.createBasicNiFiProperties(null, new HashMap<String, String>() {{
putAll(SECURE_NIFI_PROPS);
put(ZOOKEEPER_PROPERTIES_FILE_KEY, SECURE_ZOOKEEPER_PROPS);
put(NiFiProperties.ZOOKEEPER_CONNECT_STRING, connect);
}});
server = ZooKeeperStateServer.create(validZkClientProps);
assertNotNull(server);
final int serverPort = server.getQuorumPeerConfig().getSecureClientPortAddress().getPort();
assertEquals(actualPort, serverPort);
server.start();
// Set up a ZK client
CuratorFramework client = getInsecureZooKeeperClient(INSECURE_NIFI_PROPS, connect);
client.start();
final String testPath = "/test";
// Expect this to fail with ConnectionLossException
client.create().forPath(testPath, new byte[0]);
}
// Fail to connect to a secure ZooKeeperStateServer with missing client configuration
@Test(expected = KeeperException.ConnectionLossException.class)
public void testSecureZooKeeperStateServerWithMissingClientConfiguration() throws Exception {
final int actualPort = Integer.parseInt(secureZooKeeperProps.getProperty("secureClientPort", "0"));
final String connect = "localhost:" + actualPort;
final NiFiProperties validZkClientProps = NiFiProperties.createBasicNiFiProperties(null, new HashMap<String, String>() {{
putAll(SECURE_NIFI_PROPS);
put(ZOOKEEPER_PROPERTIES_FILE_KEY, SECURE_ZOOKEEPER_PROPS);
put(NiFiProperties.ZOOKEEPER_CONNECT_STRING, connect);
}});
server = ZooKeeperStateServer.create(validZkClientProps);
assertNotNull(server);
final int serverPort = server.getQuorumPeerConfig().getSecureClientPortAddress().getPort();
assertEquals(actualPort, serverPort);
server.start();
// Set up a ZK client
CuratorFramework client = getInsecureZooKeeperClient(INSECURE_NIFI_PROPS, connect);
client.start();
final String testPath = "/test";
// Expect this to fail with ConnectionLossException
final String createResult = client.create().forPath(testPath, new byte[0]);
}
// Connect to a secure ZooKeeperStateServer
@Test
public void testSecureClientConnection() throws Exception {
final int actualPort = Integer.parseInt(secureZooKeeperProps.getProperty("secureClientPort", "0"));
final String connect = "localhost:" + actualPort;
final NiFiProperties validZkClientProps = NiFiProperties.createBasicNiFiProperties(null, new HashMap<String, String>() {{
putAll(SECURE_NIFI_PROPS);
put(ZOOKEEPER_PROPERTIES_FILE_KEY, SECURE_ZOOKEEPER_PROPS);
put(NiFiProperties.ZOOKEEPER_CONNECT_STRING, connect);
}});
server = ZooKeeperStateServer.create(validZkClientProps);
assertNotNull(server);
final int serverPort = server.getQuorumPeerConfig().getSecureClientPortAddress().getPort();
assertEquals(actualPort, serverPort);
server.start();
// Set up a ZK client
CuratorFramework client = getSecureZooKeeperClient(serverPort);
client.start();
final String testPath = "/test";
final String createResult = client.create().forPath(testPath, new byte[0]);
final Stat checkExistsResult = client.checkExists().forPath(testPath);
assertEquals(createResult, testPath);
assertNotNull(checkExistsResult);
}
// Connect to an insecure ZooKeeperStateServer
@Test
public void testClientSecureFalseClientPortNoTls() throws Exception {
final int actualPort = Integer.parseInt(insecureZooKeeperProps.getProperty("clientPort", "3000"));
final String connect = "localhost:" + actualPort;
final NiFiProperties validZkClientProps = NiFiProperties.createBasicNiFiProperties(null, new HashMap<String, String>() {{
putAll(INSECURE_NIFI_PROPS);
put(ZOOKEEPER_PROPERTIES_FILE_KEY, INSECURE_ZOOKEEPER_PROPS);
put(NiFiProperties.ZOOKEEPER_CONNECT_STRING, connect);
}});
server = ZooKeeperStateServer.create(validZkClientProps);
assertNotNull(server);
final int serverPort = server.getQuorumPeerConfig().getClientPortAddress().getPort();
assertEquals(actualPort, serverPort);
server.start();
// Set up a ZK client
CuratorFramework client = getInsecureZooKeeperClient(INSECURE_NIFI_PROPS, connect);
client.start();
final String testPath = "/test";
final String createResult = client.create().forPath(testPath, new byte[0]);
final Stat checkExistsResult = client.checkExists().forPath(testPath);
assertEquals(createResult, testPath);
assertNotNull(checkExistsResult);
}
@Test
public void testClientSecureFalseWithClientSecurePortAndNoTls() throws Exception {
expectedException.expect(QuorumPeerConfig.ConfigException.class);
expectedException.expectMessage("ZooKeeper properties file src/test/resources/TestZooKeeperStateServerConfigurations/secure.zookeeper.properties was " +
"configured to be secure but there was no valid TLS config present in nifi.properties or nifi.zookeeper.client.secure was set to false. Check the administration guide");
final int actualPort = Integer.parseInt(secureZooKeeperProps.getProperty("secureClientPort"));
final String connect = "localhost:" + actualPort;
final NiFiProperties validZkClientProps = NiFiProperties.createBasicNiFiProperties(null, new HashMap<String, String>() {{
putAll(INSECURE_NIFI_PROPS);
put(ZOOKEEPER_PROPERTIES_FILE_KEY, SECURE_ZOOKEEPER_PROPS);
put(NiFiProperties.ZOOKEEPER_CONNECT_STRING, connect);
}});
server = ZooKeeperStateServer.create(validZkClientProps);
}
@Test
public void testClientSecureTrueWithInsecureZooKeeperAndTlsSet() throws Exception {
expectedException.expect(QuorumPeerConfig.ConfigException.class);
expectedException.expectMessage("NiFi was configured to be secure but secureClientPort could not be retrieved from zookeeper.properties file");
final int actualPort = Integer.parseInt(insecureZooKeeperProps.getProperty("clientPort"));
final String connect = "localhost:" + actualPort;
final NiFiProperties validZkClientProps = NiFiProperties.createBasicNiFiProperties(null, new HashMap<String, String>() {{
putAll(SECURE_NIFI_PROPS);
put(NiFiProperties.ZOOKEEPER_CLIENT_SECURE, "true");
put(ZOOKEEPER_PROPERTIES_FILE_KEY, INSECURE_ZOOKEEPER_PROPS);
put(NiFiProperties.ZOOKEEPER_CONNECT_STRING, connect);
}});
server = ZooKeeperStateServer.create(validZkClientProps);
}
@Test
public void testClientSecureTrueWithNoTls() throws Exception {
expectedException.expect(QuorumPeerConfig.ConfigException.class);
expectedException.expectMessage(NiFiProperties.ZOOKEEPER_CLIENT_SECURE + " is true but no TLS configuration is present in nifi.properties");
final int actualPort = Integer.parseInt(insecureZooKeeperProps.getProperty("clientPort"));
final String connect = "localhost:" + actualPort;
final NiFiProperties validZkClientProps = NiFiProperties.createBasicNiFiProperties(null, new HashMap<String, String>() {{
putAll(INSECURE_NIFI_PROPS);
put(NiFiProperties.ZOOKEEPER_CLIENT_SECURE, "true");
put(ZOOKEEPER_PROPERTIES_FILE_KEY, SECURE_ZOOKEEPER_PROPS);
put(NiFiProperties.ZOOKEEPER_CONNECT_STRING, connect);
}});
server = ZooKeeperStateServer.create(validZkClientProps);
}
@Test
public void testZooKeeperMissingPortSettings() throws Exception {
expectedException.expect(QuorumPeerConfig.ConfigException.class);
expectedException.expectMessage("No clientAddress or secureClientAddress is set in zookeeper.properties");
final String connect = "localhost:" + 2181;
final NiFiProperties validZkClientProps = NiFiProperties.createBasicNiFiProperties(null, new HashMap<String, String>() {{
putAll(SECURE_NIFI_PROPS);
put(NiFiProperties.ZOOKEEPER_CLIENT_SECURE, "true");
put(ZOOKEEPER_PROPERTIES_FILE_KEY, PARTIAL_ZOOKEEPER_PROPS);
put(NiFiProperties.ZOOKEEPER_CONNECT_STRING, connect);
}});
server = ZooKeeperStateServer.create(validZkClientProps);
}
@Test
public void testClientSecureFalseAndOnlyZooKeeperClientPortSetWithTlsProperties() throws Exception {
final String connect = "localhost:" + 2181;
final NiFiProperties validZkClientProps = NiFiProperties.createBasicNiFiProperties(null, new HashMap<String, String>() {{
putAll(SECURE_NIFI_PROPS);
put(NiFiProperties.ZOOKEEPER_CLIENT_SECURE, "false");
put(ZOOKEEPER_PROPERTIES_FILE_KEY, INSECURE_ZOOKEEPER_PROPS);
put(NiFiProperties.ZOOKEEPER_CONNECT_STRING, connect);
}});
server = ZooKeeperStateServer.create(validZkClientProps);
}
// Connect to a secure ZooKeeperStateServer with ZooKeeper.
@Test
public void testSecureClientConnectionWithZooKeeperSecurityProperties() throws Exception {
final int actualPort = Integer.parseInt(secureZooKeeperProps.getProperty("secureClientPort", "0"));
final String connect = "localhost:" + actualPort;
final NiFiProperties validZkClientProps = NiFiProperties.createBasicNiFiProperties(null, new HashMap<String, String>() {{
putAll(SECURE_ZOOKEEPER_NIFI_PROPS);
put(ZOOKEEPER_PROPERTIES_FILE_KEY, SECURE_ZOOKEEPER_PROPS);
put(NiFiProperties.ZOOKEEPER_CONNECT_STRING, connect);
}});
server = ZooKeeperStateServer.create(validZkClientProps);
assertNotNull(server);
final int serverPort = server.getQuorumPeerConfig().getSecureClientPortAddress().getPort();
assertEquals(actualPort, serverPort);
server.start();
// Set up a ZK client
CuratorFramework client = getSecureZooKeeperClient(serverPort);
client.start();
final String testPath = "/test";
final String createResult = client.create().forPath(testPath, new byte[0]);
final Stat checkExistsResult = client.checkExists().forPath(testPath);
assertEquals(createResult, testPath);
assertNotNull(checkExistsResult);
}
private static String getPath(String path) {
return new File("src/test/resources/TestZooKeeperStateServerConfigurations/" + path).getPath();
}
private CuratorFramework getSecureZooKeeperClient(final int port) {
// TODO: port being set needs to be based on port set in nifi.properties, should create client in the same
clientProperties = createSecureClientProperties(
port,
Paths.get(tlsConfiguration.getKeystorePath()),
tlsConfiguration.getKeystoreType().getType(),
tlsConfiguration.getKeystorePassword(),
Paths.get(tlsConfiguration.getTruststorePath()),
tlsConfiguration.getTruststoreType().getType(),
tlsConfiguration.getTruststorePassword()
);
final ZooKeeperClientConfig zkClientConfig =
ZooKeeperClientConfig.createConfig(clientProperties);
return getCuratorFramework(zkClientConfig, new SecureClientZooKeeperFactory(zkClientConfig));
}
private CuratorFramework getInsecureZooKeeperClient(final Map<String, String> nifiClientProps, final String connectString) {
// set up zkClientConfig
Map<String, String> supplementedProps = nifiClientProps;
supplementedProps.put(NiFiProperties.ZOOKEEPER_CONNECT_STRING, connectString);
NiFiProperties insecureClientProperties = NiFiProperties.createBasicNiFiProperties(null, supplementedProps);
final ZooKeeperClientConfig zkClientConfig = ZooKeeperClientConfig.createConfig(insecureClientProperties);
ZookeeperFactory factory = null;
try {
factory = new DefaultZookeeperFactory();
} catch (Exception e) {
e.printStackTrace();
}
return getCuratorFramework(zkClientConfig, factory);
}
private CuratorFramework getCuratorFramework(ZooKeeperClientConfig zkClientConfig, ZookeeperFactory factory) {
final CuratorFrameworkFactory.Builder clientBuilder = CuratorFrameworkFactory.builder()
.connectString(zkClientConfig.getConnectString())
.sessionTimeoutMs(zkClientConfig.getSessionTimeoutMillis())
.connectionTimeoutMs(zkClientConfig.getConnectionTimeoutMillis())
.retryPolicy(new RetryOneTime(200))
.defaultData(new byte[0])
.zookeeperFactory(factory);
return clientBuilder.build();
}
}