blob: 179e1f147caa572ee3d6ce83ab55320e9535504b [file] [log] [blame]
# /*
# * Licensed to the Apache Software Foundation (ASF) under one or more
# * contributor license agreements. See the NOTICE file distributed with
# * this work for additional information regarding copyright ownership.
# * The ASF licenses this file to You under the Apache License, Version 2.0
# * (the "License"); you may not use this file except in compliance with
# * the License. You may obtain a copy of the License at
# *
# * http://www.apache.org/licenses/LICENSE-2.0
# *
# * Unless required by applicable law or agreed to in writing, software
# * distributed under the License is distributed on an "AS IS" BASIS,
# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# * See the License for the specific language governing permissions and
# * limitations under the License.
# */
import __init__
from librocketmqclientpython import *
import time
topic = 'test-topic-normal'
topic_orderly = 'test-topic-normal-orderly'
name_srv = '127.0.0.1:9876'
def init_producer():
producer = CreateProducer('TestProducer')
SetProducerLogLevel(producer, CLogLevel.E_LOG_LEVEL_INFO)
SetProducerNameServerAddress(producer, name_srv)
StartProducer(producer)
return producer
def transaction_local_checker(msg):
print 'begin check for msg: ' + GetMessageId(msg)
return TransactionStatus.E_COMMIT_TRANSACTION
def init_transaction_producer():
producer = CreateTransactionProducer('TransactionTestProducer', transaction_local_checker)
SetProducerLogLevel(producer, CLogLevel.E_LOG_LEVEL_INFO)
SetProducerNameServerAddress(producer, name_srv)
StartProducer(producer)
return producer
producer = init_transaction_producer()
tag = 'rmq-tag'
key = 'rmq-key'
def send_messages_sync(count):
for a in range(count):
print 'start sending...'
body = 'hi rmq, now is ' + \
time.strftime('%Y.%m.%d', time.localtime(time.time()))
msg = CreateMessage(topic)
SetMessageBody(msg, body)
result = SendMessageSync(producer, msg)
DestroyMessage(msg)
print '[RMQ-PRODUCER]start sending...done, msg id = ' + \
result.GetMsgId()
def send_messages_sync_with_map(count):
print 'sending message with properties...id, name'
for a in range(count):
body = 'hi rmq, now is ' + \
time.strftime('%Y.%m.%d', time.localtime(time.time()))
msg = CreateMessage(topic)
SetMessageBody(msg, body)
SetMessageProperty(msg, 'name', 'test')
SetMessageProperty(msg, 'id', str(time.time()))
result = SendMessageSync(producer, msg)
DestroyMessage(msg)
print '[RMQ-PRODUCER]start sending...done, msg id = ' + \
result.GetMsgId()
def send_messages_with_tag_sync(count):
print 'sending message with tag...' + tag
for a in range(count):
body = 'hi rmq, now is ' + \
time.strftime('%Y.%m.%d', time.localtime(time.time()))
msg = CreateMessage(topic)
SetMessageBody(msg, body)
SetMessageTags(msg, tag)
result = SendMessageSync(producer, msg)
DestroyMessage(msg)
print 'msg id = ' + result.GetMsgId()
def send_messages_with_tag_and_map_sync(count):
print 'sending message with tag...' + tag + ' and properties id, name'
for a in range(count):
body = 'hi rmq, now is ' + \
time.strftime('%Y.%m.%d', time.localtime(time.time()))
msg = CreateMessage(topic)
SetMessageBody(msg, body)
SetMessageProperty(msg, 'name', 'test')
SetMessageProperty(msg, 'id', str(time.time()))
SetMessageTags(msg, tag)
result = SendMessageSync(producer, msg)
DestroyMessage(msg)
print 'msg id = ' + result.GetMsgId()
def send_messages_with_key_sync(count):
print 'sending message with keys...' + key
for a in range(count):
body = 'hi rmq, now is ' + \
time.strftime('%Y.%m.%d', time.localtime(time.time()))
msg = CreateMessage(topic)
SetMessageBody(msg, body)
SetMessageKeys(msg, key)
result = SendMessageSync(producer, msg)
DestroyMessage(msg)
print 'msg id = ' + result.GetMsgId()
def send_messages_with_key_and_map_sync(count):
print 'sending message with keys...' + key + ' and properties id, name'
for a in range(count):
body = 'hi rmq, now is ' + \
time.strftime('%Y.%m.%d', time.localtime(time.time()))
msg = CreateMessage(topic)
SetMessageBody(msg, body)
SetMessageKeys(msg, key)
SetMessageProperty(msg, 'name', 'test')
SetMessageProperty(msg, 'id', str(time.time()))
result = SendMessageSync(producer, msg)
DestroyMessage(msg)
print 'msg id = ' + result.GetMsgId()
def send_messages_with_key_and_tag_sync(count):
key = 'rmq-key'
print 'sending message with keys and tag...' + key + ', ' + tag
for a in range(count):
body = 'hi rmq, now is ' + \
time.strftime('%Y.%m.%d', time.localtime(time.time()))
msg = CreateMessage(topic)
SetMessageBody(msg, body)
SetMessageKeys(msg, key)
SetMessageTags(msg, tag)
result = SendMessageSync(producer, msg)
DestroyMessage(msg)
print 'msg id = ' + result.GetMsgId()
def send_messages_with_key_and_tag_and_map_sync(count):
key = 'rmq-key'
print 'sending message with keys and tag...' + \
key + ', ' + tag + ' and properties id, name'
for a in range(count):
body = 'hi rmq, now is ' + \
time.strftime('%Y.%m.%d', time.localtime(time.time()))
msg = CreateMessage(topic)
SetMessageBody(msg, body)
SetMessageKeys(msg, key)
SetMessageProperty(msg, 'name', 'test')
SetMessageProperty(msg, 'id', str(time.time()))
SetMessageTags(msg, tag)
result = SendMessageSync(producer, msg)
DestroyMessage(msg)
print 'msg id = ' + result.GetMsgId()
def send_messages_oneway(count):
for a in range(count):
print 'start sending...'
body = 'hi rmq, this is oneway message. now is ' + \
time.strftime('%Y.%m.%d', time.localtime(time.time()))
msg = CreateMessage(topic)
SetMessageBody(msg, body)
SetMessageKeys(msg, key)
SetMessageProperty(msg, 'name', 'test')
SetMessageProperty(msg, 'id', str(time.time()))
SendMessageOneway(producer, msg)
DestroyMessage(msg)
print 'send oneway is over'
def send_delay_messages(producer, topic, count):
key = 'rmq-key'
print 'start sending message'
tag = 'test'
for n in range(count):
body = 'hi rmq, now is' + str(time.time())
msg = CreateMessage(topic)
SetMessageBody(msg, body)
SetMessageKeys(msg, key)
SetMessageProperty(msg, 'name', 'hello world')
SetMessageProperty(msg, 'id', str(time.time()))
SetMessageTags(msg, tag)
# messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
SetDelayTimeLevel(msg, 5)
print str(msg)
result = SendMessageSync(producer, msg)
DestroyMessage(msg)
print 'msg id =' + result.GetMsgId()
def send_message_orderly(count):
key = 'rmq-key'
print 'start sending order-ly message'
tag = 'test'
for n in range(count):
body = 'hi rmq orderly-message, now is' + str(n)
msg = CreateMessage(topic_orderly)
SetMessageBody(msg, body)
SetMessageKeys(msg, key)
SetMessageTags(msg, tag)
result = SendMessageOrderly(producer, msg, 1, None, calc_which_queue_to_send)
DestroyMessage(msg)
print 'msg id =' + result.GetMsgId()
def send_message_orderly_with_shardingkey(count):
key = 'rmq-key'
print 'start sending sharding key order-ly message'
tag = 'test'
for n in range(count):
body = 'hi rmq sharding orderly-message, now is' + str(n)
msg = CreateMessage(topic_orderly)
SetMessageBody(msg, body)
SetMessageKeys(msg, key)
SetMessageTags(msg, tag)
result = SendMessageOrderlyByShardingKey(producer, msg, 'orderId')
DestroyMessage(msg)
print 'msg id =' + result.GetMsgId()
def calc_which_queue_to_send(size, msg, arg): ## it is index start with 0....
return 0
def send_message_async(count):
key = 'rmq-key'
print 'start sending message'
tag = 'test'
for n in range(count):
body = 'hi rmq message, now is' + str(n)
msg = CreateMessage(topic)
SetMessageBody(msg, body)
SetMessageKeys(msg, key)
SetMessageTags(msg, tag)
SendMessageAsync(producer, msg, send_message_async_success, send_message_async_fail)
DestroyMessage(msg)
print 'send async message done'
time.sleep(10000)
def send_message_async_success(result, msg):
print 'send success'
print 'msg id =' + result.GetMsgId()
def send_message_async_fail(msg, exception):
print 'send message failed'
print 'error msg: ' + exception.GetMsg()
def send_transaction_message(count):
key = 'rmq-key'
print 'start send transaction message'
tag = 'test'
for n in range(count):
body = 'hi rmq message, now is' + str(n)
msg = CreateMessage(topic)
SetMessageBody(msg, body)
SetMessageKeys(msg, key)
SetMessageTags(msg, tag)
SendMessageInTransaction(producer, msg, transaction_local_execute, None)
print 'send transaction message done'
time.sleep(10000)
def transaction_local_execute(msg, args):
print 'begin execute local transaction'
return TransactionStatus.E_UNKNOWN_TRANSACTION
if __name__ == '__main__':
send_transaction_message(10)