---
id: schema-get-started
title: Get started
sidebar_label: "Get started"
description: Learn how to construct Pulsar schemas and customize Pulsar schema storage.
---
````mdx-code-block
import Tabs from '@theme/Tabs';
import TabItem from '@theme/TabItem';
````
This hands-on tutorial provides instructions and examples on how to construct schemas. For instructions on administrative tasks, see [Manage schema](admin-api-schemas.md).
## Construct a schema
### bytes
This example demonstrates how to construct a [bytes schema](schema-understand.md#primitive-type) using language-specific clients and use it to produce and consume messages.
````mdx-code-block
<Tabs groupId="api-choice"
defaultValue="Java"
values={[{"label":"Java","value":"Java"},{"label":"C++","value":"C++"},{"label":"Python","value":"Python"},{"label":"Go","value":"Go"}]}>
<TabItem value="Java">
```java
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic("my-topic")
.create();
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic("my-topic")
.subscriptionName("my-sub")
.subscribe();
producer.newMessage().value("message".getBytes()).send();
Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
```
</TabItem>
<TabItem value="C++">
```cpp
SchemaInfo schemaInfo = SchemaInfo(SchemaType::BYTES, "Bytes", "");
Producer producer;
client.createProducer("topic-bytes", ProducerConfiguration().setSchema(schemaInfo), producer);
std::array<char, 1024> buffer{};  // zero-initialized payload
producer.send(MessageBuilder().setContent(buffer.data(), buffer.size()).build());
Consumer consumer;
Result res = client.subscribe("topic-bytes", "my-sub", ConsumerConfiguration().setSchema(schemaInfo), consumer);
Message msg;
consumer.receive(msg, 3000);
```
</TabItem>
<TabItem value="Python">
```python
producer = client.create_producer(
'bytes-schema-topic',
schema=BytesSchema())
producer.send(b"Hello")
consumer = client.subscribe(
'bytes-schema-topic',
'sub',
schema=BytesSchema())
msg = consumer.receive()
data = msg.value()
```
</TabItem>
<TabItem value="Go">
```go
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: "my-topic",
Schema: pulsar.NewBytesSchema(nil),
})
id, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
Value: []byte("message"),
})
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "my-topic",
Schema: pulsar.NewBytesSchema(nil),
SubscriptionName: "my-sub",
Type: pulsar.Exclusive,
})
msg, err := consumer.Receive(context.Background())
```
</TabItem>
</Tabs>
````
### string
This example demonstrates how to construct a [string schema](schema-understand.md#primitive-type) using language-specific clients and use it to produce and consume messages.
````mdx-code-block
<Tabs groupId="api-choice"
defaultValue="Java"
values={[{"label":"Java","value":"Java"},{"label":"C++","value":"C++"},{"label":"Python","value":"Python"},{"label":"Go","value":"Go"}]}>
<TabItem value="Java">
```java
Producer<String> producer = client.newProducer(Schema.STRING)
    .topic("my-topic")
    .create();
producer.newMessage().value("Hello Pulsar!").send();
Consumer<String> consumer = client.newConsumer(Schema.STRING)
    .topic("my-topic")
    .subscriptionName("my-sub")
    .subscribe();
Message<String> message = consumer.receive();
```
</TabItem>
<TabItem value="C++">
```cpp
SchemaInfo schemaInfo = SchemaInfo(SchemaType::STRING, "String", "");
Producer producer;
client.createProducer("topic-string", ProducerConfiguration().setSchema(schemaInfo), producer);
producer.send(MessageBuilder().setContent("message").build());
Consumer consumer;
client.subscribe("topic-string", "my-sub", ConsumerConfiguration().setSchema(schemaInfo), consumer);
Message msg;
consumer.receive(msg, 3000);
```
</TabItem>
<TabItem value="Python">
```python
producer = client.create_producer(
'string-schema-topic',
schema=StringSchema())
producer.send("Hello")
consumer = client.subscribe(
'string-schema-topic',
'sub',
schema=StringSchema())
msg = consumer.receive()
text = msg.value()
```
</TabItem>
<TabItem value="Go">
```go
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: "my-topic",
Schema: pulsar.NewStringSchema(nil),
})
id, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
Value: "message",
})
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "my-topic",
Schema: pulsar.NewStringSchema(nil),
SubscriptionName: "my-sub",
Type: pulsar.Exclusive,
})
msg, err := consumer.Receive(context.Background())
```
</TabItem>
</Tabs>
````
### key/value
This example shows how to construct a [key/value schema](schema-understand.md#keyvalue-schema) using language-specific clients and use it to produce and consume messages.
````mdx-code-block
<Tabs groupId="api-choice"
defaultValue="Java"
values={[{"label":"Java","value":"Java"},{"label":"C++","value":"C++"}]}>
<TabItem value="Java">
1. Construct a key/value schema with `INLINE` encoding type.
```java
Schema<KeyValue<Integer, String>> kvSchema = Schema.KeyValue(
Schema.INT32,
Schema.STRING,
KeyValueEncodingType.INLINE
);
```
Alternatively, construct a key/value schema with `SEPARATED` encoding type.
```java
Schema<KeyValue<Integer, String>> kvSchema = Schema.KeyValue(
Schema.INT32,
Schema.STRING,
KeyValueEncodingType.SEPARATED
);
```
2. Produce messages using a key/value schema.
```java
Producer<KeyValue<Integer, String>> producer = client.newProducer(kvSchema)
.topic(topicName)
.create();
final int key = 100;
final String value = "value-100";
// send the key/value message
producer.newMessage()
.value(new KeyValue<>(key, value))
.send();
```
3. Consume messages using a key/value schema.
```java
Consumer<KeyValue<Integer, String>> consumer = client.newConsumer(kvSchema)
...
.topic(topicName)
.subscriptionName(subscriptionName).subscribe();
// receive key/value pair
Message<KeyValue<Integer, String>> msg = consumer.receive();
KeyValue<Integer, String> kv = msg.getValue();
```
</TabItem>
<TabItem value="C++">
1. Construct a key/value schema with `INLINE` encoding type.
```cpp
//Prepare keyValue schema
std::string jsonSchema =
R"({"type":"record","name":"cpx","fields":[{"name":"re","type":"double"},{"name":"im","type":"double"}]})";
SchemaInfo keySchema(JSON, "key-json", jsonSchema);
SchemaInfo valueSchema(JSON, "value-json", jsonSchema);
SchemaInfo keyValueSchema(keySchema, valueSchema, KeyValueEncodingType::INLINE);
```
2. Produce messages using a key/value schema.
```cpp
//Create Producer
Producer producer;
client.createProducer("my-topic", ProducerConfiguration().setSchema(keyValueSchema), producer);
//Prepare message
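// Use separate strings for the key and the value: moving the same string
// into both arguments would leave one of them empty.
std::string jsonKey = "{\"re\":2.1,\"im\":1.23}";
std::string jsonValue = "{\"re\":2.1,\"im\":1.23}";
KeyValue keyValue(std::move(jsonKey), std::move(jsonValue));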
Message msg = MessageBuilder().setContent(keyValue).setProperty("x", "1").build();
//Send message
producer.send(msg);
```
3. Consume messages using a key/value schema.
```cpp
//Create Consumer
Consumer consumer;
client.subscribe("my-topic", "my-sub", ConsumerConfiguration().setSchema(keyValueSchema), consumer);
//Receive message
Message message;
consumer.receive(message);
```
</TabItem>
</Tabs>
````
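To make the difference between the two encoding types concrete, the following Java sketch builds an `INLINE` payload by hand. It assumes the layout used by Pulsar's Java client is a 4-byte big-endian key length, the key bytes, a 4-byte big-endian value length, and the value bytes, and that `Schema.INT32` writes an `int` as 4 big-endian bytes while `Schema.STRING` uses UTF-8. `InlineKeyValueSketch` is a hypothetical helper for illustration only, not part of the Pulsar API; in application code, use `Schema.KeyValue(...)` as shown above.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper that hand-builds the assumed INLINE key/value layout:
// [4-byte key length][key bytes][4-byte value length][value bytes]
final class InlineKeyValueSketch {

    // Concatenates the length-prefixed key and value; ByteBuffer writes ints big-endian by default.
    static byte[] encode(byte[] key, byte[] value) {
        ByteBuffer buf = ByteBuffer.allocate(4 + key.length + 4 + value.length);
        buf.putInt(key.length).put(key);
        buf.putInt(value.length).put(value);
        return buf.array();
    }

    // Reads the two length-prefixed sections back and returns {key, value}.
    static byte[][] decode(byte[] payload) {
        ByteBuffer buf = ByteBuffer.wrap(payload);
        byte[] key = new byte[buf.getInt()];
        buf.get(key);
        byte[] value = new byte[buf.getInt()];
        buf.get(value);
        return new byte[][] {key, value};
    }

    public static void main(String[] args) {
        // Same pair as the Java example above: key 100 (INT32) and value "value-100" (STRING).
        byte[] key = ByteBuffer.allocate(4).putInt(100).array();
        byte[] value = "value-100".getBytes(StandardCharsets.UTF_8);
        byte[] payload = encode(key, value);
        byte[][] decoded = decode(payload);
        System.out.println("payload bytes: " + payload.length);
        System.out.println("key: " + ByteBuffer.wrap(decoded[0]).getInt());
        System.out.println("value: " + new String(decoded[1], StandardCharsets.UTF_8));
    }
}
```

With `SEPARATED`, the key is not part of the payload. As we understand the Java client, it stores the encoded key in the message key (Base64-encoded), and only the value bytes form the payload.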
### Avro
````mdx-code-block
<Tabs groupId="api-choice"
defaultValue="Java"
values={[{"label":"Java","value":"Java"},{"label":"C++","value":"C++"},{"label":"Python","value":"Python"},{"label":"Go","value":"Go"}]}>
<TabItem value="Java">
Suppose you have a `SensorReading` class as follows, and you'd like to transmit it over a Pulsar topic.
```java
public class SensorReading {
    public float temperature;

    public SensorReading(float temperature) {
        this.temperature = temperature;
    }

    // A no-arg constructor is required
    public SensorReading() {
    }

    public float getTemperature() {
        return temperature;
    }

    public void setTemperature(float temperature) {
        this.temperature = temperature;
    }
}
```
Create a `Producer<SensorReading>` (or `Consumer<SensorReading>`) like this:
```java
Producer<SensorReading> producer = client.newProducer(AvroSchema.of(SensorReading.class))
.topic("sensor-readings")
.create();
```
</TabItem>
<TabItem value="C++">
```cpp
// Define the Avro schema once and reuse it for the producer and the consumer
static const std::string exampleSchema =
    "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\","
    "\"fields\":[{\"name\":\"a\",\"type\":\"int\"},{\"name\":\"b\",\"type\":\"int\"}]}";

// Send messages
Producer producer;
ProducerConfiguration producerConf;
producerConf.setSchema(SchemaInfo(AVRO, "Avro", exampleSchema));
client.createProducer("topic-avro", producerConf, producer);

// Receive messages
ConsumerConfiguration consumerConf;
Consumer consumer;
consumerConf.setSchema(SchemaInfo(AVRO, "Avro", exampleSchema));
client.subscribe("topic-avro", "sub-2", consumerConf, consumer);
```
</TabItem>
<TabItem value="Python">
You can declare an `AvroSchema` using Python through one of the following methods.
**Method 1: Record**
Declare an `AvroSchema` by passing a class that inherits from `pulsar.schema.Record` and defines the fields as class variables.
```python
class Example(Record):
    a = Integer()
    b = Integer()
producer = client.create_producer(
'avro-schema-topic',
schema=AvroSchema(Example))
r = Example(a=1, b=2)
producer.send(r)
consumer = client.subscribe(
'avro-schema-topic',
'sub',
schema=AvroSchema(Example))
msg = consumer.receive()
e = msg.value()
```
**Method 2: JSON definition**
1. Define the Avro schema in JSON. The following example defines a `Company` record in a JSON file (`company.avsc`).
```json
{
"doc": "this is doc",
"namespace": "example.avro",
"type": "record",
"name": "Company",
"fields": [
{"name": "name", "type": ["null", "string"]},
{"name": "address", "type": ["null", "string"]},
{"name": "employees", "type": ["null", {"type": "array", "items": {
"type": "record",
"name": "Employee",
"fields": [
{"name": "name", "type": ["null", "string"]},
{"name": "age", "type": ["null", "int"]}
]
}}]},
{"name": "labels", "type": ["null", {"type": "map", "values": "string"}]}
]
}
```
2. Load a schema definition from a file by using [`avro.schema`](https://avro.apache.org/docs/current/getting-started-python/) or [`fastavro.schema`](https://fastavro.readthedocs.io/en/latest/schema.html#fastavro._schema_py.load_schema).
If you use the [JSON definition](#method-2-json-definition) method to declare an `AvroSchema`, you need to:
- Use [Python dict](https://developers.google.com/edu/python/dict-files) to produce and consume messages, which is different from using the [Record](#method-1-record) method.
- Set the first argument (`record_cls`) to `None` when creating an `AvroSchema` object, and pass the loaded definition through the `schema_definition` parameter.
**Example**
```python
from fastavro.schema import load_schema
from pulsar.schema import *
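topic = "company-topic"  # example topic name
i = 1  # example index used to vary the sample values below
schema_definition = load_schema("examples/company.avsc")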
avro_schema = AvroSchema(None, schema_definition=schema_definition)
producer = client.create_producer(
topic=topic,
schema=avro_schema)
consumer = client.subscribe(topic, 'test', schema=avro_schema)
company = {
"name": "company-name" + str(i),
"address": 'xxx road xxx street ' + str(i),
"employees": [
{"name": "user" + str(i), "age": 20 + i},
{"name": "user" + str(i), "age": 30 + i},
{"name": "user" + str(i), "age": 35 + i},
],
"labels": {
"industry": "software" + str(i),
"scale": ">100",
"funds": "1000000.0"
}
}
producer.send(company)
msg = consumer.receive()
# value() returns the message content as a dict
company_received = msg.value()
```
</TabItem>
<TabItem value="Go">
Suppose you have an `avroExampleStruct` struct as follows, and you'd like to transmit it over a Pulsar topic.
```go
type avroExampleStruct struct {
ID int
Name string
}
```
1. Add an `exampleSchemaDef` like this:
```go
var (
exampleSchemaDef = "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\"," +
"\"fields\":[{\"name\":\"ID\",\"type\":\"int\"},{\"name\":\"Name\",\"type\":\"string\"}]}"
)
```
2. Create a producer and a consumer to send and receive messages:
```go
//Create producer and send message
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: "my-topic",
Schema: pulsar.NewAvroSchema(exampleSchemaDef, nil),
})
msgId, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
Value: avroExampleStruct{
ID: 10,
Name: "avroExampleStruct",
},
})
//Create Consumer and receive message
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "my-topic",
Schema: pulsar.NewAvroSchema(exampleSchemaDef, nil),
SubscriptionName: "my-sub",
Type: pulsar.Shared,
})
message, err := consumer.Receive(context.Background())
```
</TabItem>
</Tabs>
````
### JSON
````mdx-code-block
<Tabs groupId="api-choice"
defaultValue="Java"
values={[{"label":"Java","value":"Java"},{"label":"C++","value":"C++"},{"label":"Python","value":"Python"},{"label":"Go","value":"Go"}]}>
<TabItem value="Java">
Similar to using `AvroSchema`, you can declare a JSON schema by passing a class. The only difference is to use `Schema.JSON` instead of `AvroSchema` when defining the schema type, as shown below. For how to use `AvroSchema` with a class, see the Java example in the [Avro](#avro) section.
```java
static class SchemaDemo {
public String name;
public int age;
}
Producer<SchemaDemo> producer = pulsarClient.newProducer(Schema.JSON(SchemaDemo.class))
.topic("my-topic")
.create();
Consumer<SchemaDemo> consumer = pulsarClient.newConsumer(Schema.JSON(SchemaDemo.class))
.topic("my-topic")
.subscriptionName("my-sub")
.subscribe();
SchemaDemo schemaDemo = new SchemaDemo();
schemaDemo.name = "pulsar";
schemaDemo.age = 20;
producer.newMessage().value(schemaDemo).send();
Message<SchemaDemo> message = consumer.receive(5, TimeUnit.SECONDS);
```
</TabItem>
<TabItem value="C++">
To declare a `JSON` schema using C++, do the following:
1. Pass a JSON string like this:
```cpp
std::string jsonSchema = R"({"type":"record","name":"cpx","fields":[{"name":"re","type":"double"},{"name":"im","type":"double"}]})";
SchemaInfo schemaInfo = SchemaInfo(JSON, "JSON", jsonSchema);
```
2. Create a producer and use it to send messages.
```cpp
Producer producer;
client.createProducer("my-topic", ProducerConfiguration().setSchema(schemaInfo), producer);
std::string jsonData = "{\"re\":2.1,\"im\":1.23}";
producer.send(MessageBuilder().setContent(std::move(jsonData)).build());
```
3. Create a consumer and receive messages.
```cpp
Consumer consumer;
client.subscribe("my-topic", "my-sub", ConsumerConfiguration().setSchema(schemaInfo), consumer);
Message msg;
consumer.receive(msg);
```
</TabItem>
<TabItem value="Python">
You can declare a `JsonSchema` by passing a class that inherits from `pulsar.schema.Record` and defines the fields as class variables. This is similar to using `AvroSchema`; the only difference is to use `JsonSchema` instead of `AvroSchema` when defining the schema type, as shown below. The example reuses the `Example` class from the Python Avro example. For how to use `AvroSchema` with a record, see [Method 1: Record](#method-1-record).
```python
producer = client.create_producer(
    'json-schema-topic',
    schema=JsonSchema(Example))
consumer = client.subscribe(
    'json-schema-topic',
    'sub',
    schema=JsonSchema(Example))
```
</TabItem>
<TabItem value="Go">
Suppose you have a `jsonExampleStruct` struct as follows, and you'd like to transmit it in JSON form over a Pulsar topic.
```go
type jsonExampleStruct struct {
ID int `json:"id"`
Name string `json:"name"`
}
```
1. Add a `jsonSchemaDef` like this:
```go
var (
    jsonSchemaDef = "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\"," +
        "\"fields\":[{\"name\":\"ID\",\"type\":\"int\"},{\"name\":\"Name\",\"type\":\"string\"}]}"
)
```
2. Create a producer/consumer to send/receive messages:
```go
//Create producer and send message
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: "my-topic",
Schema: pulsar.NewJSONSchema(jsonSchemaDef, nil),
})
msgId, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
Value: jsonExampleStruct{
ID: 10,
Name: "jsonExampleStruct",
},
})
//Create Consumer and receive message
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "my-topic",
Schema: pulsar.NewJSONSchema(jsonSchemaDef, nil),
SubscriptionName: "my-sub",
Type: pulsar.Exclusive,
})
message, err := consumer.Receive(context.Background())
```
</TabItem>
</Tabs>
````
### ProtobufNative
````mdx-code-block
<Tabs groupId="api-choice"
defaultValue="Java"
values={[{"label":"Java","value":"Java"},{"label":"C++","value":"C++"}]}>
<TabItem value="Java">
The following example shows how to create a producer/consumer with a ProtobufNative schema using Java.
1. Generate the `DemoMessage` class using Protobuf3 or later versions.
```protobuf
syntax = "proto3";
message DemoMessage {
string stringField = 1;
double doubleField = 2;
int32 intField = 6;
TestEnum testEnum = 4;
SubMessage nestedField = 5;
repeated string repeatedField = 10;
proto.external.ExternalMessage externalMessage = 11;
}
```
2. Create a producer/consumer to send/receive messages.
```java
Producer<DemoMessage> producer = pulsarClient.newProducer(Schema.PROTOBUF_NATIVE(DemoMessage.class))
.topic("my-topic")
.create();
Consumer<DemoMessage> consumer = pulsarClient.newConsumer(Schema.PROTOBUF_NATIVE(DemoMessage.class))
.topic("my-topic")
.subscriptionName("my-sub")
.subscribe();
producer.newMessage().value(DemoMessage.newBuilder().setStringField("string-field-value")
.setIntField(1).build()).send();
Message<DemoMessage> message = consumer.receive(5, TimeUnit.SECONDS);
```
</TabItem>
<TabItem value="C++">
The following example shows how to create a producer/consumer with a ProtobufNative schema.
1. Generate the `User` class using Protobuf3 or later versions.
```protobuf
syntax = "proto3";
message User {
string name = 1;
int32 age = 2;
}
```
2. Include `ProtobufNativeSchema.h` in your source code, and make sure the Protobuf dependency has been added to your project.
```cpp
#include <pulsar/ProtobufNativeSchema.h>
```
3. Create a producer to send a `User` instance.
```cpp
ProducerConfiguration producerConf;
producerConf.setSchema(createProtobufNativeSchema(User::GetDescriptor()));
Producer producer;
client.createProducer("topic-protobuf", producerConf, producer);
User user;
user.set_name("my-name");
user.set_age(10);
std::string content;
user.SerializeToString(&content);
producer.send(MessageBuilder().setContent(content).build());
```
4. Create a consumer to receive a `User` instance.
```cpp
ConsumerConfiguration consumerConf;
consumerConf.setSchema(createProtobufNativeSchema(User::GetDescriptor()));
consumerConf.setSubscriptionInitialPosition(InitialPositionEarliest);
Consumer consumer;
client.subscribe("topic-protobuf", "my-sub", consumerConf, consumer);
Message msg;
consumer.receive(msg);
User user2;
user2.ParseFromArray(msg.getData(), msg.getLength());
```
</TabItem>
</Tabs>
````
### Protobuf
````mdx-code-block
<Tabs groupId="api-choice"
defaultValue="Java"
values={[{"label":"Java","value":"Java"},{"label":"C++","value":"C++"},{"label":"Go","value":"Go"}]}>
<TabItem value="Java">
Constructing a protobuf schema using Java is similar to constructing a `ProtobufNative` schema. The only difference is to use `PROTOBUF` instead of `PROTOBUF_NATIVE` when defining schema type as shown below.
1. Generate the `DemoMessage` class using Protobuf3 or later versions.
```protobuf
syntax = "proto3";
message DemoMessage {
string stringField = 1;
double doubleField = 2;
int32 intField = 6;
TestEnum testEnum = 4;
SubMessage nestedField = 5;
repeated string repeatedField = 10;
proto.external.ExternalMessage externalMessage = 11;
}
```
2. Create a producer/consumer to send/receive messages.
```java
Producer<DemoMessage> producer = pulsarClient.newProducer(Schema.PROTOBUF(DemoMessage.class))
.topic("my-topic")
.create();
Consumer<DemoMessage> consumer = pulsarClient.newConsumer(Schema.PROTOBUF(DemoMessage.class))
.topic("my-topic")
.subscriptionName("my-sub")
.subscribe();
producer.newMessage().value(DemoMessage.newBuilder().setStringField("string-field-value")
.setIntField(1).build()).send();
Message<DemoMessage> message = consumer.receive(5, TimeUnit.SECONDS);
```
</TabItem>
<TabItem value="C++">
Constructing a protobuf schema using C++ is similar to constructing a `JSON` schema. The only difference is to use `PROTOBUF` instead of `JSON` when defining the schema type, as shown below.
```cpp
std::string jsonSchema =
R"({"type":"record","name":"cpx","fields":[{"name":"re","type":"double"},{"name":"im","type":"double"}]})";
SchemaInfo schemaInfo = SchemaInfo(pulsar::PROTOBUF, "PROTOBUF", jsonSchema);
```
1. Create a producer to send messages.
```cpp
Producer producer;
client.createProducer("my-topic", ProducerConfiguration().setSchema(schemaInfo), producer);
std::string jsonData = "{\"re\":2.1,\"im\":1.23}";
producer.send(MessageBuilder().setContent(std::move(jsonData)).build());
```
2. Create a consumer to receive messages.
```cpp
Consumer consumer;
client.subscribe("my-topic", "my-sub", ConsumerConfiguration().setSchema(schemaInfo), consumer);
Message msg;
consumer.receive(msg);
```
</TabItem>
<TabItem value="Go">
Suppose you have a Protobuf-generated `protobufDemo` struct as follows, and you'd like to transmit it over a Pulsar topic.
```go
type protobufDemo struct {
Num int32 `protobuf:"varint,1,opt,name=num,proto3" json:"num,omitempty"`
Msf string `protobuf:"bytes,2,opt,name=msf,proto3" json:"msf,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
```
1. Add a `protoSchemaDef` like this:
```go
var (
protoSchemaDef = "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\"," +
"\"fields\":[{\"name\":\"num\",\"type\":\"int\"},{\"name\":\"msf\",\"type\":\"string\"}]}"
)
```
2. Create a producer/consumer to send/receive messages:
```go
psProducer := pulsar.NewProtoSchema(protoSchemaDef, nil)
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: "proto",
Schema: psProducer,
})
msgId, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
Value: &protobufDemo{
Num: 100,
Msf: "pulsar",
},
})
psConsumer := pulsar.NewProtoSchema(protoSchemaDef, nil)
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "proto",
SubscriptionName: "sub-1",
Schema: psConsumer,
SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest,
})
msg, err := consumer.Receive(context.Background())
```
</TabItem>
</Tabs>
````
### Native Avro
This example shows how to construct a [native Avro schema](schema-understand.md#struct-schema).
```java
org.apache.avro.Schema nativeAvroSchema = … ;
Producer<byte[]> producer = pulsarClient.newProducer().topic("ingress").create();
byte[] content = … ;
producer.newMessage(Schema.NATIVE_AVRO(nativeAvroSchema)).value(content).send();
```
### AUTO_PRODUCE
Suppose you have a Pulsar topic _P_ and an application that reads messages from a Kafka topic _K_ and uses a producer to write them to _P_.
This example shows how to construct an [AUTO_PRODUCE](schema-understand.md#auto-schema) schema to verify whether the bytes produced by _K_ can be sent to _P_.
```java
Producer<byte[]> pulsarProducer = client.newProducer(Schema.AUTO_PRODUCE_BYTES())
    .create();
byte[] kafkaMessageBytes = … ;
pulsarProducer.send(kafkaMessageBytes);
```
### AUTO_CONSUME
Suppose you have a Pulsar topic _P_ and a consumer _MySQL_ that receives messages from _P_, and you want to check whether these messages contain the fields that your application needs.
This example shows how to construct an [AUTO_CONSUME schema](schema-understand.md#auto-schema) to verify whether the bytes produced by _P_ can be sent to _MySQL_.
```java
Consumer<GenericRecord> pulsarConsumer = client.newConsumer(Schema.AUTO_CONSUME())
    .subscribe();
Message<GenericRecord> msg = pulsarConsumer.receive();
GenericRecord record = msg.getValue();
record.getFields().forEach(field -> {
    if (field.getName().equals("theNeedFieldName")) {
        Object recordField = record.getField(field);
        // Do something with the field value
    }
});
```
## Customize schema storage
By default, Pulsar stores schemas of all types in [Apache BookKeeper](https://bookkeeper.apache.org), which is deployed alongside Pulsar. Alternatively, you can use another storage system if needed.
To use a non-default (non-BookKeeper) storage system for Pulsar schemas, you need to implement the following Java interfaces before [deploying custom schema storage](#deploy-custom-schema-storage):
* [SchemaStorage interface](#implement-schemastorage-interface)
* [SchemaStorageFactory interface](#implement-schemastoragefactory-interface)
### Implement `SchemaStorage` interface
The `SchemaStorage` interface has the following methods:
```java
public interface SchemaStorage {
// How schemas are updated
CompletableFuture<SchemaVersion> put(String key, byte[] value, byte[] hash);
// How schemas are fetched from storage
CompletableFuture<StoredSchema> get(String key, SchemaVersion version);
// How schemas are deleted
CompletableFuture<SchemaVersion> delete(String key);
// Utility method for converting a schema version byte array to a SchemaVersion object
SchemaVersion versionFromBytes(byte[] version);
// Startup behavior for the schema storage client
void start() throws Exception;
// Shutdown behavior for the schema storage client
void close() throws Exception;
}
```
:::tip
For a complete example of **schema storage** implementation, see the [BookkeeperSchemaStorage](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java) class.
:::
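The following Java sketch shows one way the methods above could fit together in a non-durable, in-memory store, which can be handy for experimenting with the contract before writing a real backend. To compile without `pulsar-broker`, it uses hypothetical stand-ins (`SketchSchemaVersion`, `SketchLongVersion`, `SketchStoredSchema`) in place of Pulsar's `SchemaVersion` and `StoredSchema`, and `InMemorySchemaStorageSketch` only mirrors the method shapes of `SchemaStorage` without implementing the real interface. Treating a `null` version as "latest" is a convention of this sketch only.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-ins so the sketch compiles without pulsar-broker on the classpath.
interface SketchSchemaVersion {
    byte[] bytes();
}

final class SketchLongVersion implements SketchSchemaVersion {
    final long value;

    SketchLongVersion(long value) {
        this.value = value;
    }

    @Override
    public byte[] bytes() {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }
}

final class SketchStoredSchema {
    final byte[] data;
    final SketchSchemaVersion version;

    SketchStoredSchema(byte[] data, SketchSchemaVersion version) {
        this.data = data;
        this.version = version;
    }
}

// Keeps every version of every schema in memory; a version is the index in the per-key list.
// Nothing survives a restart, so this only suits tests and experiments.
final class InMemorySchemaStorageSketch {
    private final Map<String, List<byte[]>> schemas = new HashMap<>();

    // Appends a new version. A real store might use `hash` to detect duplicates; this sketch ignores it.
    synchronized CompletableFuture<SketchSchemaVersion> put(String key, byte[] value, byte[] hash) {
        List<byte[]> versions = schemas.computeIfAbsent(key, k -> new ArrayList<>());
        versions.add(value);
        return CompletableFuture.completedFuture(new SketchLongVersion(versions.size() - 1));
    }

    // Returns the requested version (or the latest when `version` is null), or null if absent.
    synchronized CompletableFuture<SketchStoredSchema> get(String key, SketchSchemaVersion version) {
        List<byte[]> versions = schemas.get(key);
        if (versions == null || versions.isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }
        long index = version == null ? versions.size() - 1 : ByteBuffer.wrap(version.bytes()).getLong();
        if (index < 0 || index >= versions.size()) {
            return CompletableFuture.completedFuture(null);
        }
        return CompletableFuture.completedFuture(
                new SketchStoredSchema(versions.get((int) index), new SketchLongVersion(index)));
    }

    // Drops all versions of `key` and reports the last version that existed (null if none).
    synchronized CompletableFuture<SketchSchemaVersion> delete(String key) {
        List<byte[]> removed = schemas.remove(key);
        SketchSchemaVersion last = (removed == null || removed.isEmpty())
                ? null : new SketchLongVersion(removed.size() - 1);
        return CompletableFuture.completedFuture(last);
    }

    // Inverse of SketchLongVersion.bytes().
    SketchSchemaVersion versionFromBytes(byte[] version) {
        return new SketchLongVersion(ByteBuffer.wrap(version).getLong());
    }

    void start() {
        // Nothing to connect to for an in-memory store.
    }

    synchronized void close() {
        schemas.clear();
    }
}
```

A production implementation would implement Pulsar's actual `SchemaStorage` interface, persist data durably, and complete the futures asynchronously instead of doing all work under a single lock.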
### Implement `SchemaStorageFactory` interface
The `SchemaStorageFactory` interface has the following method:
```java
public interface SchemaStorageFactory {
@NotNull
SchemaStorage create(PulsarService pulsar) throws Exception;
}
```
:::tip
For a complete example of **schema storage factory** implementation, see the [BookkeeperSchemaStorageFactory](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorageFactory.java) class.
:::
### Deploy custom schema storage
To use your custom schema storage implementation, perform the following steps.
1. Package the implementation in a [JAR](https://docs.oracle.com/javase/tutorial/deployment/jar/basicsindex.html) file.
2. Add the JAR file to the `lib` folder in your Pulsar binary or source distribution.
3. Set the `schemaRegistryStorageClassName` configuration in the `conf/broker.conf` file to the fully qualified class name of your custom factory.
4. Start Pulsar.
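Step 3 works because the broker instantiates the configured factory by its class name. The following Java sketch shows what that kind of reflective loading looks like, and why a factory class needs an accessible no-argument constructor. `DemoSchemaStorage`, `DemoSchemaStorageFactory`, `DemoInMemoryStorage`, `DemoSchemaStorageFactoryImpl`, and `SchemaStorageLoaderSketch` are hypothetical stand-ins (the real `create` method takes a `PulsarService`), and Pulsar's actual broker code may differ in detail.

```java
// Hypothetical stand-ins for Pulsar's SchemaStorage and SchemaStorageFactory.
interface DemoSchemaStorage {
    void start();

    void close();
}

interface DemoSchemaStorageFactory {
    // The real interface takes a PulsarService; Object keeps this sketch self-contained.
    DemoSchemaStorage create(Object pulsar);
}

// Storage that only records whether the loader started it.
final class DemoInMemoryStorage implements DemoSchemaStorage {
    boolean started;

    @Override
    public void start() {
        started = true;
    }

    @Override
    public void close() {
        started = false;
    }
}

// A custom factory: it needs a no-argument constructor so it can be created by name.
final class DemoSchemaStorageFactoryImpl implements DemoSchemaStorageFactory {
    @Override
    public DemoSchemaStorage create(Object pulsar) {
        return new DemoInMemoryStorage();
    }
}

final class SchemaStorageLoaderSketch {
    // Resolves the configured class name, instantiates the factory, and starts its storage.
    static DemoSchemaStorage load(String factoryClassName, Object pulsar) {
        Object instance;
        try {
            instance = Class.forName(factoryClassName).getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot instantiate " + factoryClassName, e);
        }
        if (!(instance instanceof DemoSchemaStorageFactory)) {
            throw new IllegalArgumentException(factoryClassName + " is not a schema storage factory");
        }
        DemoSchemaStorage storage = ((DemoSchemaStorageFactory) instance).create(pulsar);
        storage.start();
        return storage;
    }

    public static void main(String[] args) {
        // In a broker, this name would come from schemaRegistryStorageClassName in broker.conf.
        DemoSchemaStorage storage = load(DemoSchemaStorageFactoryImpl.class.getName(), null);
        System.out.println("started: " + ((DemoInMemoryStorage) storage).started);
        storage.close();
    }
}
```

Because the class is looked up by name at startup, a typo in the configured name or a JAR missing from `lib` surfaces as a class-loading failure when the broker starts rather than at compile time.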