blob: 34e224e69b3da8c9e5d6c05d8422987c18e19f6b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <gtest/gtest.h>
#include <pulsar/Client.h>
#include "PulsarFriend.h"
#include "SharedBuffer.h"
using namespace pulsar;
static std::string lookupUrl = "pulsar://localhost:6650";
static const std::string exampleSchema =
R"({"type":"record","name":"Example","namespace":"test","fields":[{"name":"a","type":"int"},{"name":"b","type":"int"}]})";
TEST(SchemaTest, testSchema) {
ClientConfiguration config;
Client client(lookupUrl);
Result res;
Producer producer;
ProducerConfiguration producerConf;
producerConf.setSchema(SchemaInfo(AVRO, "Avro", exampleSchema));
res = client.createProducer("topic-avro", producerConf, producer);
ASSERT_EQ(res, ResultOk);
// Check schema version
ASSERT_FALSE(producer.getSchemaVersion().empty());
producer.close();
ASSERT_EQ(ResultOk, res);
// Creating producer with no schema on same topic should fail
producerConf.setSchema(SchemaInfo(JSON, "Json", "{}"));
res = client.createProducer("topic-avro", producerConf, producer);
ASSERT_EQ(ResultIncompatibleSchema, res);
// Creating producer with no schema on same topic should failed.
// Because we set broker config isSchemaValidationEnforced=true
res = client.createProducer("topic-avro", producer);
ASSERT_EQ(ResultIncompatibleSchema, res);
ConsumerConfiguration consumerConf;
Consumer consumer;
// Subscribing with no schema will still succeed
res = client.subscribe("topic-avro", "sub-1", consumerConf, consumer);
ASSERT_EQ(ResultOk, res);
// Subscribing with same Avro schema will succeed
consumerConf.setSchema(SchemaInfo(AVRO, "Avro", exampleSchema));
res = client.subscribe("topic-avro", "sub-2", consumerConf, consumer);
ASSERT_EQ(ResultOk, res);
// Subscribing with different schema type will fail
consumerConf.setSchema(SchemaInfo(JSON, "Json", "{}"));
res = client.subscribe("topic-avro", "sub-2", consumerConf, consumer);
ASSERT_EQ(ResultIncompatibleSchema, res);
client.close();
}
TEST(SchemaTest, testHasSchemaVersion) {
Client client(lookupUrl);
std::string topic = "SchemaTest-HasSchemaVersion";
SchemaInfo stringSchema(SchemaType::STRING, "String", "");
Consumer consumer;
ASSERT_EQ(ResultOk, client.subscribe(topic + "1", "sub", ConsumerConfiguration().setSchema(stringSchema),
consumer));
Producer batchedProducer;
ASSERT_EQ(ResultOk, client.createProducer(topic + "1", ProducerConfiguration().setSchema(stringSchema),
batchedProducer));
Producer nonBatchedProducer;
ASSERT_EQ(ResultOk, client.createProducer(topic + "1", ProducerConfiguration().setSchema(stringSchema),
nonBatchedProducer));
ASSERT_EQ(ResultOk, batchedProducer.send(MessageBuilder().setContent("msg-0").build()));
ASSERT_EQ(ResultOk, nonBatchedProducer.send(MessageBuilder().setContent("msg-1").build()));
Message msgs[2];
ASSERT_EQ(ResultOk, consumer.receive(msgs[0], 3000));
ASSERT_EQ(ResultOk, consumer.receive(msgs[1], 3000));
std::string schemaVersion(8, '\0');
ASSERT_EQ(msgs[0].getDataAsString(), "msg-0");
ASSERT_TRUE(msgs[0].hasSchemaVersion());
ASSERT_EQ(msgs[0].getSchemaVersion(), schemaVersion);
ASSERT_EQ(msgs[1].getDataAsString(), "msg-1");
ASSERT_TRUE(msgs[1].hasSchemaVersion());
ASSERT_EQ(msgs[1].getSchemaVersion(), schemaVersion);
client.close();
}
TEST(SchemaTest, testKeyValueSchema) {
SchemaInfo keySchema(SchemaType::AVRO, "String", exampleSchema);
SchemaInfo valueSchema(SchemaType::AVRO, "String", exampleSchema);
SchemaInfo keyValueSchema(keySchema, valueSchema, KeyValueEncodingType::INLINE);
ASSERT_EQ(keyValueSchema.getSchemaType(), KEY_VALUE);
ASSERT_EQ(keyValueSchema.getSchema().size(),
8 + keySchema.getSchema().size() + valueSchema.getSchema().size());
}
TEST(SchemaTest, testKeySchemaIsEmpty) {
SchemaInfo keySchema(SchemaType::AVRO, "String", "");
SchemaInfo valueSchema(SchemaType::AVRO, "String", exampleSchema);
SchemaInfo keyValueSchema(keySchema, valueSchema, KeyValueEncodingType::INLINE);
ASSERT_EQ(keyValueSchema.getSchemaType(), KEY_VALUE);
ASSERT_EQ(keyValueSchema.getSchema().size(),
8 + keySchema.getSchema().size() + valueSchema.getSchema().size());
SharedBuffer buffer = SharedBuffer::wrap(const_cast<char*>(keyValueSchema.getSchema().c_str()),
keyValueSchema.getSchema().size());
int keySchemaSize = buffer.readUnsignedInt();
ASSERT_EQ(keySchemaSize, -1);
int valueSchemaSize = buffer.readUnsignedInt();
ASSERT_EQ(valueSchemaSize, valueSchema.getSchema().size());
std::string valueSchemaStr(buffer.slice(0, valueSchemaSize).data(), valueSchemaSize);
ASSERT_EQ(valueSchema.getSchema(), valueSchemaStr);
}
TEST(SchemaTest, testValueSchemaIsEmpty) {
SchemaInfo keySchema(SchemaType::AVRO, "String", exampleSchema);
SchemaInfo valueSchema(SchemaType::AVRO, "String", "");
SchemaInfo keyValueSchema(keySchema, valueSchema, KeyValueEncodingType::INLINE);
ASSERT_EQ(keyValueSchema.getSchemaType(), KEY_VALUE);
ASSERT_EQ(keyValueSchema.getSchema().size(),
8 + keySchema.getSchema().size() + valueSchema.getSchema().size());
SharedBuffer buffer = SharedBuffer::wrap(const_cast<char*>(keyValueSchema.getSchema().c_str()),
keyValueSchema.getSchema().size());
int keySchemaSize = buffer.readUnsignedInt();
ASSERT_EQ(keySchemaSize, keySchema.getSchema().size());
std::string keySchemaStr(buffer.slice(0, keySchemaSize).data(), keySchemaSize);
ASSERT_EQ(keySchemaStr, keySchema.getSchema());
buffer.consume(keySchemaSize);
int valueSchemaSize = buffer.readUnsignedInt();
ASSERT_EQ(valueSchemaSize, -1);
}
TEST(SchemaTest, testAutoDownloadSchema) {
const std::string topic = "testAutoPublicSchema" + std::to_string(time(nullptr));
std::string jsonSchema =
R"({"type":"record","name":"cpx","fields":[{"name":"re","type":"double"},{"name":"im","type":"double"}]})";
SchemaInfo schema(JSON, "test-schema", jsonSchema);
Client client(lookupUrl);
ConsumerConfiguration consumerConfiguration;
consumerConfiguration.setSchema(schema);
Consumer consumer;
ASSERT_EQ(ResultOk, client.subscribe(topic, "t-sub", consumerConfiguration, consumer));
ProducerConfiguration producerConfiguration;
Producer producer;
auto clientImplPtr = PulsarFriend::getClientImplPtr(client);
Promise<Result, Producer> promise;
clientImplPtr->createProducerAsync(topic, producerConfiguration, WaitForCallbackValue<Producer>(promise),
true);
ASSERT_EQ(ResultOk, promise.getFuture().get(producer));
Message msg = MessageBuilder().setContent("content").build();
ASSERT_EQ(ResultOk, producer.send(msg));
ASSERT_EQ(ResultOk, consumer.receive(msg));
ASSERT_EQ("content", msg.getDataAsString());
}