Support SSL. (#157)
* Support SSL.
* Added compatibility code for the SSL interface.
---------
Co-authored-by: hexueyuan <hexueyuan@baidu.com>
diff --git a/rocketmq/client.py b/rocketmq/client.py
index fecac9c..bb8e9d4 100644
--- a/rocketmq/client.py
+++ b/rocketmq/client.py
@@ -262,6 +262,13 @@
def set_message_trace(self, message_trace):
ffi_check(dll.SetProducerMessageTrace(self._handle, message_trace and TraceModel.OPEN or TraceModel.CLOSE))
+    def set_ssl_enable(self, enable):
+        """Pass the SSL enable flag for this producer to the native library.
+
+        A truthy ``enable`` is sent as 1 and a falsy one as 0; the returned
+        status is validated by ``ffi_check``.
+        """
+        flag = int(bool(enable))
+        status = dll.SetProducerSsl(self._handle, flag)
+        ffi_check(status)
+
+    def set_ssl_property_file(self, file_path):
+        """Hand the SSL property file path for this producer to the native library.
+
+        ``file_path`` is converted with ``_to_bytes`` before the call; the
+        returned status is validated by ``ffi_check``.
+        """
+        native_setter = dll.SetProducerSslPropertyFile
+        status = native_setter(self._handle, _to_bytes(file_path))
+        ffi_check(status)
+
def start(self):
ffi_check(dll.StartProducer(self._handle))
@@ -401,6 +408,13 @@
_to_bytes(channel)
))
+    def set_ssl_enable(self, enable):
+        """Pass the SSL enable flag for this push consumer to the native library.
+
+        A truthy ``enable`` is sent as 1 and a falsy one as 0; the returned
+        status is validated by ``ffi_check``.
+        """
+        flag = int(bool(enable))
+        status = dll.SetPushConsumerSsl(self._handle, flag)
+        ffi_check(status)
+
+    def set_ssl_property_file(self, file_path):
+        """Hand the SSL property file path for this push consumer to the native library.
+
+        ``file_path`` is converted with ``_to_bytes`` before the call; the
+        returned status is validated by ``ffi_check``.
+        """
+        native_setter = dll.SetPushConsumerSslPropertyFile
+        status = native_setter(self._handle, _to_bytes(file_path))
+        ffi_check(status)
+
def subscribe(self, topic, callback, expression='*'):
def _on_message(consumer, msg):
exc = None
diff --git a/rocketmq/ffi.py b/rocketmq/ffi.py
index b4f6a11..0eca2ca 100644
--- a/rocketmq/ffi.py
+++ b/rocketmq/ffi.py
@@ -207,6 +207,13 @@
dll.SetProducerMaxMessageSize.restype = _CStatus
dll.SetProducerMessageTrace.argtypes = [c_void_p, TraceModel]
dll.SetProducerMessageTrace.restype = _CStatus
+# Declare the ctypes signatures of the producer SSL entry points.
+# The lookups are wrapped in try/except so that an AttributeError raised
+# while resolving either symbol on ``dll`` does not abort this module.
+# If the first lookup fails, the second function is left undeclared as well.
+# NOTE(review): presumably this targets native libraries built without the
+# SSL API - confirm which library versions export these symbols.
+try:
+ dll.SetProducerSsl.argtypes = [c_void_p, c_int]
+ dll.SetProducerSsl.restype = _CStatus
+ dll.SetProducerSslPropertyFile.argtypes = [c_void_p, c_char_p]
+ dll.SetProducerSslPropertyFile.restype = _CStatus
+except AttributeError:
+ pass
dll.SendMessageSync.argtypes = [c_void_p, c_void_p, POINTER(_CSendResult)]
dll.SendMessageSync.restype = _CStatus
dll.SendMessageOneway.argtypes = [c_void_p, c_void_p]
@@ -271,6 +278,13 @@
dll.SetPushConsumerLogLevel.restype = _CStatus
dll.SetPushConsumerMessageTrace.argtypes = [c_void_p, TraceModel]
dll.SetPushConsumerMessageTrace.restype = _CStatus
+# Declare the ctypes signatures of the push-consumer SSL entry points.
+# The lookups are wrapped in try/except so that an AttributeError raised
+# while resolving either symbol on ``dll`` does not abort this module.
+# If the first lookup fails, the second function is left undeclared as well.
+# NOTE(review): presumably this targets native libraries built without the
+# SSL API - confirm which library versions export these symbols.
+try:
+ dll.SetPushConsumerSsl.argtypes = [c_void_p, c_int]
+ dll.SetPushConsumerSsl.restype = _CStatus
+ dll.SetPushConsumerSslPropertyFile.argtypes = [c_void_p, c_char_p]
+ dll.SetPushConsumerSslPropertyFile.restype = _CStatus
+except AttributeError:
+ pass
# Misc
dll.GetLatestErrorMessage.argtypes = []
diff --git a/samples/consumer.py b/samples/consumer.py
index b95da79..bf36c4a 100644
--- a/samples/consumer.py
+++ b/samples/consumer.py
@@ -27,7 +27,9 @@
def start_consume_message():
consumer = PushConsumer('consumer_group')
consumer.set_name_server_address('127.0.0.1:9876')
- consumer.subscribe('TopicTest', callback)
+ consumer.subscribe('BenchmarkTest', callback)
+ # consumer.set_ssl_enable(True)
+ # consumer.set_ssl_property_file("/etc/rocketmq/tls.properties")
print ('start consume message')
consumer.start()
diff --git a/samples/producer.py b/samples/producer.py
index f69534c..4869b77 100644
--- a/samples/producer.py
+++ b/samples/producer.py
@@ -115,6 +115,23 @@
time.sleep(3600)
+def send_message_with_ssl(count):
+    """Send ``count`` messages synchronously through an SSL-enabled producer.
+
+    The SSL flag and the SSL property file are set before the producer is
+    started. Each message built by ``create_message`` is sent with
+    ``send_sync`` and its status and message id are printed; the producer is
+    shut down after the last message.
+    """
+    producer = Producer(gid)
+    producer.set_name_server_address(name_srv)
+    producer.set_ssl_enable(True)
+    producer.set_ssl_property_file("/etc/rocketmq/tls.properties")
+    # Start the producer exactly once; every message created is also sent.
+    producer.start()
+    for _ in range(count):
+        msg = create_message()
+        ret = producer.send_sync(msg)
+        print ('send message status: ' + str(ret.status) + ' msgId: ' + ret.msg_id)
+    print ('send sync message done')
+    producer.shutdown()
+
+
if __name__ == '__main__':
send_message_sync(10)