ZOOKEEPER-4276. Serving only with secureClientPort fails (#2117)
* Support TLS-only ZK server
* Cleanup
* Update documentation according to review comments
diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperReconfig.md b/zookeeper-docs/src/main/resources/markdown/zookeeperReconfig.md
index afb85df..8b3e3da 100644
--- a/zookeeper-docs/src/main/resources/markdown/zookeeperReconfig.md
+++ b/zookeeper-docs/src/main/resources/markdown/zookeeperReconfig.md
@@ -88,27 +88,41 @@
### Specifying the client port
-A client port of a server is the port on which the server accepts
-client connection requests. Starting with 3.5.0 the
-_clientPort_ and _clientPortAddress_ configuration parameters should no longer be used. Instead,
-this information is now part of the server keyword specification, which
+A client port of a server is the port on which the server accepts plaintext (non-TLS) client connection requests
+and secure client port is the port on which the server accepts TLS client connection requests.
+
+Starting with 3.5.0 the
+_clientPort_ and _clientPortAddress_ configuration parameters should no longer be used in zoo.cfg.
+
+Starting with 3.10.0 the
+_secureClientPort_ and _secureClientPortAddress_ configuration parameters should no longer be used in zoo.cfg.
+
+Instead, this information is now part of the server keyword specification, which
becomes as follows:
- server.<positive id> = <address1>:<port1>:<port2>[:role];[<client port address>:]<client port>**
+ server.<positive id> = <address1>:<quorum port>:<leader election port>[:role];[[<client port address>:]<client port>][;[<secure client port address>:]<secure client port>]
-The client port specification is to the right of the semicolon. The
-client port address is optional, and if not specified it defaults to
-"0.0.0.0". As usual, role is also optional, it can be
-_participant_ or _observer_
-(_participant_ by default).
+- [New in ZK 3.10.0] The client port specification is optional and is to the right of the
+ first semicolon. The secure client port specification is also optional and is to the right
+ of the second semicolon. However, both the client port and secure client port specification
+ cannot be omitted, at least one of them should be present. If the user intends to omit client
+ port specification and provide only secure client port specification (TLS-only server), a second
+ semicolon should still be specified to indicate an empty client port specification (see last
+ example below). In either spec, the port address is optional, and if not specified it defaults
+ to "0.0.0.0".
+- As usual, role is also optional, it can be _participant_ or _observer_ (_participant_ by default).
Examples of legal server statements:
- server.5 = 125.23.63.23:1234:1235;1236
- server.5 = 125.23.63.23:1234:1235:participant;1236
- server.5 = 125.23.63.23:1234:1235:observer;1236
- server.5 = 125.23.63.23:1234:1235;125.23.63.24:1236
- server.5 = 125.23.63.23:1234:1235:participant;125.23.63.23:1236
+ server.5 = 125.23.63.23:1234:1235;1236 (non-TLS server)
+ server.5 = 125.23.63.23:1234:1235;1236;1237 (non-TLS + TLS server)
+ server.5 = 125.23.63.23:1234:1235;;1237 (TLS-only server)
+ server.5 = 125.23.63.23:1234:1235:participant;1236 (non-TLS server)
+ server.5 = 125.23.63.23:1234:1235:observer;1236 (non-TLS server)
+ server.5 = 125.23.63.23:1234:1235;125.23.63.24:1236 (non-TLS server)
+ server.5 = 125.23.63.23:1234:1235:participant;125.23.63.23:1236 (non-TLS server)
+ server.5 = 125.23.63.23:1234:1235:participant;125.23.63.23:1236;125.23.63.23:1237 (non-TLS + TLS server)
+ server.5 = 125.23.63.23:1234:1235:participant;;125.23.63.23:1237 (TLS-only server)
<a name="sc_multiaddress"></a>
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/client/FourLetterWordMain.java b/zookeeper-server/src/main/java/org/apache/zookeeper/client/FourLetterWordMain.java
index de2f79d..39988c3 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/client/FourLetterWordMain.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/client/FourLetterWordMain.java
@@ -91,7 +91,29 @@ public static String send4LetterWord(
String cmd,
boolean secure,
int timeout) throws IOException, SSLContextException {
- LOG.info("connecting to {} {}", host, port);
+ return send4LetterWord(host, port, cmd, secure, timeout, null);
+ }
+
+ /**
+ * Send the 4letterword
+ * @param host the destination host
+ * @param port the destination port
+ * @param cmd the 4letterword
+ * @param secure whether to use SSL
+ * @param timeout in milliseconds, maximum time to wait while connecting/reading data
+ * @param sslContext SSL context
+ * @return server response
+ * @throws java.io.IOException
+ * @throws SSLContextException
+ */
+ public static String send4LetterWord(
+ String host,
+ int port,
+ String cmd,
+ boolean secure,
+ int timeout,
+ SSLContext sslContext) throws IOException, SSLContextException {
+ LOG.info("connecting to {}:{} (secure={})", host, port, secure);
Socket sock = null;
BufferedReader reader = null;
@@ -101,14 +123,16 @@ public static String send4LetterWord(
: new InetSocketAddress(InetAddress.getByName(null), port);
if (secure) {
LOG.info("using secure socket");
- try (X509Util x509Util = new ClientX509Util()) {
- SSLContext sslContext = x509Util.getDefaultSSLContext();
- SSLSocketFactory socketFactory = sslContext.getSocketFactory();
- SSLSocket sslSock = (SSLSocket) socketFactory.createSocket();
- sslSock.connect(hostaddress, timeout);
- sslSock.startHandshake();
- sock = sslSock;
+ if (sslContext == null) {
+ try (X509Util x509Util = new ClientX509Util()) {
+ sslContext = x509Util.getDefaultSSLContext();
+ }
}
+ SSLSocketFactory socketFactory = sslContext.getSocketFactory();
+ SSLSocket sslSock = (SSLSocket) socketFactory.createSocket();
+ sslSock.connect(hostaddress, timeout);
+ sslSock.startHandshake();
+ sock = sslSock;
} else {
sock = new Socket();
sock.connect(hostaddress, timeout);
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java
index 075f8d7..bcbfbaa 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java
@@ -482,8 +482,8 @@ protected void pRequest2Txn(int type, long zxid, Request request, Record record)
// extract server id x from first part of joiner: server.x
Long sid = Long.parseLong(parts[0].substring(parts[0].lastIndexOf('.') + 1));
QuorumServer qs = new QuorumServer(sid, parts[1]);
- if (qs.clientAddr == null || qs.electionAddr == null || qs.addr == null) {
- throw new KeeperException.BadArgumentsException("Wrong format of server string - each server should have 3 ports specified");
+ if ((qs.clientAddr == null && qs.secureClientAddr == null) || qs.electionAddr == null || qs.addr == null) {
+ throw new KeeperException.BadArgumentsException("Wrong format of server string - each server should have at least 3 ports specified");
}
// check duplication of addresses and ports
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
index 6fc3ee2..aba7a29 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
@@ -19,6 +19,7 @@
package org.apache.zookeeper.server.quorum;
import static org.apache.zookeeper.common.NetUtils.formatInetAddr;
+import static org.apache.zookeeper.server.quorum.QuorumPeerConfig.configureSSLAuth;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
@@ -152,15 +153,20 @@ public static final class AddressTuple {
public final MultipleAddresses quorumAddr;
public final MultipleAddresses electionAddr;
public final InetSocketAddress clientAddr;
+ public final InetSocketAddress secureClientAddr;
- public AddressTuple(MultipleAddresses quorumAddr, MultipleAddresses electionAddr, InetSocketAddress clientAddr) {
+ public AddressTuple(MultipleAddresses quorumAddr, MultipleAddresses electionAddr, InetSocketAddress clientAddr, InetSocketAddress secureClientAddr) {
this.quorumAddr = quorumAddr;
this.electionAddr = electionAddr;
this.clientAddr = clientAddr;
+ this.secureClientAddr = secureClientAddr;
}
}
+ private Boolean isClientAddrFromStatic = null;
+ private Boolean isSecureClientAddrFromStatic = null;
+
private int observerMasterPort;
public int getObserverMasterPort() {
@@ -216,6 +222,7 @@ public static class QuorumServer {
public MultipleAddresses electionAddr = new MultipleAddresses();
public InetSocketAddress clientAddr = null;
+ public InetSocketAddress secureClientAddr = null;
public long id;
@@ -224,20 +231,31 @@ public static class QuorumServer {
public LearnerType type = LearnerType.PARTICIPANT;
public boolean isClientAddrFromStatic = false;
+ public boolean isSecureClientAddrFromStatic = false;
private List<InetSocketAddress> myAddrs;
public QuorumServer(long id, InetSocketAddress addr, InetSocketAddress electionAddr, InetSocketAddress clientAddr) {
- this(id, addr, electionAddr, clientAddr, LearnerType.PARTICIPANT);
+ this(id, addr, electionAddr, clientAddr, null, LearnerType.PARTICIPANT);
+ }
+
+ public QuorumServer(long id, InetSocketAddress addr, InetSocketAddress electionAddr, InetSocketAddress clientAddr,
+ InetSocketAddress secureClientAddr) {
+ this(id, addr, electionAddr, clientAddr, secureClientAddr, LearnerType.PARTICIPANT);
+ }
+
+ public QuorumServer(long id, InetSocketAddress addr, InetSocketAddress electionAddr, InetSocketAddress clientAddr,
+ LearnerType learnerType) {
+ this(id, addr, electionAddr, clientAddr, null, learnerType);
}
public QuorumServer(long id, InetSocketAddress addr, InetSocketAddress electionAddr) {
- this(id, addr, electionAddr, null, LearnerType.PARTICIPANT);
+ this(id, addr, electionAddr, null, null, LearnerType.PARTICIPANT);
}
// VisibleForTesting
public QuorumServer(long id, InetSocketAddress addr) {
- this(id, addr, null, null, LearnerType.PARTICIPANT);
+ this(id, addr, null, null, null, LearnerType.PARTICIPANT);
}
public long getId() {
@@ -284,10 +302,10 @@ public QuorumServer(long sid, String addressStr) throws ConfigException {
}
public QuorumServer(long id, InetSocketAddress addr, InetSocketAddress electionAddr, LearnerType type) {
- this(id, addr, electionAddr, null, type);
+ this(id, addr, electionAddr, null, null, type);
}
- public QuorumServer(long id, InetSocketAddress addr, InetSocketAddress electionAddr, InetSocketAddress clientAddr, LearnerType type) {
+ public QuorumServer(long id, InetSocketAddress addr, InetSocketAddress electionAddr, InetSocketAddress clientAddr, InetSocketAddress secureClientAddr, LearnerType type) {
this.id = id;
if (addr != null) {
this.addr.addAddress(addr);
@@ -297,6 +315,7 @@ public QuorumServer(long id, InetSocketAddress addr, InetSocketAddress electionA
}
this.type = type;
this.clientAddr = clientAddr;
+ this.secureClientAddr = secureClientAddr;
setMyAddrs();
}
@@ -304,14 +323,15 @@ public QuorumServer(long id, InetSocketAddress addr, InetSocketAddress electionA
private static final String wrongFormat =
" does not have the form server_config or server_config;client_config"
+ " where server_config is the pipe separated list of host:port:port or host:port:port:type"
- + " and client_config is port or host:port";
+ + " and client_config is host:clientPort;host:secureClientPort or clientPort or host:clientPort"
+ + " or ';secureClientPort' or ';host:secureClientPort'";
private void initializeWithAddressString(String addressStr, Function<InetSocketAddress, InetAddress> getInetAddress) throws ConfigException {
LearnerType newType = null;
String[] serverClientParts = addressStr.split(";");
String[] serverAddresses = serverClientParts[0].split("\\|");
- if (serverClientParts.length == 2) {
+ if (serverClientParts.length >= 2 && !serverClientParts[1].isEmpty()) {
String[] clientParts = ConfigUtils.getHostAndPort(serverClientParts[1]);
if (clientParts.length > 2) {
throw new ConfigException(addressStr + wrongFormat);
@@ -322,10 +342,27 @@ private void initializeWithAddressString(String addressStr, Function<InetSocketA
try {
clientAddr = new InetSocketAddress(clientHostName, Integer.parseInt(clientParts[clientParts.length - 1]));
} catch (NumberFormatException e) {
- throw new ConfigException("Address unresolved: " + hostname + ":" + clientParts[clientParts.length - 1]);
+ throw new ConfigException("Address unresolved: " + clientHostName + ":" + clientParts[clientParts.length - 1]);
}
}
+ if (serverClientParts.length == 3 && !serverClientParts[2].isEmpty()) {
+ String[] secureClientParts = ConfigUtils.getHostAndPort(serverClientParts[2]);
+ if (secureClientParts.length > 2) {
+ throw new ConfigException(addressStr + wrongFormat);
+ }
+
+ // is secure client config a host:port or just a port
+ String secureClientHostName = (secureClientParts.length == 2) ? secureClientParts[0] : "0.0.0.0";
+ try {
+ secureClientAddr = new InetSocketAddress(secureClientHostName, Integer.parseInt(secureClientParts[secureClientParts.length - 1]));
+ } catch (NumberFormatException e) {
+ throw new ConfigException("Address unresolved: " + secureClientHostName + ":" + secureClientParts[secureClientParts.length - 1]);
+ }
+ // set x509 auth provider if not already set
+ configureSSLAuth();
+ }
+
boolean multiAddressEnabled = Boolean.parseBoolean(
System.getProperty(QuorumPeer.CONFIG_KEY_MULTI_ADDRESS_ENABLED, QuorumPeer.CONFIG_DEFAULT_MULTI_ADDRESS_ENABLED));
if (!multiAddressEnabled && serverAddresses.length > 1) {
@@ -338,9 +375,8 @@ private void initializeWithAddressString(String addressStr, Function<InetSocketA
CONFIG_DEFAULT_KERBEROS_CANONICALIZE_HOST_NAMES));
for (String serverAddress : serverAddresses) {
- String serverParts[] = ConfigUtils.getHostAndPort(serverAddress);
- if ((serverClientParts.length > 2) || (serverParts.length < 3)
- || (serverParts.length > 4)) {
+ String[] serverParts = ConfigUtils.getHostAndPort(serverAddress);
+ if ((serverParts.length < 3) || (serverParts.length > 4)) {
throw new ConfigException(addressStr + wrongFormat);
}
@@ -415,6 +451,7 @@ private void setMyAddrs() {
this.myAddrs = new ArrayList<>();
this.myAddrs.addAll(this.addr.getAllAddresses());
this.myAddrs.add(this.clientAddr);
+ this.myAddrs.add(this.secureClientAddr);
this.myAddrs.addAll(this.electionAddr.getAllAddresses());
this.myAddrs = excludedSpecialAddresses(this.myAddrs);
}
@@ -448,13 +485,24 @@ public String toString() {
sw.append(":participant");
}
+ boolean clientPortSpecAdded = false;
if (clientAddr != null && !isClientAddrFromStatic) {
+ clientPortSpecAdded = true;
sw.append(";");
sw.append(delimitedHostString(clientAddr));
sw.append(":");
sw.append(String.valueOf(clientAddr.getPort()));
}
+ if (secureClientAddr != null & !isSecureClientAddrFromStatic) {
+ if (!clientPortSpecAdded) {
+ sw.append(";");
+ }
+ sw.append(";");
+ sw.append(delimitedHostString(secureClientAddr));
+ sw.append(":");
+ sw.append(String.valueOf(secureClientAddr.getPort()));
+ }
return sw.toString();
}
@@ -463,7 +511,7 @@ public int hashCode() {
return 42; // any arbitrary constant will do
}
- private boolean checkAddressesEqual(InetSocketAddress addr1, InetSocketAddress addr2) {
+ private static boolean checkAddressesEqual(InetSocketAddress addr1, InetSocketAddress addr2) {
return (addr1 != null || addr2 == null)
&& (addr1 == null || addr2 != null)
&& (addr1 == null || addr2 == null || addr1.equals(addr2));
@@ -483,12 +531,16 @@ public boolean equals(Object o) {
if (!electionAddr.equals(qs.electionAddr)) {
return false;
}
- return checkAddressesEqual(clientAddr, qs.clientAddr);
+ if (!checkAddressesEqual(clientAddr, qs.clientAddr)) {
+ return false;
+ }
+ return checkAddressesEqual(secureClientAddr, qs.secureClientAddr);
}
public void checkAddressDuplicate(QuorumServer s) throws BadArgumentsException {
List<InetSocketAddress> otherAddrs = new ArrayList<>(s.addr.getAllAddresses());
otherAddrs.add(s.clientAddr);
+ otherAddrs.add(s.secureClientAddr);
otherAddrs.addAll(s.electionAddr.getAllAddresses());
otherAddrs = excludedSpecialAddresses(otherAddrs);
@@ -709,6 +761,7 @@ public synchronized void setCurrentVote(Vote v) {
* value of one indicates the default backlog will be used.
*/
protected int clientPortListenBacklog = -1;
+ protected int maxClientCnxns = -1;
/**
* The number of ticks that the initial synchronization phase can take
@@ -994,7 +1047,7 @@ public void recreateSocketAddresses(long id) {
if (qs != null) {
qs.recreateSocketAddresses();
if (id == getMyId()) {
- setAddrs(qs.addr, qs.electionAddr, qs.clientAddr);
+ setAddrs(qs.addr, qs.electionAddr, qs.clientAddr, qs.secureClientAddr);
}
}
}
@@ -1040,9 +1093,15 @@ public InetSocketAddress getClientAddress() {
return (addrs == null) ? null : addrs.clientAddr;
}
- private void setAddrs(MultipleAddresses quorumAddr, MultipleAddresses electionAddr, InetSocketAddress clientAddr) {
+ public InetSocketAddress getSecureClientAddress() {
+ final AddressTuple addrs = myAddrs.get();
+ return (addrs == null) ? null : addrs.secureClientAddr;
+ }
+
+ private void setAddrs(MultipleAddresses quorumAddr, MultipleAddresses electionAddr, InetSocketAddress clientAddr,
+ InetSocketAddress secureClientAddr) {
synchronized (QV_LOCK) {
- myAddrs.set(new AddressTuple(quorumAddr, electionAddr, clientAddr));
+ myAddrs.set(new AddressTuple(quorumAddr, electionAddr, clientAddr, secureClientAddr));
QV_LOCK.notifyAll();
}
}
@@ -1305,7 +1364,7 @@ public QuorumPeer(Map<Long, QuorumServer> quorumPeers, File snapDir, File logDir
private static InetSocketAddress getClientAddress(Map<Long, QuorumServer> quorumPeers, long myid, int clientPort) throws IOException {
QuorumServer quorumServer = quorumPeers.get(myid);
if (null == quorumServer) {
- throw new IOException("No QuorumServer correspoding to myid " + myid);
+ throw new IOException("No QuorumServer corresponding to myid " + myid);
}
if (null == quorumServer.clientAddr) {
return new InetSocketAddress(clientPort);
@@ -1825,6 +1884,16 @@ public void setClientPortListenBacklog(int backlog) {
this.clientPortListenBacklog = backlog;
}
+ /** The server max client connections */
+ public int getMaxClientCnxns() {
+ return maxClientCnxns;
+ }
+
+ /** Sets the server's max client connections */
+ public void setMaxClientCnxns(int maxClientCnxns) {
+ this.maxClientCnxns = maxClientCnxns;
+ }
+
/**
* Get the number of ticks that the initial synchronization phase can take
*/
@@ -1975,7 +2044,7 @@ public QuorumVerifier setQuorumVerifier(QuorumVerifier qv, boolean writeToDisk)
try {
String dynamicConfigFilename = makeDynamicConfigFilename(qv.getVersion());
QuorumPeerConfig.writeDynamicConfig(dynamicConfigFilename, qv, false);
- QuorumPeerConfig.editStaticConfig(configFilename, dynamicConfigFilename, needEraseClientInfoFromStaticConfig());
+ QuorumPeerConfig.editStaticConfig(configFilename, dynamicConfigFilename, needEraseClientInfoFromStaticConfig(), needEraseSecureClientInfoFromStaticConfig());
} catch (IOException e) {
LOG.error("Error closing file", e);
}
@@ -1989,7 +2058,16 @@ public QuorumVerifier setQuorumVerifier(QuorumVerifier qv, boolean writeToDisk)
}
QuorumServer qs = qv.getAllMembers().get(getMyId());
if (qs != null) {
- setAddrs(qs.addr, qs.electionAddr, qs.clientAddr);
+ setAddrs(qs.addr, qs.electionAddr, qs.clientAddr, qs.secureClientAddr);
+
+ // we only set this once, because quorum verifier can change based on dynamic reconfig
+ if (isClientAddrFromStatic == null) {
+ isClientAddrFromStatic = qs.isClientAddrFromStatic;
+ }
+
+ if (isSecureClientAddrFromStatic == null) {
+ isSecureClientAddrFromStatic = qs.isSecureClientAddrFromStatic;
+ }
}
updateObserverMasterList();
return prevQV;
@@ -2005,6 +2083,11 @@ private boolean needEraseClientInfoFromStaticConfig() {
return (server != null && server.clientAddr != null && !server.isClientAddrFromStatic);
}
+ private boolean needEraseSecureClientInfoFromStaticConfig() {
+ QuorumServer server = quorumVerifier.getAllMembers().get(getMyId());
+ return (server != null && server.secureClientAddr != null && !server.isSecureClientAddrFromStatic);
+ }
+
/**
* Get an instance of LeaderElection
*/
@@ -2270,6 +2353,7 @@ public boolean processReconfig(QuorumVerifier qv, Long suggestedLeaderId, Long z
}
InetSocketAddress oldClientAddr = getClientAddress();
+ InetSocketAddress oldSecureClientAddr = getSecureClientAddress();
// update last committed quorum verifier, write the new config to disk
// and restart leader election if config changed.
@@ -2293,8 +2377,53 @@ public boolean processReconfig(QuorumVerifier qv, Long suggestedLeaderId, Long z
}
QuorumServer myNewQS = newMembers.get(getMyId());
- if (myNewQS != null && myNewQS.clientAddr != null && !myNewQS.clientAddr.equals(oldClientAddr)) {
- cnxnFactory.reconfigure(myNewQS.clientAddr);
+ if (myNewQS != null) {
+ if (myNewQS.clientAddr == null) {
+ if (!isClientAddrFromStatic && oldClientAddr != null && cnxnFactory != null) {
+ // clientAddr omitted in new config, shutdown cnxnFactory
+ cnxnFactory.shutdown();
+ cnxnFactory = null;
+ }
+ } else if (!myNewQS.clientAddr.equals(oldClientAddr)) {
+ // clientAddr has changed
+ if (cnxnFactory == null) {
+ // start cnxnFactory first
+ try {
+ cnxnFactory = ServerCnxnFactory.createFactory();
+ cnxnFactory.configure(myNewQS.clientAddr, getMaxClientCnxns(), getClientPortListenBacklog(), false);
+ cnxnFactory.start();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ } else {
+ cnxnFactory.reconfigure(myNewQS.clientAddr);
+ }
+ }
+
+ if (myNewQS.secureClientAddr == null) {
+ if (!isSecureClientAddrFromStatic && oldSecureClientAddr != null && secureCnxnFactory != null) {
+ // secureClientAddr omitted in new config, shutdown secureCnxnFactory
+ secureCnxnFactory.shutdown();
+ secureCnxnFactory = null;
+ }
+ } else if (!myNewQS.secureClientAddr.equals(oldSecureClientAddr)) {
+ // secureClientAddr has changed
+ if (secureCnxnFactory == null) {
+ // start secureCnxnFactory first
+ try {
+ configureSSLAuth();
+ secureCnxnFactory = ServerCnxnFactory.createFactory();
+ secureCnxnFactory.configure(myNewQS.secureClientAddr, getMaxClientCnxns(), getClientPortListenBacklog(), true);
+ secureCnxnFactory.start();
+
+ } catch (IOException | ConfigException e) {
+ throw new RuntimeException(e);
+ }
+ } else {
+ secureCnxnFactory.reconfigure(myNewQS.secureClientAddr);
+ }
+ }
+
updateThreadName();
}
@@ -2673,6 +2802,7 @@ public static QuorumPeer createFromConfig(QuorumPeerConfig config) throws IOExce
quorumPeer.setObserverMasterPort(config.getObserverMasterPort());
quorumPeer.setConfigFileName(config.getConfigFilename());
quorumPeer.setClientPortListenBacklog(config.getClientPortListenBacklog());
+ quorumPeer.setMaxClientCnxns(config.getMaxClientCnxns());
quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
if (config.getLastSeenQuorumVerifier() != null) {
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java
index 05246ba..d72d844 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java
@@ -570,9 +570,11 @@ public void write(Writer out) throws IOException {
* it will remove them.
* If it needs to erase client port information left by the old config,
* "eraseClientPortAddress" should be set true.
+ * If it needs to erase secure client port information left by the old config,
+ * "eraseSecureClientPortAddress" should be set true.
* It should also updates dynamic file pointer on reconfig.
*/
- public static void editStaticConfig(final String configFileStr, final String dynamicFileStr, final boolean eraseClientPortAddress) throws IOException {
+ public static void editStaticConfig(final String configFileStr, final String dynamicFileStr, final boolean eraseClientPortAddress, final boolean eraseSecureClientPortAddress) throws IOException {
// Some tests may not have a static config file.
if (configFileStr == null) {
return;
@@ -604,7 +606,10 @@ public void write(Writer out) throws IOException {
|| key.startsWith("peerType")
|| (eraseClientPortAddress
&& (key.startsWith("clientPort")
- || key.startsWith("clientPortAddress")))) {
+ || key.startsWith("clientPortAddress")))
+ || (eraseSecureClientPortAddress
+ && (key.startsWith("secureClientPort")
+ || key.startsWith("secureClientPortAddress")))) {
// not writing them back to static file
continue;
}
@@ -659,6 +664,7 @@ void setupQuorumPeerConfig(Properties prop, boolean configBackwardCompatibilityM
quorumVerifier = parseDynamicConfig(prop, electionAlg, true, configBackwardCompatibilityMode, oraclePath);
setupMyId();
setupClientPort();
+ setupSecureClientPort();
setupPeerType();
checkValidity();
}
@@ -765,6 +771,29 @@ private void setupClientPort() throws ConfigException {
}
}
+ private void setupSecureClientPort() throws ConfigException {
+ if (serverId == UNSET_SERVERID) {
+ return;
+ }
+ QuorumServer qs = quorumVerifier.getAllMembers().get(serverId);
+ if (secureClientPortAddress != null && qs != null && qs.secureClientAddr != null) {
+ if ((!secureClientPortAddress.getAddress().isAnyLocalAddress() && !secureClientPortAddress.equals(qs.secureClientAddr)) || (
+ secureClientPortAddress.getAddress().isAnyLocalAddress()
+ && secureClientPortAddress.getPort() != qs.secureClientAddr.getPort())) {
+ throw new ConfigException("secure client address for this server (id = " + serverId
+ + ") in static config file is " + secureClientPortAddress
+ + " is different from secure client address found in dynamic file: " + qs.secureClientAddr);
+ }
+ }
+ if (qs != null && qs.secureClientAddr != null) {
+ secureClientPortAddress = qs.secureClientAddr;
+ }
+ if (qs != null && qs.secureClientAddr == null) {
+ qs.secureClientAddr = secureClientPortAddress;
+ qs.isSecureClientAddrFromStatic = true;
+ }
+ }
+
private void setupPeerType() {
// Warn about inconsistent peer type
LearnerType roleByServersList = quorumVerifier.getObservingMembers().containsKey(serverId)
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerMain.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerMain.java
index 5ed2d42..0cad9ae 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerMain.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerMain.java
@@ -190,6 +190,7 @@ public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServ
quorumPeer.setObserverMasterPort(config.getObserverMasterPort());
quorumPeer.setConfigFileName(config.getConfigFilename());
quorumPeer.setClientPortListenBacklog(config.getClientPortListenBacklog());
+ quorumPeer.setMaxClientCnxns(config.getMaxClientCnxns());
quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
if (config.getLastSeenQuorumVerifier() != null) {
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTLSTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTLSTest.java
new file mode 100644
index 0000000..79baeab
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTLSTest.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import static org.apache.zookeeper.server.quorum.QuorumPeerTestBase.MainThread.UNSET_STATIC_CLIENTPORT;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import java.io.File;
+import java.io.IOException;
+import java.security.Security;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.io.FileUtils;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.admin.ZooKeeperAdmin;
+import org.apache.zookeeper.client.ZKClientConfig;
+import org.apache.zookeeper.common.KeyStoreFileType;
+import org.apache.zookeeper.common.X509KeyType;
+import org.apache.zookeeper.common.X509TestContext;
+import org.apache.zookeeper.test.ClientBase;
+import org.apache.zookeeper.test.ReconfigTest;
+import org.bouncycastle.jce.provider.BouncyCastleProvider;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class QuorumPeerMainTLSTest extends QuorumPeerTestBase {
+
+ protected static final Logger LOG = LoggerFactory.getLogger(QuorumPeerMainTLSTest.class);
+ private static File tempDir;
+ private static X509TestContext x509TestContext = null;
+
+ @BeforeAll
+ public static void beforeAll() throws Exception {
+ Security.addProvider(new BouncyCastleProvider());
+ tempDir = ClientBase.createEmptyTestDir();
+ x509TestContext = X509TestContext.newBuilder()
+ .setTempDir(tempDir)
+ .setKeyStoreKeyType(X509KeyType.EC)
+ .setTrustStoreKeyType(X509KeyType.EC)
+ .build();
+ }
+
+ @AfterAll
+ public static void afterAll() {
+ Security.removeProvider(BouncyCastleProvider.PROVIDER_NAME);
+ try {
+ FileUtils.deleteDirectory(tempDir);
+ } catch (IOException e) {
+ // ignore
+ }
+ }
+
+
+ // TODO - test reconfig - NIO cnxn factory initially, reconfig to listen on TLS port, should fail coz Netty cnxn factory is needed
+ // TODO - equivalent of testReconfigRemoveClientFromStatic, but for secureClientPort
+ interface QuorumConfigBuilder {
+ String build(int id, String role, int quorumPort, int leaderPort, int clientPort, int secureClientPort);
+ }
+
+ static class MaybeSecureServers extends Servers {
+ public int[] quorumPorts;
+ public int[] leaderPorts;
+ public boolean[] isSecureClient;
+ public int[] secureClientPorts;
+ int numParticipants;
+ int numObservers;
+ String quorumCfg;
+
+ String otherCfg;
+
+ public MaybeSecureServers(int numParticipants, int numObservers, String otherCfg, QuorumConfigBuilder quorumConfigBuilder) throws IOException {
+ this.numParticipants = numParticipants;
+ this.numObservers = numObservers;
+ this.otherCfg = otherCfg;
+ int SIZE = numParticipants + numObservers;
+
+ this.mt = new MainThread[SIZE];
+ this.zk = new ZooKeeper[SIZE];
+ this.quorumPorts = new int[SIZE];
+ this.leaderPorts = new int[SIZE];
+ this.clientPorts = new int[SIZE];
+ this.adminPorts = new int[SIZE];
+ this.secureClientPorts = new int[SIZE];
+ this.isSecureClient = new boolean[SIZE];
+
+ StringBuilder quorumCfg = new StringBuilder();
+
+
+ for (int i = 0; i < SIZE; i++){
+ this.quorumPorts[i] = PortAssignment.unique();
+ this.leaderPorts[i] = PortAssignment.unique();
+ this.clientPorts[i] = PortAssignment.unique();
+ this.adminPorts[i] = PortAssignment.unique();
+ this.secureClientPorts[i] = PortAssignment.unique();
+ String role = i < numParticipants ? "participant" : "observer";
+ String serverEntry = quorumConfigBuilder.build(i, role, this.quorumPorts[i], this.leaderPorts[i], this.clientPorts[i], this.secureClientPorts[i]);
+ quorumCfg.append(serverEntry).append("\n");
+ if (serverEntry.endsWith("" + this.secureClientPorts[i])) {
+ this.isSecureClient[i] = true;
+ }
+ }
+
+ this.quorumCfg = quorumCfg.toString();
+ for (int i = 0; i < SIZE; i++){
+ this.mt[i] = new MainThread(i, UNSET_STATIC_CLIENTPORT, this.adminPorts[i], null, this.quorumCfg, this.otherCfg, null, true, null);
+ }
+ }
+
+ public void restartSecureClient(int clientIndex, Watcher watcher) throws IOException, InterruptedException {
+ if (zk[clientIndex] != null) {
+ zk[clientIndex].close();
+ }
+
+ isSecureClient[clientIndex] = true;
+ zk[clientIndex] = new ZooKeeper(
+ "127.0.0.1:" + secureClientPorts[clientIndex],
+ ClientBase.CONNECTION_TIMEOUT,
+ watcher, getClientTLSConfigs(x509TestContext));
+
+
+ }
+
+ public void restartAllServersAndClients(Watcher watcher) throws IOException, InterruptedException {
+ int index = 0;
+ for (MainThread t : mt) {
+ if (!t.isAlive()) {
+ t.start();
+ index++;
+ }
+ }
+ for (int i = 0; i < zk.length; i++) {
+ if (isSecureClient[i]) {
+ restartSecureClient(i, watcher);
+ } else {
+ restartClient(i, watcher);
+ }
+ }
+ }
+ }
+
+ static Map<String, String> getServerTLSConfigs(X509TestContext x509TestContext) throws IOException {
+ Map<String, String> sslConfigs = new HashMap<>();
+ sslConfigs.put("ssl.keyStore.location", x509TestContext.getKeyStoreFile(KeyStoreFileType.PEM).getAbsolutePath());
+ sslConfigs.put("ssl.trustStore.location", x509TestContext.getTrustStoreFile(KeyStoreFileType.PEM).getAbsolutePath());
+ sslConfigs.put("ssl.keyStore.type", "PEM");
+ sslConfigs.put("ssl.trustStore.type", "PEM");
+ // Netty is required for TLS
+ sslConfigs.put("serverCnxnFactory", org.apache.zookeeper.server.NettyServerCnxnFactory.class.getName());
+ return sslConfigs;
+ }
+
+ static ZKClientConfig getClientTLSConfigs(X509TestContext x509TestContext) throws IOException {
+ if (x509TestContext == null) {
+ throw new RuntimeException("x509TestContext cannot be null");
+ }
+ File clientKeyStore = x509TestContext.getKeyStoreFile(KeyStoreFileType.PEM);
+ File clientTrustStore = x509TestContext.getTrustStoreFile(KeyStoreFileType.PEM);
+
+ ZKClientConfig zKClientConfig = new ZKClientConfig();
+ zKClientConfig.setProperty("zookeeper.client.secure", "true");
+ zKClientConfig.setProperty("zookeeper.ssl.keyStore.location", clientKeyStore.getAbsolutePath());
+ zKClientConfig.setProperty("zookeeper.ssl.trustStore.location", clientTrustStore.getAbsolutePath());
+ zKClientConfig.setProperty("zookeeper.ssl.keyStore.type", "PEM");
+ zKClientConfig.setProperty("zookeeper.ssl.trustStore.type", "PEM");
+ // only netty supports TLS
+ zKClientConfig.setProperty("zookeeper.clientCnxnSocket", org.apache.zookeeper.ClientCnxnSocketNetty.class.getName());
+ return zKClientConfig;
+ }
+
+ /**
+ * Starts a single server in replicated mode
+ */
+ @Test
+ public void testTLSQuorumPeers() throws IOException, InterruptedException {
+ Map<String, String> configMap = new HashMap<>();
+ configMap.put("standaloneEnabled", "false");
+ configMap.put("authProvider.x509", "org.apache.zookeeper.server.auth.X509AuthenticationProvider");
+ configMap.putAll(getServerTLSConfigs(x509TestContext));
+
+ StringBuilder configBuilder = new StringBuilder();
+ for (Map.Entry<String, String> entry : configMap.entrySet()) {
+ configBuilder.append(entry.getKey()).append("=").append(entry.getValue()).append("\n");
+ }
+
+ MaybeSecureServers maybeSecureServers = new MaybeSecureServers(3, 2, configBuilder.toString(),
+ (id, role, quorumPort, leaderPort, clientPort, secureClientPort) -> String.format("server.%d=127.0.0.1:%d:%d:%s;;127.0.0.1:%d", id, quorumPort, leaderPort, role, secureClientPort));
+
+ // wire to "servers" of QuorumPeerTestBase, so it can be destroyed in QuorumPeerTestBase.tearDown()
+ servers = maybeSecureServers;
+
+ // start servers and clients
+ maybeSecureServers.restartAllServersAndClients(this);
+
+ // wait for clients to connect
+ waitForAll(maybeSecureServers, ZooKeeper.States.CONNECTED);
+
+ // Find and log leader
+ maybeSecureServers.findLeader();
+
+ QuorumPeer qp0 = maybeSecureServers.mt[0].getQuorumPeer();
+
+ assertNotNull(qp0);
+
+ // verify no listener on client port
+ assertNull(qp0.cnxnFactory);
+ assertNull(qp0.getClientAddress());
+ assertEquals(-1, qp0.getClientPort());
+
+ // verify valid secure client port listener exists
+ assertNotNull(qp0.secureCnxnFactory);
+ assertNotNull(qp0.getSecureClientAddress());
+ assertEquals(maybeSecureServers.secureClientPorts[0], qp0.getSecureClientPort());
+ assertEquals(maybeSecureServers.secureClientPorts[0], qp0.getSecureClientAddress().getPort());
+ }
+
+ @Test
+ public void reconfigFromClientPortToSecureClientPort() throws IOException, InterruptedException, KeeperException {
+ Map<String, String> configMap = new HashMap<>();
+ configMap.put("reconfigEnabled", "true");
+ configMap.put("authProvider.x509", "org.apache.zookeeper.server.auth.X509AuthenticationProvider");
+ configMap.put("DigestAuthenticationProvider.superDigest", "super:D/InIHSb7yEEbrWz8b9l71RjZJU="/* password is 'test'*/);
+ configMap.putAll(getServerTLSConfigs(x509TestContext));
+
+ StringBuilder configBuilder = new StringBuilder();
+ for (Map.Entry<String, String> entry : configMap.entrySet()) {
+ configBuilder.append(entry.getKey()).append("=").append(entry.getValue()).append("\n");
+ }
+
+ MaybeSecureServers maybeSecureServers = new MaybeSecureServers(3, 0, configBuilder.toString(),
+ (id, role, quorumPort, leaderPort, clientPort, secureClientPort) -> String.format("server.%d=127.0.0.1:%d:%d:%s;127.0.0.1:%d", id, quorumPort, leaderPort, role, clientPort));
+
+ servers = maybeSecureServers;
+
+ maybeSecureServers.restartAllServersAndClients(this);
+
+ waitForAll(maybeSecureServers, ZooKeeper.States.CONNECTED);
+
+ ZooKeeperAdmin zkAdmin = new ZooKeeperAdmin("127.0.0.1:" + maybeSecureServers.clientPorts[0], ClientBase.CONNECTION_TIMEOUT, this);
+ zkAdmin.addAuthInfo("digest", "super:test".getBytes());
+
+ List<String> joiningServers = new ArrayList<>();
+ List<String> leavingServers = new ArrayList<>();
+
+ int reconfigIndex = 1;
+ leavingServers.add(Integer.toString(reconfigIndex));
+ joiningServers.add(String.format("server.%d=127.0.0.1:%d:%d:%s;;127.0.0.1:%d", reconfigIndex,
+ maybeSecureServers.quorumPorts[reconfigIndex], maybeSecureServers.leaderPorts[reconfigIndex],
+ "participant", maybeSecureServers.secureClientPorts[reconfigIndex]));
+
+ ReconfigTest.reconfig(zkAdmin, null, leavingServers, null, -1);
+ LOG.info("Reconfig REMOVE done with leavingServers={}!", leavingServers);
+ ReconfigTest.testServerHasConfig(maybeSecureServers.zk[0], null, leavingServers);
+
+ ReconfigTest.reconfig(zkAdmin, joiningServers, null, null, -1);
+ LOG.info("Reconfig ADD done with joiningServers={}!", joiningServers);
+
+ ReconfigTest.testServerHasConfig(maybeSecureServers.zk[0], joiningServers, null);
+
+ assertTrue(ClientBase.waitForServerDown("127.0.0.1:" + maybeSecureServers.clientPorts[reconfigIndex], 5000, false));
+
+ maybeSecureServers.restartSecureClient(reconfigIndex, this);
+ waitForOne(maybeSecureServers.zk[reconfigIndex], ZooKeeper.States.CONNECTED);
+ ReconfigTest.testNormalOperation(maybeSecureServers.zk[0], maybeSecureServers.zk[reconfigIndex]);
+ }
+
+}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java
index c384280..27e2e32 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java
@@ -60,14 +60,17 @@ public class QuorumPeerTestBase extends ZKTestCase implements Watcher {
@AfterEach
public void tearDown() throws Exception {
- if (servers == null || servers.mt == null) {
+ if (servers == null) {
LOG.info("No servers to shutdown!");
return;
}
- for (int i = 0; i < numServers; i++) {
- if (i < servers.mt.length) {
- servers.mt[i].shutdown();
- }
+
+ if (servers.zk != null) {
+ servers.shutdownAllClients();
+ }
+
+ if (servers.mt != null) {
+ servers.shutDownAllServers();
}
}
@@ -427,6 +430,12 @@ public void shutDownAllServers() throws InterruptedException {
}
}
+ public void shutdownAllClients() throws InterruptedException {
+ for (ZooKeeper zk : zk) {
+ zk.close(5000);
+ }
+ }
+
public void restartAllServersAndClients(Watcher watcher) throws IOException, InterruptedException {
int index = 0;
for (MainThread t : mt) {
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumServerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumServerTest.java
index 7163091..f8831fc 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumServerTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumServerTest.java
@@ -18,6 +18,8 @@
package org.apache.zookeeper.server.quorum;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import java.net.InetSocketAddress;
import org.apache.zookeeper.KeeperException;
@@ -69,6 +71,17 @@ public void testToString() throws ConfigException {
expected = "example.com:1234:1236:participant;0.0.0.0:1237";
qs = new QuorumServer(0, provided);
assertEquals(expected, qs.toString(), "Use hostname");
+
+ provided = "example.com:1234:1236:participant;1237;1238";
+ expected = "example.com:1234:1236:participant;0.0.0.0:1237;0.0.0.0:1238";
+ qs = new QuorumServer(0, provided);
+ assertEquals(expected, qs.toString(), "clientPort and secureClientPort");
+
+ provided = ipv4config + ":participant;;1.2.3.4:1237";
+ expected = ipv4config + ":participant;;1.2.3.4:1237";
+ qs = new QuorumServer(0, provided);
+ assertEquals(expected, qs.toString(), "Only secureClientPort");
+
}
@Test
@@ -112,6 +125,9 @@ public void shouldAllowMultipleAddressesWhenMultiAddressFeatureIsEnabled() throw
System.setProperty(QuorumPeer.CONFIG_KEY_MULTI_ADDRESS_ENABLED, "true");
QuorumServer qs = new QuorumServer(0, "127.0.0.1:1234:1236|127.0.0.1:2234:2236");
assertEquals("127.0.0.1:1234:1236|127.0.0.1:2234:2236:participant", qs.toString(), "MultiAddress parse error");
+
+ qs = new QuorumServer(0, "127.0.0.1:1234:1236|127.0.0.1:2234:2236;1237;1238");
+ assertEquals("127.0.0.1:1234:1236|127.0.0.1:2234:2236:participant;0.0.0.0:1237;0.0.0.0:1238", qs.toString(), "MultiAddress parse with clientPort and secureClientPort");
}
@Test
@@ -147,4 +163,31 @@ public void testDuplicate() {
});
}
+ @Test
+ public void testClientAddrAndSecureClientAddr() throws ConfigException {
+ QuorumPeer.QuorumServer qs = new QuorumPeer.QuorumServer(0, "example.com:1234:1236:participant;1237;1238");
+ assertNotNull(qs.clientAddr, "clientPort specified");
+ assertNotNull(qs.secureClientAddr, "secureClientPort specified");
+
+ qs = new QuorumPeer.QuorumServer(0, "example.com:1234:1236:participant;;1238");
+ assertNull(qs.clientAddr, "clientPort not specified");
+ assertNotNull(qs.secureClientAddr, "secureClientPort specified");
+
+ qs = new QuorumPeer.QuorumServer(0, "example.com:1234:1236:participant;1237;");
+ assertNotNull(qs.clientAddr, "clientPort specified");
+ assertNull(qs.secureClientAddr, "secureClientPort not specified");
+
+ qs = new QuorumPeer.QuorumServer(0, "example.com:1234:1236:participant;1237");
+ assertNotNull(qs.clientAddr, "clientPort specified");
+ assertNull(qs.secureClientAddr, "secureClientPort not specified");
+
+ qs = new QuorumPeer.QuorumServer(0, "example.com:1234:1236:participant");
+ assertNull(qs.clientAddr, "clientPort not specified");
+ assertNull(qs.secureClientAddr, "secureClientPort not specified");
+
+ qs = new QuorumPeer.QuorumServer(0, "example.com:1234:1236:participant;;");
+ assertNull(qs.clientAddr, "clientPort not specified");
+ assertNull(qs.secureClientAddr, "secureClientPort not specified");
+ }
+
}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ReconfigLegacyTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ReconfigLegacyTest.java
index 2f9a530..6f95689 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ReconfigLegacyTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ReconfigLegacyTest.java
@@ -18,6 +18,8 @@
package org.apache.zookeeper.server.quorum;
+import static org.apache.zookeeper.server.quorum.QuorumPeerMainTLSTest.getClientTLSConfigs;
+import static org.apache.zookeeper.server.quorum.QuorumPeerMainTLSTest.getServerTLSConfigs;
import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -25,22 +27,57 @@
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
+import java.security.Security;
import java.util.ArrayList;
+import java.util.Map;
import java.util.Properties;
+import org.apache.commons.io.FileUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.PortAssignment;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.admin.ZooKeeperAdmin;
+import org.apache.zookeeper.client.ZKClientConfig;
+import org.apache.zookeeper.common.X509KeyType;
+import org.apache.zookeeper.common.X509TestContext;
import org.apache.zookeeper.test.ClientBase;
import org.apache.zookeeper.test.ReconfigTest;
+import org.bouncycastle.jce.provider.BouncyCastleProvider;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
public class ReconfigLegacyTest extends QuorumPeerTestBase {
private static final int SERVER_COUNT = 3;
+ private static File tempDir;
+ private static X509TestContext x509TestContext = null;
+
+ @BeforeAll
+ public static void beforeAll() throws Exception {
+ Security.addProvider(new BouncyCastleProvider());
+ tempDir = ClientBase.createEmptyTestDir();
+ x509TestContext = X509TestContext.newBuilder()
+ .setTempDir(tempDir)
+ .setKeyStoreKeyType(X509KeyType.EC)
+ .setTrustStoreKeyType(X509KeyType.EC)
+ .build();
+ }
+
+ @AfterAll
+ public static void afterAll() {
+ Security.removeProvider(BouncyCastleProvider.PROVIDER_NAME);
+ try {
+ FileUtils.deleteDirectory(tempDir);
+ } catch (IOException e) {
+ // ignore
+ }
+ }
@BeforeEach
public void setup() {
@@ -65,7 +102,7 @@ public void testConfigFileBackwardCompatibility() throws Exception {
for (int i = 0; i < SERVER_COUNT; i++) {
clientPorts[i] = PortAssignment.unique();
server = "server." + i + "=localhost:" + PortAssignment.unique() + ":" + PortAssignment.unique()
- + ":participant;localhost:" + clientPorts[i];
+ + ":participant;localhost:" + clientPorts[i];
allServers.add(server);
sb.append(server + "\n");
}
@@ -144,14 +181,17 @@ public void testConfigFileBackwardCompatibility() throws Exception {
* and new port added to dynamic file.
* @throws Exception
*/
- @Test
- public void testReconfigRemoveClientFromStatic() throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void testReconfigRemoveClientFromStatic(boolean isSecure) throws Exception {
final int[] clientPorts = new int[SERVER_COUNT];
+ final int[] secureClientPorts = new int[SERVER_COUNT];
+ final int[] adminServerPorts = new int[SERVER_COUNT];
final int[] quorumPorts = new int[SERVER_COUNT];
final int[] electionPorts = new int[SERVER_COUNT];
final int changedServerId = 0;
- final int newClientPort = PortAssignment.unique();
+ final int newClientPortOrSecureClientPort = PortAssignment.unique();
StringBuilder sb = new StringBuilder();
ArrayList<String> allServers = new ArrayList<>();
@@ -159,6 +199,8 @@ public void testReconfigRemoveClientFromStatic() throws Exception {
for (int i = 0; i < SERVER_COUNT; i++) {
clientPorts[i] = PortAssignment.unique();
+ secureClientPorts[i] = PortAssignment.unique();
+ adminServerPorts[i] = PortAssignment.unique();
quorumPorts[i] = PortAssignment.unique();
electionPorts[i] = PortAssignment.unique();
@@ -167,7 +209,12 @@ public void testReconfigRemoveClientFromStatic() throws Exception {
sb.append(server + "\n");
if (i == changedServerId) {
- newServers.add(server + ";0.0.0.0:" + newClientPort);
+ if (isSecure) {
+ newServers.add(server + ";;0.0.0.0:" + newClientPortOrSecureClientPort);
+ } else {
+ newServers.add(server + ";0.0.0.0:" + newClientPortOrSecureClientPort);
+ }
+
} else {
newServers.add(server);
}
@@ -178,27 +225,51 @@ public void testReconfigRemoveClientFromStatic() throws Exception {
ZooKeeper[] zk = new ZooKeeper[SERVER_COUNT];
ZooKeeperAdmin[] zkAdmin = new ZooKeeperAdmin[SERVER_COUNT];
+ Map<String, String> configMap = getServerTLSConfigs(x509TestContext);
+ StringBuilder configBuilder = new StringBuilder();
+ for (Map.Entry<String, String> entry : configMap.entrySet()) {
+ configBuilder.append(entry.getKey()).append("=").append(entry.getValue()).append("\n");
+ }
+
// Start the servers with a static config file, without a dynamic config file.
for (int i = 0; i < SERVER_COUNT; i++) {
- mt[i] = new MainThread(i, clientPorts[i], quorumCfgSection, false);
+ if (isSecure) {
+ mt[i] = new MainThread(i, MainThread.UNSET_STATIC_CLIENTPORT, adminServerPorts[i], secureClientPorts[i], quorumCfgSection, configBuilder.toString(), null, false, null);
+ } else {
+ mt[i] = new MainThread(i, clientPorts[i], adminServerPorts[i], quorumCfgSection, null, null, false);
+ }
mt[i].start();
}
+
+ ZKClientConfig clientConfig;
+ if (isSecure) {
+ clientConfig = getClientTLSConfigs(x509TestContext);
+ } else {
+ clientConfig = null;
+ }
+
// Check that when a server starts from old style config, it should keep the client
// port in static config file.
for (int i = 0; i < SERVER_COUNT; i++) {
+ String cnxnString = "127.0.0.1:" + (isSecure ? secureClientPorts[i] : clientPorts[i]);
assertTrue(
- ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i], CONNECTION_TIMEOUT),
+ ClientBase.waitForServerUp(cnxnString, CONNECTION_TIMEOUT, isSecure, clientConfig),
"waiting for server " + i + " being up");
- zk[i] = ClientBase.createZKClient("127.0.0.1:" + clientPorts[i]);
- zkAdmin[i] = new ZooKeeperAdmin("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this);
+ zk[i] = ClientBase.createZKClient(cnxnString, CONNECTION_TIMEOUT, CONNECTION_TIMEOUT, clientConfig);
+ zkAdmin[i] = new ZooKeeperAdmin(cnxnString, ClientBase.CONNECTION_TIMEOUT, this, clientConfig);
zkAdmin[i].addAuthInfo("digest", "super:test".getBytes());
ReconfigTest.testServerHasConfig(zk[i], allServers, null);
Properties cfg = readPropertiesFromFile(mt[i].confFile);
assertTrue(cfg.containsKey("dynamicConfigFile"));
- assertTrue(cfg.containsKey("clientPort"));
+ if (isSecure) {
+ assertTrue(cfg.containsKey("secureClientPort"));
+ } else {
+ assertTrue(cfg.containsKey("clientPort"));
+ }
+
}
ReconfigTest.testNormalOperation(zk[0], zk[1]);
@@ -215,10 +286,11 @@ public void testReconfigRemoveClientFromStatic() throws Exception {
for (int i = 0; i < SERVER_COUNT; i++) {
ReconfigTest.testServerHasConfig(zk[i], newServers, null);
Properties staticCfg = readPropertiesFromFile(mt[i].confFile);
+ String configKey = isSecure ? "secureClientPort" : "clientPort";
if (i == changedServerId) {
- assertFalse(staticCfg.containsKey("clientPort"));
+ assertFalse(staticCfg.containsKey(configKey));
} else {
- assertTrue(staticCfg.containsKey("clientPort"));
+ assertTrue(staticCfg.containsKey(configKey));
}
}
@@ -255,7 +327,7 @@ public void testRestartZooKeeperServer() throws Exception {
for (int i = 0; i < SERVER_COUNT; i++) {
clientPorts[i] = PortAssignment.unique();
server = "server." + i + "=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique()
- + ":participant;127.0.0.1:" + clientPorts[i];
+ + ":participant;127.0.0.1:" + clientPorts[i];
sb.append(server + "\n");
}
String currentQuorumCfgSection = sb.toString();
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ClientBase.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ClientBase.java
index b8b8e48..843b804 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ClientBase.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ClientBase.java
@@ -40,6 +40,7 @@
import java.util.concurrent.TimeoutException;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
+import javax.net.ssl.SSLContext;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.PortAssignment;
import org.apache.zookeeper.TestableZooKeeper;
@@ -49,8 +50,11 @@
import org.apache.zookeeper.ZKTestCase;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.client.ZKClientConfig;
+import org.apache.zookeeper.common.ClientX509Util;
import org.apache.zookeeper.common.Time;
import org.apache.zookeeper.common.X509Exception.SSLContextException;
+import org.apache.zookeeper.common.X509Util;
+import org.apache.zookeeper.common.ZKConfig;
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.ZooKeeperServer;
@@ -127,7 +131,7 @@ public synchronized boolean isConnected() {
protected synchronized String connectionDescription() {
return String.format("connected(%s), syncConnected(%s), readOnlyConnected(%s)",
- connected, syncConnected, readOnlyConnected);
+ connected, syncConnected, readOnlyConnected);
}
public synchronized void waitForConnected(long timeout) throws InterruptedException, TimeoutException {
@@ -151,7 +155,7 @@ public synchronized void waitForSyncConnected(long timeout) throws InterruptedEx
if (!syncConnected) {
throw new TimeoutException(
"Failed to connect to read-write ZooKeeper server: "
- + connectionDescription());
+ + connectionDescription());
}
}
public synchronized void waitForReadOnlyConnected(long timeout) throws InterruptedException, TimeoutException {
@@ -164,7 +168,7 @@ public synchronized void waitForReadOnlyConnected(long timeout) throws Interrupt
if (!readOnlyConnected) {
throw new TimeoutException(
"Failed to connect in read-only mode to ZooKeeper server: "
- + connectionDescription());
+ + connectionDescription());
}
}
public synchronized void waitForDisconnected(long timeout) throws InterruptedException, TimeoutException {
@@ -258,16 +262,27 @@ public static List<HostPort> parseHostPortList(String hplist) {
}
public static boolean waitForServerUp(String hp, long timeout) {
- return waitForServerUp(hp, timeout, false);
+ return waitForServerUp(hp, timeout, false, null);
}
public static boolean waitForServerUp(String hp, long timeout, boolean secure) {
+ return waitForServerUp(hp, timeout, secure, null);
+ }
+
+ public static boolean waitForServerUp(String hp, long timeout, boolean secure, ZKConfig zkConfig) {
long start = Time.currentElapsedTime();
while (true) {
try {
// if there are multiple hostports, just take the first one
HostPort hpobj = parseHostPortList(hp).get(0);
- String result = send4LetterWord(hpobj.host, hpobj.port, "stat", secure);
+ SSLContext sslContext = null;
+ String result;
+ if (zkConfig != null) {
+ try (X509Util x509Util = new ClientX509Util()) {
+ sslContext = x509Util.createSSLContext(zkConfig);
+ }
+ }
+ result = send4LetterWord(hpobj.host, hpobj.port, "stat", secure, 5000, sslContext);
if (result.startsWith("Zookeeper version:") && !result.contains("READ-ONLY")) {
return true;
}
@@ -290,6 +305,7 @@ public static boolean waitForServerUp(String hp, long timeout, boolean secure) {
// ignore
}
}
+ LOG.error("server failed to come up: {}", hp);
return false;
}
@@ -395,22 +411,29 @@ private static int getPort(String hostPort) {
return Integer.parseInt(portstr);
}
+ public static void startServerInstance(File dataDir,
+ ServerCnxnFactory factory,
+ String hostPort,
+ int serverId) throws IOException, InterruptedException {
+ startServerInstance(dataDir, factory, hostPort, serverId, null);
+ }
+
/**
* Starting the given server instance
*/
public static void startServerInstance(
- File dataDir,
- ServerCnxnFactory factory,
- String hostPort,
- int serverId) throws IOException, InterruptedException {
+ File dataDir,
+ ServerCnxnFactory factory,
+ String hostPort,
+ int serverId, ZKConfig zkConfig) throws IOException, InterruptedException {
final int port = getPort(hostPort);
LOG.info("STARTING server instance 127.0.0.1:{}", port);
ZooKeeperServer zks = new ZooKeeperServer(dataDir, dataDir, 3000);
zks.setCreateSessionTrackerServerId(serverId);
factory.startup(zks);
assertTrue(
- ClientBase.waitForServerUp("127.0.0.1:" + port, CONNECTION_TIMEOUT, factory.isSecure()),
- "waiting for server up");
+ ClientBase.waitForServerUp("127.0.0.1:" + port, CONNECTION_TIMEOUT, factory.isSecure(), zkConfig),
+ "waiting for server up");
}
/**
@@ -428,9 +451,9 @@ public static void startServerInstance(
* for more information.
*/
public static ServerCnxnFactory createNewServerInstance(
- ServerCnxnFactory factory,
- String hostPort,
- int maxCnxns) throws IOException, InterruptedException {
+ ServerCnxnFactory factory,
+ String hostPort,
+ int maxCnxns) throws IOException, InterruptedException {
final int port = getPort(hostPort);
LOG.info("CREATING server instance 127.0.0.1:{}", port);
if (factory == null) {
@@ -459,8 +482,8 @@ static void shutdownServerInstance(ServerCnxnFactory factory, String hostPort) {
final int PORT = getPort(hostPort);
assertTrue(
- ClientBase.waitForServerDown("127.0.0.1:" + PORT, CONNECTION_TIMEOUT, factory.isSecure()),
- "waiting for server down");
+ ClientBase.waitForServerDown("127.0.0.1:" + PORT, CONNECTION_TIMEOUT, factory.isSecure()),
+ "waiting for server down");
}
}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigTest.java
index 20ec82d..02b8ff6 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigTest.java
@@ -1217,6 +1217,7 @@ private void assertRemotePeerMXBeanAttributes(QuorumServer qs, String beanName)
private static class ServerConfigLine {
private final int serverId;
private Integer clientPort;
+ private Integer secureClientPort;
// hostName -> <quorumPort1, quorumPort2>
private final Map<String, Set<Integer>> quorumPorts = new HashMap<>();
@@ -1229,7 +1230,7 @@ private ServerConfigLine(String configLine) {
serverId = parseInt(parts[0].split("\\.")[1]);
String[] serverConfig = parts[1].split(";");
String[] serverAddresses = serverConfig[0].split("\\|");
- if (serverConfig.length > 1) {
+ if (serverConfig.length > 1 && !serverConfig[1].isEmpty()) {
String[] clientParts = serverConfig[1].split(":");
if (clientParts.length > 1) {
clientPort = parseInt(clientParts[1]);
@@ -1238,6 +1239,15 @@ private ServerConfigLine(String configLine) {
}
}
+ if (serverConfig.length > 2 && !serverConfig[2].isEmpty()) {
+ String[] secureClientParts = serverConfig[2].split(":");
+ if (secureClientParts.length > 1) {
+ secureClientPort = parseInt(secureClientParts[1]);
+ } else {
+ secureClientPort = parseInt(secureClientParts[0]);
+ }
+ }
+
for (String addr : serverAddresses) {
// addr like: 127.0.0.1:11230:11229:participant or [0:0:0:0:0:0:0:1]:11346:11347
String serverHost;
@@ -1268,13 +1278,14 @@ public boolean equals(Object o) {
ServerConfigLine that = (ServerConfigLine) o;
return serverId == that.serverId
&& Objects.equals(clientPort, that.clientPort)
+ && Objects.equals(secureClientPort, that.secureClientPort)
&& quorumPorts.equals(that.quorumPorts)
&& electionPorts.equals(that.electionPorts);
}
@Override
public int hashCode() {
- return Objects.hash(serverId, clientPort, quorumPorts, electionPorts);
+ return Objects.hash(serverId, clientPort, secureClientPort, quorumPorts, electionPorts);
}
}