blob: 6e860adbc3f12a2bcf420935dcff4a095ee0136c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.schema;
import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
import static org.apache.pulsar.schema.compatibility.SchemaCompatibilityCheckTest.randomName;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.Sets;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
import org.apache.pulsar.broker.service.schema.SchemaRegistry;
import org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.FutureUtil;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@Slf4j
@Test(groups = "schema")
public class SchemaTest extends MockedPulsarServiceBaseTest {
private final static String CLUSTER_NAME = "test";
/**
 * Runs the base-class setup, then registers the test cluster and the public tenant
 * that every test in this class creates its namespaces under.
 */
@BeforeMethod
@Override
public void setup() throws Exception {
    super.internalSetup();

    // Register the cluster, pointing it at the web service address of the test broker.
    ClusterData clusterData = ClusterData.builder()
            .serviceUrl(pulsar.getWebServiceAddress())
            .build();
    admin.clusters().createCluster(CLUSTER_NAME, clusterData);

    // The public tenant is only allowed on that single cluster.
    admin.tenants().createTenant(
            PUBLIC_TENANT,
            TenantInfo.builder()
                    .allowedClusters(Collections.singleton(CLUSTER_NAME))
                    .build());
}
/**
 * Delegates to the base-class cleanup after every test method; {@code alwaysRun}
 * makes it execute even when the test method failed.
 */
@AfterMethod(alwaysRun = true)
@Override
public void cleanup() throws Exception {
    super.internalCleanup();
}
/**
 * Subscribes a typed AVRO consumer and an AUTO_CONSUME consumer to two partitioned topics and
 * verifies that a received message exposes its reader schema, and the native Avro schema behind
 * it, for both consumers.
 */
@Test
public void testMultiTopicSetSchemaProvider() throws Exception {
    final String tenant = PUBLIC_TENANT;
    final String namespace = "test-namespace-" + randomName(16);
    final String topicOne = "test-multi-version-schema-one";
    final String topicTwo = "test-multi-version-schema-two";
    final String fqtnOne = TopicName.get(
            TopicDomain.persistent.value(), tenant, namespace, topicOne).toString();
    final String fqtnTwo = TopicName.get(
            TopicDomain.persistent.value(), tenant, namespace, topicTwo).toString();

    admin.namespaces().createNamespace(tenant + "/" + namespace, Sets.newHashSet(CLUSTER_NAME));
    admin.topics().createPartitionedTopic(fqtnOne, 3);
    admin.topics().createPartitionedTopic(fqtnTwo, 3);

    // The definitions are shared, but every Schema.AVRO(...) call below still builds a fresh
    // Schema instance, exactly like the original test did.
    final SchemaDefinition<Schemas.PersonOne> personOneDefinition =
            SchemaDefinition.<Schemas.PersonOne>builder()
                    .withAlwaysAllowNull(false)
                    .withSupportSchemaVersioning(true)
                    .withPojo(Schemas.PersonOne.class)
                    .build();
    final SchemaDefinition<Schemas.PersonTwo> personTwoDefinition =
            SchemaDefinition.<Schemas.PersonTwo>builder()
                    .withAlwaysAllowNull(false)
                    .withSupportSchemaVersioning(true)
                    .withPojo(Schemas.PersonTwo.class)
                    .build();

    // Topic one gets two schema versions (PersonOne, then PersonTwo); topic two only PersonTwo.
    admin.schemas().createSchema(fqtnOne, Schema.AVRO(personOneDefinition).getSchemaInfo());
    admin.schemas().createSchema(fqtnOne, Schema.AVRO(personTwoDefinition).getSchemaInfo());
    Schema<Schemas.PersonTwo> personTwoSchema = Schema.AVRO(personTwoDefinition);
    admin.schemas().createSchema(fqtnTwo, personTwoSchema.getSchemaInfo());

    @Cleanup
    Producer<Schemas.PersonTwo> producer = pulsarClient.newProducer(Schema.AVRO(personTwoDefinition))
            .topic(fqtnOne)
            .create();

    Schemas.PersonTwo personTwo = new Schemas.PersonTwo();
    personTwo.setId(1);
    personTwo.setName("Tom");

    @Cleanup
    Consumer<Schemas.PersonTwo> consumer = pulsarClient.newConsumer(Schema.AVRO(personTwoDefinition))
            .subscriptionName("test")
            .topic(fqtnOne, fqtnTwo)
            .subscribe();

    @Cleanup
    Consumer<GenericRecord> consumer2 = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
            .subscriptionName("test2")
            .topic(fqtnOne, fqtnTwo)
            .subscribe();

    producer.send(personTwo);

    Message<Schemas.PersonTwo> message = consumer.receive();
    Schemas.PersonTwo personConsume = message.getValue();
    assertEquals(personConsume.getName(), "Tom");
    assertEquals(personConsume.getId(), 1);

    Schema<?> schema = message.getReaderSchema().get();
    log.info("the-schema {}", schema);
    assertEquals(personTwoSchema.getSchemaInfo(), schema.getSchemaInfo());
    org.apache.avro.Schema nativeSchema = (org.apache.avro.Schema) schema.getNativeSchema().get();
    log.info("nativeSchema-schema {}", nativeSchema);
    assertNotNull(nativeSchema);

    // verify that with AUTO_CONSUME we can access the original schema
    // and the Native AVRO schema
    Message<?> message2 = consumer2.receive();
    Schema<?> schema2 = message2.getReaderSchema().get();
    log.info("the-schema {}", schema2);
    assertEquals(personTwoSchema.getSchemaInfo(), schema2.getSchemaInfo());
    // Read the native schema from the AUTO_CONSUME reader schema (schema2); the previous code
    // re-read it from the typed consumer's schema, so this path was never actually checked.
    org.apache.avro.Schema nativeSchema2 = (org.apache.avro.Schema) schema2.getNativeSchema().get();
    log.info("nativeSchema-schema {}", nativeSchema2);
    assertNotNull(nativeSchema2);
}
/**
 * KeyValue variant of the multi-topic schema-provider test: a typed KeyValue consumer and an
 * AUTO_CONSUME consumer subscribe to two partitioned topics, and the value part of the reader
 * schema (plus its native Avro schema) is verified for both of them.
 */
@Test
public void testMultiTopicSetSchemaProviderWithKeyValue() throws Exception {
    final String tenant = PUBLIC_TENANT;
    final String namespace = "test-namespace-" + randomName(16);
    final String topicOne = "test-multi-version-schema-one";
    final String topicTwo = "test-multi-version-schema-two";
    final String fqtnOne = TopicName.get(
            TopicDomain.persistent.value(), tenant, namespace, topicOne).toString();
    final String fqtnTwo = TopicName.get(
            TopicDomain.persistent.value(), tenant, namespace, topicTwo).toString();

    admin.namespaces().createNamespace(tenant + "/" + namespace, Sets.newHashSet(CLUSTER_NAME));
    admin.topics().createPartitionedTopic(fqtnOne, 3);
    admin.topics().createPartitionedTopic(fqtnTwo, 3);

    // The definitions are shared, but every Schema.AVRO(...) call below still builds a fresh
    // Schema instance for the producer and the consumer.
    final SchemaDefinition<Schemas.PersonOne> personOneDefinition =
            SchemaDefinition.<Schemas.PersonOne>builder()
                    .withAlwaysAllowNull(false)
                    .withSupportSchemaVersioning(true)
                    .withPojo(Schemas.PersonOne.class)
                    .build();
    final SchemaDefinition<Schemas.PersonTwo> personTwoDefinition =
            SchemaDefinition.<Schemas.PersonTwo>builder()
                    .withAlwaysAllowNull(false)
                    .withSupportSchemaVersioning(true)
                    .withPojo(Schemas.PersonTwo.class)
                    .build();

    // Topic one gets two KeyValue schema versions (value PersonOne, then PersonTwo);
    // topic two only the PersonTwo variant.
    Schema<Schemas.PersonOne> personOneSchema = Schema.AVRO(personOneDefinition);
    admin.schemas().createSchema(fqtnOne,
            Schema.KeyValue(Schema.STRING, personOneSchema).getSchemaInfo());
    Schema<Schemas.PersonTwo> personTwoSchema = Schema.AVRO(personTwoDefinition);
    admin.schemas().createSchema(fqtnOne,
            Schema.KeyValue(Schema.STRING, personTwoSchema).getSchemaInfo());
    admin.schemas().createSchema(fqtnTwo,
            Schema.KeyValue(Schema.STRING, personTwoSchema).getSchemaInfo());

    @Cleanup
    Producer<KeyValue<String, Schemas.PersonTwo>> producer = pulsarClient
            .newProducer(Schema.KeyValue(Schema.STRING, Schema.AVRO(personTwoDefinition)))
            .topic(fqtnOne)
            .create();

    Schemas.PersonTwo personTwo = new Schemas.PersonTwo();
    personTwo.setId(1);
    personTwo.setName("Tom");

    @Cleanup
    Consumer<KeyValue<String, Schemas.PersonTwo>> consumer = pulsarClient
            .newConsumer(Schema.KeyValue(Schema.STRING, Schema.AVRO(personTwoDefinition)))
            .subscriptionName("test")
            .topic(fqtnOne, fqtnTwo)
            .subscribe();

    @Cleanup
    Consumer<GenericRecord> consumer2 = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
            .subscriptionName("test2")
            .topic(fqtnOne, fqtnTwo)
            .subscribe();

    producer.send(new KeyValue<>("foo", personTwo));

    Message<KeyValue<String, Schemas.PersonTwo>> message = consumer.receive();
    // TestNG's assertEquals takes (actual, expected).
    assertEquals(message.getValue().getKey(), "foo");
    Schemas.PersonTwo personConsume = message.getValue().getValue();
    assertEquals(personConsume.getName(), "Tom");
    assertEquals(personConsume.getId(), 1);

    KeyValueSchemaImpl<?, ?> schema = (KeyValueSchemaImpl<?, ?>) message.getReaderSchema().get();
    log.info("the-schema {}", schema);
    assertEquals(personTwoSchema.getSchemaInfo(), schema.getValueSchema().getSchemaInfo());
    org.apache.avro.Schema nativeSchema =
            (org.apache.avro.Schema) schema.getValueSchema().getNativeSchema().get();
    log.info("nativeSchema-schema {}", nativeSchema);
    assertNotNull(nativeSchema);

    // verify that with AUTO_CONSUME we can access the original schema
    // and the Native AVRO schema
    Message<?> message2 = consumer2.receive();
    KeyValueSchemaImpl<?, ?> schema2 = (KeyValueSchemaImpl<?, ?>) message2.getReaderSchema().get();
    log.info("the-schema {}", schema2);
    assertEquals(personTwoSchema.getSchemaInfo(), schema2.getValueSchema().getSchemaInfo());
    // Read the native schema from the AUTO_CONSUME reader schema (schema2); the previous code
    // re-read it from the typed consumer's schema, so this path was never actually checked.
    org.apache.avro.Schema nativeSchema2 =
            (org.apache.avro.Schema) schema2.getValueSchema().getNativeSchema().get();
    log.info("nativeSchema-schema {}", nativeSchema2);
    assertNotNull(nativeSchema2);
}
/**
 * Sends one JSON-encoded {@code BytesRecord} and checks that an AUTO_CONSUME consumer and a typed
 * JSON consumer see the same runtime class for the {@code address} field and report the same
 * reader schema info.
 */
@Test
public void testJSONSchemaDeserialize() throws Exception {
    final String tenant = PUBLIC_TENANT;
    final String namespace = "test-namespace-" + randomName(16);
    final String topicName = "test-bytes-schema";
    final String topic = TopicName.get(
            TopicDomain.persistent.value(), tenant, namespace, topicName).toString();

    admin.namespaces().createNamespace(tenant + "/" + namespace, Sets.newHashSet(CLUSTER_NAME));
    admin.topics().createPartitionedTopic(topic, 2);
    admin.schemas().createSchema(topic, Schema.JSON(Schemas.BytesRecord.class).getSchemaInfo());

    // @Cleanup closes the clients even when one of the assertions below fails.
    @Cleanup
    Producer<Schemas.BytesRecord> producer = pulsarClient
            .newProducer(Schema.JSON(Schemas.BytesRecord.class))
            .topic(topic)
            .create();

    Schemas.BytesRecord bytesRecord = new Schemas.BytesRecord();
    bytesRecord.setId(1);
    bytesRecord.setName("Tom");
    // Explicit charset: the no-arg getBytes() depends on the platform default.
    bytesRecord.setAddress("test".getBytes(StandardCharsets.UTF_8));

    @Cleanup
    Consumer<GenericRecord> consumer = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
            .subscriptionName("test-sub")
            .topic(topic)
            .subscribe();

    @Cleanup
    Consumer<Schemas.BytesRecord> consumer1 = pulsarClient
            .newConsumer(Schema.JSON(Schemas.BytesRecord.class))
            .subscriptionName("test-sub1")
            .topic(topic)
            .subscribe();

    producer.send(bytesRecord);

    Message<GenericRecord> message = consumer.receive();
    Message<Schemas.BytesRecord> message1 = consumer1.receive();

    // Both consumers must materialize the "address" field with the same Java type.
    assertEquals(message.getValue().getField("address").getClass(),
            message1.getValue().getAddress().getClass());

    Schema<?> schema = message.getReaderSchema().get();
    Schema<?> schema1 = message1.getReaderSchema().get();
    log.info("schema {}", schema);
    log.info("schema1 {}", schema1);
    assertEquals(schema.getSchemaInfo(), schema1.getSchemaInfo());
}
/**
 * Publishes a single STRING message and reads it back through a typed STRING consumer and an
 * AUTO_CONSUME consumer, checking the reader schema type and the decoded value on both sides.
 */
@Test
public void testStringSchema() throws Exception {
    final String namespace = "test-namespace-" + randomName(16);
    final String topic = TopicName.get(
            TopicDomain.persistent.value(),
            PUBLIC_TENANT,
            namespace,
            "test-string-schema").toString();

    admin.namespaces().createNamespace(
            PUBLIC_TENANT + "/" + namespace,
            Sets.newHashSet(CLUSTER_NAME));
    admin.topics().createPartitionedTopic(topic, 2);

    Producer<String> stringProducer = pulsarClient.newProducer(Schema.STRING)
            .topic(topic)
            .create();

    Consumer<String> stringConsumer = pulsarClient.newConsumer(Schema.STRING)
            .subscriptionName("test-sub")
            .topic(topic)
            .subscribe();

    // AUTO_CONSUME hands out a GenericRecord even for primitive payloads;
    // the class-name assertion below pins the concrete wrapper type.
    Consumer<GenericRecord> genericConsumer = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
            .subscriptionName("test-sub3")
            .topic(topic)
            .subscribe();

    stringProducer.send("foo");

    Message<String> typedMessage = stringConsumer.receive();
    Message<GenericRecord> genericMessage = genericConsumer.receive();

    // Reader schema type, as seen by each consumer.
    assertEquals(SchemaType.STRING,
            typedMessage.getReaderSchema().get().getSchemaInfo().getType());
    assertEquals(SchemaType.STRING,
            genericMessage.getReaderSchema().get().getSchemaInfo().getType());

    // Decoded payloads.
    assertEquals("foo", typedMessage.getValue());
    assertEquals(genericMessage.getValue().getClass().getName(),
            "org.apache.pulsar.client.impl.schema.GenericObjectWrapper");
    assertEquals(SchemaType.STRING, genericMessage.getValue().getSchemaType());
    assertEquals("foo", genericMessage.getValue().getNativeObject());

    stringProducer.close();
    stringConsumer.close();
    genericConsumer.close();
}
/** Runs the schemaless-topic AUTO_CONSUME scenario against a topic registered with a BYTES schema. */
@Test
public void testUseAutoConsumeWithBytesSchemaTopic() throws Exception {
    final SchemaType registeredType = SchemaType.BYTES;
    testUseAutoConsumeWithSchemalessTopic(registeredType);
}
/** Runs the schemaless-topic AUTO_CONSUME scenario against a topic registered with a NONE schema. */
@Test
public void testUseAutoConsumeWithNoneSchemaTopic() throws Exception {
    final SchemaType registeredType = SchemaType.NONE;
    testUseAutoConsumeWithSchemalessTopic(registeredType);
}
/**
 * Registers a dummy schema of the given type on a fresh partitioned topic, publishes raw bytes,
 * and checks what a plain byte[] consumer and an AUTO_CONSUME consumer report for the message.
 *
 * @param schema schema type to register on the topic; only BYTES and NONE are accepted,
 *               any other value fails the test
 */
private void testUseAutoConsumeWithSchemalessTopic(SchemaType schema) throws Exception {
    final String namespace = "test-namespace-" + randomName(16);
    final String topic = TopicName.get(
            TopicDomain.persistent.value(),
            PUBLIC_TENANT,
            namespace,
            "test-schemaless").toString();

    admin.namespaces().createNamespace(
            PUBLIC_TENANT + "/" + namespace,
            Sets.newHashSet(CLUSTER_NAME));
    admin.topics().createPartitionedTopic(topic, 2);

    // Register an empty-bodied schema of the requested type on the topic.
    SchemaInfo dummySchemaInfo = SchemaInfoImpl.builder()
            .schema(new byte[0])
            .name("dummySchema")
            .type(schema)
            .build();
    admin.schemas().createSchema(topic, dummySchemaInfo);

    Producer<byte[]> rawProducer = pulsarClient.newProducer()
            .topic(topic)
            .create();

    Consumer<byte[]> rawConsumer = pulsarClient.newConsumer()
            .subscriptionName("test-sub")
            .topic(topic)
            .subscribe();

    // AUTO_CONSUME hands out a GenericRecord even for raw bytes;
    // the class-name assertion below pins the concrete wrapper type.
    Consumer<GenericRecord> genericConsumer = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
            .subscriptionName("test-sub3")
            .topic(topic)
            .subscribe();

    rawProducer.send("foo".getBytes(StandardCharsets.UTF_8));

    Message<byte[]> rawMessage = rawConsumer.receive();
    Message<GenericRecord> genericMessage = genericConsumer.receive();

    // Only BYTES and NONE are meaningful here; anything else is a test-setup error.
    if (schema != SchemaType.BYTES && schema != SchemaType.NONE) {
        fail();
    }
    // schema NONE is always reported as BYTES, so both supported cases expect BYTES
    assertEquals(SchemaType.BYTES, rawMessage.getReaderSchema().get().getSchemaInfo().getType());
    assertEquals(SchemaType.BYTES, genericMessage.getReaderSchema().get().getSchemaInfo().getType());

    assertEquals("foo".getBytes(StandardCharsets.UTF_8), rawMessage.getValue());
    assertEquals(genericMessage.getValue().getClass().getName(),
            "org.apache.pulsar.client.impl.schema.GenericObjectWrapper");
    assertEquals(SchemaType.BYTES, genericMessage.getValue().getSchemaType());
    assertEquals("foo".getBytes(StandardCharsets.UTF_8), genericMessage.getValue().getNativeObject());

    rawProducer.close();
    rawConsumer.close();
    genericConsumer.close();
}
/** Runs the primitive KeyValue scenario with INLINE encoding. */
@Test
public void testKeyValueSchemaINLINE() throws Exception {
    final KeyValueEncodingType encoding = KeyValueEncodingType.INLINE;
    testKeyValueSchema(encoding);
}
/** Runs the primitive KeyValue scenario with SEPARATED encoding. */
@Test
public void testKeyValueSchemaSEPARATED() throws Exception {
    final KeyValueEncodingType encoding = KeyValueEncodingType.SEPARATED;
    testKeyValueSchema(encoding);
}
/**
 * Sends one {@code KeyValue<String, Integer>} with the given encoding and checks that a typed
 * consumer and an AUTO_CONSUME consumer decode the same pair, and that the message key is only
 * populated for SEPARATED encoding.
 *
 * @param keyValueEncodingType encoding used by the producer and the typed consumer
 */
private void testKeyValueSchema(KeyValueEncodingType keyValueEncodingType) throws Exception {
    final String namespace = "test-namespace-" + randomName(16);
    final String topic = TopicName.get(
            TopicDomain.persistent.value(),
            PUBLIC_TENANT,
            namespace,
            "test-kv-schema-" + randomName(16)).toString();

    admin.namespaces().createNamespace(
            PUBLIC_TENANT + "/" + namespace,
            Sets.newHashSet(CLUSTER_NAME));
    admin.topics().createPartitionedTopic(topic, 2);

    Producer<KeyValue<String, Integer>> kvProducer = pulsarClient
            .newProducer(Schema.KeyValue(Schema.STRING, Schema.INT32, keyValueEncodingType))
            .topic(topic)
            .create();

    Consumer<KeyValue<String, Integer>> typedConsumer = pulsarClient
            .newConsumer(Schema.KeyValue(Schema.STRING, Schema.INT32, keyValueEncodingType))
            .subscriptionName("test-sub")
            .topic(topic)
            .subscribe();

    // No encoding is passed here: AUTO_CONSUME has to detect it on its own.
    Consumer<GenericRecord> genericConsumer = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
            .subscriptionName("test-sub2")
            .topic(topic)
            .subscribe();

    kvProducer.send(new KeyValue<>("foo", 123));

    Message<KeyValue<String, Integer>> typedMessage = typedConsumer.receive();
    Message<GenericRecord> genericMessage = genericConsumer.receive();
    assertEquals(typedMessage.getValue(), genericMessage.getValue().getNativeObject());

    if (keyValueEncodingType != KeyValueEncodingType.SEPARATED) {
        // Any other encoding leaves the message key unset.
        assertNull(typedMessage.getKey());
        assertNull(genericMessage.getKey());
    } else {
        // With SEPARATED encoding the routing key is the key of the KeyValue.
        assertArrayEquals("foo".getBytes(StandardCharsets.UTF_8), typedMessage.getKeyBytes());
        assertArrayEquals("foo".getBytes(StandardCharsets.UTF_8), genericMessage.getKeyBytes());
    }

    kvProducer.close();
    typedConsumer.close();
    genericConsumer.close();
}
/** Runs the struct-based KeyValue scenario with INLINE encoding. */
@Test
public void testKeyValueSchemaWithStructsINLINE() throws Exception {
    final KeyValueEncodingType encoding = KeyValueEncodingType.INLINE;
    testKeyValueSchemaWithStructs(encoding);
}
/** Runs the struct-based KeyValue scenario with SEPARATED encoding. */
@Test
public void testKeyValueSchemaWithStructsSEPARATED() throws Exception {
    final KeyValueEncodingType encoding = KeyValueEncodingType.SEPARATED;
    testKeyValueSchemaWithStructs(encoding);
}
/**
 * Sends one {@code KeyValue<PersonOne, PersonTwo>} (both AVRO) with the given encoding and checks
 * that an AUTO_CONSUME consumer decodes the same field values and reports the same KeyValue schema
 * info as the typed consumer, and that the message key is only populated for SEPARATED encoding.
 *
 * @param keyValueEncodingType encoding used by the producer and the typed consumer
 */
private void testKeyValueSchemaWithStructs(KeyValueEncodingType keyValueEncodingType) throws Exception {
    final String tenant = PUBLIC_TENANT;
    final String namespace = "test-namespace-" + randomName(16);
    final String topicName = "test-kv-schema-" + randomName(16);
    final String topic = TopicName.get(
            TopicDomain.persistent.value(), tenant, namespace, topicName).toString();

    admin.namespaces().createNamespace(tenant + "/" + namespace, Sets.newHashSet(CLUSTER_NAME));
    admin.topics().createPartitionedTopic(topic, 2);

    // @Cleanup closes the clients even when one of the assertions below fails.
    @Cleanup
    Producer<KeyValue<Schemas.PersonOne, Schemas.PersonTwo>> producer = pulsarClient
            .newProducer(Schema.KeyValue(
                    Schema.AVRO(Schemas.PersonOne.class),
                    Schema.AVRO(Schemas.PersonTwo.class),
                    keyValueEncodingType))
            .topic(topic)
            .create();

    @Cleanup
    Consumer<KeyValue<Schemas.PersonOne, Schemas.PersonTwo>> consumer = pulsarClient
            .newConsumer(Schema.KeyValue(
                    Schema.AVRO(Schemas.PersonOne.class),
                    Schema.AVRO(Schemas.PersonTwo.class),
                    keyValueEncodingType))
            .subscriptionName("test-sub")
            .topic(topic)
            .subscribe();

    // No encoding is passed here: AUTO_CONSUME has to detect it on its own.
    @Cleanup
    Consumer<GenericRecord> consumer2 = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
            .subscriptionName("test-sub2")
            .topic(topic)
            .subscribe();

    Schemas.PersonOne key = new Schemas.PersonOne(8787);
    Schemas.PersonTwo value = new Schemas.PersonTwo(323, "foo");
    producer.send(new KeyValue<>(key, value));

    Message<KeyValue<Schemas.PersonOne, Schemas.PersonTwo>> message = consumer.receive();
    Message<GenericRecord> message2 = consumer2.receive();
    // One placeholder per argument; previously the class argument was silently dropped.
    log.info("message: {} ({})", message.getValue(), message.getValue().getClass());
    log.info("message2: {} ({})", message2.getValue().getNativeObject(),
            message2.getValue().getNativeObject().getClass());

    // The native object is a KeyValue of generic records; the field lookups below
    // would fail if that were not the case.
    @SuppressWarnings("unchecked")
    KeyValue<GenericRecord, GenericRecord> keyValue2 =
            (KeyValue<GenericRecord, GenericRecord>) message2.getValue().getNativeObject();
    assertEquals(message.getValue().getKey().id, keyValue2.getKey().getField("id"));
    assertEquals(message.getValue().getValue().id, keyValue2.getValue().getField("id"));
    assertEquals(message.getValue().getValue().name, keyValue2.getValue().getField("name"));

    // Compare the typed consumer's reader schema with the AUTO_CONSUME one (message2).
    // The previous code read both from `message`, which compared a schema with itself.
    Schema<?> schema = message.getReaderSchema().get();
    Schema<?> schemaFromGenericRecord = message2.getReaderSchema().get();
    KeyValueSchemaImpl<?, ?> keyValueSchema = (KeyValueSchemaImpl<?, ?>) schema;
    KeyValueSchemaImpl<?, ?> keyValueSchemaFromGenericRecord =
            (KeyValueSchemaImpl<?, ?>) schemaFromGenericRecord;
    assertEquals(keyValueSchema.getSchemaInfo(), keyValueSchemaFromGenericRecord.getSchemaInfo());

    if (keyValueEncodingType == KeyValueEncodingType.SEPARATED) {
        // With SEPARATED encoding the routing key is the key of the KeyValue.
        assertNotNull(message.getKeyBytes());
        assertNotNull(message2.getKeyBytes());
    } else {
        assertNull(message.getKey());
        assertNull(message2.getKey());
    }
}
/**
 * Checks, for every {@link SchemaType}, that {@code isUsingAvroSchemaParser} answers {@code true}
 * exactly for AVRO, JSON and PROTOBUF.
 */
@Test
public void testIsUsingAvroSchemaParser() {
    for (SchemaType type : SchemaType.values()) {
        final boolean expectAvroParser = type == SchemaType.AVRO
                || type == SchemaType.JSON
                || type == SchemaType.PROTOBUF;
        final boolean usesAvroParser = SchemaRegistryServiceImpl.isUsingAvroSchemaParser(type);
        if (!expectAvroParser) {
            assertFalse(usesAvroParser);
        } else {
            assertTrue(usesAvroParser);
        }
    }
}
/**
 * Subscribes with an INT32 schema whose properties contain a null value and a null key, and
 * expects the subscription to succeed.
 *
 * <p>The test mutates the schema info returned by {@code Schema.INT32}, which is shared by every
 * user of that constant, so the original properties are put back in a {@code finally} block to
 * keep the null entries from leaking into other tests.
 */
@Test
public void testNullKeyValueProperty() throws PulsarAdminException, PulsarClientException {
    final String tenant = PUBLIC_TENANT;
    final String namespace = "test-namespace-" + randomName(16);
    final String topicName = "test";
    final String topic = TopicName.get(
            TopicDomain.persistent.value(), tenant, namespace, topicName).toString();

    admin.namespaces().createNamespace(tenant + "/" + namespace, Sets.newHashSet(CLUSTER_NAME));

    final Map<String, String> map = new HashMap<>();
    map.put("key", null);
    map.put(null, "value"); // null key is not allowed for JSON, it's only for test here

    final SchemaInfoImpl int32SchemaInfo = (SchemaInfoImpl) Schema.INT32.getSchemaInfo();
    final Map<String, String> originalProperties = int32SchemaInfo.getProperties();
    int32SchemaInfo.setProperties(map);
    try {
        final Consumer<Integer> consumer = pulsarClient.newConsumer(Schema.INT32).topic(topic)
                .subscriptionName("sub")
                .subscribe();
        consumer.close();
    } finally {
        // Undo the mutation of the shared schema info regardless of the test outcome.
        int32SchemaInfo.setProperties(originalProperties);
    }
}
/**
 * Creates two schema versions on a topic, then deletes the topic together with its schema and
 * checks that the schema registry is empty and that the schema ledgers can no longer be opened.
 */
@Test
public void testDeleteTopicAndSchema() throws Exception {
    final String namespace = "test-namespace-" + randomName(16);
    final String topic = TopicName.get(
            TopicDomain.persistent.value(),
            PUBLIC_TENANT,
            namespace,
            "test-delete-topic-and-schema").toString();
    final String schemaName = TopicName.get(topic).getSchemaName();

    admin.namespaces().createNamespace(
            PUBLIC_TENANT + "/" + namespace,
            Sets.newHashSet(CLUSTER_NAME));

    // Two producers with different JSON schemas; the assertions below expect
    // two schema versions to be registered as a result.
    @Cleanup
    Producer<Schemas.PersonOne> personOneProducer = pulsarClient
            .newProducer(Schema.JSON(Schemas.PersonOne.class))
            .topic(topic)
            .create();

    @Cleanup
    Producer<Schemas.PersonThree> personThreeProducer = pulsarClient
            .newProducer(Schema.JSON(Schemas.PersonThree.class))
            .topic(topic)
            .create();

    List<CompletableFuture<SchemaRegistry.SchemaAndMetadata>> pendingSchemas =
            getPulsar().getSchemaRegistryService().getAllSchemas(schemaName).get();
    FutureUtil.waitForAll(pendingSchemas).get();

    assertEquals(pendingSchemas.size(), 2);
    for (CompletableFuture<SchemaRegistry.SchemaAndMetadata> pending : pendingSchemas) {
        SchemaRegistry.SchemaAndMetadata resolved;
        try {
            resolved = pending.get();
        } catch (Exception e) {
            // A failed lookup is reported through the not-null assertion below.
            resolved = null;
        }
        assertNotNull(resolved);
    }

    BookkeeperSchemaStorage schemaStorage = (BookkeeperSchemaStorage) getPulsar().getSchemaStorage();
    List<Long> schemaLedgers = schemaStorage.getSchemaLedgerList(schemaName);
    assertEquals(schemaLedgers.size(), 2);

    admin.topics().delete(topic, true, true);

    int remainingSchemas = getPulsar().getSchemaRegistryService()
            .trimDeletedSchemaAndGetList(schemaName).get().size();
    assertEquals(remainingSchemas, 0);

    // Every ledger that backed the schema must be gone from BookKeeper.
    for (Long ledgerId : schemaLedgers) {
        try {
            getPulsar().getBookKeeperClient().openLedger(ledgerId, BookKeeper.DigestType.CRC32, new byte[]{});
            fail();
        } catch (BKException.BKNoSuchLedgerExistsException ignore) {
            // expected: the ledger was deleted along with the schema
        }
    }
}
/**
 * Sends messages with several different schemas through one AUTO_PRODUCE_BYTES producer on an
 * ALWAYS_COMPATIBLE namespace and checks which schema versions the admin API lists afterwards,
 * and in which order.
 */
@Test
public void testProducerMultipleSchemaMessages() throws Exception {
    final String namespace = "test-namespace-" + randomName(16);
    final String namespaceName = PUBLIC_TENANT + "/" + namespace;
    admin.namespaces().createNamespace(namespaceName, Sets.newHashSet(CLUSTER_NAME));
    admin.namespaces().setSchemaCompatibilityStrategy(namespaceName,
            SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);

    final String topic = TopicName.get(
            TopicDomain.persistent.value(), PUBLIC_TENANT, namespace, "auto_schema_test").toString();

    @Cleanup
    Producer<byte[]> autoProducer = pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES())
            .topic(topic)
            .create();

