blob: b134631bd8b11e5b66b44d985d9320316a2d1315 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.clients;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.network.Selectable;
import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.ProtoUtils;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.RequestSend;
import org.apache.kafka.common.requests.ResponseHeader;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
/**
* A network client for asynchronous request/response network i/o. This is an internal class used to implement the
* user-facing producer and consumer clients.
* <p>
* This class is not thread-safe!
*/
public class NetworkClient implements KafkaClient {
private static final Logger log = LoggerFactory.getLogger(NetworkClient.class);
/* the selector used to perform network i/o */
private final Selectable selector;
private final MetadataUpdater metadataUpdater;
private final Random randOffset;
/* the state of each node's connection */
private final ClusterConnectionStates connectionStates;
/* the set of requests currently being sent or awaiting a response */
private final InFlightRequests inFlightRequests;
/* the socket send buffer size in bytes */
private final int socketSendBuffer;
/* the socket receive size buffer in bytes */
private final int socketReceiveBuffer;
/* the client id used to identify this client in requests to the server */
private final String clientId;
/* the current correlation id to use when sending requests to servers */
private int correlation;
/* max time in ms for the producer to wait for acknowledgement from server*/
private final int requestTimeoutMs;
private final Time time;
public NetworkClient(Selectable selector,
Metadata metadata,
String clientId,
int maxInFlightRequestsPerConnection,
long reconnectBackoffMs,
int socketSendBuffer,
int socketReceiveBuffer,
int requestTimeoutMs,
Time time) {
this(null, metadata, selector, clientId, maxInFlightRequestsPerConnection,
reconnectBackoffMs, socketSendBuffer, socketReceiveBuffer, requestTimeoutMs, time);
}
public NetworkClient(Selectable selector,
MetadataUpdater metadataUpdater,
String clientId,
int maxInFlightRequestsPerConnection,
long reconnectBackoffMs,
int socketSendBuffer,
int socketReceiveBuffer,
int requestTimeoutMs,
Time time) {
this(metadataUpdater, null, selector, clientId, maxInFlightRequestsPerConnection, reconnectBackoffMs,
socketSendBuffer, socketReceiveBuffer, requestTimeoutMs, time);
}
private NetworkClient(MetadataUpdater metadataUpdater,
Metadata metadata,
Selectable selector,
String clientId,
int maxInFlightRequestsPerConnection,
long reconnectBackoffMs,
int socketSendBuffer,
int socketReceiveBuffer,
int requestTimeoutMs,
Time time) {
/* It would be better if we could pass `DefaultMetadataUpdater` from the public constructor, but it's not
* possible because `DefaultMetadataUpdater` is an inner class and it can only be instantiated after the
* super constructor is invoked.
*/
if (metadataUpdater == null) {
if (metadata == null)
throw new IllegalArgumentException("`metadata` must not be null");
this.metadataUpdater = new DefaultMetadataUpdater(metadata);
} else {
this.metadataUpdater = metadataUpdater;
}
this.selector = selector;
this.clientId = clientId;
this.inFlightRequests = new InFlightRequests(maxInFlightRequestsPerConnection);
this.connectionStates = new ClusterConnectionStates(reconnectBackoffMs);
this.socketSendBuffer = socketSendBuffer;
this.socketReceiveBuffer = socketReceiveBuffer;
this.correlation = 0;
this.randOffset = new Random();
this.requestTimeoutMs = requestTimeoutMs;
this.time = time;
}
/**
* Begin connecting to the given node, return true if we are already connected and ready to send to that node.
*
* @param node The node to check
* @param now The current timestamp
* @return True if we are ready to send to the given node
*/
@Override
public boolean ready(Node node, long now) {
if (node.isEmpty())
throw new IllegalArgumentException("Cannot connect to empty node " + node);
if (isReady(node, now))
return true;
if (connectionStates.canConnect(node.idString(), now))
// if we are interested in sending to a node and we don't have a connection to it, initiate one
initiateConnect(node, now);
return false;
}
/**
* Closes the connection to a particular node (if there is one).
*
* @param nodeId The id of the node
*/
@Override
public void close(String nodeId) {
selector.close(nodeId);
for (ClientRequest request : inFlightRequests.clearAll(nodeId))
metadataUpdater.maybeHandleDisconnection(request);
connectionStates.remove(nodeId);
}
/**
* Returns the number of milliseconds to wait, based on the connection state, before attempting to send data. When
* disconnected, this respects the reconnect backoff time. When connecting or connected, this handles slow/stalled
* connections.
*
* @param node The node to check
* @param now The current timestamp
* @return The number of milliseconds to wait.
*/
@Override
public long connectionDelay(Node node, long now) {
return connectionStates.connectionDelay(node.idString(), now);
}
/**
* Check if the connection of the node has failed, based on the connection state. Such connection failure are
* usually transient and can be resumed in the next {@link #ready(org.apache.kafka.common.Node, long)} }
* call, but there are cases where transient failures needs to be caught and re-acted upon.
*
* @param node the node to check
* @return true iff the connection has failed and the node is disconnected
*/
@Override
public boolean connectionFailed(Node node) {
return connectionStates.connectionState(node.idString()).equals(ConnectionState.DISCONNECTED);
}
/**
* Check if the node with the given id is ready to send more requests.
*
* @param node The node
* @param now The current time in ms
* @return true if the node is ready
*/
@Override
public boolean isReady(Node node, long now) {
// if we need to update our metadata now declare all requests unready to make metadata requests first
// priority
return !metadataUpdater.isUpdateDue(now) && canSendRequest(node.idString());
}
/**
* Are we connected and ready and able to send more requests to the given connection?
*
* @param node The node
*/
private boolean canSendRequest(String node) {
return connectionStates.isConnected(node) && selector.isChannelReady(node) && inFlightRequests.canSendMore(node);
}
/**
* Queue up the given request for sending. Requests can only be sent out to ready nodes.
*
* @param request The request
* @param now The current timestamp
*/
@Override
public void send(ClientRequest request, long now) {
String nodeId = request.request().destination();
if (!canSendRequest(nodeId))
throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
doSend(request, now);
}
private void doSend(ClientRequest request, long now) {
request.setSendTimeMs(now);
this.inFlightRequests.add(request);
selector.send(request.request());
}
/**
* Do actual reads and writes to sockets.
*
* @param timeout The maximum amount of time to wait (in ms) for responses if there are none immediately,
* must be non-negative. The actual timeout will be the minimum of timeout, request timeout and
* metadata timeout
* @param now The current time in milliseconds
* @return The list of responses received
*/
@Override
public List<ClientResponse> poll(long timeout, long now) {
long metadataTimeout = metadataUpdater.maybeUpdate(now);
try {
this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
} catch (IOException e) {
log.error("Unexpected error during I/O", e);
}
// process completed actions
long updatedNow = this.time.milliseconds();
List<ClientResponse> responses = new ArrayList<>();
handleCompletedSends(responses, updatedNow);
handleCompletedReceives(responses, updatedNow);
handleDisconnections(responses, updatedNow);
handleConnections();
handleTimedOutRequests(responses, updatedNow);
// invoke callbacks
for (ClientResponse response : responses) {
if (response.request().hasCallback()) {
try {
response.request().callback().onComplete(response);
} catch (Exception e) {
log.error("Uncaught error in request completion:", e);
}
}
}
return responses;
}
/**
* Get the number of in-flight requests
*/
@Override
public int inFlightRequestCount() {
return this.inFlightRequests.inFlightRequestCount();
}
/**
* Get the number of in-flight requests for a given node
*/
@Override
public int inFlightRequestCount(String node) {
return this.inFlightRequests.inFlightRequestCount(node);
}
/**
* Generate a request header for the given API key
*
* @param key The api key
* @return A request header with the appropriate client id and correlation id
*/
@Override
public RequestHeader nextRequestHeader(ApiKeys key) {
return new RequestHeader(key.id, clientId, correlation++);
}
/**
* Generate a request header for the given API key and version
*
* @param key The api key
* @param version The api version
* @return A request header with the appropriate client id and correlation id
*/
@Override
public RequestHeader nextRequestHeader(ApiKeys key, short version) {
return new RequestHeader(key.id, version, clientId, correlation++);
}
/**
* Interrupt the client if it is blocked waiting on I/O.
*/
@Override
public void wakeup() {
this.selector.wakeup();
}
/**
* Close the network client
*/
@Override
public void close() {
this.selector.close();
}
/**
* Choose the node with the fewest outstanding requests which is at least eligible for connection. This method will
* prefer a node with an existing connection, but will potentially choose a node for which we don't yet have a
* connection if all existing connections are in use. This method will never choose a node for which there is no
* existing connection and from which we have disconnected within the reconnect backoff period.
*
* @return The node with the fewest in-flight requests.
*/
@Override
public Node leastLoadedNode(long now) {
List<Node> nodes = this.metadataUpdater.fetchNodes();
int inflight = Integer.MAX_VALUE;
Node found = null;
int offset = this.randOffset.nextInt(nodes.size());
for (int i = 0; i < nodes.size(); i++) {
int idx = (offset + i) % nodes.size();
Node node = nodes.get(idx);
int currInflight = this.inFlightRequests.inFlightRequestCount(node.idString());
if (currInflight == 0 && this.connectionStates.isConnected(node.idString())) {
// if we find an established connection with no in-flight requests we can stop right away
return node;
} else if (!this.connectionStates.isBlackedOut(node.idString(), now) && currInflight < inflight) {
// otherwise if this is the best we have found so far, record that
inflight = currInflight;
found = node;
}
}
return found;
}
public static Struct parseResponse(ByteBuffer responseBuffer, RequestHeader requestHeader) {
ResponseHeader responseHeader = ResponseHeader.parse(responseBuffer);
// Always expect the response version id to be the same as the request version id
short apiKey = requestHeader.apiKey();
short apiVer = requestHeader.apiVersion();
Struct responseBody = ProtoUtils.responseSchema(apiKey, apiVer).read(responseBuffer);
correlate(requestHeader, responseHeader);
return responseBody;
}
/**
* Post process disconnection of a node
*
* @param responses The list of responses to update
* @param nodeId Id of the node to be disconnected
* @param now The current time
*/
private void processDisconnection(List<ClientResponse> responses, String nodeId, long now) {
connectionStates.disconnected(nodeId, now);
for (ClientRequest request : this.inFlightRequests.clearAll(nodeId)) {
log.trace("Cancelled request {} due to node {} being disconnected", request, nodeId);
if (!metadataUpdater.maybeHandleDisconnection(request))
responses.add(new ClientResponse(request, now, true, null));
}
}
/**
* Iterate over all the inflight requests and expire any requests that have exceeded the configured requestTimeout.
* The connection to the node associated with the request will be terminated and will be treated as a disconnection.
*
* @param responses The list of responses to update
* @param now The current time
*/
private void handleTimedOutRequests(List<ClientResponse> responses, long now) {
List<String> nodeIds = this.inFlightRequests.getNodesWithTimedOutRequests(now, this.requestTimeoutMs);
for (String nodeId : nodeIds) {
// close connection to the node
this.selector.close(nodeId);
log.debug("Disconnecting from node {} due to request timeout.", nodeId);
processDisconnection(responses, nodeId, now);
}
// we disconnected, so we should probably refresh our metadata
if (nodeIds.size() > 0)
metadataUpdater.requestUpdate();
}
/**
* Handle any completed request send. In particular if no response is expected consider the request complete.
*
* @param responses The list of responses to update
* @param now The current time
*/
private void handleCompletedSends(List<ClientResponse> responses, long now) {
// if no response is expected then when the send is completed, return it
for (Send send : this.selector.completedSends()) {
ClientRequest request = this.inFlightRequests.lastSent(send.destination());
if (!request.expectResponse()) {
this.inFlightRequests.completeLastSent(send.destination());
responses.add(new ClientResponse(request, now, false, null));
}
}
}
/**
* Handle any completed receives and update the response list with the responses received.
*
* @param responses The list of responses to update
* @param now The current time
*/
private void handleCompletedReceives(List<ClientResponse> responses, long now) {
for (NetworkReceive receive : this.selector.completedReceives()) {
String source = receive.source();
ClientRequest req = inFlightRequests.completeNext(source);
Struct body = parseResponse(receive.payload(), req.request().header());
if (!metadataUpdater.maybeHandleCompletedReceive(req, now, body))
responses.add(new ClientResponse(req, now, false, body));
}
}
/**
* Handle any disconnected connections
*
* @param responses The list of responses that completed with the disconnection
* @param now The current time
*/
private void handleDisconnections(List<ClientResponse> responses, long now) {
for (String node : this.selector.disconnected()) {
log.debug("Node {} disconnected.", node);
processDisconnection(responses, node, now);
}
// we got a disconnect so we should probably refresh our metadata and see if that broker is dead
if (this.selector.disconnected().size() > 0)
metadataUpdater.requestUpdate();
}
/**
* Record any newly completed connections
*/
private void handleConnections() {
for (String node : this.selector.connected()) {
log.debug("Completed connection to node {}", node);
this.connectionStates.connected(node);
}
}
/**
* Validate that the response corresponds to the request we expect or else explode
*/
private static void correlate(RequestHeader requestHeader, ResponseHeader responseHeader) {
if (requestHeader.correlationId() != responseHeader.correlationId())
throw new IllegalStateException("Correlation id for response (" + responseHeader.correlationId()
+ ") does not match request (" + requestHeader.correlationId() + ")");
}
/**
* Initiate a connection to the given node
*/
private void initiateConnect(Node node, long now) {
String nodeConnectionId = node.idString();
try {
log.debug("Initiating connection to node {} at {}:{}.", node.id(), node.host(), node.port());
this.connectionStates.connecting(nodeConnectionId, now);
selector.connect(nodeConnectionId,
new InetSocketAddress(node.host(), node.port()),
this.socketSendBuffer,
this.socketReceiveBuffer);
} catch (IOException e) {
/* attempt failed, we'll try again after the backoff */
connectionStates.disconnected(nodeConnectionId, now);
/* maybe the problem is our metadata, update it */
metadataUpdater.requestUpdate();
log.debug("Error connecting to node {} at {}:{}:", node.id(), node.host(), node.port(), e);
}
}
class DefaultMetadataUpdater implements MetadataUpdater {
/* the current cluster metadata */
private final Metadata metadata;
/* true iff there is a metadata request that has been sent and for which we have not yet received a response */
private boolean metadataFetchInProgress;
/* the last timestamp when no broker node is available to connect */
private long lastNoNodeAvailableMs;
DefaultMetadataUpdater(Metadata metadata) {
this.metadata = metadata;
this.metadataFetchInProgress = false;
this.lastNoNodeAvailableMs = 0;
}
@Override
public List<Node> fetchNodes() {
return metadata.fetch().nodes();
}
@Override
public boolean isUpdateDue(long now) {
return !this.metadataFetchInProgress && this.metadata.timeToNextUpdate(now) == 0;
}
@Override
public long maybeUpdate(long now) {
// should we update our metadata?
long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
long timeToNextReconnectAttempt = Math.max(this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now, 0);
long waitForMetadataFetch = this.metadataFetchInProgress ? Integer.MAX_VALUE : 0;
// if there is no node available to connect, back off refreshing metadata
long metadataTimeout = Math.max(Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt),
waitForMetadataFetch);
if (metadataTimeout == 0) {
// Beware that the behavior of this method and the computation of timeouts for poll() are
// highly dependent on the behavior of leastLoadedNode.
Node node = leastLoadedNode(now);
maybeUpdate(now, node);
}
return metadataTimeout;
}
@Override
public boolean maybeHandleDisconnection(ClientRequest request) {
ApiKeys requestKey = ApiKeys.forId(request.request().header().apiKey());
if (requestKey == ApiKeys.METADATA) {
Cluster cluster = metadata.fetch();
if (cluster.isBootstrapConfigured()) {
int nodeId = Integer.parseInt(request.request().destination());
Node node = cluster.nodeById(nodeId);
if (node != null)
log.warn("Bootstrap broker {}:{} disconnected", node.host(), node.port());
}
metadataFetchInProgress = false;
return true;
}
return false;
}
@Override
public boolean maybeHandleCompletedReceive(ClientRequest req, long now, Struct body) {
short apiKey = req.request().header().apiKey();
if (apiKey == ApiKeys.METADATA.id && req.isInitiatedByNetworkClient()) {
handleResponse(req.request().header(), body, now);
return true;
}
return false;
}
@Override
public void requestUpdate() {
this.metadata.requestUpdate();
}
private void handleResponse(RequestHeader header, Struct body, long now) {
this.metadataFetchInProgress = false;
MetadataResponse response = new MetadataResponse(body);
Cluster cluster = response.cluster();
// check if any topics metadata failed to get updated
Map<String, Errors> errors = response.errors();
if (!errors.isEmpty())
log.warn("Error while fetching metadata with correlation id {} : {}", header.correlationId(), errors);
// don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being
// created which means we will get errors and no nodes until it exists
if (cluster.nodes().size() > 0) {
this.metadata.update(cluster, now);
} else {
log.trace("Ignoring empty metadata response with correlation id {}.", header.correlationId());
this.metadata.failedUpdate(now);
}
}
/**
* Create a metadata request for the given topics
*/
private ClientRequest request(long now, String node, MetadataRequest metadata) {
RequestSend send = new RequestSend(node, nextRequestHeader(ApiKeys.METADATA), metadata.toStruct());
return new ClientRequest(now, true, send, null, true);
}
/**
* Add a metadata request to the list of sends if we can make one
*/
private void maybeUpdate(long now, Node node) {
if (node == null) {
log.debug("Give up sending metadata request since no node is available");
// mark the timestamp for no node available to connect
this.lastNoNodeAvailableMs = now;
return;
}
String nodeConnectionId = node.idString();
if (canSendRequest(nodeConnectionId)) {
this.metadataFetchInProgress = true;
MetadataRequest metadataRequest;
if (metadata.needMetadataForAllTopics())
metadataRequest = MetadataRequest.allTopics();
else
metadataRequest = new MetadataRequest(new ArrayList<>(metadata.topics()));
ClientRequest clientRequest = request(now, nodeConnectionId, metadataRequest);
log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
doSend(clientRequest, now);
} else if (connectionStates.canConnect(nodeConnectionId, now)) {
// we don't have a connection to this node right now, make one
log.debug("Initialize connection to node {} for sending metadata request", node.id());
initiateConnect(node, now);
// If initiateConnect failed immediately, this node will be put into blackout and we
// should allow immediately retrying in case there is another candidate node. If it
// is still connecting, the worst case is that we end up setting a longer timeout
// on the next round and then wait for the response.
} else { // connected, but can't send more OR connecting
// In either case, we just need to wait for a network event to let us know the selected
// connection might be usable again.
this.lastNoNodeAvailableMs = now;
}
}
}
}