The general recommendation for de-/serialization of messages is to use byte arrays (or Strings) as the value and do the de-/serialization in a `map` operator in the Apache Pekko Stream instead of implementing it directly in Kafka de-/serializers. When deserialization is handled explicitly within the Apache Pekko Stream, it is easier to implement the desired error handling strategy, as the examples below show.
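To make the error handling idea concrete, here is a minimal, library-free sketch of what "resuming" means for a deserialization step: each payload is parsed on its own, a payload that fails to parse is dropped, and processing continues with the next one. It deliberately uses no Pekko or Kafka API. The class `ResumeOnBadPayload`, the integer-based `parseOrder` parser and the helpers `deserializeAll` and `bytes` are hypothetical names chosen for this illustration only; in a real stream the same decision would typically be expressed through the stream's supervision strategy.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

class ResumeOnBadPayload {

  // Hypothetical parser standing in for a real deserializer: decodes UTF-8
  // bytes as a decimal integer, throwing NumberFormatException on bad input.
  static Integer parseOrder(byte[] payload) {
    return Integer.parseInt(new String(payload, StandardCharsets.UTF_8).trim());
  }

  // Applies the parser to every payload. A payload rejected with an
  // IllegalArgumentException (which includes NumberFormatException) is
  // skipped and processing resumes with the next element. Any other
  // exception propagates and ends processing.
  static <T> List<T> deserializeAll(List<byte[]> payloads, Function<byte[], T> parser) {
    List<T> parsed = new ArrayList<>();
    for (byte[] payload : payloads) {
      try {
        parsed.add(parser.apply(payload));
      } catch (IllegalArgumentException e) {
        // Faulty element: ignore it. A real application might log or count it.
      }
    }
    return parsed;
  }

  static byte[] bytes(String s) {
    return s.getBytes(StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    List<byte[]> payloads = Arrays.asList(bytes("1"), bytes("oops"), bytes("3"));
    // The unparseable middle element is dropped.
    System.out.println(deserializeAll(payloads, ResumeOnBadPayload::parseOrder)); // prints [1, 3]
  }
}
```

Because the parser is an ordinary function applied per element, swapping the strategy (for example, failing on the first bad payload instead of skipping it) only changes the `catch` block, which is the flexibility that is lost when parsing is hidden inside a Kafka deserializer.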
Protocol Buffers offer a language-neutral, platform-neutral, extensible mechanism for serializing structured data and allow consumers and producers to rely on the message format.
The easiest way to use Protocol Buffers with Apache Pekko Connectors Kafka is to serialize and deserialize the Kafka message payload as a byte array and call the Protocol Buffers serialization and deserialization in a regular `map` operator. To serialize the Protobuf-defined type `Order` into a byte array, use the `.toByteArray()` method, which is generated by the Protobuf compiler.
Scala : @@ snip snip { #protobuf-imports #protobuf-serializer }
Java : @@ snip snip { #protobuf-imports #protobuf-serializer }
To de-serialize a Protocol Buffers message in a `map` operator, convert the received byte array to the designated type with the generated `parseFrom()` method.
This example uses resuming to react to data that can't be deserialized and ignores faulty elements.
Scala : @@ snip snip { #protobuf-imports #protobuf-deserializer }
Java : @@ snip snip { #protobuf-imports #protobuf-deserializer }
Serializing data to JSON text with Jackson in a `map` operator turns the object instance into a String, which is used as the value in the @javadoc[ProducerRecord](org.apache.kafka.clients.producer.ProducerRecord).
Java : @@ snip snip { #jackson-imports #jackson-serializer }
To de-serialize a JSON String with Jackson, extract the String and apply the Jackson object reader in a `map` operator. Add the extracted type to the `map` operator explicitly, because the object reader is not generic and the type cannot be inferred.
This example uses resuming to react to data that can't be parsed correctly and ignores faulty elements.
Java : @@ snip snip { #jackson-imports #jackson-deserializer }
To de-serialize a JSON String with Spray JSON, extract the String and use the Spray-provided implicits `parseJson` and `convertTo` in a `map` operator.
This example uses resuming to react to data that can't be parsed correctly and ignores faulty elements.
Scala : @@ snip snip { #spray-imports #spray-deser }
If you want to use Confluent's Schema Registry, you need to include the dependency on `kafka-avro-serializer` as shown below. It is not available from Maven Central, which is why Confluent's repository has to be specified. These examples use `kafka-avro-serializer` version $confluent.version$.
Maven :
```xml
<project>
  ...
  <dependencies>
    ...
    <dependency>
      <groupId>io.confluent</groupId>
      <artifactId>kafka-avro-serializer</artifactId>
      <version>confluent.version (eg. 8.0.0)</version>
    </dependency>
    ...
  </dependencies>
  ...
  <repositories>
    <repository>
      <id>confluent-maven-repo</id>
      <name>Confluent Maven Repository</name>
      <url>https://packages.confluent.io/maven/</url>
    </repository>
  </repositories>
  ...
</project>
```
sbt :
```scala
libraryDependencies += "io.confluent" % "kafka-avro-serializer" % confluentAvroVersion, // eg. 8.0.0
resolvers += "Confluent Maven Repository" at "https://packages.confluent.io/maven/",
```
Gradle :
```gradle
dependencies {
  // 'implementation' replaces the 'compile' configuration, which was removed in Gradle 7
  implementation group: 'io.confluent', name: 'kafka-avro-serializer', version: confluentAvroVersion // eg. 8.0.0
}
repositories {
  maven {
    url "https://packages.confluent.io/maven/"
  }
}
```
To create serializers that use the Schema Registry, provide its URL to the serializer under the configuration key `AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG`, then use that serializer in the @apidoc[ProducerSettings$].
Scala : @@ snip snip { #imports #serializer }
Java : @@ snip snip { #imports #serializer }
To create deserializers that use the Schema Registry, provide its URL to the deserializer under the configuration key `AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG`, then use that deserializer in the @apidoc[ConsumerSettings$].
Scala : @@ snip snip { #imports #de-serializer }
Java : @@ snip snip { #imports #de-serializer }
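The configuration handed to both the serializer and the deserializer is a plain key/value map. The following sketch builds only that map, so that it runs without the Confluent dependency. It assumes the string value of `AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG` is `schema.registry.url`. The class `SchemaRegistryConfig`, the helper `serdeConfig` and the URL `http://localhost:8081` are placeholders for illustration.

```java
import java.util.HashMap;
import java.util.Map;

class SchemaRegistryConfig {

  // Assumed to match AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG;
  // prefer the constant itself when the Confluent library is on the classpath.
  static final String SCHEMA_REGISTRY_URL_CONFIG = "schema.registry.url";

  // Builds the configuration map shared by the Avro serializer and deserializer.
  static Map<String, Object> serdeConfig(String schemaRegistryUrl) {
    Map<String, Object> config = new HashMap<>();
    config.put(SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
    return config;
  }

  public static void main(String[] args) {
    // Placeholder URL; replace with the address of your Schema Registry.
    Map<String, Object> config = serdeConfig("http://localhost:8081");

    // With kafka-avro-serializer available, the map would be passed on like this
    // (the second argument marks whether the instance handles keys or values):
    //   new KafkaAvroSerializer().configure(config, false);
    //   new KafkaAvroDeserializer().configure(config, false);
    System.out.println(config);
  }
}
```

Keeping the map construction in one helper means producer and consumer code read the registry URL from the same place.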