| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.kafka.clients; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertNotSame; |
| import static org.junit.Assert.assertNull; |
| import static org.junit.Assert.assertSame; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| |
| import java.net.InetAddress; |
| import java.net.UnknownHostException; |
| |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import org.apache.kafka.common.errors.AuthenticationException; |
| import org.apache.kafka.common.utils.LogContext; |
| import org.apache.kafka.common.utils.MockTime; |
| import org.junit.Before; |
| import org.junit.Test; |
| |
| public class ClusterConnectionStatesTest { |
| |
| private static ArrayList<InetAddress> initialAddresses; |
| private static ArrayList<InetAddress> newAddresses; |
| |
| static { |
| try { |
| initialAddresses = new ArrayList<>(Arrays.asList( |
| InetAddress.getByName("10.200.20.100"), |
| InetAddress.getByName("10.200.20.101"), |
| InetAddress.getByName("10.200.20.102") |
| )); |
| newAddresses = new ArrayList<>(Arrays.asList( |
| InetAddress.getByName("10.200.20.103"), |
| InetAddress.getByName("10.200.20.104"), |
| InetAddress.getByName("10.200.20.105") |
| )); |
| } catch (UnknownHostException e) { |
| fail("Attempted to create an invalid InetAddress, this should not happen"); |
| } |
| } |
| |
| private final MockTime time = new MockTime(); |
| private final long reconnectBackoffMs = 10 * 1000; |
| private final long reconnectBackoffMax = 60 * 1000; |
| private final double reconnectBackoffJitter = 0.2; |
| private final String nodeId1 = "1001"; |
| private final String nodeId2 = "2002"; |
| private final String nodeId3 = "3003"; |
| private final String hostTwoIps = "multiple.ip.address"; |
| private ClusterConnectionStates connectionStates; |
| |
| // For testing nodes with a single IP address, use localhost and default DNS resolution |
| private DefaultHostResolver singleIPHostResolver = new DefaultHostResolver(); |
| |
| // For testing nodes with multiple IP addresses, mock DNS resolution to get consistent results |
| private AddressChangeHostResolver multipleIPHostResolver = new AddressChangeHostResolver( |
| initialAddresses.toArray(new InetAddress[0]), newAddresses.toArray(new InetAddress[0]));; |
| |
| @Before |
| public void setup() { |
| this.connectionStates = new ClusterConnectionStates(reconnectBackoffMs, reconnectBackoffMax, |
| new LogContext(), this.singleIPHostResolver); |
| } |
| |
| @Test |
| public void testClusterConnectionStateChanges() { |
| assertTrue(connectionStates.canConnect(nodeId1, time.milliseconds())); |
| |
| // Start connecting to Node and check state |
| connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT); |
| assertEquals(connectionStates.connectionState(nodeId1), ConnectionState.CONNECTING); |
| assertTrue(connectionStates.isConnecting(nodeId1)); |
| assertFalse(connectionStates.isReady(nodeId1, time.milliseconds())); |
| assertFalse(connectionStates.isBlackedOut(nodeId1, time.milliseconds())); |
| assertFalse(connectionStates.hasReadyNodes(time.milliseconds())); |
| |
| time.sleep(100); |
| |
| // Successful connection |
| connectionStates.ready(nodeId1); |
| assertEquals(connectionStates.connectionState(nodeId1), ConnectionState.READY); |
| assertTrue(connectionStates.isReady(nodeId1, time.milliseconds())); |
| assertTrue(connectionStates.hasReadyNodes(time.milliseconds())); |
| assertFalse(connectionStates.isConnecting(nodeId1)); |
| assertFalse(connectionStates.isBlackedOut(nodeId1, time.milliseconds())); |
| assertEquals(connectionStates.connectionDelay(nodeId1, time.milliseconds()), Long.MAX_VALUE); |
| |
| time.sleep(15000); |
| |
| // Disconnected from broker |
| connectionStates.disconnected(nodeId1, time.milliseconds()); |
| assertEquals(connectionStates.connectionState(nodeId1), ConnectionState.DISCONNECTED); |
| assertTrue(connectionStates.isDisconnected(nodeId1)); |
| assertTrue(connectionStates.isBlackedOut(nodeId1, time.milliseconds())); |
| assertFalse(connectionStates.isConnecting(nodeId1)); |
| assertFalse(connectionStates.hasReadyNodes(time.milliseconds())); |
| assertFalse(connectionStates.canConnect(nodeId1, time.milliseconds())); |
| |
| // After disconnecting we expect a backoff value equal to the reconnect.backoff.ms setting (plus minus 20% jitter) |
| double backoffTolerance = reconnectBackoffMs * reconnectBackoffJitter; |
| long currentBackoff = connectionStates.connectionDelay(nodeId1, time.milliseconds()); |
| assertEquals(reconnectBackoffMs, currentBackoff, backoffTolerance); |
| |
| time.sleep(currentBackoff + 1); |
| // after waiting for the current backoff value we should be allowed to connect again |
| assertTrue(connectionStates.canConnect(nodeId1, time.milliseconds())); |
| } |
| |
| @Test |
| public void testMultipleNodeConnectionStates() { |
| // Check initial state, allowed to connect to all nodes, but no nodes shown as ready |
| assertTrue(connectionStates.canConnect(nodeId1, time.milliseconds())); |
| assertTrue(connectionStates.canConnect(nodeId2, time.milliseconds())); |
| assertFalse(connectionStates.hasReadyNodes(time.milliseconds())); |
| |
| // Start connecting one node and check that the pool only shows ready nodes after |
| // successful connect |
| connectionStates.connecting(nodeId2, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT); |
| assertFalse(connectionStates.hasReadyNodes(time.milliseconds())); |
| time.sleep(1000); |
| connectionStates.ready(nodeId2); |
| assertTrue(connectionStates.hasReadyNodes(time.milliseconds())); |
| |
| // Connect second node and check that both are shown as ready, pool should immediately |
| // show ready nodes, since node2 is already connected |
| connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT); |
| assertTrue(connectionStates.hasReadyNodes(time.milliseconds())); |
| time.sleep(1000); |
| connectionStates.ready(nodeId1); |
| assertTrue(connectionStates.hasReadyNodes(time.milliseconds())); |
| |
| time.sleep(12000); |
| |
| // disconnect nodes and check proper state of pool throughout |
| connectionStates.disconnected(nodeId2, time.milliseconds()); |
| assertTrue(connectionStates.hasReadyNodes(time.milliseconds())); |
| assertTrue(connectionStates.isBlackedOut(nodeId2, time.milliseconds())); |
| assertFalse(connectionStates.isBlackedOut(nodeId1, time.milliseconds())); |
| time.sleep(connectionStates.connectionDelay(nodeId2, time.milliseconds())); |
| // by the time node1 disconnects node2 should have been unblocked again |
| connectionStates.disconnected(nodeId1, time.milliseconds() + 1); |
| assertTrue(connectionStates.isBlackedOut(nodeId1, time.milliseconds())); |
| assertFalse(connectionStates.isBlackedOut(nodeId2, time.milliseconds())); |
| assertFalse(connectionStates.hasReadyNodes(time.milliseconds())); |
| } |
| |
| @Test |
| public void testAuthorizationFailed() { |
| // Try connecting |
| connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT); |
| |
| time.sleep(100); |
| |
| connectionStates.authenticationFailed(nodeId1, time.milliseconds(), new AuthenticationException("No path to CA for certificate!")); |
| time.sleep(1000); |
| assertEquals(connectionStates.connectionState(nodeId1), ConnectionState.AUTHENTICATION_FAILED); |
| assertTrue(connectionStates.authenticationException(nodeId1) instanceof AuthenticationException); |
| assertFalse(connectionStates.hasReadyNodes(time.milliseconds())); |
| assertFalse(connectionStates.canConnect(nodeId1, time.milliseconds())); |
| |
| time.sleep(connectionStates.connectionDelay(nodeId1, time.milliseconds()) + 1); |
| |
| assertTrue(connectionStates.canConnect(nodeId1, time.milliseconds())); |
| connectionStates.ready(nodeId1); |
| assertNull(connectionStates.authenticationException(nodeId1)); |
| } |
| |
| @Test |
| public void testRemoveNode() { |
| connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT); |
| time.sleep(1000); |
| connectionStates.ready(nodeId1); |
| time.sleep(10000); |
| |
| connectionStates.disconnected(nodeId1, time.milliseconds()); |
| // Node is disconnected and blocked, removing it from the list should reset all blocks |
| connectionStates.remove(nodeId1); |
| assertTrue(connectionStates.canConnect(nodeId1, time.milliseconds())); |
| assertFalse(connectionStates.isBlackedOut(nodeId1, time.milliseconds())); |
| assertEquals(connectionStates.connectionDelay(nodeId1, time.milliseconds()), 0L); |
| } |
| |
| @Test |
| public void testMaxReconnectBackoff() { |
| long effectiveMaxReconnectBackoff = Math.round(reconnectBackoffMax * (1 + reconnectBackoffJitter)); |
| connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT); |
| time.sleep(1000); |
| connectionStates.disconnected(nodeId1, time.milliseconds()); |
| |
| // Do 100 reconnect attempts and check that MaxReconnectBackoff (plus jitter) is not exceeded |
| for (int i = 0; i < 100; i++) { |
| long reconnectBackoff = connectionStates.connectionDelay(nodeId1, time.milliseconds()); |
| assertTrue(reconnectBackoff <= effectiveMaxReconnectBackoff); |
| assertFalse(connectionStates.canConnect(nodeId1, time.milliseconds())); |
| time.sleep(reconnectBackoff + 1); |
| assertTrue(connectionStates.canConnect(nodeId1, time.milliseconds())); |
| connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT); |
| time.sleep(10); |
| connectionStates.disconnected(nodeId1, time.milliseconds()); |
| } |
| } |
| |
| @Test |
| public void testExponentialReconnectBackoff() { |
| // Calculate fixed components for backoff process |
| final int reconnectBackoffExpBase = 2; |
| double reconnectBackoffMaxExp = Math.log(reconnectBackoffMax / (double) Math.max(reconnectBackoffMs, 1)) |
| / Math.log(reconnectBackoffExpBase); |
| |
| // Run through 10 disconnects and check that reconnect backoff value is within expected range for every attempt |
| for (int i = 0; i < 10; i++) { |
| connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT); |
| connectionStates.disconnected(nodeId1, time.milliseconds()); |
| // Calculate expected backoff value without jitter |
| long expectedBackoff = Math.round(Math.pow(reconnectBackoffExpBase, Math.min(i, reconnectBackoffMaxExp)) |
| * reconnectBackoffMs); |
| long currentBackoff = connectionStates.connectionDelay(nodeId1, time.milliseconds()); |
| assertEquals(expectedBackoff, currentBackoff, reconnectBackoffJitter * expectedBackoff); |
| time.sleep(connectionStates.connectionDelay(nodeId1, time.milliseconds()) + 1); |
| } |
| } |
| |
| @Test |
| public void testThrottled() { |
| connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT); |
| time.sleep(1000); |
| connectionStates.ready(nodeId1); |
| time.sleep(10000); |
| |
| // Initially not throttled. |
| assertEquals(0, connectionStates.throttleDelayMs(nodeId1, time.milliseconds())); |
| |
| // Throttle for 100ms from now. |
| connectionStates.throttle(nodeId1, time.milliseconds() + 100); |
| assertEquals(100, connectionStates.throttleDelayMs(nodeId1, time.milliseconds())); |
| |
| // Still throttled after 50ms. The remaining delay is 50ms. The poll delay should be same as throttling delay. |
| time.sleep(50); |
| assertEquals(50, connectionStates.throttleDelayMs(nodeId1, time.milliseconds())); |
| assertEquals(50, connectionStates.pollDelayMs(nodeId1, time.milliseconds())); |
| |
| // Not throttled anymore when the deadline is reached. The poll delay should be same as connection delay. |
| time.sleep(50); |
| assertEquals(0, connectionStates.throttleDelayMs(nodeId1, time.milliseconds())); |
| assertEquals(connectionStates.connectionDelay(nodeId1, time.milliseconds()), |
| connectionStates.pollDelayMs(nodeId1, time.milliseconds())); |
| } |
| |
| @Test |
| public void testSingleIPWithDefault() throws UnknownHostException { |
| connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT); |
| InetAddress currAddress = connectionStates.currentAddress(nodeId1); |
| connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT); |
| assertSame(currAddress, connectionStates.currentAddress(nodeId1)); |
| } |
| |
| @Test |
| public void testSingleIPWithUseAll() throws UnknownHostException { |
| assertEquals(1, ClientUtils.resolve("localhost", ClientDnsLookup.USE_ALL_DNS_IPS, singleIPHostResolver).size()); |
| |
| connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.USE_ALL_DNS_IPS); |
| InetAddress currAddress = connectionStates.currentAddress(nodeId1); |
| connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.USE_ALL_DNS_IPS); |
| assertSame(currAddress, connectionStates.currentAddress(nodeId1)); |
| } |
| |
| @Test |
| public void testMultipleIPsWithDefault() throws UnknownHostException { |
| setupMultipleIPs(); |
| |
| assertTrue(ClientUtils.resolve(hostTwoIps, ClientDnsLookup.USE_ALL_DNS_IPS, multipleIPHostResolver).size() > 1); |
| |
| connectionStates.connecting(nodeId1, time.milliseconds(), hostTwoIps, ClientDnsLookup.DEFAULT); |
| InetAddress currAddress = connectionStates.currentAddress(nodeId1); |
| connectionStates.connecting(nodeId1, time.milliseconds(), hostTwoIps, ClientDnsLookup.DEFAULT); |
| assertSame(currAddress, connectionStates.currentAddress(nodeId1)); |
| } |
| |
| @Test |
| public void testMultipleIPsWithUseAll() throws UnknownHostException { |
| setupMultipleIPs(); |
| |
| assertTrue(ClientUtils.resolve(hostTwoIps, ClientDnsLookup.USE_ALL_DNS_IPS, multipleIPHostResolver).size() > 1); |
| |
| connectionStates.connecting(nodeId1, time.milliseconds(), hostTwoIps, ClientDnsLookup.USE_ALL_DNS_IPS); |
| InetAddress addr1 = connectionStates.currentAddress(nodeId1); |
| connectionStates.connecting(nodeId1, time.milliseconds(), hostTwoIps, ClientDnsLookup.USE_ALL_DNS_IPS); |
| InetAddress addr2 = connectionStates.currentAddress(nodeId1); |
| assertNotSame(addr1, addr2); |
| connectionStates.connecting(nodeId1, time.milliseconds(), hostTwoIps, ClientDnsLookup.USE_ALL_DNS_IPS); |
| InetAddress addr3 = connectionStates.currentAddress(nodeId1); |
| assertNotSame(addr1, addr3); |
| } |
| |
| @Test |
| public void testHostResolveChange() throws UnknownHostException, ReflectiveOperationException { |
| setupMultipleIPs(); |
| |
| assertTrue(ClientUtils.resolve(hostTwoIps, ClientDnsLookup.USE_ALL_DNS_IPS, multipleIPHostResolver).size() > 1); |
| |
| connectionStates.connecting(nodeId1, time.milliseconds(), hostTwoIps, ClientDnsLookup.DEFAULT); |
| InetAddress addr1 = connectionStates.currentAddress(nodeId1); |
| |
| multipleIPHostResolver.changeAddresses(); |
| connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT); |
| InetAddress addr2 = connectionStates.currentAddress(nodeId1); |
| |
| assertNotSame(addr1, addr2); |
| } |
| |
| @Test |
| public void testNodeWithNewHostname() throws UnknownHostException { |
| setupMultipleIPs(); |
| |
| connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT); |
| InetAddress addr1 = connectionStates.currentAddress(nodeId1); |
| |
| this.multipleIPHostResolver.changeAddresses(); |
| connectionStates.connecting(nodeId1, time.milliseconds(), hostTwoIps, ClientDnsLookup.DEFAULT); |
| InetAddress addr2 = connectionStates.currentAddress(nodeId1); |
| |
| assertNotSame(addr1, addr2); |
| } |
| |
| private void setupMultipleIPs() { |
| this.connectionStates = new ClusterConnectionStates(reconnectBackoffMs, reconnectBackoffMax, |
| new LogContext(), this.multipleIPHostResolver); |
| } |
| } |