blob: e5962669b9f23c28b820cb924ebc94cbb22c6048 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <gtest/gtest.h>
#include <pulsar/Client.h>
#include <time.h>

#include <atomic>
#include <cstddef>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

#include "lib/Latch.h"
using namespace pulsar;
/**
 * Builds the producer configuration shared by every test: key-based batching whose batches
 * are bounded only by message count. The byte-size limit is set to the largest unsigned long,
 * and the publish delay is 3600 * 1000 ms (one hour), so neither limit should trigger a send.
 */
static ProducerConfiguration createDefaultProducerConfig() {
    const unsigned long unlimitedBytes = static_cast<unsigned long>(-1);
    const int oneHourMs = 3600 * 1000;

    ProducerConfiguration config;
    config.setBatchingType(ProducerConfiguration::KeyBasedBatching);
    config.setBatchingMaxAllowedSizeInBytes(unlimitedBytes);
    config.setBatchingMaxPublishDelayMs(oneHourMs);
    return config;
}
class KeyBasedBatchingTest : public ::testing::Test {
protected:
KeyBasedBatchingTest() : client_("pulsar://localhost:6650") {}
void TearDown() override { client_.close(); }
void initTopicName(const std::string& testName) {
topicName_ = "KeyBasedBatchingTest-" + testName + "-" + std::to_string(time(nullptr));
}
void initProducer(const ProducerConfiguration& producerConfig) {
ASSERT_EQ(ResultOk, client_.createProducer(topicName_, producerConfig, producer_));
}
void initConsumer() { ASSERT_EQ(ResultOk, client_.subscribe(topicName_, "SubscriptionName", consumer_)); }
void receiveAndAck(Message& msg) {
ASSERT_EQ(ResultOk, consumer_.receive(msg, 3000));
ASSERT_EQ(ResultOk, consumer_.acknowledge(msg));
}
Client client_;
Producer producer_;
Consumer consumer_;
std::string topicName_;
};
// Sends 100 messages alternating between ordering keys "A" and "B" with no batching limits,
// then checks that flush() succeeds and that every send callback has run.
TEST_F(KeyBasedBatchingTest, testFlush) {
    initTopicName("Flush");
    // Also lift the message-count limit, so no batching limit is left in effect.
    initProducer(createDefaultProducerConfig().setBatchingMaxMessages(static_cast<unsigned int>(-1)));

    constexpr int numMessages = 100;
    std::atomic_int numMessageSent{0};
    auto onSent = [&numMessageSent](Result result, const MessageId&) {
        ++numMessageSent;
        ASSERT_EQ(result, ResultOk);
    };

    for (int i = 0; i < numMessages; i++) {
        const std::string key = (i % 2 == 0) ? "A" : "B";
        producer_.sendAsync(MessageBuilder().setOrderingKey(key).setContent("x").build(), onSent);
    }

    ASSERT_EQ(ResultOk, producer_.flush());
    // The test expects every send callback to have run once flush() has returned.
    ASSERT_EQ(numMessageSent.load(), numMessages);
}
// Verifies that a message's ordering key, not its partition key, decides which key-based
// batch it joins, and that both keys are preserved on the received message.
TEST_F(KeyBasedBatchingTest, testOrderingKeyPriority) {
    initTopicName("OrderingKeyPriority");
    ASSERT_NO_FATAL_FAILURE(initProducer(createDefaultProducerConfig().setBatchingMaxMessages(3)));
    ASSERT_NO_FATAL_FAILURE(initConsumer());

    // One count per send callback.
    Latch latch(3);
    auto sendCallback = [&latch](Result result, const MessageId&) {
        // EXPECT (not ASSERT) so countdown() always runs. An early return would leave
        // latch.wait() below blocked forever.
        EXPECT_EQ(result, ResultOk);
        latch.countdown();
    };
    // "0" is sent to the batch of "A" because the ordering key has higher priority than the partition key
    producer_.sendAsync(MessageBuilder().setContent("0").setOrderingKey("A").setPartitionKey("B").build(),
                        sendCallback);
    producer_.sendAsync(MessageBuilder().setContent("1").setOrderingKey("A").build(), sendCallback);
    producer_.sendAsync(MessageBuilder().setContent("2").setOrderingKey("B").build(), sendCallback);
    // Block until all three send callbacks have run.
    latch.wait();

    Message msg;
    ASSERT_NO_FATAL_FAILURE(receiveAndAck(msg));
    ASSERT_EQ("0", msg.getDataAsString());
    ASSERT_EQ("A", msg.getOrderingKey());
    ASSERT_EQ("B", msg.getPartitionKey());

    ASSERT_NO_FATAL_FAILURE(receiveAndAck(msg));
    ASSERT_EQ("1", msg.getDataAsString());
    ASSERT_EQ("A", msg.getOrderingKey());

    ASSERT_NO_FATAL_FAILURE(receiveAndAck(msg));
    ASSERT_EQ("2", msg.getDataAsString());
    ASSERT_EQ("B", msg.getOrderingKey());
}
// Sends six messages spread across keys "A", "B" and "C" in one batching window and checks
// the order in which the resulting per-key batches are delivered.
TEST_F(KeyBasedBatchingTest, testSequenceId) {
    initTopicName("SequenceId");
    ASSERT_NO_FATAL_FAILURE(initProducer(createDefaultProducerConfig().setBatchingMaxMessages(6)));
    ASSERT_NO_FATAL_FAILURE(initConsumer());

    constexpr int numMessages = 6;
    Latch latch(numMessages);
    auto sendAsync = [this, &latch](const std::string& key, const std::string& value) {
        producer_.sendAsync(MessageBuilder().setOrderingKey(key).setContent(value).build(),
                            [&latch](Result result, const MessageId&) {
                                // EXPECT (not ASSERT) so countdown() always runs. A failed
                                // send would otherwise leave latch.wait() blocked forever.
                                EXPECT_EQ(result, ResultOk);
                                latch.countdown();
                            });
    };
    sendAsync("A", "0");
    sendAsync("B", "1");
    sendAsync("C", "2");
    sendAsync("B", "3");
    sendAsync("C", "4");
    sendAsync("A", "5");
    // sequence id: B < C < A, so there are 3 batches in the following order:
    // B: 1, 3
    // C: 2, 4
    // A: 0, 5
    latch.wait();

    std::vector<std::string> receivedKeys;
    std::vector<std::string> receivedValues;
    for (int i = 0; i < numMessages; i++) {
        Message msg;
        // Abort the test on the first failed receive instead of continuing with stale data.
        ASSERT_NO_FATAL_FAILURE(receiveAndAck(msg));
        receivedKeys.emplace_back(msg.getOrderingKey());
        receivedValues.emplace_back(msg.getDataAsString());
    }

    const std::vector<std::string> expectedKeys{"B", "B", "C", "C", "A", "A"};
    const std::vector<std::string> expectedValues{"1", "3", "2", "4", "0", "5"};
    EXPECT_EQ(receivedKeys, expectedKeys);
    EXPECT_EQ(receivedValues, expectedValues);
}
// Sends 500 messages with no key and checks that all of them are received exactly once.
TEST_F(KeyBasedBatchingTest, testSingleBatch) {
    initTopicName("SingleBatch");
    ASSERT_NO_FATAL_FAILURE(initProducer(createDefaultProducerConfig().setBatchingMaxMessages(5)));
    ASSERT_NO_FATAL_FAILURE(initConsumer());

    constexpr int numMessages = 5 * 100;
    // Shared ownership keeps the counter alive for any send callback that could still fire
    // after this test body returns early from one of the ASSERTs below.
    auto numMessageSent = std::make_shared<std::atomic_int>(0);
    // messages with no key are packed to the same batch and this batch has no key
    // the broker uses `NON_KEY` as the key when dispatching messages from this batch
    for (int i = 0; i < numMessages; i++) {
        producer_.sendAsync(MessageBuilder().setContent("x").build(),
                            [numMessageSent](Result result, const MessageId&) {
                                ASSERT_EQ(result, ResultOk);
                                ++*numMessageSent;
                            });
    }

    Message msg;
    for (int i = 0; i < numMessages; i++) {
        // Stop at the first failed receive instead of waiting out the receive timeout
        // for every remaining message.
        ASSERT_NO_FATAL_FAILURE(receiveAndAck(msg));
    }
    ASSERT_EQ(ResultTimeout, consumer_.receive(msg, 3000));
    ASSERT_EQ(numMessageSent->load(), numMessages);
}
// Queues one message per key, closes the producer before anything is sent, and expects every
// send callback to report ResultAlreadyClosed.
TEST_F(KeyBasedBatchingTest, testCloseBeforeSend) {
    initTopicName("CloseBeforeSend");
    // Any asynchronous send won't be completed unless `close()` or `flush()` is triggered
    ASSERT_NO_FATAL_FAILURE(
        initProducer(createDefaultProducerConfig().setBatchingMaxMessages(static_cast<unsigned>(-1))));

    std::mutex mtx;
    std::vector<Result> results;
    auto saveResult = [&mtx, &results](Result result) {
        std::lock_guard<std::mutex> lock(mtx);
        results.emplace_back(result);
    };
    auto sendAsync = [saveResult, this](const std::string& key, const std::string& value) {
        producer_.sendAsync(MessageBuilder().setOrderingKey(key).setContent(value).build(),
                            [saveResult](Result result, const MessageId&) { saveResult(result); });
    };

    // std::size_t so the size check below compares unsigned with unsigned. GoogleTest
    // otherwise warns about a signed/unsigned comparison.
    constexpr std::size_t numKeys = 10;
    for (std::size_t i = 0; i < numKeys; i++) {
        sendAsync("key-" + std::to_string(i), "value");
    }
    ASSERT_EQ(ResultOk, producer_.close());

    // After close() completed, all callbacks should have failed with ResultAlreadyClosed
    std::lock_guard<std::mutex> lock(mtx);
    ASSERT_EQ(results.size(), numKeys);
    for (std::size_t i = 0; i < numKeys; i++) {
        ASSERT_EQ(results[i], ResultAlreadyClosed) << " results[" << i << "] is " << results[i];
    }
}