Serialization

The general recommendation for de-/serialization of messages is to use byte arrays (or Strings) as the message value and to do the de-/serialization in a map operation in the Apache Pekko Stream, instead of implementing it directly in Kafka de-/serializers. When deserialization is handled explicitly within the Apache Pekko Stream, it is easier to implement the desired error handling strategy, as the examples below show.
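The benefit of decoding inside the stream can be sketched without Kafka or Pekko on the classpath. The following Java code is a dependency-free analogue, not the Pekko API: a plain loop stands in for the map operator, values arrive as raw byte arrays, and a hypothetical Directive decider chooses for each failure whether to drop the element and continue (comparable to resuming) or to rethrow and abort (comparable to Pekko's default, stopping). The names DecodeInMapSketch, decodeAll, decodeInt, utf8 and Directive are illustrative only.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class DecodeInMapSketch {

    // Hypothetical stand-in for a supervision directive: drop and continue, or abort.
    enum Directive { RESUME, STOP }

    // Decodes every raw value in order. When `decode` throws, the decider is asked
    // what to do: RESUME skips the faulty element, STOP rethrows and ends processing.
    static <T> List<T> decodeAll(List<byte[]> rawValues,
                                 Function<byte[], T> decode,
                                 Function<RuntimeException, Directive> decider) {
        List<T> decoded = new ArrayList<>();
        for (byte[] raw : rawValues) {
            try {
                decoded.add(decode.apply(raw));
            } catch (RuntimeException e) {
                if (decider.apply(e) == Directive.STOP) {
                    throw e;
                }
                // RESUME: nothing is emitted for this element
            }
        }
        return decoded;
    }

    // Example decoder: UTF-8 text holding an integer, NumberFormatException otherwise.
    static Integer decodeInt(byte[] raw) {
        return Integer.parseInt(new String(raw, StandardCharsets.UTF_8));
    }

    static byte[] utf8(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        List<byte[]> values = List.of(utf8("1"), utf8("not-a-number"), utf8("3"));
        List<Integer> decoded =
                decodeAll(values, DecodeInMapSketch::decodeInt, err -> Directive.RESUME);
        System.out.println(decoded); // prints [1, 3]
    }
}
```

Because the decoder is called as an ordinary function inside the stream, the error handling policy sits right next to the code that fails; a Kafka Deserializer, by contrast, throws while the consumer is polling, before the element ever reaches an operator.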

Protocol Buffers

Protocol Buffers offer a language-neutral, platform-neutral, extensible mechanism for serializing structured data and allow consumers and producers to rely on the message format.

The easiest way to use Protocol Buffers with Apache Pekko Connectors Kafka is to serialize and deserialize the Kafka message payload as a byte array and to call the Protocol Buffers serialization and deserialization in a regular map operator. To serialize the Protobuf-defined type Order into a byte array, use the .toByteArray() method available on every message class generated by the Protobuf compiler.

Scala : @@ snip snip { #protobuf-imports #protobuf-serializer }

Java : @@ snip snip { #protobuf-imports #protobuf-serializer }

To deserialize a Protocol Buffers message in a map operator, convert the received byte array to the designated type with the generated parseFrom() method.

This example uses the resuming supervision strategy to react to data that can't be deserialized: faulty elements are dropped and the stream continues.

Scala : @@ snip snip { #protobuf-imports #protobuf-deserializer }

Java : @@ snip snip { #protobuf-imports #protobuf-deserializer }

Jackson JSON

Serializing data to JSON text with Jackson in a map operator turns the object instance into a String, which is used as the value of the @javadoc[ProducerRecord](org.apache.kafka.clients.producer.ProducerRecord).

Java : @@ snip snip { #jackson-imports #jackson-serializer }

To deserialize a JSON String with Jackson, extract the String and apply the Jackson object reader in a map operator. Because the object reader is not generic, the compiler cannot infer the result type, so specify the target type explicitly on the map operator.

This example uses the resuming supervision strategy to react to data that can't be parsed correctly: faulty elements are dropped and the stream continues.

Java : @@ snip snip { #jackson-imports #jackson-deserializer }
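Why the explicit target type is needed can be reproduced with the JDK alone. The sketch below is an analogue, not Jackson code: the hypothetical UntypedReader mimics the shape of a non-generic reader such as Jackson's ObjectReader, whose readValue(String) declares its result as a method-level type parameter, and java.util.stream.Stream.map stands in for the Pekko map operator. Without the <Integer> type witness, the compiler infers Object as the mapped element type and the result no longer fits List<Integer>. The names TypeWitnessSketch, UntypedReader and parseAll are illustrative only.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

public class TypeWitnessSketch {

    // Hypothetical reader shaped like a non-generic object reader: the class has no
    // type parameter, and readValue lets the caller choose T (unchecked cast).
    static final class UntypedReader {
        private final Function<String, ?> parser;

        UntypedReader(Function<String, ?> parser) {
            this.parser = parser;
        }

        @SuppressWarnings("unchecked")
        <T> T readValue(String text) {
            return (T) parser.apply(text);
        }
    }

    static List<Integer> parseAll(List<String> texts, UntypedReader reader) {
        return texts.stream()
                // The type witness fixes the element type; without it the compiler
                // infers Stream<Object> and this return statement does not compile.
                .<Integer>map(reader::readValue)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        UntypedReader intReader = new UntypedReader(Integer::valueOf);
        System.out.println(parseAll(List.of("1", "2", "3"), intReader)); // prints [1, 2, 3]
    }
}
```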

Spray JSON

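To deserialize a JSON String with Spray JSON, extract the String and call the extension methods parseJson and convertTo, provided by Spray JSON's implicit conversions, in a map operator. convertTo requires an implicit JsonReader (for example a JsonFormat) for the target type to be in scope.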

This example uses the resuming supervision strategy to react to data that can't be parsed correctly: faulty elements are dropped and the stream continues.

Scala : @@ snip snip { #spray-imports #spray-deser }

Avro with Schema Registry

If you want to use Confluent's Schema Registry, you need to include the dependency on kafka-avro-serializer as shown below. It is not available from Maven Central, which is why Confluent's repository has to be specified as well. These examples use kafka-avro-serializer version $confluent.version$.

Maven :

    <project>
      ...
      <dependencies>
        ...
        <dependency>
          <groupId>io.confluent</groupId>
          <artifactId>kafka-avro-serializer</artifactId>
          <version>confluent.version (e.g. 8.0.0)</version>
        </dependency>
        ...
      </dependencies>
      ...
      <repositories>
        <repository>
          <id>confluent-maven-repo</id>
          <name>Confluent Maven Repository</name>
          <url>https://packages.confluent.io/maven/</url>
        </repository>
      </repositories>
      ...
    </project>

sbt :

    libraryDependencies += "io.confluent" % "kafka-avro-serializer" % confluentAvroVersion // e.g. 8.0.0
    resolvers += "Confluent Maven Repository" at "https://packages.confluent.io/maven/"

Gradle :

    dependencies {
      implementation group: 'io.confluent', name: 'kafka-avro-serializer', version: confluentAvroVersion // e.g. 8.0.0
    }
    repositories {
      maven { url "https://packages.confluent.io/maven/" }
    }

Producer

To create a serializer that uses the Schema Registry, provide the registry URL to the serializer under the configuration key AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, and use that serializer in the @apidoc[ProducerSettings$].

Scala : @@ snip snip { #imports #serializer }

Java : @@ snip snip { #imports #serializer }

Consumer

To create a deserializer that uses the Schema Registry, provide the registry URL to the deserializer under the configuration key AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, and use that deserializer in the @apidoc[ConsumerSettings$].

Scala : @@ snip snip { #imports #de-serializer }

Java : @@ snip snip { #imports #de-serializer }