Apache RocketMQ Schema Registry

RocketMQ Schema Registry

RocketMQ Schema Registry is a management center for Topic Schemas. It provides a RESTful interface to register, delete, update, get, and reference schemas on a subject (a RocketMQ Topic). By associating a Schema with a subject, the new RocketMQ client can send structured data directly, and users no longer need to care about the details of serialization and deserialization.

A Schema's structure can change, and each update generates a new version number. During schema evolution, the service verifies that changes comply with the user-specified compatibility configuration. Each Schema Record in the evolution can also be referenced individually by other subjects.

It offers a variety of features:

  • Basic schema management operations, including store, query, update, and delete
  • Encoding / decoding through a user-specified serializer / deserializer in the client
  • Compatibility validation during schema evolution and when sending or receiving messages
  • Creating a reference between a schema version and a new subject
  • Currently, only the Avro type is supported. JSON, PB, and Thrift types will be added later

Getting started

Prepare storage layer

Currently, Schema Registry only supports RocketMQ storage and relies on the Compact Topic feature of RocketMQ 5.0. Earlier RocketMQ versions also work, but there is some risk of data loss if the machine's disk fails. DB-type storage layers will be added in the future.

You can use the existing RocketMQ service or start a new one.

# Download release from the Apache mirror
$ wget https://archive.apache.org/dist/rocketmq/4.9.3/rocketmq-all-4.9.3-bin-release.zip

# Unpack the release
$ unzip rocketmq-all-4.9.3-bin-release.zip

# Prepare a terminal and change to the extracted `bin` directory:
$ cd rocketmq-4.9.3/bin

# Start namesrv & broker
$ nohup sh mqnamesrv &
$ nohup sh mqbroker -n localhost:9876 &

Install & run locally

$ git clone https://github.com/apache/rocketmq-schema-registry.git
$ cd rocketmq-schema-registry
$ mvn clean package

Take the built JAR in core/target/ and run java -jar rocketmq-schema-registry-core-0.0.3-SNAPSHOT.jar to start the service.

The REST API can then be accessed at http://localhost:8080/schema-registry/v1

The Swagger API documentation can be accessed at http://localhost:8080/swagger-ui/index.html

Customized configurations (Optional)

Local cache

$ echo "storage.local.cache.path=${dir}" >> storage-rocketmq/src/main/resources/rocketmq.properties

Package management

If you want to upload binary resources to a package repository such as Artifactory, set schema.dependency.upload-enabled = true to enable package management.

Property details:

| Property | Description |
| --- | --- |
| schema.dependency.jdk-path | The JDK used when compiling Java files; equal to JAVA_HOME on your server |
| schema.dependency.compile-path | The root directory used when compiling Java files |
| schema.dependency.local-repository-path | The local cache directory for the JAR package |
| schema.dependency.repository-url | The remote repository access URL; multiple repositories cannot be configured |
| schema.dependency.username | The remote repository access username |
| schema.dependency.password | The remote repository access password |

Notice: Please make sure your account has permission to upload to the remote repository.

API Reference


# Register a new schema on the specified subject with the default cluster and tenant
$ curl -X POST -H "Content-Type: application/json" \
  -d '{"schemaIdl":"{\"type\":\"record\",\"name\":\"SchemaName\",\"namespace\":\"rocketmq.schema.example\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}"}' \
  http://localhost:8080/schema-registry/v1/subject/RMQTopic/schema/SchemaName

# Register a new schema with a specified cluster and tenant
$ curl -X POST -H "Content-Type: application/json" \
  -d '{"schema": "{\"type\": \"string\"}"}' \
  http://localhost:8080/schema-registry/v1/cluster/default/tenant/default/subject/RMQTopic2/schema/Text

# Delete all versions of a schema
$ curl -X DELETE http://localhost:8080/schema-registry/v1/cluster/{cluster-name}/tenant/{tenant-name}/subject/{subject-name}/schema

# Update a schema and generate a new version; the default cluster and tenant can also be used, as in the register interface
$ curl -X PUT -H "Content-Type: application/json" \
  -d '{"schemaIdl":"{\"type\":\"record\",\"name\":\"SchemaName\",\"namespace\":\"rocketmq.schema.example\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"id\",\"type\":\"string\",\"default\":\"0\"}]}"}' \
  http://localhost:8080/schema-registry/v1/subject/RMQTopic/schema/SchemaName

# Get the schema version bound to a subject (default cluster and tenant shown; a specified cluster and tenant also work, as in the register interface)
$ curl -X GET http://localhost:8080/schema-registry/v1/subject/RMQTopic/schema

# Get a schema record by the specified version
$ curl -X GET http://localhost:8080/schema-registry/v1/cluster/{cluster-name}/tenant/{tenant-name}/subject/{subject-name}/schema/versions/{version}

# Get all schema records
$ curl -X GET http://localhost:8080/schema-registry/v1/cluster/{cluster-name}/tenant/{tenant-name}/subject/{subject-name}/schema/versions
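
The same calls can be scripted. The following is a minimal sketch using only the Python standard library. It assumes the endpoint layout and the "schemaIdl" request field shown in the curl examples; the helper names schema_url and register_request are hypothetical and not part of the project. The request is built but not sent, so it can be inspected first.

```python
import json
import urllib.request
from urllib.parse import quote

# Base address of a locally running service, as used in the curl examples.
BASE_URL = "http://localhost:8080/schema-registry/v1"


def schema_url(subject, schema_name=None, cluster=None, tenant=None):
    """Build a schema endpoint URL (hypothetical helper mirroring the curl paths)."""
    parts = [BASE_URL]
    if cluster is not None and tenant is not None:
        # Explicit cluster and tenant; omit both to use the defaults.
        parts += ["cluster", quote(cluster, safe=""), "tenant", quote(tenant, safe="")]
    parts += ["subject", quote(subject, safe=""), "schema"]
    if schema_name is not None:
        parts.append(quote(schema_name, safe=""))
    return "/".join(parts)


def register_request(subject, schema_name, avro_schema, cluster=None, tenant=None):
    """Build (but do not send) the POST request that registers a schema."""
    # The Avro definition travels as a JSON string inside the "schemaIdl" field.
    body = json.dumps({"schemaIdl": json.dumps(avro_schema)}).encode("utf-8")
    return urllib.request.Request(
        schema_url(subject, schema_name, cluster, tenant),
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


if __name__ == "__main__":
    user_schema = {
        "type": "record",
        "name": "SchemaName",
        "namespace": "rocketmq.schema.example",
        "fields": [{"name": "name", "type": "string"}],
    }
    req = register_request("RMQTopic", "SchemaName", user_schema)
    print(req.get_method(), req.full_url)
    # To send it against a running service:
    # with urllib.request.urlopen(req) as resp:
    #     print(resp.read().decode("utf-8"))
```

Passing cluster and tenant produces the longer /cluster/.../tenant/... path; leaving them out produces the default-cluster form.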

Compatibility

The default compatibility is BACKWARD, which means that a Schema change has no impact on running programs. Transitivity specifies the scope of the compatibility check for each change: only the last version, or all previous versions.

| Compatibility strategy | Permitted changes | Transitivity | Upgrade order |
| --- | --- | --- | --- |
| BACKWARD | Delete fields; add optional fields | Last version | Consumers |
| BACKWARD_TRANSITIVE | Delete fields; add optional fields | All previous versions | Consumers |
| FORWARD | Add fields; delete optional fields | Last version | Producers |
| FORWARD_TRANSITIVE | Add fields; delete optional fields | All previous versions | Producers |
| FULL | Modify optional fields | Last version | Any order |
| FULL_TRANSITIVE | Modify optional fields | All previous versions | Any order |
| NONE | All changes are accepted | Disabled | Depends |
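
To illustrate the difference between BACKWARD and BACKWARD_TRANSITIVE, here is a deliberately simplified sketch. It is not the service's validator: it only looks at top-level record fields, treats a field as "optional" when it has a default, and ignores type changes, aliases, and nested types. The function names are hypothetical.

```python
def added_without_default(reader, writer):
    """Names of fields the reader expects that the writer lacks and that have no default."""
    writer_names = {f["name"] for f in writer["fields"]}
    return [
        f["name"]
        for f in reader["fields"]
        if f["name"] not in writer_names and "default" not in f
    ]


def is_backward_compatible(new_schema, history, transitive=False):
    """True if data written with earlier versions can be read with new_schema.

    history is ordered oldest to newest. BACKWARD checks only the last
    version; BACKWARD_TRANSITIVE checks all previous versions.
    """
    to_check = history if transitive else history[-1:]
    return all(not added_without_default(new_schema, old) for old in to_check)


if __name__ == "__main__":
    v1 = {"type": "record", "name": "SchemaName",
          "fields": [{"name": "name", "type": "string"}]}
    # Adds an optional field (it has a default): permitted under BACKWARD.
    v2 = {"type": "record", "name": "SchemaName",
          "fields": [{"name": "name", "type": "string"},
                     {"name": "id", "type": "string", "default": "0"}]}
    # Drops the default on "id": fine against v2, but v1 data has no "id".
    v3 = {"type": "record", "name": "SchemaName",
          "fields": [{"name": "name", "type": "string"},
                     {"name": "id", "type": "string"}]}

    print(is_backward_compatible(v2, [v1]))                       # True
    print(is_backward_compatible(v3, [v1, v2]))                   # True
    print(is_backward_compatible(v3, [v1, v2], transitive=True))  # False
```

The last two calls show why transitivity matters: v3 passes against the latest version only, but fails once every earlier version is checked.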

Architecture

Once Schema Registry is enabled, the publishing and subscribing process becomes:

  1. When a message is sent, the Schema is parsed and checked for compliance with the Topic's Schema compatibility. If the check passes, the producer serializes the data and prefixes it with a SchemaId. If the check fails, the send request is rejected.
  2. The consumer deserializes the message using the SchemaId. What the user sees is always structured data, similar to public class User { String name; int id; }
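
The two steps above can be sketched as follows. This is an illustration only: the exact wire layout is not specified here, so the 8-byte big-endian prefix is an assumption, JSON stands in for Avro serialization, and the names encode and decode are hypothetical.

```python
import json
import struct

# Assumed layout: the ID is written as an 8-byte big-endian signed integer.
ID_PREFIX = struct.Struct(">q")


def encode(schema_id, record):
    """Producer side: serialize the record and prefix it with the schema ID."""
    payload = json.dumps(record).encode("utf-8")  # stand-in for Avro encoding
    return ID_PREFIX.pack(schema_id) + payload


def decode(message):
    """Consumer side: read the ID prefix, then deserialize the remaining bytes."""
    (schema_id,) = ID_PREFIX.unpack_from(message)
    record = json.loads(message[ID_PREFIX.size:].decode("utf-8"))
    return schema_id, record


if __name__ == "__main__":
    body = encode(1 << 14, {"name": "alice", "id": 7})
    print(decode(body))
```

In a real client the ID read from the prefix would be used to look up the matching schema before decoding the payload.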

[Figure: architecture]

To reduce the coupling between Schema Registry and RocketMQ, the concept of a “Subject” was introduced to represent the combination of a RocketMQ Topic and a Schema.

[Figure: subject]

Concept

  • SchemaId

After a Schema is created, the service generates a 64-bit globally unique ID using the snowflake algorithm. Its last 14 bits are reserved for the version number. The SchemaId does not change during the Schema's lifetime.

  • SchemaVersion

A schema update does not change the SchemaId, but generates a new 14-bit, monotonically increasing version number.

  • RecordId

The client uses a RecordId, which uniquely locates a schema version, to serialize and deserialize messages.
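
The bit layout described above can be sketched in a few lines. The assumption here, which the text does not state explicitly, is that a RecordId is the SchemaId with the version number written into its reserved low 14 bits. The function names are hypothetical.

```python
VERSION_BITS = 14
VERSION_MASK = (1 << VERSION_BITS) - 1  # low 14 bits, i.e. versions 0..16383


def record_id(schema_id, version):
    """Combine a SchemaId and a version into one ID (assumed RecordId layout)."""
    if not 0 <= version <= VERSION_MASK:
        raise ValueError("version does not fit in 14 bits")
    if schema_id & VERSION_MASK:
        raise ValueError("the low 14 bits of a SchemaId must be zero")
    return schema_id | version


def split_record_id(rid):
    """Recover (schema_id, version) from a combined ID."""
    return rid & ~VERSION_MASK, rid & VERSION_MASK


if __name__ == "__main__":
    sid = 123456789 << VERSION_BITS  # made-up SchemaId with empty version bits
    rid = record_id(sid, 3)
    print(split_record_id(rid) == (sid, 3))  # True
```

With 14 bits, a single schema can carry at most 16384 distinct version numbers under this layout.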

[Figure: id]

Contribute

We always welcome new contributions, whether trivial cleanups, big new features, or other material rewards. For more details, see here.

License

Apache License, Version 2.0 Copyright (C) Apache Software Foundation