    // Eight sends, some of them with schemas that were already used earlier in the sequence.
    autoProducer.newMessage(Schema.STRING).value("test").send();
    autoProducer.newMessage(Schema.JSON(Schemas.PersonThree.class))
            .value(new Schemas.PersonThree(0, "ran")).send();
    autoProducer.newMessage(Schema.AVRO(Schemas.PersonThree.class))
            .value(new Schemas.PersonThree(0, "ran")).send();
    autoProducer.newMessage(Schema.AVRO(Schemas.PersonOne.class))
            .value(new Schemas.PersonOne(0)).send();
    autoProducer.newMessage(Schema.JSON(Schemas.PersonThree.class))
            .value(new Schemas.PersonThree(1, "tang")).send();
    autoProducer.newMessage(Schema.BYTES).value("test".getBytes(StandardCharsets.UTF_8)).send();
    autoProducer.newMessage(Schema.BYTES).value("test".getBytes(StandardCharsets.UTF_8)).send();
    autoProducer.newMessage(Schema.BOOL).value(true).send();

    // Expected listing: five entries, in this order (BYTES is not among them).
    final SchemaInfo[] expectedInOrder = {
            Schema.STRING.getSchemaInfo(),
            Schema.JSON(Schemas.PersonThree.class).getSchemaInfo(),
            Schema.AVRO(Schemas.PersonThree.class).getSchemaInfo(),
            Schema.AVRO(Schemas.PersonOne.class).getSchemaInfo(),
            Schema.BOOL.getSchemaInfo(),
    };

