blob: 87d498268766af7b9866b9648d93355fca000d99 [file] [log] [blame]
# -*- coding: utf-8 -*-
import time
import threading
import pytest
from rocketmq.client import Message, SendStatus
from rocketmq.exceptions import PushConsumerStartFailed
from rocketmq.consts import MessageProperty
def _send_test_msg(producer):
msg = Message('test')
msg.set_keys('XXX')
msg.set_tags('XXX')
msg.set_body('XXXX')
ret = producer.send_sync(msg)
assert ret.status == SendStatus.OK
def test_pull_consumer(producer, pull_consumer):
_send_test_msg(producer)
time.sleep(5)
msg = next(pull_consumer.pull('test'))
assert msg.body.decode('utf-8') == 'XXXX'
def test_push_consumer_no_subscription_start_fail(push_consumer):
with pytest.raises(PushConsumerStartFailed):
push_consumer.start()
def test_push_consumer(producer, push_consumer):
stop_event = threading.Event()
_send_test_msg(producer)
def on_message(msg):
stop_event.set()
assert msg.body.decode('utf-8') == 'XXXX'
assert msg[MessageProperty.KEYS]
push_consumer.subscribe('test', on_message)
push_consumer.start()
while not stop_event.is_set():
time.sleep(10)
def test_push_consumer_reconsume_later(producer, push_consumer):
stop_event = threading.Event()
_send_test_msg(producer)
raised_exc = threading.Event()
def on_message(msg):
if not raised_exc.is_set():
raised_exc.set()
raise Exception('Should reconsume later')
stop_event.set()
assert msg.body.decode('utf-8') == 'XXXX'
assert msg[MessageProperty.KEYS]
push_consumer.subscribe('test', on_message)
push_consumer.start()
while not stop_event.is_set():
time.sleep(10)