blob: 7852d938ab704e193872bb2f9180450c3ef36ddd [file] [log] [blame]
#!/usr/bin/env python3
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from __future__ import annotations
from abc import ABC, abstractmethod
from collections import deque, namedtuple
from dataclasses import dataclass
import optparse
import time
import uuid
from typing import Optional, Union
from proton import (Condition, Delivery, Described, Disposition, DispositionType,
Endpoint, Link, Sender, Message, Terminus, DeclaredDisposition,
RejectedDisposition, TransactionalDisposition)
from proton.handlers import MessagingHandler
from proton.reactor import Container
class Queue:
    """Messages waiting on one address, plus the consumers subscribed to it."""

    def __init__(self, broker: Broker, name: str, dynamic: bool = False):
        self.broker: Broker = broker
        self.name: str = name
        self.dynamic: bool = dynamic
        # Pending messages, oldest on the left.
        self.queue: deque[Message] = deque()
        # Sender links currently subscribed to this queue.
        self.consumers: list[Sender] = []

    def subscribe(self, consumer: Sender):
        """Register *consumer* as a recipient of messages from this queue."""
        self.consumers.append(consumer)

    def unsubscribe(self, consumer: Sender):
        """Drop *consumer* from the subscriber list; unknown consumers are ignored."""
        try:
            self.consumers.remove(consumer)
        except ValueError:
            # Not subscribed - nothing to remove
            pass

    def removable(self):
        """Tell whether the queue has no consumers and is either dynamic or empty."""
        return not self.consumers and (self.dynamic or not self.queue)

    def publish(self, message: Message):
        """Append *message* to the queue and try to hand out pending messages."""
        self.queue.append(message)
        self.dispatch()

    def dispatch(self, consumer: Optional[Sender] = None):
        """Deliver pending messages to *consumer*, or to all subscribers if omitted.

        Delivery passes are repeated until a pass reports that nothing more
        could be delivered.
        """
        targets = [consumer] if consumer else self.consumers
        while self._deliver_to(targets):
            pass

    def _deliver_to(self, consumers: list[Sender]):
        """Make one pass over *consumers*, sending one message to each that has credit.

        Returns True when at least one message was delivered and the pass ran to
        completion; returns False when nothing was delivered or the queue ran out
        of messages part-way through the pass.
        """
        delivered = False
        try:
            for sender in consumers:
                if not sender.credit:
                    continue
                head = self.queue.popleft()
                self.broker.deliver(sender, head)
                delivered = True
        except IndexError:  # no more messages
            return False
        return delivered
# Record kept for each message sent to a consumer whose outcome is not yet
# known: the queue address it was sent from and the message itself.
UnsettledDelivery = namedtuple('UnsettledDelivery', ('address', 'message'))
class TransactionAction(ABC):
    """Interface for a unit of work recorded against a transaction."""

    @abstractmethod
    def commit(self, broker: Broker) -> None:
        """Apply this action using *broker*; invoked when the transaction commits."""

    @abstractmethod
    def rollback(self, broker: Broker) -> None:
        """Undo or discard this action; invoked when the transaction rolls back."""
@dataclass
class QueueMessage(TransactionAction):
    """Transaction action holding a received message until the transaction ends."""

    delivery: Delivery
    message: Message
    address: str

    def commit(self, broker: Broker) -> None:
        """Publish the held message to its address, then settle its delivery."""
        broker.publish(self.message, self.address)
        self.delivery.settle()

    def rollback(self, broker: Broker) -> None:
        """Take no action on rollback; the message is never published."""
        return None
@dataclass
class RejectDelivery(TransactionAction):
    """Transaction action for a received delivery that will not be queued."""

    delivery: Delivery

    def commit(self, broker: Broker) -> None:
        """Settle the delivery; nothing is published."""
        self.delivery.settle()

    def rollback(self, broker: Broker) -> None:
        """Take no action on rollback."""
        return None