    List<SchemaInfo> registered = admin.schemas().getAllSchemas(topic);
    Assert.assertEquals(registered.size(), 5);
    for (int i = 0; i < expectedInOrder.length; i++) {
        Assert.assertEquals(registered.get(i), expectedInOrder[i]);
    }
}
/**
 * Sends a STRING message without a key and checks that the consumer sees a {@code null} key
 * together with the original value.
 */
@Test
public void testNullKey() throws Exception {
    final String tenant = PUBLIC_TENANT;
    final String namespace = "test-namespace-" + randomName(16);
    final String topicName = "test-schema-" + randomName(16);
    final String topic = TopicName.get(
            TopicDomain.persistent.value(), tenant, namespace, topicName).toString();

    admin.namespaces().createNamespace(tenant + "/" + namespace, Sets.newHashSet(CLUSTER_NAME));
    admin.topics().createPartitionedTopic(topic, 2);

    // The producer and consumer were previously never closed; @Cleanup releases them
    // whether or not the assertions pass.
    @Cleanup
    Producer<String> producer = pulsarClient
            .newProducer(Schema.STRING)
            .topic(topic)
            .create();

    @Cleanup
    Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
            .subscriptionName("test-sub")
            .topic(topic)
            .subscribe();

    producer.send("foo");

    Message<String> message = consumer.receive();
    assertNull(message.getKey());
    assertEquals("foo", message.getValue());
}
/**
 * Republishes messages of many different schemas onto one topic through an AUTO_PRODUCE_BYTES
 * producer, then reads them all back with a single AUTO_CONSUME consumer and validates each
 * decoded payload.
 *
 * <p>Each {@code generateDataByDifferentSchema} call produces one message on its own topic and
 * forwards it to the shared topic, incrementing {@code totalMsgCnt} by one.
 */
