Skip to content
Snippets Groups Projects
Commit 841919a3 authored by Nicolas Dandrimont's avatar Nicolas Dandrimont
Browse files

inbound_email: add support for signed email addresses

These utilities allow us to generate addresses of the form
`<localpart>+<integer>.<signature>@<domain>`, where the integer is the
primary key of a given object for which we want to track email
exchanges. The signature prevents the addresses from being forged, that
is, the addresses have to be explicitly generated and displayed by the
web app to be discovered.

The counterpart function retrieves all relevant email addresses from the
list of recipients of an email message, and validates the signatures to
recover the integer values that won't have been tampered with.

These new utilities are expected to be used in the views and signal
handlers pertaining to processing of inbound email messages.
parent 120076ee
No related branches found
No related tags found
1 merge request!1064inbound_email: add support for signed email addresses
......@@ -6,7 +6,13 @@
from dataclasses import dataclass
from email.headerregistry import Address
from email.message import EmailMessage
from typing import List, Optional
import logging
from typing import List, Optional, Set
from django.core.signing import Signer
from django.utils.crypto import constant_time_compare
logger = logging.getLogger(__name__)
def extract_recipients(message: EmailMessage) -> List[Address]:
......@@ -78,3 +84,83 @@ def recipient_matches(message: EmailMessage, address: str) -> List[AddressMatch]
ret.append(match)
return ret
ADDRESS_SIGNER_SEP = "."
"""Separator for email address signatures"""
def get_address_signer(salt: str) -> Signer:
"""Get a signer for the given seed"""
return Signer(salt=salt, sep=ADDRESS_SIGNER_SEP)
def get_address_for_pk(salt: str, base_address: str, pk: int) -> str:
"""Get the email address that will be able to receive messages to be logged in
this request."""
if "@" not in base_address:
raise ValueError("Base address needs to contain an @")
username, domain = base_address.split("@")
extension = get_address_signer(salt).sign(str(pk))
return f"{username}+{extension}@{domain}"
def get_pk_from_extension(salt: str, extension: str) -> int:
"""Retrieve the primary key for the given inbound address extension.
We reimplement `Signer.unsign`, because the extension can be casemapped at any
point in the email chain (even though email is, theoretically, case sensitive),
so we have to compare lowercase versions of both the extension and the
signature...
Raises ValueError if the signature couldn't be verified.
"""
value, signature = extension.rsplit(ADDRESS_SIGNER_SEP, 1)
expected_signature = get_address_signer(salt).signature(value)
if not constant_time_compare(signature.lower(), expected_signature.lower()):
raise ValueError(f"Invalid signature for extension {extension}")
return int(value)
def get_pks_from_message(
salt: str, base_address: str, message: EmailMessage
) -> Set[int]:
"""Retrieve the set of primary keys that were successfully decoded from the
recipients of the ``message`` matching ``base_address``.
This uses :func:`recipient_matches` to retrieve all the recipient addresses matching
``base_address``, then :func:`get_pk_from_extension` to decode the primary key and
verify the signature for every extension. To generate relevant email addresses, use
:func:`get_address_for_pk` with the same ``base_address`` and ``salt``.
Returns:
the set of primary keys that were successfully decoded from the recipients of the
``message``
"""
ret: Set[int] = set()
for match in recipient_matches(message, base_address):
extension = match.extension
if extension is None:
logger.debug(
"Recipient address %s cannot be matched to a request, ignoring",
match.recipient.addr_spec,
)
continue
try:
ret.add(get_pk_from_extension(salt, extension))
except ValueError:
logger.debug(
"Recipient address %s failed validation", match.recipient.addr_spec
)
continue
return ret
......@@ -130,3 +130,114 @@ def test_recipient_matches_casemapping():
matches = utils.recipient_matches(message, "match@example.com")
assert matches
assert matches[0].extension == "weirdCaseMapping"
def test_get_address_for_pk():
salt = "test_salt"
pks = [1, 10, 1000]
base_address = "base@example.com"
addresses = {
pk: utils.get_address_for_pk(salt=salt, base_address=base_address, pk=pk)
for pk in pks
}
assert len(set(addresses.values())) == len(addresses)
for pk, address in addresses.items():
localpart, _, domain = address.partition("@")
base_localpart, _, extension = localpart.partition("+")
assert domain == "example.com"
assert base_localpart == "base"
assert extension.startswith(f"{pk}.")
def test_get_address_for_pk_salt():
pk = 1000
base_address = "base@example.com"
addresses = [
utils.get_address_for_pk(salt=salt, base_address=base_address, pk=pk)
for salt in ["salt1", "salt2"]
]
assert len(addresses) == len(set(addresses))
def test_get_pks_from_message():
salt = "test_salt"
pks = [1, 10, 1000]
base_address = "base@example.com"
addresses = {
pk: utils.get_address_for_pk(salt=salt, base_address=base_address, pk=pk)
for pk in pks
}
message = EmailMessage()
message["To"] = "test@example.com"
assert utils.get_pks_from_message(salt, base_address, message) == set()
message = EmailMessage()
message["To"] = f"Test Address <{addresses[1]}>"
assert utils.get_pks_from_message(salt, base_address, message) == {1}
message = EmailMessage()
message["To"] = f"Test Address <{addresses[1]}>"
message["Cc"] = ", ".join(
[
f"Test Address <{addresses[1]}>",
f"Another Test Address <{addresses[10].lower()}>",
"A Third Address <irrelevant@example.com>",
]
)
assert utils.get_pks_from_message(salt, base_address, message) == {1, 10}
def test_get_pks_from_message_logging(caplog):
salt = "test_salt"
pks = [1, 10, 1000]
base_address = "base@example.com"
addresses = {
pk: utils.get_address_for_pk(salt=salt, base_address=base_address, pk=pk)
for pk in pks
}
message = EmailMessage()
message["To"] = f"Test Address <{base_address}>"
assert utils.get_pks_from_message(salt, base_address, message) == set()
relevant_records = [
record
for record in caplog.records
if record.name == "swh.web.inbound_email.utils"
]
assert len(relevant_records) == 1
assert relevant_records[0].levelname == "DEBUG"
assert (
f"{base_address} cannot be matched to a request"
in relevant_records[0].getMessage()
)
# Replace the signature with "mangle{signature}"
mangled_address = addresses[1].replace(".", ".mangle", 1)
message = EmailMessage()
message["To"] = f"Test Address <{mangled_address}>"
assert utils.get_pks_from_message(salt, base_address, message) == set()
relevant_records = [
record
for record in caplog.records
if record.name == "swh.web.inbound_email.utils"
]
assert len(relevant_records) == 2
assert relevant_records[0].levelname == "DEBUG"
assert relevant_records[1].levelname == "DEBUG"
assert f"{mangled_address} failed" in relevant_records[1].getMessage()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment