blob: 5d42173c364c2221fc2ba0f8a57632c8f69e8e0e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <gtest/gtest.h>
#include <pulsar/Client.h>
#include <chrono>
#include <future>
#include "HttpHelper.h"
#include "PulsarFriend.h"
#include "WaitUtils.h"
using namespace pulsar;
static std::string lookupUrl = "pulsar://localhost:6650";
static std::string adminUrl = "http://localhost:8080/";
DECLARE_LOG_OBJECT()
TEST(TableViewTest, testCreateTableView) {
const std::string topic = "testCreateTableView" + std::to_string(time(nullptr));
Client client(lookupUrl);
static const std::string jsonSchema =
R"({"type":"record","name":"cpx","fields":[{"name":"re","type":"double"},{"name":"im","type":"double"}]})";
SchemaInfo schemaInfo(JSON, "test-json", jsonSchema);
ProducerConfiguration producerConfiguration;
producerConfiguration.setSchema(schemaInfo);
Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topic, producerConfiguration, producer));
// Create table view failed, The schema is not compatible
TableViewConfiguration tableViewConfiguration{.schemaInfo = SchemaInfo(AVRO, "", "")};
TableView tableView;
ASSERT_EQ(ResultIncompatibleSchema, client.createTableView(topic, tableViewConfiguration, tableView));
ASSERT_EQ(ResultConsumerNotInitialized, tableView.close());
// Create table view success.
ASSERT_EQ(ResultOk, client.createTableView(topic, {.schemaInfo = schemaInfo}, tableView));
ASSERT_EQ(ResultOk, tableView.close());
client.close();
}
TEST(TableViewTest, testSimpleTableView) {
const std::string topic = "testSimpleTableView" + std::to_string(time(nullptr));
Client client(lookupUrl);
ProducerConfiguration producerConfiguration;
Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topic, producerConfiguration, producer));
auto count = 20;
for (int i = 0; i < count; ++i) {
auto msg = MessageBuilder()
.setPartitionKey("key" + std::to_string(i))
.setContent("value" + std::to_string(i))
.build();
ASSERT_EQ(ResultOk, producer.send(msg));
}
// Create table view and assert size.
TableView tableView;
ASSERT_EQ(ResultOk, client.createTableView(topic, {}, tableView));
ASSERT_EQ(tableView.size(), count);
// Send some more messages, The 0 ~ count message key/value is duplicated send.
for (int i = 0; i < count * 2; ++i) {
auto msg = MessageBuilder()
.setPartitionKey("key" + std::to_string(i))
.setContent("value" + std::to_string(i))
.build();
ASSERT_EQ(ResultOk, producer.send(msg));
}
waitUntil(
std::chrono::seconds(2), [&] { return tableView.size() == count * 2; }, 1000);
ASSERT_EQ(tableView.size(), count * 2);
// assert interfaces.
std::string value;
ASSERT_TRUE(tableView.getValue("key1", value));
ASSERT_EQ(value, "value1");
ASSERT_TRUE(tableView.retrieveValue("key1", value));
ASSERT_EQ(value, "value1");
ASSERT_FALSE(tableView.containsKey("key1"));
ASSERT_EQ(tableView.snapshot().size(), count * 2 - 1);
ASSERT_EQ(tableView.size(), 0);
client.close();
}
TEST(TableViewTest, testPublishEmptyValue) {
const std::string topic = "testPublishEmptyValue" + std::to_string(time(nullptr));
Client client(lookupUrl);
ProducerConfiguration producerConfiguration;
Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topic, producerConfiguration, producer));
auto count = 20;
for (int i = 0; i < count; ++i) {
auto msg = MessageBuilder()
.setPartitionKey("key" + std::to_string(i))
.setContent("value" + std::to_string(i))
.build();
ASSERT_EQ(ResultOk, producer.send(msg));
}
// Create table view failed, The schema is not compatible
TableView tableView;
ASSERT_EQ(ResultOk, client.createTableView(topic, {}, tableView));
ASSERT_EQ(tableView.size(), count);
// Set the v of k1 is empty
auto msg = MessageBuilder().setPartitionKey("key1").setContent("").build();
ASSERT_EQ(ResultOk, producer.send(msg));
waitUntil(
std::chrono::seconds(2), [&] { return tableView.size() == count - 1; }, 1000);
ASSERT_EQ(tableView.size(), count - 1);
// assert interfaces.
std::string value;
ASSERT_TRUE(!tableView.containsKey("key1"));
ASSERT_TRUE(!tableView.getValue("key1", value));
ASSERT_TRUE(value.empty());
client.close();
}
TEST(TableViewTest, testNotSupportNonPersistentTopic) {
const std::string topic = TopicDomain::NonPersistent +
"://public/default/testNotSupportNonPersistentTopic" +
std::to_string(time(nullptr));
Client client(lookupUrl);
TableView tableView;
ASSERT_EQ(ResultNotAllowedError, client.createTableView(topic, {}, tableView));
client.close();
}
TEST(TableViewTest, testMultiTopicAndAutoUpdatePartitions) {
std::string uniqueTimeStr = std::to_string(time(nullptr));
std::string topic = "persistent://public/default/testMultiTopicAndAutoUpdatePartitions" + uniqueTimeStr;
ClientConfiguration clientConfiguration;
clientConfiguration.setPartititionsUpdateInterval(1);
Client client(lookupUrl, clientConfiguration);
// create partition is 5
{
std::string url = adminUrl +
"admin/v2/persistent/public/default/testMultiTopicAndAutoUpdatePartitions" +
uniqueTimeStr + "/partitions";
int res = makePutRequest(url, "5");
LOG_INFO("res = " << res);
ASSERT_FALSE(res != 204 && res != 409);
}
ProducerConfiguration producerConfiguration;
Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topic, producerConfiguration, producer));
auto count = 20;
for (int i = 0; i < count; ++i) {
auto msg = MessageBuilder()
.setPartitionKey("key" + std::to_string(i))
.setContent("value" + std::to_string(i))
.build();
ASSERT_EQ(ResultOk, producer.send(msg));
}
TableView tableView;
ASSERT_EQ(ResultOk, client.createTableView(topic, {}, tableView));
ASSERT_EQ(tableView.size(), count);
// update partitions is 10
{
std::string url = adminUrl +
"admin/v2/persistent/public/default/testMultiTopicAndAutoUpdatePartitions" +
uniqueTimeStr + "/partitions";
int res = makePostRequest(url, "10");
LOG_INFO("res = " << res);
ASSERT_FALSE(res != 204 && res != 409);
}
waitUntil(
std::chrono::seconds(5), [&] { return PulsarFriend::getPartitionProducerSize(producer) == 10; }, 200);
ASSERT_EQ(PulsarFriend::getPartitionProducerSize(producer), 10);
for (int i = count; i < count * 2; ++i) {
auto msg = MessageBuilder()
.setPartitionKey("key" + std::to_string(i))
.setContent("value" + std::to_string(i))
.build();
ASSERT_EQ(ResultOk, producer.send(msg));
}
waitUntil(
std::chrono::seconds(10), [&] { return tableView.size() == count * 2; }, 200);
ASSERT_EQ(tableView.size(), count * 2);
client.close();
}