feat(security): encrypt user headers alongside message payload (#3040)

Server-side encryption protected message payloads and state
log entries but left user-defined headers in plaintext on
disk. Client-side encryption in the Rust and C# SDKs had
the same gap, encrypting only the payload.

Both server and all SDKs with client-side encryption now
encrypt and decrypt user headers using the same encryptor
as the payload. The integration test verifies no plaintext
headers appear on disk when encryption is enabled, and a
new unit test covers the client-side round-trip. The C#
SDK adds TryMapHeaders for graceful handling of encrypted
header bytes during deserialization.

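BREAKING CHANGE: user headers are now encrypted together with the message payload when encryption is enabled.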
diff --git a/Cargo.lock b/Cargo.lock
index d10cb6e..f8b5a94 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5183,7 +5183,7 @@
 
 [[package]]
 name = "iggy"
-version = "0.9.4-edge.1"
+version = "0.9.5-edge.1"
 dependencies = [
  "async-broadcast",
  "async-dropper",
@@ -5271,7 +5271,7 @@
 
 [[package]]
 name = "iggy-cli"
-version = "0.11.3-edge.1"
+version = "0.12.1-edge.1"
 dependencies = [
  "ahash 0.8.12",
  "anyhow",
@@ -5301,7 +5301,7 @@
 
 [[package]]
 name = "iggy-connectors"
-version = "0.3.2-edge.1"
+version = "0.3.3-edge.1"
 dependencies = [
  "async-trait",
  "axum",
@@ -5353,7 +5353,7 @@
 
 [[package]]
 name = "iggy-mcp"
-version = "0.3.2-edge.1"
+version = "0.3.3-edge.1"
 dependencies = [
  "axum",
  "axum-server",
@@ -5385,7 +5385,7 @@
 
 [[package]]
 name = "iggy_binary_protocol"
-version = "0.9.4-edge.1"
+version = "0.9.5-edge.1"
 dependencies = [
  "bytemuck",
  "bytes",
@@ -5395,7 +5395,7 @@
 
 [[package]]
 name = "iggy_common"
-version = "0.9.4-edge.1"
+version = "0.9.5-edge.1"
 dependencies = [
  "aes-gcm",
  "ahash 0.8.12",
@@ -5441,7 +5441,7 @@
 
 [[package]]
 name = "iggy_connector_elasticsearch_sink"
-version = "0.3.2-edge.1"
+version = "0.3.3-edge.1"
 dependencies = [
  "async-trait",
  "base64 0.22.1",
@@ -5460,7 +5460,7 @@
 
 [[package]]
 name = "iggy_connector_elasticsearch_source"
-version = "0.3.2-edge.1"
+version = "0.3.3-edge.1"
 dependencies = [
  "async-trait",
  "dashmap",
@@ -5479,7 +5479,7 @@
 
 [[package]]
 name = "iggy_connector_iceberg_sink"
-version = "0.3.2-edge.1"
+version = "0.3.3-edge.1"
 dependencies = [
  "arrow-json",
  "async-trait",
@@ -5499,7 +5499,7 @@
 
 [[package]]
 name = "iggy_connector_mongodb_sink"
-version = "0.3.0"
+version = "0.3.3-edge.1"
 dependencies = [
  "async-trait",
  "humantime",
@@ -5515,7 +5515,7 @@
 
 [[package]]
 name = "iggy_connector_postgres_sink"
-version = "0.3.2-edge.1"
+version = "0.3.3-edge.1"
 dependencies = [
  "async-trait",
  "dashmap",
@@ -5535,7 +5535,7 @@
 
 [[package]]
 name = "iggy_connector_postgres_source"
-version = "0.3.2-edge.1"
+version = "0.3.3-edge.1"
 dependencies = [
  "async-trait",
  "base64 0.22.1",
@@ -5557,7 +5557,7 @@
 
 [[package]]
 name = "iggy_connector_quickwit_sink"
-version = "0.3.2-edge.1"
+version = "0.3.3-edge.1"
 dependencies = [
  "async-trait",
  "dashmap",
@@ -5572,7 +5572,7 @@
 
 [[package]]
 name = "iggy_connector_random_source"
-version = "0.3.2-edge.1"
+version = "0.3.3-edge.1"
 dependencies = [
  "async-trait",
  "dashmap",
@@ -5590,7 +5590,7 @@
 
 [[package]]
 name = "iggy_connector_sdk"
-version = "0.2.1-edge.1"
+version = "0.2.2-edge.1"
 dependencies = [
  "async-trait",
  "base64 0.22.1",
@@ -5620,7 +5620,7 @@
 
 [[package]]
 name = "iggy_connector_stdout_sink"
-version = "0.3.2-edge.1"
+version = "0.3.3-edge.1"
 dependencies = [
  "async-trait",
  "dashmap",
@@ -9723,7 +9723,7 @@
 
 [[package]]
 name = "server"
-version = "0.7.3-edge.1"
+version = "0.7.4-edge.1"
 dependencies = [
  "ahash 0.8.12",
  "anyhow",
diff --git a/Cargo.toml b/Cargo.toml
index 5918468..ed714d5 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -160,11 +160,11 @@
 iceberg = "0.9.0"
 iceberg-catalog-rest = "0.9.0"
 iceberg-storage-opendal = "0.9.0"
-iggy = { path = "core/sdk", version = "0.9.4-edge.1" }
-iggy-cli = { path = "core/cli", version = "0.11.3-edge.1" }
-iggy_binary_protocol = { path = "core/binary_protocol", version = "0.9.4-edge.1" }
-iggy_common = { path = "core/common", version = "0.9.4-edge.1" }
-iggy_connector_sdk = { path = "core/connectors/sdk", version = "0.2.1-edge.1" }
+iggy = { path = "core/sdk", version = "0.9.5-edge.1" }
+iggy-cli = { path = "core/cli", version = "0.12.1-edge.1" }
+iggy_binary_protocol = { path = "core/binary_protocol", version = "0.9.5-edge.1" }
+iggy_common = { path = "core/common", version = "0.9.5-edge.1" }
+iggy_connector_sdk = { path = "core/connectors/sdk", version = "0.2.2-edge.1" }
 integration = { path = "core/integration" }
 journal = { path = "core/journal" }
 js-sys = "0.3"
diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md
index b6100cd..cea57b4 100644
--- a/DEPENDENCIES.md
+++ b/DEPENDENCIES.md
@@ -451,24 +451,24 @@
 idna: 1.1.0, "Apache-2.0 OR MIT",
 idna_adapter: 1.2.1, "Apache-2.0 OR MIT",
 if_chain: 1.0.3, "Apache-2.0 OR MIT",
-iggy: 0.9.4-edge.1, "Apache-2.0",
+iggy: 0.9.5-edge.1, "Apache-2.0",
 iggy-bench: 0.4.1-edge.1, "Apache-2.0",
 iggy-bench-dashboard-server: 0.6.3-edge.1, "Apache-2.0",
-iggy-cli: 0.11.3-edge.1, "Apache-2.0",
-iggy-connectors: 0.3.2-edge.1, "Apache-2.0",
-iggy-mcp: 0.3.2-edge.1, "Apache-2.0",
-iggy_binary_protocol: 0.9.4-edge.1, "Apache-2.0",
-iggy_common: 0.9.4-edge.1, "Apache-2.0",
-iggy_connector_elasticsearch_sink: 0.3.2-edge.1, "Apache-2.0",
-iggy_connector_elasticsearch_source: 0.3.2-edge.1, "Apache-2.0",
-iggy_connector_iceberg_sink: 0.3.2-edge.1, "Apache-2.0",
-iggy_connector_mongodb_sink: 0.3.0, "Apache-2.0",
-iggy_connector_postgres_sink: 0.3.2-edge.1, "Apache-2.0",
-iggy_connector_postgres_source: 0.3.2-edge.1, "Apache-2.0",
-iggy_connector_quickwit_sink: 0.3.2-edge.1, "Apache-2.0",
-iggy_connector_random_source: 0.3.2-edge.1, "Apache-2.0",
-iggy_connector_sdk: 0.2.1-edge.1, "Apache-2.0",
-iggy_connector_stdout_sink: 0.3.2-edge.1, "Apache-2.0",
+iggy-cli: 0.12.1-edge.1, "Apache-2.0",
+iggy-connectors: 0.3.3-edge.1, "Apache-2.0",
+iggy-mcp: 0.3.3-edge.1, "Apache-2.0",
+iggy_binary_protocol: 0.9.5-edge.1, "Apache-2.0",
+iggy_common: 0.9.5-edge.1, "Apache-2.0",
+iggy_connector_elasticsearch_sink: 0.3.3-edge.1, "Apache-2.0",
+iggy_connector_elasticsearch_source: 0.3.3-edge.1, "Apache-2.0",
+iggy_connector_iceberg_sink: 0.3.3-edge.1, "Apache-2.0",
+iggy_connector_mongodb_sink: 0.3.3-edge.1, "Apache-2.0",
+iggy_connector_postgres_sink: 0.3.3-edge.1, "Apache-2.0",
+iggy_connector_postgres_source: 0.3.3-edge.1, "Apache-2.0",
+iggy_connector_quickwit_sink: 0.3.3-edge.1, "Apache-2.0",
+iggy_connector_random_source: 0.3.3-edge.1, "Apache-2.0",
+iggy_connector_sdk: 0.2.2-edge.1, "Apache-2.0",
+iggy_connector_stdout_sink: 0.3.3-edge.1, "Apache-2.0",
 iggy_examples: 0.0.6, "Apache-2.0",
 ignore: 0.4.25, "MIT OR Unlicense",
 image: 0.25.10, "Apache-2.0 OR MIT",
@@ -843,7 +843,7 @@
 serde_yaml_ng: 0.10.0, "MIT",
 serial_test: 3.4.0, "MIT",
 serial_test_derive: 3.4.0, "MIT",
-server: 0.7.3-edge.1, "Apache-2.0",
+server: 0.7.4-edge.1, "Apache-2.0",
 sha1: 0.10.6, "Apache-2.0 OR MIT",
 sha2: 0.10.9, "Apache-2.0 OR MIT",
 sha3: 0.10.8, "Apache-2.0 OR MIT",
diff --git a/core/ai/mcp/Cargo.toml b/core/ai/mcp/Cargo.toml
index 071e8ae..e013d59 100644
--- a/core/ai/mcp/Cargo.toml
+++ b/core/ai/mcp/Cargo.toml
@@ -17,7 +17,7 @@
 
 [package]
 name = "iggy-mcp"
-version = "0.3.2-edge.1"
+version = "0.3.3-edge.1"
 description = "MCP Server for Iggy message streaming platform"
 edition = "2024"
 license = "Apache-2.0"
diff --git a/core/binary_protocol/Cargo.toml b/core/binary_protocol/Cargo.toml
index b479ecf..51f3109 100644
--- a/core/binary_protocol/Cargo.toml
+++ b/core/binary_protocol/Cargo.toml
@@ -17,7 +17,7 @@
 
 [package]
 name = "iggy_binary_protocol"
-version = "0.9.4-edge.1"
+version = "0.9.5-edge.1"
 description = "Wire protocol types and codec for the Iggy binary protocol. Shared between server and SDK."
 edition = "2024"
 license = "Apache-2.0"
diff --git a/core/cli/Cargo.toml b/core/cli/Cargo.toml
index ea3c1fc..85694e3 100644
--- a/core/cli/Cargo.toml
+++ b/core/cli/Cargo.toml
@@ -17,7 +17,7 @@
 
 [package]
 name = "iggy-cli"
-version = "0.11.3-edge.1"
+version = "0.12.1-edge.1"
 edition = "2024"
 authors = ["bartosz.ciesla@gmail.com"]
 repository = "https://github.com/apache/iggy"
diff --git a/core/common/Cargo.toml b/core/common/Cargo.toml
index 56e9974..a2fd8ae 100644
--- a/core/common/Cargo.toml
+++ b/core/common/Cargo.toml
@@ -16,7 +16,7 @@
 # under the License.
 [package]
 name = "iggy_common"
-version = "0.9.4-edge.1"
+version = "0.9.5-edge.1"
 description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second."
 edition = "2024"
 license = "Apache-2.0"
diff --git a/core/common/src/commands/messages/send_messages.rs b/core/common/src/commands/messages/send_messages.rs
index 91a6aa4..2943340 100644
--- a/core/common/src/commands/messages/send_messages.rs
+++ b/core/common/src/commands/messages/send_messages.rs
@@ -203,12 +203,19 @@
                 let payload_base64 = BASE64.encode(msg_view.payload());
                 map.insert("payload", serde_json::to_value(payload_base64).unwrap());
 
-                if let Ok(Some(headers)) = msg_view.user_headers_map() {
-                    let entries: Vec<HeaderEntry> = headers
-                        .into_iter()
-                        .map(|(k, v)| HeaderEntry { key: k, value: v })
-                        .collect();
-                    map.insert("user_headers", serde_json::to_value(&entries).unwrap());
+                match msg_view.user_headers_map() {
+                    Ok(Some(headers)) => {
+                        let entries: Vec<HeaderEntry> = headers
+                            .into_iter()
+                            .map(|(k, v)| HeaderEntry { key: k, value: v })
+                            .collect();
+                        map.insert("user_headers", serde_json::to_value(&entries).unwrap());
+                    }
+                    _ if msg_view.user_headers().is_some() => {
+                        let raw_base64 = BASE64.encode(msg_view.user_headers().unwrap());
+                        map.insert("user_headers", serde_json::to_value(raw_base64).unwrap());
+                    }
+                    _ => {}
                 }
 
                 map
@@ -310,29 +317,38 @@
                                     .decode(payload)
                                     .map_err(|_| de::Error::custom("Invalid base64 payload"))?;
 
-                                let headers_map = if let Some(headers) = msg.get("user_headers") {
-                                    if headers.is_null() {
-                                        None
-                                    } else {
-                                        let entries: Vec<HeaderEntry> = serde_json::from_value(
-                                            headers.clone(),
-                                        )
-                                        .map_err(|e| {
-                                            de::Error::custom(format!(
-                                                "Invalid headers format: {e}"
-                                            ))
-                                        })?;
-                                        let mut map = HashMap::new();
-                                        for entry in entries {
-                                            map.insert(entry.key, entry.value);
+                                let (headers_map, raw_headers) =
+                                    if let Some(headers) = msg.get("user_headers") {
+                                        if headers.is_null() {
+                                            (None, None)
+                                        } else if let Some(base64_str) = headers.as_str() {
+                                            // Raw base64-encoded header bytes (e.g. client-side encrypted)
+                                            let raw = BASE64.decode(base64_str).map_err(|e| {
+                                                de::Error::custom(format!(
+                                                    "Invalid base64 headers: {e}"
+                                                ))
+                                            })?;
+                                            (None, Some(Bytes::from(raw)))
+                                        } else {
+                                            let entries: Vec<HeaderEntry> = serde_json::from_value(
+                                                headers.clone(),
+                                            )
+                                            .map_err(|e| {
+                                                de::Error::custom(format!(
+                                                    "Invalid headers format: {e}"
+                                                ))
+                                            })?;
+                                            let mut map = HashMap::new();
+                                            for entry in entries {
+                                                map.insert(entry.key, entry.value);
+                                            }
+                                            (Some(map), None)
                                         }
-                                        Some(map)
-                                    }
-                                } else {
-                                    None
-                                };
+                                    } else {
+                                        (None, None)
+                                    };
 
-                                let iggy_message = if let Some(headers) = headers_map {
+                                let mut iggy_message = if let Some(headers) = headers_map {
                                     IggyMessage::builder()
                                         .id(id)
                                         .payload(payload_bytes.into())
@@ -355,6 +371,11 @@
                                         })?
                                 };
 
+                                if let Some(raw) = raw_headers {
+                                    iggy_message.header.user_headers_length = raw.len() as u32;
+                                    iggy_message.user_headers = Some(raw);
+                                }
+
                                 iggy_messages.push(iggy_message);
                             }
 
diff --git a/core/common/src/types/message/iggy_message.rs b/core/common/src/types/message/iggy_message.rs
index 12a2c1a..4fc214d 100644
--- a/core/common/src/types/message/iggy_message.rs
+++ b/core/common/src/types/message/iggy_message.rs
@@ -552,19 +552,25 @@
         let base64_payload = STANDARD.encode(&self.payload);
         state.serialize_field("payload", &base64_payload)?;
 
-        let entries: Vec<HeaderEntry> = if self.user_headers.is_some() {
-            let headers_map = self.user_headers_map().map_err(serde::ser::Error::custom)?;
-            if let Some(map) = headers_map {
-                map.into_iter()
-                    .map(|(key, value)| HeaderEntry { key, value })
-                    .collect()
-            } else {
-                Vec::new()
+        match self.user_headers.as_ref() {
+            Some(raw_bytes) => match self.user_headers_map() {
+                Ok(Some(map)) => {
+                    let entries: Vec<HeaderEntry> = map
+                        .into_iter()
+                        .map(|(key, value)| HeaderEntry { key, value })
+                        .collect();
+                    state.serialize_field("user_headers", &entries)?;
+                }
+                _ => {
+                    let base64_headers = STANDARD.encode(raw_bytes);
+                    state.serialize_field("user_headers", &base64_headers)?;
+                }
+            },
+            None => {
+                let empty: Vec<HeaderEntry> = Vec::new();
+                state.serialize_field("user_headers", &empty)?;
             }
-        } else {
-            Vec::new()
-        };
-        state.serialize_field("user_headers", &entries)?;
+        }
 
         state.end()
     }
@@ -595,6 +601,7 @@
                 let mut header: Option<IggyMessageHeader> = None;
                 let mut payload: Option<Bytes> = None;
                 let mut user_headers: Option<HashMap<HeaderKey, HeaderValue>> = None;
+                let mut raw_user_headers: Option<Bytes> = None;
 
                 while let Some(key) = map.next_key::<String>()? {
                     match key.as_str() {
@@ -610,12 +617,28 @@
                             payload = Some(Bytes::from(decoded));
                         }
                         "user_headers" => {
-                            let entries: Vec<HeaderEntry> = map.next_value()?;
-                            let mut headers_map = HashMap::new();
-                            for entry in entries {
-                                headers_map.insert(entry.key, entry.value);
+                            // A JSON string is decoded as base64 raw header bytes; a JSON array is parsed as HeaderEntry items
+                            let value: serde_json::Value = map.next_value()?;
+                            if let Some(base64_str) = value.as_str() {
+                                use base64::{Engine as _, engine::general_purpose::STANDARD};
+                                let decoded =
+                                    STANDARD.decode(base64_str.as_bytes()).map_err(|e| {
+                                        de::Error::custom(format!(
+                                            "Failed to decode base64 headers: {e}"
+                                        ))
+                                    })?;
+                                raw_user_headers = Some(Bytes::from(decoded));
+                            } else if value.is_array() {
+                                let entries: Vec<HeaderEntry> = serde_json::from_value(value)
+                                    .map_err(|e| {
+                                        de::Error::custom(format!("Invalid headers format: {e}"))
+                                    })?;
+                                let mut headers_map = HashMap::new();
+                                for entry in entries {
+                                    headers_map.insert(entry.key, entry.value);
+                                }
+                                user_headers = Some(headers_map);
                             }
-                            user_headers = Some(headers_map);
                         }
                         _ => {
                             let _ = map.next_value::<de::IgnoredAny>()?;
@@ -626,7 +649,11 @@
                 let header = header.ok_or_else(|| de::Error::missing_field("header"))?;
                 let payload = payload.ok_or_else(|| de::Error::missing_field("payload"))?;
 
-                let user_headers_bytes = user_headers.map(|headers| headers.to_bytes());
+                let user_headers_bytes = if let Some(raw) = raw_user_headers {
+                    Some(raw)
+                } else {
+                    user_headers.map(|headers| headers.to_bytes())
+                };
 
                 let user_headers_length = user_headers_bytes
                     .as_ref()
diff --git a/core/common/src/utils/crypto.rs b/core/common/src/utils/crypto.rs
index fca62dd..24608cb 100644
--- a/core/common/src/utils/crypto.rs
+++ b/core/common/src/utils/crypto.rs
@@ -95,6 +95,7 @@
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::{HeaderKey, HeaderValue, IggyMessage};
 
     #[test]
     fn given_the_same_key_data_should_be_encrypted_and_decrypted_correctly() {
@@ -125,4 +126,73 @@
         let error = decrypted_data.err().unwrap();
         assert_eq!(error.as_code(), IggyError::CannotDecryptData.as_code());
     }
+
+    #[test]
+    fn message_payload_and_headers_should_encrypt_and_decrypt_correctly() {
+        use bytes::Bytes;
+        use std::collections::HashMap;
+
+        let key = [1; 32];
+        let encryptor = Aes256GcmEncryptor::new(&key).unwrap();
+
+        let mut headers = HashMap::new();
+        headers.insert(
+            HeaderKey::try_from("batch").unwrap(),
+            HeaderValue::from(1u64),
+        );
+        headers.insert(
+            HeaderKey::try_from("type").unwrap(),
+            HeaderValue::try_from("test-message").unwrap(),
+        );
+
+        let mut message = IggyMessage::builder()
+            .payload(Bytes::from("test payload data"))
+            .user_headers(headers)
+            .build()
+            .unwrap();
+
+        let original_payload = message.payload.clone();
+        let original_headers = message.user_headers.clone().unwrap();
+
+        message.payload = Bytes::from(encryptor.encrypt(&message.payload).unwrap());
+        message.header.payload_length = message.payload.len() as u32;
+
+        let encrypted_headers = encryptor.encrypt(&original_headers).unwrap();
+        message.header.user_headers_length = encrypted_headers.len() as u32;
+        message.user_headers = Some(Bytes::from(encrypted_headers));
+
+        assert_ne!(message.payload, original_payload);
+        assert_ne!(message.user_headers.as_ref().unwrap(), &original_headers);
+
+        let decrypted_payload = encryptor.decrypt(&message.payload).unwrap();
+        message.payload = Bytes::from(decrypted_payload);
+        message.header.payload_length = message.payload.len() as u32;
+
+        let decrypted_headers = encryptor
+            .decrypt(message.user_headers.as_ref().unwrap())
+            .unwrap();
+        message.header.user_headers_length = decrypted_headers.len() as u32;
+        message.user_headers = Some(Bytes::from(decrypted_headers));
+
+        assert_eq!(message.payload, original_payload);
+        assert_eq!(message.user_headers.as_ref().unwrap(), &original_headers);
+
+        let parsed = message.user_headers_map().unwrap().unwrap();
+        assert_eq!(
+            parsed
+                .get(&HeaderKey::try_from("batch").unwrap())
+                .unwrap()
+                .as_uint64()
+                .unwrap(),
+            1
+        );
+        assert_eq!(
+            parsed
+                .get(&HeaderKey::try_from("type").unwrap())
+                .unwrap()
+                .as_str()
+                .unwrap(),
+            "test-message"
+        );
+    }
 }
diff --git a/core/connectors/runtime/Cargo.toml b/core/connectors/runtime/Cargo.toml
index 20d42de..8e78c32 100644
--- a/core/connectors/runtime/Cargo.toml
+++ b/core/connectors/runtime/Cargo.toml
@@ -17,7 +17,7 @@
 
 [package]
 name = "iggy-connectors"
-version = "0.3.2-edge.1"
+version = "0.3.3-edge.1"
 description = "Connectors runtime for Iggy message streaming platform"
 edition = "2024"
 license = "Apache-2.0"
diff --git a/core/connectors/sdk/Cargo.toml b/core/connectors/sdk/Cargo.toml
index 34d2425..97a508b 100644
--- a/core/connectors/sdk/Cargo.toml
+++ b/core/connectors/sdk/Cargo.toml
@@ -17,7 +17,7 @@
 
 [package]
 name = "iggy_connector_sdk"
-version = "0.2.1-edge.1"
+version = "0.2.2-edge.1"
 description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second."
 edition = "2024"
 license = "Apache-2.0"
diff --git a/core/connectors/sinks/elasticsearch_sink/Cargo.toml b/core/connectors/sinks/elasticsearch_sink/Cargo.toml
index 40923ea..cb9b888 100644
--- a/core/connectors/sinks/elasticsearch_sink/Cargo.toml
+++ b/core/connectors/sinks/elasticsearch_sink/Cargo.toml
@@ -17,7 +17,7 @@
 
 [package]
 name = "iggy_connector_elasticsearch_sink"
-version = "0.3.2-edge.1"
+version = "0.3.3-edge.1"
 description = "Iggy Elasticsearch sink connector"
 edition = "2024"
 license = "Apache-2.0"
diff --git a/core/connectors/sinks/iceberg_sink/Cargo.toml b/core/connectors/sinks/iceberg_sink/Cargo.toml
index 527a27d..e3ac0ae 100644
--- a/core/connectors/sinks/iceberg_sink/Cargo.toml
+++ b/core/connectors/sinks/iceberg_sink/Cargo.toml
@@ -17,7 +17,7 @@
 
 [package]
 name = "iggy_connector_iceberg_sink"
-version = "0.3.2-edge.1"
+version = "0.3.3-edge.1"
 edition = "2024"
 license = "Apache-2.0"
 keywords = ["iggy", "messaging", "streaming"]
diff --git a/core/connectors/sinks/mongodb_sink/Cargo.toml b/core/connectors/sinks/mongodb_sink/Cargo.toml
index 0a4305f..849c6f7 100644
--- a/core/connectors/sinks/mongodb_sink/Cargo.toml
+++ b/core/connectors/sinks/mongodb_sink/Cargo.toml
@@ -17,7 +17,7 @@
 
 [package]
 name = "iggy_connector_mongodb_sink"
-version = "0.3.0"
+version = "0.3.3-edge.1"
 description = "Iggy MongoDB sink connector for storing stream messages into MongoDB database"
 edition = "2024"
 license = "Apache-2.0"
diff --git a/core/connectors/sinks/postgres_sink/Cargo.toml b/core/connectors/sinks/postgres_sink/Cargo.toml
index 56ab685..04cc199 100644
--- a/core/connectors/sinks/postgres_sink/Cargo.toml
+++ b/core/connectors/sinks/postgres_sink/Cargo.toml
@@ -17,7 +17,7 @@
 
 [package]
 name = "iggy_connector_postgres_sink"
-version = "0.3.2-edge.1"
+version = "0.3.3-edge.1"
 description = "Iggy PostgreSQL sink connector for storing stream messages into PostgreSQL database"
 edition = "2024"
 license = "Apache-2.0"
diff --git a/core/connectors/sinks/quickwit_sink/Cargo.toml b/core/connectors/sinks/quickwit_sink/Cargo.toml
index 3aea64b..cc54878 100644
--- a/core/connectors/sinks/quickwit_sink/Cargo.toml
+++ b/core/connectors/sinks/quickwit_sink/Cargo.toml
@@ -17,7 +17,7 @@
 
 [package]
 name = "iggy_connector_quickwit_sink"
-version = "0.3.2-edge.1"
+version = "0.3.3-edge.1"
 description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second."
 edition = "2024"
 license = "Apache-2.0"
diff --git a/core/connectors/sinks/stdout_sink/Cargo.toml b/core/connectors/sinks/stdout_sink/Cargo.toml
index 270e525..df123b5 100644
--- a/core/connectors/sinks/stdout_sink/Cargo.toml
+++ b/core/connectors/sinks/stdout_sink/Cargo.toml
@@ -17,7 +17,7 @@
 
 [package]
 name = "iggy_connector_stdout_sink"
-version = "0.3.2-edge.1"
+version = "0.3.3-edge.1"
 description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second."
 edition = "2024"
 license = "Apache-2.0"
diff --git a/core/connectors/sources/elasticsearch_source/Cargo.toml b/core/connectors/sources/elasticsearch_source/Cargo.toml
index d1bb9c6..ed7365e 100644
--- a/core/connectors/sources/elasticsearch_source/Cargo.toml
+++ b/core/connectors/sources/elasticsearch_source/Cargo.toml
@@ -17,7 +17,7 @@
 
 [package]
 name = "iggy_connector_elasticsearch_source"
-version = "0.3.2-edge.1"
+version = "0.3.3-edge.1"
 description = "Iggy Elasticsearch source connector"
 edition = "2024"
 license = "Apache-2.0"
diff --git a/core/connectors/sources/postgres_source/Cargo.toml b/core/connectors/sources/postgres_source/Cargo.toml
index bbc82be..6281670 100644
--- a/core/connectors/sources/postgres_source/Cargo.toml
+++ b/core/connectors/sources/postgres_source/Cargo.toml
@@ -17,7 +17,7 @@
 
 [package]
 name = "iggy_connector_postgres_source"
-version = "0.3.2-edge.1"
+version = "0.3.3-edge.1"
 description = "Iggy PostgreSQL source connector supporting CDC and table polling for message streaming platform"
 edition = "2024"
 license = "Apache-2.0"
diff --git a/core/connectors/sources/random_source/Cargo.toml b/core/connectors/sources/random_source/Cargo.toml
index 7a0eed4..b3e3d66 100644
--- a/core/connectors/sources/random_source/Cargo.toml
+++ b/core/connectors/sources/random_source/Cargo.toml
@@ -17,7 +17,7 @@
 
 [package]
 name = "iggy_connector_random_source"
-version = "0.3.2-edge.1"
+version = "0.3.3-edge.1"
 description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second."
 edition = "2024"
 license = "Apache-2.0"
diff --git a/core/integration/src/harness/handle/client_builder.rs b/core/integration/src/harness/handle/client_builder.rs
index ef35b99..66998bb 100644
--- a/core/integration/src/harness/handle/client_builder.rs
+++ b/core/integration/src/harness/handle/client_builder.rs
@@ -72,6 +72,7 @@
     connection: ServerConnection,
     auto_login: Option<AutoLoginConfig>,
     tcp_nodelay: bool,
+    encryptor: Option<Arc<iggy_common::EncryptorKind>>,
 }
 
 impl ClientBuilder {
@@ -81,6 +82,7 @@
             connection,
             auto_login: None,
             tcp_nodelay: false,
+            encryptor: None,
         }
     }
 
@@ -102,6 +104,12 @@
         self
     }
 
+    /// Set the client-side encryptor for encrypting/decrypting message payloads and headers.
+    pub fn with_encryptor(mut self, encryptor: Arc<iggy_common::EncryptorKind>) -> Self {
+        self.encryptor = Some(encryptor);
+        self
+    }
+
     /// Connect to the server and optionally perform auto-login.
     pub async fn connect(self) -> Result<IggyClient, TestBinaryError> {
         let client = match self.transport {
@@ -168,7 +176,7 @@
         Ok(IggyClient::create(
             iggy::prelude::ClientWrapper::Tcp(client),
             None,
-            None,
+            self.encryptor.clone(),
         ))
     }
 
@@ -195,7 +203,7 @@
         Ok(IggyClient::create(
             iggy::prelude::ClientWrapper::Http(client),
             None,
-            None,
+            self.encryptor.clone(),
         ))
     }
 
@@ -231,7 +239,7 @@
         Ok(IggyClient::create(
             iggy::prelude::ClientWrapper::Quic(client),
             None,
-            None,
+            self.encryptor.clone(),
         ))
     }
 
@@ -282,7 +290,7 @@
         Ok(IggyClient::create(
             iggy::prelude::ClientWrapper::WebSocket(client),
             None,
-            None,
+            self.encryptor.clone(),
         ))
     }
 
diff --git a/core/integration/src/harness/orchestrator/harness.rs b/core/integration/src/harness/orchestrator/harness.rs
index 6e79058..6897b31 100644
--- a/core/integration/src/harness/orchestrator/harness.rs
+++ b/core/integration/src/harness/orchestrator/harness.rs
@@ -22,7 +22,8 @@
 use crate::harness::context::TestContext;
 use crate::harness::error::TestBinaryError;
 use crate::harness::handle::{
-    ClientHandle, ConnectorsRuntimeHandle, McpClient, McpHandle, ServerHandle, ServerLogs,
+    ClientBuilder, ClientHandle, ConnectorsRuntimeHandle, McpClient, McpHandle, ServerHandle,
+    ServerLogs,
 };
 use crate::harness::traits::{Restartable, TestBinary};
 use futures::executor::block_on;
@@ -271,19 +272,28 @@
         }
     }
 
-    /// Create a new client logged in as root for the specified transport.
     pub async fn root_client_for(
         &self,
         transport: TransportProtocol,
     ) -> Result<IggyClient, TestBinaryError> {
+        self.client_builder_for(transport)?
+            .with_root_login()
+            .connect()
+            .await
+    }
+
+    /// Create a client builder for the specified transport on the first server; the caller configures it and calls `connect()`.
+    pub fn client_builder_for(
+        &self,
+        transport: TransportProtocol,
+    ) -> Result<ClientBuilder, TestBinaryError> {
         let server = self.servers.first().ok_or(TestBinaryError::MissingServer)?;
-        let builder = match transport {
-            TransportProtocol::Tcp => server.tcp_client()?,
-            TransportProtocol::Http => server.http_client()?,
-            TransportProtocol::Quic => server.quic_client()?,
-            TransportProtocol::WebSocket => server.websocket_client()?,
-        };
-        builder.with_root_login().connect().await
+        match transport {
+            TransportProtocol::Tcp => server.tcp_client(),
+            TransportProtocol::Http => server.http_client(),
+            TransportProtocol::Quic => server.quic_client(),
+            TransportProtocol::WebSocket => server.websocket_client(),
+        }
     }
 
     /// Create multiple root clients for the specified transport.
diff --git a/core/integration/tests/server/scenarios/encryption_scenario.rs b/core/integration/tests/server/scenarios/encryption_scenario.rs
index f03741c..6881f5f 100644
--- a/core/integration/tests/server/scenarios/encryption_scenario.rs
+++ b/core/integration/tests/server/scenarios/encryption_scenario.rs
@@ -18,36 +18,14 @@
 
 use bytes::Bytes;
 use iggy::prelude::*;
+use iggy_common::TransportProtocol;
 use integration::harness::{TestHarness, TestServerConfig};
 use serial_test::parallel;
 use std::collections::HashMap;
+use std::path::{Path, PathBuf};
+use std::sync::Arc;
 use test_case::test_matrix;
 
-fn encryption_enabled() -> bool {
-    true
-}
-
-fn encryption_disabled() -> bool {
-    false
-}
-
-fn build_server_config(encryption: bool) -> TestServerConfig {
-    let mut extra_envs = HashMap::new();
-
-    if encryption {
-        extra_envs.insert(
-            "IGGY_SYSTEM_ENCRYPTION_ENABLED".to_string(),
-            "true".to_string(),
-        );
-        extra_envs.insert(
-            "IGGY_SYSTEM_ENCRYPTION_KEY".to_string(),
-            "/rvT1xP4V8u1EAhk4xDdqzqM2UOPXyy9XYkl4uRShgE=".to_string(),
-        );
-    }
-
-    TestServerConfig::builder().extra_envs(extra_envs).build()
-}
-
 #[test_matrix(
     [encryption_enabled(), encryption_disabled()]
 )]
@@ -128,6 +106,43 @@
 
     tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
 
+    // Verify on-disk encryption of headers and payload
+    let data_path = harness.server().data_path();
+    let log_files = find_log_files(&data_path);
+    assert!(
+        !log_files.is_empty(),
+        "Expected at least one .log segment file on disk"
+    );
+    let all_raw_bytes: Vec<u8> = log_files
+        .iter()
+        .flat_map(|p| std::fs::read(p).unwrap())
+        .collect();
+    let contains_plaintext_header = all_raw_bytes
+        .windows(b"test-message".len())
+        .any(|w| w == b"test-message");
+    let contains_plaintext_payload = all_raw_bytes
+        .windows(b"Message batch 1".len())
+        .any(|w| w == b"Message batch 1");
+    if encryption {
+        assert!(
+            !contains_plaintext_header,
+            "When encryption is enabled, header values must NOT appear as plaintext on disk"
+        );
+        assert!(
+            !contains_plaintext_payload,
+            "When encryption is enabled, payload must NOT appear as plaintext on disk"
+        );
+    } else {
+        assert!(
+            contains_plaintext_header,
+            "When encryption is disabled, header values should appear as plaintext on disk"
+        );
+        assert!(
+            contains_plaintext_payload,
+            "When encryption is disabled, payload should appear as plaintext on disk"
+        );
+    }
+
     let initial_stats = client.get_stats().await.unwrap();
     let initial_messages_count = initial_stats.messages_count;
     let initial_messages_size = initial_stats.messages_size_bytes;
@@ -317,3 +332,190 @@
         initial_messages_size.as_bytes_u64()
     );
 }
+
+#[test_matrix(
+    [TransportProtocol::Tcp, TransportProtocol::Http, TransportProtocol::Quic, TransportProtocol::WebSocket]
+)]
+#[tokio::test]
+#[parallel]
+async fn should_encrypt_and_decrypt_headers_with_client_side_encryption(
+    transport: TransportProtocol,
+) {
+    let mut harness = TestHarness::builder()
+        .server(TestServerConfig::default())
+        .build()
+        .unwrap();
+
+    harness.start().await.unwrap();
+
+    let setup_client = harness.tcp_root_client().await.unwrap();
+    let stream_name = format!("client-enc-{transport}");
+    let topic_name = "enc-topic";
+
+    setup_client.create_stream(&stream_name).await.unwrap();
+    setup_client
+        .create_topic(
+            &Identifier::named(&stream_name).unwrap(),
+            topic_name,
+            1,
+            CompressionAlgorithm::default(),
+            None,
+            IggyExpiry::NeverExpire,
+            MaxTopicSize::ServerDefault,
+        )
+        .await
+        .unwrap();
+
+    let encryptor = Arc::new(EncryptorKind::Aes256Gcm(
+        Aes256GcmEncryptor::new(&[42u8; 32]).unwrap(),
+    ));
+
+    let encrypting_client = harness
+        .client_builder_for(transport)
+        .unwrap()
+        .with_encryptor(encryptor)
+        .with_root_login()
+        .connect()
+        .await
+        .unwrap();
+
+    let stream_id = Identifier::named(&stream_name).unwrap();
+    let topic_id = Identifier::named(topic_name).unwrap();
+
+    let mut messages = Vec::new();
+    for i in 0..10i64 {
+        let mut headers = HashMap::new();
+        headers.insert(HeaderKey::try_from("index").unwrap(), HeaderValue::from(i));
+        headers.insert(
+            HeaderKey::try_from("transport").unwrap(),
+            HeaderValue::try_from(transport.to_string().as_str()).unwrap(),
+        );
+
+        messages.push(
+            IggyMessage::builder()
+                .id((i + 1) as u128)
+                .payload(Bytes::from(format!("client encrypted msg {i}")))
+                .user_headers(headers)
+                .build()
+                .unwrap(),
+        );
+    }
+
+    encrypting_client
+        .send_messages(
+            &stream_id,
+            &topic_id,
+            &Partitioning::partition_id(0),
+            &mut messages,
+        )
+        .await
+        .unwrap();
+
+    let polled = encrypting_client
+        .poll_messages(
+            &stream_id,
+            &topic_id,
+            Some(0),
+            &Consumer::default(),
+            &PollingStrategy::offset(0),
+            10,
+            false,
+        )
+        .await
+        .unwrap();
+
+    assert_eq!(polled.messages.len(), 10);
+    for (i, msg) in polled.messages.iter().enumerate() {
+        assert_eq!(
+            std::str::from_utf8(&msg.payload).unwrap(),
+            format!("client encrypted msg {i}")
+        );
+
+        let headers = msg.user_headers_map().unwrap().unwrap();
+        assert_eq!(
+            headers
+                .get(&HeaderKey::try_from("index").unwrap())
+                .unwrap()
+                .as_int64()
+                .unwrap(),
+            i as i64
+        );
+        assert_eq!(
+            headers
+                .get(&HeaderKey::try_from("transport").unwrap())
+                .unwrap()
+                .as_str()
+                .unwrap(),
+            transport.to_string().as_str()
+        );
+    }
+
+    let client_without_encryptor = harness.root_client_for(transport).await.unwrap();
+    let polled_without_decryption = client_without_encryptor
+        .poll_messages(
+            &stream_id,
+            &topic_id,
+            Some(0),
+            &Consumer::default(),
+            &PollingStrategy::offset(0),
+            10,
+            false,
+        )
+        .await
+        .unwrap();
+
+    assert_eq!(polled_without_decryption.messages.len(), 10);
+    for msg in &polled_without_decryption.messages {
+        let payload_text = std::str::from_utf8(&msg.payload).unwrap_or("");
+        assert!(
+            !payload_text.starts_with("client encrypted msg"),
+            "Payload must not be readable without the encryptor"
+        );
+
+        let headers = msg.user_headers_map().unwrap();
+        assert!(
+            headers.is_none(),
+            "Headers must not be parseable without the encryptor"
+        );
+    }
+}
+
+fn encryption_enabled() -> bool {
+    true
+}
+
+fn encryption_disabled() -> bool {
+    false
+}
+
+fn build_server_config(encryption: bool) -> TestServerConfig {
+    let mut extra_envs = HashMap::new();
+
+    if encryption {
+        extra_envs.insert(
+            "IGGY_SYSTEM_ENCRYPTION_ENABLED".to_string(),
+            "true".to_string(),
+        );
+        extra_envs.insert(
+            "IGGY_SYSTEM_ENCRYPTION_KEY".to_string(),
+            "/rvT1xP4V8u1EAhk4xDdqzqM2UOPXyy9XYkl4uRShgE=".to_string(),
+        );
+    }
+
+    TestServerConfig::builder().extra_envs(extra_envs).build()
+}
+
+fn find_log_files(dir: &Path) -> Vec<PathBuf> {
+    let mut result = Vec::new();
+    if let Ok(entries) = std::fs::read_dir(dir) {
+        for entry in entries.flatten() {
+            let path = entry.path();
+            if path.is_dir() {
+                result.extend(find_log_files(&path));
+            } else if path.extension().and_then(|e| e.to_str()) == Some("log") {
+                result.push(path);
+            }
+        }
+    }
+    result
+}
diff --git a/core/sdk/Cargo.toml b/core/sdk/Cargo.toml
index cbd4050..3627c87 100644
--- a/core/sdk/Cargo.toml
+++ b/core/sdk/Cargo.toml
@@ -17,7 +17,7 @@
 
 [package]
 name = "iggy"
-version = "0.9.4-edge.1"
+version = "0.9.5-edge.1"
 description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second."
 edition = "2024"
 license = "Apache-2.0"
diff --git a/core/sdk/src/clients/binary_message.rs b/core/sdk/src/clients/binary_message.rs
index 63f064d..462c60e 100644
--- a/core/sdk/src/clients/binary_message.rs
+++ b/core/sdk/src/clients/binary_message.rs
@@ -61,6 +61,12 @@
                 let payload = encryptor.decrypt(&message.payload)?;
                 message.payload = Bytes::from(payload);
                 message.header.payload_length = message.payload.len() as u32;
+
+                if let Some(ref user_headers) = message.user_headers {
+                    let decrypted_headers = encryptor.decrypt(user_headers)?;
+                    message.header.user_headers_length = decrypted_headers.len() as u32;
+                    message.user_headers = Some(Bytes::from(decrypted_headers));
+                }
             }
         }
 
@@ -82,6 +88,12 @@
             for message in &mut *messages {
                 message.payload = Bytes::from(encryptor.encrypt(&message.payload)?);
                 message.header.payload_length = message.payload.len() as u32;
+
+                if let Some(ref user_headers) = message.user_headers {
+                    let encrypted_headers = encryptor.encrypt(user_headers)?;
+                    message.header.user_headers_length = encrypted_headers.len() as u32;
+                    message.user_headers = Some(Bytes::from(encrypted_headers));
+                }
             }
         }
 
diff --git a/core/sdk/src/clients/consumer.rs b/core/sdk/src/clients/consumer.rs
index 37b2032..17d2c42 100644
--- a/core/sdk/src/clients/consumer.rs
+++ b/core/sdk/src/clients/consumer.rs
@@ -981,12 +981,12 @@
                     } else {
                         if let Some(ref encryptor) = self.encryptor {
                             for message in &mut polled_messages.messages {
+                                let offset = message.header.offset;
                                 let payload = encryptor.decrypt(&message.payload);
                                 if let Err(error) = payload {
                                     self.poll_future = None;
                                     error!(
-                                        "Failed to decrypt the message payload at offset: {}, partition ID: {}",
-                                        message.header.offset, partition_id
+                                        "Failed to decrypt the message payload at offset: {offset}, partition ID: {partition_id}",
                                     );
                                     return Poll::Ready(Some(Err(error)));
                                 }
@@ -994,6 +994,21 @@
                                 let payload = payload.unwrap();
                                 message.payload = Bytes::from(payload);
                                 message.header.payload_length = message.payload.len() as u32;
+
+                                if let Some(ref user_headers) = message.user_headers {
+                                    let decrypted_headers = encryptor.decrypt(user_headers);
+                                    if let Err(error) = decrypted_headers {
+                                        self.poll_future = None;
+                                        error!(
+                                            "Failed to decrypt the message user headers at offset: {offset}, partition ID: {partition_id}",
+                                        );
+                                        return Poll::Ready(Some(Err(error)));
+                                    }
+                                    let decrypted_headers = decrypted_headers.unwrap();
+                                    message.header.user_headers_length =
+                                        decrypted_headers.len() as u32;
+                                    message.user_headers = Some(Bytes::from(decrypted_headers));
+                                }
                             }
                         }
 
diff --git a/core/sdk/src/clients/producer.rs b/core/sdk/src/clients/producer.rs
index badd909..e17fafc 100644
--- a/core/sdk/src/clients/producer.rs
+++ b/core/sdk/src/clients/producer.rs
@@ -293,6 +293,12 @@
             for message in messages {
                 message.payload = Bytes::from(encryptor.encrypt(&message.payload)?);
                 message.header.payload_length = message.payload.len() as u32;
+
+                if let Some(ref user_headers) = message.user_headers {
+                    let encrypted_headers = encryptor.encrypt(user_headers)?;
+                    message.header.user_headers_length = encrypted_headers.len() as u32;
+                    message.user_headers = Some(Bytes::from(encrypted_headers));
+                }
             }
         }
         Ok(())
diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml
index 62027ca..d26d3fe 100644
--- a/core/server/Cargo.toml
+++ b/core/server/Cargo.toml
@@ -17,7 +17,7 @@
 
 [package]
 name = "server"
-version = "0.7.3-edge.1"
+version = "0.7.4-edge.1"
 edition = "2024"
 license = "Apache-2.0"
 
diff --git a/core/server/src/shard/system/messages.rs b/core/server/src/shard/system/messages.rs
index a59f68d..21e3420 100644
--- a/core/server/src/shard/system/messages.rs
+++ b/core/server/src/shard/system/messages.rs
@@ -559,25 +559,46 @@
             let mut position = 0;
 
             for message in batch.iter() {
+                let mut header = message.header().to_header();
+                let offset = header.offset;
                 let payload = encryptor.decrypt(message.payload());
                 match payload {
                     Ok(payload) => {
                         // Update the header with the decrypted payload length
-                        let mut header = message.header().to_header();
                         header.payload_length = payload.len() as u32;
 
+                        // Decrypt user headers if present
+                        let decrypted_user_headers = if let Some(user_headers) =
+                            message.user_headers()
+                        {
+                            match encryptor.decrypt(user_headers) {
+                                Ok(decrypted) => {
+                                    header.user_headers_length = decrypted.len() as u32;
+                                    Some(decrypted)
+                                }
+                                Err(error) => {
+                                    error!(
+                                        "Cannot decrypt the message user headers at offset: {offset}. Error: {error}"
+                                    );
+                                    continue;
+                                }
+                            }
+                        } else {
+                            None
+                        };
+
                         decrypted_messages.extend_from_slice(&header.to_bytes());
                         decrypted_messages.extend_from_slice(&payload);
-                        if let Some(user_headers) = message.user_headers() {
+                        if let Some(ref user_headers) = decrypted_user_headers {
                             decrypted_messages.extend_from_slice(user_headers);
                         }
                         position += IGGY_MESSAGE_HEADER_SIZE
                             + payload.len()
-                            + message.header().user_headers_length();
+                            + header.user_headers_length as usize;
                         indexes.insert(0, position as u32, 0);
                     }
                     Err(error) => {
-                        error!("Cannot decrypt the message. Error: {}", error);
+                        error!("Cannot decrypt the message at offset: {offset}. Error: {error}",);
                         continue;
                     }
                 }
@@ -604,7 +625,7 @@
 
         for message in batch.iter() {
             let header = message.header().to_header();
-            let user_headers_length = header.user_headers_length;
+            let offset = header.offset;
             let payload_bytes = message.payload();
             let user_headers_bytes = message.user_headers();
 
@@ -614,18 +635,38 @@
                     let mut updated_header = header;
                     updated_header.payload_length = encrypted_payload.len() as u32;
 
+                    // Encrypt user headers if present
+                    let encrypted_user_headers = if let Some(user_headers_bytes) =
+                        user_headers_bytes
+                    {
+                        match encryptor.encrypt(user_headers_bytes) {
+                            Ok(encrypted) => {
+                                updated_header.user_headers_length = encrypted.len() as u32;
+                                Some(encrypted)
+                            }
+                            Err(error) => {
+                                error!(
+                                    "Cannot encrypt the message user headers at offset: {offset}. Error: {error}"
+                                );
+                                continue;
+                            }
+                        }
+                    } else {
+                        None
+                    };
+
                     encrypted_messages.extend_from_slice(&updated_header.to_bytes());
                     encrypted_messages.extend_from_slice(&encrypted_payload);
-                    if let Some(user_headers_bytes) = user_headers_bytes {
-                        encrypted_messages.extend_from_slice(user_headers_bytes);
+                    if let Some(ref encrypted_user_headers) = encrypted_user_headers {
+                        encrypted_messages.extend_from_slice(encrypted_user_headers);
                     }
                     position += IGGY_MESSAGE_HEADER_SIZE
                         + encrypted_payload.len()
-                        + user_headers_length as usize;
+                        + updated_header.user_headers_length as usize;
                     indexes.insert(0, position as u32, 0);
                 }
                 Err(error) => {
-                    error!("Cannot encrypt the message. Error: {}", error);
+                    error!("Cannot encrypt the message at offset: {offset}. Error: {error}",);
                     continue;
                 }
             }
diff --git a/foreign/csharp/Iggy_SDK.Tests.Integration/HeaderEncryptionIntegrationTests.cs b/foreign/csharp/Iggy_SDK.Tests.Integration/HeaderEncryptionIntegrationTests.cs
new file mode 100644
index 0000000..0b8e98b
--- /dev/null
+++ b/foreign/csharp/Iggy_SDK.Tests.Integration/HeaderEncryptionIntegrationTests.cs
@@ -0,0 +1,211 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System.Text;
+using Apache.Iggy.Consumers;
+using Apache.Iggy.Encryption;
+using Apache.Iggy.Enums;
+using Apache.Iggy.Headers;
+using Apache.Iggy.IggyClient;
+using Apache.Iggy.Kinds;
+using Apache.Iggy.Mappers;
+using Apache.Iggy.Messages;
+using Apache.Iggy.Publishers;
+using Apache.Iggy.Tests.Integrations.Fixtures;
+using Shouldly;
+using Partitioning = Apache.Iggy.Kinds.Partitioning;
+
+namespace Apache.Iggy.Tests.Integrations;
+
+public class HeaderEncryptionIntegrationTests
+{
+    [ClassDataSource<IggyServerFixture>(Shared = SharedType.PerAssembly)]
+    public required IggyServerFixture Fixture { get; init; }
+
+    [Test]
+    [MethodDataSource<IggyServerFixture>(nameof(IggyServerFixture.ProtocolData))]
+    public async Task SendMessages_WithEncryptedHeaders_Should_NotBeReadableWithoutDecryptor(Protocol protocol)
+    {
+        var client = protocol == Protocol.Tcp
+            ? await Fixture.CreateTcpClient()
+            : await Fixture.CreateHttpClient();
+
+        var encryptor = CreateEncryptor();
+        var testStream = await CreateTestStream(client, protocol);
+        var streamId = Identifier.String(testStream.StreamId);
+        var topicId = Identifier.String(testStream.TopicId);
+
+        // Send message with encrypted headers via publisher
+        var publisher = IggyPublisherBuilder
+            .Create(client, streamId, topicId)
+            .WithPartitioning(Partitioning.PartitionId(0))
+            .WithEncryptor(encryptor)
+            .Build();
+
+        await publisher.InitAsync();
+
+        var headers = CreateTestHeaders();
+        var messages = new List<Message>
+        {
+            new(Guid.NewGuid(), Encoding.UTF8.GetBytes("encrypted payload"), headers)
+        };
+
+        await publisher.SendMessagesAsync(messages);
+        await publisher.DisposeAsync();
+
+        // Poll with a normal client (no decryptor)
+        var polled = await client.PollMessagesAsync(
+            streamId, topicId, 0,
+            Consumer.New(0),
+            PollingStrategy.Next(),
+            1, false);
+
+        polled.Messages.Count.ShouldBe(1);
+        var msg = polled.Messages[0];
+
+        // Payload should be encrypted (not readable as plaintext)
+        Encoding.UTF8.GetString(msg.Payload).ShouldNotBe("encrypted payload");
+
+        // RawUserHeaders should contain encrypted bytes (both TCP and HTTP)
+        msg.RawUserHeaders.ShouldNotBeNull();
+        msg.RawUserHeaders!.Length.ShouldBeGreaterThan(0);
+
+        // Manually decrypt and verify payload
+        var decryptedPayload = encryptor.Decrypt(msg.Payload);
+        Encoding.UTF8.GetString(decryptedPayload).ShouldBe("encrypted payload");
+
+        // Manually decrypt and verify headers
+        var decryptedHeaderBytesResult = encryptor.Decrypt(msg.RawUserHeaders);
+        var decryptedHeaders = BinaryMapper.MapHeaders(decryptedHeaderBytesResult);
+        decryptedHeaders.Count.ShouldBe(3);
+
+        var typeHeader = decryptedHeaders[new HeaderKey { Kind = HeaderKind.String, Value = "type"u8.ToArray() }];
+        Encoding.UTF8.GetString(typeHeader.Value).ShouldBe("test-message");
+    }
+
+    [Test]
+    [MethodDataSource<IggyServerFixture>(nameof(IggyServerFixture.ProtocolData))]
+    public async Task ReceiveAsync_WithDecryptor_Should_DecryptHeadersCorrectly(Protocol protocol)
+    {
+        var client = protocol == Protocol.Tcp
+            ? await Fixture.CreateTcpClient()
+            : await Fixture.CreateHttpClient();
+
+        var encryptor = CreateEncryptor();
+        var testStream = await CreateTestStream(client, protocol);
+        var streamId = Identifier.String(testStream.StreamId);
+        var topicId = Identifier.String(testStream.TopicId);
+
+        // Send encrypted message via publisher
+        var publisher = IggyPublisherBuilder
+            .Create(client, streamId, topicId)
+            .WithPartitioning(Partitioning.PartitionId(0))
+            .WithEncryptor(encryptor)
+            .Build();
+
+        await publisher.InitAsync();
+
+        var headers = CreateTestHeaders();
+        var messages = new List<Message>
+        {
+            new(Guid.NewGuid(), Encoding.UTF8.GetBytes("consumer test payload"), headers)
+        };
+
+        await publisher.SendMessagesAsync(messages);
+        await publisher.DisposeAsync();
+
+        // Poll with IggyConsumer that has decryptor configured
+        var consumer = IggyConsumerBuilder
+            .Create(client, streamId, topicId, Consumer.New(1))
+            .WithPollingStrategy(PollingStrategy.Next())
+            .WithBatchSize(1)
+            .WithPartitionId(0)
+            .WithAutoCommitMode(AutoCommitMode.Disabled)
+            .WithDecryptor(encryptor)
+            .Build();
+
+        await consumer.InitAsync();
+
+        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
+        ReceivedMessage? received = null;
+
+        await foreach (var msg in consumer.ReceiveAsync(cts.Token))
+        {
+            received = msg;
+            break;
+        }
+
+        await consumer.DisposeAsync();
+
+        // Message should be successfully decrypted
+        received.ShouldNotBeNull();
+        received!.Status.ShouldBe(MessageStatus.Success);
+
+        // Payload should be decrypted
+        Encoding.UTF8.GetString(received.Message.Payload).ShouldBe("consumer test payload");
+
+        // Headers should be decrypted and parsed
+        received.Message.UserHeaders.ShouldNotBeNull();
+        received.Message.UserHeaders!.Count.ShouldBe(3);
+
+        var batchHeader = received.Message.UserHeaders[new HeaderKey { Kind = HeaderKind.String, Value = "batch"u8.ToArray() }];
+        BitConverter.ToUInt64(batchHeader.Value).ShouldBe(1UL);
+
+        var typeHeader = received.Message.UserHeaders[new HeaderKey { Kind = HeaderKind.String, Value = "type"u8.ToArray() }];
+        Encoding.UTF8.GetString(typeHeader.Value).ShouldBe("test-message");
+
+        var encHeader = received.Message.UserHeaders[new HeaderKey { Kind = HeaderKind.String, Value = "encrypted"u8.ToArray() }];
+        encHeader.Value[0].ShouldBe((byte)1);
+    }
+
+    private static AesMessageEncryptor CreateEncryptor()
+    {
+        return new AesMessageEncryptor(AesMessageEncryptor.GenerateKey());
+    }
+
+    private static Dictionary<HeaderKey, HeaderValue> CreateTestHeaders()
+    {
+        return new Dictionary<HeaderKey, HeaderValue>
+        {
+            {
+                new HeaderKey { Kind = HeaderKind.String, Value = "batch"u8.ToArray() },
+                new HeaderValue { Kind = HeaderKind.Uint64, Value = BitConverter.GetBytes(1UL) }
+            },
+            {
+                new HeaderKey { Kind = HeaderKind.String, Value = "type"u8.ToArray() },
+                new HeaderValue { Kind = HeaderKind.String, Value = "test-message"u8.ToArray() }
+            },
+            {
+                new HeaderKey { Kind = HeaderKind.String, Value = "encrypted"u8.ToArray() },
+                new HeaderValue { Kind = HeaderKind.Bool, Value = [1] }
+            },
+        };
+    }
+
+    private async Task<TestStreamInfo> CreateTestStream(IIggyClient client, Protocol protocol)
+    {
+        var streamId = $"enc_stream_{Guid.NewGuid()}_{protocol.ToString().ToLowerInvariant()}";
+        var topicId = "enc_topic";
+
+        await client.CreateStreamAsync(streamId);
+        await client.CreateTopicAsync(Identifier.String(streamId), topicId, 1);
+
+        return new TestStreamInfo(streamId, topicId);
+    }
+
+    private record TestStreamInfo(string StreamId, string TopicId);
+}
diff --git a/foreign/csharp/Iggy_SDK/Consumers/IggyConsumer.cs b/foreign/csharp/Iggy_SDK/Consumers/IggyConsumer.cs
index 2c5a1b8..76264f9 100644
--- a/foreign/csharp/Iggy_SDK/Consumers/IggyConsumer.cs
+++ b/foreign/csharp/Iggy_SDK/Consumers/IggyConsumer.cs
@@ -21,8 +21,10 @@
 using Apache.Iggy.Contracts;
 using Apache.Iggy.Enums;
 using Apache.Iggy.Exceptions;
+using Apache.Iggy.Headers;
 using Apache.Iggy.IggyClient;
 using Apache.Iggy.Kinds;
+using Apache.Iggy.Mappers;
 using Apache.Iggy.Utils;
 using Microsoft.Extensions.Logging;
 
@@ -380,11 +382,19 @@
                     try
                     {
                         var decryptedPayload = _config.MessageEncryptor.Decrypt(message.Payload);
+
+                        Dictionary<HeaderKey, HeaderValue>? decryptedHeaders = null;
+                        if (message.RawUserHeaders is { Length: > 0 })
+                        {
+                            var decryptedHeaderBytes = _config.MessageEncryptor.Decrypt(message.RawUserHeaders);
+                            decryptedHeaders = BinaryMapper.MapHeaders(decryptedHeaderBytes);
+                        }
+
                         processedMessage = new MessageResponse
                         {
                             Header = message.Header,
                             Payload = decryptedPayload,
-                            UserHeaders = message.UserHeaders
+                            UserHeaders = decryptedHeaders
                         };
                     }
                     catch (Exception ex)
diff --git a/foreign/csharp/Iggy_SDK/Contracts/MessageResponse.cs b/foreign/csharp/Iggy_SDK/Contracts/MessageResponse.cs
index f39ddd8..9a5eb55 100644
--- a/foreign/csharp/Iggy_SDK/Contracts/MessageResponse.cs
+++ b/foreign/csharp/Iggy_SDK/Contracts/MessageResponse.cs
@@ -25,6 +25,7 @@
 /// <summary>
 ///     Response from the server containing a message payload.
 /// </summary>
+[JsonConverter(typeof(MessageResponseConverter))]
 public sealed class MessageResponse
 {
     /// <summary>
@@ -40,7 +41,12 @@
     /// <summary>
     ///     Headers defined by the user.
     /// </summary>
-    [JsonPropertyName("user_headers")]
-    [JsonConverter(typeof(UserHeadersConverter))]
-    public Dictionary<HeaderKey, HeaderValue>? UserHeaders { get; init; }
+    public Dictionary<HeaderKey, HeaderValue>? UserHeaders { get; set; }
+
+    /// <summary>
+    ///     Raw user header bytes before deserialization.
+    ///     Used internally for decrypting encrypted headers.
+    /// </summary>
+    [JsonIgnore]
+    internal byte[]? RawUserHeaders { get; set; }
 }
diff --git a/foreign/csharp/Iggy_SDK/Contracts/Tcp/TcpContracts.cs b/foreign/csharp/Iggy_SDK/Contracts/Tcp/TcpContracts.cs
index 3501117..2e2c939 100644
--- a/foreign/csharp/Iggy_SDK/Contracts/Tcp/TcpContracts.cs
+++ b/foreign/csharp/Iggy_SDK/Contracts/Tcp/TcpContracts.cs
@@ -410,7 +410,7 @@
         var msgSize = 0;
         foreach (var message in messages)
         {
-            var headersBytes = GetHeadersBytes(message.UserHeaders);
+            var headersBytes = message.RawUserHeaders ?? GetHeadersBytes(message.UserHeaders);
             BinaryPrimitives.WriteUInt64LittleEndian(bytes[position..(position + 8)], message.Header.Checksum);
             BinaryPrimitives.WriteUInt128LittleEndian(bytes[(position + 8)..(position + 24)], message.Header.Id);
             BinaryPrimitives.WriteUInt64LittleEndian(bytes[(position + 24)..(position + 32)], message.Header.Offset);
@@ -548,7 +548,7 @@
     //     return bytes;
     // }
 
-    private static byte[] GetHeadersBytes(Dictionary<HeaderKey, HeaderValue>? headers)
+    internal static byte[] GetHeadersBytes(Dictionary<HeaderKey, HeaderValue>? headers)
     {
         if (headers == null)
         {
diff --git a/foreign/csharp/Iggy_SDK/Iggy_SDK.csproj b/foreign/csharp/Iggy_SDK/Iggy_SDK.csproj
index fb31d48..9232422 100644
--- a/foreign/csharp/Iggy_SDK/Iggy_SDK.csproj
+++ b/foreign/csharp/Iggy_SDK/Iggy_SDK.csproj
@@ -7,7 +7,7 @@
         <TargetFrameworks>net8.0;net10.0</TargetFrameworks>
         <AssemblyName>Apache.Iggy</AssemblyName>
         <RootNamespace>Apache.Iggy</RootNamespace>
-        <Version>0.7.1-edge.3</Version>
+        <Version>0.7.2-edge.1</Version>
         <GenerateDocumentationFile>true</GenerateDocumentationFile>
     </PropertyGroup>
 
diff --git a/foreign/csharp/Iggy_SDK/JsonConverters/MessageConverter.cs b/foreign/csharp/Iggy_SDK/JsonConverters/MessageConverter.cs
index ad8d721..f319aa3 100644
--- a/foreign/csharp/Iggy_SDK/JsonConverters/MessageConverter.cs
+++ b/foreign/csharp/Iggy_SDK/JsonConverters/MessageConverter.cs
@@ -35,7 +35,14 @@
 
         WriteMessageId(writer, value.Header.Id);
         WritePayload(writer, value.Payload);
-        WriteHeaders(writer, value.UserHeaders);
+        if (value.RawUserHeaders is not null)
+        {
+            writer.WriteBase64String("user_headers", value.RawUserHeaders);
+        }
+        else
+        {
+            WriteHeaders(writer, value.UserHeaders);
+        }
 
         writer.WriteEndObject();
     }
diff --git a/foreign/csharp/Iggy_SDK/JsonConverters/MessageResponseConverter.cs b/foreign/csharp/Iggy_SDK/JsonConverters/MessageResponseConverter.cs
new file mode 100644
index 0000000..9cb9821
--- /dev/null
+++ b/foreign/csharp/Iggy_SDK/JsonConverters/MessageResponseConverter.cs
@@ -0,0 +1,137 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System.Text.Json;
+using System.Text.Json.Serialization;
+using Apache.Iggy.Contracts;
+using Apache.Iggy.Headers;
+using Apache.Iggy.Messages;
+
+namespace Apache.Iggy.JsonConverters;
+
+/// <summary>
+///     JSON converter for <see cref="MessageResponse"/> that accepts <c>user_headers</c> either as a
+///     structured value or as a base64 string holding raw (possibly encrypted) header bytes.
+/// </summary>
+internal sealed class MessageResponseConverter : JsonConverter<MessageResponse>
+{
+    private static readonly UserHeadersConverter HeadersConverter = new();
+
+    /// <summary>
+    ///     Reads a message object; a string-valued <c>user_headers</c> is kept as raw bytes in
+    ///     <see cref="MessageResponse.RawUserHeaders"/> instead of being parsed.
+    /// </summary>
+    /// <exception cref="JsonException">The object is malformed or lacks <c>header</c> or <c>payload</c>.</exception>
+    public override MessageResponse Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
+    {
+        if (reader.TokenType != JsonTokenType.StartObject)
+        {
+            throw new JsonException("Expected start of object for MessageResponse.");
+        }
+
+        MessageHeader? header = null;
+        byte[]? payload = null;
+        Dictionary<HeaderKey, HeaderValue>? userHeaders = null;
+        byte[]? rawUserHeaders = null;
+
+        while (reader.Read())
+        {
+            if (reader.TokenType == JsonTokenType.EndObject)
+            {
+                break;
+            }
+
+            if (reader.TokenType != JsonTokenType.PropertyName)
+            {
+                throw new JsonException("Expected property name.");
+            }
+
+            var propertyName = reader.GetString();
+            reader.Read();
+
+            switch (propertyName)
+            {
+                case "header":
+                    header = JsonSerializer.Deserialize<MessageHeader>(ref reader, options);
+                    break;
+                case "payload":
+                    payload = reader.GetBytesFromBase64();
+                    break;
+                case "user_headers":
+                    if (reader.TokenType == JsonTokenType.String)
+                    {
+                        rawUserHeaders = reader.GetBytesFromBase64();
+                    }
+                    else if (reader.TokenType == JsonTokenType.Null)
+                    {
+                        userHeaders = null;
+                    }
+                    else
+                    {
+                        userHeaders = HeadersConverter.Read(ref reader, typeof(Dictionary<HeaderKey, HeaderValue>), options);
+                    }
+                    break;
+                default:
+                    reader.Skip();
+                    break;
+            }
+        }
+
+        return new MessageResponse
+        {
+            Header = header ?? throw new JsonException("Missing 'header' field."),
+            Payload = payload ?? throw new JsonException("Missing 'payload' field."),
+            UserHeaders = userHeaders,
+            RawUserHeaders = rawUserHeaders
+        };
+    }
+
+    /// <summary>
+    ///     Writes the response as a JSON object using the property names that <see cref="Read"/> expects.
+    /// </summary>
+    /// <remarks>
+    ///     Fields are written explicitly. Passing <paramref name="value"/> back to
+    ///     <see cref="JsonSerializer"/> with the same options would resolve this converter again
+    ///     when it is registered for <see cref="MessageResponse"/> and recurse until the stack overflows.
+    /// </remarks>
+    public override void Write(Utf8JsonWriter writer, MessageResponse value, JsonSerializerOptions options)
+    {
+        writer.WriteStartObject();
+
+        writer.WritePropertyName("header");
+        JsonSerializer.Serialize(writer, value.Header, options);
+
+        writer.WriteBase64String("payload", value.Payload);
+
+        if (value.UserHeaders is not null)
+        {
+            writer.WritePropertyName("user_headers");
+            HeadersConverter.Write(writer, value.UserHeaders, options);
+        }
+        else if (value.RawUserHeaders is not null)
+        {
+            // Undecoded (possibly encrypted) headers go out as base64; Read maps a string back to RawUserHeaders.
+            writer.WriteBase64String("user_headers", value.RawUserHeaders);
+        }
+        else
+        {
+            writer.WriteNull("user_headers");
+        }
+
+        writer.WriteEndObject();
+    }
+}
diff --git a/foreign/csharp/Iggy_SDK/Mappers/BinaryMapper.cs b/foreign/csharp/Iggy_SDK/Mappers/BinaryMapper.cs
index 6c3435d..cb530a5 100644
--- a/foreign/csharp/Iggy_SDK/Mappers/BinaryMapper.cs
+++ b/foreign/csharp/Iggy_SDK/Mappers/BinaryMapper.cs
@@ -336,8 +336,7 @@
         };
     }
 
-    internal static PolledMessages MapMessages(ReadOnlySpan<byte> payload,
-        Func<byte[], byte[]>? decryptor = null)
+    internal static PolledMessages MapMessages(ReadOnlySpan<byte> payload)
     {
         var length = payload.Length;
         var partitionId = BinaryPrimitives.ReadInt32LittleEndian(payload[..4]);
@@ -362,13 +361,22 @@
             var payloadLength = BinaryPrimitives.ReadInt32LittleEndian(payload[(position + 52)..(position + 56)]);
             var reserved = BinaryPrimitives.ReadUInt64LittleEndian(payload[(position + 56)..(position + 64)]);
 
-            Dictionary<HeaderKey, HeaderValue>? headers = headersLength switch
+            var wireHeadersLength = headersLength;
+            byte[]? rawUserHeaders = null;
+            Dictionary<HeaderKey, HeaderValue>? headers;
+            if (headersLength == 0)
             {
-                0 => null,
-                > 0 => MapHeaders(
-                    payload[(position + 64 + payloadLength)..(position + 64 + payloadLength + headersLength)]),
-                < 0 => throw new ArgumentOutOfRangeException()
-            };
+                headers = null;
+            }
+            else if (headersLength < 0)
+            {
+                throw new ArgumentOutOfRangeException();
+            }
+            else
+            {
+                rawUserHeaders = payload[(position + 64 + payloadLength)..(position + 64 + payloadLength + headersLength)].ToArray();
+                headers = TryMapHeaders(rawUserHeaders);
+            }
 
             var payloadRangeStart = position + 64;
             var payloadRangeEnd = position + 64 + payloadLength;
@@ -399,9 +407,8 @@
                         Reserved = reserved
                     },
                     UserHeaders = headers,
-                    Payload = decryptor is not null
-                        ? decryptor(messagePayload[..payloadSliceLen])
-                        : messagePayload[..payloadSliceLen]
+                    RawUserHeaders = rawUserHeaders,
+                    Payload = messagePayload[..payloadSliceLen]
                 });
             }
             finally
@@ -409,7 +416,7 @@
                 ArrayPool<byte>.Shared.Return(messagePayload);
             }
 
-            position += 64 + payloadLength + headersLength;
+            position += 64 + payloadLength + wireHeadersLength;
             if (position + PropertiesSize >= length)
             {
                 break;
@@ -424,14 +431,14 @@
         };
     }
 
-    private static Dictionary<HeaderKey, HeaderValue> MapHeaders(ReadOnlySpan<byte> payload)
+    internal static Dictionary<HeaderKey, HeaderValue> MapHeaders(ReadOnlySpan<byte> payload)
     {
         var headers = new Dictionary<HeaderKey, HeaderValue>();
         var position = 0;
 
         while (position < payload.Length)
         {
-            var keyKind = MapHeaderKind(payload, position);
+            var keyKind = MapHeaderKind(payload[position]);
             position++;
 
             var keyLength = BinaryPrimitives.ReadInt32LittleEndian(payload[position..(position + 4)]);
@@ -444,7 +451,7 @@
             var keyValue = payload[position..(position + keyLength)].ToArray();
             position += keyLength;
 
-            var valueKind = MapHeaderKind(payload, position);
+            var valueKind = MapHeaderKind(payload[position]);
             position++;
 
             var valueLength = BinaryPrimitives.ReadInt32LittleEndian(payload[position..(position + 4)]);
@@ -464,9 +471,62 @@
         return headers;
     }
 
-    private static HeaderKind MapHeaderKind(ReadOnlySpan<byte> payload, int position)
+    /// <summary>
+    ///     Parses serialized user headers without throwing; returns <c>null</c> when the bytes are
+    ///     empty or are not a well-formed header sequence (for example when they are encrypted).
+    /// </summary>
+    internal static Dictionary<HeaderKey, HeaderValue>? TryMapHeaders(ReadOnlySpan<byte> payload)
     {
-        var headerKind = payload[position] switch
+        if (payload.IsEmpty)
+        {
+            return null;
+        }
+
+        var parsed = new Dictionary<HeaderKey, HeaderValue>();
+        var cursor = 0;
+        do
+        {
+            if (!TryReadHeaderPart(payload, ref cursor, out var keyKind, out var keyBytes)
+                || !TryReadHeaderPart(payload, ref cursor, out var valueKind, out var valueBytes))
+            {
+                return null;
+            }
+            parsed[new HeaderKey { Kind = keyKind, Value = keyBytes }] =
+                new HeaderValue { Kind = valueKind, Value = valueBytes };
+        } while (cursor < payload.Length);
+
+        return parsed;
+    }
+
+    // Reads one "kind byte | int32 LE length (1..255) | bytes" section and moves position past it.
+    private static bool TryReadHeaderPart(ReadOnlySpan<byte> payload, ref int position,
+        out HeaderKind kind, out byte[] bytes)
+    {
+        kind = default;
+        bytes = Array.Empty<byte>();
+        if (position >= payload.Length || !TryMapHeaderKind(payload[position], out kind))
+        {
+            return false;
+        }
+        var lengthStart = position + 1;
+        if (lengthStart + 4 > payload.Length)
+        {
+            return false;
+        }
+        var length = BinaryPrimitives.ReadInt32LittleEndian(payload[lengthStart..(lengthStart + 4)]);
+        var dataStart = lengthStart + 4;
+        if ((length is <= 0 or > 255) || dataStart + length > payload.Length)
+        {
+            return false;
+        }
+        bytes = payload[dataStart..(dataStart + length)].ToArray();
+        position = dataStart + length;
+        return true;
+    }
+
+    private static HeaderKind MapHeaderKind(byte value)
+    {
+        return value switch
         {
             1 => HeaderKind.Raw,
             2 => HeaderKind.String,
@@ -483,9 +543,19 @@
             13 => HeaderKind.Uint128,
             14 => HeaderKind.Float,
             15 => HeaderKind.Double,
-            _ => throw new ArgumentOutOfRangeException()
+            _ => throw new ArgumentOutOfRangeException(nameof(value), value, null)
         };
-        return headerKind;
+    }
+
+    /// <summary>
+    ///     Non-throwing kind lookup: succeeds only for wire codes 1 through 15.
+    /// </summary>
+    /// <returns><c>true</c> with the mapped kind; otherwise <c>false</c> and the default kind.</returns>
+    private static bool TryMapHeaderKind(byte value, out HeaderKind kind)
+    {
+        var isKnownCode = value is >= 1 and <= 15;
+        kind = isKnownCode ? MapHeaderKind(value) : default;
+        return isKnownCode;
     }
 
     internal static IReadOnlyList<StreamResponse> MapStreams(ReadOnlySpan<byte> payload)
diff --git a/foreign/csharp/Iggy_SDK/Messages/Message.cs b/foreign/csharp/Iggy_SDK/Messages/Message.cs
index 1a0e7e6..dc3c7dd 100644
--- a/foreign/csharp/Iggy_SDK/Messages/Message.cs
+++ b/foreign/csharp/Iggy_SDK/Messages/Message.cs
@@ -46,6 +46,12 @@
     public Dictionary<HeaderKey, HeaderValue>? UserHeaders { get; set; }
 
     /// <summary>
+    ///     Pre-serialized (possibly encrypted) user headers bytes.
+    ///     When set, this takes precedence over <see cref="UserHeaders"/> during serialization.
+    /// </summary>
+    internal byte[]? RawUserHeaders { get; set; }
+
+    /// <summary>
     ///     Default constructor.
     /// </summary>
     public Message()
diff --git a/foreign/csharp/Iggy_SDK/Publishers/IggyPublisher.cs b/foreign/csharp/Iggy_SDK/Publishers/IggyPublisher.cs
index 1b95c9d..b850916 100644
--- a/foreign/csharp/Iggy_SDK/Publishers/IggyPublisher.cs
+++ b/foreign/csharp/Iggy_SDK/Publishers/IggyPublisher.cs
@@ -15,8 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
+using Apache.Iggy.Contracts.Tcp;
 using Apache.Iggy.Enums;
 using Apache.Iggy.Exceptions;
+using Apache.Iggy.Headers;
 using Apache.Iggy.IggyClient;
 using Apache.Iggy.Messages;
 using Microsoft.Extensions.Logging;
@@ -310,7 +312,7 @@
 
     /// <summary>
     ///     Encrypts all messages in the list using the configured message encryptor, if available.
-    ///     Updates the payload length in the message header after encryption.
+    ///     Updates the payload and user headers lengths in the message header after encryption.
     /// </summary>
     /// <param name="messages">The messages to encrypt.</param>
     private void EncryptMessages(IList<Message> messages)
@@ -324,6 +326,15 @@
         {
             message.Payload = _config.MessageEncryptor.Encrypt(message.Payload);
             message.Header.PayloadLength = message.Payload.Length;
+
+            if (message.UserHeaders is { Count: > 0 })
+            {
+                var headerBytes = TcpContracts.GetHeadersBytes(message.UserHeaders);
+                var encryptedHeaderBytes = _config.MessageEncryptor.Encrypt(headerBytes);
+                message.RawUserHeaders = encryptedHeaderBytes;
+                message.UserHeaders = null;
+                message.Header.UserHeadersLength = encryptedHeaderBytes.Length;
+            }
         }
     }
 }
diff --git a/foreign/csharp/Iggy_SDK/Utils/TcpMessageStreamHelpers.cs b/foreign/csharp/Iggy_SDK/Utils/TcpMessageStreamHelpers.cs
index c99337b..626442a 100644
--- a/foreign/csharp/Iggy_SDK/Utils/TcpMessageStreamHelpers.cs
+++ b/foreign/csharp/Iggy_SDK/Utils/TcpMessageStreamHelpers.cs
@@ -49,6 +49,12 @@
         foreach (var message in messages)
         {
             bytesCount += 16 + 64 + message.Payload.Length;
+            if (message.RawUserHeaders is not null)
+            {
+                bytesCount += message.RawUserHeaders.Length;
+                continue;
+            }
+
             if (message.UserHeaders is null)
             {
                 continue;
diff --git a/foreign/csharp/Iggy_SDK_Tests/MapperTests/HeaderEncryptionTests.cs b/foreign/csharp/Iggy_SDK_Tests/MapperTests/HeaderEncryptionTests.cs
new file mode 100644
index 0000000..53758ca
--- /dev/null
+++ b/foreign/csharp/Iggy_SDK_Tests/MapperTests/HeaderEncryptionTests.cs
@@ -0,0 +1,174 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Apache.Iggy.Contracts.Tcp;
+using Apache.Iggy.Encryption;
+using Apache.Iggy.Headers;
+namespace Apache.Iggy.Tests.MapperTests;
+
+public class HeaderEncryptionTests
+{
+    [Fact]
+    public void Headers_should_survive_encrypt_decrypt_roundtrip()
+    {
+        var key = Encryption.AesMessageEncryptor.GenerateKey();
+        var encryptor = new Encryption.AesMessageEncryptor(key);
+
+        var originalHeaders = new Dictionary<Headers.HeaderKey, Headers.HeaderValue>
+        {
+            { new Headers.HeaderKey { Kind = Headers.HeaderKind.String, Value = "batch"u8.ToArray() }, new Headers.HeaderValue { Kind = Headers.HeaderKind.Uint64, Value = BitConverter.GetBytes(1UL) } },
+            { new Headers.HeaderKey { Kind = Headers.HeaderKind.String, Value = "type"u8.ToArray() }, new Headers.HeaderValue { Kind = Headers.HeaderKind.String, Value = "test-message"u8.ToArray() } },
+            { new Headers.HeaderKey { Kind = Headers.HeaderKind.String, Value = "encrypted"u8.ToArray() }, new Headers.HeaderValue { Kind = Headers.HeaderKind.Bool, Value = [1] } },
+        };
+
+        var headerBytes = Contracts.Tcp.TcpContracts.GetHeadersBytes(originalHeaders);
+        Assert.NotEmpty(headerBytes);
+
+        var encrypted = encryptor.Encrypt(headerBytes);
+        Assert.NotEqual(headerBytes, encrypted);
+        Assert.True(encrypted.Length > headerBytes.Length);
+
+        // Ciphertext must be rejected by the plaintext parser rather than misread as headers
+        var parsed = Mappers.BinaryMapper.TryMapHeaders(encrypted);
+        Assert.Null(parsed);
+
+        var decrypted = encryptor.Decrypt(encrypted);
+        Assert.Equal(headerBytes, decrypted);
+
+        var roundTripped = Mappers.BinaryMapper.MapHeaders(decrypted);
+        Assert.Equal(originalHeaders.Count, roundTripped.Count);
+
+        foreach (var (key2, value) in originalHeaders)
+        {
+            Assert.True(roundTripped.ContainsKey(key2));
+            Assert.Equal(value.Kind, roundTripped[key2].Kind);
+            Assert.Equal(value.Value, roundTripped[key2].Value);
+        }
+    }
+
+    [Fact]
+    public void TryMapHeaders_returns_null_on_invalid_first_byte()
+    {
+        var random = new byte[64];
+        Random.Shared.NextBytes(random);
+        random[0] = 0;
+
+        Assert.Null(Mappers.BinaryMapper.TryMapHeaders(random));
+    }
+
+    [Fact]
+    public void TryMapHeaders_returns_null_on_first_byte_above_range()
+    {
+        Assert.Null(Mappers.BinaryMapper.TryMapHeaders(new byte[] { 16, 0, 0, 0 }));
+    }
+
+    [Fact]
+    public void TryMapHeaders_returns_null_on_empty_payload()
+    {
+        Assert.Null(Mappers.BinaryMapper.TryMapHeaders([]));
+    }
+
+    [Fact]
+    public void TryMapHeaders_returns_null_on_truncated_key_length()
+    {
+        // Kind byte 2 is valid, but the 4-byte key length that must follow it is missing
+        Assert.Null(Mappers.BinaryMapper.TryMapHeaders(new byte[] { 2 }));
+    }
+
+    [Fact]
+    public void TryMapHeaders_returns_null_on_zero_key_length()
+    {
+        // Valid kind, then a 4-byte key length of 0, which the parser rejects (must be 1..255)
+        Assert.Null(Mappers.BinaryMapper.TryMapHeaders(new byte[] { 2, 0, 0, 0, 0 }));
+    }
+
+    [Fact]
+    public void TryMapHeaders_returns_null_on_key_length_exceeding_payload()
+    {
+        // Valid kind, key length = 100, but only 2 key bytes follow
+        Assert.Null(Mappers.BinaryMapper.TryMapHeaders(new byte[] { 2, 100, 0, 0, 0, 1, 2 }));
+    }
+
+    [Fact]
+    public void TryMapHeaders_returns_null_on_truncated_value_kind()
+    {
+        // kind=2 (String), key_len=1, key='A' (65), then the payload ends before the value kind byte
+        Assert.Null(Mappers.BinaryMapper.TryMapHeaders(new byte[] { 2, 1, 0, 0, 0, 65 }));
+    }
+
+    [Fact]
+    public void TryMapHeaders_returns_null_on_invalid_value_kind()
+    {
+        // kind=2, key_len=1, key='A', then value kind 0, which is outside the valid 1..15 range
+        Assert.Null(Mappers.BinaryMapper.TryMapHeaders(new byte[] { 2, 1, 0, 0, 0, 65, 0 }));
+    }
+
+    [Fact]
+    public void TryMapHeaders_returns_null_on_truncated_value_length()
+    {
+        // kind=2, key_len=1, key='A', value kind=2, then only 1 of the 4 value-length bytes
+        Assert.Null(Mappers.BinaryMapper.TryMapHeaders(new byte[] { 2, 1, 0, 0, 0, 65, 2, 1 }));
+    }
+
+    [Fact]
+    public void TryMapHeaders_returns_null_on_zero_value_length()
+    {
+        // kind=2, key_len=1, key='A', value kind=2, value_len=0, which the parser rejects (must be 1..255)
+        Assert.Null(Mappers.BinaryMapper.TryMapHeaders(new byte[] { 2, 1, 0, 0, 0, 65, 2, 0, 0, 0, 0 }));
+    }
+
+    [Fact]
+    public void TryMapHeaders_returns_null_on_value_length_exceeding_payload()
+    {
+        // kind=2, key_len=1, key='A', value kind=2, value_len=100, but no value bytes follow
+        Assert.Null(Mappers.BinaryMapper.TryMapHeaders(new byte[] { 2, 1, 0, 0, 0, 65, 2, 100, 0, 0, 0 }));
+    }
+
+    [Fact]
+    public void TryMapHeaders_returns_valid_headers_on_plaintext()
+    {
+        var headers = new Dictionary<Headers.HeaderKey, Headers.HeaderValue>
+        {
+            { new Headers.HeaderKey { Kind = Headers.HeaderKind.String, Value = "key"u8.ToArray() }, new Headers.HeaderValue { Kind = Headers.HeaderKind.String, Value = "value"u8.ToArray() } },
+        };
+
+        var bytes = Contracts.Tcp.TcpContracts.GetHeadersBytes(headers);
+        var result = Mappers.BinaryMapper.TryMapHeaders(bytes);
+
+        Assert.NotNull(result);
+        Assert.Single(result);
+    }
+
+    [Fact]
+    public void TryMapHeaders_returns_valid_for_all_header_kinds()
+    {
+        var headers = new Dictionary<Headers.HeaderKey, Headers.HeaderValue>
+        {
+            { new Headers.HeaderKey { Kind = Headers.HeaderKind.Raw, Value = "k1"u8.ToArray() }, new Headers.HeaderValue { Kind = Headers.HeaderKind.Raw, Value = [0xFF] } },
+            { new Headers.HeaderKey { Kind = Headers.HeaderKind.String, Value = "k2"u8.ToArray() }, new Headers.HeaderValue { Kind = Headers.HeaderKind.Bool, Value = [1] } },
+            { new Headers.HeaderKey { Kind = Headers.HeaderKind.String, Value = "k3"u8.ToArray() }, new Headers.HeaderValue { Kind = Headers.HeaderKind.Int32, Value = BitConverter.GetBytes(42) } },
+            { new Headers.HeaderKey { Kind = Headers.HeaderKind.String, Value = "k4"u8.ToArray() }, new Headers.HeaderValue { Kind = Headers.HeaderKind.Uint64, Value = BitConverter.GetBytes(99UL) } },
+            { new Headers.HeaderKey { Kind = Headers.HeaderKind.String, Value = "k5"u8.ToArray() }, new Headers.HeaderValue { Kind = Headers.HeaderKind.Double, Value = BitConverter.GetBytes(3.14) } },
+        };
+
+        var bytes = Contracts.Tcp.TcpContracts.GetHeadersBytes(headers);
+        var result = Mappers.BinaryMapper.TryMapHeaders(bytes);
+
+        Assert.NotNull(result);
+        Assert.Equal(5, result.Count);
+    }
+}
diff --git a/foreign/python/Cargo.toml b/foreign/python/Cargo.toml
index 9ce0d52..2e6e385 100644
--- a/foreign/python/Cargo.toml
+++ b/foreign/python/Cargo.toml
@@ -17,7 +17,7 @@
 
 [package]
 name = "apache-iggy"
-version = "0.7.4-dev1"
+version = "0.7.5-dev1"
 edition = "2024"
 authors = ["Iggy Committers <dev@iggy.apache.org>"]
 license = "Apache-2.0"
@@ -28,7 +28,7 @@
 [dependencies]
 bytes = "1.11.1"
 futures = "0.3.32"
-iggy = { path = "../../core/sdk", version = "0.9.4-edge.1" }
+iggy = { path = "../../core/sdk", version = "0.9.5-edge.1" }
 pyo3 = "0.28.2"
 pyo3-async-runtimes = { version = "0.28.0", features = [
     "attributes",
diff --git a/foreign/python/pyproject.toml b/foreign/python/pyproject.toml
index ba57ad4..0122961 100644
--- a/foreign/python/pyproject.toml
+++ b/foreign/python/pyproject.toml
@@ -22,7 +22,7 @@
 [project]
 name = "apache-iggy"
 requires-python = ">=3.10"
-version = "0.7.4.dev1"
+version = "0.7.5.dev1"
 description = "Apache Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second."
 readme = "README.md"
 license = { file = "LICENSE" }