@Test
public void testConsumeMultipleSchemaMessages() throws Exception {
    final String namespace = "test-namespace-" + randomName(16);
    String ns = PUBLIC_TENANT + "/" + namespace;
    admin.namespaces().createNamespace(ns, Sets.newHashSet(CLUSTER_NAME));
    admin.namespaces().setSchemaCompatibilityStrategy(ns, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);

    final String autoProducerTopic = getTopicName(ns, "auto_produce_topic");
    // Previously never closed; @Cleanup releases it whether or not the test passes.
    @Cleanup
    Producer<byte[]> autoProducer = pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES())
            .topic(autoProducerTopic)
            .create();

    AtomicInteger totalMsgCnt = new AtomicInteger(0);
    // Explicit charset: the no-arg getBytes() depends on the platform default.
    generateDataByDifferentSchema(ns, "bytes_schema", Schema.BYTES,
            "bytes value".getBytes(StandardCharsets.UTF_8), autoProducer, totalMsgCnt);
    generateDataByDifferentSchema(ns, "string_schema", Schema.STRING, "string value",
            autoProducer, totalMsgCnt);
    generateDataByDifferentSchema(ns, "bool_schema", Schema.BOOL, true,
            autoProducer, totalMsgCnt);
    generateDataByDifferentSchema(ns, "json_one_schema", Schema.JSON(Schemas.PersonOne.class),
            new Schemas.PersonOne(1), autoProducer, totalMsgCnt);
    generateDataByDifferentSchema(ns, "json_three_schema", Schema.JSON(Schemas.PersonThree.class),
            new Schemas.PersonThree(3, "ran"), autoProducer, totalMsgCnt);
    generateDataByDifferentSchema(ns, "json_four_schema", Schema.JSON(Schemas.PersonFour.class),
            new Schemas.PersonFour(4, "tang", 18), autoProducer, totalMsgCnt);
    generateDataByDifferentSchema(ns, "avro_one_schema", Schema.AVRO(Schemas.PersonOne.class),
            new Schemas.PersonOne(10), autoProducer, totalMsgCnt);
    generateDataByDifferentSchema(ns, "k_one_v_three_schema_separate",
            Schema.KeyValue(Schema.JSON(Schemas.PersonOne.class),
                    Schema.JSON(Schemas.PersonThree.class), KeyValueEncodingType.SEPARATED),
            new KeyValue<>(new Schemas.PersonOne(1), new Schemas.PersonThree(3, "kv-separate")),
            autoProducer, totalMsgCnt);
    generateDataByDifferentSchema(ns, "k_one_v_four_schema_inline",
            Schema.KeyValue(Schema.JSON(Schemas.PersonOne.class),
                    Schema.JSON(Schemas.PersonFour.class), KeyValueEncodingType.INLINE),
            new KeyValue<>(new Schemas.PersonOne(10), new Schemas.PersonFour(30, "kv-inline", 20)),
            autoProducer, totalMsgCnt);
    generateDataByDifferentSchema(ns, "k_int_v_three_schema_separate",
            Schema.KeyValue(Schema.INT32, Schema.JSON(Schemas.PersonThree.class), KeyValueEncodingType.SEPARATED),
            new KeyValue<>(100, new Schemas.PersonThree(40, "kv-separate")),
            autoProducer, totalMsgCnt);

    // Previously never closed; see the note on autoProducer.
    @Cleanup
    Consumer<GenericRecord> autoConsumer = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
            .topic(autoProducerTopic)
            .subscriptionName("test")
            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
            .subscribe();

    for (int i = 0; i < totalMsgCnt.get(); i++) {
        Message<GenericRecord> message = autoConsumer.receive(5, TimeUnit.SECONDS);
        if (message == null) {
            Assert.fail("Failed to receive multiple schema message.");
        }
        log.info("auto consumer get native object class: {}, value: {}",
                message.getValue().getNativeObject().getClass(), message.getValue().getNativeObject());
        checkSchemaForAutoSchema(message);
    }
}
/**
 * Builds a topic name by appending the base topic to the namespace, separated by a slash.
 *
 * @param ns        namespace part of the name
 * @param baseTopic local topic name
 * @return {@code ns + "/" + baseTopic}
 */
