# Serialization
The general recommendation for de-/serialization of messages is to use byte arrays (or Strings) as the value and to do the de-/serialization in a `map` operator in the Apache Pekko Stream instead of implementing it directly in the Kafka de-/serializers. When deserialization is handled explicitly within the Apache Pekko Stream, it is easier to implement the desired error handling strategy, as the examples below show.
## Protocol buffers
[Protocol Buffers](https://developers.google.com/protocol-buffers) offer a language-neutral, platform-neutral, extensible mechanism for serializing structured data and allow consumers and producers to rely on the message format.
The easiest way to use Protocol Buffers with Apache Pekko Connectors Kafka is to serialize and deserialize the Kafka message payload as a byte array and call the Protocol Buffers serialization and deserialization in a regular `map` operator. To serialize the Protobuf-defined type `Order` into a byte array, use the `.toByteArray()` method, which is generated by the Protobuf compiler.
Scala
: @@ snip [snip](/tests/src/test/scala/docs/scaladsl/SerializationSpec.scala) { #protobuf-imports #protobuf-serializer }
Java
: @@ snip [snip](/java-tests/src/test/java/docs/javadsl/SerializationTest.java) { #protobuf-imports #protobuf-serializer }
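As a rough Scala sketch of this approach (the `orders` collection, the topic name, and the broker address are assumptions made for illustration, while `Order` stands for the Protobuf-generated type from above):

```scala
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.kafka.ProducerSettings
import org.apache.pekko.kafka.scaladsl.Producer
import org.apache.pekko.stream.scaladsl.Source

implicit val system: ActorSystem = ActorSystem("serialization-example")

// The Kafka value serializer only sees byte arrays; the Protobuf work happens in `map`
val producerSettings =
  ProducerSettings(system, new StringSerializer, new ByteArraySerializer)
    .withBootstrapServers("localhost:9092") // placeholder broker address

val orders: List[Order] = ??? // elements of the Protobuf-generated type `Order`

val streamCompletion =
  Source(orders)
    // turn each Order into its Protobuf byte-array form and wrap it in a ProducerRecord
    .map(order => new ProducerRecord[String, Array[Byte]]("orders", order.toByteArray))
    .runWith(Producer.plainSink(producerSettings))
```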
To de-serialize a Protocol Buffers message in a `map` operator, convert the received byte array to the designated type with the generated `parseFrom()` method.
This example uses resuming to react to data that can't be deserialized and ignores faulty elements.
Scala
: @@ snip [snip](/tests/src/test/scala/docs/scaladsl/SerializationSpec.scala) { #protobuf-imports #protobuf-deserializer }
Java
: @@ snip [snip](/java-tests/src/test/java/docs/javadsl/SerializationTest.java) { #protobuf-imports #protobuf-deserializer }
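A matching consumer-side sketch, reusing the implicit `system` and the placeholder topic and broker from the sketch above, might look like this; the resuming supervision strategy simply skips elements for which `parseFrom()` throws:

```scala
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}
import org.apache.pekko.kafka.{ConsumerSettings, Subscriptions}
import org.apache.pekko.kafka.scaladsl.Consumer
import org.apache.pekko.stream.{ActorAttributes, Supervision}
import org.apache.pekko.stream.scaladsl.Sink

val consumerSettings =
  ConsumerSettings(system, new StringDeserializer, new ByteArrayDeserializer)
    .withBootstrapServers("localhost:9092") // placeholder broker address
    .withGroupId("order-consumer")          // placeholder group id

val control =
  Consumer
    .plainSource(consumerSettings, Subscriptions.topics("orders"))
    .map(record => Order.parseFrom(record.value())) // throws on malformed bytes
    // resume (skip the element) instead of failing the stream on deserialization errors
    .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
    .runWith(Sink.foreach(order => println(order)))
```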
## Jackson JSON
Serializing data to JSON text with [Jackson](https://github.com/FasterXML/jackson) in a `map` operator will turn the object instance into a String, which is used as the value in the @javadoc[ProducerRecord](org.apache.kafka.clients.producer.ProducerRecord).
Java
: @@ snip [snip](/java-tests/src/test/java/docs/javadsl/SerializationTest.java) { #jackson-imports #jackson-serializer }
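A rough Scala sketch of the same idea is shown below; `SampleData` stands for a hypothetical Java bean defined elsewhere, and the topic name and broker address are placeholders:

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.pekko.kafka.ProducerSettings
import org.apache.pekko.kafka.scaladsl.Producer
import org.apache.pekko.stream.scaladsl.Source

val mapper = new ObjectMapper()
// SampleData is a hypothetical Java bean with standard getters/setters
val sampleDataWriter = mapper.writerFor(classOf[SampleData])

val stringProducerSettings =
  ProducerSettings(system, new StringSerializer, new StringSerializer)
    .withBootstrapServers("localhost:9092") // placeholder broker address

val samples: List[SampleData] = ???

val jacksonCompletion =
  Source(samples)
    // turn each object into JSON text and use that String as the record value
    .map(sample => new ProducerRecord[String, String]("topic1", sampleDataWriter.writeValueAsString(sample)))
    .runWith(Producer.plainSink(stringProducerSettings))
```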
To de-serialize a JSON String with Jackson, extract the String and apply the Jackson object reader in a `map` operator. Annotate the `map` operator with the extracted type, as the object reader is not generic.
This example uses resuming to react to data that can't be parsed correctly and ignores faulty elements.
Java
: @@ snip [snip](/java-tests/src/test/java/docs/javadsl/SerializationTest.java) { #jackson-imports #jackson-deserializer }
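A matching consumer-side sketch, under the same assumptions (`SampleData` as a hypothetical Java bean, placeholder topic, broker, and group id, and the implicit `system` from the first sketch):

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.pekko.kafka.{ConsumerSettings, Subscriptions}
import org.apache.pekko.kafka.scaladsl.Consumer
import org.apache.pekko.stream.{ActorAttributes, Supervision}
import org.apache.pekko.stream.scaladsl.Sink

val mapper = new ObjectMapper()
// the object reader fixes the target type, so readValue yields SampleData instances
val sampleDataReader = mapper.readerFor(classOf[SampleData])

val stringConsumerSettings =
  ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers("localhost:9092") // placeholder broker address
    .withGroupId("json-consumer")           // placeholder group id

val jacksonControl =
  Consumer
    .plainSource(stringConsumerSettings, Subscriptions.topics("topic1"))
    .map(record => sampleDataReader.readValue[SampleData](record.value())) // throws on malformed JSON
    // resuming drops the element that failed to parse and continues with the next one
    .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
    .runWith(Sink.foreach(println))
```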
## Spray JSON
To de-serialize a JSON String with [Spray JSON](https://github.com/spray/spray-json), extract the String and use the Spray-provided implicits `parseJson` and `convertTo` in a `map` operator.
This example uses resuming to react to data that can't be parsed correctly and ignores faulty elements.
Scala
: @@ snip [snip](/tests/src/test/scala/docs/scaladsl/SerializationSpec.scala) { #spray-imports #spray-deser }
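A compact sketch of this pattern might look as follows; the `SampleData` case class, its JSON format, and the `stringConsumerSettings` value (String key and value deserializers, as in the Jackson sketch above) are assumptions for illustration:

```scala
import org.apache.pekko.kafka.Subscriptions
import org.apache.pekko.kafka.scaladsl.Consumer
import org.apache.pekko.stream.{ActorAttributes, Supervision}
import org.apache.pekko.stream.scaladsl.Sink
import spray.json._
import spray.json.DefaultJsonProtocol._

// hypothetical payload type and its Spray JSON format
case class SampleData(name: String, value: Int)
implicit val sampleDataFormat: RootJsonFormat[SampleData] = jsonFormat2(SampleData.apply)

val sprayControl =
  Consumer
    .plainSource(stringConsumerSettings, Subscriptions.topics("topic1"))
    // parseJson and convertTo are provided by the spray.json._ import
    .map(record => record.value().parseJson.convertTo[SampleData])
    // resuming skips elements whose JSON can't be parsed or converted
    .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
    .runWith(Sink.foreach(println))
```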
## Avro with Schema Registry
If you want to use [Confluent's Schema Registry](https://docs.confluent.io/current/schema-registry/docs/index.html), you need to include the dependency on `kafka-avro-serializer` as shown below. It is not available from Maven Central, so Confluent's repository has to be specified. These examples use `kafka-avro-serializer` version $confluent.version$.
Maven
: ```xml
<project>
...
  <dependencies>
    ...
    <dependency>
      <groupId>io.confluent</groupId>
      <artifactId>kafka-avro-serializer</artifactId>
      <version>confluent.version (e.g. 8.0.0)</version>
    </dependency>
    ...
  </dependencies>
  ...
  <repositories>
    <repository>
      <id>confluent-maven-repo</id>
      <name>Confluent Maven Repository</name>
      <url>https://packages.confluent.io/maven/</url>
    </repository>
  </repositories>
  ...
</project>
```
sbt
: ```scala
libraryDependencies += "io.confluent" % "kafka-avro-serializer" % confluentAvroVersion // e.g. 8.0.0
resolvers += "Confluent Maven Repository" at "https://packages.confluent.io/maven/"
```
Gradle
: ```gradle
dependencies {
  compile group: 'io.confluent', name: 'kafka-avro-serializer', version: confluentAvroVersion // e.g. 8.0.0
}
repositories {
  maven {
    url "https://packages.confluent.io/maven/"
  }
}
```
### Producer
To create serializers that use the Schema Registry, its URL needs to be provided to the serializer via the `AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG` configuration key; that serializer is then used in the @apidoc[ProducerSettings$].
Scala
: @@ snip [snip](/tests/src/test/scala/docs/scaladsl/SchemaRegistrySerializationSpec.scala) { #imports #serializer }
Java
: @@ snip [snip](/java-tests/src/test/java/docs/javadsl/SchemaRegistrySerializationTest.java) { #imports #serializer }
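In outline, and only as a sketch with placeholder registry and broker addresses, configuring a `KafkaAvroSerializer` and plugging it into @apidoc[ProducerSettings$] could look like this (reusing the implicit `system` from the earlier sketches):

```scala
import io.confluent.kafka.serializers.{AbstractKafkaSchemaSerDeConfig, KafkaAvroSerializer}
import org.apache.avro.specific.SpecificRecord
import org.apache.kafka.common.serialization.{Serializer, StringSerializer}
import org.apache.pekko.kafka.ProducerSettings

import scala.jdk.CollectionConverters._

// tell the Avro serializer where to find the Schema Registry (placeholder URL)
val kafkaAvroSerDeConfig = Map[String, Any](
  AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> "http://localhost:8081"
)

val avroProducerSettings: ProducerSettings[String, SpecificRecord] = {
  val kafkaAvroSerializer = new KafkaAvroSerializer()
  // `false` configures the serializer for record values (not keys)
  kafkaAvroSerializer.configure(kafkaAvroSerDeConfig.asJava, false)
  val serializer = kafkaAvroSerializer.asInstanceOf[Serializer[SpecificRecord]]

  ProducerSettings(system, new StringSerializer, serializer)
    .withBootstrapServers("localhost:9092") // placeholder broker address
}
```

The cast to `Serializer[SpecificRecord]` is used here because `KafkaAvroSerializer` implements `Serializer[Object]`, while the @apidoc[ProducerSettings$] value type should match the Avro-generated records produced by the stream.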
### Consumer
To create deserializers that use the Schema Registry, its URL needs to be provided to the deserializer via the `AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG` configuration key; that deserializer is then used in the @apidoc[ConsumerSettings$].
Scala
: @@ snip [snip](/tests/src/test/scala/docs/scaladsl/SchemaRegistrySerializationSpec.scala) { #imports #de-serializer }
Java
: @@ snip [snip](/java-tests/src/test/java/docs/javadsl/SchemaRegistrySerializationTest.java) { #imports #de-serializer }
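The consumer side mirrors this; again only a sketch, with placeholder registry and broker addresses, group id, and topic name:

```scala
import io.confluent.kafka.serializers.{AbstractKafkaSchemaSerDeConfig, KafkaAvroDeserializer, KafkaAvroDeserializerConfig}
import org.apache.avro.specific.SpecificRecord
import org.apache.kafka.common.serialization.{Deserializer, StringDeserializer}
import org.apache.pekko.kafka.{ConsumerSettings, Subscriptions}
import org.apache.pekko.kafka.scaladsl.Consumer
import org.apache.pekko.stream.scaladsl.Sink

import scala.jdk.CollectionConverters._

val kafkaAvroDeserializerConfig = Map[String, Any](
  AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> "http://localhost:8081",
  // return the generated SpecificRecord classes instead of GenericRecord
  KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG -> true.toString
)

val avroConsumerSettings: ConsumerSettings[String, SpecificRecord] = {
  val kafkaAvroDeserializer = new KafkaAvroDeserializer()
  // `false` configures the deserializer for record values (not keys)
  kafkaAvroDeserializer.configure(kafkaAvroDeserializerConfig.asJava, false)
  val deserializer = kafkaAvroDeserializer.asInstanceOf[Deserializer[SpecificRecord]]

  ConsumerSettings(system, new StringDeserializer, deserializer)
    .withBootstrapServers("localhost:9092") // placeholder broker address
    .withGroupId("avro-consumer")           // placeholder group id
}

val avroControl =
  Consumer
    .plainSource(avroConsumerSettings, Subscriptions.topics("avro-topic"))
    .runWith(Sink.foreach(record => println(record.value())))
```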