Execute python_build.yml according to the result of path-filter (#511)

* Execute python_build.yml according to the result of path-filter

* Ban liskin/gh-problem-matcher-wrap@d8afa2cfb66dd3f982b1950429e652bc14d0d7d2

* Fix syntax error

* Support black

* Fix trailing whitespace

* Fix isort

* Fix format issue

* Ignore E501 in flake8

* Delete useless main.py
diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index e0a0576..4103149 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -16,6 +16,7 @@
       csharp: ${{ steps.filter.outputs.csharp }}
       php: ${{ steps.filter.outputs.php }}
       rust: ${{ steps.filter.outputs.rust }}
+      python: ${{ steps.filter.outputs.python }}
     steps:
       - uses: actions/checkout@v2
       - uses: dorny/paths-filter@v2
@@ -34,6 +35,8 @@
               - 'php/**'
             rust:
               - 'rust/**'
+            python:
+              - 'python/**'
   java-build:
     needs: [paths-filter]
     if: ${{ needs.paths-filter.outputs.java == 'true' }}
@@ -59,6 +62,10 @@
     needs: [paths-filter]
     if: ${{ needs.paths-filter.outputs.rust == 'true' }}
     uses: ./.github/workflows/rust_build.yml
+  python-build:
+    needs: [paths-filter]
+    if: ${{ needs.paths-filter.outputs.python == 'true' }}
+    uses: ./.github/workflows/python_build.yml
   build-result:
     runs-on: ubuntu-latest
     needs: [java-build, cpp-build, csharp-build, golang-build, php-build, rust-build]
diff --git a/.github/workflows/python_build.yml b/.github/workflows/python_build.yml
index 7d3436a..4524a78 100644
--- a/.github/workflows/python_build.yml
+++ b/.github/workflows/python_build.yml
@@ -14,11 +14,8 @@
           python-version: 3.7
       - run: python -m pip install flake8
       - name: flake8
-        # Pinned to v2.0.0.
-        uses: liskin/gh-problem-matcher-wrap@d8afa2cfb66dd3f982b1950429e652bc14d0d7d2
-        with:
-          linters: flake8
-          run: flake8  --exclude python/protocol python
+        run: |
+          flake8 --ignore=E501 --exclude python/protocol python
   isort:
     runs-on: ubuntu-latest
     steps:
@@ -30,18 +27,18 @@
           python-version: 3.7
       - run: python -m pip install isort
       - name: isort
-        # Pinned to v2.0.0.
-        uses: liskin/gh-problem-matcher-wrap@d8afa2cfb66dd3f982b1950429e652bc14d0d7d2
+        run: |
+          isort --check --diff --skip python/protocol python
+  black:
+    runs-on: ubuntu-latest
+    steps:
+      - name: Checkout
+        uses: actions/checkout@v3
+      - name: Set up Python
+        uses: actions/setup-python@v4
         with:
-          linters: isort
-          run: isort --check --diff --skip python/protocol python
-  # black:
-  #   runs-on: ubuntu-latest
-  #   steps:
-  #     - name: Checkout
-  #       uses: actions/checkout@v3
-  #     - name: black
-  #       uses: psf/black@stable
-  #       with:
-  #         src: "./python"
-  #         options: --exclude "./python/protocol/"
+          python-version: 3.7
+      - run: python -m pip install black
+      - name: black
+        run: |
+          black --exclude "./python/protocol/" python
diff --git a/python/client/rpc_client.py b/python/client/rpc_client.py
index 9cd9b46..cc1dfd4 100644
--- a/python/client/rpc_client.py
+++ b/python/client/rpc_client.py
@@ -4,122 +4,95 @@
 # The ASF licenses this file to You under the Apache License, Version 2.0
 # (the "License"); you may not use this file except in compliance with
 # the License.  You may obtain a copy of the License at
-# 
+#
 #     http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from grpc import ssl_channel_credentials, insecure_channel
-from datetime import timedelta
 import time
-from grpc_interceptor import ClientInterceptor,  ClientCallDetails
-import protocol.service_pb2 as pb2
-import protocol.service_pb2_grpc as servicegrpc
+from datetime import timedelta
 
-class MetadataInterceptor(ClientInterceptor):
-    def __init__(self, metadata):
-        self.metadata = metadata
-
-    def intercept(self, request, metadata, client_call_details, next):
-        metadata.update(self.metadata)
-        new_client_call_details = ClientCallDetails(
-            client_call_details.method,
-            client_call_details.timeout,
-            metadata,
-            client_call_details.credentials,
-            client_call_details.wait_for_ready,
-            client_call_details.compression,
-        )
-        return next(request, new_client_call_details)
+import protocol.service_pb2_grpc as service
+from grpc import aio, insecure_channel, ssl_channel_credentials
 
 
 class RpcClient:
-    CONNECT_TIMEOUT_MILLIS = 3*1000
-    GRPC_MAX_MESSAGE_SIZE = 2*31 - 1
-    def __init__(self, endpoints, sslEnabled):
+    CONNECT_TIMEOUT_MILLIS = 3 * 1000
+    GRPC_MAX_MESSAGE_SIZE = 2 * 31 - 1
+
+    def __init__(self, endpoints, ssl_enabled):
         channel_options = [
             ('grpc.max_send_message_length', -1),
             ('grpc.max_receive_message_length', -1),
-            ('grpc.keepalive_time_ms', 1000),
-            ('grpc.keepalive_timeout_ms', 5000),
-            ('grpc.keepalive_permit_without_calls', True),
             ('grpc.connect_timeout_ms', self.CONNECT_TIMEOUT_MILLIS),
         ]
-        if sslEnabled:
-            ssl_creds = ssl_channel_credentials()
-            self.channel = Channel(endpoints.getGrpcTarget(), ssl_creds, options=channel_options)
+        if ssl_enabled:
+            ssl_credentials = ssl_channel_credentials()
+            self.channel = aio.secure_channel(endpoints.getGrpcTarget(), ssl_credentials, options=channel_options)
         else:
             self.channel = insecure_channel(endpoints.getGrpcTarget(), options=channel_options)
 
-        self.activityNanoTime = time.monotonic_ns()
-
-    def get_stub(self, metadata):
-        interceptor = MetadataInterceptor(metadata)
-
-        interceptor_channel = grpc.intercept_channel(self.channel, interceptor)
-        stub = servicegrpc.MessagingServiceStub(interceptor_channel)
-        return stub
+        self.activity_nano_time = time.monotonic_ns()
 
     def __del__(self):
         self.channel.close()
-    
-    def idle_duration(activity_nano_time):
-        return timedelta(microseconds=(time.monotonic_ns() - activity_nano_time) / 1000)
-    
-    async def query_route(self, metadata, request, duration):
-        self.activity_nano_time = time.monotonic_ns()
-        stub = self.get_stub(self, metadata)
-        return await stub.QueryRoute(request, timeout=duration)
 
-    async def heartbeat(self, metadata, request, duration):
-        self.activity_nano_time = time.monotonic_ns()
-        stub = self.get_stub(self, metadata)
-        return await stub.Heartbeat(request, timeout=duration)
+    def idle_duration(self):
+        return timedelta(microseconds=(time.monotonic_ns() - self.activity_nano_time) / 1000)
 
-    async def send_message(self, metadata, request, duration):
+    async def query_route(self, request, timeout_seconds: int):
         self.activity_nano_time = time.monotonic_ns()
-        stub = self.get_stub(self, metadata)
-        return await stub.SendMessage(request, timeout=duration)
+        stub = service.MessagingServiceStub(self.channel)
+        return await stub.QueryRoute(request, timeout=timeout_seconds)
 
-    async def query_assignment(self, metadata, request, duration):
+    async def heartbeat(self, request, timeout_seconds: int):
         self.activity_nano_time = time.monotonic_ns()
-        stub = self.get_stub(self, metadata)
-        return await stub.QueryAssignment(request, timeout=duration)
+        stub = service.MessagingServiceStub(self.channel)
+        return await stub.Heartbeat(request, timeout=timeout_seconds)
 
-    # TODO: Not yet imeplemented
-    async def receive_message(self, metadata, request, duration):
+    async def send_message(self, request, timeout_seconds: int):
+        self.activity_nano_time = time.monotonic_ns()
+        stub = service.MessagingServiceStub(self.channel)
+        return await stub.SendMessage(request, timeout=timeout_seconds)
+
+    async def query_assignment(self, request, timeout_seconds: int):
+        self.activity_nano_time = time.monotonic_ns()
+        stub = service.MessagingServiceStub(self.channel)
+        return await stub.QueryAssignment(request, timeout=timeout_seconds)
+
+    # TODO: Not yet implemented
+    async def receive_message(self, metadata, request, timeout_seconds: int):
         pass
 
-    async def ack_message(self, metadata, request, duration):
+    async def ack_message(self, request, timeout_seconds: int):
         self.activity_nano_time = time.monotonic_ns()
-        stub = self.get_stub(self, metadata)
-        return await stub.AckMessage(request, timeout=duration)
+        stub = service.MessagingServiceStub(self.channel)
+        return await stub.AckMessage(request, timeout=timeout_seconds)
 
-    async def change_invisible_duration(self, metadata, request, duration):
+    async def change_invisible_duration(self, request, timeout_seconds: int):
         self.activity_nano_time = time.monotonic_ns()
-        stub = self.get_stub(self, metadata)
-        return await stub.ChangeInvisibleDuration(request, timeout=duration)
+        stub = service.MessagingServiceStub(self.channel)
+        return await stub.ChangeInvisibleDuration(request, timeout=timeout_seconds)
 
-    async def forward_message_to_dead_letter_queue(self, metadata, request, duration):
+    async def forward_message_to_dead_letter_queue(self, request, timeout_seconds: int):
         self.activity_nano_time = time.monotonic_ns()
-        stub = self.get_stub(self, metadata)
-        return await stub.ForwardMessageToDeadLetterQueue(request, timeout=duration)
-    
-    async def endTransaction(self, metadata, request, duration):
-        self.activity_nano_time = time.monotonic_ns()
-        stub = self.get_stub(self, metadata)
-        return await stub.EndTransaction(request, timeout=duration)
-    
+        stub = service.MessagingServiceStub(self.channel)
+        return await stub.ForwardMessageToDeadLetterQueue(request, timeout=timeout_seconds)
 
-    async def notifyClientTermination(self, metadata, request, duration):
+    async def end_transaction(self, request, timeout_seconds: int):
         self.activity_nano_time = time.monotonic_ns()
-        stub = self.get_stub(self, metadata)
-        return await stub.NotifyClientTermination(request, timeout=duration)
+        stub = service.MessagingServiceStub(self.channel)
+        return await stub.EndTransaction(request, timeout=timeout_seconds)
 
+    async def notify_client_termination(self, request, timeout_seconds: int):
+        self.activity_nano_time = time.monotonic_ns()
+        stub = service.MessagingServiceStub(self.channel)
+        return await stub.NotifyClientTermination(request, timeout=timeout_seconds)
+
+    # TODO: Not yet implemented
     async def telemetry(self, metadata, duration, response_observer):
-        stub = self.get_stub(self, metadata)
-        return await stub.Telemetry(response_observer, timeout=duration)
\ No newline at end of file
+        pass