blob: 4d5c1569948059e434c7b5c5297d3d308011f497 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.Schema.Parser;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.schema.LongSchemaVersion;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClientException.IncompatibleSchemaException;
import org.apache.pulsar.client.api.PulsarClientException.InvalidMessageException;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.BinaryProtoLookupService;
import org.apache.pulsar.client.impl.HttpLookupService;
import org.apache.pulsar.client.impl.LookupService;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.schema.writer.AvroWriter;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Factory;
import org.testng.annotations.Test;
import java.io.ByteArrayInputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@Test(groups = "broker-api")
public class SimpleSchemaTest extends ProducerConsumerBase {
@DataProvider(name = "batchingModes")
public static Object[][] batchingModes() {
return new Object[][] {
{ true },
{ false }
};
}
@DataProvider(name = "schemaValidationModes")
public static Object[][] schemaValidationModes() {
return new Object[][] { { true }, { false } };
}
@DataProvider(name = "topicDomain")
public static Object[] topicDomain() {
return new Object[] { "persistent://", "non-persistent://" };
}
private final boolean schemaValidationEnforced;
@Factory(dataProvider = "schemaValidationModes")
public SimpleSchemaTest(boolean schemaValidationEnforced) {
this.schemaValidationEnforced = schemaValidationEnforced;
}
@BeforeMethod
@Override
protected void setup() throws Exception {
conf.setSchemaValidationEnforced(schemaValidationEnforced);
this.isTcpLookup = true;
super.internalSetup();
super.producerBaseSetup();
}
@AfterMethod(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}
@Test
public void testString() throws Exception {
try (Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic("persistent://my-property/my-ns/my-topic1")
.subscriptionName("my-subscriber-name").subscribe();
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic("persistent://my-property/my-ns/my-topic1").create()) {
int N = 10;
for (int i = 0; i < N; i++) {
producer.send("my-message-" + i);
}
for (int i = 0; i < N; i++) {
Message<String> msg = consumer.receive();
assertEquals(msg.getValue(), "my-message-" + i);
consumer.acknowledge(msg);
}
}
}
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
static class V1Data {
int i;
}
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
static class V2Data {
int i;
Integer j;
}
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
static class IncompatibleData {
int i;
int j;
}
@Test
public void newProducerNewTopicNewSchema() throws Exception {
String topic = "my-property/my-ns/schema-test";
try (Producer<V1Data> p = pulsarClient.newProducer(Schema.AVRO(V1Data.class))
.topic(topic).create()) {
p.send(new V1Data(0));
}
}
@Test
public void newProducerTopicExistsWithoutSchema() throws Exception {
String topic = "my-property/my-ns/schema-test";
try (Producer<byte[]> p = pulsarClient.newProducer().topic(topic).create()) {
p.send(topic.getBytes(UTF_8));
}
try (Producer<V1Data> p = pulsarClient.newProducer(Schema.AVRO(V1Data.class))
.topic(topic).create()) {
p.send(new V1Data(0));
}
}
@Test
public void newProducerTopicExistsWithSchema() throws Exception {
String topic = "my-property/my-ns/schema-test";
try (Producer<V1Data> p = pulsarClient.newProducer(Schema.AVRO(V1Data.class))
.topic(topic).create()) {
p.send(new V1Data(1));
}
try (Producer<V1Data> p = pulsarClient.newProducer(Schema.AVRO(V1Data.class))
.topic(topic).create()) {
p.send(new V1Data(0));
}
}
@Test
public void newProducerWithoutSchemaOnTopicWithSchema() throws Exception {
String topic = "my-property/my-ns/schema-test";
try (Producer<V1Data> p = pulsarClient.newProducer(Schema.AVRO(V1Data.class))
.topic(topic).create()) {
p.send(new V1Data(0));
}
try (Producer<byte[]> p = pulsarClient.newProducer(Schema.BYTES).topic(topic).create()) {
if (!schemaValidationEnforced) {
p.send("junkdata".getBytes(UTF_8));
} else {
Assert.fail("Shouldn't be able to connect to a schema'd topic with no schema"
+ " if SchemaValidationEnabled is enabled");
}
} catch (PulsarClientException e) {
if (schemaValidationEnforced) {
Assert.assertTrue(e instanceof IncompatibleSchemaException);
} else {
Assert.fail("Shouldn't throw IncompatibleSchemaException"
+ " if SchemaValidationEnforced is disabled");
}
}
// if using AUTO_PRODUCE_BYTES, producer can connect but the publish will fail
try (Producer<byte[]> p = pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES()).topic(topic).create()) {
p.send("junkdata".getBytes(UTF_8));
} catch (PulsarClientException e) {
assertTrue(e.getCause() instanceof SchemaSerializationException);
}
}
@Test
public void newProducerForMessageSchemaOnTopicWithMultiVersionSchema() throws Exception {
String topic = "my-property/my-ns/schema-test";
Schema<V1Data> v1Schema = Schema.AVRO(V1Data.class);
byte[] v1SchemaBytes = v1Schema.getSchemaInfo().getSchema();
AvroWriter<V1Data> v1Writer = new AvroWriter<>(
new Parser().parse(new ByteArrayInputStream(v1SchemaBytes)));
Schema<V2Data> v2Schema = Schema.AVRO(V2Data.class);
byte[] v2SchemaBytes = v2Schema.getSchemaInfo().getSchema();
AvroWriter<V2Data> v2Writer = new AvroWriter<>(
new Parser().parse(new ByteArrayInputStream(v2SchemaBytes)));
try (Producer<V1Data> ignored = pulsarClient.newProducer(v1Schema)
.topic(topic).create()) {
}
try (Producer<V2Data> p = pulsarClient.newProducer(Schema.AVRO(V2Data.class))
.topic(topic).create()) {
p.send(new V2Data(-1, -1));
}
V1Data dataV1 = new V1Data(2);
V2Data dataV2 = new V2Data(3, 5);
byte[] contentV1 = v1Writer.write(dataV1);
byte[] contentV2 = v2Writer.write(dataV2);
try (Producer<byte[]> p = pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES())
.topic(topic).create();
Consumer<V2Data> c = pulsarClient.newConsumer(v2Schema)
.topic(topic)
.subscriptionName("sub1").subscribe()) {
Assert.expectThrows(SchemaSerializationException.class, () -> p.send(contentV1));
p.newMessage(Schema.AUTO_PRODUCE_BYTES(Schema.AVRO(V1Data.class)))
.value(contentV1).send();
p.send(contentV2);
Message<V2Data> msg1 = c.receive();
V2Data msg1Value = msg1.getValue();
Assert.assertEquals(dataV1.i, msg1Value.i);
Assert.assertNull(msg1Value.j);
Assert.assertEquals(msg1.getSchemaVersion(), new LongSchemaVersion(0).bytes());
Message<V2Data> msg2 = c.receive();
Assert.assertEquals(dataV2, msg2.getValue());
Assert.assertEquals(msg2.getSchemaVersion(), new LongSchemaVersion(1).bytes());
try {
p.newMessage(Schema.BYTES).value(contentV1).send();
if (schemaValidationEnforced) {
Assert.fail("Shouldn't be able to send to a schema'd topic with no schema"
+ " if SchemaValidationEnabled is enabled");
}
Message<V2Data> msg3 = c.receive();
Assert.assertEquals(msg3.getSchemaVersion(), SchemaVersion.Empty.bytes());
} catch (PulsarClientException e) {
if (schemaValidationEnforced) {
Assert.assertTrue(e instanceof IncompatibleSchemaException);
} else {
Assert.fail("Shouldn't throw IncompatibleSchemaException"
+ " if SchemaValidationEnforced is disabled");
}
}
}
}
@Test
public void newProducerForMessageOnTopicWithDifferentSchemaType() throws Exception {
String topic = "my-property/my-ns/schema-test";
V1Data data1 = new V1Data(2);
V2Data data2 = new V2Data(3, 5);
V1Data data3 = new V1Data(8);
try (Producer<V1Data> p = pulsarClient.newProducer(Schema.AVRO(V1Data.class))
.topic(topic).create();
Consumer<V2Data> c = pulsarClient.newConsumer(Schema.AVRO(V2Data.class))
.topic(topic)
.subscriptionName("sub1").subscribe()) {
p.newMessage().value(data1).send();
p.newMessage(Schema.AVRO(V2Data.class)).value(data2).send();
p.newMessage(Schema.AVRO(V1Data.class)).value(data3).send();
Message<V2Data> msg1 = c.receive();
V2Data msg1Value = msg1.getValue();
Assert.assertEquals(data1.i, msg1Value.i);
Assert.assertNull(msg1Value.j);
Assert.assertEquals(msg1.getSchemaVersion(), new LongSchemaVersion(0).bytes());
Message<V2Data> msg2 = c.receive();
Assert.assertEquals(data2, msg2.getValue());
Assert.assertEquals(msg2.getSchemaVersion(), new LongSchemaVersion(1).bytes());
Message<V2Data> msg3 = c.receive();
V2Data msg3Value = msg3.getValue();
Assert.assertEquals(data3.i, msg3Value.i);
Assert.assertNull(msg3Value.j);
Assert.assertEquals(msg3.getSchemaVersion(), new LongSchemaVersion(0).bytes());
}
}
@Test
public void newProducerForMessageSchemaOnTopicInitialWithNoSchema() throws Exception {
String topic = "my-property/my-ns/schema-test";
Schema<V1Data> v1Schema = Schema.AVRO(V1Data.class);
byte[] v1SchemaBytes = v1Schema.getSchemaInfo().getSchema();
AvroWriter<V1Data> v1Writer = new AvroWriter<>(
new Parser().parse(new ByteArrayInputStream(v1SchemaBytes)));
Schema<V2Data> v2Schema = Schema.AVRO(V2Data.class);
byte[] v2SchemaBytes = v2Schema.getSchemaInfo().getSchema();
AvroWriter<V2Data> v2Writer = new AvroWriter<>(
new Parser().parse(new ByteArrayInputStream(v2SchemaBytes)));
try (Producer<byte[]> p = pulsarClient.newProducer()
.topic(topic).create();
Consumer<byte[]> c = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName("sub1").subscribe()) {
for (int i = 0; i < 2; ++i) {
V1Data dataV1 = new V1Data(i);
V2Data dataV2 = new V2Data(i, -i);
byte[] contentV1 = v1Writer.write(dataV1);
byte[] contentV2 = v2Writer.write(dataV2);
p.newMessage(Schema.AUTO_PRODUCE_BYTES(v1Schema)).value(contentV1).send();
Message<byte[]> msg1 = c.receive();
Assert.assertEquals(msg1.getSchemaVersion(), new LongSchemaVersion(0).bytes());
Assert.assertEquals(msg1.getData(), contentV1);
p.newMessage(Schema.AUTO_PRODUCE_BYTES(v2Schema)).value(contentV2).send();
Message<byte[]> msg2 = c.receive();
Assert.assertEquals(msg2.getSchemaVersion(), new LongSchemaVersion(1).bytes());
Assert.assertEquals(msg2.getData(), contentV2);
}
}
List<SchemaInfo> allSchemas = admin.schemas().getAllSchemas(topic);
Assert.assertEquals(allSchemas, Arrays.asList(v1Schema.getSchemaInfo(),
v2Schema.getSchemaInfo()));
}
@Test
public void newProducerForMessageSchemaWithBatch() throws Exception {
String topic = "my-property/my-ns/schema-test";
Consumer<V2Data> c = pulsarClient.newConsumer(Schema.AVRO(V2Data.class))
.topic(topic)
.subscriptionName("sub1").subscribe();
Producer<byte[]> p = pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES())
.topic(topic)
.enableBatching(true)
.batchingMaxPublishDelay(10, TimeUnit.SECONDS).create();
AvroWriter<V1Data> v1DataAvroWriter = new AvroWriter<>(
ReflectData.AllowNull.get().getSchema(V1Data.class));
AvroWriter<V2Data> v2DataAvroWriter = new AvroWriter<>(
ReflectData.AllowNull.get().getSchema(V2Data.class));
AvroWriter<IncompatibleData> incompatibleDataAvroWriter = new AvroWriter<>(
ReflectData.AllowNull.get().getSchema(IncompatibleData.class));
int total = 20;
int batch = 5;
int incompatible = 3;
for (int i = 0; i < total; ++i) {
if (i / batch % 2 == 0) {
byte[] content = v1DataAvroWriter.write(new V1Data(i));
p.newMessage(Schema.AUTO_PRODUCE_BYTES(Schema.AVRO(V1Data.class)))
.value(content).sendAsync();
} else {
byte[] content = v2DataAvroWriter.write(new V2Data(i, i + total));
p.newMessage(Schema.AUTO_PRODUCE_BYTES(Schema.AVRO(V2Data.class)))
.value(content).sendAsync();
}
if ((i + 1) % incompatible == 0) {
byte[] content = incompatibleDataAvroWriter.write(new IncompatibleData(-i, -i));
try {
p.newMessage(Schema.AUTO_PRODUCE_BYTES(Schema.AVRO(IncompatibleData.class)))
.value(content).send();
} catch (Exception e) {
Assert.assertTrue(e instanceof IncompatibleSchemaException, e.getMessage());
}
}
}
p.flush();
for (int i = 0; i < total; ++i) {
V2Data value = c.receive().getValue();
if (i / batch % 2 == 0) {
Assert.assertNull(value.j);
Assert.assertEquals(value.i, i);
} else {
Assert.assertEquals(value, new V2Data(i, i + total));
}
}
c.close();
}
@Test
public void newProducerWithMultipleSchemaDisabled() throws Exception {
String topic = "my-property/my-ns/schema-test";
AvroWriter<V1Data> v1DataAvroWriter = new AvroWriter<>(
ReflectData.AllowNull.get().getSchema(V1Data.class));
try (Producer<byte[]> p = pulsarClient.newProducer()
.topic(topic)
.enableMultiSchema(false).create()) {
Assert.assertThrows(InvalidMessageException.class,
() -> p.newMessage(Schema.AUTO_PRODUCE_BYTES(Schema.AVRO(V1Data.class)))
.value(v1DataAvroWriter.write(new V1Data(0))).send());
}
}
@Test
public void newConsumerWithSchemaOnNewTopic() throws Exception {
String topic = "my-property/my-ns/schema-test";
try (Consumer<V1Data> c = pulsarClient.newConsumer(Schema.AVRO(V1Data.class))
.topic(topic).subscriptionName("sub1").subscribe();
Producer<V1Data> p = pulsarClient.newProducer(Schema.AVRO(V1Data.class)).topic(topic).create()) {
V1Data toSend = new V1Data(1);
p.send(toSend);
Assert.assertEquals(toSend, c.receive().getValue());
}
}
@Test
public void newConsumerWithSchemaOnExistingTopicWithoutSchema() {
String topic = "my-property/my-ns/schema-test";
try (Producer<byte[]> p = pulsarClient.newProducer().topic(topic).create();
Consumer<V1Data> c = pulsarClient.newConsumer(Schema.AVRO(V1Data.class))
.topic(topic).subscriptionName("sub1").subscribe()) {
Assert.fail("Shouldn't be able to consume with a schema from a topic which has no schema set");
} catch (PulsarClientException e) {
Assert.assertTrue(e instanceof IncompatibleSchemaException);
}
}
@Test
public void newConsumerWithSchemaTopicHasSchema() throws Exception {
String topic = "my-property/my-ns/schema-test";
try (Producer<V1Data> p = pulsarClient.newProducer(Schema.AVRO(V1Data.class)).topic(topic).create();
Consumer<V1Data> c = pulsarClient.newConsumer(Schema.AVRO(V1Data.class))
.topic(topic).subscriptionName("sub1").subscribe()) {
V1Data toSend = new V1Data(1);
p.send(toSend);
Assert.assertEquals(toSend, c.receive().getValue());
}
}
@Test
public void newBytesConsumerWithTopicWithSchema() throws Exception {
String topic = "my-property/my-ns/schema-test";
try (Producer<V1Data> p = pulsarClient.newProducer(Schema.AVRO(V1Data.class)).topic(topic).create();
Consumer<byte[]> c = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").subscribe()) {
p.send(new V1Data(1));
Assert.assertTrue(c.receive().getValue().length > 0);
}
}
@Test
public void getSchemaVersionFromMessagesBatchingDisabled() throws Exception {
getSchemaVersionFromMessages(false);
}
@Test
public void getSchemaVersionFromMessagesBatchingEnabled() throws Exception {
getSchemaVersionFromMessages(true);
}
private void getSchemaVersionFromMessages(boolean batching) throws Exception {
String topic = "my-property/my-ns/schema-test";
try (Producer<V1Data> p = pulsarClient.newProducer(Schema.AVRO(V1Data.class))
.topic(topic)
.enableBatching(batching)
.create();
Consumer<V1Data> c = pulsarClient.newConsumer(Schema.AVRO(V1Data.class))
.topic(topic)
.subscriptionName("sub1")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe()) {
p.send(new V1Data(1));
Message<V1Data> data = c.receive();
assertNotNull(data.getSchemaVersion());
assertEquals(data.getValue(), new V1Data(1));
}
}
@Test(dataProvider = "batchingModes")
public void testAutoConsume(boolean batching) throws Exception {
String topic = "my-property/my-ns/schema-test-auto-consume-" + batching;
try (Producer<V1Data> p = pulsarClient.newProducer(Schema.AVRO(V1Data.class))
.topic(topic)
.enableBatching(batching)
.create();
Consumer<GenericRecord> c = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
.topic(topic)
.subscriptionName("sub1")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe()) {
int numMessages = 10;
for (int i = 0; i < numMessages; i++) {
p.sendAsync(new V1Data(i));
}
p.flush();
for (int i = 0; i < numMessages; i++) {
Message<GenericRecord> data = c.receive();
assertNotNull(data.getSchemaVersion());
assertEquals(data.getValue().getField("i"), i);
MessageImpl impl = (MessageImpl) data;
org.apache.avro.Schema avroSchema = (org.apache.avro.Schema) impl.getSchema().getNativeSchema().get();
assertNotNull(avroSchema);
}
}
}
@Test(dataProvider = "batchingModes")
public void testAutoKeyValueConsume(boolean batching) throws Exception {
String topic = "my-property/my-ns/schema-test-auto-keyvalue-consume-" + batching;
Schema<KeyValue<V1Data, V1Data>> pojoSchema = Schema.KeyValue(
Schema.AVRO(V1Data.class),
Schema.AVRO(V1Data.class),
KeyValueEncodingType.SEPARATED);
try (Producer<KeyValue<V1Data, V1Data>> p = pulsarClient.newProducer(pojoSchema)
.topic(topic)
.enableBatching(batching)
.create();
Consumer<KeyValue<GenericRecord, GenericRecord>> c1 = pulsarClient.newConsumer(
Schema.KeyValue(
Schema.AUTO_CONSUME(),
Schema.AUTO_CONSUME(),
KeyValueEncodingType.SEPARATED))
.topic(topic)
.subscriptionName("sub1")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
Consumer<KeyValue<V1Data, V1Data>> c2 = pulsarClient.newConsumer(
Schema.KeyValue(
Schema.AVRO(V1Data.class),
Schema.AVRO(V1Data.class),
KeyValueEncodingType.SEPARATED))
.topic(topic)
.subscriptionName("sub2")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
Consumer<KeyValue<GenericRecord, V1Data>> c3 = pulsarClient.newConsumer(
Schema.KeyValue(
Schema.AUTO_CONSUME(),
Schema.AVRO(V1Data.class),
KeyValueEncodingType.SEPARATED))
.topic(topic)
.subscriptionName("sub3")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
Consumer<KeyValue<V1Data, GenericRecord>> c4 = pulsarClient.newConsumer(
Schema.KeyValue(
Schema.AVRO(V1Data.class),
Schema.AUTO_CONSUME(),
KeyValueEncodingType.SEPARATED))
.topic(topic)
.subscriptionName("sub4")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe()
) {
int numMessages = 10;
for (int i = 0; i < numMessages; i++) {
p.sendAsync(new KeyValue<>(new V1Data(i * 100), new V1Data(i * 1000)));
}
p.flush();
// verify c1
for (int i = 0; i < numMessages; i++) {
Message<KeyValue<GenericRecord, GenericRecord>> data = c1.receive();
assertNotNull(data.getSchemaVersion());
assertEquals(data.getValue().getKey().getField("i"), i * 100);
assertEquals(data.getValue().getValue().getField("i"), i * 1000);
}
// verify c2
for (int i = 0; i < numMessages; i++) {
Message<KeyValue<V1Data, V1Data>> data = c2.receive();
assertNotNull(data.getSchemaVersion());
assertEquals(data.getValue().getKey().i, i * 100);
assertEquals(data.getValue().getValue().i, i * 1000);
}
// verify c3
for (int i = 0; i < numMessages; i++) {
Message<KeyValue<GenericRecord, V1Data>> data = c3.receive();
assertNotNull(data.getSchemaVersion());
assertEquals(data.getValue().getKey().getField("i"), i * 100);
assertEquals(data.getValue().getValue().i, i * 1000);
}
// verify c4
for (int i = 0; i < numMessages; i++) {
Message<KeyValue<V1Data, GenericRecord>> data = c4.receive();
assertNotNull(data.getSchemaVersion());
assertEquals(data.getValue().getKey().i, i * 100);
assertEquals(data.getValue().getValue().getField("i"), i * 1000);
}
}
}
@Test
public void testGetSchemaByVersion() throws PulsarClientException, PulsarAdminException, ExecutionException, InterruptedException {
final String topic = "persistent://my-property/my-ns/testGetSchemaByVersion";
PulsarClientImpl httpProtocolClient = (PulsarClientImpl) PulsarClient.builder().serviceUrl(brokerUrl.toString()).build();
PulsarClientImpl binaryProtocolClient = (PulsarClientImpl) pulsarClient;
pulsarClient.newProducer(Schema.AVRO(V1Data.class))
.topic(topic)
.create();
pulsarClient.newProducer(Schema.AVRO(V2Data.class))
.topic(topic)
.create();
LookupService httpLookupService = httpProtocolClient.getLookup();
LookupService binaryLookupService = binaryProtocolClient.getLookup();
Assert.assertTrue(httpLookupService instanceof HttpLookupService);
Assert.assertTrue(binaryLookupService instanceof BinaryProtoLookupService);
Assert.assertEquals(admin.schemas().getAllSchemas(topic).size(), 2);
Assert.assertTrue(httpLookupService.getSchema(TopicName.get(topic), ByteBuffer.allocate(8).putLong(0).array()).get().isPresent());
Assert.assertTrue(httpLookupService.getSchema(TopicName.get(topic), ByteBuffer.allocate(8).putLong(1).array()).get().isPresent());
Assert.assertTrue(binaryLookupService.getSchema(TopicName.get(topic), ByteBuffer.allocate(8).putLong(0).array()).get().isPresent());
Assert.assertTrue(binaryLookupService.getSchema(TopicName.get(topic), ByteBuffer.allocate(8).putLong(1).array()).get().isPresent());
}
@Test(dataProvider = "topicDomain")
public void testAutoCreatedSchema(String domain) throws Exception {
final String topic1 = domain + "my-property/my-ns/testAutoCreatedSchema-1";
final String topic2 = domain + "my-property/my-ns/testAutoCreatedSchema-2";
pulsarClient.newProducer(Schema.BYTES).topic(topic1).create().close();
try {
// BYTES schema is treated as no schema
admin.schemas().getSchemaInfo(topic1);
fail("The schema of topic1 should not exist");
} catch (PulsarAdminException e) {
Assert.assertEquals(e.getStatusCode(), 404);
}
pulsarClient.newProducer(Schema.STRING).topic(topic1).create().close();
// topic1's schema becomes STRING now
Assert.assertEquals(admin.schemas().getSchemaInfo(topic1).getType(), SchemaType.STRING);
pulsarClient.newConsumer(Schema.BYTES).topic(topic2).subscriptionName("sub").subscribe().close();
try {
// BYTES schema is treated as no schema
admin.schemas().getSchemaInfo(topic2);
fail("The schema of topic2 should not exist");
} catch (PulsarAdminException e) {
Assert.assertEquals(e.getStatusCode(), 404);
}
pulsarClient.newConsumer(Schema.STRING).topic(topic2).subscriptionName("sub").subscribe().close();
// topic2's schema becomes STRING now.
Assert.assertEquals(admin.schemas().getSchemaInfo(topic2).getType(), SchemaType.STRING);
}
}