feat(security): encrypt user headers alongside message payload (#3040)
Server-side encryption protected message payloads and state
log entries but left user-defined headers in plaintext on
disk. Client-side encryption in the Rust and C# SDKs had
the same gap, encrypting only the payload.
The server and every SDK with client-side encryption now
encrypt and decrypt user headers using the same encryptor
as the payload. The integration test verifies no plaintext
headers appear on disk when encryption is enabled, and a
new unit test covers the client-side round-trip. The C#
SDK adds TryMapHeaders for graceful handling of encrypted
header bytes during deserialization.
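BREAKING CHANGE: when encryption is enabled, user headers are now
encrypted together with the payload, so a reader without the matching
encryptor can no longer parse them.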
diff --git a/Cargo.lock b/Cargo.lock
index d10cb6e..f8b5a94 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5183,7 +5183,7 @@
[[package]]
name = "iggy"
-version = "0.9.4-edge.1"
+version = "0.9.5-edge.1"
dependencies = [
"async-broadcast",
"async-dropper",
@@ -5271,7 +5271,7 @@
[[package]]
name = "iggy-cli"
-version = "0.11.3-edge.1"
+version = "0.12.1-edge.1"
dependencies = [
"ahash 0.8.12",
"anyhow",
@@ -5301,7 +5301,7 @@
[[package]]
name = "iggy-connectors"
-version = "0.3.2-edge.1"
+version = "0.3.3-edge.1"
dependencies = [
"async-trait",
"axum",
@@ -5353,7 +5353,7 @@
[[package]]
name = "iggy-mcp"
-version = "0.3.2-edge.1"
+version = "0.3.3-edge.1"
dependencies = [
"axum",
"axum-server",
@@ -5385,7 +5385,7 @@
[[package]]
name = "iggy_binary_protocol"
-version = "0.9.4-edge.1"
+version = "0.9.5-edge.1"
dependencies = [
"bytemuck",
"bytes",
@@ -5395,7 +5395,7 @@
[[package]]
name = "iggy_common"
-version = "0.9.4-edge.1"
+version = "0.9.5-edge.1"
dependencies = [
"aes-gcm",
"ahash 0.8.12",
@@ -5441,7 +5441,7 @@
[[package]]
name = "iggy_connector_elasticsearch_sink"
-version = "0.3.2-edge.1"
+version = "0.3.3-edge.1"
dependencies = [
"async-trait",
"base64 0.22.1",
@@ -5460,7 +5460,7 @@
[[package]]
name = "iggy_connector_elasticsearch_source"
-version = "0.3.2-edge.1"
+version = "0.3.3-edge.1"
dependencies = [
"async-trait",
"dashmap",
@@ -5479,7 +5479,7 @@
[[package]]
name = "iggy_connector_iceberg_sink"
-version = "0.3.2-edge.1"
+version = "0.3.3-edge.1"
dependencies = [
"arrow-json",
"async-trait",
@@ -5499,7 +5499,7 @@
[[package]]
name = "iggy_connector_mongodb_sink"
-version = "0.3.0"
+version = "0.3.3-edge.1"
dependencies = [
"async-trait",
"humantime",
@@ -5515,7 +5515,7 @@
[[package]]
name = "iggy_connector_postgres_sink"
-version = "0.3.2-edge.1"
+version = "0.3.3-edge.1"
dependencies = [
"async-trait",
"dashmap",
@@ -5535,7 +5535,7 @@
[[package]]
name = "iggy_connector_postgres_source"
-version = "0.3.2-edge.1"
+version = "0.3.3-edge.1"
dependencies = [
"async-trait",
"base64 0.22.1",
@@ -5557,7 +5557,7 @@
[[package]]
name = "iggy_connector_quickwit_sink"
-version = "0.3.2-edge.1"
+version = "0.3.3-edge.1"
dependencies = [
"async-trait",
"dashmap",
@@ -5572,7 +5572,7 @@
[[package]]
name = "iggy_connector_random_source"
-version = "0.3.2-edge.1"
+version = "0.3.3-edge.1"
dependencies = [
"async-trait",
"dashmap",
@@ -5590,7 +5590,7 @@
[[package]]
name = "iggy_connector_sdk"
-version = "0.2.1-edge.1"
+version = "0.2.2-edge.1"
dependencies = [
"async-trait",
"base64 0.22.1",
@@ -5620,7 +5620,7 @@
[[package]]
name = "iggy_connector_stdout_sink"
-version = "0.3.2-edge.1"
+version = "0.3.3-edge.1"
dependencies = [
"async-trait",
"dashmap",
@@ -9723,7 +9723,7 @@
[[package]]
name = "server"
-version = "0.7.3-edge.1"
+version = "0.7.4-edge.1"
dependencies = [
"ahash 0.8.12",
"anyhow",
diff --git a/Cargo.toml b/Cargo.toml
index 5918468..ed714d5 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -160,11 +160,11 @@
iceberg = "0.9.0"
iceberg-catalog-rest = "0.9.0"
iceberg-storage-opendal = "0.9.0"
-iggy = { path = "core/sdk", version = "0.9.4-edge.1" }
-iggy-cli = { path = "core/cli", version = "0.11.3-edge.1" }
-iggy_binary_protocol = { path = "core/binary_protocol", version = "0.9.4-edge.1" }
-iggy_common = { path = "core/common", version = "0.9.4-edge.1" }
-iggy_connector_sdk = { path = "core/connectors/sdk", version = "0.2.1-edge.1" }
+iggy = { path = "core/sdk", version = "0.9.5-edge.1" }
+iggy-cli = { path = "core/cli", version = "0.12.1-edge.1" }
+iggy_binary_protocol = { path = "core/binary_protocol", version = "0.9.5-edge.1" }
+iggy_common = { path = "core/common", version = "0.9.5-edge.1" }
+iggy_connector_sdk = { path = "core/connectors/sdk", version = "0.2.2-edge.1" }
integration = { path = "core/integration" }
journal = { path = "core/journal" }
js-sys = "0.3"
diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md
index b6100cd..cea57b4 100644
--- a/DEPENDENCIES.md
+++ b/DEPENDENCIES.md
@@ -451,24 +451,24 @@
idna: 1.1.0, "Apache-2.0 OR MIT",
idna_adapter: 1.2.1, "Apache-2.0 OR MIT",
if_chain: 1.0.3, "Apache-2.0 OR MIT",
-iggy: 0.9.4-edge.1, "Apache-2.0",
+iggy: 0.9.5-edge.1, "Apache-2.0",
iggy-bench: 0.4.1-edge.1, "Apache-2.0",
iggy-bench-dashboard-server: 0.6.3-edge.1, "Apache-2.0",
-iggy-cli: 0.11.3-edge.1, "Apache-2.0",
-iggy-connectors: 0.3.2-edge.1, "Apache-2.0",
-iggy-mcp: 0.3.2-edge.1, "Apache-2.0",
-iggy_binary_protocol: 0.9.4-edge.1, "Apache-2.0",
-iggy_common: 0.9.4-edge.1, "Apache-2.0",
-iggy_connector_elasticsearch_sink: 0.3.2-edge.1, "Apache-2.0",
-iggy_connector_elasticsearch_source: 0.3.2-edge.1, "Apache-2.0",
-iggy_connector_iceberg_sink: 0.3.2-edge.1, "Apache-2.0",
-iggy_connector_mongodb_sink: 0.3.0, "Apache-2.0",
-iggy_connector_postgres_sink: 0.3.2-edge.1, "Apache-2.0",
-iggy_connector_postgres_source: 0.3.2-edge.1, "Apache-2.0",
-iggy_connector_quickwit_sink: 0.3.2-edge.1, "Apache-2.0",
-iggy_connector_random_source: 0.3.2-edge.1, "Apache-2.0",
-iggy_connector_sdk: 0.2.1-edge.1, "Apache-2.0",
-iggy_connector_stdout_sink: 0.3.2-edge.1, "Apache-2.0",
+iggy-cli: 0.12.1-edge.1, "Apache-2.0",
+iggy-connectors: 0.3.3-edge.1, "Apache-2.0",
+iggy-mcp: 0.3.3-edge.1, "Apache-2.0",
+iggy_binary_protocol: 0.9.5-edge.1, "Apache-2.0",
+iggy_common: 0.9.5-edge.1, "Apache-2.0",
+iggy_connector_elasticsearch_sink: 0.3.3-edge.1, "Apache-2.0",
+iggy_connector_elasticsearch_source: 0.3.3-edge.1, "Apache-2.0",
+iggy_connector_iceberg_sink: 0.3.3-edge.1, "Apache-2.0",
+iggy_connector_mongodb_sink: 0.3.3-edge.1, "Apache-2.0",
+iggy_connector_postgres_sink: 0.3.3-edge.1, "Apache-2.0",
+iggy_connector_postgres_source: 0.3.3-edge.1, "Apache-2.0",
+iggy_connector_quickwit_sink: 0.3.3-edge.1, "Apache-2.0",
+iggy_connector_random_source: 0.3.3-edge.1, "Apache-2.0",
+iggy_connector_sdk: 0.2.2-edge.1, "Apache-2.0",
+iggy_connector_stdout_sink: 0.3.3-edge.1, "Apache-2.0",
iggy_examples: 0.0.6, "Apache-2.0",
ignore: 0.4.25, "MIT OR Unlicense",
image: 0.25.10, "Apache-2.0 OR MIT",
@@ -843,7 +843,7 @@
serde_yaml_ng: 0.10.0, "MIT",
serial_test: 3.4.0, "MIT",
serial_test_derive: 3.4.0, "MIT",
-server: 0.7.3-edge.1, "Apache-2.0",
+server: 0.7.4-edge.1, "Apache-2.0",
sha1: 0.10.6, "Apache-2.0 OR MIT",
sha2: 0.10.9, "Apache-2.0 OR MIT",
sha3: 0.10.8, "Apache-2.0 OR MIT",
diff --git a/core/ai/mcp/Cargo.toml b/core/ai/mcp/Cargo.toml
index 071e8ae..e013d59 100644
--- a/core/ai/mcp/Cargo.toml
+++ b/core/ai/mcp/Cargo.toml
@@ -17,7 +17,7 @@
[package]
name = "iggy-mcp"
-version = "0.3.2-edge.1"
+version = "0.3.3-edge.1"
description = "MCP Server for Iggy message streaming platform"
edition = "2024"
license = "Apache-2.0"
diff --git a/core/binary_protocol/Cargo.toml b/core/binary_protocol/Cargo.toml
index b479ecf..51f3109 100644
--- a/core/binary_protocol/Cargo.toml
+++ b/core/binary_protocol/Cargo.toml
@@ -17,7 +17,7 @@
[package]
name = "iggy_binary_protocol"
-version = "0.9.4-edge.1"
+version = "0.9.5-edge.1"
description = "Wire protocol types and codec for the Iggy binary protocol. Shared between server and SDK."
edition = "2024"
license = "Apache-2.0"
diff --git a/core/cli/Cargo.toml b/core/cli/Cargo.toml
index ea3c1fc..85694e3 100644
--- a/core/cli/Cargo.toml
+++ b/core/cli/Cargo.toml
@@ -17,7 +17,7 @@
[package]
name = "iggy-cli"
-version = "0.11.3-edge.1"
+version = "0.12.1-edge.1"
edition = "2024"
authors = ["bartosz.ciesla@gmail.com"]
repository = "https://github.com/apache/iggy"
diff --git a/core/common/Cargo.toml b/core/common/Cargo.toml
index 56e9974..a2fd8ae 100644
--- a/core/common/Cargo.toml
+++ b/core/common/Cargo.toml
@@ -16,7 +16,7 @@
# under the License.
[package]
name = "iggy_common"
-version = "0.9.4-edge.1"
+version = "0.9.5-edge.1"
description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second."
edition = "2024"
license = "Apache-2.0"
diff --git a/core/common/src/commands/messages/send_messages.rs b/core/common/src/commands/messages/send_messages.rs
index 91a6aa4..2943340 100644
--- a/core/common/src/commands/messages/send_messages.rs
+++ b/core/common/src/commands/messages/send_messages.rs
@@ -203,12 +203,19 @@
let payload_base64 = BASE64.encode(msg_view.payload());
map.insert("payload", serde_json::to_value(payload_base64).unwrap());
- if let Ok(Some(headers)) = msg_view.user_headers_map() {
- let entries: Vec<HeaderEntry> = headers
- .into_iter()
- .map(|(k, v)| HeaderEntry { key: k, value: v })
- .collect();
- map.insert("user_headers", serde_json::to_value(&entries).unwrap());
+ match msg_view.user_headers_map() {
+ Ok(Some(headers)) => {
+ let entries: Vec<HeaderEntry> = headers
+ .into_iter()
+ .map(|(k, v)| HeaderEntry { key: k, value: v })
+ .collect();
+ map.insert("user_headers", serde_json::to_value(&entries).unwrap());
+ }
+ _ if msg_view.user_headers().is_some() => {
+ let raw_base64 = BASE64.encode(msg_view.user_headers().unwrap());
+ map.insert("user_headers", serde_json::to_value(raw_base64).unwrap());
+ }
+ _ => {}
}
map
@@ -310,29 +317,38 @@
.decode(payload)
.map_err(|_| de::Error::custom("Invalid base64 payload"))?;
- let headers_map = if let Some(headers) = msg.get("user_headers") {
- if headers.is_null() {
- None
- } else {
- let entries: Vec<HeaderEntry> = serde_json::from_value(
- headers.clone(),
- )
- .map_err(|e| {
- de::Error::custom(format!(
- "Invalid headers format: {e}"
- ))
- })?;
- let mut map = HashMap::new();
- for entry in entries {
- map.insert(entry.key, entry.value);
+ let (headers_map, raw_headers) =
+ if let Some(headers) = msg.get("user_headers") {
+ if headers.is_null() {
+ (None, None)
+ } else if let Some(base64_str) = headers.as_str() {
+ // Raw base64-encoded header bytes (e.g. client-side encrypted)
+ let raw = BASE64.decode(base64_str).map_err(|e| {
+ de::Error::custom(format!(
+ "Invalid base64 headers: {e}"
+ ))
+ })?;
+ (None, Some(Bytes::from(raw)))
+ } else {
+ let entries: Vec<HeaderEntry> = serde_json::from_value(
+ headers.clone(),
+ )
+ .map_err(|e| {
+ de::Error::custom(format!(
+ "Invalid headers format: {e}"
+ ))
+ })?;
+ let mut map = HashMap::new();
+ for entry in entries {
+ map.insert(entry.key, entry.value);
+ }
+ (Some(map), None)
}
- Some(map)
- }
- } else {
- None
- };
+ } else {
+ (None, None)
+ };
- let iggy_message = if let Some(headers) = headers_map {
+ let mut iggy_message = if let Some(headers) = headers_map {
IggyMessage::builder()
.id(id)
.payload(payload_bytes.into())
@@ -355,6 +371,11 @@
})?
};
+ if let Some(raw) = raw_headers {
+ iggy_message.header.user_headers_length = raw.len() as u32;
+ iggy_message.user_headers = Some(raw);
+ }
+
iggy_messages.push(iggy_message);
}
diff --git a/core/common/src/types/message/iggy_message.rs b/core/common/src/types/message/iggy_message.rs
index 12a2c1a..4fc214d 100644
--- a/core/common/src/types/message/iggy_message.rs
+++ b/core/common/src/types/message/iggy_message.rs
@@ -552,19 +552,25 @@
let base64_payload = STANDARD.encode(&self.payload);
state.serialize_field("payload", &base64_payload)?;
- let entries: Vec<HeaderEntry> = if self.user_headers.is_some() {
- let headers_map = self.user_headers_map().map_err(serde::ser::Error::custom)?;
- if let Some(map) = headers_map {
- map.into_iter()
- .map(|(key, value)| HeaderEntry { key, value })
- .collect()
- } else {
- Vec::new()
+ match self.user_headers.as_ref() {
+ Some(raw_bytes) => match self.user_headers_map() {
+ Ok(Some(map)) => {
+ let entries: Vec<HeaderEntry> = map
+ .into_iter()
+ .map(|(key, value)| HeaderEntry { key, value })
+ .collect();
+ state.serialize_field("user_headers", &entries)?;
+ }
+ _ => {
+ let base64_headers = STANDARD.encode(raw_bytes);
+ state.serialize_field("user_headers", &base64_headers)?;
+ }
+ },
+ None => {
+ let empty: Vec<HeaderEntry> = Vec::new();
+ state.serialize_field("user_headers", &empty)?;
}
- } else {
- Vec::new()
- };
- state.serialize_field("user_headers", &entries)?;
+ }
state.end()
}
@@ -595,6 +601,7 @@
let mut header: Option<IggyMessageHeader> = None;
let mut payload: Option<Bytes> = None;
let mut user_headers: Option<HashMap<HeaderKey, HeaderValue>> = None;
+ let mut raw_user_headers: Option<Bytes> = None;
while let Some(key) = map.next_key::<String>()? {
match key.as_str() {
@@ -610,12 +617,28 @@
payload = Some(Bytes::from(decoded));
}
"user_headers" => {
- let entries: Vec<HeaderEntry> = map.next_value()?;
- let mut headers_map = HashMap::new();
- for entry in entries {
- headers_map.insert(entry.key, entry.value);
+ // Accept either a base64 string (raw header bytes) or an array of HeaderEntry
+ let value: serde_json::Value = map.next_value()?;
+ if let Some(base64_str) = value.as_str() {
+ use base64::{Engine as _, engine::general_purpose::STANDARD};
+ let decoded =
+ STANDARD.decode(base64_str.as_bytes()).map_err(|e| {
+ de::Error::custom(format!(
+ "Failed to decode base64 headers: {e}"
+ ))
+ })?;
+ raw_user_headers = Some(Bytes::from(decoded));
+ } else if value.is_array() {
+ let entries: Vec<HeaderEntry> = serde_json::from_value(value)
+ .map_err(|e| {
+ de::Error::custom(format!("Invalid headers format: {e}"))
+ })?;
+ let mut headers_map = HashMap::new();
+ for entry in entries {
+ headers_map.insert(entry.key, entry.value);
+ }
+ user_headers = Some(headers_map);
}
- user_headers = Some(headers_map);
}
_ => {
let _ = map.next_value::<de::IgnoredAny>()?;
@@ -626,7 +649,11 @@
let header = header.ok_or_else(|| de::Error::missing_field("header"))?;
let payload = payload.ok_or_else(|| de::Error::missing_field("payload"))?;
- let user_headers_bytes = user_headers.map(|headers| headers.to_bytes());
+ let user_headers_bytes = if let Some(raw) = raw_user_headers {
+ Some(raw)
+ } else {
+ user_headers.map(|headers| headers.to_bytes())
+ };
let user_headers_length = user_headers_bytes
.as_ref()
diff --git a/core/common/src/utils/crypto.rs b/core/common/src/utils/crypto.rs
index fca62dd..24608cb 100644
--- a/core/common/src/utils/crypto.rs
+++ b/core/common/src/utils/crypto.rs
@@ -95,6 +95,7 @@
#[cfg(test)]
mod tests {
use super::*;
+ use crate::{HeaderKey, HeaderValue, IggyMessage};
#[test]
fn given_the_same_key_data_should_be_encrypted_and_decrypted_correctly() {
@@ -125,4 +126,73 @@
let error = decrypted_data.err().unwrap();
assert_eq!(error.as_code(), IggyError::CannotDecryptData.as_code());
}
+
+ #[test]
+ fn message_payload_and_headers_should_encrypt_and_decrypt_correctly() {
+ use bytes::Bytes;
+ use std::collections::HashMap;
+
+ let key = [1; 32];
+ let encryptor = Aes256GcmEncryptor::new(&key).unwrap();
+
+ let mut headers = HashMap::new();
+ headers.insert(
+ HeaderKey::try_from("batch").unwrap(),
+ HeaderValue::from(1u64),
+ );
+ headers.insert(
+ HeaderKey::try_from("type").unwrap(),
+ HeaderValue::try_from("test-message").unwrap(),
+ );
+
+ let mut message = IggyMessage::builder()
+ .payload(Bytes::from("test payload data"))
+ .user_headers(headers)
+ .build()
+ .unwrap();
+
+ let original_payload = message.payload.clone();
+ let original_headers = message.user_headers.clone().unwrap();
+
+ message.payload = Bytes::from(encryptor.encrypt(&message.payload).unwrap());
+ message.header.payload_length = message.payload.len() as u32;
+
+ let encrypted_headers = encryptor.encrypt(&original_headers).unwrap();
+ message.header.user_headers_length = encrypted_headers.len() as u32;
+ message.user_headers = Some(Bytes::from(encrypted_headers));
+
+ assert_ne!(message.payload, original_payload);
+ assert_ne!(message.user_headers.as_ref().unwrap(), &original_headers);
+
+ let decrypted_payload = encryptor.decrypt(&message.payload).unwrap();
+ message.payload = Bytes::from(decrypted_payload);
+ message.header.payload_length = message.payload.len() as u32;
+
+ let decrypted_headers = encryptor
+ .decrypt(message.user_headers.as_ref().unwrap())
+ .unwrap();
+ message.header.user_headers_length = decrypted_headers.len() as u32;
+ message.user_headers = Some(Bytes::from(decrypted_headers));
+
+ assert_eq!(message.payload, original_payload);
+ assert_eq!(message.user_headers.as_ref().unwrap(), &original_headers);
+
+ let parsed = message.user_headers_map().unwrap().unwrap();
+ assert_eq!(
+ parsed
+ .get(&HeaderKey::try_from("batch").unwrap())
+ .unwrap()
+ .as_uint64()
+ .unwrap(),
+ 1
+ );
+ assert_eq!(
+ parsed
+ .get(&HeaderKey::try_from("type").unwrap())
+ .unwrap()
+ .as_str()
+ .unwrap(),
+ "test-message"
+ );
+ }
}
diff --git a/core/connectors/runtime/Cargo.toml b/core/connectors/runtime/Cargo.toml
index 20d42de..8e78c32 100644
--- a/core/connectors/runtime/Cargo.toml
+++ b/core/connectors/runtime/Cargo.toml
@@ -17,7 +17,7 @@
[package]
name = "iggy-connectors"
-version = "0.3.2-edge.1"
+version = "0.3.3-edge.1"
description = "Connectors runtime for Iggy message streaming platform"
edition = "2024"
license = "Apache-2.0"
diff --git a/core/connectors/sdk/Cargo.toml b/core/connectors/sdk/Cargo.toml
index 34d2425..97a508b 100644
--- a/core/connectors/sdk/Cargo.toml
+++ b/core/connectors/sdk/Cargo.toml
@@ -17,7 +17,7 @@
[package]
name = "iggy_connector_sdk"
-version = "0.2.1-edge.1"
+version = "0.2.2-edge.1"
description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second."
edition = "2024"
license = "Apache-2.0"
diff --git a/core/connectors/sinks/elasticsearch_sink/Cargo.toml b/core/connectors/sinks/elasticsearch_sink/Cargo.toml
index 40923ea..cb9b888 100644
--- a/core/connectors/sinks/elasticsearch_sink/Cargo.toml
+++ b/core/connectors/sinks/elasticsearch_sink/Cargo.toml
@@ -17,7 +17,7 @@
[package]
name = "iggy_connector_elasticsearch_sink"
-version = "0.3.2-edge.1"
+version = "0.3.3-edge.1"
description = "Iggy Elasticsearch sink connector"
edition = "2024"
license = "Apache-2.0"
diff --git a/core/connectors/sinks/iceberg_sink/Cargo.toml b/core/connectors/sinks/iceberg_sink/Cargo.toml
index 527a27d..e3ac0ae 100644
--- a/core/connectors/sinks/iceberg_sink/Cargo.toml
+++ b/core/connectors/sinks/iceberg_sink/Cargo.toml
@@ -17,7 +17,7 @@
[package]
name = "iggy_connector_iceberg_sink"
-version = "0.3.2-edge.1"
+version = "0.3.3-edge.1"
edition = "2024"
license = "Apache-2.0"
keywords = ["iggy", "messaging", "streaming"]
diff --git a/core/connectors/sinks/mongodb_sink/Cargo.toml b/core/connectors/sinks/mongodb_sink/Cargo.toml
index 0a4305f..849c6f7 100644
--- a/core/connectors/sinks/mongodb_sink/Cargo.toml
+++ b/core/connectors/sinks/mongodb_sink/Cargo.toml
@@ -17,7 +17,7 @@
[package]
name = "iggy_connector_mongodb_sink"
-version = "0.3.0"
+version = "0.3.3-edge.1"
description = "Iggy MongoDB sink connector for storing stream messages into MongoDB database"
edition = "2024"
license = "Apache-2.0"
diff --git a/core/connectors/sinks/postgres_sink/Cargo.toml b/core/connectors/sinks/postgres_sink/Cargo.toml
index 56ab685..04cc199 100644
--- a/core/connectors/sinks/postgres_sink/Cargo.toml
+++ b/core/connectors/sinks/postgres_sink/Cargo.toml
@@ -17,7 +17,7 @@
[package]
name = "iggy_connector_postgres_sink"
-version = "0.3.2-edge.1"
+version = "0.3.3-edge.1"
description = "Iggy PostgreSQL sink connector for storing stream messages into PostgreSQL database"
edition = "2024"
license = "Apache-2.0"
diff --git a/core/connectors/sinks/quickwit_sink/Cargo.toml b/core/connectors/sinks/quickwit_sink/Cargo.toml
index 3aea64b..cc54878 100644
--- a/core/connectors/sinks/quickwit_sink/Cargo.toml
+++ b/core/connectors/sinks/quickwit_sink/Cargo.toml
@@ -17,7 +17,7 @@
[package]
name = "iggy_connector_quickwit_sink"
-version = "0.3.2-edge.1"
+version = "0.3.3-edge.1"
description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second."
edition = "2024"
license = "Apache-2.0"
diff --git a/core/connectors/sinks/stdout_sink/Cargo.toml b/core/connectors/sinks/stdout_sink/Cargo.toml
index 270e525..df123b5 100644
--- a/core/connectors/sinks/stdout_sink/Cargo.toml
+++ b/core/connectors/sinks/stdout_sink/Cargo.toml
@@ -17,7 +17,7 @@
[package]
name = "iggy_connector_stdout_sink"
-version = "0.3.2-edge.1"
+version = "0.3.3-edge.1"
description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second."
edition = "2024"
license = "Apache-2.0"
diff --git a/core/connectors/sources/elasticsearch_source/Cargo.toml b/core/connectors/sources/elasticsearch_source/Cargo.toml
index d1bb9c6..ed7365e 100644
--- a/core/connectors/sources/elasticsearch_source/Cargo.toml
+++ b/core/connectors/sources/elasticsearch_source/Cargo.toml
@@ -17,7 +17,7 @@
[package]
name = "iggy_connector_elasticsearch_source"
-version = "0.3.2-edge.1"
+version = "0.3.3-edge.1"
description = "Iggy Elasticsearch source connector"
edition = "2024"
license = "Apache-2.0"
diff --git a/core/connectors/sources/postgres_source/Cargo.toml b/core/connectors/sources/postgres_source/Cargo.toml
index bbc82be..6281670 100644
--- a/core/connectors/sources/postgres_source/Cargo.toml
+++ b/core/connectors/sources/postgres_source/Cargo.toml
@@ -17,7 +17,7 @@
[package]
name = "iggy_connector_postgres_source"
-version = "0.3.2-edge.1"
+version = "0.3.3-edge.1"
description = "Iggy PostgreSQL source connector supporting CDC and table polling for message streaming platform"
edition = "2024"
license = "Apache-2.0"
diff --git a/core/connectors/sources/random_source/Cargo.toml b/core/connectors/sources/random_source/Cargo.toml
index 7a0eed4..b3e3d66 100644
--- a/core/connectors/sources/random_source/Cargo.toml
+++ b/core/connectors/sources/random_source/Cargo.toml
@@ -17,7 +17,7 @@
[package]
name = "iggy_connector_random_source"
-version = "0.3.2-edge.1"
+version = "0.3.3-edge.1"
description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second."
edition = "2024"
license = "Apache-2.0"
diff --git a/core/integration/src/harness/handle/client_builder.rs b/core/integration/src/harness/handle/client_builder.rs
index ef35b99..66998bb 100644
--- a/core/integration/src/harness/handle/client_builder.rs
+++ b/core/integration/src/harness/handle/client_builder.rs
@@ -72,6 +72,7 @@
connection: ServerConnection,
auto_login: Option<AutoLoginConfig>,
tcp_nodelay: bool,
+ encryptor: Option<Arc<iggy_common::EncryptorKind>>,
}
impl ClientBuilder {
@@ -81,6 +82,7 @@
connection,
auto_login: None,
tcp_nodelay: false,
+ encryptor: None,
}
}
@@ -102,6 +104,12 @@
self
}
+ /// Set the client-side encryptor for encrypting/decrypting message payloads and headers.
+ pub fn with_encryptor(mut self, encryptor: Arc<iggy_common::EncryptorKind>) -> Self {
+ self.encryptor = Some(encryptor);
+ self
+ }
+
/// Connect to the server and optionally perform auto-login.
pub async fn connect(self) -> Result<IggyClient, TestBinaryError> {
let client = match self.transport {
@@ -168,7 +176,7 @@
Ok(IggyClient::create(
iggy::prelude::ClientWrapper::Tcp(client),
None,
- None,
+ self.encryptor.clone(),
))
}
@@ -195,7 +203,7 @@
Ok(IggyClient::create(
iggy::prelude::ClientWrapper::Http(client),
None,
- None,
+ self.encryptor.clone(),
))
}
@@ -231,7 +239,7 @@
Ok(IggyClient::create(
iggy::prelude::ClientWrapper::Quic(client),
None,
- None,
+ self.encryptor.clone(),
))
}
@@ -282,7 +290,7 @@
Ok(IggyClient::create(
iggy::prelude::ClientWrapper::WebSocket(client),
None,
- None,
+ self.encryptor.clone(),
))
}
diff --git a/core/integration/src/harness/orchestrator/harness.rs b/core/integration/src/harness/orchestrator/harness.rs
index 6e79058..6897b31 100644
--- a/core/integration/src/harness/orchestrator/harness.rs
+++ b/core/integration/src/harness/orchestrator/harness.rs
@@ -22,7 +22,8 @@
use crate::harness::context::TestContext;
use crate::harness::error::TestBinaryError;
use crate::harness::handle::{
- ClientHandle, ConnectorsRuntimeHandle, McpClient, McpHandle, ServerHandle, ServerLogs,
+ ClientBuilder, ClientHandle, ConnectorsRuntimeHandle, McpClient, McpHandle, ServerHandle,
+ ServerLogs,
};
use crate::harness::traits::{Restartable, TestBinary};
use futures::executor::block_on;
@@ -271,19 +272,28 @@
}
}
- /// Create a new client logged in as root for the specified transport.
pub async fn root_client_for(
&self,
transport: TransportProtocol,
) -> Result<IggyClient, TestBinaryError> {
+ self.client_builder_for(transport)?
+ .with_root_login()
+ .connect()
+ .await
+ }
+
+ /// Return a client builder for the specified transport, targeting the first server (no login or connect yet).
+ pub fn client_builder_for(
+ &self,
+ transport: TransportProtocol,
+ ) -> Result<ClientBuilder, TestBinaryError> {
let server = self.servers.first().ok_or(TestBinaryError::MissingServer)?;
- let builder = match transport {
- TransportProtocol::Tcp => server.tcp_client()?,
- TransportProtocol::Http => server.http_client()?,
- TransportProtocol::Quic => server.quic_client()?,
- TransportProtocol::WebSocket => server.websocket_client()?,
- };
- builder.with_root_login().connect().await
+ match transport {
+ TransportProtocol::Tcp => server.tcp_client(),
+ TransportProtocol::Http => server.http_client(),
+ TransportProtocol::Quic => server.quic_client(),
+ TransportProtocol::WebSocket => server.websocket_client(),
+ }
}
/// Create multiple root clients for the specified transport.
diff --git a/core/integration/tests/server/scenarios/encryption_scenario.rs b/core/integration/tests/server/scenarios/encryption_scenario.rs
index f03741c..6881f5f 100644
--- a/core/integration/tests/server/scenarios/encryption_scenario.rs
+++ b/core/integration/tests/server/scenarios/encryption_scenario.rs
@@ -18,36 +18,14 @@
use bytes::Bytes;
use iggy::prelude::*;
+use iggy_common::TransportProtocol;
use integration::harness::{TestHarness, TestServerConfig};
use serial_test::parallel;
use std::collections::HashMap;
+use std::path::{Path, PathBuf};
+use std::sync::Arc;
use test_case::test_matrix;
-fn encryption_enabled() -> bool {
- true
-}
-
-fn encryption_disabled() -> bool {
- false
-}
-
-fn build_server_config(encryption: bool) -> TestServerConfig {
- let mut extra_envs = HashMap::new();
-
- if encryption {
- extra_envs.insert(
- "IGGY_SYSTEM_ENCRYPTION_ENABLED".to_string(),
- "true".to_string(),
- );
- extra_envs.insert(
- "IGGY_SYSTEM_ENCRYPTION_KEY".to_string(),
- "/rvT1xP4V8u1EAhk4xDdqzqM2UOPXyy9XYkl4uRShgE=".to_string(),
- );
- }
-
- TestServerConfig::builder().extra_envs(extra_envs).build()
-}
-
#[test_matrix(
[encryption_enabled(), encryption_disabled()]
)]
@@ -128,6 +106,43 @@
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
+ // Verify on-disk encryption of headers and payload
+ let data_path = harness.server().data_path();
+ let log_files = find_log_files(&data_path);
+ assert!(
+ !log_files.is_empty(),
+ "Expected at least one .log segment file on disk"
+ );
+ let all_raw_bytes: Vec<u8> = log_files
+ .iter()
+ .flat_map(|p| std::fs::read(p).unwrap())
+ .collect();
+ let contains_plaintext_header = all_raw_bytes
+ .windows(b"test-message".len())
+ .any(|w| w == b"test-message");
+ let contains_plaintext_payload = all_raw_bytes
+ .windows(b"Message batch 1".len())
+ .any(|w| w == b"Message batch 1");
+ if encryption {
+ assert!(
+ !contains_plaintext_header,
+ "When encryption is enabled, header values must NOT appear as plaintext on disk"
+ );
+ assert!(
+ !contains_plaintext_payload,
+ "When encryption is enabled, payload must NOT appear as plaintext on disk"
+ );
+ } else {
+ assert!(
+ contains_plaintext_header,
+ "When encryption is disabled, header values should appear as plaintext on disk"
+ );
+ assert!(
+ contains_plaintext_payload,
+ "When encryption is disabled, payload should appear as plaintext on disk"
+ );
+ }
+
let initial_stats = client.get_stats().await.unwrap();
let initial_messages_count = initial_stats.messages_count;
let initial_messages_size = initial_stats.messages_size_bytes;
@@ -317,3 +332,190 @@
initial_messages_size.as_bytes_u64()
);
}
+
+#[test_matrix(
+ [TransportProtocol::Tcp, TransportProtocol::Http, TransportProtocol::Quic, TransportProtocol::WebSocket]
+)]
+#[tokio::test]
+#[parallel]
+async fn should_encrypt_and_decrypt_headers_with_client_side_encryption(
+ transport: TransportProtocol,
+) {
+ let mut harness = TestHarness::builder()
+ .server(TestServerConfig::default())
+ .build()
+ .unwrap();
+
+ harness.start().await.unwrap();
+
+ let setup_client = harness.tcp_root_client().await.unwrap();
+ let stream_name = format!("client-enc-{transport}");
+ let topic_name = "enc-topic";
+
+ setup_client.create_stream(&stream_name).await.unwrap();
+ setup_client
+ .create_topic(
+ &Identifier::named(&stream_name).unwrap(),
+ topic_name,
+ 1,
+ CompressionAlgorithm::default(),
+ None,
+ IggyExpiry::NeverExpire,
+ MaxTopicSize::ServerDefault,
+ )
+ .await
+ .unwrap();
+
+ let encryptor = Arc::new(EncryptorKind::Aes256Gcm(
+ Aes256GcmEncryptor::new(&[42u8; 32]).unwrap(),
+ ));
+
+ let encrypting_client = harness
+ .client_builder_for(transport)
+ .unwrap()
+ .with_encryptor(encryptor)
+ .with_root_login()
+ .connect()
+ .await
+ .unwrap();
+
+ let stream_id = Identifier::named(&stream_name).unwrap();
+ let topic_id = Identifier::named(topic_name).unwrap();
+
+ let mut messages = Vec::new();
+ for i in 0..10i64 {
+ let mut headers = HashMap::new();
+ headers.insert(HeaderKey::try_from("index").unwrap(), HeaderValue::from(i));
+ headers.insert(
+ HeaderKey::try_from("transport").unwrap(),
+ HeaderValue::try_from(transport.to_string().as_str()).unwrap(),
+ );
+
+ messages.push(
+ IggyMessage::builder()
+ .id((i + 1) as u128)
+ .payload(Bytes::from(format!("client encrypted msg {i}")))
+ .user_headers(headers)
+ .build()
+ .unwrap(),
+ );
+ }
+
+ encrypting_client
+ .send_messages(
+ &stream_id,
+ &topic_id,
+ &Partitioning::partition_id(0),
+ &mut messages,
+ )
+ .await
+ .unwrap();
+
+ let polled = encrypting_client
+ .poll_messages(
+ &stream_id,
+ &topic_id,
+ Some(0),
+ &Consumer::default(),
+ &PollingStrategy::offset(0),
+ 10,
+ false,
+ )
+ .await
+ .unwrap();
+
+ assert_eq!(polled.messages.len(), 10);
+ for (i, msg) in polled.messages.iter().enumerate() {
+ assert_eq!(
+ std::str::from_utf8(&msg.payload).unwrap(),
+ format!("client encrypted msg {i}")
+ );
+
+ let headers = msg.user_headers_map().unwrap().unwrap();
+ assert_eq!(
+ headers
+ .get(&HeaderKey::try_from("index").unwrap())
+ .unwrap()
+ .as_int64()
+ .unwrap(),
+ i as i64
+ );
+ assert_eq!(
+ headers
+ .get(&HeaderKey::try_from("transport").unwrap())
+ .unwrap()
+ .as_str()
+ .unwrap(),
+ transport.to_string().as_str()
+ );
+ }
+
+ let client_without_encryptor = harness.root_client_for(transport).await.unwrap();
+ let polled_without_decryption = client_without_encryptor
+ .poll_messages(
+ &stream_id,
+ &topic_id,
+ Some(0),
+ &Consumer::default(),
+ &PollingStrategy::offset(0),
+ 10,
+ false,
+ )
+ .await
+ .unwrap();
+
+ assert_eq!(polled_without_decryption.messages.len(), 10);
+ for msg in &polled_without_decryption.messages {
+ let payload_text = std::str::from_utf8(&msg.payload).unwrap_or("");
+ assert!(
+ !payload_text.starts_with("client encrypted msg"),
+ "Payload must not be readable without the encryptor"
+ );
+
+ let headers = msg.user_headers_map().unwrap();
+ assert!(
+ headers.is_none(),
+ "Headers must not be parseable without the encryptor"
+ );
+ }
+}
+
+fn encryption_enabled() -> bool {
+ true
+}
+
+fn encryption_disabled() -> bool {
+ false
+}
+
+fn build_server_config(encryption: bool) -> TestServerConfig {
+ let mut extra_envs = HashMap::new();
+
+ if encryption {
+ extra_envs.insert(
+ "IGGY_SYSTEM_ENCRYPTION_ENABLED".to_string(),
+ "true".to_string(),
+ );
+ extra_envs.insert(
+ "IGGY_SYSTEM_ENCRYPTION_KEY".to_string(),
+ "/rvT1xP4V8u1EAhk4xDdqzqM2UOPXyy9XYkl4uRShgE=".to_string(),
+ );
+ }
+
+ TestServerConfig::builder().extra_envs(extra_envs).build()
+}
+
+fn find_log_files(dir: &Path) -> Vec<PathBuf> {
+ let mut result = Vec::new();
+ if let Ok(entries) = std::fs::read_dir(dir) {
+ for entry in entries.flatten() {
+ let path = entry.path();
+ if path.is_dir() {
+ result.extend(find_log_files(&path));
+ } else if path.extension().and_then(|e| e.to_str()) == Some("log") {
+ result.push(path);
+ }
+ }
+ }
+ result
+}
diff --git a/core/sdk/Cargo.toml b/core/sdk/Cargo.toml
index cbd4050..3627c87 100644
--- a/core/sdk/Cargo.toml
+++ b/core/sdk/Cargo.toml
@@ -17,7 +17,7 @@
[package]
name = "iggy"
-version = "0.9.4-edge.1"
+version = "0.9.5-edge.1"
description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second."
edition = "2024"
license = "Apache-2.0"
diff --git a/core/sdk/src/clients/binary_message.rs b/core/sdk/src/clients/binary_message.rs
index 63f064d..462c60e 100644
--- a/core/sdk/src/clients/binary_message.rs
+++ b/core/sdk/src/clients/binary_message.rs
@@ -61,6 +61,12 @@
let payload = encryptor.decrypt(&message.payload)?;
message.payload = Bytes::from(payload);
message.header.payload_length = message.payload.len() as u32;
+
+ if let Some(ref user_headers) = message.user_headers {
+ let decrypted_headers = encryptor.decrypt(user_headers)?;
+ message.header.user_headers_length = decrypted_headers.len() as u32;
+ message.user_headers = Some(Bytes::from(decrypted_headers));
+ }
}
}
@@ -82,6 +88,12 @@
for message in &mut *messages {
message.payload = Bytes::from(encryptor.encrypt(&message.payload)?);
message.header.payload_length = message.payload.len() as u32;
+
+ if let Some(ref user_headers) = message.user_headers {
+ let encrypted_headers = encryptor.encrypt(user_headers)?;
+ message.header.user_headers_length = encrypted_headers.len() as u32;
+ message.user_headers = Some(Bytes::from(encrypted_headers));
+ }
}
}
diff --git a/core/sdk/src/clients/consumer.rs b/core/sdk/src/clients/consumer.rs
index 37b2032..17d2c42 100644
--- a/core/sdk/src/clients/consumer.rs
+++ b/core/sdk/src/clients/consumer.rs
@@ -981,12 +981,12 @@
} else {
if let Some(ref encryptor) = self.encryptor {
for message in &mut polled_messages.messages {
+ let offset = message.header.offset;
let payload = encryptor.decrypt(&message.payload);
if let Err(error) = payload {
self.poll_future = None;
error!(
- "Failed to decrypt the message payload at offset: {}, partition ID: {}",
- message.header.offset, partition_id
+ "Failed to decrypt the message payload at offset: {offset}, partition ID: {partition_id}",
);
return Poll::Ready(Some(Err(error)));
}
@@ -994,6 +994,21 @@
let payload = payload.unwrap();
message.payload = Bytes::from(payload);
message.header.payload_length = message.payload.len() as u32;
+
+ if let Some(ref user_headers) = message.user_headers {
+ let decrypted_headers = encryptor.decrypt(user_headers);
+ if let Err(error) = decrypted_headers {
+ self.poll_future = None;
+ error!(
+ "Failed to decrypt the message user headers at offset: {offset}, partition ID: {partition_id}",
+ );
+ return Poll::Ready(Some(Err(error)));
+ }
+ let decrypted_headers = decrypted_headers.unwrap();
+ message.header.user_headers_length =
+ decrypted_headers.len() as u32;
+ message.user_headers = Some(Bytes::from(decrypted_headers));
+ }
}
}
diff --git a/core/sdk/src/clients/producer.rs b/core/sdk/src/clients/producer.rs
index badd909..e17fafc 100644
--- a/core/sdk/src/clients/producer.rs
+++ b/core/sdk/src/clients/producer.rs
@@ -293,6 +293,12 @@
for message in messages {
message.payload = Bytes::from(encryptor.encrypt(&message.payload)?);
message.header.payload_length = message.payload.len() as u32;
+
+ if let Some(ref user_headers) = message.user_headers {
+ let encrypted_headers = encryptor.encrypt(user_headers)?;
+ message.header.user_headers_length = encrypted_headers.len() as u32;
+ message.user_headers = Some(Bytes::from(encrypted_headers));
+ }
}
}
Ok(())
diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml
index 62027ca..d26d3fe 100644
--- a/core/server/Cargo.toml
+++ b/core/server/Cargo.toml
@@ -17,7 +17,7 @@
[package]
name = "server"
-version = "0.7.3-edge.1"
+version = "0.7.4-edge.1"
edition = "2024"
license = "Apache-2.0"
diff --git a/core/server/src/shard/system/messages.rs b/core/server/src/shard/system/messages.rs
index a59f68d..21e3420 100644
--- a/core/server/src/shard/system/messages.rs
+++ b/core/server/src/shard/system/messages.rs
@@ -559,25 +559,46 @@
let mut position = 0;
for message in batch.iter() {
+ let mut header = message.header().to_header();
+ let offset = header.offset;
let payload = encryptor.decrypt(message.payload());
match payload {
Ok(payload) => {
// Update the header with the decrypted payload length
- let mut header = message.header().to_header();
header.payload_length = payload.len() as u32;
+ // Decrypt user headers if present
+ let decrypted_user_headers = if let Some(user_headers) =
+ message.user_headers()
+ {
+ match encryptor.decrypt(user_headers) {
+ Ok(decrypted) => {
+ header.user_headers_length = decrypted.len() as u32;
+ Some(decrypted)
+ }
+ Err(error) => {
+ error!(
+ "Cannot decrypt the message user headers at offset: {offset}. Error: {error}"
+ );
+ continue;
+ }
+ }
+ } else {
+ None
+ };
+
decrypted_messages.extend_from_slice(&header.to_bytes());
decrypted_messages.extend_from_slice(&payload);
- if let Some(user_headers) = message.user_headers() {
+ if let Some(ref user_headers) = decrypted_user_headers {
decrypted_messages.extend_from_slice(user_headers);
}
position += IGGY_MESSAGE_HEADER_SIZE
+ payload.len()
- + message.header().user_headers_length();
+ + header.user_headers_length as usize;
indexes.insert(0, position as u32, 0);
}
Err(error) => {
- error!("Cannot decrypt the message. Error: {}", error);
+ error!("Cannot decrypt the message at offset: {offset}. Error: {error}",);
continue;
}
}
@@ -604,7 +625,7 @@
for message in batch.iter() {
let header = message.header().to_header();
- let user_headers_length = header.user_headers_length;
+ let offset = header.offset;
let payload_bytes = message.payload();
let user_headers_bytes = message.user_headers();
@@ -614,18 +635,38 @@
let mut updated_header = header;
updated_header.payload_length = encrypted_payload.len() as u32;
+ // Encrypt user headers if present
+ let encrypted_user_headers = if let Some(user_headers_bytes) =
+ user_headers_bytes
+ {
+ match encryptor.encrypt(user_headers_bytes) {
+ Ok(encrypted) => {
+ updated_header.user_headers_length = encrypted.len() as u32;
+ Some(encrypted)
+ }
+ Err(error) => {
+ error!(
+ "Cannot encrypt the message user headers at offset: {offset}. Error: {error}"
+ );
+ continue;
+ }
+ }
+ } else {
+ None
+ };
+
encrypted_messages.extend_from_slice(&updated_header.to_bytes());
encrypted_messages.extend_from_slice(&encrypted_payload);
- if let Some(user_headers_bytes) = user_headers_bytes {
- encrypted_messages.extend_from_slice(user_headers_bytes);
+ if let Some(ref encrypted_user_headers) = encrypted_user_headers {
+ encrypted_messages.extend_from_slice(encrypted_user_headers);
}
position += IGGY_MESSAGE_HEADER_SIZE
+ encrypted_payload.len()
- + user_headers_length as usize;
+ + updated_header.user_headers_length as usize;
indexes.insert(0, position as u32, 0);
}
Err(error) => {
- error!("Cannot encrypt the message. Error: {}", error);
+ error!("Cannot encrypt the message at offset: {offset}. Error: {error}",);
continue;
}
}
diff --git a/foreign/csharp/Iggy_SDK.Tests.Integration/HeaderEncryptionIntegrationTests.cs b/foreign/csharp/Iggy_SDK.Tests.Integration/HeaderEncryptionIntegrationTests.cs
new file mode 100644
index 0000000..0b8e98b
--- /dev/null
+++ b/foreign/csharp/Iggy_SDK.Tests.Integration/HeaderEncryptionIntegrationTests.cs
@@ -0,0 +1,211 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System.Text;
+using Apache.Iggy.Consumers;
+using Apache.Iggy.Encryption;
+using Apache.Iggy.Enums;
+using Apache.Iggy.Headers;
+using Apache.Iggy.IggyClient;
+using Apache.Iggy.Kinds;
+using Apache.Iggy.Mappers;
+using Apache.Iggy.Messages;
+using Apache.Iggy.Publishers;
+using Apache.Iggy.Tests.Integrations.Fixtures;
+using Shouldly;
+using Partitioning = Apache.Iggy.Kinds.Partitioning;
+
+namespace Apache.Iggy.Tests.Integrations;
+
+/// <summary>
+/// Integration tests for client-side encryption of user headers. Messages are sent through a
+/// publisher configured with an AES encryptor and are then read back either with a plain
+/// client (no decryptor) or with a consumer configured with the same encryptor.
+/// </summary>
+public class HeaderEncryptionIntegrationTests
+{
+    [ClassDataSource<IggyServerFixture>(Shared = SharedType.PerAssembly)]
+    public required IggyServerFixture Fixture { get; init; }
+
+    /// <summary>
+    /// Sends one message with headers through an encrypting publisher, polls it back without a
+    /// decryptor and checks that the payload is not the plaintext, that raw header bytes are
+    /// present, and that decrypting both by hand restores the original payload and headers.
+    /// </summary>
+    [Test]
+    [MethodDataSource<IggyServerFixture>(nameof(IggyServerFixture.ProtocolData))]
+    public async Task SendMessages_WithEncryptedHeaders_Should_NotBeReadableWithoutDecryptor(Protocol protocol)
+    {
+        var client = protocol == Protocol.Tcp
+            ? await Fixture.CreateTcpClient()
+            : await Fixture.CreateHttpClient();
+
+        var encryptor = CreateEncryptor();
+        var testStream = await CreateTestStream(client, protocol);
+        var streamId = Identifier.String(testStream.StreamId);
+        var topicId = Identifier.String(testStream.TopicId);
+
+        // Send message with encrypted headers via publisher
+        var publisher = IggyPublisherBuilder
+            .Create(client, streamId, topicId)
+            .WithPartitioning(Partitioning.PartitionId(0))
+            .WithEncryptor(encryptor)
+            .Build();
+
+        await publisher.InitAsync();
+
+        var headers = CreateTestHeaders();
+        var messages = new List<Message>
+        {
+            new(Guid.NewGuid(), Encoding.UTF8.GetBytes("encrypted payload"), headers)
+        };
+
+        await publisher.SendMessagesAsync(messages);
+        await publisher.DisposeAsync();
+
+        // Poll with a normal client (no decryptor)
+        var polled = await client.PollMessagesAsync(
+            streamId, topicId, 0,
+            Consumer.New(0),
+            PollingStrategy.Next(),
+            1, false);
+
+        polled.Messages.Count.ShouldBe(1);
+        var msg = polled.Messages[0];
+
+        // Payload should be encrypted (not readable as plaintext)
+        Encoding.UTF8.GetString(msg.Payload).ShouldNotBe("encrypted payload");
+
+        // RawUserHeaders should contain encrypted bytes (both TCP and HTTP)
+        msg.RawUserHeaders.ShouldNotBeNull();
+        msg.RawUserHeaders!.Length.ShouldBeGreaterThan(0);
+
+        // Manually decrypt and verify payload
+        var decryptedPayload = encryptor.Decrypt(msg.Payload);
+        Encoding.UTF8.GetString(decryptedPayload).ShouldBe("encrypted payload");
+
+        // Manually decrypt and verify headers
+        var decryptedHeaderBytesResult = encryptor.Decrypt(msg.RawUserHeaders);
+        var decryptedHeaders = BinaryMapper.MapHeaders(decryptedHeaderBytesResult);
+        decryptedHeaders.Count.ShouldBe(3);
+
+        var typeHeader = decryptedHeaders[new HeaderKey { Kind = HeaderKind.String, Value = "type"u8.ToArray() }];
+        Encoding.UTF8.GetString(typeHeader.Value).ShouldBe("test-message");
+    }
+
+    /// <summary>
+    /// Sends one encrypted message and reads it through a consumer configured with the same
+    /// encryptor, then checks that payload and all three headers come back decrypted.
+    /// </summary>
+    [Test]
+    [MethodDataSource<IggyServerFixture>(nameof(IggyServerFixture.ProtocolData))]
+    public async Task ReceiveAsync_WithDecryptor_Should_DecryptHeadersCorrectly(Protocol protocol)
+    {
+        var client = protocol == Protocol.Tcp
+            ? await Fixture.CreateTcpClient()
+            : await Fixture.CreateHttpClient();
+
+        var encryptor = CreateEncryptor();
+        var testStream = await CreateTestStream(client, protocol);
+        var streamId = Identifier.String(testStream.StreamId);
+        var topicId = Identifier.String(testStream.TopicId);
+
+        // Send encrypted message via publisher
+        var publisher = IggyPublisherBuilder
+            .Create(client, streamId, topicId)
+            .WithPartitioning(Partitioning.PartitionId(0))
+            .WithEncryptor(encryptor)
+            .Build();
+
+        await publisher.InitAsync();
+
+        var headers = CreateTestHeaders();
+        var messages = new List<Message>
+        {
+            new(Guid.NewGuid(), Encoding.UTF8.GetBytes("consumer test payload"), headers)
+        };
+
+        await publisher.SendMessagesAsync(messages);
+        await publisher.DisposeAsync();
+
+        // Poll with IggyConsumer that has decryptor configured
+        var consumer = IggyConsumerBuilder
+            .Create(client, streamId, topicId, Consumer.New(1))
+            .WithPollingStrategy(PollingStrategy.Next())
+            .WithBatchSize(1)
+            .WithPartitionId(0)
+            .WithAutoCommitMode(AutoCommitMode.Disabled)
+            .WithDecryptor(encryptor)
+            .Build();
+
+        await consumer.InitAsync();
+
+        // The timeout keeps the test from hanging if no message ever arrives; the source owns
+        // a timer, so it is disposed when the test method exits.
+        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
+        ReceivedMessage? received = null;
+
+        await foreach (var msg in consumer.ReceiveAsync(cts.Token))
+        {
+            received = msg;
+            break;
+        }
+
+        await consumer.DisposeAsync();
+
+        // Message should be successfully decrypted
+        received.ShouldNotBeNull();
+        received!.Status.ShouldBe(MessageStatus.Success);
+
+        // Payload should be decrypted
+        Encoding.UTF8.GetString(received.Message.Payload).ShouldBe("consumer test payload");
+
+        // Headers should be decrypted and parsed
+        received.Message.UserHeaders.ShouldNotBeNull();
+        received.Message.UserHeaders!.Count.ShouldBe(3);
+
+        var batchHeader = received.Message.UserHeaders[new HeaderKey { Kind = HeaderKind.String, Value = "batch"u8.ToArray() }];
+        BitConverter.ToUInt64(batchHeader.Value).ShouldBe(1UL);
+
+        var typeHeader = received.Message.UserHeaders[new HeaderKey { Kind = HeaderKind.String, Value = "type"u8.ToArray() }];
+        Encoding.UTF8.GetString(typeHeader.Value).ShouldBe("test-message");
+
+        var encHeader = received.Message.UserHeaders[new HeaderKey { Kind = HeaderKind.String, Value = "encrypted"u8.ToArray() }];
+        encHeader.Value[0].ShouldBe((byte)1);
+    }
+
+    /// <summary>
+    /// Creates an AES encryptor with a freshly generated key, so each test uses its own key.
+    /// </summary>
+    private static AesMessageEncryptor CreateEncryptor()
+    {
+        return new AesMessageEncryptor(AesMessageEncryptor.GenerateKey());
+    }
+
+    /// <summary>
+    /// Builds the three headers used by both tests: <c>batch</c> (Uint64 1),
+    /// <c>type</c> (String "test-message") and <c>encrypted</c> (Bool, single byte 1).
+    /// </summary>
+    private static Dictionary<HeaderKey, HeaderValue> CreateTestHeaders()
+    {
+        return new Dictionary<HeaderKey, HeaderValue>
+        {
+            {
+                new HeaderKey { Kind = HeaderKind.String, Value = "batch"u8.ToArray() },
+                new HeaderValue { Kind = HeaderKind.Uint64, Value = BitConverter.GetBytes(1UL) }
+            },
+            {
+                new HeaderKey { Kind = HeaderKind.String, Value = "type"u8.ToArray() },
+                new HeaderValue { Kind = HeaderKind.String, Value = "test-message"u8.ToArray() }
+            },
+            {
+                new HeaderKey { Kind = HeaderKind.String, Value = "encrypted"u8.ToArray() },
+                new HeaderValue { Kind = HeaderKind.Bool, Value = [1] }
+            },
+        };
+    }
+
+    /// <summary>
+    /// Creates a uniquely named stream (GUID plus protocol suffix) with a single
+    /// <c>enc_topic</c> topic and returns both names.
+    /// </summary>
+    private async Task<TestStreamInfo> CreateTestStream(IIggyClient client, Protocol protocol)
+    {
+        var streamId = $"enc_stream_{Guid.NewGuid()}_{protocol.ToString().ToLowerInvariant()}";
+        var topicId = "enc_topic";
+
+        await client.CreateStreamAsync(streamId);
+        await client.CreateTopicAsync(Identifier.String(streamId), topicId, 1);
+
+        return new TestStreamInfo(streamId, topicId);
+    }
+
+    /// <summary>Names of the stream and topic created for a single test run.</summary>
+    private record TestStreamInfo(string StreamId, string TopicId);
+}
diff --git a/foreign/csharp/Iggy_SDK/Consumers/IggyConsumer.cs b/foreign/csharp/Iggy_SDK/Consumers/IggyConsumer.cs
index 2c5a1b8..76264f9 100644
--- a/foreign/csharp/Iggy_SDK/Consumers/IggyConsumer.cs
+++ b/foreign/csharp/Iggy_SDK/Consumers/IggyConsumer.cs
@@ -21,8 +21,10 @@
using Apache.Iggy.Contracts;
using Apache.Iggy.Enums;
using Apache.Iggy.Exceptions;
+using Apache.Iggy.Headers;
using Apache.Iggy.IggyClient;
using Apache.Iggy.Kinds;
+using Apache.Iggy.Mappers;
using Apache.Iggy.Utils;
using Microsoft.Extensions.Logging;
@@ -380,11 +382,19 @@
try
{
var decryptedPayload = _config.MessageEncryptor.Decrypt(message.Payload);
+
+ Dictionary<HeaderKey, HeaderValue>? decryptedHeaders = null;
+ if (message.RawUserHeaders is { Length: > 0 })
+ {
+ var decryptedHeaderBytes = _config.MessageEncryptor.Decrypt(message.RawUserHeaders);
+ decryptedHeaders = BinaryMapper.MapHeaders(decryptedHeaderBytes);
+ }
+
processedMessage = new MessageResponse
{
Header = message.Header,
Payload = decryptedPayload,
- UserHeaders = message.UserHeaders
+ UserHeaders = decryptedHeaders
};
}
catch (Exception ex)
diff --git a/foreign/csharp/Iggy_SDK/Contracts/MessageResponse.cs b/foreign/csharp/Iggy_SDK/Contracts/MessageResponse.cs
index f39ddd8..9a5eb55 100644
--- a/foreign/csharp/Iggy_SDK/Contracts/MessageResponse.cs
+++ b/foreign/csharp/Iggy_SDK/Contracts/MessageResponse.cs
@@ -25,6 +25,7 @@
/// <summary>
/// Response from the server containing a message payload.
/// </summary>
+[JsonConverter(typeof(MessageResponseConverter))]
public sealed class MessageResponse
{
/// <summary>
@@ -40,7 +41,12 @@
/// <summary>
/// Headers defined by the user.
/// </summary>
- [JsonPropertyName("user_headers")]
- [JsonConverter(typeof(UserHeadersConverter))]
- public Dictionary<HeaderKey, HeaderValue>? UserHeaders { get; init; }
+ public Dictionary<HeaderKey, HeaderValue>? UserHeaders { get; set; }
+
+ /// <summary>
+ /// Raw user header bytes before deserialization.
+ /// Used internally for decrypting encrypted headers.
+ /// </summary>
+ [JsonIgnore]
+ internal byte[]? RawUserHeaders { get; set; }
}
diff --git a/foreign/csharp/Iggy_SDK/Contracts/Tcp/TcpContracts.cs b/foreign/csharp/Iggy_SDK/Contracts/Tcp/TcpContracts.cs
index 3501117..2e2c939 100644
--- a/foreign/csharp/Iggy_SDK/Contracts/Tcp/TcpContracts.cs
+++ b/foreign/csharp/Iggy_SDK/Contracts/Tcp/TcpContracts.cs
@@ -410,7 +410,7 @@
var msgSize = 0;
foreach (var message in messages)
{
- var headersBytes = GetHeadersBytes(message.UserHeaders);
+ var headersBytes = message.RawUserHeaders ?? GetHeadersBytes(message.UserHeaders);
BinaryPrimitives.WriteUInt64LittleEndian(bytes[position..(position + 8)], message.Header.Checksum);
BinaryPrimitives.WriteUInt128LittleEndian(bytes[(position + 8)..(position + 24)], message.Header.Id);
BinaryPrimitives.WriteUInt64LittleEndian(bytes[(position + 24)..(position + 32)], message.Header.Offset);
@@ -548,7 +548,7 @@
// return bytes;
// }
- private static byte[] GetHeadersBytes(Dictionary<HeaderKey, HeaderValue>? headers)
+ internal static byte[] GetHeadersBytes(Dictionary<HeaderKey, HeaderValue>? headers)
{
if (headers == null)
{
diff --git a/foreign/csharp/Iggy_SDK/Iggy_SDK.csproj b/foreign/csharp/Iggy_SDK/Iggy_SDK.csproj
index fb31d48..9232422 100644
--- a/foreign/csharp/Iggy_SDK/Iggy_SDK.csproj
+++ b/foreign/csharp/Iggy_SDK/Iggy_SDK.csproj
@@ -7,7 +7,7 @@
<TargetFrameworks>net8.0;net10.0</TargetFrameworks>
<AssemblyName>Apache.Iggy</AssemblyName>
<RootNamespace>Apache.Iggy</RootNamespace>
- <Version>0.7.1-edge.3</Version>
+ <Version>0.7.2-edge.1</Version>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
</PropertyGroup>
diff --git a/foreign/csharp/Iggy_SDK/JsonConverters/MessageConverter.cs b/foreign/csharp/Iggy_SDK/JsonConverters/MessageConverter.cs
index ad8d721..f319aa3 100644
--- a/foreign/csharp/Iggy_SDK/JsonConverters/MessageConverter.cs
+++ b/foreign/csharp/Iggy_SDK/JsonConverters/MessageConverter.cs
@@ -35,7 +35,14 @@
WriteMessageId(writer, value.Header.Id);
WritePayload(writer, value.Payload);
- WriteHeaders(writer, value.UserHeaders);
+ if (value.RawUserHeaders is not null)
+ {
+ writer.WriteBase64String("user_headers", value.RawUserHeaders);
+ }
+ else
+ {
+ WriteHeaders(writer, value.UserHeaders);
+ }
writer.WriteEndObject();
}
diff --git a/foreign/csharp/Iggy_SDK/JsonConverters/MessageResponseConverter.cs b/foreign/csharp/Iggy_SDK/JsonConverters/MessageResponseConverter.cs
new file mode 100644
index 0000000..9cb9821
--- /dev/null
+++ b/foreign/csharp/Iggy_SDK/JsonConverters/MessageResponseConverter.cs
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System.Text.Json;
+using System.Text.Json.Serialization;
+using Apache.Iggy.Contracts;
+using Apache.Iggy.Headers;
+using Apache.Iggy.Messages;
+
+namespace Apache.Iggy.JsonConverters;
+
+/// <summary>
+/// JSON converter for <see cref="MessageResponse"/>. The <c>user_headers</c> property is accepted
+/// either as a base64 string, which is kept as raw bytes in <c>RawUserHeaders</c>, or as a
+/// structured value, which is parsed by <see cref="UserHeadersConverter"/> into <c>UserHeaders</c>.
+/// </summary>
+internal sealed class MessageResponseConverter : JsonConverter<MessageResponse>
+{
+    private static readonly UserHeadersConverter HeadersConverter = new();
+
+    /// <summary>
+    /// Reads a <see cref="MessageResponse"/> from a JSON object with the properties
+    /// <c>header</c>, <c>payload</c> (base64) and <c>user_headers</c>; unknown properties are skipped.
+    /// </summary>
+    /// <exception cref="JsonException">
+    /// The current token is not the start of an object, a non-property token appears where a
+    /// property name is expected, or <c>header</c> / <c>payload</c> is missing.
+    /// </exception>
+    public override MessageResponse Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
+    {
+        if (reader.TokenType != JsonTokenType.StartObject)
+        {
+            throw new JsonException("Expected start of object for MessageResponse.");
+        }
+
+        MessageHeader? header = null;
+        byte[]? payload = null;
+        Dictionary<HeaderKey, HeaderValue>? userHeaders = null;
+        byte[]? rawUserHeaders = null;
+
+        while (reader.Read())
+        {
+            if (reader.TokenType == JsonTokenType.EndObject)
+            {
+                break;
+            }
+
+            if (reader.TokenType != JsonTokenType.PropertyName)
+            {
+                throw new JsonException("Expected property name.");
+            }
+
+            var propertyName = reader.GetString();
+            // Advance from the property name to its value.
+            reader.Read();
+
+            switch (propertyName)
+            {
+                case "header":
+                    header = JsonSerializer.Deserialize<MessageHeader>(ref reader, options);
+                    break;
+                case "payload":
+                    payload = reader.GetBytesFromBase64();
+                    break;
+                case "user_headers":
+                    if (reader.TokenType == JsonTokenType.String)
+                    {
+                        // A string is an opaque (possibly encrypted) blob; keep the bytes as-is.
+                        rawUserHeaders = reader.GetBytesFromBase64();
+                    }
+                    else if (reader.TokenType == JsonTokenType.Null)
+                    {
+                        userHeaders = null;
+                    }
+                    else
+                    {
+                        userHeaders = HeadersConverter.Read(ref reader, typeof(Dictionary<HeaderKey, HeaderValue>), options);
+                    }
+                    break;
+                default:
+                    reader.Skip();
+                    break;
+            }
+        }
+
+        return new MessageResponse
+        {
+            Header = header ?? throw new JsonException("Missing 'header' field."),
+            Payload = payload ?? throw new JsonException("Missing 'payload' field."),
+            UserHeaders = userHeaders,
+            RawUserHeaders = rawUserHeaders
+        };
+    }
+
+    /// <summary>
+    /// Writes a <see cref="MessageResponse"/> as a JSON object in the same shape that
+    /// <see cref="Read"/> accepts: <c>header</c>, base64 <c>payload</c> and <c>user_headers</c>.
+    /// Parsed headers are written in structured form; when only raw bytes are available they
+    /// are written as a base64 string; otherwise <c>user_headers</c> is <c>null</c>.
+    /// </summary>
+    public override void Write(Utf8JsonWriter writer, MessageResponse value, JsonSerializerOptions options)
+    {
+        // The object is written field by field on purpose: passing the whole MessageResponse
+        // back to JsonSerializer.Serialize would resolve to this converter again (the type is
+        // bound to it through [JsonConverter]) and recurse until the stack overflows.
+        writer.WriteStartObject();
+
+        writer.WritePropertyName("header");
+        JsonSerializer.Serialize(writer, value.Header, options);
+
+        writer.WriteBase64String("payload", value.Payload);
+
+        if (value.UserHeaders is not null)
+        {
+            writer.WritePropertyName("user_headers");
+            HeadersConverter.Write(writer, value.UserHeaders, options);
+        }
+        else if (value.RawUserHeaders is not null)
+        {
+            writer.WriteBase64String("user_headers", value.RawUserHeaders);
+        }
+        else
+        {
+            writer.WriteNull("user_headers");
+        }
+
+        writer.WriteEndObject();
+    }
+}
diff --git a/foreign/csharp/Iggy_SDK/Mappers/BinaryMapper.cs b/foreign/csharp/Iggy_SDK/Mappers/BinaryMapper.cs
index 6c3435d..cb530a5 100644
--- a/foreign/csharp/Iggy_SDK/Mappers/BinaryMapper.cs
+++ b/foreign/csharp/Iggy_SDK/Mappers/BinaryMapper.cs
@@ -336,8 +336,7 @@
};
}
- internal static PolledMessages MapMessages(ReadOnlySpan<byte> payload,
- Func<byte[], byte[]>? decryptor = null)
+ internal static PolledMessages MapMessages(ReadOnlySpan<byte> payload)
{
var length = payload.Length;
var partitionId = BinaryPrimitives.ReadInt32LittleEndian(payload[..4]);
@@ -362,13 +361,22 @@
var payloadLength = BinaryPrimitives.ReadInt32LittleEndian(payload[(position + 52)..(position + 56)]);
var reserved = BinaryPrimitives.ReadUInt64LittleEndian(payload[(position + 56)..(position + 64)]);
- Dictionary<HeaderKey, HeaderValue>? headers = headersLength switch
+ var wireHeadersLength = headersLength;
+ byte[]? rawUserHeaders = null;
+ Dictionary<HeaderKey, HeaderValue>? headers;
+ if (headersLength == 0)
{
- 0 => null,
- > 0 => MapHeaders(
- payload[(position + 64 + payloadLength)..(position + 64 + payloadLength + headersLength)]),
- < 0 => throw new ArgumentOutOfRangeException()
- };
+ headers = null;
+ }
+ else if (headersLength < 0)
+ {
+ throw new ArgumentOutOfRangeException();
+ }
+ else
+ {
+ rawUserHeaders = payload[(position + 64 + payloadLength)..(position + 64 + payloadLength + headersLength)].ToArray();
+ headers = TryMapHeaders(rawUserHeaders);
+ }
var payloadRangeStart = position + 64;
var payloadRangeEnd = position + 64 + payloadLength;
@@ -399,9 +407,8 @@
Reserved = reserved
},
UserHeaders = headers,
- Payload = decryptor is not null
- ? decryptor(messagePayload[..payloadSliceLen])
- : messagePayload[..payloadSliceLen]
+ RawUserHeaders = rawUserHeaders,
+ Payload = messagePayload[..payloadSliceLen]
});
}
finally
@@ -409,7 +416,7 @@
ArrayPool<byte>.Shared.Return(messagePayload);
}
- position += 64 + payloadLength + headersLength;
+ position += 64 + payloadLength + wireHeadersLength;
if (position + PropertiesSize >= length)
{
break;
@@ -424,14 +431,14 @@
};
}
- private static Dictionary<HeaderKey, HeaderValue> MapHeaders(ReadOnlySpan<byte> payload)
+ internal static Dictionary<HeaderKey, HeaderValue> MapHeaders(ReadOnlySpan<byte> payload)
{
var headers = new Dictionary<HeaderKey, HeaderValue>();
var position = 0;
while (position < payload.Length)
{
- var keyKind = MapHeaderKind(payload, position);
+ var keyKind = MapHeaderKind(payload[position]);
position++;
var keyLength = BinaryPrimitives.ReadInt32LittleEndian(payload[position..(position + 4)]);
@@ -444,7 +451,7 @@
var keyValue = payload[position..(position + keyLength)].ToArray();
position += keyLength;
- var valueKind = MapHeaderKind(payload, position);
+ var valueKind = MapHeaderKind(payload[position]);
position++;
var valueLength = BinaryPrimitives.ReadInt32LittleEndian(payload[position..(position + 4)]);
@@ -464,9 +471,62 @@
return headers;
}
- private static HeaderKind MapHeaderKind(ReadOnlySpan<byte> payload, int position)
+ /// <summary>
+ /// Attempts to parse a user-headers byte sequence into a dictionary, returning
+ /// <c>null</c> when the bytes do not form a well-formed header list. MapMessages uses
+ /// this so that header bytes which cannot be parsed (for example because they are
+ /// encrypted) still leave the message usable via its raw header bytes.
+ /// </summary>
+ /// <param name="payload">Header bytes: repeated (kind, 4-byte little-endian length, bytes) for key then value.</param>
+ /// <returns>
+ /// The parsed headers, or <c>null</c> if the input is empty, a kind byte is outside 1..15,
+ /// a key or value length is outside 1..255, or the data ends before a field is complete.
+ /// </returns>
+ internal static Dictionary<HeaderKey, HeaderValue>? TryMapHeaders(ReadOnlySpan<byte> payload)
{
- var headerKind = payload[position] switch
+ // Fast reject: empty input, or a first byte that cannot be a header kind.
+ if (payload.Length == 0 || payload[0] is 0 or > 15)
+ {
+ return null;
+ }
+
+ var headers = new Dictionary<HeaderKey, HeaderValue>();
+ var position = 0;
+
+ while (position < payload.Length)
+ {
+ // Key: kind byte, then length, then the key bytes.
+ if (!TryMapHeaderKind(payload[position], out var keyKind))
+ return null;
+ position++;
+
+ if (position + 4 > payload.Length)
+ return null;
+ var keyLength = BinaryPrimitives.ReadInt32LittleEndian(payload[position..(position + 4)]);
+ if (keyLength is <= 0 or > 255)
+ return null;
+
+ position += 4;
+ if (position + keyLength > payload.Length)
+ return null;
+ var keyValue = payload[position..(position + keyLength)].ToArray();
+ position += keyLength;
+
+ // Value: same layout as the key; a key without a following value is rejected.
+ if (position >= payload.Length)
+ return null;
+ if (!TryMapHeaderKind(payload[position], out var valueKind))
+ return null;
+ position++;
+
+ if (position + 4 > payload.Length)
+ return null;
+ var valueLength = BinaryPrimitives.ReadInt32LittleEndian(payload[position..(position + 4)]);
+ if (valueLength is <= 0 or > 255)
+ return null;
+
+ position += 4;
+ if (position + valueLength > payload.Length)
+ return null;
+ ReadOnlySpan<byte> value = payload[position..(position + valueLength)];
+ position += valueLength;
+
+ // Indexer assignment: a repeated key replaces the earlier entry instead of throwing.
+ headers[new HeaderKey { Kind = keyKind, Value = keyValue }] =
+ new HeaderValue { Kind = valueKind, Value = value.ToArray() };
+ }
+
+ return headers;
+ }
+
+ private static HeaderKind MapHeaderKind(byte value)
+ {
+ return value switch
{
1 => HeaderKind.Raw,
2 => HeaderKind.String,
@@ -483,9 +543,19 @@
13 => HeaderKind.Uint128,
14 => HeaderKind.Float,
15 => HeaderKind.Double,
- _ => throw new ArgumentOutOfRangeException()
+ _ => throw new ArgumentOutOfRangeException(nameof(value), value, null)
};
- return headerKind;
+ }
+
+ /// <summary>
+ /// Non-throwing variant of <c>MapHeaderKind</c>: maps a kind byte in the range 1..15 to its
+ /// <see cref="HeaderKind"/> and reports whether the byte was in that range.
+ /// </summary>
+ /// <param name="value">The kind byte read from the header bytes.</param>
+ /// <param name="kind">The mapped kind on success; <c>default</c> otherwise.</param>
+ /// <returns><c>true</c> if <paramref name="value"/> is between 1 and 15 inclusive.</returns>
+ private static bool TryMapHeaderKind(byte value, out HeaderKind kind)
+ {
+ if (value is >= 1 and <= 15)
+ {
+ kind = MapHeaderKind(value);
+ return true;
+ }
+ kind = default;
+ return false;
}
internal static IReadOnlyList<StreamResponse> MapStreams(ReadOnlySpan<byte> payload)
diff --git a/foreign/csharp/Iggy_SDK/Messages/Message.cs b/foreign/csharp/Iggy_SDK/Messages/Message.cs
index 1a0e7e6..dc3c7dd 100644
--- a/foreign/csharp/Iggy_SDK/Messages/Message.cs
+++ b/foreign/csharp/Iggy_SDK/Messages/Message.cs
@@ -46,6 +46,12 @@
public Dictionary<HeaderKey, HeaderValue>? UserHeaders { get; set; }
/// <summary>
+ /// Pre-serialized (possibly encrypted) user headers bytes.
+ /// When set, this takes precedence over <see cref="UserHeaders"/> during serialization.
+ /// </summary>
+ internal byte[]? RawUserHeaders { get; set; }
+
+ /// <summary>
/// Default constructor.
/// </summary>
public Message()
diff --git a/foreign/csharp/Iggy_SDK/Publishers/IggyPublisher.cs b/foreign/csharp/Iggy_SDK/Publishers/IggyPublisher.cs
index 1b95c9d..b850916 100644
--- a/foreign/csharp/Iggy_SDK/Publishers/IggyPublisher.cs
+++ b/foreign/csharp/Iggy_SDK/Publishers/IggyPublisher.cs
@@ -15,8 +15,10 @@
// specific language governing permissions and limitations
// under the License.
+using Apache.Iggy.Contracts.Tcp;
using Apache.Iggy.Enums;
using Apache.Iggy.Exceptions;
+using Apache.Iggy.Headers;
using Apache.Iggy.IggyClient;
using Apache.Iggy.Messages;
using Microsoft.Extensions.Logging;
@@ -310,7 +312,7 @@
/// <summary>
/// Encrypts all messages in the list using the configured message encryptor, if available.
- /// Updates the payload length in the message header after encryption.
+ /// Updates the payload and user headers lengths in the message header after encryption.
/// </summary>
/// <param name="messages">The messages to encrypt.</param>
private void EncryptMessages(IList<Message> messages)
@@ -324,6 +326,15 @@
{
message.Payload = _config.MessageEncryptor.Encrypt(message.Payload);
message.Header.PayloadLength = message.Payload.Length;
+
+ if (message.UserHeaders is { Count: > 0 })
+ {
+ var headerBytes = TcpContracts.GetHeadersBytes(message.UserHeaders);
+ var encryptedHeaderBytes = _config.MessageEncryptor.Encrypt(headerBytes);
+ message.RawUserHeaders = encryptedHeaderBytes;
+ message.UserHeaders = null;
+ message.Header.UserHeadersLength = encryptedHeaderBytes.Length;
+ }
}
}
}
diff --git a/foreign/csharp/Iggy_SDK/Utils/TcpMessageStreamHelpers.cs b/foreign/csharp/Iggy_SDK/Utils/TcpMessageStreamHelpers.cs
index c99337b..626442a 100644
--- a/foreign/csharp/Iggy_SDK/Utils/TcpMessageStreamHelpers.cs
+++ b/foreign/csharp/Iggy_SDK/Utils/TcpMessageStreamHelpers.cs
@@ -49,6 +49,12 @@
foreach (var message in messages)
{
bytesCount += 16 + 64 + message.Payload.Length;
+ if (message.RawUserHeaders is not null)
+ {
+ bytesCount += message.RawUserHeaders.Length;
+ continue;
+ }
+
if (message.UserHeaders is null)
{
continue;
diff --git a/foreign/csharp/Iggy_SDK_Tests/MapperTests/HeaderEncryptionTests.cs b/foreign/csharp/Iggy_SDK_Tests/MapperTests/HeaderEncryptionTests.cs
new file mode 100644
index 0000000..53758ca
--- /dev/null
+++ b/foreign/csharp/Iggy_SDK_Tests/MapperTests/HeaderEncryptionTests.cs
@@ -0,0 +1,201 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Apache.Iggy.Contracts.Tcp;
+using Apache.Iggy.Encryption;
+using Apache.Iggy.Headers;
+
+namespace Apache.Iggy.Tests.MapperTests;
+
+/// <summary>
+/// Checks that serialized user headers survive an encrypt/decrypt round trip and that
+/// <c>BinaryMapper.TryMapHeaders</c> yields null for malformed bytes and headers for valid ones.
+/// The hand-written fixtures below use this layout per header: key kind (1 byte),
+/// key length (4 bytes, little-endian), key bytes, value kind (1 byte),
+/// value length (4 bytes, little-endian), value bytes.
+/// </summary>
+public class HeaderEncryptionTests
+{
+    [Fact]
+    public void Headers_should_survive_encrypt_decrypt_roundtrip()
+    {
+        var key = Encryption.AesMessageEncryptor.GenerateKey();
+        var encryptor = new Encryption.AesMessageEncryptor(key);
+
+        var originalHeaders = new Dictionary<Headers.HeaderKey, Headers.HeaderValue>
+        {
+            { new Headers.HeaderKey { Kind = Headers.HeaderKind.String, Value = "batch"u8.ToArray() }, new Headers.HeaderValue { Kind = Headers.HeaderKind.Uint64, Value = BitConverter.GetBytes(1UL) } },
+            { new Headers.HeaderKey { Kind = Headers.HeaderKind.String, Value = "type"u8.ToArray() }, new Headers.HeaderValue { Kind = Headers.HeaderKind.String, Value = "test-message"u8.ToArray() } },
+            { new Headers.HeaderKey { Kind = Headers.HeaderKind.String, Value = "encrypted"u8.ToArray() }, new Headers.HeaderValue { Kind = Headers.HeaderKind.Bool, Value = [1] } },
+        };
+
+        var headerBytes = Contracts.Tcp.TcpContracts.GetHeadersBytes(originalHeaders);
+        Assert.NotEmpty(headerBytes);
+
+        var encrypted = encryptor.Encrypt(headerBytes);
+        Assert.NotEqual(headerBytes, encrypted);
+        Assert.True(encrypted.Length > headerBytes.Length);
+
+        // Encrypted bytes must not parse as valid headers
+        var parsed = Mappers.BinaryMapper.TryMapHeaders(encrypted);
+        Assert.Null(parsed);
+
+        var decrypted = encryptor.Decrypt(encrypted);
+        Assert.Equal(headerBytes, decrypted);
+
+        // Decrypted bytes must map back to exactly the original headers
+        var roundTripped = Mappers.BinaryMapper.MapHeaders(decrypted);
+        AssertHeadersEqual(originalHeaders, roundTripped);
+    }
+
+    [Fact]
+    public void TryMapHeaders_returns_null_on_invalid_first_byte()
+    {
+        var garbage = new byte[64];
+        // Fixed seed: the filler bytes are identical on every run, so any failure is reproducible
+        new Random(3040).NextBytes(garbage);
+        // Kind 0 is invalid, so the remaining bytes should not affect the outcome
+        garbage[0] = 0;
+
+        Assert.Null(Mappers.BinaryMapper.TryMapHeaders(garbage));
+    }
+
+    [Fact]
+    public void TryMapHeaders_returns_null_on_first_byte_above_range()
+    {
+        // Kind byte 16 lies above the accepted range
+        Assert.Null(Mappers.BinaryMapper.TryMapHeaders(new byte[] { 16, 0, 0, 0 }));
+    }
+
+    [Fact]
+    public void TryMapHeaders_returns_null_on_empty_payload()
+    {
+        // No bytes at all: there is not even a kind byte to read
+        Assert.Null(Mappers.BinaryMapper.TryMapHeaders([]));
+    }
+
+    [Fact]
+    public void TryMapHeaders_returns_null_on_truncated_key_length()
+    {
+        // Valid header kind byte but not enough bytes for key length
+        Assert.Null(Mappers.BinaryMapper.TryMapHeaders(new byte[] { 2 }));
+    }
+
+    [Fact]
+    public void TryMapHeaders_returns_null_on_zero_key_length()
+    {
+        // Valid kind, then key length = 0 (invalid)
+        Assert.Null(Mappers.BinaryMapper.TryMapHeaders(new byte[] { 2, 0, 0, 0, 0 }));
+    }
+
+    [Fact]
+    public void TryMapHeaders_returns_null_on_key_length_exceeding_payload()
+    {
+        // Valid kind, key length = 100 but only a few bytes remain
+        Assert.Null(Mappers.BinaryMapper.TryMapHeaders(new byte[] { 2, 100, 0, 0, 0, 1, 2 }));
+    }
+
+    [Fact]
+    public void TryMapHeaders_returns_null_on_truncated_value_kind()
+    {
+        // Valid kind(2=String), key_len=1, key='A', then no value kind byte
+        Assert.Null(Mappers.BinaryMapper.TryMapHeaders(new byte[] { 2, 1, 0, 0, 0, 65 }));
+    }
+
+    [Fact]
+    public void TryMapHeaders_returns_null_on_invalid_value_kind()
+    {
+        // Valid kind(2), key_len=1, key='A', invalid value kind=0
+        Assert.Null(Mappers.BinaryMapper.TryMapHeaders(new byte[] { 2, 1, 0, 0, 0, 65, 0 }));
+    }
+
+    [Fact]
+    public void TryMapHeaders_returns_null_on_truncated_value_length()
+    {
+        // Valid kind(2), key_len=1, key='A', valid value kind(2), then not enough for value length
+        Assert.Null(Mappers.BinaryMapper.TryMapHeaders(new byte[] { 2, 1, 0, 0, 0, 65, 2, 1 }));
+    }
+
+    [Fact]
+    public void TryMapHeaders_returns_null_on_zero_value_length()
+    {
+        // Valid kind(2), key_len=1, key='A', valid value kind(2), value_len=0 (invalid)
+        Assert.Null(Mappers.BinaryMapper.TryMapHeaders(new byte[] { 2, 1, 0, 0, 0, 65, 2, 0, 0, 0, 0 }));
+    }
+
+    [Fact]
+    public void TryMapHeaders_returns_null_on_value_length_exceeding_payload()
+    {
+        // Valid kind(2), key_len=1, key='A', valid value kind(2), value_len=100 but not enough bytes
+        Assert.Null(Mappers.BinaryMapper.TryMapHeaders(new byte[] { 2, 1, 0, 0, 0, 65, 2, 100, 0, 0, 0 }));
+    }
+
+    [Fact]
+    public void TryMapHeaders_returns_valid_headers_on_plaintext()
+    {
+        var headers = new Dictionary<Headers.HeaderKey, Headers.HeaderValue>
+        {
+            { new Headers.HeaderKey { Kind = Headers.HeaderKind.String, Value = "key"u8.ToArray() }, new Headers.HeaderValue { Kind = Headers.HeaderKind.String, Value = "value"u8.ToArray() } },
+        };
+
+        var bytes = Contracts.Tcp.TcpContracts.GetHeadersBytes(headers);
+        var result = Mappers.BinaryMapper.TryMapHeaders(bytes);
+
+        Assert.NotNull(result);
+        Assert.Single(result);
+        AssertHeadersEqual(headers, result);
+    }
+
+    [Fact]
+    public void TryMapHeaders_returns_valid_for_all_header_kinds()
+    {
+        // Covers a representative subset of kinds: Raw and String keys; Raw, Bool, Int32, Uint64 and Double values
+        var headers = new Dictionary<Headers.HeaderKey, Headers.HeaderValue>
+        {
+            { new Headers.HeaderKey { Kind = Headers.HeaderKind.Raw, Value = "k1"u8.ToArray() }, new Headers.HeaderValue { Kind = Headers.HeaderKind.Raw, Value = [0xFF] } },
+            { new Headers.HeaderKey { Kind = Headers.HeaderKind.String, Value = "k2"u8.ToArray() }, new Headers.HeaderValue { Kind = Headers.HeaderKind.Bool, Value = [1] } },
+            { new Headers.HeaderKey { Kind = Headers.HeaderKind.String, Value = "k3"u8.ToArray() }, new Headers.HeaderValue { Kind = Headers.HeaderKind.Int32, Value = BitConverter.GetBytes(42) } },
+            { new Headers.HeaderKey { Kind = Headers.HeaderKind.String, Value = "k4"u8.ToArray() }, new Headers.HeaderValue { Kind = Headers.HeaderKind.Uint64, Value = BitConverter.GetBytes(99UL) } },
+            { new Headers.HeaderKey { Kind = Headers.HeaderKind.String, Value = "k5"u8.ToArray() }, new Headers.HeaderValue { Kind = Headers.HeaderKind.Double, Value = BitConverter.GetBytes(3.14) } },
+        };
+
+        var bytes = Contracts.Tcp.TcpContracts.GetHeadersBytes(headers);
+        var result = Mappers.BinaryMapper.TryMapHeaders(bytes);
+
+        Assert.NotNull(result);
+        Assert.Equal(5, result.Count);
+        AssertHeadersEqual(headers, result);
+    }
+
+    /// <summary>
+    /// Asserts that <paramref name="actual" /> holds exactly the entries of <paramref name="expected" />,
+    /// comparing the kind and the raw bytes of every value.
+    /// </summary>
+    private static void AssertHeadersEqual(
+        Dictionary<Headers.HeaderKey, Headers.HeaderValue> expected,
+        IReadOnlyDictionary<Headers.HeaderKey, Headers.HeaderValue> actual)
+    {
+        Assert.Equal(expected.Count, actual.Count);
+
+        foreach (var (headerKey, expectedValue) in expected)
+        {
+            Assert.True(actual.ContainsKey(headerKey));
+            Assert.Equal(expectedValue.Kind, actual[headerKey].Kind);
+            Assert.Equal(expectedValue.Value, actual[headerKey].Value);
+        }
+    }
+}
diff --git a/foreign/python/Cargo.toml b/foreign/python/Cargo.toml
index 9ce0d52..2e6e385 100644
--- a/foreign/python/Cargo.toml
+++ b/foreign/python/Cargo.toml
@@ -17,7 +17,7 @@
[package]
name = "apache-iggy"
-version = "0.7.4-dev1"
+version = "0.7.5-dev1"
edition = "2024"
authors = ["Iggy Committers <dev@iggy.apache.org>"]
license = "Apache-2.0"
@@ -28,7 +28,7 @@
[dependencies]
bytes = "1.11.1"
futures = "0.3.32"
-iggy = { path = "../../core/sdk", version = "0.9.4-edge.1" }
+iggy = { path = "../../core/sdk", version = "0.9.5-edge.1" }
pyo3 = "0.28.2"
pyo3-async-runtimes = { version = "0.28.0", features = [
"attributes",
diff --git a/foreign/python/pyproject.toml b/foreign/python/pyproject.toml
index ba57ad4..0122961 100644
--- a/foreign/python/pyproject.toml
+++ b/foreign/python/pyproject.toml
@@ -22,7 +22,7 @@
[project]
name = "apache-iggy"
requires-python = ">=3.10"
-version = "0.7.4.dev1"
+version = "0.7.5.dev1"
description = "Apache Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second."
readme = "README.md"
license = { file = "LICENSE" }