blob: 0fdc410b01ae25cc286147194d4197fa9ec7e211 [file] [log] [blame]
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
DKIM-ID Generator Code
"""
import base64
import hmac
from typing import List, Optional, Set, Tuple
# Types
Headers = List[List[bytes]]
# Headers from RFC 4871, the precursor to RFC 6376
# libopendkim lacks: sender, message-id, mime-version, content-type,
# content-transfer-encoding, content-id, content-description,
# resent-message-id, dkim-signature
rfc4871_subset: Set[bytes] = {
b"from",
b"sender",
b"reply-to",
b"subject",
b"date",
b"message-id",
b"to",
b"cc",
b"mime-version",
b"content-type",
b"content-transfer-encoding",
b"content-id",
b"content-description",
b"resent-date",
b"resent-from",
b"resent-sender",
b"resent-to",
b"resent-cc",
b"resent-message-id",
b"in-reply-to",
b"references",
b"list-id",
b"list-help",
b"list-unsubscribe",
b"list-subscribe",
b"list-post",
b"list-owner",
b"list-archive",
b"dkim-signature",
}
# Authenticity headers from RFC 8617
rfc4871_and_rfc8617_subset: Set[bytes] = rfc4871_subset | {
b"arc-authentication-results",
b"arc-message-signature",
b"arc-seal",
}
def rfc5322_endings(data: bytes) -> bytes:
r"""
Convert bytes to RFC 5322 ending normal form.
In ending normal form, bare CR and LF are converted to CRLF.
>>> rfc5322_endings(b"CR \r LF \n CRLF \r\n")
b'CR \r\n LF \r\n CRLF \r\n'
"""
# v
# [^\r]\n -> [^\r]\r\n
# v
# \r[^\n] -> \r\n[^\n]
# v
# \r$ -> \r\n$
CR: int = 0x0D
LF: int = 0x0A
this: int
prev: Optional[int] = None
output: bytearray = bytearray()
for this in data:
if (this == LF) and (prev != CR):
output.extend(b"\r\n")
elif (prev == CR) and (this != LF):
output.extend(b"\n")
output.append(this)
else:
output.append(this)
prev = this
if prev == CR:
output.append(LF)
return bytes(output)
def rfc6376_split(
suffix: Optional[bytes],
) -> Tuple[Headers, Optional[bytes]]:
r"""
Parse an RFC 5322 message into headers and body.
Does not perform any normalisation or canonicalisation.
The returned body is None if no CRLF CRLF boundary is found.
>>> rfc6376_split(b"To: Recipient\r\n\r\nBody")
([[b'To', b' Recipient\r\n']], b'Body')
"""
headers: Headers = []
while suffix:
if suffix.startswith(b"\r\n"):
return (headers, suffix[2:])
parts: List[bytes] = suffix.split(b"\r\n", 1)
line: bytes = parts.pop(0)
# len(line) > 0 due to suffix.startswith(b"\r\n") above
if line[:1] not in {b"\t", b" "}:
headers.append((line.split(b":", 1) + [b""])[:2])
else:
if not headers:
headers.append([b"", b""])
headers[-1][1] += line
if parts:
headers[-1][1] += b"\r\n"
else:
return (headers, None)
suffix = parts[0]
return (headers, None)
def rfc6376_shrink_head(data: bytes) -> bytes:
# Auxiliary function, not for external use
data = data.replace(b"\t", b" ")
return b" ".join(d for d in data.split(b" ") if d)
def rfc6376_relaxed_head(headers: Headers) -> Headers:
r"""
Perform RFC 6376 DKIM relaxed header canonicalisation.
>>> rfc6376_relaxed_head([[b"TO ", b" Recipient\temail "]])
[[b'to', b'Recipient email']]
>>> rfc6376_relaxed_head([[b" \t.\r\n", b"\t\r\n\f"]])
[[b'.', b'\x0c']]
"""
output: Headers = []
k: bytes
v: bytes
for (k, v) in headers:
# Step 1: Header field names to lowercase
k = k.lower()
# Step 2: Unfold all header field value continuations
# First, save CRLF for later restoration
crlf: bool = v.endswith(b"\r\n")
if crlf is True:
v = v[:-2]
# Then, remove all CR and LF from name and value
# It is important to do this in k too, due to an edge case
k = k.replace(b"\r", b"")
k = k.replace(b"\n", b"")
v = v.replace(b"\r", b"")
v = v.replace(b"\n", b"")
# Step 3: Convert WSP+ to space
# Step 4: Remove trailing WSP from unfolded field values
# Step 5: Remove trailing header name WSP, and leading value WSP
# Also removes leading WSP from header names
k = rfc6376_shrink_head(k)
v = rfc6376_shrink_head(v)
# Restore potential earlier saved CRLF
if crlf is True:
v = v + b"\r\n"
output.append([k, v])
return output
def rfc6376_simple_body(body: bytes) -> bytes:
r"""
Perform RFC 6376 DKIM simple body canonicalisation.
>>> rfc6376_simple_body(b"")
b'\r\n'
>>> rfc6376_simple_body(b"\r\n\r\n")
b'\r\n'
>>> rfc6376_simple_body(b".")
b'.'
"""
# In DKIM simple body, an empty body becomes CRLF
body = body or b"\r\n"
while body.endswith(b"\r\n\r\n"):
body = body[:-2]
return body
def rfc6376_simple_holistic(
headers: Headers, body: Optional[bytes]
) -> Tuple[Headers, bytes]:
r"""
Perform RFC 6376 DKIM simple body canonicalisation holistically.
This appends CRLF to headers if necessary, if there was no body.
>>> rfc6376_simple_holistic([[b"Key", b"Value"]], None)
([[b'Key', b'Value\r\n']], b'\r\n')
"""
# Note: This modifies headers in place
# There may be no body, but canonicalisation synthesizes one
# Therefore we may need to add CRLF to the last header value
if body is None:
if headers:
if not headers[-1][1].endswith(b"\r\n"):
headers[-1][1] += b"\r\n"
return (headers, b"\r\n")
return (headers, rfc6376_simple_body(body))
def rfc6376_split_canon(
data: bytes,
head_subset: Optional[Set[bytes]] = None,
head_canon: bool = False,
body_canon: bool = False,
) -> Tuple[Headers, Optional[bytes]]:
r"""
Parse an RFC 5322 message into headers and body.
Performs RFC 5322 normalisation, and optional canonicalisation.
>>> rfc6376_split_canon(b"To: Recipient\r\n\r\nBody", head_canon=True)
([[b'to', b'Recipient\r\n']], b'Body')
"""
# Convert to RFC 5322 ending normal form
suffix: bytes = rfc5322_endings(data)
# Parse the message without any canonicalisation
headers: Headers
body: Optional[bytes]
headers, body = rfc6376_split(suffix)
# Optional head canonicalisation (DKIM relaxed)
if head_canon is True:
headers = rfc6376_relaxed_head(headers)
# Optional header subsetting
if head_subset is not None:
headers = [kv for kv in headers if kv[0].lower() in head_subset]
# Optional body canonicalisation (DKIM simple)
if body_canon is True:
# The body result is now guaranteed to be bytes
# Which makes this function polymorphic really
# This is not reflected in its type signature
headers, body = rfc6376_simple_holistic(headers, body)
return (headers, body)
def rfc6376_join(headers: Headers, body: Optional[bytes] = None) -> bytes:
r"""
Combines rfc6376_split* output into an RFC 5322 message.
>>> rfc6376_join([[b'To', b' Recipient\r\n']], b'Body')
b'To: Recipient\r\n\r\nBody'
"""
signable: bytes = b"".join([b":".join(header) for header in headers])
if body is not None:
# In some cases, the headers may not end with \r\n
# This happens when the message abruptly ends in headers
# E.g. "" or "k:v"
# This means we have to attach it here for the rest to make sense
# if not signable.endswith(b"\r\n"):
# signable += b"\r\n"
# An empty body becomes b"\r\n" in DKIM canonicalisation
# Therefore we can either have None or b"\r\n" here
# But body cannot be b""
signable += b"\r\n" + body
return signable
def rfc6376_reformed(data: bytes) -> bytes:
r"""
Splits and then combines an RFC 5322 message.
Performs light normalisation.
Does not normalise line endings, and does not canonicalise.
>>> rfc6376_reformed(b"To")
b'To:'
>>> rfc6376_reformed(b"To: Recipient\n")
b'To: Recipient\n'
"""
headers: Headers
body: Optional[bytes]
headers, body = rfc6376_split(data)
return rfc6376_join(headers, body)
def rfc6376_reformed_canon(
data: bytes,
head_subset: Optional[Set[bytes]] = None,
head_canon: bool = False,
body_canon: bool = False,
lid: Optional[bytes] = None,
) -> Tuple[bytes, bytes]:
r"""
Splits and then combines an RFC 5322 message.
Performs RFC 5322 normalisation, and optional canonicalisation.
>>> rfc6376_reformed_canon(b"To: Recipient\n\nBody", head_canon=True)
(b'', b'to:Recipient\r\n\r\nBody')
"""
headers: Headers
body: Optional[bytes]
headers, body = rfc6376_split_canon(
data,
head_subset=head_subset,
head_canon=head_canon,
body_canon=body_canon,
)
# print(headers, body)
k: bytes
v: bytes
# If any List-Id header is set to the LID, lid is empty
for (k, v) in headers:
if k == b"list-id":
if v == lid:
lid = None
break
# Construct hashable bytes from the parsed message
return (lid or b"", rfc6376_join(headers, body))
def rfc6376_rascal(
data: bytes, lid: Optional[bytes] = None
) -> Tuple[bytes, bytes]:
r"""
Performs RFC 5322 normalisation.
Performs RFC 6376 DKIM relaxed/simple canonicalisation.
Selects a subset of headers based on RFC 4871.
>>> rfc6376_rascal(b"To: Recipient\nOther: Value\n\n")
(b'', b'to:Recipient\r\n\r\n\r\n')
"""
return rfc6376_reformed_canon(
data,
head_subset=rfc4871_subset,
head_canon=True,
body_canon=True,
lid=lid,
)
def pibble32(data: bytes) -> str:
r"""
Base32 encodes bytes with alphabet 0-9 b-d f-h j-t v-z.
>>> pibble32(b"\xca\xfe\xc0\xff\xee")
'sczd1zzg'
"""
table: bytes = bytes.maketrans(
b"ABCDEFGHIJKLMNOPQRSTUVWXYZ234567",
b"0123456789bcdfghjklmnopqrstvwxyz",
)
encoded: bytes = base64.b32encode(data)
return str(encoded.translate(table), "ascii")
def unpibble32(text: str) -> bytes:
r"""
Base32 decodes bytes with alphabet 0-9 b-d f-h j-t v-z.
>>> unpibble32("sczd1zzg")
b'\xca\xfe\xc0\xff\xee'
"""
encoded: bytes = bytes(text, "ascii")
table: bytes = bytes.maketrans(
b"0123456789bcdfghjklmnopqrstvwxyz",
b"ABCDEFGHIJKLMNOPQRSTUVWXYZ234567",
)
return base64.b32decode(encoded.translate(table))
def dkim_id(data: bytes, lid: Optional[bytes] = None) -> str:
"""
The DKIM-ID is the custom base32 encoded truncated SHA-256 HMAC
As this is fixed length, padding is removed from the output
>>> dkim_id(b"")
'8fgp2do75oqo6qd08vs4p7dpp1gj4vjn'
"""
hashable: bytes
lid, hashable = rfc6376_rascal(data, lid)
digest_256: bytes = hmac.digest(lid, hashable, "sha256")
truncated_bits: int = 160
return pibble32(digest_256[: truncated_bits // 8])
def main() -> None:
from sys import argv
from typing import BinaryIO
if len(argv) == 2:
f: BinaryIO
with open(argv[1], "rb") as f:
print(dkim_id(f.read()))
elif len(argv) == 3: # add lid
f: BinaryIO
with open(argv[1], "rb") as f:
print(dkim_id(f.read(), argv[2].encode('utf-8')))
else:
from sys import stdin
print(dkim_id(stdin.buffer.read()))
if __name__ == "__main__":
main()