@dataclass
class DeliveryUpdate(TransactionAction):
    """Transaction action recording the outcome a consumer gave for a delivery."""

    delivery: Delivery
    # Outcome to apply to the delivery if the transaction commits.
    commit_outcome: DispositionType

    def commit(self, broker: Broker) -> None:
        """Ask the broker to apply the recorded outcome to the delivery."""
        broker.delivery_outcome(self.delivery, self.commit_outcome)

    def rollback(self, broker: Broker) -> None:
        """Take no action on rollback; the recorded outcome is not applied."""
        return None
class Transaction:
    """
    Simple container for broker-managed transactions.

    Holds the transaction id, the time it was started and the ordered list of
    actions to run when the transaction is committed or rolled back.
    """

    def __init__(self, tid: bytes, start_time: float):
        self.tid = tid
        self.start_time = start_time
        # Actions in the order they were added.
        self.actions: list[TransactionAction] = []

    def add_action(self, action: TransactionAction) -> None:
        """Record *action* to be run when this transaction is discharged."""
        self.actions.append(action)

    def commit(self, broker: Broker) -> Optional[Condition]:
        """Run ``commit`` on every recorded action in order; always returns None."""
        for step in self.actions:
            step.commit(broker)
        return None

    def rollback(self, broker: Broker) -> None:
        """Run ``rollback`` on every recorded action in order."""
        for step in self.actions:
            step.rollback(broker)