private String getTopicName(String ns, String baseTopic) {
    return String.join("/", ns, baseTopic);
}
/**
 * Publishes {@code data} with {@code schema} on the topic {@code ns/baseTopic}, reads it back with
 * an AUTO_CONSUME consumer and forwards the raw payload, properties and key bytes to
 * {@code autoProducer} using the reader schema of the received message.
 *
 * @param ns           namespace the per-schema topic lives in
 * @param baseTopic    local topic name; also attached to the message as the "baseTopic" property
 * @param schema       schema used to produce {@code data}
 * @param data         value to publish; must match {@code schema}
 * @param autoProducer producer the received message is forwarded to
 * @param totalMsgCnt  counter incremented once per published message
 * @throws PulsarClientException if producing, consuming or closing a client fails
 */
@SuppressWarnings({"unchecked", "rawtypes"}) // the schema parameter is deliberately raw
private void generateDataByDifferentSchema(String ns,
                                           String baseTopic,
                                           Schema schema,
                                           Object data,
                                           Producer<?> autoProducer,
                                           AtomicInteger totalMsgCnt) throws PulsarClientException {
    String topic = getTopicName(ns, baseTopic);

    // @Cleanup closes both clients even when one of the checks below fails; previously they
    // leaked in that case because close() was only reached on the success path.
    @Cleanup
    Producer<Object> producer = pulsarClient.newProducer(schema)
            .topic(topic)
            .create();
    producer.newMessage().value(data).property("baseTopic", baseTopic).send();
    totalMsgCnt.incrementAndGet();

    @Cleanup
    Consumer<GenericRecord> consumer = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
            .topic(topic)
            .subscriptionName("test")
            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
            .subscribe();

    Message<GenericRecord> message = consumer.receive(5, TimeUnit.SECONDS);
    if (message == null) {
        Assert.fail("Failed to receive message for topic " + topic);
    }
    if (!message.getReaderSchema().isPresent()) {
        Assert.fail("Failed to get reader schema for topic " + topic);
    }
    // The result is not needed; the call is kept so a decoding failure surfaces here.
    message.getValue();

    // Forward the raw bytes under the reader schema of the message that was just received.
    TypedMessageBuilder<byte[]> messageBuilder = autoProducer
            .newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get()))
            .value(message.getData())
            .properties(message.getProperties());
    if (message.getKeyBytes() != null) {
        messageBuilder.keyBytes(message.getKeyBytes());
    }
    messageBuilder.send();
}
/**
 * Validates a message received through AUTO_CONSUME: it must carry a reader schema, and its
 * native object must match the payload expected for the "baseTopic" property it was sent with.
 * Messages whose "baseTopic" value is not one of the known cases are accepted without checks.
 *
 * @param message message read from the shared auto-produce topic
 */
