blob: 4b7c532269abb8da505320a357f9da4e728217db [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <gtest/gtest.h>
#include <pulsar/c/client.h>
#include <pulsar/c/table_view.h>
#include <string.h>
#include <future>
static const char *lookup_url = "pulsar://localhost:6650";
struct tv_create_result {
pulsar_result result;
pulsar_table_view_t *tableView;
};
static void create_tv_callback(pulsar_result result, pulsar_table_view_t *tableView, void *ctx) {
std::promise<tv_create_result> *create_promise = (std::promise<tv_create_result> *)ctx;
create_promise->set_value({result, tableView});
}
TEST(c_TableViewTest, testCreateTableViewAsync) {
const char *topic_name = "persistent://public/default/test-create-tv-async";
pulsar_client_configuration_t *conf = pulsar_client_configuration_create();
pulsar_client_t *client = pulsar_client_create(lookup_url, conf);
// Create table view.
pulsar_table_view_configuration_t *table_view_conf = pulsar_table_view_configuration_create();
pulsar_table_view_configuration_set_subscription_name(table_view_conf, "sub-name");
std::promise<tv_create_result> create_promise;
std::future<tv_create_result> create_future = create_promise.get_future();
pulsar_client_create_table_view_async(client, topic_name, table_view_conf, create_tv_callback,
&create_promise);
tv_create_result tvResult = create_future.get();
ASSERT_EQ(pulsar_result_Ok, tvResult.result);
pulsar_table_view_free(tvResult.tableView);
pulsar_client_close(client);
}
struct tv_action_ctx {
char *expect_data;
int size;
int expect_size;
std::promise<bool> *listen_promise;
};
static void tv_action(const char *key, const void *value, size_t value_size, void *ctx) {
tv_action_ctx *context = (tv_action_ctx *)ctx;
context->size++;
ASSERT_EQ(memcmp(value, context->expect_data, value_size), 0);
if (context->size == context->expect_size) {
context->listen_promise->set_value(true);
}
}
TEST(c_TableViewTest, testSimpleTableView) {
srand(time(NULL));
char topic_name[64];
snprintf(topic_name, 64, "persistent://public/default/test-table-view-%d", rand());
const char *sub_name = "my-sub-name";
pulsar_client_configuration_t *conf = pulsar_client_configuration_create();
pulsar_client_t *client = pulsar_client_create(lookup_url, conf);
pulsar_producer_configuration_t *producer_conf = pulsar_producer_configuration_create();
pulsar_producer_t *producer;
pulsar_result result = pulsar_client_create_producer(client, topic_name, producer_conf, &producer);
ASSERT_EQ(pulsar_result_Ok, result);
// Send messages
const int num = 10;
const char *key1 = "key1";
const char *key2 = "key2";
size_t data_size = 4;
char *data = (char *)malloc(data_size);
data[0] = 0x01;
data[1] = 0x00;
data[2] = 0x02;
data[3] = 0x00;
for (int i = 0; i < num; i++) {
pulsar_message_t *message = pulsar_message_create();
if (i % 2 == 0) {
pulsar_message_set_partition_key(message, key1);
} else {
pulsar_message_set_partition_key(message, key2);
}
pulsar_message_set_content(message, data, data_size);
pulsar_result res = pulsar_producer_send(producer, message);
ASSERT_EQ(pulsar_result_Ok, res);
pulsar_message_free(message);
}
// Create table view.
pulsar_table_view_configuration_t *table_view_conf = pulsar_table_view_configuration_create();
pulsar_table_view_configuration_set_subscription_name(table_view_conf, sub_name);
pulsar_table_view_t *table_view;
result = pulsar_client_create_table_view(client, topic_name, table_view_conf, &table_view);
ASSERT_EQ(pulsar_result_Ok, result);
// test get value
void *v1;
size_t v1_size;
ASSERT_EQ(pulsar_table_view_size(table_view), 2);
ASSERT_TRUE(pulsar_table_view_get_value(table_view, "key1", &v1, &v1_size));
ASSERT_EQ(v1_size, data_size);
ASSERT_EQ(memcmp(v1, data, data_size), 0);
free(v1);
// test for each.
tv_action_ctx ctx;
ctx.expect_data = data;
ctx.size = 0;
pulsar_table_view_for_each(table_view, tv_action, &ctx);
ASSERT_EQ(ctx.size, 2);
// test for each and listen
std::promise<bool> listen_promise;
std::future<bool> listen_future = listen_promise.get_future();
tv_action_ctx ctx2;
ctx2.expect_data = data;
ctx2.size = 0;
ctx2.expect_size = 3;
ctx2.listen_promise = &listen_promise;
pulsar_table_view_for_each_add_listen(table_view, tv_action, &ctx2);
ASSERT_EQ(ctx.size, 2);
// send more message.
pulsar_message_t *message = pulsar_message_create();
pulsar_message_set_partition_key(message, "key3");
pulsar_message_set_content(message, data, data_size);
pulsar_result res = pulsar_producer_send(producer, message);
ASSERT_EQ(pulsar_result_Ok, res);
pulsar_message_free(message);
// wait for message.
ASSERT_TRUE(listen_future.get());
ASSERT_EQ(ctx2.size, ctx2.expect_size);
// test retrieve value
void *v2;
size_t v2_size;
ASSERT_TRUE(pulsar_table_view_retrieve_value(table_view, "key2", &v2, &v2_size));
ASSERT_EQ(v2_size, data_size);
ASSERT_EQ(memcmp(v2, data, data_size), 0);
free(v2);
// test table view size
ASSERT_FALSE(pulsar_table_view_contain_key(table_view, "key2"));
ASSERT_EQ(pulsar_table_view_size(table_view), 2);
free(data);
pulsar_producer_close(producer);
pulsar_table_view_close(table_view);
pulsar_client_close(client);
pulsar_table_view_free(table_view);
pulsar_table_view_configuration_free(table_view_conf);
pulsar_producer_free(producer);
pulsar_producer_configuration_free(producer_conf);
pulsar_client_free(client);
pulsar_client_configuration_free(conf);
}