blob: 8136cf07c345aa5085314ad4b885ab78c0c4911a [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.apache.pulsar.client.impl;
import static org.testng.Assert.assertEquals;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.awaitility.Awaitility;
* A Pulsar Client that is used for testing scenarios where the different
* asynchronous operations of the client-broker interaction must be orchestrated by the test
* so that race conditions caused by the test code can be eliminated.
* features:
* - can override remote endpoint protocol version in a thread safe manner
* - can reject new connections from the client to the broker
* - can drop all OpSend messages after they have been added to pendingMessages and processed
* by the client. This simulates a situation where sent messages go into a "black hole".
* - can synchronize operations with the help of the pending message callback which gets
* called after the message to send out has been added to the pending messages in the client.
public class PulsarTestClient extends PulsarClientImpl {
// All four fields are volatile so that a value set by the test thread is visible to the
// client threads that read it.

// When non-zero, the ClientCnx built by createClientCnx() reports this value as the remote
// endpoint protocol version; 0 means "no override".
private volatile int overrideRemoteEndpointProtocolVersion;
// When true, getConnection() returns a future that is already completed exceptionally.
private volatile boolean rejectNewConnections;
// When true, producers built by newProducerImpl() get null from getCnxIfReady().
private volatile boolean dropOpSendMessages;
// Optional hook that is checked when an OpSendMsg is added to a producer's pending message
// queue; null means no hook is installed.
private volatile Consumer<ProducerImpl.OpSendMsg> pendingMessageCallback;
* Create a new PulsarTestClient instance.
* @param clientBuilder ClientBuilder instance containing the configuration of the client
* @return a new PulsarTestClient instance
* @throws PulsarClientException
public static PulsarTestClient create(ClientBuilder clientBuilder) throws PulsarClientException {
// The configuration is read through ClientBuilderImpl, so a builder of any other type fails
// the cast below.
ClientConfigurationData clientConfigurationData =
((ClientBuilderImpl) clientBuilder).getClientConfigurationData();
// the reason to do all the following is to be able to pass the supplier for creating new ClientCnx
// instances after the constructor of PulsarClientImpl has been called.
// An anonymous subclass of ClientCnx class is used to override the getRemoteEndpointProtocolVersion()
// method.
// The IO threads are created with the daemon status of the thread calling create().
EventLoopGroup eventLoopGroup = EventLoopUtil.newEventLoopGroup(clientConfigurationData.getNumIoThreads(),
new DefaultThreadFactory("pulsar-client-io", Thread.currentThread().isDaemon()));
// The reference starts out empty. The supplier given to the connection pool only dereferences
// it when it is invoked, so the reference must be populated before the pool creates its
// first ClientCnx.
AtomicReference<Supplier<ClientCnx>> clientCnxSupplierReference = new AtomicReference<>();
ConnectionPool connectionPool = new ConnectionPool(clientConfigurationData, eventLoopGroup,
() -> clientCnxSupplierReference.get().get());
return new PulsarTestClient(clientConfigurationData, eventLoopGroup, connectionPool,
private PulsarTestClient(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool,
AtomicReference<Supplier<ClientCnx>> clientCnxSupplierReference)
throws PulsarClientException {
// Private: instances are obtained through the static create() factory, which prepares the
// event loop group, the connection pool and the supplier reference passed in here.
super(conf, eventLoopGroup, cnxPool);
// workaround initialization order issue so that ClientCnx can be created in this class
// NOTE(review): the statement that uses clientCnxSupplierReference is not visible in this
// view; presumably the reference is populated here with a supplier that delegates to
// createClientCnx() - confirm against the full file.
* Overrides the default ClientCnx implementation with an implementation that overrides the
* getRemoteEndpointProtocolVersion() method. This is used to test client behaviour in certain cases.
* @return new ClientCnx instance
protected ClientCnx createClientCnx() {
// An anonymous ClientCnx subclass lets the test replace the protocol version reported for the
// remote endpoint.
return new ClientCnx(conf, eventLoopGroup) {
public int getRemoteEndpointProtocolVersion() {
// 0 means "no override": the version reported by the real ClientCnx is used instead.
// NOTE(review): the volatile field is read twice in this expression, so a concurrent reset
// to 0 between the two reads could make this return 0; reading it once into a local would
// avoid that.
return overrideRemoteEndpointProtocolVersion != 0
? overrideRemoteEndpointProtocolVersion
: super.getRemoteEndpointProtocolVersion();
* Overrides the getConnection method to reject new connections from being established between
* the client and brokers.
* @param topic the topic for the connection
* @return a future that provides the ClientCnx to use. It completes exceptionally when connections are rejected.
public CompletableFuture<ClientCnx> getConnection(String topic) {
if (rejectNewConnections) {
// The rejection is reported through the returned future, which is already completed
// exceptionally with an IOException, rather than being thrown synchronously.
CompletableFuture<ClientCnx> result = new CompletableFuture<>();
result.completeExceptionally(new IOException("New connections are rejected."));
return result;
} else {
// Not rejecting: use the regular connection lookup of PulsarClientImpl.
return super.getConnection(topic);
* Overrides the producer instance with an anonymous subclass that adds hooks for observing new
* OpSendMsg instances being added to pending messages in the client.
* It also configures the hook to drop OpSend messages when dropping is enabled.
protected <T> ProducerImpl<T> newProducerImpl(String topic, int partitionIndex, ProducerConfigurationData conf,
Schema<T> schema, ProducerInterceptors interceptors,
CompletableFuture<Producer<T>> producerCreatedFuture) {
// The producer is an anonymous ProducerImpl subclass that overrides two methods:
// createPendingMessagesQueue() to observe additions to the pending queue, and
// getCnxIfReady() to simulate dropped OpSend messages.
return new ProducerImpl<T>(this, topic, conf, producerCreatedFuture, partitionIndex, schema,
interceptors) {
protected OpSendMsgQueue createPendingMessagesQueue() {
return new OpSendMsgQueue() {
public boolean add(OpSendMsg opSendMsg) {
// The message is added to the real queue first; the hook is only checked afterwards, and
// the result of the real add is what gets returned.
boolean added = super.add(opSendMsg);
// NOTE(review): the body of this branch is not visible in this view; presumably it passes
// opSendMsg to pendingMessageCallback - confirm against the full file.
if (pendingMessageCallback != null) {
return added;
protected ClientCnx getCnxIfReady() {
// While dropping is enabled, no connection is reported as ready.
// NOTE(review): presumably this keeps OpSend messages in the pending queue without sending
// them - confirm against ProducerImpl.
if (dropOpSendMessages) {
return null;
} else {
return super.getCnxIfReady();
public void setOverrideRemoteEndpointProtocolVersion(int overrideRemoteEndpointProtocolVersion) {
// Passing 0 clears the override: getRemoteEndpointProtocolVersion() in createClientCnx()
// treats 0 as "use the real version".
this.overrideRemoteEndpointProtocolVersion = overrideRemoteEndpointProtocolVersion;
public void setRejectNewConnections(boolean rejectNewConnections) {
// The flag is checked on every getConnection() call, so it applies to connection requests
// made after this assignment.
this.rejectNewConnections = rejectNewConnections;
* Simulates the producer connection getting dropped. Will also reject reconnections to simulate an
* outage. This reduces race conditions since the reconnection has to be explicitly enabled by calling
* allowReconnecting() method.
public void disconnectProducerAndRejectReconnecting(ProducerImpl<?> producer) throws IOException {
// wait until all possible in-flight messages have been delivered
Awaitility.await().untilAsserted(() -> {
// The pending queue only has to be empty while messages are not being dropped and the
// producer is connected; in any other state the assertion is skipped and the wait ends.
if (!dropOpSendMessages && producer.isConnected()) {
assertEquals(producer.getPendingQueueSize(), 0);
// reject new connection attempts
// break the existing connection between the producer and the broker by explicitly closing it
ClientCnx cnx = producer.cnx();
// NOTE(review): the statements that set the reject flag and close the connection are not
// visible in this view - confirm against the full file.
* Resets possible dropping of OpSend messages and allows the client to reconnect to the broker.
public void allowReconnecting() {
// Stop dropping OpSend messages: getCnxIfReady() delegates to the real producer again.
dropOpSendMessages = false;
// NOTE(review): no reset of rejectNewConnections is visible in this view. Confirm that it is
// cleared here as well; otherwise getConnection() keeps rejecting new connections.
* Assigns the callback to use for handling OpSend messages once a message has been added to the pending messages.
* @param pendingMessageCallback
public void setPendingMessageCallback(
Consumer<ProducerImpl.OpSendMsg> pendingMessageCallback) {
// Passing null removes the hook: the pending queue built in newProducerImpl() checks the
// field for null before using it.
this.pendingMessageCallback = pendingMessageCallback;
* Enable dropping of OpSend messages after they have been added to pendingMessages and processed
* by the client. The OpSend messages won't be delivered until the allowReconnecting method has been called.
public void dropOpSendMessages() {
// From now on, getCnxIfReady() of producers built by newProducerImpl() returns null until
// allowReconnecting() clears the flag.
this.dropOpSendMessages = true;