chore(consumer) remove pull consumer implementation
diff --git a/README.md b/README.md
index b7e7497..93004e5 100644
--- a/README.md
+++ b/README.md
@@ -62,25 +62,6 @@
```
-### PullConsumer
-
-```python
-from rocketmq.client import PullConsumer
-
-
-consumer = PullConsumer('CID_XXX')
-consumer.set_namesrv_domain('http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet')
-# For ip and port name server address, use `set_namesrv_addr` method, for example:
-# consumer.set_namesrv_addr('127.0.0.1:9887')
-consumer.set_session_credentials('XXX', 'XXXX', 'ALIYUN') # No need to call this function if you don't use Aliyun.
-consumer.start()
-
-for msg in consumer.pull('YOUR-TOPIC'):
- print(msg.id, msg.body)
-
-consumer.shutdown()
-```
-
## License
This work is released under the MIT license. A copy of the license is provided in the [LICENSE](./LICENSE) file.
diff --git a/ci/Dockerfile b/ci/Dockerfile
index ccbc51f..6b9660f 100644
--- a/ci/Dockerfile
+++ b/ci/Dockerfile
@@ -12,7 +12,7 @@
rm -rf /tmp/zlib-1.2.11
# Build rocketmq-client-cpp
-RUN git clone --depth=1 --branch=1.2.3 https://github.com/apache/rocketmq-client-cpp.git /tmp/rocketmq-client-cpp && \
+RUN git clone --depth=1 --branch=1.2.4 https://github.com/apache/rocketmq-client-cpp.git /tmp/rocketmq-client-cpp && \
mkdir -p /tmp/rocketmq-client-cpp/tmp_down_dir && \
curl -sqL -o /tmp/rocketmq-client-cpp/tmp_down_dir/libevent-release-2.1.8-stable.zip https://github.com/libevent/libevent/archive/release-2.1.8-stable.zip && \
curl -sqL -o /tmp/rocketmq-client-cpp/tmp_down_dir/jsoncpp-0.10.7.zip https://github.com/open-source-parsers/jsoncpp/archive/0.10.7.zip && \
diff --git a/rocketmq/client.py b/rocketmq/client.py
index 1a58aa1..cce01fc 100644
--- a/rocketmq/client.py
+++ b/rocketmq/client.py
@@ -6,7 +6,7 @@
from collections import namedtuple
from .ffi import (
- dll, _CSendResult, MSG_CALLBACK_FUNC, _CMessageQueue, _CPullStatus,
+ dll, _CSendResult, MSG_CALLBACK_FUNC, _CMessageQueue,
_CConsumeStatus, MessageModel, QUEUE_SELECTOR_CALLBACK, TRANSACTION_CHECK_CALLBACK,
LOCAL_TRANSACTION_EXECUTE_CALLBACK
)
@@ -16,7 +16,7 @@
)
from .consts import MessageProperty
-__all__ = ['SendStatus', 'Message', 'RecvMessage', 'Producer', 'PushConsumer', 'PullConsumer', 'TransactionMQProducer',
+__all__ = ['SendStatus', 'Message', 'RecvMessage', 'Producer', 'PushConsumer', 'TransactionMQProducer',
'TransactionStatus']
PY2 = sys.version_info[0] == 2
@@ -498,97 +498,3 @@
def set_instance_name(self, name):
ffi_check(dll.SetPushConsumerInstanceName(self._handle, _to_bytes(name)))
-
-
-class PullConsumer(object):
- offset_table = {}
-
- def __init__(self, group_id):
- self._handle = dll.CreatePullConsumer(_to_bytes(group_id))
- if self._handle is None:
- raise NullPointerException('CreatePullConsumer returned null pointer')
-
- def __del__(self):
- if self._handle is not None:
- ffi_check(dll.DestroyPullConsumer(self._handle))
-
- def __enter__(self):
- self.start()
-
- def __exit__(self, type, value, traceback):
- self.shutdown()
-
- def start(self):
- ffi_check(dll.StartPullConsumer(self._handle))
-
- def shutdown(self):
- ffi_check(dll.ShutdownPullConsumer(self._handle))
-
- def set_group(self, group_id):
- ffi_check(dll.SetPullConsumerGroupID(self._handle, _to_bytes(group_id)))
-
- def set_namesrv_addr(self, addr):
- ffi_check(dll.SetPullConsumerNameServerAddress(self._handle, _to_bytes(addr)))
-
- def set_namesrv_domain(self, domain):
- ffi_check(dll.SetPullConsumerNameServerDomain(self._handle, _to_bytes(domain)))
-
- def set_session_credentials(self, access_key, access_secret, channel):
- ffi_check(dll.SetPullConsumerSessionCredentials(
- self._handle,
- _to_bytes(access_key),
- _to_bytes(access_secret),
- _to_bytes(channel)
- ))
-
- def _get_mq_key(self, mq):
- key = '%s@%s' % (mq.topic, mq.queueId)
- return key
-
- def get_message_queue_offset(self, mq):
- offset = self.offset_table.get(self._get_mq_key(mq), 0)
- return offset
-
- def set_message_queue_offset(self, mq, offset):
- self.offset_table[self._get_mq_key(mq)] = offset
-
- def pull(self, topic, expression='*', max_num=32):
- message_queue = POINTER(_CMessageQueue)()
- queue_size = c_int()
- ffi_check(dll.FetchSubscriptionMessageQueues(
- self._handle,
- _to_bytes(topic),
- ctypes.pointer(message_queue),
- ctypes.pointer(queue_size)
- ))
- for i in range(int(queue_size.value)):
- mq = message_queue[i]
- tmp_offset = ctypes.c_longlong(self.get_message_queue_offset(mq))
-
- has_new_msg = True
- while has_new_msg:
- pull_res = dll.Pull(
- self._handle,
- ctypes.pointer(mq),
- _to_bytes(expression),
- tmp_offset,
- max_num,
- )
-
- if pull_res.pullStatus != _CPullStatus.BROKER_TIMEOUT:
- tmp_offset = pull_res.nextBeginOffset
- self.set_message_queue_offset(mq, tmp_offset)
-
- if pull_res.pullStatus == _CPullStatus.FOUND:
- for i in range(int(pull_res.size)):
- yield RecvMessage(pull_res.msgFoundList[i])
- elif pull_res.pullStatus == _CPullStatus.NO_MATCHED_MSG:
- pass
- elif pull_res.pullStatus == _CPullStatus.NO_NEW_MSG:
- has_new_msg = False
- elif pull_res.pullStatus == _CPullStatus.OFFSET_ILLEGAL:
- pass
- else:
- pass
- dll.ReleasePullResult(pull_res) # NOTE: No need to check ffi return code here
- ffi_check(dll.ReleaseSubscriptionMessageQueue(message_queue))
diff --git a/rocketmq/exceptions.py b/rocketmq/exceptions.py
index 5beb83f..0871d11 100644
--- a/rocketmq/exceptions.py
+++ b/rocketmq/exceptions.py
@@ -81,19 +81,4 @@
@_register(_CStatus.PUSHCONSUMER_START_FAILED)
class PushConsumerStartFailed(ConsumerException):
- pass
-
-
-@_register(_CStatus.PULLCONSUMER_START_FAILED)
-class PullConsumerStartFailed(ConsumerException):
- pass
-
-
-@_register(_CStatus.PULLCONSUMER_FETCH_MQ_FAILED)
-class PullConsumerFetchMQFailed(ConsumerException):
- pass
-
-
-@_register(_CStatus.PULLCONSUMER_FETCH_MQ_FAILED)
-class PullConsumerFetchMessageFailed(ConsumerException):
- pass
+ pass
\ No newline at end of file
diff --git a/rocketmq/ffi.py b/rocketmq/ffi.py
index 0c87c43..edda2bc 100644
--- a/rocketmq/ffi.py
+++ b/rocketmq/ffi.py
@@ -45,10 +45,7 @@
PRODUCER_SEND_ASYNC_FAILED = 14
# push consumer
PUSHCONSUMER_START_FAILED = 20
- # pull consumer
- PULLCONSUMER_START_FAILED = 30
- PULLCONSUMER_FETCH_MQ_FAILED = 31
- PULLCONSUMER_FETCH_MESSAGE_FAILED = 32
+
NOT_SUPPORT_NOW = -1
class _CLogLevel(CtypesEnum):
@@ -92,26 +89,6 @@
]
-class _CPullStatus(CtypesEnum):
- FOUND = 0
- NO_NEW_MSG = 1
- NO_MATCHED_MSG = 2
- OFFSET_ILLEGAL = 3
- BROKER_TIMEOUT = 4
-
-
-class _CPullResult(Structure):
- _fields_ = [
- ('pullStatus', c_int),
- ('nextBeginOffset', c_longlong),
- ('minOffset', c_longlong),
- ('maxOffset', c_longlong),
- ('msgFoundList', POINTER(c_void_p)),
- ('size', c_int),
- ('pData', c_void_p),
- ]
-
-
class _CConsumeStatus(CtypesEnum):
CONSUME_SUCCESS = 0
RECONSUME_LATER = 1
@@ -242,40 +219,6 @@
POINTER(_CSendResult)]
dll.SendMessageTransaction.restype = c_int
-# Pull Consumer
-dll.CreatePullConsumer.argtypes = [c_char_p]
-dll.CreatePullConsumer.restype = c_void_p
-dll.DestroyPullConsumer.argtypes = [c_void_p]
-dll.DestroyPullConsumer.restype = _CStatus
-dll.StartPullConsumer.argtypes = [c_void_p]
-dll.StartPullConsumer.restype = _CStatus
-dll.ShutdownPullConsumer.argtypes = [c_void_p]
-dll.ShutdownPullConsumer.restype = _CStatus
-dll.SetPullConsumerGroupID.argtypes = [c_void_p, c_char_p]
-dll.SetPullConsumerGroupID.restype = _CStatus
-dll.GetPullConsumerGroupID.argtypes = [c_void_p]
-dll.GetPullConsumerGroupID.restype = c_char_p
-dll.SetPullConsumerNameServerAddress.argtypes = [c_void_p, c_char_p]
-dll.SetPullConsumerNameServerAddress.restype = _CStatus
-dll.SetPullConsumerNameServerDomain.argtypes = [c_void_p, c_char_p]
-dll.SetPullConsumerNameServerDomain.restype = _CStatus
-dll.SetPullConsumerSessionCredentials.argtypes = [c_void_p, c_char_p, c_char_p, c_char_p]
-dll.SetPullConsumerSessionCredentials.restype = _CStatus
-dll.SetPullConsumerLogPath.argtypes = [c_void_p, c_char_p]
-dll.SetPullConsumerLogPath.restype = _CStatus
-dll.SetPullConsumerLogFileNumAndSize.argtypes = [c_void_p, c_int, c_long]
-dll.SetPullConsumerLogFileNumAndSize.restype = _CStatus
-dll.SetPullConsumerLogLevel.argtypes = [c_void_p, _CLogLevel]
-dll.SetPullConsumerLogLevel.restype = _CStatus
-dll.FetchSubscriptionMessageQueues.argtypes = [c_void_p, c_char_p, POINTER(POINTER(_CMessageQueue)), POINTER(c_int)]
-dll.FetchSubscriptionMessageQueues.restype = _CStatus
-dll.ReleaseSubscriptionMessageQueue.argtypes = [POINTER(_CMessageQueue)]
-dll.ReleaseSubscriptionMessageQueue.restype = _CStatus
-dll.Pull.argtypes = [c_void_p, POINTER(_CMessageQueue), c_char_p, c_longlong, c_int]
-dll.Pull.restype = _CPullResult
-dll.ReleasePullResult.argtypes = [_CPullResult]
-dll.ReleasePullResult.restype = _CStatus
-
# Push Consumer
MSG_CALLBACK_FUNC = ctypes.CFUNCTYPE(c_int, c_void_p, c_void_p)
dll.CreatePushConsumer.argtypes = [c_char_p]
diff --git a/tests/conftest.py b/tests/conftest.py
index 7a76032..f761496 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
import pytest
-from rocketmq.client import Producer, PushConsumer, PullConsumer
+from rocketmq.client import Producer, PushConsumer
# HACK: It's buggy, don't call it in test case for now
@@ -23,11 +23,3 @@
yield consumer
consumer.shutdown()
-
-@pytest.fixture(scope='function')
-def pull_consumer():
- consumer = PullConsumer('testGroup')
- consumer.set_namesrv_addr('127.0.0.1:9876')
- consumer.start()
- yield consumer
- consumer.shutdown()
diff --git a/tests/test_consumer.py b/tests/test_consumer.py
index fb408ab..89bac96 100644
--- a/tests/test_consumer.py
+++ b/tests/test_consumer.py
@@ -18,13 +18,6 @@
assert ret.status == SendStatus.OK
-def test_pull_consumer(producer, pull_consumer):
- _send_test_msg(producer)
- time.sleep(5)
- msg = next(pull_consumer.pull('test'))
- assert msg.body.decode('utf-8') == 'XXXX'
-
-
def test_push_consumer_no_subscription_start_fail(push_consumer):
with pytest.raises(PushConsumerStartFailed):
push_consumer.start()