Merge pull request #4 from apache/master
merge from apache master
diff --git a/.gitignore b/.gitignore
index 990936c..b232131 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,2 +1,8 @@
.idea/
-cmake-build-debug/
+cmake-build-*/
+
+*.pyc
+*.so
+
+bin/
+
diff --git a/CMakeLists.txt b/CMakeLists.txt
index bc9813a..7df3c84 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -31,6 +31,13 @@
cmake_policy(SET CMP0005 NEW)
endif ()
+if (POLICY CMP0048)
+ cmake_policy(SET CMP0048 NEW)
+endif ()
+
+if (POLICY CMP0064)
+ cmake_policy(SET CMP0064 NEW)
+endif ()
# First, declare project (important for prerequisite checks).
project(rocketmq-client-python)
@@ -56,13 +63,14 @@
endif ()
set(CXX_FLAGS
- #-std=c++0x
+ -std=c++11
-g
-Wall
-Wno-deprecated
-fPIC
-fno-strict-aliasing
-Wno-unused-result
+ -Wno-unused-local-typedef
# -finline-limit=1000
# -Wextra
# -pedantic
@@ -151,7 +159,7 @@
if (WIN32)
find_package(Boost REQUIRED COMPONENTS python)
elseif (APPLE)
- find_package(Boost REQUIRED COMPONENTS python27)
+ find_package(Boost REQUIRED COMPONENTS python)
else ()
find_package(Boost REQUIRED COMPONENTS python)
endif (WIN32)
@@ -196,4 +204,14 @@
set(TIME_WITH_SYS_TIME 1)
set(HAVE_SOCKLEN_T 1)
+option(TEST "Build test cases" OFF)
+
+if (TEST)
+ enable_testing()
+ option(gtest_build_tests OFF)
+ add_subdirectory(third_party/googletest/googletest)
+ include_directories(SYSTEM ${gtest_SOURCE_DIR}/include ${gtest_SOURCE_DIR})
+ add_subdirectory(unitests)
+endif ()
+
add_subdirectory(project)
diff --git a/NOTICE b/NOTICE
new file mode 100644
index 0000000..703c28b
--- /dev/null
+++ b/NOTICE
@@ -0,0 +1,5 @@
+Apache RocketMQ
+Copyright 2016-2018 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
\ No newline at end of file
diff --git a/README.md b/README.md
index bd9728d..ca1409e 100644
--- a/README.md
+++ b/README.md
@@ -3,14 +3,8 @@
* RocketMQ Python client is developed on top of [rocketmq-client-cpp](https://github.com/apache/rocketmq-client-cpp), which has been proven robust and widely adopted within Alibaba Group by many business units for more than three years.
----------
-## Features
-At present, this SDK supports
-* sending message in synchronous mode
-* consuming message using push model
-
-----------
## Quick Start
-* Step-by-step instruction are provided in [RocketMQ Client Python Introduction](https://github.com/apache/rocketmq-client-python/blob/master/doc/Introduction.md)
+* Step-by-step instruction are provided in [RocketMQ Client Python Introduction](https://github.com/apache/rocketmq-client-python/blob/master/doc/Introduction.md).
* Consult [RocketMQ Quick Start](https://rocketmq.apache.org/docs/quick-start/) to setup rocketmq broker and nameserver.
----------
diff --git a/doc/Introduction.md b/doc/Introduction.md
index 980269b..cebe492 100644
--- a/doc/Introduction.md
+++ b/doc/Introduction.md
@@ -1,15 +1,15 @@
----------
## RocketMQ Client Python
-### 1. Python Version
-* Python 2.7.x
+### 1. Python Runtime Version
+* python 2.7.x
-### 2. Dependency
+### 2. Dependency of Python Client
-* [librocketmq](https://github.com/apache/rocketmq-client-cpp)
+* CPP Core: [librocketmq](https://github.com/apache/rocketmq-client-cpp)
* python-devel 2.7.x
-* boost-python 1.53.0+
+* boost-python 1.58.0
### 3. Build and Install
#### Linux Platform
@@ -22,34 +22,37 @@
* Install dependency:
1. python-devel
- ```
- sudo yum install python-devel
- ```
- 2. boost-python
- ```
- sudo yum install boost-python
- sudo yum install boost-python-devel
- ```
- 3. [librocketmq](https://github.com/apache/rocketmq-client-cpp),choose one method below:
+ ```
+ sudo yum install python-devel
+ ```
+
+ 2. zlib-devel
+ ```
+ sudo yum install zlib-devel
+ ```
+ 3. boost-python
+ ```
+ sudo sh install_boostpython.sh
+ ```
+ 4. [librocketmq](https://github.com/apache/rocketmq-client-cpp), choose one method below:
- - make and install the rocketmq library manually,[Here](https://github.com/apache/rocketmq-client-cpp)
+ - make and install the RocketMQ library manually from [rocketmq-client-cpp](https://github.com/apache/rocketmq-client-cpp)
- - quick install
- ```
- mkdir rocketmqlib
- cd rocketmqlib
- wget https://opensource-rocketmq-client.oss-cn-hangzhou.aliyuncs.com/cpp-client/linux/1.0.2/RHEL7.x/librocketmq.tar.gz
- tar -xzf librocketmq.tar.gz
- sudo cp librocketmq.so librocketmq.a /usr/local/lib/
- sudo cp -r rocketmq /usr/local/include/
- ```
+ - quick install, please choose the suitable dynamic library version for your system.
+ ```
+ mkdir rocketmqlib
+ cd rocketmqlib
+ wget https://opensource-rocketmq-client.oss-cn-hangzhou.aliyuncs.com/cpp-client/linux/1.2.0/RHEL7.x/librocketmq.tar.gz
+ tar -xzf librocketmq.tar.gz
+ sudo cp librocketmq.so librocketmq.a /usr/local/lib/
+ sudo cp -r rocketmq /usr/local/include/
+ ```
-
+
* Make and install module manually
-
- 1. Using Dynamic Rocketmq and boost python libraries are recommended.
+ 1. Using Dynamic RocketMQ and boost python libraries are recommended.
```
- - mkdir build
+ - mkdir build && cd build
- cmake ../ -DBoost_USE_STATIC_LIBS=OFF -DROCKETMQ_USE_STATIC_LIBS=OFF
- make
- make install
@@ -57,7 +60,7 @@
2. Also you can using static libraries.
```
- - mkdir build
+ - mkdir build & cd build
- cmake ../ -DBoost_USE_STATIC_LIBS=ON -DROCKETMQ_USE_STATIC_LIBS=ON
- make
- make install
@@ -66,12 +69,71 @@
```
strings librocketmqclientpython.so |grep PYTHON_CLIENT_VERSION
```
+#### macOS Mojave 10.14.2
+* Install compile tools:
+ ```
+ - brew install make
+ - brew install cmake
+ - brew install gcc-c++
+ ```
+* Install dependency:
+
+ 1. python-devel
+ ```
+ brew install python-devel
+ ```
+
+ 2. zlib-devel
+ ```
+ brew install zlib-devel
+ ```
+ 3. boost-python
+ ```
+ sh install_boostpython.sh
+ ```
+ 4. [librocketmq](https://github.com/apache/rocketmq-client-cpp), choose one method below:
+
+ - make and install the RocketMQ library manually from [rocketmq-client-cpp](https://github.com/apache/rocketmq-client-cpp)
+
+ - quick install
+ ```
+ mkdir rocketmqlib
+ cd rocketmqlib
+ wget https://opensource-rocketmq-client.oss-cn-hangzhou.aliyuncs.com/cpp-client/mac/1.2.0/librocketmq.tar.gz
+ tar -xzf librocketmq.tar.gz
+ cp librocketmq.dylib librocketmq.a /usr/local/lib/
+ cp -r rocketmq /usr/local/include/
+ ```
+
+* Make and install module manually
+ 1. Using Dynamic RocketMQ and boost python libraries are recommended.
+ ```
+ - mkdir build && cd build
+ - cmake ../ -DBoost_USE_STATIC_LIBS=OFF -DROCKETMQ_USE_STATIC_LIBS=OFF
+ - make
+ - make install
+ ```
+
+ 2. Also you can using static libraries.
+ ```
+ - mkdir build & cd build
+ - cmake ../ -DBoost_USE_STATIC_LIBS=ON -DROCKETMQ_USE_STATIC_LIBS=ON
+ - make
+ - make install
+ ```
+* Check verion
+ ```
+ strings librocketmqclientpython.so |grep PYTHON_CLIENT_VERSION
+ ```
+
----------
## How to use
- set LD_LIBRARY_PATH
- ``````
+ ```
export LD_LIBRARY_PATH=/usr/local/lib
+ ```
+
- import module
```
from librocketmqclientpython import *
@@ -86,29 +148,29 @@
```
- producer must invoke following interface:
```
- - producer =CreateProducer("please_rename_unique_group_name");
- - SetProducerNameServerAddress(producer,"please_rename_unique_name_server")
+ - producer = CreateProducer("please_rename_unique_group_name");
+ - SetProducerNameServerAddress(producer, "please_rename_unique_name_server")
- StartProducer(producer)
- - SendMessageSync(producer,msg)
+ - SendMessageSync(producer, msg)
- ShutdownProducer(producer)
- DestroyProducer(producer)
```
- how to consumer messages
```
- - def consumerMessage(msg):
- - topic = GetMessageTopic(msg)
- - body = GetMessageBody(msg)
- - tags = GetMessageTags(msg)
- - msgid = GetMessageId(msg)
- - handle message
- - return 0
+ - def consumerMessage(msg, args):
+ - topic = GetMessageTopic(msg)
+ - body = GetMessageBody(msg)
+ - tags = GetMessageTags(msg)
+ - msgid = GetMessageId(msg)
+ - # handle message...
+ - return 0
```
- pushconsumer must invoke following interface:
```
- - consumer =CreatePushConsumer("please_rename_unique_group_name_1");
- - SetPushConsumerNameServerAddress(consumer,"please_rename_unique_name_server")
+ - consumer = CreatePushConsumer("please_rename_unique_group_name_1");
+ - SetPushConsumerNameServerAddress(consumer, "please_rename_unique_name_server")
- Subscribe(consumer, "your_topic", "*")
- - RegisterMessageCallback(consumer,consumerMessage)
+ - RegisterMessageCallback(consumer, consumerMessage, args)
- StartPushConsumer(consumer)
- ShutdownPushConsumer(consumer)
- DestroyPushConsumer(consumer)
@@ -118,4 +180,5 @@
- sync producer
- python testProducer.py
- push consumer
- - python testConsumer.py
\ No newline at end of file
+ - python testConsumer.py
+
diff --git a/doc/api-doc/consumer-push.md b/doc/api-doc/consumer-push.md
new file mode 100644
index 0000000..6ee498e
--- /dev/null
+++ b/doc/api-doc/consumer-push.md
@@ -0,0 +1,81 @@
+----------
+## Api docs
+
+### 1. Push Consumer
+* consumer = CreatePushConsumer(consumerGroup) <br />
+ - function description<br />
+ create a push consumer instance, by setting consumer group<br />
+
+ - input <br />
+ consumerGroup: consumer group<br />
+
+ - return<br />
+ consumer: consumer instance
+
+* SetPushConsumerNameServerAddress(consumer, namesrv) <br />
+ - function description<br />
+ set name srv address for the consumer instance<br />
+
+ - input <br />
+ consumer: consumer intance<br />
+ namesrv: name srv address. like : 127.0.0.1:9876
+
+ - return : no<br />
+
+* Subscribe(consumer, topic, tag) <br />
+ - function description<br />
+ make consumer subscribe the topic and tag <br />
+
+ - input <br />
+ consumer: consumer intance<br />
+ topic: topic name
+ tag: topic tag
+
+* RegisterMessageCallback(consumer, pyCallBack, pyArgs) <br />
+ - function description<br />
+ set callback for push consumer instance <br />
+
+ - input <br />
+ consumer: consumer intance<br />
+ pyCallBack: py callback method. when message pulled, they would be send to a pyCallback method<br />
+ pyArgs: the arguments will be passed to pyCallBack
+
+* SetPushConsumerThreadCount(consumer, threadCount)
+ - function description<br />
+ set push consumer thread count<br />
+
+ - input <br />
+ consumer: consumer intance<br />
+ threadCount: thread count
+
+* SetPushConsumerMessageBatchMaxSize(consumer, batchSize)
+ - function description<br />
+ set message count for one push<br />
+
+ - input <br />
+ consumer: consumer intance<br />
+ batchSize: message count
+
+* SetPushConsumerInstanceName(consumer, instanceName)
+ - function description<br />
+ set consumer instance name<br />
+
+ - input <br />
+ consumer: consumer intance<br />
+ instanceName: consumer instance name
+
+* SetPushConsumerSessionCredentials(consumer, accessKey, secretKey,channel)
+ - function description<br />
+ set consumer access keys<br />
+
+ - input <br />
+ consumer: consumer intance<br />
+ accessKey: accessKey<br />
+ secretKey: secretKey<br />
+ channel: channel<br />
+
+
+
+
+
+
diff --git a/doc/api-doc/message.md b/doc/api-doc/message.md
new file mode 100644
index 0000000..b727a6d
--- /dev/null
+++ b/doc/api-doc/message.md
@@ -0,0 +1,135 @@
+----------
+## Api docs
+
+### 1. Message
+* message = CreateMessage("topicName") <br />
+ - function description<br />
+ create a message instance, by setting topic field<br />
+
+ - input <br />
+ topicName: a topic name<br />
+
+ - return<br />
+ a new message instance, after used it, you need call DestroyMessage(message)<br />
+
+* DestroyMessage(message) <br />
+ - function description<br />
+ destroy a message instance, delete memmory<br />
+
+ - input <br />
+ message: message instance<br />
+
+* SetMessageTopic(message, topic) <br />
+ - function description<br />
+ set topic field value for the message<br />
+
+ - input <br />
+ message: message instance<br />
+ topic: a topic name
+
+* SetMessageTags(message, tags) <br />
+ - function description<br />
+ set tag field value for the message<br />
+
+ - input <br />
+ message: message instance<br />
+ tags: tag for the topic
+
+* SetMessageKeys(message, keys) <br />
+ - function description<br />
+ set key field value for the message<br />
+
+ - input <br />
+ message: message instance<br />
+ keys: key for the topic
+
+* SetMessageBody(message, stringBody) <br />
+ - function description<br />
+ set body for the message<br />
+
+ - input <br />
+ message: message instance<br />
+ body: message body as string
+
+* SetByteMessageBody(message, byteBody, byteLength) <br />
+ - function description<br />
+ set body for the message<br />
+
+ - input <br />
+ message: message instance<br />
+ byteBody: message body as byte[]
+ byteLength: byteBody's length
+
+* SetMessageProperty(message, key, value) <br />
+ - function description<br />
+ set extend k-v for message<br />
+
+ - input <br />
+ message: message instance<br />
+ key: string key
+ value: string value
+
+* SetMessageDelayTimeLevel(message, level) <br />
+ - function description<br />
+ set delay level<br />
+
+ - input <br />
+ message: message instance<br />
+ level: delay level as int
+
+
+### 2. MessageExt
+* topic = GetMessageTopic(msgExt) <br />
+ - function description<br />
+ get topic name from a message instance<br />
+
+ - input <br />
+ msgExt: message instance<br />
+ - return<br />
+ topic: topic name
+
+* tag = GetMessageTags(msgExt) <br />
+ - function description<br />
+ get tag from a message instance<br />
+
+ - input <br />
+ msgExt: message instance<br />
+ - return<br />
+ tag: tag
+
+* key = GetMessageKeys(msgExt) <br />
+ - function description<br />
+ get message key from a message instance<br />
+
+ - input <br />
+ msgExt: message instance<br />
+ - return<br />
+ key: message key
+
+* body = GetMessageBody(msgExt) <br />
+ - function description<br />
+ get message body from a message instance<br />
+
+ - input <br />
+ msgExt: message instance<br />
+ - return<br />
+ body: message body as string
+
+* value = GetMessageProperty(msgExt, key) <br />
+ - function description<br />
+ get a message proprty value from a message instance<br />
+
+ - input <br />
+ msgExt: message instance<br />
+ key: property key
+ - return<br />
+ value: property value as string
+
+* messageId = GetMessageId(msgExt) <br />
+ - function description<br />
+ get a message id from a message instance<br />
+
+ - input <br />
+ msgExt: message instance<br />
+ - return<br />
+ messageId: message id as string
\ No newline at end of file
diff --git a/doc/api-doc/producer.md b/doc/api-doc/producer.md
new file mode 100644
index 0000000..6af4c99
--- /dev/null
+++ b/doc/api-doc/producer.md
@@ -0,0 +1,94 @@
+----------
+## Api docs
+
+### Producer
+* producer = CreateProducer("producerName") <br />
+ - function description<br />
+ create a producer instance<br />
+
+ - input <br />
+ producerName: producer group name<br />
+
+ - return<br />
+ a new producer instance, can send messages<br />
+
+* SetProducerNameServerAddress(producer, "namesrv address")
+ - function description<br />
+ set namesrv address for the producer instance<br />
+
+ - input<br />
+ producer : a producer instance <br />
+
+ namesrv address : like 127.0.0.1:9876<br />
+ - return : no <br />
+* SetProducerInstanceName(producer, "instance name")
+ - function description<br />
+ set instance name for the producer
+
+ - input<br />
+ producer : a producer instance <br />
+ intance name : a producer instance name<br />
+ - return : no <br />
+
+* SetProducerSessionCredentials(producer, accessKey, secretKey, channel)
+ - function description<br />
+ set access keys for accessing broker in the session
+
+ - input<br />
+ producer : a producer instance <br />
+ accessKey : accessKey<br />
+ secretKey : secretKey<br />
+ channel : channel<br />
+ - return : no <br />
+
+* StartProducer(producer)
+ - function description<br />
+ start the producer instance
+
+ - input<br />
+ producer : a producer instance <br />
+
+ - return : no <br />
+
+* ShutdownProducer(producer)
+ - function description<br />
+ shutdown the producer instance
+
+ - input<br />
+ producer : a producer instance <br />
+
+ - return : no <br />
+
+* DestroyProducer(producer)
+ - function description<br />
+ destroy the producer instance
+
+ - input<br />
+ producer : a producer instance <br />
+
+ - return : no <br />
+
+* PySendResult result = SendMessageSync(producer, msg)
+ - function description<br />
+ send a message sync
+
+ - input<br />
+ producer : a producer instance <br />
+ msg : a message instance <br />
+
+ - return<br />
+ result.GetMsgId(): if send successfuly, it is the message id<br />
+ result.offset : message offset in broker<br />
+ result.sendStatus<br />
+ SEND_OK: <br />
+ SEND_FLUSH_DISK_TIMEOUT,<br />
+ SEND_FLUSH_SLAVE_TIMEOUT,<br />
+ SEND_SLAVE_NOT_AVAILABLE<br />
+
+* SendMessageOneway(producer, msg)
+ - function description<br />
+ send a message one way, no matter about the result
+
+ - input<br />
+ producer : a producer instance <br />
+ msg : a message instance <br />
\ No newline at end of file
diff --git a/install_boostpython.sh b/install_boostpython.sh
new file mode 100755
index 0000000..4ff9792
--- /dev/null
+++ b/install_boostpython.sh
@@ -0,0 +1,57 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+VERSION=1.58.0
+BOOST=boost_1_58_0
+
+if [ ! -d ${HOME}/${BOOST} ]; then
+ if [ -e ${HOME}/${BOOST}.tar.gz ]; then
+ echo "Find Packge ${HOME}/${BOOST}.tar.gz......."
+ else
+ wget -O ${HOME}/${BOOST}.tar.gz http://sourceforge.net/projects/boost/files/boost/${VERSION}/${BOOST}.tar.gz
+ fi
+ if [ $? -ne 0 ];then
+ exit 1
+ fi
+ tar -xzf ${HOME}/${BOOST}.tar.gz -C ${HOME}
+ if [ $? -ne 0 ];then
+ exit 1
+ fi
+else
+ echo "Find Boost Source:${HOME}/${BOOST}, Build and install....."
+fi
+
+cd ${HOME}/${BOOST}
+
+./bootstrap.sh --prefix=/usr/local --with-libraries=python
+if [ $? -ne 0 ];then
+ exit 1
+fi
+echo "Install boost static library...."
+sudo ./b2 cflags="-fPIC" cxxflags="-fPIC -Wno-unused-local-typedefs -Wno-strict-aliasing" link=static \
+ runtime-link=static --with-python \
+ -a install
+if [ $? -ne 0 ];then
+ exit 1
+fi
+echo "Install boost dynamic library....."
+sudo ./b2 cflags="-fPIC" cxxflags="-fPIC -Wno-unused-local-typedefs -Wno-strict-aliasing" link=shared \
+ runtime-link=shared --with-python \
+ -a install
+if [ $? -ne 0 ];then
+ exit 1
+fi
+echo "Finish build boost library."
diff --git a/package.sh b/package.sh
index f22d293..404a4f4 100755
--- a/package.sh
+++ b/package.sh
@@ -44,7 +44,7 @@
rm -rf ${CWD_DIR}/tmpbuild
mkdir -p ${CWD_DIR}/tmpbuild
cd ${CWD_DIR}/tmpbuild
-if [ $1 = "shared" ]; then
+if [ "$1" = "shared" ]; then
echo "------------Build Client using dynamic library------------"
cmake ${CWD_DIR} -DBoost_USE_STATIC_LIBS=OFF -DROCKETMQ_USE_STATIC_LIBS=OFF
RMQ=$(cat CMakeCache.txt | grep ROCKETMQ_LIBRARIES:FILEPATH= | cut -f2 -d "=")
@@ -66,7 +66,7 @@
echo "Package Library...."
rm -rf ${DEPLOY_BUILD_HOME}
mkdir -p ${DEPLOY_BUILD_HOME}/lib
-if [ $1 = "shared" ];then
+if [ "$1" = "shared" ];then
echo "Copy librocketmq to package...."
cp -rf ${RMQ} ${DEPLOY_BUILD_HOME}/lib/
#cp -rf /usr/local/lib/libboost_python.*.so.* ${DEPLOY_BUILD_HOME}/lib/
@@ -77,7 +77,7 @@
cp -rf ${CWD_DIR}/changelog ${DEPLOY_BUILD_HOME}/
-cd ${CWD_DIR} && tar -cvzf ./${PACKAGE}-${VERSION}.tar.gz ./${VERSION} >/dev/null 2>&1
+cd ${CWD_DIR} && tar -cvzf ./${PACKAGE}-${VERSION}.tar.gz ./${PACKAGE} >/dev/null 2>&1
rm -rf ${DEPLOY_BUILD_HOME}
# # ##====================================================================
cd ${CWD_DIR}/tmpbuild
diff --git a/project/CMakeLists.txt b/project/CMakeLists.txt
index 6f42c44..152c5e3 100644
--- a/project/CMakeLists.txt
+++ b/project/CMakeLists.txt
@@ -14,19 +14,19 @@
#* See the License for the specific language governing permissions and
#* limitations under the License.
#*/
-# source files
+
project(rocketmqclientpython)
-file(GLOB_RECURSE SRC_FILES ${CMAKE_SOURCE_DIR}/src/*)
+file(GLOB_RECURSE SRC_FILES ${CMAKE_SOURCE_DIR}/src/*)
# subdirs
SET(SUB_DIRS)
file(GLOB children ${CMAKE_SOURCE_DIR}/src/*)
-FOREACH(child ${children})
- IF(IS_DIRECTORY ${child})
- LIST(APPEND SUB_DIRS ${child})
- ENDIF()
-ENDFOREACH()
+FOREACH (child ${children})
+ IF (IS_DIRECTORY ${child})
+ LIST(APPEND SUB_DIRS ${child})
+ ENDIF ()
+ENDFOREACH ()
LIST(APPEND SUB_DIRS ${CMAKE_SOURCE_DIR}/src)
# include_directories
@@ -43,15 +43,19 @@
target_link_libraries(rocketmqclientpython_static ${deplibs})
target_link_libraries(rocketmqclientpython_static ${ROCKETMQ_LIBRARIES})
target_link_libraries(rocketmqclientpython_static ${Boost_LIBRARIES})
-
+target_link_libraries(rocketmqclientpython_static ${PYTHON_LIBRARIES})
# shared
set(CMAKE_SHARED_LINKER_FLAGS "-fPIC -shared")
add_library(rocketmqclientpython_shared SHARED ${SRC_FILES})
set_target_properties(rocketmqclientpython_shared PROPERTIES OUTPUT_NAME "rocketmqclientpython")
+if (APPLE)
+ set_target_properties(rocketmqclientpython_shared PROPERTIES SUFFIX .so)
+endif (APPLE)
target_link_libraries(rocketmqclientpython_shared ${deplibs})
target_link_libraries(rocketmqclientpython_shared ${ROCKETMQ_LIBRARIES})
target_link_libraries(rocketmqclientpython_shared ${Boost_LIBRARIES})
+target_link_libraries(rocketmqclientpython_shared ${PYTHON_LIBRARIES})
# install
-install (TARGETS rocketmqclientpython_shared DESTINATION lib)
+install(TARGETS rocketmqclientpython_shared DESTINATION lib)
diff --git a/sample/testConsumer.py b/sample/testConsumer.py
index 93665ed..03ca587 100644
--- a/sample/testConsumer.py
+++ b/sample/testConsumer.py
@@ -18,8 +18,10 @@
import base
import time
from librocketmqclientpython import *
+
totalMsg = 0
-def consumerMessage(msg):
+
+def consumerMessage(msg, args):
global totalMsg
totalMsg += 1
print(">>ConsumerMessage Called:",totalMsg)
@@ -33,11 +35,12 @@
consumer = CreatePushConsumer("awtTest_Producer_Python_Test")
print(consumer)
-SetPushConsumerNameServerAddress(consumer,"172.17.0.2:9876")
-SetPushConsumerThreadCount(consumer,1)
+SetPushConsumerNameServerAddress(consumer, "172.17.0.2:9876")
+SetPushConsumerThreadCount(consumer, 1)
Subscribe(consumer, "T_TestTopic", "*")
-RegisterMessageCallback(consumer,consumerMessage)
+RegisterMessageCallback(consumer, consumerMessage, None)
StartPushConsumer(consumer)
+
i = 1
while i <= 60:
print(i)
diff --git a/sample/testProducer.py b/sample/testProducer.py
index 558eb6b..6938fd2 100644
--- a/sample/testProducer.py
+++ b/sample/testProducer.py
@@ -42,6 +42,16 @@
DestroyMessage(msg)
print("Done...............")
+def testSendMessageOneway(producer, topic, key, body):
+ print("Starting Sending(Oneway).....")
+ msg = CreateMessage(topic)
+ SetMessageBody(msg, body)
+ SetMessageKeys(msg, key)
+ SetMessageTags(msg, "Send Message Oneway Test.")
+ SendMessageOneway(producer,msg)
+ DestroyMessage(msg)
+ print("Done...............")
+
def releaseProducer(producer):
ShutdownProducer(producer)
DestroyProducer(producer)
@@ -59,4 +69,9 @@
print("Now Send Message:",i)
+while i < 10:
+ i += 1
+ testSendMessageOneway(producer, topic, key, body)
+ print("Now Send Message One way:",i)
+
releaseProducer(producer)
diff --git a/setup.py b/setup.py
new file mode 100644
index 0000000..a7d262d
--- /dev/null
+++ b/setup.py
@@ -0,0 +1,46 @@
+# /*
+# * Licensed to the Apache Software Foundation (ASF) under one or more
+# * contributor license agreements. See the NOTICE file distributed with
+# * this work for additional information regarding copyright ownership.
+# * The ASF licenses this file to You under the Apache License, Version 2.0
+# * (the "License"); you may not use this file except in compliance with
+# * the License. You may obtain a copy of the License at
+# *
+# * http://www.apache.org/licenses/LICENSE-2.0
+# *
+# * Unless required by applicable law or agreed to in writing, software
+# * distributed under the License is distributed on an "AS IS" BASIS,
+# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# * See the License for the specific language governing permissions and
+# * limitations under the License.
+# */
+
+from distutils.core import Extension
+
+from setuptools import setup
+
+BOOST_INCLUDE_PATH = "/usr/local/include/boost"
+PYTHON_INCLUDE_PATH = "/usr/include/python2.7"
+ROCKETMQ_INCLUDE_PATH = "/usr/local/include/rocketmq"
+PYTHON_LIB_DIR = "/usr/lib64"
+BOOST_LIB_DIR = "/usr/local/lib"
+ROCKETMQ_LIB_DIR = "/usr/local/lib"
+NAME = 'librocketmqclientpython'
+setup(name=NAME,
+ version='1.2.0',
+ url="https://github.com/apache/rocketmq-client-python",
+ description="RocketMQ Python client",
+ long_description="RocketMQ Python client is developed on top of rocketmq-client-cpp, which has been proven "
+ "robust and widely adopted within Alibaba Group by many business units for more than three "
+ "years.",
+ license="Apache License, Version 2.0",
+ platforms=["linux"],
+ packages=["src"],
+ ext_modules=[Extension(name=NAME
+ , sources=['src/PythonWrapper.cpp']
+ , extra_compile_args=[]
+ , extra_link_args=["-lboost_python", "-lrocketmq"]
+ , include_dirs=[BOOST_INCLUDE_PATH, ROCKETMQ_INCLUDE_PATH]
+ , library_dirs=[PYTHON_LIB_DIR, ROCKETMQ_LIB_DIR, BOOST_LIB_DIR]
+ ), ],
+ )
diff --git a/src/PythonWrapper.cpp b/src/PythonWrapper.cpp
index b6c04b6..919ba5b 100644
--- a/src/PythonWrapper.cpp
+++ b/src/PythonWrapper.cpp
@@ -30,24 +30,37 @@
const char *VERSION =
"PYTHON_CLIENT_VERSION: " PYTHON_CLIENT_VERSION ", BUILD DATE: " PYCLI_BUILD_DATE " ";
-map<CPushConsumer *, PyObject *> g_CallBackMap;
+map<CPushConsumer *, pair<PyObject *, object>> g_CallBackMap;
class PyThreadStateLock {
public:
- PyThreadStateLock(void) {
+ PyThreadStateLock() {
state = PyGILState_Ensure();
}
- ~PyThreadStateLock(void) {
- if (state == PyGILState_LOCKED) {
- PyGILState_Release(state);
- }
+ ~PyThreadStateLock() {
+ // NOTE: must paired with PyGILState_Ensure, otherwise it will cause deadlock!!!
+ PyGILState_Release(state);
}
private:
PyGILState_STATE state;
};
+class PyThreadStateUnlock {
+public:
+ PyThreadStateUnlock() : _save(NULL) {
+ Py_UNBLOCK_THREADS
+ }
+
+ ~PyThreadStateUnlock() {
+ Py_BLOCK_THREADS
+ }
+
+private:
+ PyThreadState *_save;
+};
+
#ifdef __cplusplus
extern "C" {
#endif
@@ -116,6 +129,9 @@
int PySetProducerNameServerAddress(void *producer, const char *namesrv) {
return SetProducerNameServerAddress((CProducer *) producer, namesrv);
}
+int PySetProducerNameServerDomain(void *producer, const char *domain) {
+ return SetProducerNameServerDomain((CProducer *) producer, domain);
+}
int PySetProducerInstanceName(void *producer, const char *instanceName) {
return SetProducerInstanceName((CProducer *)producer, instanceName);
}
@@ -133,58 +149,66 @@
return ret;
}
+int PySendMessageOneway(void *producer, void *msg) {
+ return SendMessageOneway((CProducer *) producer, (CMessage *) msg);
+}
+
//SendResult
const char *PyGetSendResultMsgID(CSendResult &sendResult) {
return (const char *) (sendResult.msgId);
}
//consumer
void *PyCreatePushConsumer(const char *groupId) {
- //Py_Initialize();
- PyEval_InitThreads();
-// PyEval_ReleaseThread(PyThreadState_Get());
+ PyEval_InitThreads(); // ensure create GIL, for call Python callback from C.
return (void *) CreatePushConsumer(groupId);
}
int PyDestroyPushConsumer(void *consumer) {
- return DestroyPushConsumer((CPushConsumer *) consumer);
+ CPushConsumer *consumerInner = (CPushConsumer *) consumer;
+ map<CPushConsumer *, pair<PyObject *, object>>::iterator iter;
+ iter = g_CallBackMap.find(consumerInner);
+ if (iter != g_CallBackMap.end()) {
+ UnregisterMessageCallback(consumerInner);
+ g_CallBackMap.erase(iter);
+ }
+ return DestroyPushConsumer(consumerInner);
}
int PyStartPushConsumer(void *consumer) {
return StartPushConsumer((CPushConsumer *) consumer);
}
int PyShutdownPushConsumer(void *consumer) {
- int ret = ShutdownPushConsumer((CPushConsumer *) consumer);
- //PyGILState_Ensure();
- //Py_Finalize();
- return ret;
+ PyThreadStateUnlock PyThreadUnlock; // ShutdownPushConsumer is a block call, ensure thread don't hold GIL.
+ return ShutdownPushConsumer((CPushConsumer *) consumer);
}
int PySetPushConsumerNameServerAddress(void *consumer, const char *namesrv) {
return SetPushConsumerNameServerAddress((CPushConsumer *) consumer, namesrv);
}
+int PySetPushConsumerNameServerDomain(void *consumer, const char *domain){
+ return SetPushConsumerNameServerDomain((CPushConsumer *) consumer, domain);
+}
int PySubscribe(void *consumer, const char *topic, const char *expression) {
return Subscribe((CPushConsumer *) consumer, topic, expression);
}
-int PyRegisterMessageCallback(void *consumer, PyObject *pCallback) {
+int PyRegisterMessageCallback(void *consumer, PyObject *pCallback, object args) {
CPushConsumer *consumerInner = (CPushConsumer *) consumer;
- g_CallBackMap[consumerInner] = pCallback;
+ g_CallBackMap[consumerInner] = make_pair(pCallback, std::move(args));
return RegisterMessageCallback(consumerInner, &PythonMessageCallBackInner);
}
int PythonMessageCallBackInner(CPushConsumer *consumer, CMessageExt *msg) {
-
- class PyThreadStateLock PyThreadLock;
- PyMessageExt message;
- message.pMessageExt = msg;
- map<CPushConsumer *, PyObject *>::iterator iter;
+ PyThreadStateLock PyThreadLock; // ensure hold GIL, before call python callback
+ PyMessageExt message = { .pMessageExt = msg };
+ map<CPushConsumer *, pair<PyObject *, object>>::iterator iter;
iter = g_CallBackMap.find(consumer);
if (iter != g_CallBackMap.end()) {
- PyObject * pCallback = iter->second;
+ pair<PyObject *, object> callback = iter->second;
+ PyObject * pCallback = callback.first;
+ object& args = callback.second;
if (pCallback != NULL) {
- int status =
- boost::python::call<int>(pCallback, message);
+ int status = boost::python::call<int>(pCallback, message, args);
return status;
}
}
return 1;
-
}
int PySetPushConsumerThreadCount(void *consumer, int threadCount) {
@@ -200,6 +224,11 @@
const char *channel){
return SetPushConsumerSessionCredentials((CPushConsumer *)consumer, accessKey, secretKey, channel);
}
+
+//push consumer
+int PySetPullConsumerNameServerDomain(void *consumer, const char *domain) {
+ return SetPullConsumerNameServerDomain((CPullConsumer *) consumer, domain);
+}
//version
const char *PyGetVersion() {
return VERSION;
@@ -260,9 +289,11 @@
def("StartProducer", PyStartProducer);
def("ShutdownProducer", PyShutdownProducer);
def("SetProducerNameServerAddress", PySetProducerNameServerAddress);
+ def("SetProducerNameServerDomain", PySetProducerNameServerDomain);
def("SetProducerInstanceName", PySetProducerInstanceName);
def("SetProducerSessionCredentials", PySetProducerSessionCredentials);
def("SendMessageSync", PySendMessageSync);
+ def("SendMessageOneway", PySendMessageOneway);
//For Consumer
def("CreatePushConsumer", PyCreatePushConsumer, return_value_policy<return_opaque_pointer>());
@@ -270,6 +301,7 @@
def("StartPushConsumer", PyStartPushConsumer);
def("ShutdownPushConsumer", PyShutdownPushConsumer);
def("SetPushConsumerNameServerAddress", PySetPushConsumerNameServerAddress);
+ def("SetPushConsumerNameServerDomain", PySetPushConsumerNameServerDomain);
def("SetPushConsumerThreadCount", PySetPushConsumerThreadCount);
def("SetPushConsumerMessageBatchMaxSize", PySetPushConsumerMessageBatchMaxSize);
def("SetPushConsumerInstanceName", PySetPushConsumerInstanceName);
@@ -277,6 +309,9 @@
def("Subscribe", PySubscribe);
def("RegisterMessageCallback", PyRegisterMessageCallback);
+ //pull consumer
+ def("SetPullConsumerNameServerDomain", PySetPullConsumerNameServerDomain);
+
//For Version
def("GetVersion", PyGetVersion);
}
diff --git a/src/PythonWrapper.h b/src/PythonWrapper.h
index 887e6f7..2ad8255 100644
--- a/src/PythonWrapper.h
+++ b/src/PythonWrapper.h
@@ -21,6 +21,7 @@
#include "CSendResult.h"
#include "CProducer.h"
#include "CPushConsumer.h"
+#include "CPullConsumer.h"
#include <boost/python.hpp>
using namespace boost::python;
@@ -39,8 +40,8 @@
CMessageExt *pMessageExt;
} PyMessageExt;
-#define PYTHON_CLIENT_VERSION "1.0.0"
-#define PYCLI_BUILD_DATE "16-10-2018"
+#define PYTHON_CLIENT_VERSION "1.2.0"
+#define PYCLI_BUILD_DATE "04-12-2018"
#ifdef __cplusplus
extern "C" {
@@ -71,9 +72,11 @@
int PyStartProducer(void *producer);
int PyShutdownProducer(void *producer);
int PySetProducerNameServerAddress(void *producer, const char *namesrv);
+int PySetProducerNameServerDomain(void *producer, const char *domain);
int PySetProducerInstanceName(void *producer, const char *instanceName);
int PySetProducerSessionCredentials(void *producer, const char *accessKey, const char *secretKey, const char *channel);
PySendResult PySendMessageSync(void *producer, void *msg);
+int PySendMessageOneway(void *producer, void *msg);
//sendResult
const char *PyGetSendResultMsgID(CSendResult &sendResult);
@@ -84,14 +87,19 @@
int PyStartPushConsumer(void *consumer);
int PyShutdownPushConsumer(void *consumer);
int PySetPushConsumerNameServerAddress(void *consumer, const char *namesrv);
+int PySetPushConsumerNameServerDomain(void *consumer, const char *domain);
int PySubscribe(void *consumer, const char *topic, const char *expression);
-int PyRegisterMessageCallback(void *consumer, PyObject *pCallback);
+int PyRegisterMessageCallback(void *consumer, PyObject *pCallback, object args);
int PythonMessageCallBackInner(CPushConsumer *consumer, CMessageExt *msg);
int PySetPushConsumerThreadCount(void *consumer, int threadCount);
int PySetPushConsumerMessageBatchMaxSize(void *consumer, int batchSize);
int PySetPushConsumerInstanceName(void *consumer, const char *instanceName);
int PySetPushConsumerSessionCredentials(void *consumer, const char *accessKey, const char *secretKey,
const char *channel);
+
+//push consumer
+int PySetPullConsumerNameServerDomain(void *consumer, const char *domain);
+
#ifdef __cplusplus
};
#endif
diff --git a/test/TestConsumeMessages.py b/test/TestConsumeMessages.py
new file mode 100644
index 0000000..da4b0b6
--- /dev/null
+++ b/test/TestConsumeMessages.py
@@ -0,0 +1,78 @@
+# /*
+# * Licensed to the Apache Software Foundation (ASF) under one or more
+# * contributor license agreements. See the NOTICE file distributed with
+# * this work for additional information regarding copyright ownership.
+# * The ASF licenses this file to You under the Apache License, Version 2.0
+# * (the "License"); you may not use this file except in compliance with
+# * the License. You may obtain a copy of the License at
+# *
+# * http://www.apache.org/licenses/LICENSE-2.0
+# *
+# * Unless required by applicable law or agreed to in writing, software
+# * distributed under the License is distributed on an "AS IS" BASIS,
+# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# * See the License for the specific language governing permissions and
+# * limitations under the License.
+# */
+
+import __init__
+from librocketmqclientpython import *
+
+import time
+import sys
+
+topic = 'test'
+name_srv = '127.0.0.1:9876'
+tag = 'rmq-tag'
+consumer_group = 'test-consumer-group'
+totalMsg = 0
+
+
+def sigint_handler(signum, frame):
+ global is_sigint_up
+ is_sigint_up = True
+ sys.exit(0)
+
+
+def consumer_message(msg, args):
+ global totalMsg
+ totalMsg += 1
+ print 'total count %d' % totalMsg
+ print 'topic=%s' % GetMessageTopic(msg)
+ print 'tag=%s' % GetMessageTags(msg)
+ print 'body=%s' % GetMessageBody(msg)
+ print 'msg id=%s' % GetMessageId(msg)
+
+ print 'map.keys %s' % GetMessageKeys(msg)
+
+ print 'map.name %s' % GetMessageProperty(msg, 'name')
+ print 'map.id %s' % GetMessageProperty(msg, 'id')
+ return 0
+
+
+def init_producer(_group, _topic, _tag):
+ consumer = CreatePushConsumer(_group)
+ SetPushConsumerNameServerAddress(consumer, name_srv)
+ SetPushConsumerThreadCount(consumer, 1)
+ Subscribe(consumer, _topic, _tag)
+ RegisterMessageCallback(consumer, consumer_message, None)
+ StartPushConsumer(consumer)
+ print 'consumer is ready...'
+ return consumer
+
+
+def start_one_consumer(_group, _topic, _tag):
+ consumer = init_producer(_group, _topic, _tag)
+ i = 1
+ while i <= 10:
+ print 'clock: ' + str(i)
+ i += 1
+ time.sleep(10)
+
+ ShutdownPushConsumer(consumer)
+ DestroyPushConsumer(consumer)
+ print("Consumer Down....")
+
+
+if __name__ == '__main__':
+ start_one_consumer(consumer_group, topic, '*')
diff --git a/test/TestSendMessages.py b/test/TestSendMessages.py
new file mode 100644
index 0000000..69871a2
--- /dev/null
+++ b/test/TestSendMessages.py
@@ -0,0 +1,206 @@
+# /*
+# * Licensed to the Apache Software Foundation (ASF) under one or more
+# * contributor license agreements. See the NOTICE file distributed with
+# * this work for additional information regarding copyright ownership.
+# * The ASF licenses this file to You under the Apache License, Version 2.0
+# * (the "License"); you may not use this file except in compliance with
+# * the License. You may obtain a copy of the License at
+# *
+# * http://www.apache.org/licenses/LICENSE-2.0
+# *
+# * Unless required by applicable law or agreed to in writing, software
+# * distributed under the License is distributed on an "AS IS" BASIS,
+# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# * See the License for the specific language governing permissions and
+# * limitations under the License.
+# */
+
+import __init__
+from librocketmqclientpython import *
+import time
+
+topic = 'test'
+name_srv = '127.0.0.1:9876'
+
+
+def init_producer():
+ producer = CreateProducer('TestProducer')
+ SetProducerNameServerAddress(producer, name_srv)
+ StartProducer(producer)
+ return producer
+
+
+producer = init_producer()
+tag = 'rmq-tag'
+key = 'rmq-key'
+
+
+def send_messages_sync(count):
+ for a in range(count):
+ print 'start sending...'
+ body = 'hi rmq, now is ' + \
+ time.strftime('%Y.%m.%d', time.localtime(time.time()))
+ msg = CreateMessage(topic)
+ SetMessageBody(msg, body)
+ result = SendMessageSync(producer, msg)
+ DestroyMessage(msg)
+ print '[RMQ-PRODUCER]start sending...done, msg id = ' + \
+ result.GetMsgId()
+
+
+def send_messages_sync_with_map(count):
+ print 'sending message with properties...id, name'
+ for a in range(count):
+ body = 'hi rmq, now is ' + \
+ time.strftime('%Y.%m.%d', time.localtime(time.time()))
+ msg = CreateMessage(topic)
+ SetMessageBody(msg, body)
+
+ SetMessageProperty(msg, 'name', 'test')
+ SetMessageProperty(msg, 'id', str(time.time()))
+
+ result = SendMessageSync(producer, msg)
+ DestroyMessage(msg)
+ print '[RMQ-PRODUCER]start sending...done, msg id = ' + \
+ result.GetMsgId()
+
+
+def send_messages_with_tag_sync(count):
+ print 'sending message with tag...' + tag
+ for a in range(count):
+ body = 'hi rmq, now is ' + \
+ time.strftime('%Y.%m.%d', time.localtime(time.time()))
+ msg = CreateMessage(topic)
+ SetMessageBody(msg, body)
+ SetMessageTags(msg, tag)
+ result = SendMessageSync(producer, msg)
+ DestroyMessage(msg)
+ print 'msg id = ' + result.GetMsgId()
+
+
+def send_messages_with_tag_and_map_sync(count):
+ print 'sending message with tag...' + tag + ' and properties id, name'
+ for a in range(count):
+ body = 'hi rmq, now is ' + \
+ time.strftime('%Y.%m.%d', time.localtime(time.time()))
+ msg = CreateMessage(topic)
+ SetMessageBody(msg, body)
+
+ SetMessageProperty(msg, 'name', 'test')
+ SetMessageProperty(msg, 'id', str(time.time()))
+
+ SetMessageTags(msg, tag)
+ result = SendMessageSync(producer, msg)
+ DestroyMessage(msg)
+ print 'msg id = ' + result.GetMsgId()
+
+
+def send_messages_with_key_sync(count):
+ print 'sending message with keys...' + key
+ for a in range(count):
+ body = 'hi rmq, now is ' + \
+ time.strftime('%Y.%m.%d', time.localtime(time.time()))
+ msg = CreateMessage(topic)
+ SetMessageBody(msg, body)
+ SetMessageKeys(msg, key)
+ result = SendMessageSync(producer, msg)
+ DestroyMessage(msg)
+ print 'msg id = ' + result.GetMsgId()
+
+
+def send_messages_with_key_and_map_sync(count):
+ print 'sending message with keys...' + key + ' and properties id, name'
+ for a in range(count):
+ body = 'hi rmq, now is ' + \
+ time.strftime('%Y.%m.%d', time.localtime(time.time()))
+ msg = CreateMessage(topic)
+ SetMessageBody(msg, body)
+ SetMessageKeys(msg, key)
+
+ SetMessageProperty(msg, 'name', 'test')
+ SetMessageProperty(msg, 'id', str(time.time()))
+
+ result = SendMessageSync(producer, msg)
+ DestroyMessage(msg)
+ print 'msg id = ' + result.GetMsgId()
+
+
+def send_messages_with_key_and_tag_sync(count):
+ key = 'rmq-key'
+ print 'sending message with keys and tag...' + key + ', ' + tag
+ for a in range(count):
+ body = 'hi rmq, now is ' + \
+ time.strftime('%Y.%m.%d', time.localtime(time.time()))
+ msg = CreateMessage(topic)
+ SetMessageBody(msg, body)
+ SetMessageKeys(msg, key)
+ SetMessageTags(msg, tag)
+ result = SendMessageSync(producer, msg)
+ DestroyMessage(msg)
+ print 'msg id = ' + result.GetMsgId()
+
+
+def send_messages_with_key_and_tag_and_map_sync(count):
+ key = 'rmq-key'
+ print 'sending message with keys and tag...' + \
+ key + ', ' + tag + ' and properties id, name'
+ for a in range(count):
+ body = 'hi rmq, now is ' + \
+ time.strftime('%Y.%m.%d', time.localtime(time.time()))
+ msg = CreateMessage(topic)
+ SetMessageBody(msg, body)
+ SetMessageKeys(msg, key)
+
+ SetMessageProperty(msg, 'name', 'test')
+ SetMessageProperty(msg, 'id', str(time.time()))
+
+ SetMessageTags(msg, tag)
+ result = SendMessageSync(producer, msg)
+ DestroyMessage(msg)
+ print 'msg id = ' + result.GetMsgId()
+
+
+def send_messages_oneway(count):
+ for a in range(count):
+ print 'start sending...'
+ body = 'hi rmq, this is oneway message. now is ' + \
+ time.strftime('%Y.%m.%d', time.localtime(time.time()))
+ msg = CreateMessage(topic)
+ SetMessageBody(msg, body)
+
+ SetMessageKeys(msg, key)
+ SetMessageProperty(msg, 'name', 'test')
+ SetMessageProperty(msg, 'id', str(time.time()))
+
+ SendMessageOneway(producer, msg)
+ DestroyMessage(msg)
+ print 'send oneway is over'
+
+
+def send_delay_messages(producer, topic, count):
+ key = 'rmq-key'
+ print 'start sending message'
+ tag = 'test'
+ for n in range(count):
+ body = 'hi rmq, now is' + str(time.time())
+ msg = CreateMessage(topic)
+ SetMessageBody(msg, body)
+ SetMessageKeys(msg, key)
+ SetMessageProperty(msg, 'name', 'hello world')
+ SetMessageProperty(msg, 'id', str(time.time()))
+ SetMessageTags(msg, tag)
+ # messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
+
+ SetDelayTimeLevel(msg, 5)
+
+ print str(msg)
+ result = SendMessageSync(producer, msg)
+ DestroyMessage(msg)
+ print 'msg id =' + result.GetMsgId()
+
+
+if __name__ == '__main__':
+ # print GetVersion()
+ while True:
+ send_messages_oneway(1)
+ time.sleep(1)
diff --git a/test/__init__.py b/test/__init__.py
new file mode 100644
index 0000000..f3a3a82
--- /dev/null
+++ b/test/__init__.py
@@ -0,0 +1,22 @@
+# /*
+# * Licensed to the Apache Software Foundation (ASF) under one or more
+# * contributor license agreements. See the NOTICE file distributed with
+# * this work for additional information regarding copyright ownership.
+# * The ASF licenses this file to You under the Apache License, Version 2.0
+# * (the "License"); you may not use this file except in compliance with
+# * the License. You may obtain a copy of the License at
+# *
+# * http://www.apache.org/licenses/LICENSE-2.0
+# *
+# * Unless required by applicable law or agreed to in writing, software
+# * distributed under the License is distributed on an "AS IS" BASIS,
+# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# * See the License for the specific language governing permissions and
+# * limitations under the License.
+# */
+
+import sys
+sys.path.append('/usr/local/lib')
+print("__________Python Version:___________")
+print(sys.version)
+print("______Add Path /usr/local/lib_______")
diff --git a/third_party/googletest/README.md b/third_party/googletest/README.md
new file mode 100644
index 0000000..39da329
--- /dev/null
+++ b/third_party/googletest/README.md
@@ -0,0 +1,4 @@
+### Google Test
+
+-----------------------
+Download [googletest source](https://github.com/abseil/googletest)
diff --git a/bin/Makefile b/unitests/CMakeLists.txt
similarity index 73%
rename from bin/Makefile
rename to unitests/CMakeLists.txt
index f850c19..02004ae 100644
--- a/bin/Makefile
+++ b/unitests/CMakeLists.txt
@@ -15,13 +15,17 @@
#* limitations under the License.
#*/
-clean:
- $(RM) -rf *_d.*
- $(RM) -rf core
- $(RM) -rf *.exp*
- $(RM) -rf *.pdb*
- $(RM) -rf *Producer.lib*
- $(RM) -rf *RocketMQClient.dll*
- $(RM) -rf *RocketMQClient.lib*
- $(RM) -rf *pushConsumer.lib*
- $(RM) -rf *.a *.so
+add_executable(runUnitTests
+ PythonWrapperTest.cpp
+ )
+
+target_link_libraries(runUnitTests
+ dl
+ gtest)
+
+if (UNIX AND NOT APPLE)
+ target_link_libraries(runUnitTests rt)
+endif ()
+
+set_target_properties(runUnitTests PROPERTIES
+ RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR})
diff --git a/unitests/PythonWrapperTest.cpp b/unitests/PythonWrapperTest.cpp
new file mode 100644
index 0000000..5989335
--- /dev/null
+++ b/unitests/PythonWrapperTest.cpp
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "gtest/gtest.h"
+
+int main(int argc, char **argv) {
+ testing::InitGoogleTest(&argc, argv);
+ int ret = RUN_ALL_TESTS();
+}
+
+TEST(Message, testCreateMessage) {
+ASSERT_TRUE(1 == 1);
+}