blob: 9cad2cb8d53eb5b4a81944dc0a178d20075b6db6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* The state of our connection to each node in the cluster.
*
*/
final class ClusterConnectionStates {
private final long reconnectBackoffInitMs;
private final long reconnectBackoffMaxMs;
private final static int RECONNECT_BACKOFF_EXP_BASE = 2;
private final double reconnectBackoffMaxExp;
private final Map<String, NodeConnectionState> nodeState;
private final Logger log;
private final HostResolver hostResolver;
public ClusterConnectionStates(long reconnectBackoffMs, long reconnectBackoffMaxMs,
LogContext logContext, HostResolver hostResolver) {
this.log = logContext.logger(ClusterConnectionStates.class);
this.reconnectBackoffInitMs = reconnectBackoffMs;
this.reconnectBackoffMaxMs = reconnectBackoffMaxMs;
this.reconnectBackoffMaxExp = Math.log(this.reconnectBackoffMaxMs / (double) Math.max(reconnectBackoffMs, 1)) / Math.log(RECONNECT_BACKOFF_EXP_BASE);
this.nodeState = new HashMap<>();
this.hostResolver = hostResolver;
}
/**
* Return true iff we can currently initiate a new connection. This will be the case if we are not
* connected and haven't been connected for at least the minimum reconnection backoff period.
* @param id the connection id to check
* @param now the current time in ms
* @return true if we can initiate a new connection
*/
public boolean canConnect(String id, long now) {
NodeConnectionState state = nodeState.get(id);
if (state == null)
return true;
else
return state.state.isDisconnected() &&
now - state.lastConnectAttemptMs >= state.reconnectBackoffMs;
}
/**
* Return true if we are disconnected from the given node and can't re-establish a connection yet.
* @param id the connection to check
* @param now the current time in ms
*/
public boolean isBlackedOut(String id, long now) {
NodeConnectionState state = nodeState.get(id);
if (state == null)
return false;
else
return state.state.isDisconnected() &&
now - state.lastConnectAttemptMs < state.reconnectBackoffMs;
}
/**
* Returns the number of milliseconds to wait, based on the connection state, before attempting to send data. When
* disconnected, this respects the reconnect backoff time. When connecting or connected, this handles slow/stalled
* connections.
* @param id the connection to check
* @param now the current time in ms
*/
public long connectionDelay(String id, long now) {
NodeConnectionState state = nodeState.get(id);
if (state == null) return 0;
long timeWaited = now - state.lastConnectAttemptMs;
if (state.state.isDisconnected()) {
return Math.max(state.reconnectBackoffMs - timeWaited, 0);
} else {
// When connecting or connected, we should be able to delay indefinitely since other events (connection or
// data acked) will cause a wakeup once data can be sent.
return Long.MAX_VALUE;
}
}
/**
* Return true if a specific connection establishment is currently underway
* @param id The id of the node to check
*/
public boolean isConnecting(String id) {
NodeConnectionState state = nodeState.get(id);
return state != null && state.state == ConnectionState.CONNECTING;
}
/**
* Enter the connecting state for the given connection, moving to a new resolved address if necessary.
* @param id the id of the connection
* @param now the current time in ms
* @param host the host of the connection, to be resolved internally if needed
* @param clientDnsLookup the mode of DNS lookup to use when resolving the {@code host}
*/
public void connecting(String id, long now, String host, ClientDnsLookup clientDnsLookup) {
NodeConnectionState connectionState = nodeState.get(id);
if (connectionState != null && connectionState.host().equals(host)) {
connectionState.lastConnectAttemptMs = now;
connectionState.state = ConnectionState.CONNECTING;
// Move to next resolved address, or if addresses are exhausted, mark node to be re-resolved
connectionState.moveToNextAddress();
return;
} else if (connectionState != null) {
log.info("Hostname for node {} changed from {} to {}.", id, connectionState.host(), host);
}
// Create a new NodeConnectionState if nodeState does not already contain one
// for the specified id or if the hostname associated with the node id changed.
nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, now,
this.reconnectBackoffInitMs, host, clientDnsLookup, hostResolver));
}
/**
* Returns a resolved address for the given connection, resolving it if necessary.
* @param id the id of the connection
* @throws UnknownHostException if the address was not resolvable
*/
public InetAddress currentAddress(String id) throws UnknownHostException {
return nodeState(id).currentAddress();
}
/**
* Enter the disconnected state for the given node.
* @param id the connection we have disconnected
* @param now the current time in ms
*/
public void disconnected(String id, long now) {
NodeConnectionState nodeState = nodeState(id);
nodeState.lastConnectAttemptMs = now;
updateReconnectBackoff(nodeState);
if (nodeState.state.isConnected()) {
// If a connection had previously been established, clear the addresses to trigger a new DNS resolution
// because the node IPs may have changed
nodeState.clearAddresses();
}
nodeState.state = ConnectionState.DISCONNECTED;
}
/**
* Indicate that the connection is throttled until the specified deadline.
* @param id the connection to be throttled
* @param throttleUntilTimeMs the throttle deadline in milliseconds
*/
public void throttle(String id, long throttleUntilTimeMs) {
NodeConnectionState state = nodeState.get(id);
// The throttle deadline should never regress.
if (state != null && state.throttleUntilTimeMs < throttleUntilTimeMs) {
state.throttleUntilTimeMs = throttleUntilTimeMs;
}
}
/**
* Return the remaining throttling delay in milliseconds if throttling is in progress. Return 0, otherwise.
* @param id the connection to check
* @param now the current time in ms
*/
public long throttleDelayMs(String id, long now) {
NodeConnectionState state = nodeState.get(id);
if (state != null && state.throttleUntilTimeMs > now) {
return state.throttleUntilTimeMs - now;
} else {
return 0;
}
}
/**
* Return the number of milliseconds to wait, based on the connection state and the throttle time, before
* attempting to send data. If the connection has been established but being throttled, return throttle delay.
* Otherwise, return connection delay.
* @param id the connection to check
* @param now the current time in ms
*/
public long pollDelayMs(String id, long now) {
long throttleDelayMs = throttleDelayMs(id, now);
if (isConnected(id) && throttleDelayMs > 0) {
return throttleDelayMs;
} else {
return connectionDelay(id, now);
}
}
/**
* Enter the checking_api_versions state for the given node.
* @param id the connection identifier
*/
public void checkingApiVersions(String id) {
NodeConnectionState nodeState = nodeState(id);
nodeState.state = ConnectionState.CHECKING_API_VERSIONS;
}
/**
* Enter the ready state for the given node.
* @param id the connection identifier
*/
public void ready(String id) {
NodeConnectionState nodeState = nodeState(id);
nodeState.state = ConnectionState.READY;
nodeState.authenticationException = null;
resetReconnectBackoff(nodeState);
}
/**
* Enter the authentication failed state for the given node.
* @param id the connection identifier
* @param now the current time in ms
* @param exception the authentication exception
*/
public void authenticationFailed(String id, long now, AuthenticationException exception) {
NodeConnectionState nodeState = nodeState(id);
nodeState.authenticationException = exception;
nodeState.state = ConnectionState.AUTHENTICATION_FAILED;
nodeState.lastConnectAttemptMs = now;
updateReconnectBackoff(nodeState);
}
/**
* Return true if the connection is in the READY state and currently not throttled.
*
* @param id the connection identifier
* @param now the current time in ms
*/
public boolean isReady(String id, long now) {
return isReady(nodeState.get(id), now);
}
private boolean isReady(NodeConnectionState state, long now) {
return state != null && state.state == ConnectionState.READY && state.throttleUntilTimeMs <= now;
}
/**
* Return true if there is at least one node with connection in the READY state and not throttled. Returns false
* otherwise.
*
* @param now the current time in ms
*/
public boolean hasReadyNodes(long now) {
for (Map.Entry<String, NodeConnectionState> entry : nodeState.entrySet()) {
if (isReady(entry.getValue(), now)) {
return true;
}
}
return false;
}
/**
* Return true if the connection has been established
* @param id The id of the node to check
*/
public boolean isConnected(String id) {
NodeConnectionState state = nodeState.get(id);
return state != null && state.state.isConnected();
}
/**
* Return true if the connection has been disconnected
* @param id The id of the node to check
*/
public boolean isDisconnected(String id) {
NodeConnectionState state = nodeState.get(id);
return state != null && state.state.isDisconnected();
}
/**
* Return authentication exception if an authentication error occurred
* @param id The id of the node to check
*/
public AuthenticationException authenticationException(String id) {
NodeConnectionState state = nodeState.get(id);
return state != null ? state.authenticationException : null;
}
/**
* Resets the failure count for a node and sets the reconnect backoff to the base
* value configured via reconnect.backoff.ms
*
* @param nodeState The node state object to update
*/
private void resetReconnectBackoff(NodeConnectionState nodeState) {
nodeState.failedAttempts = 0;
nodeState.reconnectBackoffMs = this.reconnectBackoffInitMs;
}
/**
* Update the node reconnect backoff exponentially.
* The delay is reconnect.backoff.ms * 2**(failures - 1) * (+/- 20% random jitter)
* Up to a (pre-jitter) maximum of reconnect.backoff.max.ms
*
* @param nodeState The node state object to update
*/
private void updateReconnectBackoff(NodeConnectionState nodeState) {
if (this.reconnectBackoffMaxMs > this.reconnectBackoffInitMs) {
nodeState.failedAttempts += 1;
double backoffExp = Math.min(nodeState.failedAttempts - 1, this.reconnectBackoffMaxExp);
double backoffFactor = Math.pow(RECONNECT_BACKOFF_EXP_BASE, backoffExp);
long reconnectBackoffMs = (long) (this.reconnectBackoffInitMs * backoffFactor);
// Actual backoff is randomized to avoid connection storms.
double randomFactor = ThreadLocalRandom.current().nextDouble(0.8, 1.2);
nodeState.reconnectBackoffMs = (long) (randomFactor * reconnectBackoffMs);
}
}
/**
* Remove the given node from the tracked connection states. The main difference between this and `disconnected`
* is the impact on `connectionDelay`: it will be 0 after this call whereas `reconnectBackoffMs` will be taken
* into account after `disconnected` is called.
*
* @param id the connection to remove
*/
public void remove(String id) {
nodeState.remove(id);
}
/**
* Get the state of a given connection.
* @param id the id of the connection
* @return the state of our connection
*/
public ConnectionState connectionState(String id) {
return nodeState(id).state;
}
/**
* Get the state of a given node.
* @param id the connection to fetch the state for
*/
private NodeConnectionState nodeState(String id) {
NodeConnectionState state = this.nodeState.get(id);
if (state == null)
throw new IllegalStateException("No entry found for connection " + id);
return state;
}
/**
* The state of our connection to a node.
*/
private static class NodeConnectionState {
ConnectionState state;
AuthenticationException authenticationException;
long lastConnectAttemptMs;
long failedAttempts;
long reconnectBackoffMs;
// Connection is being throttled if current time < throttleUntilTimeMs.
long throttleUntilTimeMs;
private List<InetAddress> addresses;
private int addressIndex;
private final String host;
private final ClientDnsLookup clientDnsLookup;
private final HostResolver hostResolver;
private NodeConnectionState(ConnectionState state, long lastConnectAttempt, long reconnectBackoffMs,
String host, ClientDnsLookup clientDnsLookup, HostResolver hostResolver) {
this.state = state;
this.addresses = Collections.emptyList();
this.addressIndex = -1;
this.authenticationException = null;
this.lastConnectAttemptMs = lastConnectAttempt;
this.failedAttempts = 0;
this.reconnectBackoffMs = reconnectBackoffMs;
this.throttleUntilTimeMs = 0;
this.host = host;
this.clientDnsLookup = clientDnsLookup;
this.hostResolver = hostResolver;
}
public String host() {
return host;
}
/**
* Fetches the current selected IP address for this node, resolving {@link #host()} if necessary.
* @return the selected address
* @throws UnknownHostException if resolving {@link #host()} fails
*/
private InetAddress currentAddress() throws UnknownHostException {
if (addresses.isEmpty()) {
// (Re-)initialize list
addresses = ClientUtils.resolve(host, clientDnsLookup, hostResolver);
addressIndex = 0;
}
return addresses.get(addressIndex);
}
/**
* Jumps to the next available resolved address for this node. If no other addresses are available, marks the
* list to be refreshed on the next {@link #currentAddress()} call.
*/
private void moveToNextAddress() {
if (addresses.isEmpty())
return; // Avoid div0. List will initialize on next currentAddress() call
addressIndex = (addressIndex + 1) % addresses.size();
if (addressIndex == 0)
addresses = Collections.emptyList(); // Exhausted list. Re-resolve on next currentAddress() call
}
/**
* Clears the resolved addresses in order to trigger re-resolving on the next {@link #currentAddress()} call.
*/
private void clearAddresses() {
addresses = Collections.emptyList();
}
public String toString() {
return "NodeState(" + state + ", " + lastConnectAttemptMs + ", " + failedAttempts + ", " + throttleUntilTimeMs + ")";
}
}
}