Replay Filtering

Replay filtering in SQE is a lightweight mechanism for removing replayed records from a data source that can't be written to using the standard transactional or opaque semantics. Currently, this means writing to Kafka and later removing replays when downstream topologies read from it, though the approach could potentially be used with other data stores, streams, and states. The solution is inspired by the Kafka idempotent producer proposal here.

How it works

You can read a more detailed explanation in the link above, but the idea is pretty simple. Let's say you have two topologies. The first reads from topic 1 and writes to topic 2. The second reads from topic 2 and writes to a transactional data store. The first topology can replay records and write them into Kafka multiple times, for example when a batch fails.

What we want to do is ignore those replays in the second topology. We can do this if the first topology meets the following criteria:

  • Tuples are read and processed in the order they appear in the data source. For Kafka, this means reading and processing the records in offset order. If the spout emits the records of a batch in arbitrary order, then replays cannot be removed using this method.
  • Partitioning is deterministic and maintained between the data source, inside Storm, and in the data store. This means no re-partitioning of streams inside Storm. We include a special partitioner that determines the destination partition based on the source partition. If the destination topic has the same number of partitions (or more), the destination partition equals the source partition. If the destination has fewer partitions, then: destination partition = source partition % total number of destination partitions.
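The partitioning rule above can be sketched in a few lines. This is only an illustration in Python, not the code of the partitioner that ships with SQE; the function name and its parameters are hypothetical. It relies on the fact that a single modulo covers both cases: when the destination has at least as many partitions as the source, every source partition number is already smaller than the destination count, so the modulo leaves it unchanged.

```python
def destination_partition(source_partition: int, num_destination_partitions: int) -> int:
    """Map a source partition to a destination partition deterministically.

    Hypothetical helper for illustration; not part of SQE.
    """
    if source_partition < 0:
        raise ValueError("source_partition must be non-negative")
    if num_destination_partitions <= 0:
        raise ValueError("num_destination_partitions must be positive")
    # Same or more destination partitions: the source partition is unchanged.
    # Fewer destination partitions: source partitions wrap around.
    return source_partition % num_destination_partitions
```

For example, with 8 source partitions and 4 destination partitions, source partitions 1 and 5 both land in destination partition 1. Each source partition still maps to exactly one destination partition, which is what per-partition offset tracking needs.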

With the above, we are able to filter replayed messages by tracking a “highwater mark” (offset) for each partition. We also include a PID that identifies the producer and source, which can be an SQE topology or another producer. This allows us to track different highwater marks for different producers and sources. If we see a record with an offset that is below the current highwater mark for its PID and partition, we know we've already processed that record and we can toss the replay. If instead the offset is higher than the highwater mark, then we emit the record normally and update the highwater mark.
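The filtering logic can be sketched as follows. This is a minimal in-memory illustration in Python, not SQE's implementation; the `StreamMetadata` tuple and `ReplayFilter` class are hypothetical names. It assumes the highwater mark stores the highest offset already emitted, so a record whose offset equals the mark is also treated as a replay. Records without metadata are passed through unfiltered.

```python
from typing import Dict, NamedTuple, Optional, Tuple


class StreamMetadata(NamedTuple):
    """Hypothetical stand-in for the metadata carried with each record."""
    pid: int
    partition: int
    offset: int


class ReplayFilter:
    """Tracks one highwater mark per (PID, partition) pair."""

    def __init__(self) -> None:
        self._highwater: Dict[Tuple[int, int], int] = {}

    def accept(self, meta: Optional[StreamMetadata]) -> bool:
        """Return True if the record should be emitted, False if it is a replay."""
        if meta is None:
            # No usable metadata: the record cannot be filtered, so emit it.
            return True
        key = (meta.pid, meta.partition)
        mark = self._highwater.get(key)
        # Assumption: the mark is the highest offset already emitted,
        # so offsets at or below it have been seen before.
        if mark is not None and meta.offset <= mark:
            return False
        self._highwater[key] = meta.offset
        return True
```

Because each PID and partition pair has its own mark, two producers writing overlapping offset ranges into the same topic do not interfere with each other.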

To track the needed information, SQE adds a stream metadata object to each tuple. This metadata includes a PID (represented as a long), a partition (integer), and a message offset (long). For a Kafka stream, the PID is generated by hashing the topology name, stream name, ZK hosts, and ZK path together. Even though partition and offset map directly to the same concepts in Kafka, the metadata can be extended to other streams and spouts. For storage in Kafka, the stream metadata is converted to a 20-byte array (8 bytes for the PID, 4 for the partition, 8 for the offset), in that order.
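The 20-byte layout can be illustrated with Python's `struct` module. This is a sketch only: the function names are hypothetical, and the big-endian byte order is an assumption (it is the default for Java's `ByteBuffer`, but the byte order is not stated here). The PID hash itself is not shown because the hash function is not specified.

```python
import struct
from typing import Optional, Tuple

# Assumed layout: signed 8-byte PID, signed 4-byte partition, signed 8-byte
# offset, big-endian with no padding. 8 + 4 + 8 = 20 bytes.
_LAYOUT = struct.Struct(">qiq")


def encode_stream_metadata(pid: int, partition: int, offset: int) -> bytes:
    """Pack the metadata into a 20-byte record key (hypothetical helper)."""
    return _LAYOUT.pack(pid, partition, offset)


def decode_stream_metadata(key: Optional[bytes]) -> Optional[Tuple[int, int, int]]:
    """Unpack a record key, returning None for missing or wrongly sized keys."""
    if key is None or len(key) != _LAYOUT.size:
        return None
    return _LAYOUT.unpack(key)
```

Returning `None` for a missing or malformed key lets a reader pass such records through without filtering them.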

In the future, we may be able to extend this to (partition) local aggregations, as long as we can produce a single piece of metadata from multiple tuples (group on PID and partition, max on offset?) and maintain offset ordering for each aggregation batch. This is not supported at the moment, however.

How to enable replay filtering

State Options

Set the following options for the Kafka state to record the stream metadata and to ensure records are written to the appropriate partitions:

  • jw.sqe.state.kafka.keytype - StreamMetadata
  • jw.sqe.state.kafka.partitionClass - com.jwplayer.sqe.language.state.kafka.SourcePartitionPartitioner

Stream Options

If the stream metadata is included in the key of each record (records with NULL or invalid keys don't get filtered), all that needs to be done on the stream is to enable replay filtering:

  • jw.sqe.spout.kafka.filterReplays - true