| # PIP-420: Provides an ability for Pulsar clients to integrate with third-party schema registry service |
| |
| # Motivation |
| |
| Apache Pulsar currently provides a built-in schema management system tightly coupled with the broker. |
| Pulsar clients interact with this system implicitly when creating producers and consumers. |
| |
| However, many organizations already have independent schema registry services (such as Confluent Schema Registry) |
| and wish to reuse their existing schema governance processes across multiple messaging systems, including Pulsar. |
| |
| By enabling Pulsar clients to integrate with third-party schema registry services: |
| - Users can unify schema management across different platforms. |
| - Pulsar brokers can be decoupled from schema storage and validation responsibilities. |
| - Pulsar users can more easily integrate with ecosystems that rely on external schema registries. |
| |
| This flexibility is particularly valuable for enterprises with strict schema validation, versioning, |
| and governance workflows already centralized in external registries. |
| |
| # Goals |
| |
| ## In Scope |
| |
| - Provide the ability for Pulsar clients to leverage third-party schema registry services for schema operations. |
| |
| ## Out of Scope |
| |
| - Providing built-in implementations for third-party schemas. |
| - Supporting `AutoProduceBytesSchema` and `AutoConsumeSchema`. |
| - Migrating existing Pulsar-managed schemas to external schema registries. |
| |
| # High Level Design |
| |
| - Provide a mechanism to configure the Pulsar client to use either: |
|   - The existing Pulsar schema registry (default) |
|   - Third-party schema registry implementations |
| |
| # Detailed Design |
| |
| ## Design & Implementation Details |
| |
| This PIP aims to enable the Pulsar client to directly integrate with external schema registry services for schema management. |
| In this model, the external schema registry is fully responsible for schema storage, retrieval, and validation. |
| The Pulsar broker will no longer manage schema data for topics using external schemas. |
| |
| ### SchemaType: EXTERNAL |
| |
| Pulsar will introduce a new schema type: **SchemaType.EXTERNAL**. |
| |
| - All schemas that integrate with external schema registries must declare `SchemaType.EXTERNAL`. |
| - When using the `EXTERNAL` schema type, the Pulsar client provides empty schema data to the broker. |
| - The broker records only the schema type for the topic. |
| - Compatibility restrictions: |
|   - A new compatibility check is introduced on the broker side. |
|   - `SchemaType.EXTERNAL` is not compatible with any other Pulsar schema type. |
|   - This prevents accidental data corruption or schema conflicts between internal and external schema management systems. |
| - The Pulsar geo-replicator needs to transfer the external schema info to the remote clusters. |
| |
| This design isolates external schema management and protects existing topics using Pulsar’s native schema system. |
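
The compatibility restriction above can be illustrated with a minimal sketch. It is not the broker's implementation: `SketchSchemaType`, `ExternalCompatibilitySketch`, and `passesExternalCheck` are hypothetical names, and the rule that a topic without any schema accepts every type is an assumption made only for this example.

```java
/** Hypothetical stand-in for Pulsar's SchemaType; only a few values are listed. */
enum SketchSchemaType { STRING, JSON, AVRO, EXTERNAL }

final class ExternalCompatibilitySketch {

    private ExternalCompatibilitySketch() { }

    /**
     * Returns true when a topic whose current schema type is {@code existing}
     * may accept a producer or consumer that declares {@code incoming}.
     * Only the EXTERNAL rule is modelled here; checks between two native
     * schemas are left to the existing compatibility strategy.
     */
    static boolean passesExternalCheck(SketchSchemaType existing, SketchSchemaType incoming) {
        if (existing == null) {
            // Assumption: a topic without a schema accepts any schema type.
            return true;
        }
        boolean existingExternal = existing == SketchSchemaType.EXTERNAL;
        boolean incomingExternal = incoming == SketchSchemaType.EXTERNAL;
        if (existingExternal || incomingExternal) {
            // EXTERNAL is only compatible with EXTERNAL.
            return existingExternal && incomingExternal;
        }
        // Neither side is EXTERNAL, so this rule does not apply.
        return true;
    }
}
```

In this sketch a topic that already holds an `AVRO` schema rejects an `EXTERNAL` producer, and a topic marked `EXTERNAL` rejects a `JSON` producer, which matches the isolation described above.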
| |
| ### Extensibility via Client Interfaces |
| |
| To integrate with external schema registries, users can: |
| - Implement the `Schema` interface to define custom schema encoding and decoding logic. |
| |
| #### Key `Schema` Interface Methods: |
| |
| The `Schema` interface gains new methods for encoding and decoding messages, |
| allowing external schema implementations to handle serialization and deserialization. |
| |
| - `EncodeData encode(String topic, T message)` **(New addition)** |
|   - Serializes the message using the external schema. |
|   - Is responsible for managing schema evolution and versioning. |
|   - Returns an `EncodeData` object that contains: |
|     - The encoded byte array. |
|     - The schema ID associated with the serialized data. |
|   - Implementations should throw `SchemaSerializationException` if serialization fails. |
| |
| - `T decode(String topic, byte[] data, byte[] schemaId)` **(New addition)** |
|   - Deserializes the message using the external schema. |
|   - The external schema can look up the schema by its schema ID. |
|   - Users are responsible for handling any exception thrown when they get the message value. |
| |
| - `closeAsync()` **(New addition)** |
|   - Called when the producer or consumer is closed. |
|   - Allows external schema implementations to release resources. |
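
To make the shape of these methods concrete, the following self-contained sketch pairs an `encode`/`decode` implementation with a toy in-memory registry. All class names (`SketchEncodeData`, `InMemoryRegistrySketch`, `ExternalStringSchemaSketch`) are hypothetical and do not implement the real Pulsar `Schema` interface. The `topic + "-value"` subject name and the 4-byte big-endian schema ID are assumptions for illustration only.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the proposed EncodeData class. */
final class SketchEncodeData {
    final byte[] data;
    final byte[] schemaId;

    SketchEncodeData(byte[] data, byte[] schemaId) {
        this.data = data;
        this.schemaId = schemaId;
    }
}

/** Toy in-memory registry standing in for a remote schema registry service. */
final class InMemoryRegistrySketch {
    private final Map<String, Integer> idsBySubject = new HashMap<>();
    private final Map<Integer, String> subjectsById = new HashMap<>();

    /** Registers the subject on first use and returns its numeric ID. */
    synchronized int register(String subject) {
        Integer id = idsBySubject.get(subject);
        if (id == null) {
            id = idsBySubject.size() + 1;
            idsBySubject.put(subject, id);
            subjectsById.put(id, subject);
        }
        return id;
    }

    synchronized boolean isKnown(int id) {
        return subjectsById.containsKey(id);
    }
}

/** Mirrors the shape of the proposed encode/decode pair for String payloads. */
final class ExternalStringSchemaSketch {
    private final InMemoryRegistrySketch registry;

    ExternalStringSchemaSketch(InMemoryRegistrySketch registry) {
        this.registry = registry;
    }

    /** Serializes the message and attaches the schema ID obtained from the registry. */
    SketchEncodeData encode(String topic, String message) {
        int id = registry.register(topic + "-value"); // subject naming is an assumption
        byte[] schemaId = ByteBuffer.allocate(4).putInt(id).array();
        return new SketchEncodeData(message.getBytes(StandardCharsets.UTF_8), schemaId);
    }

    /** Resolves the schema by ID before deserializing; fails on an unknown ID. */
    String decode(String topic, byte[] data, byte[] schemaId) {
        int id = ByteBuffer.wrap(schemaId).getInt();
        if (!registry.isKnown(id)) {
            throw new IllegalStateException("Unknown schema ID " + id + " for topic " + topic);
        }
        return new String(data, StandardCharsets.UTF_8);
    }
}
```

A real implementation would throw `SchemaSerializationException` rather than `IllegalStateException` and would release its registry client in `closeAsync()`; both are omitted here to keep the sketch free of Pulsar dependencies.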
| |
| #### Example Workflow: |
| |
| - During producer or consumer initialization: |
|   The external schema info is registered in Pulsar schema storage. |
| |
| - During producing or receiving messages: |
|   The `encode` and `decode` methods handle schema-aware serialization and deserialization using the external schema registry. |
| |
| #### Schema ID & Schema Version |
| |
| Unlike Pulsar, which uses **schema version** to identify schemas, many external schema registry systems use **schema ID** as the primary schema identifier. |
| |
| When integrating with external schema registries: |
| - The `schemaVersion` points to the external schema info. |
| - The external schema is responsible for managing schema evolution and versioning; this differs from Pulsar's native schema versioning. |
| - The schema `encode` method returns an `EncodeData` object that contains the encoded data and the schema ID. |
| - To store the external schema ID, this PIP introduces a new optional field `schema_id` in `MessageMetadata`. |
| - `KeyValueSchema` does not support using Pulsar's native schema and an external schema at the same time. |
| |
| The KeyValueSchemaID is also a byte array, with the format keySchemaIdLength(4) + keySchemaId + valueSchemaIdLength(4) + valueSchemaId. |
| External schemas need to decode the key and value schema IDs from the KeyValueSchemaID. |
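
The KeyValueSchemaID layout can be sketched as follows. `KeyValueSchemaIdSketch`, `pack`, and `unpack` are hypothetical names, and the big-endian byte order of the two length fields (the `ByteBuffer` default) is an assumption, since the text above only states that each length takes 4 bytes.

```java
import java.nio.ByteBuffer;

final class KeyValueSchemaIdSketch {

    private KeyValueSchemaIdSketch() { }

    /** Packs keySchemaIdLength(4) + keySchemaId + valueSchemaIdLength(4) + valueSchemaId. */
    static byte[] pack(byte[] keySchemaId, byte[] valueSchemaId) {
        ByteBuffer buf = ByteBuffer.allocate(4 + keySchemaId.length + 4 + valueSchemaId.length);
        buf.putInt(keySchemaId.length).put(keySchemaId);
        buf.putInt(valueSchemaId.length).put(valueSchemaId);
        return buf.array();
    }

    /** Splits a packed ID back into {keySchemaId, valueSchemaId}. */
    static byte[][] unpack(byte[] keyValueSchemaId) {
        ByteBuffer buf = ByteBuffer.wrap(keyValueSchemaId);
        byte[] key = new byte[buf.getInt()];   // first length prefix
        buf.get(key);
        byte[] value = new byte[buf.getInt()]; // second length prefix
        buf.get(value);
        return new byte[][] {key, value};
    }
}
```

Because both parts carry their own length prefix, the key and value schema IDs may have different sizes, and either may be empty.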
| |
| This approach allows external schema systems to fully control schema evolution and versioning without being constrained by Pulsar’s native schema versioning mechanism. |
| This may impact components that rely on the schema version to deserialize messages, such as Pulsar Functions and Pulsar SQL; |
| they need to be updated to support setting schema properties, use the new schema type, and handle external schemas appropriately. |
| |
| #### Example usage |
| |
| ```java |
| // KafkaSchemaFactory stands for a third-party schema implementation; it is not provided by this PIP. |
| public void workWithExternalSchemaRegistry() throws Exception { |
|     String topic = "testExternalJsonSchema"; |
| |
|     // Configuration passed through to the external schema registry client. |
|     Map<String, String> configs = new HashMap<>(); |
|     configs.put("schema.registry.url", getSchemaRegistryUrl()); |
|     configs.put("json.fail.unknown.properties", "false"); |
|     KafkaSchemaFactory kafkaSchemaFactory = new KafkaSchemaFactory(configs); |
|     Schema<User> schema = kafkaSchemaFactory.json(User.class); |
| |
|     @Cleanup |
|     PulsarClient pulsarClient = getPulsarClient(); |
| |
|     // Producer and consumer share the same external schema instance. |
|     @Cleanup |
|     Producer<User> producer = pulsarClient.newProducer(schema) |
|             .topic(topic) |
|             .create(); |
| |
|     @Cleanup |
|     Consumer<User> consumer = pulsarClient.newConsumer(schema) |
|             .topic(topic) |
|             .subscriptionName("sub") |
|             .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) |
|             .subscribe(); |
| |
|     // Each send goes through the external schema's encode method. |
|     for (int i = 0; i < 10; i++) { |
|         producer.send(new User("name-" + i, 10 + i)); |
|     } |
| |
|     // getValue() goes through the external schema's decode method. |
|     for (int i = 0; i < 10; i++) { |
|         Message<User> message = consumer.receive(); |
|         consumer.acknowledge(message); |
|         System.out.println("receive msg " + message.getValue().getClass().getName() + " " + message.getValue()); |
|     } |
| } |
| ``` |
| |
| ## Public-facing Changes |
| |
| ```protobuf |
| // File `SchemaRegistryFormat.proto` |
| message SchemaInfo { |
|     enum SchemaType { |
|         EXTERNAL = 22; |
|     } |
| } |
| ``` |
| |
| ```protobuf |
| // File `PulsarApi.proto` |
| message Schema { |
|     enum Type { |
|         External = 22; |
|     } |
| } |
| ``` |
| |
| Add a new field `schema_id` to the `MessageMetadata` to store the schema ID for messages that use external schemas. |
| ```protobuf |
| // File `PulsarApi.proto` |
| message MessageMetadata { |
|     optional bytes schema_id = 32; |
| } |
| ``` |
| |
| Introduce a new SchemaType `EXTERNAL` to represent schema types that work with an external schema registry. |
| ```java |
| public enum SchemaType { |
| |
|     /** |
|      * External Schema Type. |
|      * <p> |
|      * This is used to indicate that the schema is managed externally, such as in a schema registry. |
|      * External schema type is not compatible with any other schema type. |
|      * </p> |
|      */ |
|     EXTERNAL(21) |
| |
| } |
| ``` |
| |
| Add a new class `EncodeData` to encapsulate the encoded data and schema ID. |
| ```java |
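| public class EncodeData { |
| |
|     // Encoded message payload. |
|     private byte[] data; |
| |
|     // Schema ID assigned by the external schema registry; null for native schemas. |
|     private byte[] schemaId; |
| |
|     public EncodeData(byte[] data, byte[] schemaId) { |
|         this.data = data; |
|         this.schemaId = schemaId; |
|     } |
| |
| } |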
| ``` |
| |
| Add a new method `getSchemaId()` to the `Message` interface to retrieve the schema ID of the message. |
| This method returns the schema ID if the message was produced with an external schema; otherwise it returns an empty `Optional`. |
| ```java |
| public interface Message<T> { |
| |
|     /** |
|      * Get schema ID of the message. |
|      * PIP-420 provides a way to produce messages with external schema, |
|      * and the schema ID will be set to the message metadata. |
|      * |
|      * @return the schema ID if the message is produced with external schema and schema ID is set, otherwise empty. |
|      */ |
|     Optional<byte[]> getSchemaId(); |
| |
| } |
| ``` |
| |
| Add two methods to encode and decode messages, which external schemas can use to serialize and deserialize messages. |
| The `encode` method returns an `EncodeData` object that contains the encoded byte array and the schema ID. |
| Customized external schemas can set the `SchemaInfoProvider` and retrieve their configs from it. |
| The interface also adds a `closeAsync()` method so that external schemas can release their resources. |
| |
| ```java |
| import java.nio.ByteBuffer; |
| import java.util.concurrent.CompletableFuture; |
| |
| public interface Schema<T> extends Cloneable { |
| |
|     /** |
|      * Encodes the message into a byte array using the schema. |
|      * |
|      * @param topic the topic for which the message is being encoded |
|      * @param message the message to encode |
|      * @return the encoded byte array and schema ID |
|      * @throws SchemaSerializationException if the encoding fails |
|      */ |
|     default EncodeData encode(String topic, T message) { |
|         // The default delegates to the existing encode method and sets no schema ID. |
|         return new EncodeData(encode(message), null); |
|     } |
| |
|     /** |
|      * Decodes a ByteBuffer by copying it to a byte array and delegating to the byte array variant. |
|      */ |
|     default T decode(String topic, ByteBuffer data, byte[] schemaId) { |
|         return decode(topic, getBytes(data), schemaId); |
|     } |
| |
|     /** |
|      * Decodes a byte array into a message using the schema. |
|      * |
|      * @param topic the topic for which the message is being decoded |
|      * @param data the byte array to decode |
|      * @param schemaId the schema ID associated with the data |
|      * @return the decoded message |
|      * @throws SchemaSerializationException if the decoding fails |
|      */ |
|     default T decode(String topic, byte[] data, byte[] schemaId) { |
|         // The default delegates to the existing decode(byte[], byte[]) method. |
|         return decode(data, schemaId); |
|     } |
| |
|     /** |
|      * Closes the schema and releases any resources it holds. |
|      * |
|      * @return a future that completes when the schema is closed |
|      */ |
|     default CompletableFuture<Void> closeAsync() { |
|         return CompletableFuture.completedFuture(null); |
|     } |
| |
| } |
| ``` |
| |
| # Pulsar Function |
| |
| To support third-party schema registry services in Pulsar Functions: |
| - Support the `SchemaType.EXTERNAL` schema type in Pulsar Functions. |
| |
| # Pulsar Geo-Replication Impact |
| |
| Integrating third-party schema registry services introduces a new approach to managing schemas for geo-replicated topics. |
| |
| In the current Pulsar architecture: |
| - Schema definitions are stored and managed by the Pulsar brokers. |
| - During geo-replication, schema information must also be replicated across clusters to ensure schema consistency. |
| |
| By using an external schema registry: |
| - **Schema management is fully decoupled from Pulsar brokers and replication mechanisms.** |
| - This eliminates the need to synchronize schema data between Pulsar clusters, which simplifies geo-replication. |
| - A unified schema registry can serve producers and consumers across clusters. |
| |
| # Backward & Forward Compatibility |
| |
| The new schema type `SchemaType.EXTERNAL` does not break any existing Pulsar topics, because it is not compatible with any other Pulsar schema type. |
| |
| # Alternatives |
| |
| Use the `bytes` schema for "external" schemas. This alternative cannot provide any compatibility checks to protect topic data that uses Pulsar's native schema system. |
| |
| # General Notes |
| |
| # Links |
| |
| <!-- |
| Updated afterwards |
| --> |
| * Mailing List voting thread: https://lists.apache.org/thread/g7hypmql3gk2zog6cmmhg4h93hfw1o15 |