Add MessageIdCodec class (#530)

* add MessageIdCodec class

* add license

* fix style issue

* rename message_id_codec file

* fix process id overflow issue

* Update python/client/message/message_id_codec.py

* Fix style issue

---------

Co-authored-by: Aaron Ai <yangkun.ayk@gmail.com>
diff --git a/.github/workflows/python_build.yml b/.github/workflows/python_build.yml
index 4524a78..2fd44c0 100644
--- a/.github/workflows/python_build.yml
+++ b/.github/workflows/python_build.yml
@@ -15,7 +15,7 @@
       - run: python -m pip install flake8
       - name: flake8
         run: |
-          flake8 --ignore=E501 --exclude python/protocol python
+          flake8 --ignore=E501,W503 --exclude python/protocol python
   isort:
     runs-on: ubuntu-latest
     steps:
diff --git a/python/client/message/message_id_codec.py b/python/client/message/message_id_codec.py
new file mode 100644
index 0000000..0a52c83
--- /dev/null
+++ b/python/client/message/message_id_codec.py
@@ -0,0 +1,81 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import math
+import os
+import threading
+import time
+import uuid
+from datetime import datetime, timezone
+
+
+class MessageIdCodec:
+    __MESSAGE_ID_VERSION_V1 = "01"
+
+    @staticmethod
+    def __get_process_fixed_string():
+        mac = uuid.getnode()
+        mac = format(mac, "012x")
+        mac_bytes = bytes.fromhex(mac[-12:])
+        pid = os.getpid() % 65536
+        pid_bytes = pid.to_bytes(2, "big")
+        return mac_bytes.hex().upper() + pid_bytes.hex().upper()
+
+    @staticmethod
+    def __get_seconds_since_custom_epoch():
+        custom_epoch = datetime(2021, 1, 1, tzinfo=timezone.utc)
+        now = datetime.now(timezone.utc)
+        return int((now - custom_epoch).total_seconds())
+
+    __PROCESS_FIXED_STRING_V1 = __get_process_fixed_string()
+    __SECONDS_SINCE_CUSTOM_EPOCH = __get_seconds_since_custom_epoch()
+    __SECONDS_START_TIMESTAMP = int(time.time())
+
+    @staticmethod
+    def __delta_seconds():
+        return (
+            int(time.time())
+            - MessageIdCodec.__SECONDS_START_TIMESTAMP
+            + MessageIdCodec.__SECONDS_SINCE_CUSTOM_EPOCH
+        )
+
+    @staticmethod
+    def __int_to_bytes_with_big_endian(number: int, min_bytes: int):
+        num_bytes = max(math.ceil(number.bit_length() / 8), min_bytes)
+        return number.to_bytes(num_bytes, "big")
+
+    __SEQUENCE = 0
+    __SEQUENCE_LOCK = threading.Lock()
+
+    @staticmethod
+    def __get_and_increment_sequence():
+        with MessageIdCodec.__SEQUENCE_LOCK:
+            temp = MessageIdCodec.__SEQUENCE
+            MessageIdCodec.__SEQUENCE += 1
+            return temp
+
+    @staticmethod
+    def next_message_id():
+        seconds = MessageIdCodec.__delta_seconds()
+        seconds_bytes = MessageIdCodec.__int_to_bytes_with_big_endian(seconds, 4)[-4:]
+        sequence_bytes = MessageIdCodec.__int_to_bytes_with_big_endian(
+            MessageIdCodec.__get_and_increment_sequence(), 4
+        )[-4:]
+        return (
+            MessageIdCodec.__MESSAGE_ID_VERSION_V1
+            + MessageIdCodec.__PROCESS_FIXED_STRING_V1
+            + seconds_bytes.hex().upper()
+            + sequence_bytes.hex().upper()
+        )
diff --git a/python/client/rpc_client.py b/python/client/rpc_client.py
index cc1dfd4..1ed9b5e 100644
--- a/python/client/rpc_client.py
+++ b/python/client/rpc_client.py
@@ -26,15 +26,19 @@
 
     def __init__(self, endpoints, ssl_enabled):
         channel_options = [
-            ('grpc.max_send_message_length', -1),
-            ('grpc.max_receive_message_length', -1),
-            ('grpc.connect_timeout_ms', self.CONNECT_TIMEOUT_MILLIS),
+            ("grpc.max_send_message_length", -1),
+            ("grpc.max_receive_message_length", -1),
+            ("grpc.connect_timeout_ms", self.CONNECT_TIMEOUT_MILLIS),
         ]
         if ssl_enabled:
             ssl_credentials = ssl_channel_credentials()
-            self.channel = aio.secure_channel(endpoints.getGrpcTarget(), ssl_credentials, options=channel_options)
+            self.channel = aio.secure_channel(
+                endpoints.getGrpcTarget(), ssl_credentials, options=channel_options
+            )
         else:
-            self.channel = insecure_channel(endpoints.getGrpcTarget(), options=channel_options)
+            self.channel = insecure_channel(
+                endpoints.getGrpcTarget(), options=channel_options
+            )
 
         self.activity_nano_time = time.monotonic_ns()
 
@@ -42,7 +46,9 @@
         self.channel.close()
 
     def idle_duration(self):
-        return timedelta(microseconds=(time.monotonic_ns() - self.activity_nano_time) / 1000)
+        return timedelta(
+            microseconds=(time.monotonic_ns() - self.activity_nano_time) / 1000
+        )
 
     async def query_route(self, request, timeout_seconds: int):
         self.activity_nano_time = time.monotonic_ns()
@@ -81,7 +87,9 @@
     async def forward_message_to_dead_letter_queue(self, request, timeout_seconds: int):
         self.activity_nano_time = time.monotonic_ns()
         stub = service.MessagingServiceStub(self.channel)
-        return await stub.ForwardMessageToDeadLetterQueue(request, timeout=timeout_seconds)
+        return await stub.ForwardMessageToDeadLetterQueue(
+            request, timeout=timeout_seconds
+        )
 
     async def end_transaction(self, request, timeout_seconds: int):
         self.activity_nano_time = time.monotonic_ns()