blob: 61d83322bab579f179ab701cdc6358097b652640 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.schema.compatibility;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;
import com.google.common.collect.Sets;
import java.util.Collections;
import java.util.concurrent.ThreadLocalRandom;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.schema.Schemas;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@Slf4j
@Test(groups = "schema")
public class SchemaCompatibilityCheckTest extends MockedPulsarServiceBaseTest {
private final static String CLUSTER_NAME = "test";
@BeforeMethod
@Override
public void setup() throws Exception {
super.internalSetup();
// Setup namespaces
admin.clusters().createCluster(CLUSTER_NAME, ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
TenantInfo tenantInfo = TenantInfo.builder()
.allowedClusters(Collections.singleton(CLUSTER_NAME))
.build();
admin.tenants().createTenant(PUBLIC_TENANT, tenantInfo);
}
@AfterMethod(alwaysRun = true)
@Override
public void cleanup() throws Exception {
super.internalCleanup();
}
@DataProvider(name = "CanReadLastSchemaCompatibilityStrategy")
public Object[] canReadLastSchemaCompatibilityStrategy() {
return new Object[] {
SchemaCompatibilityStrategy.BACKWARD,
SchemaCompatibilityStrategy.FORWARD_TRANSITIVE,
SchemaCompatibilityStrategy.FORWARD,
SchemaCompatibilityStrategy.FULL
};
}
@DataProvider(name = "ReadAllCheckSchemaCompatibilityStrategy")
public Object[] readAllCheckSchemaCompatibilityStrategy() {
return new Object[] {
SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE,
SchemaCompatibilityStrategy.FULL_TRANSITIVE
};
}
@DataProvider(name = "AllCheckSchemaCompatibilityStrategy")
public Object[] allCheckSchemaCompatibilityStrategy() {
return new Object[] {
SchemaCompatibilityStrategy.BACKWARD,
SchemaCompatibilityStrategy.FORWARD_TRANSITIVE,
SchemaCompatibilityStrategy.FORWARD,
SchemaCompatibilityStrategy.FULL,
SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE,
SchemaCompatibilityStrategy.FULL_TRANSITIVE
};
}
@Test(dataProvider = "CanReadLastSchemaCompatibilityStrategy")
public void testConsumerCompatibilityCheckCanReadLastTest(SchemaCompatibilityStrategy schemaCompatibilityStrategy) throws Exception {
final String tenant = PUBLIC_TENANT;
final String topic = "test-consumer-compatibility";
String namespace = "test-namespace-" + randomName(16);
String fqtn = TopicName.get(
TopicDomain.persistent.value(),
tenant,
namespace,
topic
).toString();
NamespaceName namespaceName = NamespaceName.get(tenant, namespace);
admin.namespaces().createNamespace(
tenant + "/" + namespace,
Sets.newHashSet(CLUSTER_NAME)
);
admin.namespaces().setSchemaCompatibilityStrategy(namespaceName.toString(), schemaCompatibilityStrategy);
admin.schemas().createSchema(fqtn, Schema.AVRO(Schemas.PersonOne.class).getSchemaInfo());
admin.schemas().createSchema(fqtn, Schema.AVRO(SchemaDefinition.builder()
.withAlwaysAllowNull(false).withPojo(Schemas.PersonTwo.class).build()).getSchemaInfo());
Consumer<Schemas.PersonThree> consumerThree = pulsarClient.newConsumer(Schema.AVRO(
SchemaDefinition.<Schemas.PersonThree>builder().withAlwaysAllowNull
(false).withSupportSchemaVersioning(true).
withPojo(Schemas.PersonThree.class).build()))
.subscriptionName("test")
.topic(fqtn)
.subscribe();
Producer<Schemas.PersonOne> producerOne = pulsarClient
.newProducer(Schema.AVRO(Schemas.PersonOne.class))
.topic(fqtn)
.create();
Schemas.PersonOne personOne = new Schemas.PersonOne();
personOne.setId(1);
producerOne.send(personOne);
Message<Schemas.PersonThree> message = null;
try {
message = consumerThree.receive();
message.getValue();
} catch (Exception e) {
Assert.assertTrue(e instanceof SchemaSerializationException);
consumerThree.acknowledge(message);
}
Producer<Schemas.PersonTwo> producerTwo = pulsarClient
.newProducer(Schema.AVRO(SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull
(false).withSupportSchemaVersioning(true).
withPojo(Schemas.PersonTwo.class).build()))
.topic(fqtn)
.create();
Schemas.PersonTwo personTwo = new Schemas.PersonTwo();
personTwo.setId(1);
personTwo.setName("Jerry");
producerTwo.send(personTwo);
message = consumerThree.receive();
Schemas.PersonThree personThree = message.getValue();
consumerThree.acknowledge(message);
assertEquals(personThree.getId(), 1);
assertEquals(personThree.getName(), "Jerry");
consumerThree.close();
producerOne.close();
producerTwo.close();
}
@Test(dataProvider = "ReadAllCheckSchemaCompatibilityStrategy")
public void testConsumerCompatibilityReadAllCheckTest(SchemaCompatibilityStrategy schemaCompatibilityStrategy) throws Exception {
final String tenant = PUBLIC_TENANT;
final String topic = "test-consumer-compatibility";
String namespace = "test-namespace-" + randomName(16);
String fqtn = TopicName.get(
TopicDomain.persistent.value(),
tenant,
namespace,
topic
).toString();
NamespaceName namespaceName = NamespaceName.get(tenant, namespace);
admin.namespaces().createNamespace(
tenant + "/" + namespace,
Sets.newHashSet(CLUSTER_NAME)
);
admin.namespaces().setSchemaCompatibilityStrategy(namespaceName.toString(), schemaCompatibilityStrategy);
admin.schemas().createSchema(fqtn, Schema.AVRO(Schemas.PersonOne.class).getSchemaInfo());
admin.schemas().createSchema(fqtn, Schema.AVRO(SchemaDefinition.builder()
.withAlwaysAllowNull(false).withPojo(Schemas.PersonTwo.class).build()).getSchemaInfo());
try {
pulsarClient.newConsumer(Schema.AVRO(
SchemaDefinition.<Schemas.PersonThree>builder().withAlwaysAllowNull
(false).withSupportSchemaVersioning(true).
withPojo(Schemas.PersonThree.class).build()))
.subscriptionName("test")
.topic(fqtn)
.subscribe();
} catch (Exception e) {
Assert.assertTrue(e.getMessage().contains("Unable to read schema"));
}
}
@Test(dataProvider = "AllCheckSchemaCompatibilityStrategy")
public void testIsAutoUpdateSchema(SchemaCompatibilityStrategy schemaCompatibilityStrategy) throws Exception {
final String tenant = PUBLIC_TENANT;
final String topic = "test-consumer-compatibility";
String namespace = "test-namespace-" + randomName(16);
String fqtn = TopicName.get(
TopicDomain.persistent.value(),
tenant,
namespace,
topic
).toString();
NamespaceName namespaceName = NamespaceName.get(tenant, namespace);
admin.namespaces().createNamespace(
tenant + "/" + namespace,
Sets.newHashSet(CLUSTER_NAME)
);
assertEquals(admin.namespaces().getSchemaCompatibilityStrategy(namespaceName.toString()),
SchemaCompatibilityStrategy.FULL);
admin.namespaces().setSchemaCompatibilityStrategy(namespaceName.toString(), schemaCompatibilityStrategy);
admin.schemas().createSchema(fqtn, Schema.AVRO(Schemas.PersonOne.class).getSchemaInfo());
admin.namespaces().setIsAllowAutoUpdateSchema(namespaceName.toString(), false);
ProducerBuilder<Schemas.PersonTwo> producerThreeBuilder = pulsarClient
.newProducer(Schema.AVRO(SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull
(false).withSupportSchemaVersioning(true).
withPojo(Schemas.PersonTwo.class).build()))
.topic(fqtn);
try {
producerThreeBuilder.create();
} catch (Exception e) {
Assert.assertTrue(e.getMessage().contains("Schema not found and schema auto updating is disabled."));
}
admin.namespaces().setIsAllowAutoUpdateSchema(namespaceName.toString(), true);
ConsumerBuilder<Schemas.PersonTwo> comsumerBuilder = pulsarClient.newConsumer(Schema.AVRO(
SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull
(false).withSupportSchemaVersioning(true).
withPojo(Schemas.PersonTwo.class).build()))
.subscriptionName("test")
.topic(fqtn);
Producer<Schemas.PersonTwo> producer = producerThreeBuilder.create();
Consumer<Schemas.PersonTwo> consumerTwo = comsumerBuilder.subscribe();
producer.send(new Schemas.PersonTwo(2, "Lucy"));
Message<Schemas.PersonTwo> message = consumerTwo.receive();
Schemas.PersonTwo personTwo = message.getValue();
consumerTwo.acknowledge(message);
assertEquals(personTwo.getId(), 2);
assertEquals(personTwo.getName(), "Lucy");
producer.close();
consumerTwo.close();
admin.namespaces().setIsAllowAutoUpdateSchema(namespaceName.toString(), false);
producer = producerThreeBuilder.create();
consumerTwo = comsumerBuilder.subscribe();
producer.send(new Schemas.PersonTwo(2, "Lucy"));
message = consumerTwo.receive();
personTwo = message.getValue();
consumerTwo.acknowledge(message);
assertEquals(personTwo.getId(), 2);
assertEquals(personTwo.getName(), "Lucy");
consumerTwo.close();
producer.close();
}
@Test
public void testSchemaComparison() throws Exception {
final String tenant = PUBLIC_TENANT;
final String topic = "test-schema-comparison";
String namespace = "test-namespace-" + randomName(16);
String fqtn = TopicName.get(
TopicDomain.persistent.value(),
tenant,
namespace,
topic
).toString();
NamespaceName namespaceName = NamespaceName.get(tenant, namespace);
admin.namespaces().createNamespace(
tenant + "/" + namespace,
Sets.newHashSet(CLUSTER_NAME)
);
assertEquals(admin.namespaces().getSchemaCompatibilityStrategy(namespaceName.toString()),
SchemaCompatibilityStrategy.FULL);
byte[] changeSchemaBytes = (new String(Schema.AVRO(Schemas.PersonOne.class)
.getSchemaInfo().getSchema(), UTF_8) + "/n /n /n").getBytes();
SchemaInfo schemaInfo = SchemaInfoImpl.builder().type(SchemaType.AVRO).schema(changeSchemaBytes).build();
admin.schemas().createSchema(fqtn, schemaInfo);
admin.namespaces().setIsAllowAutoUpdateSchema(namespaceName.toString(), false);
ProducerBuilder<Schemas.PersonOne> producerOneBuilder = pulsarClient
.newProducer(Schema.AVRO(Schemas.PersonOne.class))
.topic(fqtn);
producerOneBuilder.create().close();
assertEquals(changeSchemaBytes, admin.schemas().getSchemaInfo(fqtn).getSchema());
ProducerBuilder<Schemas.PersonThree> producerThreeBuilder = pulsarClient
.newProducer(Schema.AVRO(Schemas.PersonThree.class))
.topic(fqtn);
try {
producerThreeBuilder.create();
fail();
} catch (Exception e) {
Assert.assertTrue(e.getMessage().contains("Schema not found and schema auto updating is disabled."));
}
}
@Test(dataProvider = "AllCheckSchemaCompatibilityStrategy")
public void testProducerSendWithOldSchemaAndConsumerCanRead(SchemaCompatibilityStrategy schemaCompatibilityStrategy) throws Exception {
final String tenant = PUBLIC_TENANT;
final String topic = "test-consumer-compatibility";
String namespace = "test-namespace-" + randomName(16);
String fqtn = TopicName.get(
TopicDomain.persistent.value(),
tenant,
namespace,
topic
).toString();
NamespaceName namespaceName = NamespaceName.get(tenant, namespace);
admin.namespaces().createNamespace(
tenant + "/" + namespace,
Sets.newHashSet(CLUSTER_NAME)
);
admin.namespaces().setSchemaCompatibilityStrategy(namespaceName.toString(), schemaCompatibilityStrategy);
admin.schemas().createSchema(fqtn, Schema.AVRO(Schemas.PersonOne.class).getSchemaInfo());
admin.schemas().createSchema(fqtn, Schema.AVRO(SchemaDefinition.builder()
.withAlwaysAllowNull(false).withPojo(Schemas.PersonTwo.class).build()).getSchemaInfo());
Producer<Schemas.PersonOne> producerOne = pulsarClient
.newProducer(Schema.AVRO(Schemas.PersonOne.class))
.topic(fqtn)
.create();
Schemas.PersonOne personOne = new Schemas.PersonOne(10);
Consumer<Schemas.PersonOne> consumerOne = pulsarClient.newConsumer(Schema.AVRO(
SchemaDefinition.<Schemas.PersonOne>builder().withAlwaysAllowNull
(false).withSupportSchemaVersioning(true).
withPojo(Schemas.PersonOne.class).build()))
.subscriptionName("test")
.topic(fqtn)
.subscribe();
producerOne.send(personOne);
Message<Schemas.PersonOne> message = consumerOne.receive();
personOne = message.getValue();
assertEquals(personOne.getId(), 10);
consumerOne.close();
producerOne.close();
}
@Test(dataProvider = "CanReadLastSchemaCompatibilityStrategy")
public void testConsumerWithNotCompatibilitySchema(SchemaCompatibilityStrategy schemaCompatibilityStrategy) throws Exception {
final String tenant = PUBLIC_TENANT;
final String topic = "test-consumer-compatibility";
String namespace = "test-namespace-" + randomName(16);
String fqtn = TopicName.get(
TopicDomain.persistent.value(),
tenant,
namespace,
topic
).toString();
NamespaceName namespaceName = NamespaceName.get(tenant, namespace);
admin.namespaces().createNamespace(
tenant + "/" + namespace,
Sets.newHashSet(CLUSTER_NAME)
);
admin.namespaces().setSchemaCompatibilityStrategy(namespaceName.toString(), schemaCompatibilityStrategy);
admin.schemas().createSchema(fqtn, Schema.AVRO(Schemas.PersonOne.class).getSchemaInfo());
admin.schemas().createSchema(fqtn, Schema.AVRO(SchemaDefinition.builder()
.withAlwaysAllowNull(false).withPojo(Schemas.PersonTwo.class).build()).getSchemaInfo());
try {
pulsarClient.newConsumer(Schema.AVRO(
SchemaDefinition.<Schemas.PersonFour>builder().withAlwaysAllowNull
(false).withSupportSchemaVersioning(true).
withPojo(Schemas.PersonFour.class).build()))
.subscriptionName("test")
.topic(fqtn)
.subscribe();
} catch (Exception e) {
Assert.assertTrue(e.getMessage().contains("Unable to read schema"));
}
}
public static String randomName(int numChars) {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < numChars; i++) {
sb.append((char) (ThreadLocalRandom.current().nextInt(26) + 'a'));
}
return sb.toString();
}
}