This example illustrates how (with some minor additional implementation) the Iggy SDK can be used to compress and decompress messages in transit.
Details on how to run the examples for the Rust Iggy SDK can be found in the parent folder README.md.
Run the following commands
Start the server
cargo run --bin iggy-server -- --with-default-root-credentials
NOTE: In case the server was running before, make sure to run rm -rf local_data/ to delete server state data from prior runs.
Run the producer to write compressed messages to the server
cargo run --example message-headers-compression-producer
Run the consumer to read and decompress messages from the server
cargo run --example message-headers-compression-consumer
The compression and deccompression utilities are implemented in examples/rust/src/shared/codec.rs and used when sending messages to the server and reading them from the server.
First, define a stream and a topic name. The producer will first initiate the stream and the topic on that stream and then write the example messages to that topic within that stream. The consumer will use the names as identifier to read messages from that topic on that stream.
pub const STREAM_NAME: &str = "compression-stream"; pub const TOPIC_NAME: &str = "compression-topic";
Additionally, set a constant that defines the number of messages to be send to the server via the producer.
pub const NUM_MESSAGES: u32 = 1000;
In order to add functionality to compress and decompress messages during transit, we need to know what a message actually is and how it is implemented. Iggy implements two important types, that we need to know.
A message send to the server needs to be of type IggyMessage.
pub struct IggyMessage { /// Message metadata pub header: IggyMessageHeader, /// Message content pub payload: Bytes, /// Optional user-defined headers pub user_headers: Option<Bytes>, }
The important bits in context of this example are the payload and the user_headers. Payload is of type Bytes and corresponds to the actual message that we want to send to the server.
Let‘s suppose our example is an abstraction over a real world scenario, where some application sends it’s application logs to the iggy-server. This application is therefore the producer. We also have a monitoring service, that inspects the logs of our application to check for any service disruptions. So this monitoring service needs to read the logs from the iggy-server and is therefore the consumer.
Further suppose, the application logs are quite large and repetitive, since they follow a structured pattern (as logs usually do). It may be a good idea to reduce bandwidth by trading of some idle CPU time to compress the logs before sending them to the server. We go straight ahead, implement some compression functionalities and send the compressed messages to the server. If the monitoring service now consumes these messages we have a problem. The logs are still compressed. Even if we know that the messages are compressed we do not know how to decompress them because the algorithm that was used for compression is unknown.
This is where user_headers become handy. The definition above tells us, that user_headers are (optional) Bytes. But thats because finally everything is serialized before sending to the server. Looking at the implementation of IggyMessage we see that user_headers are a serialized HashMap with Iggy specific types HeaderKey and HeaderValue. So the user_headers are basically a set of metadata defined by us, the user, using a key and a value.
Thus, for the compression scenario the user_headers can be used to signal to a consumer that a message was compressed before it was sent to the server. The key to highlight message compression in this example is defined as:
pub const COMPRESSION_HEADER_KEY: &str = "iggy-compression";
By reading the user_headers and finding the “iggy-compression” key a consumer now knows, that the message was compressed. But it's still not transparent how it can be decompressed. We can use the HeaderValue to store the information on how to decompress the message, e.g. using a Codec.
So the HeaderKey is used to indicate that a message was compressed before transit and the HeaderValue indicates how it was compressed. Using this idea we can implement a Codec that is shared between the consumer and producer.
The Codec is an enum listing all the available algorithms to compress and decompress.
pub enum Codec { None, Lz4, }
Further, we implement three methods
header_key: Returns the HeaderKey that defines if a message was compressed or not. The consumer uses it to look for the “iggy-compression” HeaderKey when inspecting the user_headers of a message.to_header_value: Generates a HeaderValue from the specific Codec instance.from_header_value: Resolves a HeaderValue into a Codec. This is used in the consumer. After the “iggy-compression” HeaderKey was found in the user_headers we can obtain the HeaderValue, from which we obtain the Codec type using this method and thereby gain access to the decompress method.impl Codec { pub fn header_key() -> HeaderKey { HeaderKey::new(COMPRESSION_HEADER_KEY) .expect("COMPRESSION_HEADER_KEY is an InvalidHeaderKey.") } pub fn to_header_value(&self) -> HeaderValue { HeaderValue::from_str(&self.to_string()).expect("failed generating HeaderValue.") } pub fn from_header_value(value: &HeaderValue) -> Self { let name = value .as_str() .expect("could not convert HeaderValue into str."); Self::from_str(name).expect("compression algorithm not available.") }
The other two methods implement the compression and decompression logic, which is specifc to the actual Codec instance, dependent on the enum's variant. The example Codec implements two. None, where data is not compressed and Lz4 (using the lz4_flex crate). Note, that this can be easily extended to more algorithms. It might be reasonable to limit the number of bytes that can be decompressed to avoid large memory footprints, or even crashing the consumer. The decompress method, therefore takes one more byte as defined by the MAX_PAYLOAD_SIZE which is 64MB. If the decoder read the full MAX_PAYLOAD_SIZE + 1 bytes, the payload exceeds the limit and the program panics. Note, that only the Lz4 branch in the match statement applies this logic. This is safe, because an IggyMessage ensures that the payload does not exceed MAX_PAYLOAD_SIZE, when using the builder. A compressed message that meets the limit of MAX_PAYLOAD_SIZE, however, can decompress into much more bytes. In a productive environment panics would be replaced with informative errors that can be properly handled. You would most likely want to continue reading messages from the server, even if one of them exceeds the limit.
impl Codec { pub fn compress(&self, data: &[u8]) -> Vec<u8> { match self { Codec::None => Ok(data.to_vec()), Codec::Lz4 => { let mut compressed_data = Vec::new(); let mut encoder = FrameEncoder::new(&mut compressed_data); encoder .write_all(data) .expect("Cannot write into buffer using Lz4 compression."); encoder.finish().expect("Cannot finish Lz4 compression."); Ok(compressed_data) } } } pub fn decompress(&self, data: &[u8]) -> Vec<u8> { match self { Codec::None => data.to_vec(), Codec::Lz4 => { let decoder = FrameDecoder::new(data); let mut decompressed_data = Vec::new(); let bytes_read = decoder .take(MAX_PAYLOAD_SIZE as u64 + 1) .read_to_end(&mut decompressed_data) .expect("Cannot decode payload using Lz4."); if bytes_read > MAX_PAYLOAD_SIZE as usize { panic!("Decompressed message exceeds MAX_PAYLOAD_SIZE!") } decompressed_data } } } }
The example /producer/main.rs sets up a basic client that connects via TCP to a running iggy-server. Since the plain server on start-up does not have any, it creates a stream and a topic to which it writes the compressed messages.
This is how the Codec described above is used to setup the user_headers entry to signal message compression.
let codec = Codec::Lz4; let key = Codec::header_key(); let value = codec.to_header_value(); let compression_headers = HashMap::from([(key, value)]);
The builder interface for the IggyMessage type is used to construct a message. Using the user_headers method sets the user_headers of that IggyMessage. Note, that this method sets or overwrites the user_headers of that message with the provided HashMap. Extending an existing header would require using the user_headers_map method and appending the returned HashMap.
let msg = IggyMessage::builder() .payload(compressed_bytes) .user_headers(compression_headers.clone()) .build() .expect("IggyMessage should be buildable.");
Once all messages are generated they are send to the server.
The example /consumer/main.rs requires, that ../message-compression/producer/main.rs was executed before. It sets up a client that connects via TCP to a running iggy-server and reads messages from the same stream and topic that was used by the producer.
The core piece of the consumer is the while loop which awaits messages from the stream and topic. Note, the example terminates once the NUM_MESSAGES compressed messages were consumed.
while let Some(message) = consumer.next().await { // message handling }
Within that loop we make use of the handle_payload_compression method. Every ReceivedMessage is processed by that method. A ReceivedMessage is a type that has an IggyMessage and two additional fields (which do not concern us here).
pub struct ReceivedMessage { pub message: IggyMessage, pub current_offset: u64, pub partition_id: u32, }
The method decompresses the message payload. It first checks if the “iggy-compression” key is present in the user_headers of the IggyMessage that would indicate that the message is compressed. If that is not the case, the function returns Ok(()) right away. In that case there is nothing to do. For the case where the “iggy-compression” key is present in the user_headers, a Codec is setup from the algorithm that was used to compress the message. Note, that .get_user_header takes the “iggy-compression” key and if it is present returns the HeaderValue, which is the algorithm as we have defined above. So at that point, codec is Codec::Lz4. The next step then is to decompress the payload and update the payload length attribute of the IggyMessage metadata since it changed. In a next and final step we update the user_headers. Since the message is now decompressed, the user_headers entry that signals compression should be removed. If the compression key-value pair was the only user header the user_headers are set to None, otherwise we just remove the compression key-value pair from the HashMap.
fn handle_payload_compression(msg: &mut ReceivedMessage) -> Result<(), IggyError> { if let Ok(Some(algorithm)) = msg.message.get_user_header(&Codec::header_key()) { let codec = Codec::from_header_value(&algorithm); let decompressed_payload = codec.decompress(&msg.message.payload)?; msg.message.payload = Bytes::from(decompressed_payload); msg.message.header.payload_length = msg.message.payload.len() as u32; if let Ok(Some(mut headers_map)) = msg.message.user_headers_map() { headers_map.remove(&Codec::header_key()); let headers_bytes = headers_map.to_bytes(); msg.message.header.user_headers_length = headers_bytes.len() as u32; msg.message.user_headers = if headers_map.is_empty() { None } else { Some(headers_bytes) }; } } Ok(()) }
When executing the program, the consumed and decompressed messages will be printed to console.