| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| """ |
| Generator of signed advisory mails |
| """ |
| |
| from __future__ import absolute_import |
| |
| import re |
| import uuid |
| import hashlib |
| import smtplib |
| import textwrap |
| import email.utils |
| |
| from email.mime.multipart import MIMEMultipart |
| from email.mime.text import MIMEText |
| |
| try: |
| import gnupg |
| except ImportError: |
| import security._gnupg as gnupg |
| |
| import security.parser |
| |
| |
| class Mailer(object): |
| """ |
| Constructs signed PGP/MIME advisory mails. |
| """ |
| |
| def __init__(self, notification, sender, message_template, |
| release_date, dist_revision, *release_versions): |
| assert len(notification) > 0 |
| self.__sender = sender |
| self.__notification = notification |
| self.__message_content = self.__message_content( |
| message_template, release_date, dist_revision, release_versions) |
| |
| def __subject(self): |
| """ |
| Construct a subject line for the notification mail. |
| """ |
| |
| template = ('Confidential pre-notification of' |
| ' {multiple}Subversion {culprit}{vulnerability}') |
| |
| # Construct the {culprit} replacement value. If all advisories |
| # are either about the server or the client, use the |
| # appropriate value; for mixed server/client advisories, use |
| # an empty string. |
| culprit = set() |
| for metadata in self.__notification: |
| culprit |= metadata.culprit |
| assert len(culprit) > 0 |
| if len(culprit) > 1: |
| culprit = '' |
| elif self.__notification.Metadata.CULPRIT_CLIENT in culprit: |
| culprit = 'client ' |
| elif self.__notification.Metadata.CULPRIT_SERVER in culprit: |
| culprit = 'server ' |
| else: |
| raise ValueError('Unknown culprit ' + repr(culprit)) |
| |
| # Construct the format parameters |
| if len(self.__notification) > 1: |
| kwargs = dict(multiple='multiple ', culprit=culprit, |
| vulnerability='vulnerabilities') |
| else: |
| kwargs = dict(multiple='a ', culprit=culprit, |
| vulnerability='vulnerability') |
| |
| return template.format(**kwargs) |
| |
| def __message_content(self, message_template, |
| release_date, dist_revision, release_versions): |
| """ |
| Construct the message from the notification mail template. |
| """ |
| |
| # Construct the replacement arguments for the notification template |
| culprits = set() |
| advisories = [] |
| base_version_keys = self.__notification.base_version_keys() |
| for metadata in self.__notification: |
| culprits |= metadata.culprit |
| advisories.append( |
| ' * {}\n {}'.format(metadata.tracking_id, metadata.title)) |
| release_version_keys = set(security.parser.Patch.split_version(n) |
| for n in release_versions) |
| |
| multi = (len(self.__notification) > 1) |
| kwargs = dict(multiple=(multi and 'multiple ' or 'a '), |
| alert=(multi and 'alerts' or 'alert'), |
| culprits=self.__culprits(culprits), |
| advisories='\n'.join(advisories), |
| release_date=release_date.strftime('%d %B %Y'), |
| release_day=release_date.strftime('%d %B'), |
| base_versions = self.__versions(base_version_keys), |
| release_versions = self.__versions(release_version_keys), |
| dist_revision=str(dist_revision)) |
| |
| # Parse, interpolate and rewrap the notification template |
| wrapped = [] |
| content = security.parser.Text(message_template) |
| for line in content.text.format(**kwargs).split('\n'): |
| if len(line) > 0 and not line[0].isspace(): |
| for part in textwrap.wrap(line, |
| break_long_words=False, |
| break_on_hyphens=False): |
| wrapped.append(part) |
| else: |
| wrapped.append(line) |
| return security.parser.Text(None, '\n'.join(wrapped).encode('utf-8')) |
| |
| def __versions(self, versions): |
| """ |
| Return a textual representation of the set of VERSIONS |
| suitable for inclusion in a notification mail. |
| """ |
| |
| text = tuple(security.parser.Patch.join_version(n) |
| for n in sorted(versions)) |
| assert len(text) > 0 |
| if len(text) == 1: |
| return text[0] |
| elif len(text) == 2: |
| return ' and '.join(text) |
| else: |
| return ', '.join(text[:-1]) + ' and ' + text[-1] |
| |
| def __culprits(self, culprits): |
| """ |
| Return a textual representation of the set of CULPRITS |
| suitable for inclusion in a notification mail. |
| """ |
| |
| if self.__notification.Metadata.CULPRIT_CLIENT in culprits: |
| if self.__notification.Metadata.CULPRIT_SERVER in culprits: |
| return 'clients and servers' |
| else: |
| return 'clients' |
| elif self.__notification.Metadata.CULPRIT_SERVER in culprits: |
| return 'servers' |
| else: |
| raise ValueError('Unknown culprit ' + repr(culprits)) |
| |
| def __attachments(self): |
| filenames = set() |
| |
| def attachment(filename, description, encoding, content): |
| if filename in filenames: |
| raise ValueError('Named attachment already exists: ' |
| + filename) |
| filenames.add(filename) |
| |
| att = MIMEText('', 'plain', 'utf-8') |
| att.set_param('name', filename) |
| att.replace_header('Content-Transfer-Encoding', encoding) |
| att.add_header('Content-Description', description) |
| att.add_header('Content-Disposition', |
| 'attachment', filename=filename) |
| att.set_payload(content) |
| return att |
| |
| for metadata in self.__notification: |
| filename = metadata.tracking_id + '-advisory.txt' |
| description = metadata.tracking_id + ' Advisory' |
| yield attachment(filename, description, 'quoted-printable', |
| metadata.advisory.quoted_printable) |
| |
| for patch in metadata.patches: |
| filename = (metadata.tracking_id + |
| '-' + patch.base_version + '.patch') |
| description = (metadata.tracking_id |
| + ' Patch for Subversion ' + patch.base_version) |
| yield attachment(filename, description, 'base64', patch.base64) |
| |
| def generate_message(self): |
| message = SignedMessage( |
| self.__message_content, |
| self.__attachments()) |
| message['From'] = self.__sender |
| message['Reply-To'] = self.__sender |
| message['To'] = self.__sender # Will be replaced later |
| message['Subject'] = self.__subject() |
| message['Date'] = email.utils.formatdate() |
| |
| # Try to make the message-id refer to the sender's domain |
| address = email.utils.parseaddr(self.__sender)[1] |
| if not address: |
| domain = None |
| else: |
| domain = address.split('@')[1] |
| if not domain: |
| domain = None |
| |
| idstring = uuid.uuid1().hex |
| try: |
| msgid = email.utils.make_msgid(idstring, domain=domain) |
| except TypeError: |
| # The domain keyword was added in Python 3.2 |
| msgid = email.utils.make_msgid(idstring) |
| message["Message-ID"] = msgid |
| return message |
| |
| def send_mail(self, message, username, password, recipients=None, |
| host='mail-relay.apache.org', starttls=True, port=None): |
| if not port and starttls: |
| port = 587 |
| server = smtplib.SMTP(host, port) |
| if starttls: |
| server.starttls() |
| if username and password: |
| server.login(username, password) |
| |
| def send(message): |
| server.sendmail("From: " + message['From'], |
| "To: " + message['To'], |
| message.as_string()) |
| |
| if recipients is None: |
| # Test mode, send message back to originator to checck |
| # that contents and signature are OK. |
| message.replace_header('To', message['From']) |
| send(message) |
| else: |
| for recipient in recipients: |
| message.replace_header('To', recipient) |
| send(message) |
| server.quit() |
| |
| |
| class SignedMessage(MIMEMultipart): |
| """ |
| The signed PGP/MIME message. |
| """ |
| |
| def __init__(self, message, attachments, |
| gpgbinary='gpg', gnupghome=None, use_agent=True, |
| keyring=None, keyid=None): |
| |
| # Hack around the fact that the Pyton 2.x MIMEMultipart is not |
| # a new-style class. |
| try: |
| unicode # Doesn't exist in Python 3 |
| MIMEMultipart.__init__(self, 'signed') |
| except NameError: |
| super(SignedMessage, self).__init__('signed') |
| |
| payload = self.__payload(message, attachments) |
| signature = self.__signature( |
| payload, gpgbinary, gnupghome, use_agent, keyring, keyid) |
| |
| self.set_param('protocol', 'application/pgp-signature') |
| self.set_param('micalg', 'pgp-sha512') ####!!! GET THIS FROM KEY! |
| self.preamble = 'This is an OpenPGP/MIME signed message.' |
| self.attach(payload) |
| self.attach(signature) |
| |
| def __payload(self, message, attachments): |
| """ |
| Create the payload from the given MESSAGE and a |
| set of pre-cooked ATTACHMENTS. |
| """ |
| |
| payload = MIMEMultipart() |
| payload.preamble = 'This is a multi-part message in MIME format.' |
| |
| msg = MIMEText('', 'plain', 'utf-8') |
| msg.replace_header('Content-Transfer-Encoding', 'quoted-printable') |
| msg.set_payload(message.quoted_printable) |
| payload.attach(msg) |
| |
| for att in attachments: |
| payload.attach(att) |
| return payload |
| |
| def __signature(self, payload, |
| gpgbinary, gnupghome, use_agent, keyring, keyid): |
| """ |
| Sign the PAYLOAD and return the detached signature as |
| a MIME attachment. |
| """ |
| |
| # RFC3156 section 5 says line endings in the signed message |
| # must be canonical <CR><LF>. |
| cleartext = re.sub(r'\r?\n', '\r\n', payload.as_string()) |
| |
| gpg = gnupg.GPG(gpgbinary=gpgbinary, gnupghome=gnupghome, |
| use_agent=use_agent, keyring=keyring) |
| signature = gpg.sign(cleartext, |
| keyid=keyid, detach=True, clearsign=False) |
| sig = MIMEText('') |
| sig.set_type('application/pgp-signature') |
| sig.set_charset(None) |
| sig.set_param('name', 'signature.asc') |
| sig.add_header('Content-Description', 'OpenPGP digital signature') |
| sig.add_header('Content-Disposition', |
| 'attachment', filename='signature.asc') |
| sig.set_payload(str(signature)) |
| return sig |