blob: c3c252aeb86347ea3309a3fe03cd1096c4a1a5af [file] [log] [blame]
# /*
# * Licensed to the Apache Software Foundation (ASF) under one or more
# * contributor license agreements. See the NOTICE file distributed with
# * this work for additional information regarding copyright ownership.
# * The ASF licenses this file to You under the Apache License, Version 2.0
# * (the "License"); you may not use this file except in compliance with
# * the License. You may obtain a copy of the License at
# *
# * http://www.apache.org/licenses/LICENSE-2.0
# *
# * Unless required by applicable law or agreed to in writing, software
# * distributed under the License is distributed on an "AS IS" BASIS,
# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# * See the License for the specific language governing permissions and
# * limitations under the License.
# */
import __init__
from librocketmqclientpython import *
import time
import sys
topic = 'test-topic-normal'
topic_orderly = 'test-topic-normal-orderly'
name_srv = '127.0.0.1:9876'
tag = 'rmq-tag'
consumer_group = 'test-consumer-group'
consumer_group_orderly = 'test-topic-normal-orderly_group'
totalMsg = 0
def sigint_handler(signum, frame):
global is_sigint_up
is_sigint_up = True
sys.exit(0)
def consumer_message(msg, args):
global totalMsg
totalMsg += 1
print 'total count %d' % totalMsg
print 'topic=%s' % GetMessageTopic(msg)
print 'tag=%s' % GetMessageTags(msg)
print 'body=%s' % GetMessageBody(msg)
print 'msg id=%s' % GetMessageId(msg)
print 'map.keys %s' % GetMessageKeys(msg)
print 'map.name %s' % GetMessageProperty(msg, 'name')
print 'map.id %s' % GetMessageProperty(msg, 'id')
return CConsumeStatus.E_CONSUME_SUCCESS
def init_consumer(_group, _topic, _tag):
consumer = CreatePushConsumer(_group)
SetPushConsumerNameServerAddress(consumer, name_srv)
SetPushConsumerThreadCount(consumer, 1)
SetPushConsumerLogLevel(consumer, CLogLevel.E_LOG_LEVEL_INFO)
SetPushConsumerMessageModel(consumer, CMessageModel.CLUSTERING)
Subscribe(consumer, _topic, _tag)
RegisterMessageCallback(consumer, consumer_message, None)
StartPushConsumer(consumer)
print 'consumer is ready...'
return consumer
def start_one_consumer(_group, _topic, _tag):
consumer = init_consumer(_group, _topic, _tag)
i = 1
while i <= 10:
print 'clock: ' + str(i)
i += 1
time.sleep(10)
ShutdownPushConsumer(consumer)
DestroyPushConsumer(consumer)
print("Consumer Down....")
def start_orderly_consumer():
consumer = init_consumer(consumer_group_orderly, topic_orderly, "*")
i = 1
while i <= 10:
print 'clock: ' + str(i)
i += 1
time.sleep(10)
ShutdownPushConsumer(consumer)
DestroyPushConsumer(consumer)
print("Consumer Down....")
if __name__ == '__main__':
start_orderly_consumer()