blob: 498fc6d9fef2f8162e071b88e88e42b0f7ed4346 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from enum import Enum
from typing import List
from protocol.definition_pb2 import Broker as ProtoBroker
from protocol.definition_pb2 import Encoding as ProtoEncoding
from protocol.definition_pb2 import MessageQueue as ProtoMessageQueue
from protocol.definition_pb2 import MessageType as ProtoMessageType
from protocol.definition_pb2 import Permission as ProtoPermission
from protocol.definition_pb2 import Resource as ProtoResource
from rocketmq.protocol import definition_pb2
from rocketmq.rpc_client import Endpoints
class Encoding(Enum):
"""Enumeration of supported encoding types."""
IDENTITY = 0
GZIP = 1
class EncodingHelper:
"""Helper class for converting encoding types to protobuf."""
@staticmethod
def to_protobuf(mq_encoding):
"""Convert encoding type to protobuf.
:param mq_encoding: The encoding to be converted.
:return: The corresponding protobuf encoding.
"""
if mq_encoding == Encoding.IDENTITY:
return ProtoEncoding.IDENTITY
elif mq_encoding == Encoding.GZIP:
return ProtoEncoding.GZIP
class Broker:
"""Represent a broker entity."""
def __init__(self, broker):
self.name = broker.name
self.id = broker.id
self.endpoints = Endpoints(broker.endpoints)
def to_protobuf(self):
"""Convert the broker to its protobuf representation.
:return: The protobuf representation of the broker.
"""
return ProtoBroker(
name=self.name, id=self.id, endpoints=self.endpoints.to_protobuf()
)
class Resource:
"""Represent a resource entity."""
def __init__(self, name=None, resource=None):
"""Initialize a resource.
:param name: The name of the resource.
:param resource: The resource object.
"""
if resource is not None:
self.namespace = resource.resource_namespace
self.name = resource.name
else:
self.namespace = ""
self.name = name
def to_protobuf(self):
"""Convert the resource to its protobuf representation.
:return: The protobuf representation of the resource.
"""
resource = ProtoResource()
resource.name = self.name
resource.resource_namespace = self.namespace
return resource
def __str__(self):
return f"{self.namespace}.{self.name}" if self.namespace else self.name
class Permission(Enum):
"""Enumeration of supported permission types."""
NONE = 0
READ = 1
WRITE = 2
READ_WRITE = 3
class PermissionHelper:
"""Helper class for converting permission types to protobuf and vice versa."""
@staticmethod
def from_protobuf(permission):
"""Convert protobuf permission to Permission enum.
:param permission: The protobuf permission to be converted.
:return: The corresponding Permission enum.
"""
if permission == ProtoPermission.READ:
return Permission.READ
elif permission == ProtoPermission.WRITE:
return Permission.WRITE
elif permission == ProtoPermission.READ_WRITE:
return Permission.READ_WRITE
elif permission == ProtoPermission.NONE:
return Permission.NONE
else:
pass
@staticmethod
def to_protobuf(permission):
"""Convert Permission enum to protobuf permission.
:param permission: The Permission enum to be converted.
:return: The corresponding protobuf permission.
"""
if permission == Permission.READ:
return ProtoPermission.READ
elif permission == Permission.WRITE:
return ProtoPermission.WRITE
elif permission == Permission.READ_WRITE:
return ProtoPermission.READ_WRITE
else:
pass
@staticmethod
def is_writable(permission):
"""Check if the permission is writable.
:param permission: The Permission enum to be checked.
:return: True if the permission is writable, False otherwise.
"""
if permission in [Permission.WRITE, Permission.READ_WRITE]:
return True
else:
return False
@staticmethod
def is_readable(permission):
"""Check if the permission is readable.
:param permission: The Permission enum to be checked.
:return: True if the permission is readable, False otherwise.
"""
if permission in [Permission.READ, Permission.READ_WRITE]:
return True
else:
return False
class MessageType(Enum):
"""Enumeration of supported message types."""
NORMAL = 0
FIFO = 1
DELAY = 2
TRANSACTION = 3
class MessageTypeHelper:
"""Helper class for converting message types to protobuf and vice versa."""
@staticmethod
def from_protobuf(message_type):
"""Convert protobuf message type to MessageType enum.
:param message_type: The protobuf message type to be converted.
:return: The corresponding MessageType enum.
"""
if message_type == ProtoMessageType.NORMAL:
return MessageType.NORMAL
elif message_type == ProtoMessageType.FIFO:
return MessageType.FIFO
elif message_type == ProtoMessageType.DELAY:
return MessageType.DELAY
elif message_type == ProtoMessageType.TRANSACTION:
return MessageType.TRANSACTION
else:
pass
@staticmethod
def to_protobuf(message_type):
"""Convert MessageType enum to protobuf message type.
:param message_type: The MessageType enum to be converted.
:return: The corresponding protobuf message type.
"""
if message_type == MessageType.NORMAL:
return ProtoMessageType.NORMAL
elif message_type == MessageType.FIFO:
return ProtoMessageType.FIFO
elif message_type == MessageType.DELAY:
return ProtoMessageType.DELAY
elif message_type == MessageType.TRANSACTION:
return ProtoMessageType.TRANSACTION
else:
return ProtoMessageType.UNSPECIFIED
class MessageQueue:
"""A class that encapsulates a message queue entity."""
def __init__(self, message_queue):
"""Initialize a MessageQueue instance.
:param message_queue: The initial message queue to be encapsulated.
"""
self._topic_resource = Resource(message_queue.topic.name, message_queue.topic)
self.queue_id = message_queue.id
self.permission = PermissionHelper.from_protobuf(message_queue.permission)
self.accept_message_types = [
MessageTypeHelper.from_protobuf(mt)
for mt in message_queue.accept_message_types
]
self.broker = Broker(message_queue.broker)
@property
def topic(self):
"""The topic resource name.
:return: The name of the topic resource.
"""
return self._topic_resource.name
def __str__(self):
"""Get a string representation of the MessageQueue instance.
:return: A string that represents the MessageQueue instance.
"""
return f"{self.broker.name}.{self._topic_resource}.{self.queue_id}"
def to_protobuf(self):
"""Convert the MessageQueue instance to protobuf message queue.
:return: A protobuf message queue that represents the MessageQueue instance.
"""
message_types = [
MessageTypeHelper.to_protobuf(mt) for mt in self.accept_message_types
]
return ProtoMessageQueue(
topic=self._topic_resource.to_protobuf(),
id=self.queue_id,
permission=PermissionHelper.to_protobuf(self.permission),
broker=self.broker.to_protobuf(),
accept_message_types=message_types,
)
class TopicRouteData:
"""A class that encapsulates a list of message queues."""
def __init__(self, message_queues: List[definition_pb2.MessageQueue]):
"""Initialize a TopicRouteData instance.
:param message_queues: The initial list of message queues to be encapsulated.
"""
message_queue_list = []
for mq in message_queues:
message_queue_list.append(MessageQueue(mq))
self.__message_queue_list = message_queue_list
@property
def message_queues(self) -> List[MessageQueue]:
"""The list of MessageQueue instances.
:return: The list of MessageQueue instances that the TopicRouteData instance encapsulates.
"""
return self.__message_queue_list