| # Serialization |
| |
| The general recommendation for de-/serialization of messages is to use byte arrays (or Strings) as the value and to do the de-/serialization in a `map` operator in the Apache Pekko Stream instead of implementing it directly in Kafka de-/serializers. When deserialization is handled explicitly within the Apache Pekko Stream, it is easier to implement the desired error handling strategy, as the examples below show. |
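| |
| As a first orientation, here is a minimal sketch of this pattern in Scala; the broker address, group id, topic name, and the `deserialize` helper (which stands in for the concrete parsing shown in the sections below) are assumptions for illustration: |
| |
| ```scala |
| import org.apache.pekko.actor.ActorSystem |
| import org.apache.pekko.kafka.{ ConsumerSettings, Subscriptions } |
| import org.apache.pekko.kafka.scaladsl.Consumer |
| import org.apache.pekko.stream.{ ActorAttributes, Supervision } |
| import org.apache.pekko.stream.scaladsl.Sink |
| import org.apache.kafka.common.serialization.{ ByteArrayDeserializer, StringDeserializer } |
| |
| implicit val system: ActorSystem = ActorSystem("serialization-sample") |
| |
| // hypothetical application-level decoder; stands in for the Protobuf/JSON parsing below |
| def deserialize(bytes: Array[Byte]): String = new String(bytes, "UTF-8") |
| |
| // Kafka itself only sees byte arrays; the real de-serialization happens in the stream |
| val consumerSettings = |
|   ConsumerSettings(system, new StringDeserializer, new ByteArrayDeserializer) |
|     .withBootstrapServers("localhost:9092") // assumed broker address |
|     .withGroupId("sample-group") |
| |
| val control = |
|   Consumer |
|     .plainSource(consumerSettings, Subscriptions.topics("sample-topic")) |
|     .map(record => deserialize(record.value())) // de-serialize in the stream, not in Kafka |
|     // resume (drop the element) when de-serialization fails instead of failing the stream |
|     .withAttributes(ActorAttributes.supervisionStrategy { |
|       case _: RuntimeException => Supervision.Resume |
|       case _                   => Supervision.Stop |
|     }) |
|     .to(Sink.foreach(println)) |
|     .run() |
| ``` |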
| |
| |
| ## Protocol buffers |
| |
| [Protocol Buffers](https://developers.google.com/protocol-buffers) offer a language-neutral, platform-neutral, extensible mechanism for serializing structured data and allow consumers and producers to rely on the message format. |
| |
| The easiest way to use Protocol Buffers with Apache Pekko Connectors Kafka is to serialize and deserialize the Kafka message payload as a byte array and to call the Protocol Buffers serialization and deserialization in a regular `map` operator. To serialize the Protobuf-defined type `Order` into a byte array, use the `toByteArray()` method, which is generated by the Protobuf compiler. |
| |
| Scala |
| : @@ snip [snip](/tests/src/test/scala/docs/scaladsl/SerializationSpec.scala) { #protobuf-imports #protobuf-serializer } |
| |
| Java |
| : @@ snip [snip](/java-tests/src/test/java/docs/javadsl/SerializationTest.java) { #protobuf-imports #protobuf-serializer } |
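| |
| The same idea as a compact, hedged sketch in Scala; the broker address, topic name, source collection, and the `getId` key accessor on the generated `Order` class are assumptions for illustration: |
| |
| ```scala |
| import org.apache.pekko.actor.ActorSystem |
| import org.apache.pekko.kafka.ProducerSettings |
| import org.apache.pekko.kafka.scaladsl.Producer |
| import org.apache.pekko.stream.scaladsl.Source |
| import org.apache.kafka.clients.producer.ProducerRecord |
| import org.apache.kafka.common.serialization.{ ByteArraySerializer, StringSerializer } |
| |
| implicit val system: ActorSystem = ActorSystem("protobuf-producer") |
| |
| // Kafka serializes plain byte arrays; the Protobuf encoding happens in the `map` below |
| val producerSettings = |
|   ProducerSettings(system, new StringSerializer, new ByteArraySerializer) |
|     .withBootstrapServers("localhost:9092") // assumed broker address |
| |
| val orders: List[Order] = List.empty // assumed source of Protobuf-generated `Order` messages |
| |
| val done = |
|   Source(orders) |
|     .map { order => |
|       // `toByteArray()` is generated by the Protobuf compiler for every message type |
|       new ProducerRecord[String, Array[Byte]]("orders", order.getId, order.toByteArray) |
|     } |
|     .runWith(Producer.plainSink(producerSettings)) |
| ``` |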
| |
| |
| To de-serialize a Protocol Buffers message in a `map` operator, convert the received byte array to the designated type with the generated `parseFrom()` method. |
| |
| This example uses resuming to react to data that can't be deserialized and ignores the faulty elements. |
| |
| Scala |
| : @@ snip [snip](/tests/src/test/scala/docs/scaladsl/SerializationSpec.scala) { #protobuf-imports #protobuf-deserializer } |
| |
| Java |
| : @@ snip [snip](/java-tests/src/test/java/docs/javadsl/SerializationTest.java) { #protobuf-imports #protobuf-deserializer } |
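| |
| A corresponding hedged sketch in Scala; the broker address, group id, and topic name are assumptions: |
| |
| ```scala |
| import com.google.protobuf.InvalidProtocolBufferException |
| import org.apache.pekko.actor.ActorSystem |
| import org.apache.pekko.kafka.{ ConsumerSettings, Subscriptions } |
| import org.apache.pekko.kafka.scaladsl.Consumer |
| import org.apache.pekko.stream.{ ActorAttributes, Supervision } |
| import org.apache.pekko.stream.scaladsl.Sink |
| import org.apache.kafka.common.serialization.{ ByteArrayDeserializer, StringDeserializer } |
| |
| implicit val system: ActorSystem = ActorSystem("protobuf-consumer") |
| |
| val consumerSettings = |
|   ConsumerSettings(system, new StringDeserializer, new ByteArrayDeserializer) |
|     .withBootstrapServers("localhost:9092") // assumed broker address |
|     .withGroupId("protobuf-group") |
| |
| val control = |
|   Consumer |
|     .plainSource(consumerSettings, Subscriptions.topics("orders")) |
|     .map(record => Order.parseFrom(record.value())) // `parseFrom()` is generated by the Protobuf compiler |
|     // resume (drop the element) on undecodable bytes instead of failing the stream |
|     .withAttributes(ActorAttributes.supervisionStrategy { |
|       case _: InvalidProtocolBufferException => Supervision.Resume |
|       case _                                 => Supervision.Stop |
|     }) |
|     .to(Sink.foreach(println)) |
|     .run() |
| ``` |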
| |
| |
| ## Jackson JSON |
| |
| Serializing data to JSON text with [Jackson](https://github.com/FasterXML/jackson) in a `map` operator turns the object instance into a String, which is used as the value in the @javadoc[ProducerRecord](org.apache.kafka.clients.producer.ProducerRecord). |
| |
| Java |
| : @@ snip [snip](/java-tests/src/test/java/docs/javadsl/SerializationTest.java) { #jackson-imports #jackson-serializer } |
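| |
| For comparison, a hedged sketch of the same approach in Scala, assuming `jackson-module-scala` is on the classpath so Jackson can handle case classes, and an illustrative `SampleData` payload type: |
| |
| ```scala |
| import com.fasterxml.jackson.databind.ObjectMapper |
| import com.fasterxml.jackson.module.scala.DefaultScalaModule |
| import org.apache.pekko.actor.ActorSystem |
| import org.apache.pekko.kafka.ProducerSettings |
| import org.apache.pekko.kafka.scaladsl.Producer |
| import org.apache.pekko.stream.scaladsl.Source |
| import org.apache.kafka.clients.producer.ProducerRecord |
| import org.apache.kafka.common.serialization.StringSerializer |
| |
| final case class SampleData(name: String, value: Int) // assumed payload type |
| |
| implicit val system: ActorSystem = ActorSystem("jackson-producer") |
| |
| val mapper = new ObjectMapper().registerModule(DefaultScalaModule) |
| |
| val producerSettings = |
|   ProducerSettings(system, new StringSerializer, new StringSerializer) |
|     .withBootstrapServers("localhost:9092") // assumed broker address |
| |
| val done = |
|   Source(List(SampleData("one", 1), SampleData("two", 2))) |
|     // turn each object into a JSON String in the stream; Kafka only sees Strings |
|     .map(sample => new ProducerRecord[String, String]("topic", mapper.writeValueAsString(sample))) |
|     .runWith(Producer.plainSink(producerSettings)) |
| ``` |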
| |
| |
| To de-serialize a JSON String with Jackson, extract the String and apply the Jackson object reader in a `map` operator. As the object reader is not generic, the target type needs to be given explicitly when creating it. |
| |
| This example uses resuming to react to data that can't be parsed correctly and ignores the faulty elements. |
| |
| Java |
| : @@ snip [snip](/java-tests/src/test/java/docs/javadsl/SerializationTest.java) { #jackson-imports #jackson-deserializer } |
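| |
| A hedged Scala sketch of the de-serializing side, under the same assumptions (`jackson-module-scala` on the classpath, an illustrative `SampleData` type, a local broker): |
| |
| ```scala |
| import com.fasterxml.jackson.core.JsonProcessingException |
| import com.fasterxml.jackson.databind.ObjectMapper |
| import com.fasterxml.jackson.module.scala.DefaultScalaModule |
| import org.apache.pekko.actor.ActorSystem |
| import org.apache.pekko.kafka.{ ConsumerSettings, Subscriptions } |
| import org.apache.pekko.kafka.scaladsl.Consumer |
| import org.apache.pekko.stream.{ ActorAttributes, Supervision } |
| import org.apache.pekko.stream.scaladsl.Sink |
| import org.apache.kafka.common.serialization.StringDeserializer |
| |
| final case class SampleData(name: String, value: Int) // assumed payload type |
| |
| implicit val system: ActorSystem = ActorSystem("jackson-consumer") |
| |
| val mapper = new ObjectMapper().registerModule(DefaultScalaModule) |
| // the object reader is bound to a single target type up front; it is not generic |
| val reader = mapper.readerFor(classOf[SampleData]) |
| |
| val consumerSettings = |
|   ConsumerSettings(system, new StringDeserializer, new StringDeserializer) |
|     .withBootstrapServers("localhost:9092") // assumed broker address |
|     .withGroupId("jackson-group") |
| |
| val control = |
|   Consumer |
|     .plainSource(consumerSettings, Subscriptions.topics("topic")) |
|     .map(record => reader.readValue[SampleData](record.value())) |
|     // resume (drop the element) on malformed JSON instead of failing the stream |
|     .withAttributes(ActorAttributes.supervisionStrategy { |
|       case _: JsonProcessingException => Supervision.Resume |
|       case _                          => Supervision.Stop |
|     }) |
|     .to(Sink.foreach(println)) |
|     .run() |
| ``` |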
| |
| |
| ## Spray JSON |
| |
| To de-serialize a JSON String with [Spray JSON](https://github.com/spray/spray-json), extract the String and use the `parseJson` and `convertTo` methods that Spray provides via implicits in a `map` operator. |
| |
| This example uses resuming to react to data that can't be parsed correctly and ignores the faulty elements. |
| |
| Scala |
| : @@ snip [snip](/tests/src/test/scala/docs/scaladsl/SerializationSpec.scala) { #spray-imports #spray-deser } |
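| |
| A hedged sketch of this in Scala, assuming a local broker and an illustrative `SampleData` type with a `jsonFormat2`-derived format: |
| |
| ```scala |
| import org.apache.pekko.actor.ActorSystem |
| import org.apache.pekko.kafka.{ ConsumerSettings, Subscriptions } |
| import org.apache.pekko.kafka.scaladsl.Consumer |
| import org.apache.pekko.stream.{ ActorAttributes, Supervision } |
| import org.apache.pekko.stream.scaladsl.Sink |
| import org.apache.kafka.common.serialization.StringDeserializer |
| import spray.json._ |
| import DefaultJsonProtocol._ |
| |
| final case class SampleData(name: String, value: Int) // assumed payload type |
| implicit val sampleDataFormat: RootJsonFormat[SampleData] = jsonFormat2(SampleData.apply) |
| |
| implicit val system: ActorSystem = ActorSystem("spray-consumer") |
| |
| val consumerSettings = |
|   ConsumerSettings(system, new StringDeserializer, new StringDeserializer) |
|     .withBootstrapServers("localhost:9092") // assumed broker address |
|     .withGroupId("spray-group") |
| |
| val control = |
|   Consumer |
|     .plainSource(consumerSettings, Subscriptions.topics("topic")) |
|     // `parseJson` and `convertTo` become available through the spray.json._ import |
|     .map(record => record.value().parseJson.convertTo[SampleData]) |
|     // resume (drop the element) on unparseable JSON instead of failing the stream |
|     .withAttributes(ActorAttributes.supervisionStrategy { |
|       case _: JsonParser.ParsingException | _: DeserializationException => Supervision.Resume |
|       case _                                                            => Supervision.Stop |
|     }) |
|     .to(Sink.foreach(println)) |
|     .run() |
| ``` |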
| |
| |
| ## Avro with Schema Registry |
| |
| If you want to use [Confluent's Schema Registry](https://docs.confluent.io/current/schema-registry/docs/index.html), you need to include the dependency on `kafka-avro-serializer` as shown below. It is not available from Maven Central, which is why Confluent's repository has to be specified. These examples use `kafka-avro-serializer` version $confluent.version$. |
| |
| Maven |
| : ```xml |
| <project> |
| ... |
| <dependencies> |
| ... |
| <dependency> |
| <groupId>io.confluent</groupId> |
| <artifactId>kafka-avro-serializer</artifactId> |
| <version>confluent.version (e.g. 8.0.0)</version> |
| </dependency> |
| ... |
| </dependencies> |
| ... |
| <repositories> |
| <repository> |
| <id>confluent-maven-repo</id> |
| <name>Confluent Maven Repository</name> |
| <url>https://packages.confluent.io/maven/</url> |
| </repository> |
| </repositories> |
| ... |
| </project> |
| ``` |
| |
| sbt |
| : ```scala |
| libraryDependencies += "io.confluent" % "kafka-avro-serializer" % confluentAvroVersion // e.g. 8.0.0 |
| resolvers += "Confluent Maven Repository" at "https://packages.confluent.io/maven/" |
| ``` |
| |
| Gradle |
| : ```gradle |
| dependencies { |
| implementation group: 'io.confluent', name: 'kafka-avro-serializer', version: confluentAvroVersion // e.g. 8.0.0 |
| } |
| repositories { |
| maven { |
| url "https://packages.confluent.io/maven/" |
| } |
| } |
| ``` |
| |
| |
| ### Producer |
| |
| To create serializers that use the Schema Registry, its URL needs to be provided to the serializer via the configuration setting `AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG`; that serializer is then used in the @apidoc[ProducerSettings$]. |
| |
| Scala |
| : @@ snip [snip](/tests/src/test/scala/docs/scaladsl/SchemaRegistrySerializationSpec.scala) { #imports #serializer } |
| |
| Java |
| : @@ snip [snip](/java-tests/src/test/java/docs/javadsl/SchemaRegistrySerializationTest.java) { #imports #serializer } |
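| |
| In outline, the wiring looks like this hedged sketch in Scala (the registry and broker addresses are assumptions): |
| |
| ```scala |
| import scala.jdk.CollectionConverters._ |
| |
| import io.confluent.kafka.serializers.{ AbstractKafkaSchemaSerDeConfig, KafkaAvroSerializer } |
| import org.apache.pekko.actor.ActorSystem |
| import org.apache.pekko.kafka.ProducerSettings |
| import org.apache.kafka.common.serialization.StringSerializer |
| |
| implicit val system: ActorSystem = ActorSystem("avro-producer") |
| |
| val kafkaAvroSerDeConfig = Map[String, Any]( |
|   AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> "http://localhost:8081" // assumed registry address |
| ) |
| |
| // configure the serializer with the Schema Registry URL; `isKey = false` marks it as a value serializer |
| val avroSerializer = new KafkaAvroSerializer() |
| avroSerializer.configure(kafkaAvroSerDeConfig.asJava, false) |
| |
| val producerSettings = |
|   ProducerSettings(system, new StringSerializer, avroSerializer) |
|     .withBootstrapServers("localhost:9092") // assumed broker address |
| ``` |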
| |
| |
| |
| ### Consumer |
| |
| To create deserializers that use the Schema Registry, its URL needs to be provided to the deserializer via the configuration setting `AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG`; that deserializer is then used in the @apidoc[ConsumerSettings$]. |
| |
| Scala |
| : @@ snip [snip](/tests/src/test/scala/docs/scaladsl/SchemaRegistrySerializationSpec.scala) { #imports #de-serializer } |
| |
| Java |
| : @@ snip [snip](/java-tests/src/test/java/docs/javadsl/SchemaRegistrySerializationTest.java) { #imports #de-serializer } |
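| |
| And the consumer side in outline, as a hedged sketch (the registry and broker addresses are assumptions): |
| |
| ```scala |
| import scala.jdk.CollectionConverters._ |
| |
| import io.confluent.kafka.serializers.{ AbstractKafkaSchemaSerDeConfig, KafkaAvroDeserializer } |
| import org.apache.pekko.actor.ActorSystem |
| import org.apache.pekko.kafka.ConsumerSettings |
| import org.apache.kafka.common.serialization.StringDeserializer |
| |
| implicit val system: ActorSystem = ActorSystem("avro-consumer") |
| |
| val kafkaAvroSerDeConfig = Map[String, Any]( |
|   AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> "http://localhost:8081" // assumed registry address |
| ) |
| |
| // configure the deserializer with the Schema Registry URL; `isKey = false` marks it as a value deserializer |
| val avroDeserializer = new KafkaAvroDeserializer() |
| avroDeserializer.configure(kafkaAvroSerDeConfig.asJava, false) |
| |
| val consumerSettings = |
|   ConsumerSettings(system, new StringDeserializer, avroDeserializer) |
|     .withBootstrapServers("localhost:9092") // assumed broker address |
|     .withGroupId("avro-group") |
| ``` |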