Execute python_build.yml according to the result of path-filter (#511)
* Execute python_build.yml according to the result of path-filter
* Ban liskin/gh-problem-matcher-wrap@d8afa2cfb66dd3f982b1950429e652bc14d0d7d2
* Fix syntax error
* Support black
* Fix trailing whitespace
* Fix isort
* Fix format issue
* Ignore E501 in flake8
* Delete useless main.py
diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index e0a0576..4103149 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -16,6 +16,7 @@
csharp: ${{ steps.filter.outputs.csharp }}
php: ${{ steps.filter.outputs.php }}
rust: ${{ steps.filter.outputs.rust }}
+ python: ${{ steps.filter.outputs.python }}
steps:
- uses: actions/checkout@v2
- uses: dorny/paths-filter@v2
@@ -34,6 +35,8 @@
- 'php/**'
rust:
- 'rust/**'
+ python:
+ - 'python/**'
java-build:
needs: [paths-filter]
if: ${{ needs.paths-filter.outputs.java == 'true' }}
@@ -59,6 +62,10 @@
needs: [paths-filter]
if: ${{ needs.paths-filter.outputs.rust == 'true' }}
uses: ./.github/workflows/rust_build.yml
+ python-build:
+ needs: [paths-filter]
+ if: ${{ needs.paths-filter.outputs.python == 'true' }}
+ uses: ./.github/workflows/python_build.yml
build-result:
runs-on: ubuntu-latest
needs: [java-build, cpp-build, csharp-build, golang-build, php-build, rust-build]
diff --git a/.github/workflows/python_build.yml b/.github/workflows/python_build.yml
index 7d3436a..4524a78 100644
--- a/.github/workflows/python_build.yml
+++ b/.github/workflows/python_build.yml
@@ -14,11 +14,8 @@
python-version: 3.7
- run: python -m pip install flake8
- name: flake8
- # Pinned to v2.0.0.
- uses: liskin/gh-problem-matcher-wrap@d8afa2cfb66dd3f982b1950429e652bc14d0d7d2
- with:
- linters: flake8
- run: flake8 --exclude python/protocol python
+ run: |
+ flake8 --ignore=E501 --exclude python/protocol python
isort:
runs-on: ubuntu-latest
steps:
@@ -30,18 +27,18 @@
python-version: 3.7
- run: python -m pip install isort
- name: isort
- # Pinned to v2.0.0.
- uses: liskin/gh-problem-matcher-wrap@d8afa2cfb66dd3f982b1950429e652bc14d0d7d2
+ run: |
+ isort --check --diff --skip python/protocol python
+ black:
+ runs-on: ubuntu-latest
+ steps:
+ - name: Checkout
+ uses: actions/checkout@v3
+ - name: Set up Python
+ uses: actions/setup-python@v4
with:
- linters: isort
- run: isort --check --diff --skip python/protocol python
- # black:
- # runs-on: ubuntu-latest
- # steps:
- # - name: Checkout
- # uses: actions/checkout@v3
- # - name: black
- # uses: psf/black@stable
- # with:
- # src: "./python"
- # options: --exclude "./python/protocol/"
+ python-version: 3.7
+ - run: python -m pip install black
+ - name: black
+ run: |
+ black --exclude "./python/protocol/" python
diff --git a/python/client/rpc_client.py b/python/client/rpc_client.py
index 9cd9b46..cc1dfd4 100644
--- a/python/client/rpc_client.py
+++ b/python/client/rpc_client.py
@@ -4,122 +4,95 @@
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-from grpc import ssl_channel_credentials, insecure_channel
-from datetime import timedelta
import time
-from grpc_interceptor import ClientInterceptor, ClientCallDetails
-import protocol.service_pb2 as pb2
-import protocol.service_pb2_grpc as servicegrpc
+from datetime import timedelta
-class MetadataInterceptor(ClientInterceptor):
- def __init__(self, metadata):
- self.metadata = metadata
-
- def intercept(self, request, metadata, client_call_details, next):
- metadata.update(self.metadata)
- new_client_call_details = ClientCallDetails(
- client_call_details.method,
- client_call_details.timeout,
- metadata,
- client_call_details.credentials,
- client_call_details.wait_for_ready,
- client_call_details.compression,
- )
- return next(request, new_client_call_details)
+import protocol.service_pb2_grpc as service
+from grpc import aio, insecure_channel, ssl_channel_credentials
class RpcClient:
- CONNECT_TIMEOUT_MILLIS = 3*1000
- GRPC_MAX_MESSAGE_SIZE = 2*31 - 1
- def __init__(self, endpoints, sslEnabled):
+ CONNECT_TIMEOUT_MILLIS = 3 * 1000
+ GRPC_MAX_MESSAGE_SIZE = 2 * 31 - 1
+
+ def __init__(self, endpoints, ssl_enabled):
channel_options = [
('grpc.max_send_message_length', -1),
('grpc.max_receive_message_length', -1),
- ('grpc.keepalive_time_ms', 1000),
- ('grpc.keepalive_timeout_ms', 5000),
- ('grpc.keepalive_permit_without_calls', True),
('grpc.connect_timeout_ms', self.CONNECT_TIMEOUT_MILLIS),
]
- if sslEnabled:
- ssl_creds = ssl_channel_credentials()
- self.channel = Channel(endpoints.getGrpcTarget(), ssl_creds, options=channel_options)
+ if ssl_enabled:
+ ssl_credentials = ssl_channel_credentials()
+ self.channel = aio.secure_channel(endpoints.getGrpcTarget(), ssl_credentials, options=channel_options)
else:
self.channel = insecure_channel(endpoints.getGrpcTarget(), options=channel_options)
- self.activityNanoTime = time.monotonic_ns()
-
- def get_stub(self, metadata):
- interceptor = MetadataInterceptor(metadata)
-
- interceptor_channel = grpc.intercept_channel(self.channel, interceptor)
- stub = servicegrpc.MessagingServiceStub(interceptor_channel)
- return stub
+ self.activity_nano_time = time.monotonic_ns()
def __del__(self):
self.channel.close()
-
- def idle_duration(activity_nano_time):
- return timedelta(microseconds=(time.monotonic_ns() - activity_nano_time) / 1000)
-
- async def query_route(self, metadata, request, duration):
- self.activity_nano_time = time.monotonic_ns()
- stub = self.get_stub(self, metadata)
- return await stub.QueryRoute(request, timeout=duration)
- async def heartbeat(self, metadata, request, duration):
- self.activity_nano_time = time.monotonic_ns()
- stub = self.get_stub(self, metadata)
- return await stub.Heartbeat(request, timeout=duration)
+ def idle_duration(self):
+ return timedelta(microseconds=(time.monotonic_ns() - self.activity_nano_time) / 1000)
- async def send_message(self, metadata, request, duration):
+ async def query_route(self, request, timeout_seconds: int):
self.activity_nano_time = time.monotonic_ns()
- stub = self.get_stub(self, metadata)
- return await stub.SendMessage(request, timeout=duration)
+ stub = service.MessagingServiceStub(self.channel)
+ return await stub.QueryRoute(request, timeout=timeout_seconds)
- async def query_assignment(self, metadata, request, duration):
+ async def heartbeat(self, request, timeout_seconds: int):
self.activity_nano_time = time.monotonic_ns()
- stub = self.get_stub(self, metadata)
- return await stub.QueryAssignment(request, timeout=duration)
+ stub = service.MessagingServiceStub(self.channel)
+ return await stub.Heartbeat(request, timeout=timeout_seconds)
- # TODO: Not yet imeplemented
- async def receive_message(self, metadata, request, duration):
+ async def send_message(self, request, timeout_seconds: int):
+ self.activity_nano_time = time.monotonic_ns()
+ stub = service.MessagingServiceStub(self.channel)
+ return await stub.SendMessage(request, timeout=timeout_seconds)
+
+ async def query_assignment(self, request, timeout_seconds: int):
+ self.activity_nano_time = time.monotonic_ns()
+ stub = service.MessagingServiceStub(self.channel)
+ return await stub.QueryAssignment(request, timeout=timeout_seconds)
+
+ # TODO: Not yet implemented
+ async def receive_message(self, metadata, request, timeout_seconds: int):
pass
- async def ack_message(self, metadata, request, duration):
+ async def ack_message(self, request, timeout_seconds: int):
self.activity_nano_time = time.monotonic_ns()
- stub = self.get_stub(self, metadata)
- return await stub.AckMessage(request, timeout=duration)
+ stub = service.MessagingServiceStub(self.channel)
+ return await stub.AckMessage(request, timeout=timeout_seconds)
- async def change_invisible_duration(self, metadata, request, duration):
+ async def change_invisible_duration(self, request, timeout_seconds: int):
self.activity_nano_time = time.monotonic_ns()
- stub = self.get_stub(self, metadata)
- return await stub.ChangeInvisibleDuration(request, timeout=duration)
+ stub = service.MessagingServiceStub(self.channel)
+ return await stub.ChangeInvisibleDuration(request, timeout=timeout_seconds)
- async def forward_message_to_dead_letter_queue(self, metadata, request, duration):
+ async def forward_message_to_dead_letter_queue(self, request, timeout_seconds: int):
self.activity_nano_time = time.monotonic_ns()
- stub = self.get_stub(self, metadata)
- return await stub.ForwardMessageToDeadLetterQueue(request, timeout=duration)
-
- async def endTransaction(self, metadata, request, duration):
- self.activity_nano_time = time.monotonic_ns()
- stub = self.get_stub(self, metadata)
- return await stub.EndTransaction(request, timeout=duration)
-
+ stub = service.MessagingServiceStub(self.channel)
+ return await stub.ForwardMessageToDeadLetterQueue(request, timeout=timeout_seconds)
- async def notifyClientTermination(self, metadata, request, duration):
+ async def end_transaction(self, request, timeout_seconds: int):
self.activity_nano_time = time.monotonic_ns()
- stub = self.get_stub(self, metadata)
- return await stub.NotifyClientTermination(request, timeout=duration)
+ stub = service.MessagingServiceStub(self.channel)
+ return await stub.EndTransaction(request, timeout=timeout_seconds)
+ async def notify_client_termination(self, request, timeout_seconds: int):
+ self.activity_nano_time = time.monotonic_ns()
+ stub = service.MessagingServiceStub(self.channel)
+ return await stub.NotifyClientTermination(request, timeout=timeout_seconds)
+
+ # TODO: Not yet implemented
async def telemetry(self, metadata, duration, response_observer):
- stub = self.get_stub(self, metadata)
- return await stub.Telemetry(response_observer, timeout=duration)
\ No newline at end of file
+ pass