blob: c38f65195bc20c4eb4f4a16a51f7c7505ac48b46 [file] [log] [blame]
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from __future__ import print_function
import sys
import threading
from proton.reactor import ApplicationEvent, Container, EventInjector
from proton.handlers import MessagingHandler, TransactionHandler
class TxRecv(MessagingHandler, TransactionHandler):
def __init__(self):
super(TxRecv, self).__init__(prefetch=0, auto_accept=False)
def on_start(self, event):
self.container = event.container
self.conn = self.container.connect("localhost:5672")
self.receiver = self.container.create_receiver(self.conn, "examples")
self.container.declare_transaction(self.conn, handler=self, settle_before_discharge=True)
self.transaction = None
def on_message(self, event):
print(event.message.body)
self.transaction.accept(event.delivery)
def on_transaction_declared(self, event):
self.transaction = event.transaction
print("transaction declared")
def on_transaction_committed(self, event):
print("transaction committed")
self.container.declare_transaction(self.conn, handler=self)
def on_transaction_aborted(self, event):
print("transaction aborted")
self.container.declare_transaction(self.conn, handler=self)
def on_commit(self, event):
self.transaction.commit()
def on_abort(self, event):
self.transaction.abort()
def on_fetch(self, event):
self.receiver.flow(1)
def on_quit(self, event):
c = self.receiver.connection
self.receiver.close()
c.close()
try:
reactor = Container(TxRecv())
events = EventInjector()
reactor.selectable(events)
thread = threading.Thread(target=reactor.run)
thread.daemon=True
thread.start()
print("Enter 'fetch', 'commit' or 'abort'")
while True:
line = sys.stdin.readline()
if line:
events.trigger(ApplicationEvent(line.strip()))
else:
break
except KeyboardInterrupt: pass