private void checkSchemaForAutoSchema(Message<GenericRecord> message) {
    if (!message.getReaderSchema().isPresent()) {
        Assert.fail("Failed to get reader schema for auto consume multiple schema topic.");
    }
    Object nativeObject = message.getValue().getNativeObject();
    String baseTopic = message.getProperty("baseTopic");
    // Switching on a null string throws a bare NullPointerException; fail with a clear message.
    Assert.assertNotNull(baseTopic, "Message is missing the baseTopic property.");

    JsonNode jsonNode;
    KeyValue<?, ?> kv;
    switch (baseTopic) {
        case "bytes_schema":
            // Explicit charset: new String(byte[]) depends on the platform default.
            Assert.assertEquals(new String((byte[]) nativeObject, StandardCharsets.UTF_8), "bytes value");
            break;
        case "string_schema":
            Assert.assertEquals((String) nativeObject, "string value");
            break;
        case "bool_schema":
            Assert.assertEquals(nativeObject, Boolean.TRUE);
            break;
        case "json_one_schema":
            jsonNode = (JsonNode) nativeObject;
            Assert.assertEquals(jsonNode.get("id").intValue(), 1);
            break;
        case "json_three_schema":
            jsonNode = (JsonNode) nativeObject;
            Assert.assertEquals(jsonNode.get("id").intValue(), 3);
            Assert.assertEquals(jsonNode.get("name").textValue(), "ran");
            break;
        case "json_four_schema":
            jsonNode = (JsonNode) nativeObject;
            Assert.assertEquals(jsonNode.get("id").intValue(), 4);
            Assert.assertEquals(jsonNode.get("name").textValue(), "tang");
            Assert.assertEquals(jsonNode.get("age").intValue(), 18);
            break;
        case "avro_one_schema":
            org.apache.avro.generic.GenericRecord genericRecord =
                    (org.apache.avro.generic.GenericRecord) nativeObject;
            Assert.assertEquals(genericRecord.get("id"), 10);
            break;
        case "k_one_v_three_schema_separate":
            // Wildcard cast: the element types are verified by the per-element casts below.
            kv = (KeyValue<?, ?>) nativeObject;
            jsonNode = ((GenericJsonRecord) kv.getKey()).getJsonNode();
            Assert.assertEquals(jsonNode.get("id").intValue(), 1);
            jsonNode = ((GenericJsonRecord) kv.getValue()).getJsonNode();
            Assert.assertEquals(jsonNode.get("id").intValue(), 3);
            Assert.assertEquals(jsonNode.get("name").textValue(), "kv-separate");
            break;
        case "k_one_v_four_schema_inline":
            kv = (KeyValue<?, ?>) nativeObject;
            jsonNode = ((GenericJsonRecord) kv.getKey()).getJsonNode();
            Assert.assertEquals(jsonNode.get("id").intValue(), 10);
            jsonNode = ((GenericJsonRecord) kv.getValue()).getJsonNode();
            Assert.assertEquals(jsonNode.get("id").intValue(), 30);
            Assert.assertEquals(jsonNode.get("name").textValue(), "kv-inline");
            Assert.assertEquals(jsonNode.get("age").intValue(), 20);
            break;
        case "k_int_v_three_schema_separate":
            kv = (KeyValue<?, ?>) nativeObject;
            Assert.assertEquals(kv.getKey(), 100);
            jsonNode = ((GenericJsonRecord) kv.getValue()).getJsonNode();
            Assert.assertEquals(jsonNode.get("id").intValue(), 40);
            Assert.assertEquals(jsonNode.get("name").textValue(), "kv-separate");
            break;
        default:
            // nothing to do
    }
}
}