blob: 37bd78047bcdc8acf3c5a6e8d6876d480ebfe8aa [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <gtest/gtest.h>
#include <pulsar/Client.h>
#include <pulsar/ProtobufNativeSchema.h>
#include <stdexcept>
#include "PaddingDemo.pb.h"
#include "Test.pb.h"
using namespace pulsar;
static std::string lookupUrl = "pulsar://localhost:6650";
const ::google::protobuf::Descriptor* getTestMessageDescriptor() {
#if GOOGLE_PROTOBUF_VERSION < 3020000
::proto::TestMessage msg;
return msg.GetDescriptor();
#else
return ::proto::TestMessage::GetDescriptor();
#endif
}
const ::google::protobuf::Descriptor* getExternalMessageDescriptor() {
#if GOOGLE_PROTOBUF_VERSION < 3020000
::proto::external::ExternalMessage msg;
return msg.GetDescriptor();
#else
return ::proto::TestMessage::GetDescriptor();
#endif
}
const ::google::protobuf::Descriptor* getDemoPersonDescriptor() {
#if GOOGLE_PROTOBUF_VERSION < 3020000
::padding::demo::Person p;
return p.GetDescriptor();
#else
return ::padding::demo::Person::GetDescriptor();
#endif
}
TEST(ProtobufNativeSchemaTest, testSchemaJson) {
const std::string expectedSchemaJson =
"{\"fileDescriptorSet\":"
"\"CtMDCgpUZXN0LnByb3RvEgVwcm90bxoSRXh0ZXJuYWxUZXN0LnByb3RvImUKClN1Yk1lc3NhZ2USCwoDZm9vGAEgASgJEgsKA2"
"JhchgCIAEoARo9Cg1OZXN0ZWRNZXNzYWdlEgsKA3VybBgBIAEoCRINCgV0aXRsZRgCIAEoCRIQCghzbmlwcGV0cxgDIAMoCSLlAQ"
"oLVGVzdE1lc3NhZ2USEwoLc3RyaW5nRmllbGQYASABKAkSEwoLZG91YmxlRmllbGQYAiABKAESEAoIaW50RmllbGQYBiABKAUSIQ"
"oIdGVzdEVudW0YBCABKA4yDy5wcm90by5UZXN0RW51bRImCgtuZXN0ZWRGaWVsZBgFIAEoCzIRLnByb3RvLlN1Yk1lc3NhZ2USFQ"
"oNcmVwZWF0ZWRGaWVsZBgKIAMoCRI4Cg9leHRlcm5hbE1lc3NhZ2UYCyABKAsyHy5wcm90by5leHRlcm5hbC5FeHRlcm5hbE1lc3"
"NhZ2UqJAoIVGVzdEVudW0SCgoGU0hBUkVEEAASDAoIRkFJTE9WRVIQAUItCiVvcmcuYXBhY2hlLnB1bHNhci5jbGllbnQuc2NoZW"
"1hLnByb3RvQgRUZXN0YgZwcm90bzMKoAEKEkV4dGVybmFsVGVzdC5wcm90bxIOcHJvdG8uZXh0ZXJuYWwiOwoPRXh0ZXJuYWxNZX"
"NzYWdlEhMKC3N0cmluZ0ZpZWxkGAEgASgJEhMKC2RvdWJsZUZpZWxkGAIgASgBQjUKJW9yZy5hcGFjaGUucHVsc2FyLmNsaWVudC"
"5zY2hlbWEucHJvdG9CDEV4dGVybmFsVGVzdGIGcHJvdG8z\",\"rootMessageTypeName\":\"proto.TestMessage\","
"\"rootFileDescriptorName\":\"Test.proto\"}";
const auto schemaInfo = createProtobufNativeSchema(getTestMessageDescriptor());
ASSERT_EQ(schemaInfo.getSchemaType(), pulsar::PROTOBUF_NATIVE);
ASSERT_TRUE(schemaInfo.getName().empty());
ASSERT_EQ(schemaInfo.getSchema(), expectedSchemaJson);
ASSERT_TRUE(schemaInfo.getProperties().empty());
}
TEST(ProtobufNativeSchemaTest, testAutoCreateSchema) {
const std::string topicPrefix = "ProtobufNativeSchemaTest-testAutoCreateSchema-";
Client client(lookupUrl);
const auto schemaInfo = createProtobufNativeSchema(getTestMessageDescriptor());
Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topicPrefix + "producer",
ProducerConfiguration().setSchema(schemaInfo), producer));
Consumer consumer;
ASSERT_EQ(ResultOk, client.subscribe(topicPrefix + "consumer", "my-sub",
ConsumerConfiguration().setSchema(schemaInfo), consumer));
client.close();
}
TEST(ProtobufNativeSchemaTest, testSchemaIncompatibility) {
const std::string topic = "ProtobufNativeSchemaTest-testSchemaIncompatibility";
Client client(lookupUrl);
Producer producer;
auto createProducerResult = [&](const google::protobuf::Descriptor* descriptor) {
return client.createProducer(
topic, ProducerConfiguration().setSchema(createProtobufNativeSchema(descriptor)), producer);
};
// Create the protobuf native schema automatically
ASSERT_EQ(ResultOk, createProducerResult(getTestMessageDescriptor()));
producer.close();
// Try to create producer with another protobuf generated class
ASSERT_EQ(ResultIncompatibleSchema, createProducerResult(getExternalMessageDescriptor()));
// Try to create producer with the original schema again
ASSERT_EQ(ResultOk, createProducerResult(getTestMessageDescriptor()));
// createProtobufNativeSchema() cannot accept a null descriptor
try {
createProducerResult(nullptr);
} catch (const std::invalid_argument& e) {
ASSERT_STREQ(e.what(), "descriptor is null");
}
client.close();
}
TEST(ProtobufNativeSchemaTest, testEndToEnd) {
const std::string topic = "ProtobufSchemaTest-testEndToEnd";
Client client(lookupUrl);
const auto schemaInfo = createProtobufNativeSchema(getTestMessageDescriptor());
Consumer consumer;
ASSERT_EQ(ResultOk,
client.subscribe(topic, "my-sub", ConsumerConfiguration().setSchema(schemaInfo), consumer));
Producer producer;
ASSERT_EQ(ResultOk,
client.createProducer(topic, ProducerConfiguration().setSchema(schemaInfo), producer));
// Send a message that is serialized from a ProtoBuf class
::proto::TestMessage testMessage;
testMessage.set_testenum(::proto::TestEnum::FAILOVER);
std::string content(testMessage.ByteSizeLong(), '\0');
testMessage.SerializeToArray(const_cast<char*>(content.data()), content.size());
ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(content).build()));
// Receive a message and parse it to the ProtoBuf class
::proto::TestMessage receivedTestMessage;
ASSERT_EQ(receivedTestMessage.testenum(), ::proto::TestEnum::SHARED);
Message msg;
ASSERT_EQ(ResultOk, consumer.receive(msg, 3000));
receivedTestMessage.ParseFromArray(msg.getData(), msg.getLength());
ASSERT_EQ(receivedTestMessage.testenum(), ::proto::TestEnum::FAILOVER);
ASSERT_TRUE(msg.hasSchemaVersion());
ASSERT_EQ(msg.getSchemaVersion(), std::string(8L, '\0'));
client.close();
}
TEST(ProtobufNativeSchemaTest, testBase64WithPadding) {
const auto schemaInfo = createProtobufNativeSchema(getDemoPersonDescriptor());
const auto schemaJson = schemaInfo.getSchema();
size_t pos = schemaJson.find(R"(","rootMessageTypeName":)");
ASSERT_NE(pos, std::string::npos);
ASSERT_TRUE(pos > 0);
ASSERT_EQ(schemaJson[pos - 1], '='); // the tail of fileDescriptorSet is a padding character
Client client(lookupUrl);
const std::string topic = "ProtobufSchemaTest-testBase64WithPadding";
Producer producer;
ASSERT_EQ(ResultOk,
client.createProducer(topic, ProducerConfiguration().setSchema(schemaInfo), producer));
client.close();
}