class Broker(MessagingHandler):
    """Minimal in-memory message broker with transaction-coordinator support.

    Keeps one ``Queue`` per address, tracks every message sent to a consumer
    until its outcome is known, and records transactional work in
    ``Transaction`` objects until the transaction is discharged.
    """

    def __init__(self, url: str, verbose: bool, redelivery_limit: int, txn_timeout: float) -> None:
        """Create a broker.

        :param url: address passed to ``container.listen`` in ``on_start``.
        :param verbose: when true, progress messages are printed.
        :param redelivery_limit: a message modified with ``failed`` set is
            dropped once its delivery count has reached this value.
        :param txn_timeout: maximum age in seconds of a transaction at commit
            time; values <= 0 disable the check.
        """
        # auto_accept is off: on_message sets the disposition and settles itself.
        super().__init__(auto_accept=False)
        self.verbose = verbose
        self.url = url
        self.redelivery_limit = redelivery_limit
        self.txn_timeout = txn_timeout
        self.queues: dict[str, Queue] = {}
        # Messages sent to consumers whose outcome has not been processed yet.
        self.unsettled_deliveries: dict[Delivery, UnsettledDelivery] = {}
        # Declared, not yet discharged transactions keyed by transaction id.
        self.txns: dict[bytes, Transaction] = {}
        self.acceptor = None

    def _verbose_print(self, message):
        """Print *message* only when verbose output is enabled."""
        if self.verbose:
            print(message)

    def _queue(self, address, dynamic=False):
        """Return the queue for *address*, creating it on first use."""
        if address not in self.queues:
            self.queues[address] = Queue(self, address, dynamic)
            self._verbose_print(f"{address=}: Created")
        return self.queues[address]

    def on_start(self, event):
        """Start listening on the configured URL."""
        self.acceptor = event.container.listen(self.url)

    def on_connection_opening(self, event):
        """Offer the ANONYMOUS-RELAY capability on every incoming connection."""
        event.connection.offered_capabilities = 'ANONYMOUS-RELAY'

    def on_link_opening(self, event):
        """Configure a newly opened link.

        Sender links are subscribed to the requested (or a freshly generated
        dynamic) address, coordinator links get an empty transaction set, and
        other receiver links simply echo the requested target address.
        """
        link = event.link
        if link.is_sender:
            dynamic = link.remote_source.dynamic
            if dynamic or link.remote_source.address:
                address = str(uuid.uuid4()) if dynamic else link.remote_source.address
                link.source.address = address
                self._queue(address, dynamic).subscribe(link)
                self._verbose_print(f"{link=}: Subscribed: {address=}")
        elif link.remote_target.type == Terminus.COORDINATOR:
            # Set up transaction coordinator
            # Should check for compatible capabilities
            # requested = link.remote_target.capabilities.get_object()
            link.target.type = Terminus.COORDINATOR
            link.target.copy(link.remote_target)
            # Transactions declared over this coordinator link.
            link._txns: set[Transaction] = set()
        elif link.remote_target.address:
            link.target.address = link.remote_target.address

    def _unsubscribe(self, link):
        """Unsubscribe *link* from its source queue, removing the queue if unused."""
        address = link.source.address
        queue = self.queues.get(address)
        if queue is None:
            return
        queue.unsubscribe(link)
        self._verbose_print(f"{link=}: Unsubscribed: {address=}")
        if queue.removable():
            # Remove the registry entry itself; deleting only a local name
            # would leave the queue reachable through self.queues.
            del self.queues[address]
            self._verbose_print(f"{address=}: Removed")

    def _modify_delivery(self, delivery: Delivery, message: Message, address: str):
        """Handle a MODIFIED outcome: drop the message or requeue it on *address*."""
        disposition = delivery.remote
        # If not deliverable don't requeue
        if disposition.undeliverable:
            self._verbose_print(f"{delivery.tag=}: Modified: Undeliverable: {message.id=}")
            # Don't requeue the message
            return
        # Check if we need to update the delivery count
        if disposition.failed:
            if message.delivery_count >= self.redelivery_limit:
                self._verbose_print(f"{delivery.tag=}: Modified: Redelivery limit exceeded: {message.id=}")
                # Don't requeue the message
                return
            # Update the delivery count
            message.delivery_count += 1
        # Requeue the message from the delivery
        self._verbose_print(f"{delivery.tag=}: Modified: {message.id=} Requeued: {address=}")
        self._queue(address).publish(message)

    def delivery_outcome(self, delivery: Delivery, outcome: Union[int, DispositionType]):
        """Apply a consumer's *outcome* to a sent message and settle the delivery.

        Released messages are requeued, modified ones are handled by
        ``_modify_delivery``, accepted and rejected ones need nothing further.
        The delivery stops being tracked once its outcome has been applied.
        """
        # Pop the entry so the tracking dict does not grow without bound.
        unsettled_delivery = self.unsettled_deliveries.pop(delivery, None)
        if unsettled_delivery is None:
            # Outcome already applied (or the delivery was never tracked):
            # there is no message to act on, so just settle.
            self._verbose_print(f"{delivery.tag=}: Outcome for untracked delivery ignored")
            delivery.settle()
            return
        message = unsettled_delivery.message
        address = unsettled_delivery.address
        if outcome == Disposition.ACCEPTED:
            self._verbose_print(f"{delivery.tag=}: Accepted: {message.id=}")
            # Delivery was accepted - nothing further to do
        elif outcome == Disposition.REJECTED:
            self._verbose_print(f"{delivery.tag=}: Rejected: {message.id=}")
            # Delivery was rejected - nothing further to do
        elif outcome == Disposition.RELEASED:
            self._verbose_print(f"{delivery.tag=}: Released: {message.id=} Requeued: {address=}")
            # Requeue the message from the delivery
            self._queue(address).publish(message)
        elif outcome == Disposition.MODIFIED:
            self._modify_delivery(delivery, message, address)
        delivery.settle()

    def _declare_txn(self) -> Transaction:
        """Create and register a transaction with a fresh random id."""
        tid = bytes(uuid.uuid4().bytes)
        txn = Transaction(tid, time.monotonic())
        self.txns[tid] = txn
        return txn

    def _discharge_txn(self, txn: Transaction, failed) -> Optional[Condition]:
        """Commit (``failed`` false) or roll back *txn* and unregister it.

        A commit is refused with a timeout condition when the transaction is
        older than ``txn_timeout``; any failed commit is rolled back.

        :return: the error condition of a failed commit, otherwise None.
        """
        error = None
        tid = txn.tid
        if not failed:
            # Commit
            txn_time = time.monotonic() - txn.start_time
            if self.txn_timeout > 0 and txn_time > self.txn_timeout:
                error = Condition('amqp:transaction:timeout', f"timeout: {self.txn_timeout}s time: {txn_time}s")
            else:
                error = txn.commit(self)
            if error is not None:
                self._verbose_print(f"{tid=}: Commit failed: {error} -> rollback")
                txn.rollback(self)
        else:
            # Rollback
            self._verbose_print(f"{tid=}: Rollback")
            txn.rollback(self)
        del self.txns[tid]
        return error

    def _coordinator_message(self, msg, delivery):
        """Process a declare or discharge request received on a coordinator link."""
        body = msg.body
        if isinstance(body, Described):
            link = delivery.link
            d = body.descriptor
            if d == "amqp:declare:list":
                # Allocate transaction id
                txn = self._declare_txn()
                self._verbose_print(f"{txn.tid=}: Declare")
                delivery.local = DeclaredDisposition(txn.tid)
                link._txns.add(txn)
            elif d == "amqp:discharge:list":
                # Commit/abort the transaction (commit may fail and be rejected).
                value = body.value
                tid = bytes(value[0])
                failed = bool(value[1])
                txn = self.txns.get(tid)
                if txn is not None:
                    error = self._discharge_txn(txn, failed)
                    if error is None:
                        delivery.update(Disposition.ACCEPTED)
                    else:
                        delivery.update(RejectedDisposition(error))
                    # discard: the transaction may have been declared over a
                    # different coordinator link than the one discharging it.
                    link._txns.discard(txn)
                else:
                    self._verbose_print(f"{tid=}: Discharge unknown txn-id: {failed=}")
                    delivery.update(RejectedDisposition(Condition('amqp:transaction:unknown-id')))
        delivery.settle()

    def _abort_link_txns(self, link):
        """Roll back every transaction still active on coordinator *link*."""
        for txn in link._txns:
            # Skip transactions that were already discharged elsewhere.
            if txn.tid in self.txns:
                self._discharge_txn(txn, failed=True)
        link._txns.clear()

    def on_link_closing(self, event):
        """Unsubscribe a closing sender link, or abort a coordinator's transactions."""
        link = event.link
        if link.is_sender:
            self._unsubscribe(link)
        elif link.target.type == Terminus.COORDINATOR:
            # Abort any remaining active transactions
            self._abort_link_txns(link)

    def _remove_stale_consumers(self, connection):
        """Unsubscribe every remotely active sender link on *connection*."""
        for link in connection.links(Endpoint.REMOTE_ACTIVE):
            if link.is_sender:
                self._unsubscribe(link)

    def _abort_active_transactions(self, connection):
        """Abort the transactions of every locally active coordinator link."""
        for link in connection.links(Endpoint.LOCAL_ACTIVE):
            if link.target.type == Terminus.COORDINATOR:
                # Abort any remaining active transactions
                self._abort_link_txns(link)

    def on_connection_closing(self, event):
        """Clean up consumers and transactions of a closing connection."""
        connection = event.connection
        self._remove_stale_consumers(connection)
        self._abort_active_transactions(connection)

    def on_disconnected(self, event):
        """Clean up consumers and transactions of a dropped connection."""
        connection = event.connection
        self._remove_stale_consumers(connection)
        self._abort_active_transactions(connection)

    def on_sendable(self, event):
        """Dispatch pending messages to a link that can now send."""
        link: Link = event.link
        self._queue(link.source.address).dispatch(link)

    def on_message(self, event):
        """Handle an incoming message.

        Coordinator messages are passed to ``_coordinator_message``.
        Transactional messages are recorded against their transaction and left
        unsettled. All other messages are published, or rejected when no
        address is available, and settled immediately.
        """
        link: Link = event.link
        delivery: Delivery = event.delivery
        message = event.message
        if link.target.type == Terminus.COORDINATOR:
            # Deal with special transaction messages
            self._coordinator_message(message, delivery)
            return
        # Fall back to the message's own address when the link has none.
        address = link.target.address
        if address is None:
            address = message.address
        ldisposition = None
        if address is None:
            self._verbose_print(f"{message.id=}: Message without address")
            ldisposition = RejectedDisposition(Condition('amqp:link:invalid-address'))
        else:
            ldisposition = Disposition.ACCEPTED
        # Is this a transactioned message?
        if delivery.remote_state == Disposition.TRANSACTIONAL_STATE:
            rdisposition = delivery.remote
            tid = rdisposition.id
            if tid in self.txns:
                if address:
                    self._verbose_print(f"{tid=}: Message: {message.id=}")
                    self.txns[tid].add_action(QueueMessage(delivery, message, address))
                else:
                    self.txns[tid].add_action(RejectDelivery(delivery))
                # Settlement is deferred to the recorded action's commit.
                delivery.update(TransactionalDisposition(tid, ldisposition))
                return
            else:
                self._verbose_print(f"{tid=}: Message: unknown txn-id")
                delivery.update(RejectedDisposition(Condition('amqp:transaction:unknown-id')))
                delivery.settle()
                return
        if address:
            self.publish(message, address)
        delivery.update(ldisposition)
        delivery.settle()

    def publish(self, message, address):
        """Put *message* on the queue for *address*, creating the queue if needed."""
        queue = self._queue(address)
        queue.publish(message)
        self._verbose_print(f"{message.id=} Queued: {queue.name=}")

    def deliver(self, consumer, message):
        """Send *message* on *consumer* and track the delivery until its outcome."""
        delivery = consumer.send(message)
        address = consumer.source.address
        self.unsettled_deliveries[delivery] = UnsettledDelivery(address, message)
        self._verbose_print(f"{delivery.tag=}: Sent: {message.id=} to {address=}")

    def on_delivery_updated(self, event):
        """Handle all delivery updates for the link."""
        delivery = event.delivery
        disposition_type = delivery.remote_state
        # Is this a transactioned delivery update?
        if disposition_type == Disposition.TRANSACTIONAL_STATE:
            disposition = delivery.remote
            tid = disposition.id
            outcome = disposition.outcome_type
            if tid in self.txns:
                self._verbose_print(f"{tid=}: Delivery update: outcome={outcome}")
                self.txns[tid].add_action(DeliveryUpdate(delivery, outcome))
                return
            else:
                self._verbose_print(f"{tid=}: Delivery update: unknown txn-id")
                delivery.update(RejectedDisposition(Condition('amqp:transaction:unknown-id')))
                # The delivery is settled below, so stop tracking it.
                self.unsettled_deliveries.pop(delivery, None)
        else:
            self.delivery_outcome(delivery, disposition_type)
        # The delivery is settled in every case except a valid transaction
        # where the outcome is not yet known until the transaction is discharged.
        delivery.settle()
def main():
    """Parse the command-line options and run the broker until interrupted."""
    parser = optparse.OptionParser(usage="usage: %prog [options]")
    # (flags, keyword settings) for each supported option, in display order.
    option_table = (
        (("-v", "--verbose"),
         {"action": "store_true", "default": False,
          "help": "enable verbose output (default %default)"}),
        (("-a", "--address"),
         {"default": "localhost:5672",
          "help": "address router listens on (default %default)"}),
        (("-l", "--redelivery-limit"),
         {"default": 5,
          "help": "maximum redelivery attempts (default %default)"}),
        (("-t", "--txn-timeout"),
         {"default": 0,
          "help": "transaction timeout in seconds (default %default)"}),
    )
    for flags, settings in option_table:
        parser.add_option(*flags, **settings)
    opts, _positional = parser.parse_args()
    try:
        # Numeric options arrive as strings when given on the command line.
        broker = Broker(
            opts.address,
            opts.verbose,
            int(opts.redelivery_limit),
            float(opts.txn_timeout),
        )
        Container(broker).run()
    except KeyboardInterrupt:
        # Interrupting the process is the way to stop the broker; exit quietly.
        pass


if __name__ == '__main__':
    main()