Add MessageIdCodec class (#530)
* add MessageIdCodec class
* add license
* fix style issue
* rename message_id_codec file
* fix process id overflow issue
* Update python/client/message/message_id_codec.py
* Fix style issue
---------
Co-authored-by: Aaron Ai <yangkun.ayk@gmail.com>
diff --git a/.github/workflows/python_build.yml b/.github/workflows/python_build.yml
index 4524a78..2fd44c0 100644
--- a/.github/workflows/python_build.yml
+++ b/.github/workflows/python_build.yml
@@ -15,7 +15,7 @@
- run: python -m pip install flake8
- name: flake8
run: |
- flake8 --ignore=E501 --exclude python/protocol python
+ flake8 --ignore=E501,W503 --exclude python/protocol python
isort:
runs-on: ubuntu-latest
steps:
diff --git a/python/client/message/message_id_codec.py b/python/client/message/message_id_codec.py
new file mode 100644
index 0000000..0a52c83
--- /dev/null
+++ b/python/client/message/message_id_codec.py
@@ -0,0 +1,81 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import math
+import os
+import threading
+import time
+import uuid
+from datetime import datetime, timezone
+
+
+class MessageIdCodec:
+ __MESSAGE_ID_VERSION_V1 = "01"
+
+ @staticmethod
+ def __get_process_fixed_string():
+ mac = uuid.getnode()
+ mac = format(mac, "012x")
+ mac_bytes = bytes.fromhex(mac[-12:])
+ pid = os.getpid() % 65536
+ pid_bytes = pid.to_bytes(2, "big")
+ return mac_bytes.hex().upper() + pid_bytes.hex().upper()
+
+ @staticmethod
+ def __get_seconds_since_custom_epoch():
+ custom_epoch = datetime(2021, 1, 1, tzinfo=timezone.utc)
+ now = datetime.now(timezone.utc)
+ return int((now - custom_epoch).total_seconds())
+
+ __PROCESS_FIXED_STRING_V1 = __get_process_fixed_string()
+ __SECONDS_SINCE_CUSTOM_EPOCH = __get_seconds_since_custom_epoch()
+ __SECONDS_START_TIMESTAMP = int(time.time())
+
+ @staticmethod
+ def __delta_seconds():
+ return (
+ int(time.time())
+ - MessageIdCodec.__SECONDS_START_TIMESTAMP
+ + MessageIdCodec.__SECONDS_SINCE_CUSTOM_EPOCH
+ )
+
+ @staticmethod
+ def __int_to_bytes_with_big_endian(number: int, min_bytes: int):
+ num_bytes = max(math.ceil(number.bit_length() / 8), min_bytes)
+ return number.to_bytes(num_bytes, "big")
+
+ __SEQUENCE = 0
+ __SEQUENCE_LOCK = threading.Lock()
+
+ @staticmethod
+ def __get_and_increment_sequence():
+ with MessageIdCodec.__SEQUENCE_LOCK:
+ temp = MessageIdCodec.__SEQUENCE
+ MessageIdCodec.__SEQUENCE += 1
+ return temp
+
+ @staticmethod
+ def next_message_id():
+ seconds = MessageIdCodec.__delta_seconds()
+ seconds_bytes = MessageIdCodec.__int_to_bytes_with_big_endian(seconds, 4)[-4:]
+ sequence_bytes = MessageIdCodec.__int_to_bytes_with_big_endian(
+ MessageIdCodec.__get_and_increment_sequence(), 4
+ )[-4:]
+ return (
+ MessageIdCodec.__MESSAGE_ID_VERSION_V1
+ + MessageIdCodec.__PROCESS_FIXED_STRING_V1
+ + seconds_bytes.hex().upper()
+ + sequence_bytes.hex().upper()
+ )
diff --git a/python/client/rpc_client.py b/python/client/rpc_client.py
index cc1dfd4..1ed9b5e 100644
--- a/python/client/rpc_client.py
+++ b/python/client/rpc_client.py
@@ -26,15 +26,19 @@
def __init__(self, endpoints, ssl_enabled):
channel_options = [
- ('grpc.max_send_message_length', -1),
- ('grpc.max_receive_message_length', -1),
- ('grpc.connect_timeout_ms', self.CONNECT_TIMEOUT_MILLIS),
+ ("grpc.max_send_message_length", -1),
+ ("grpc.max_receive_message_length", -1),
+ ("grpc.connect_timeout_ms", self.CONNECT_TIMEOUT_MILLIS),
]
if ssl_enabled:
ssl_credentials = ssl_channel_credentials()
- self.channel = aio.secure_channel(endpoints.getGrpcTarget(), ssl_credentials, options=channel_options)
+ self.channel = aio.secure_channel(
+ endpoints.getGrpcTarget(), ssl_credentials, options=channel_options
+ )
else:
- self.channel = insecure_channel(endpoints.getGrpcTarget(), options=channel_options)
+ self.channel = insecure_channel(
+ endpoints.getGrpcTarget(), options=channel_options
+ )
self.activity_nano_time = time.monotonic_ns()
@@ -42,7 +46,9 @@
self.channel.close()
def idle_duration(self):
- return timedelta(microseconds=(time.monotonic_ns() - self.activity_nano_time) / 1000)
+ return timedelta(
+ microseconds=(time.monotonic_ns() - self.activity_nano_time) / 1000
+ )
async def query_route(self, request, timeout_seconds: int):
self.activity_nano_time = time.monotonic_ns()
@@ -81,7 +87,9 @@
async def forward_message_to_dead_letter_queue(self, request, timeout_seconds: int):
self.activity_nano_time = time.monotonic_ns()
stub = service.MessagingServiceStub(self.channel)
- return await stub.ForwardMessageToDeadLetterQueue(request, timeout=timeout_seconds)
+ return await stub.ForwardMessageToDeadLetterQueue(
+ request, timeout=timeout_seconds
+ )
async def end_transaction(self, request, timeout_seconds: int):
self.activity_nano_time = time.monotonic_ns()