blob: b75ac5829f0c09089d141c615ea2f70f95a1d56e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.stream.Collectors;
/**
* A mock network client for use testing code
*/
public class MockClient implements KafkaClient {
public static final RequestMatcher ALWAYS_TRUE = new RequestMatcher() {
@Override
public boolean matches(AbstractRequest body) {
return true;
}
};
private static class FutureResponse {
private final Node node;
private final RequestMatcher requestMatcher;
private final AbstractResponse responseBody;
private final boolean disconnected;
private final boolean isUnsupportedRequest;
public FutureResponse(Node node,
RequestMatcher requestMatcher,
AbstractResponse responseBody,
boolean disconnected,
boolean isUnsupportedRequest) {
this.node = node;
this.requestMatcher = requestMatcher;
this.responseBody = responseBody;
this.disconnected = disconnected;
this.isUnsupportedRequest = isUnsupportedRequest;
}
}
private int correlation;
private Runnable wakeupHook;
private final Time time;
private final MockMetadataUpdater metadataUpdater;
private final Map<String, ConnectionState> connections = new HashMap<>();
private final Map<Node, Long> pendingAuthenticationErrors = new HashMap<>();
private final Map<Node, AuthenticationException> authenticationErrors = new HashMap<>();
// Use concurrent queue for requests so that requests may be queried from a different thread
private final Queue<ClientRequest> requests = new ConcurrentLinkedDeque<>();
// Use concurrent queue for responses so that responses may be updated during poll() from a different thread.
private final Queue<ClientResponse> responses = new ConcurrentLinkedDeque<>();
private final Queue<FutureResponse> futureResponses = new ConcurrentLinkedDeque<>();
private final Queue<MetadataUpdate> metadataUpdates = new ConcurrentLinkedDeque<>();
private volatile NodeApiVersions nodeApiVersions = NodeApiVersions.create();
private volatile int numBlockingWakeups = 0;
private volatile boolean active = true;
public MockClient(Time time, Metadata metadata) {
this(time, new DefaultMockMetadataUpdater(metadata));
}
public MockClient(Time time, MockMetadataUpdater metadataUpdater) {
this.time = time;
this.metadataUpdater = metadataUpdater;
}
public boolean isConnected(String idString) {
return connectionState(idString).state == ConnectionState.State.CONNECTED;
}
private ConnectionState connectionState(String idString) {
ConnectionState connectionState = connections.get(idString);
if (connectionState == null) {
connectionState = new ConnectionState();
connections.put(idString, connectionState);
}
return connectionState;
}
@Override
public boolean isReady(Node node, long now) {
return connectionState(node.idString()).isReady(now);
}
@Override
public boolean ready(Node node, long now) {
return connectionState(node.idString()).ready(now);
}
@Override
public long connectionDelay(Node node, long now) {
return connectionState(node.idString()).connectionDelay(now);
}
@Override
public long pollDelayMs(Node node, long now) {
return connectionDelay(node, now);
}
public void blackout(Node node, long durationMs) {
connectionState(node.idString()).backoff(time.milliseconds() + durationMs);
}
public void setUnreachable(Node node, long durationMs) {
disconnect(node.idString());
connectionState(node.idString()).setUnreachable(time.milliseconds() + durationMs);
}
public void throttle(Node node, long durationMs) {
connectionState(node.idString()).throttle(time.milliseconds() + durationMs);
}
public void delayReady(Node node, long durationMs) {
connectionState(node.idString()).setReadyDelayed(time.milliseconds() + durationMs);
}
public void authenticationFailed(Node node, long blackoutMs) {
pendingAuthenticationErrors.remove(node);
authenticationErrors.put(node, (AuthenticationException) Errors.SASL_AUTHENTICATION_FAILED.exception());
disconnect(node.idString());
blackout(node, blackoutMs);
}
public void createPendingAuthenticationError(Node node, long blackoutMs) {
pendingAuthenticationErrors.put(node, blackoutMs);
}
@Override
public boolean connectionFailed(Node node) {
return connectionState(node.idString()).isBackingOff(time.milliseconds());
}
@Override
public AuthenticationException authenticationException(Node node) {
return authenticationErrors.get(node);
}
@Override
public void disconnect(String node) {
long now = time.milliseconds();
Iterator<ClientRequest> iter = requests.iterator();
while (iter.hasNext()) {
ClientRequest request = iter.next();
if (request.destination().equals(node)) {
short version = request.requestBuilder().latestAllowedVersion();
responses.add(new ClientResponse(request.makeHeader(version), request.callback(), request.destination(),
request.createdTimeMs(), now, true, null, null, null));
iter.remove();
}
}
connectionState(node).disconnect();
}
@Override
public void send(ClientRequest request, long now) {
if (!connectionState(request.destination()).isReady(now))
throw new IllegalStateException("Cannot send " + request + " since the destination is not ready");
// Check if the request is directed to a node with a pending authentication error.
for (Iterator<Map.Entry<Node, Long>> authErrorIter =
pendingAuthenticationErrors.entrySet().iterator(); authErrorIter.hasNext(); ) {
Map.Entry<Node, Long> entry = authErrorIter.next();
Node node = entry.getKey();
long blackoutMs = entry.getValue();
if (node.idString().equals(request.destination())) {
authErrorIter.remove();
// Set up a disconnected ClientResponse and create an authentication error
// for the affected node.
authenticationFailed(node, blackoutMs);
AbstractRequest.Builder<?> builder = request.requestBuilder();
short version = nodeApiVersions.latestUsableVersion(request.apiKey(), builder.oldestAllowedVersion(),
builder.latestAllowedVersion());
ClientResponse resp = new ClientResponse(request.makeHeader(version), request.callback(), request.destination(),
request.createdTimeMs(), time.milliseconds(), true, null,
new AuthenticationException("Authentication failed"), null);
responses.add(resp);
return;
}
}
Iterator<FutureResponse> iterator = futureResponses.iterator();
while (iterator.hasNext()) {
FutureResponse futureResp = iterator.next();
if (futureResp.node != null && !request.destination().equals(futureResp.node.idString()))
continue;
AbstractRequest.Builder<?> builder = request.requestBuilder();
short version = nodeApiVersions.latestUsableVersion(request.apiKey(), builder.oldestAllowedVersion(),
builder.latestAllowedVersion());
AbstractRequest abstractRequest = request.requestBuilder().build(version);
if (!futureResp.requestMatcher.matches(abstractRequest))
throw new IllegalStateException("Request matcher did not match next-in-line request " + abstractRequest);
UnsupportedVersionException unsupportedVersionException = null;
if (futureResp.isUnsupportedRequest)
unsupportedVersionException = new UnsupportedVersionException("Api " +
request.apiKey() + " with version " + version);
ClientResponse resp = new ClientResponse(request.makeHeader(version), request.callback(), request.destination(),
request.createdTimeMs(), time.milliseconds(), futureResp.disconnected,
unsupportedVersionException, null, futureResp.responseBody);
responses.add(resp);
iterator.remove();
return;
}
this.requests.add(request);
}
/**
* Simulate a blocking poll in order to test wakeup behavior.
*
* @param numBlockingWakeups The number of polls which will block until woken up
*/
public synchronized void enableBlockingUntilWakeup(int numBlockingWakeups) {
this.numBlockingWakeups = numBlockingWakeups;
}
@Override
public synchronized void wakeup() {
if (numBlockingWakeups > 0) {
numBlockingWakeups--;
notify();
}
if (wakeupHook != null) {
wakeupHook.run();
}
}
private synchronized void maybeAwaitWakeup() {
try {
int remainingBlockingWakeups = numBlockingWakeups;
if (remainingBlockingWakeups <= 0)
return;
while (numBlockingWakeups == remainingBlockingWakeups)
wait();
} catch (InterruptedException e) {
throw new InterruptException(e);
}
}
@Override
public List<ClientResponse> poll(long timeoutMs, long now) {
maybeAwaitWakeup();
checkTimeoutOfPendingRequests(now);
// We skip metadata updates if all nodes are currently blacked out
if (metadataUpdater.isUpdateNeeded() && leastLoadedNode(now) != null) {
MetadataUpdate metadataUpdate = metadataUpdates.poll();
if (metadataUpdate != null) {
metadataUpdater.update(time, metadataUpdate);
} else {
metadataUpdater.updateWithCurrentMetadata(time);
}
}
List<ClientResponse> copy = new ArrayList<>();
ClientResponse response;
while ((response = this.responses.poll()) != null) {
response.onComplete();
copy.add(response);
}
return copy;
}
private long elapsedTimeMs(long currentTimeMs, long startTimeMs) {
return Math.max(0, currentTimeMs - startTimeMs);
}
private void checkTimeoutOfPendingRequests(long nowMs) {
ClientRequest request = requests.peek();
while (request != null && elapsedTimeMs(nowMs, request.createdTimeMs()) > request.requestTimeoutMs()) {
disconnect(request.destination());
requests.poll();
request = requests.peek();
}
}
public Queue<ClientRequest> requests() {
return this.requests;
}
public void respond(AbstractResponse response) {
respond(response, false);
}
public void respond(RequestMatcher matcher, AbstractResponse response) {
ClientRequest nextRequest = requests.peek();
if (nextRequest == null)
throw new IllegalStateException("No current requests queued");
AbstractRequest request = nextRequest.requestBuilder().build();
if (!matcher.matches(request))
throw new IllegalStateException("Request matcher did not match next-in-line request " + request);
respond(response);
}
// Utility method to enable out of order responses
public void respondToRequest(ClientRequest clientRequest, AbstractResponse response) {
AbstractRequest request = clientRequest.requestBuilder().build();
requests.remove(clientRequest);
short version = clientRequest.requestBuilder().latestAllowedVersion();
responses.add(new ClientResponse(clientRequest.makeHeader(version), clientRequest.callback(), clientRequest.destination(),
clientRequest.createdTimeMs(), time.milliseconds(), false, null, null, response));
}
public void respond(AbstractResponse response, boolean disconnected) {
if (requests.isEmpty())
throw new IllegalStateException("No requests pending for inbound response " + response);
ClientRequest request = requests.poll();
short version = request.requestBuilder().latestAllowedVersion();
responses.add(new ClientResponse(request.makeHeader(version), request.callback(), request.destination(),
request.createdTimeMs(), time.milliseconds(), disconnected, null, null, response));
}
public void respondFrom(AbstractResponse response, Node node) {
respondFrom(response, node, false);
}
public void respondFrom(AbstractResponse response, Node node, boolean disconnected) {
Iterator<ClientRequest> iterator = requests.iterator();
while (iterator.hasNext()) {
ClientRequest request = iterator.next();
if (request.destination().equals(node.idString())) {
iterator.remove();
short version = request.requestBuilder().latestAllowedVersion();
responses.add(new ClientResponse(request.makeHeader(version), request.callback(), request.destination(),
request.createdTimeMs(), time.milliseconds(), disconnected, null, null, response));
return;
}
}
throw new IllegalArgumentException("No requests available to node " + node);
}
public void prepareResponse(AbstractResponse response) {
prepareResponse(ALWAYS_TRUE, response, false);
}
public void prepareResponseFrom(AbstractResponse response, Node node) {
prepareResponseFrom(ALWAYS_TRUE, response, node, false, false);
}
/**
* Prepare a response for a request matching the provided matcher. If the matcher does not
* match, {@link KafkaClient#send(ClientRequest, long)} will throw IllegalStateException
* @param matcher The matcher to apply
* @param response The response body
*/
public void prepareResponse(RequestMatcher matcher, AbstractResponse response) {
prepareResponse(matcher, response, false);
}
public void prepareResponseFrom(RequestMatcher matcher, AbstractResponse response, Node node) {
prepareResponseFrom(matcher, response, node, false, false);
}
public void prepareResponse(AbstractResponse response, boolean disconnected) {
prepareResponse(ALWAYS_TRUE, response, disconnected);
}
public void prepareResponseFrom(AbstractResponse response, Node node, boolean disconnected) {
prepareResponseFrom(ALWAYS_TRUE, response, node, disconnected, false);
}
/**
* Prepare a response for a request matching the provided matcher. If the matcher does not
* match, {@link KafkaClient#send(ClientRequest, long)} will throw IllegalStateException.
* @param matcher The request matcher to apply
* @param response The response body
* @param disconnected Whether the request was disconnected
*/
public void prepareResponse(RequestMatcher matcher, AbstractResponse response, boolean disconnected) {
prepareResponseFrom(matcher, response, null, disconnected, false);
}
/**
* Raise an unsupported version error on the next request if it matches the given matcher.
* If the matcher does not match, {@link KafkaClient#send(ClientRequest, long)} will throw IllegalStateException.
* @param matcher The request matcher to apply
*/
public void prepareUnsupportedVersionResponse(RequestMatcher matcher) {
prepareResponseFrom(matcher, null, null, false, true);
}
private void prepareResponseFrom(RequestMatcher matcher,
AbstractResponse response,
Node node,
boolean disconnected,
boolean isUnsupportedVersion) {
futureResponses.add(new FutureResponse(node, matcher, response, disconnected, isUnsupportedVersion));
}
public void waitForRequests(final int minRequests, long maxWaitMs) throws InterruptedException {
TestUtils.waitForCondition(new TestCondition() {
@Override
public boolean conditionMet() {
return requests.size() >= minRequests;
}
}, maxWaitMs, "Expected requests have not been sent");
}
public void reset() {
connections.clear();
requests.clear();
responses.clear();
futureResponses.clear();
metadataUpdates.clear();
authenticationErrors.clear();
}
public boolean hasPendingMetadataUpdates() {
return !metadataUpdates.isEmpty();
}
public int numAwaitingResponses() {
return futureResponses.size();
}
public void prepareMetadataUpdate(MetadataResponse updateResponse) {
prepareMetadataUpdate(updateResponse, false);
}
public void prepareMetadataUpdate(MetadataResponse updateResponse,
boolean expectMatchMetadataTopics) {
metadataUpdates.add(new MetadataUpdate(updateResponse, expectMatchMetadataTopics));
}
public void updateMetadata(MetadataResponse updateResponse) {
metadataUpdater.update(time, new MetadataUpdate(updateResponse, false));
}
@Override
public int inFlightRequestCount() {
return requests.size();
}
@Override
public boolean hasInFlightRequests() {
return !requests.isEmpty();
}
public boolean hasPendingResponses() {
return !responses.isEmpty() || !futureResponses.isEmpty();
}
@Override
public int inFlightRequestCount(String node) {
int result = 0;
for (ClientRequest req : requests) {
if (req.destination().equals(node))
++result;
}
return result;
}
@Override
public boolean hasInFlightRequests(String node) {
return inFlightRequestCount(node) > 0;
}
@Override
public boolean hasReadyNodes(long now) {
return connections.values().stream().anyMatch(cxn -> cxn.isReady(now));
}
@Override
public ClientRequest newClientRequest(String nodeId, AbstractRequest.Builder<?> requestBuilder, long createdTimeMs,
boolean expectResponse) {
return newClientRequest(nodeId, requestBuilder, createdTimeMs, expectResponse, 5000, null);
}
@Override
public ClientRequest newClientRequest(String nodeId,
AbstractRequest.Builder<?> requestBuilder,
long createdTimeMs,
boolean expectResponse,
int requestTimeoutMs,
RequestCompletionHandler callback) {
return new ClientRequest(nodeId, requestBuilder, correlation++, "mockClientId", createdTimeMs,
expectResponse, requestTimeoutMs, callback);
}
@Override
public void initiateClose() {
close();
}
@Override
public boolean active() {
return active;
}
@Override
public void close() {
active = false;
metadataUpdater.close();
}
@Override
public void close(String node) {
connections.remove(node);
}
@Override
public Node leastLoadedNode(long now) {
// Consistent with NetworkClient, we do not return nodes awaiting reconnect backoff
for (Node node : metadataUpdater.fetchNodes()) {
if (!connectionState(node.idString()).isBackingOff(now))
return node;
}
return null;
}
public void setWakeupHook(Runnable wakeupHook) {
this.wakeupHook = wakeupHook;
}
/**
* The RequestMatcher provides a way to match a particular request to a response prepared
* through {@link #prepareResponse(RequestMatcher, AbstractResponse)}. Basically this allows testers
* to inspect the request body for the type of the request or for specific fields that should be set,
* and to fail the test if it doesn't match.
*/
public interface RequestMatcher {
boolean matches(AbstractRequest body);
}
public void setNodeApiVersions(NodeApiVersions nodeApiVersions) {
this.nodeApiVersions = nodeApiVersions;
}
public static class MetadataUpdate {
final MetadataResponse updateResponse;
final boolean expectMatchRefreshTopics;
MetadataUpdate(MetadataResponse updateResponse, boolean expectMatchRefreshTopics) {
this.updateResponse = updateResponse;
this.expectMatchRefreshTopics = expectMatchRefreshTopics;
}
private Set<String> topics() {
return updateResponse.topicMetadata().stream()
.map(MetadataResponse.TopicMetadata::topic)
.collect(Collectors.toSet());
}
}
/**
* This is a dumbed down version of {@link MetadataUpdater} which is used to facilitate
* metadata tracking primarily in order to serve {@link KafkaClient#leastLoadedNode(long)}
* and bookkeeping through {@link Metadata}. The extensibility allows AdminClient, which does
* not rely on {@link Metadata} to do its own thing.
*/
public interface MockMetadataUpdater {
List<Node> fetchNodes();
boolean isUpdateNeeded();
void update(Time time, MetadataUpdate update);
default void updateWithCurrentMetadata(Time time) {}
default void close() {}
}
private static class DefaultMockMetadataUpdater implements MockMetadataUpdater {
private final Metadata metadata;
private MetadataUpdate lastUpdate;
public DefaultMockMetadataUpdater(Metadata metadata) {
this.metadata = metadata;
}
@Override
public List<Node> fetchNodes() {
return metadata.fetch().nodes();
}
@Override
public boolean isUpdateNeeded() {
return metadata.updateRequested();
}
@Override
public void updateWithCurrentMetadata(Time time) {
if (lastUpdate == null)
throw new IllegalStateException("No previous metadata update to use");
update(time, lastUpdate);
}
private void maybeCheckExpectedTopics(MetadataUpdate update, MetadataRequest.Builder builder) {
if (update.expectMatchRefreshTopics) {
if (builder.isAllTopics())
throw new IllegalStateException("The metadata topics does not match expectation. "
+ "Expected topics: " + update.topics()
+ ", asked topics: ALL");
Set<String> requestedTopics = new HashSet<>(builder.topics());
if (!requestedTopics.equals(update.topics())) {
throw new IllegalStateException("The metadata topics does not match expectation. "
+ "Expected topics: " + update.topics()
+ ", asked topics: " + requestedTopics);
}
}
}
@Override
public void update(Time time, MetadataUpdate update) {
MetadataRequest.Builder builder = metadata.newMetadataRequestBuilder();
maybeCheckExpectedTopics(update, builder);
metadata.update(update.updateResponse, time.milliseconds());
this.lastUpdate = update;
}
@Override
public void close() {
metadata.close();
}
}
private static class ConnectionState {
enum State { CONNECTING, CONNECTED, DISCONNECTED }
private long throttledUntilMs = 0L;
private long readyDelayedUntilMs = 0L;
private long backingOffUntilMs = 0L;
private long unreachableUntilMs = 0L;
private State state = State.DISCONNECTED;
void backoff(long untilMs) {
backingOffUntilMs = untilMs;
}
void throttle(long untilMs) {
throttledUntilMs = untilMs;
}
void setUnreachable(long untilMs) {
unreachableUntilMs = untilMs;
}
void setReadyDelayed(long untilMs) {
readyDelayedUntilMs = untilMs;
}
boolean isReady(long now) {
return state == State.CONNECTED && notThrottled(now);
}
boolean isReadyDelayed(long now) {
return now < readyDelayedUntilMs;
}
boolean notThrottled(long now) {
return now > throttledUntilMs;
}
boolean isBackingOff(long now) {
return now < backingOffUntilMs;
}
boolean isUnreachable(long now) {
return now < unreachableUntilMs;
}
void disconnect() {
state = State.DISCONNECTED;
}
long connectionDelay(long now) {
if (state != State.DISCONNECTED)
return Long.MAX_VALUE;
if (backingOffUntilMs > now)
return backingOffUntilMs - now;
return 0;
}
boolean ready(long now) {
switch (state) {
case CONNECTED:
return notThrottled(now);
case CONNECTING:
if (isReadyDelayed(now))
return false;
state = State.CONNECTED;
return ready(now);
case DISCONNECTED:
if (isBackingOff(now)) {
return false;
} else if (isUnreachable(now)) {
backingOffUntilMs = now + 100;
return false;
}
state = State.CONNECTING;
return ready(now);
default:
throw new IllegalArgumentException("Invalid state: " + state);
}
}
}
}