# blob: 6d1930088060661b8fe13eab3f189ff7b6586027 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Dict
from google.protobuf.duration_pb2 import Duration
from rocketmq.filter_expression import ExpressionType
from rocketmq.log import logger
from rocketmq.protocol.definition_pb2 import \
FilterExpression as ProtoFilterExpression
from rocketmq.protocol.definition_pb2 import FilterType as ProtoFilterType
from rocketmq.protocol.definition_pb2 import Resource as ProtoResource
from rocketmq.protocol.definition_pb2 import Settings as ProtoSettings
from rocketmq.protocol.definition_pb2 import Subscription as ProtoSubscription
from rocketmq.protocol.definition_pb2 import \
SubscriptionEntry as ProtoSubscriptionEntry
from .settings import ClientType, ClientTypeHelper, Settings
# Assuming a simple representation of FilterExpression for the purpose of this example
class FilterExpression:
    """Minimal filter-expression holder: a filter type plus its expression text.

    The values are stored under the capitalised ``Type`` and ``Expression``
    attributes. Lowercase ``type`` and ``expression`` aliases are exposed as
    well, because ``SimpleSubscriptionSettings.to_protobuf`` in this module
    reads the lowercase names; without the aliases an instance of this class
    would raise ``AttributeError`` when serialized.
    """

    def __init__(self, type, expression):
        # ``type`` shadows the builtin, but it is part of the public signature
        # (callers may pass it by keyword), so the parameter name is kept.
        self.Type = type
        self.Expression = expression

    @property
    def type(self):
        """Alias of ``Type``."""
        return self.Type

    @type.setter
    def type(self, value):
        self.Type = value

    @property
    def expression(self):
        """Alias of ``Expression``."""
        return self.Expression

    @expression.setter
    def expression(self, value):
        self.Expression = value
class SimpleSubscriptionSettings(Settings):
    """Settings of a simple consumer: consumer group, long-polling timeout and
    per-topic filter expressions, serializable to a protobuf ``Settings`` message.
    """

    def __init__(self, clientId, endpoints, consumerGroup, requestTimeout, longPollingTimeout,
                 subscriptionExpressions: Dict[str, FilterExpression]):
        """Create the settings for a simple consumer.

        Args:
            clientId: Forwarded to ``Settings.__init__``.
            endpoints: Forwarded to ``Settings.__init__``.
            consumerGroup: Consumer group name; used as the ``name`` of the
                group resource in :meth:`to_protobuf`.
            requestTimeout: Forwarded to ``Settings.__init__``.
            longPollingTimeout: Passed as ``seconds`` to a protobuf
                ``Duration`` in :meth:`to_protobuf`.
            subscriptionExpressions: Mapping of topic name to the filter
                expression subscribed for that topic.
        """
        # The fourth positional argument of Settings.__init__ is passed as None.
        super().__init__(clientId, ClientType.SimpleConsumer, endpoints, None, requestTimeout)
        self._group = consumerGroup  # Simplified as string for now
        self._longPollingTimeout = longPollingTimeout
        self._subscriptionExpressions = subscriptionExpressions

    def Sync(self, settings: ProtoSettings):
        """Check settings issued to this client.

        Only the type of ``settings`` is validated: a value that is not a
        ``ProtoSettings`` is reported through the error log. Nothing is
        applied to this object and nothing is returned.
        """
        if not isinstance(settings, ProtoSettings):
            logger.error(f"[Bug] Issued settings doesn't match with the client type, clientId={self.ClientId}, clientType={self.ClientType}")

    def to_protobuf(self):
        """Build the protobuf ``Settings`` message for this simple consumer.

        Starts from the message returned by ``super().to_protobuf()``, then
        sets the access point, client type, request timeout and the
        subscription (group, one entry per subscribed topic, long-polling
        timeout).

        Returns:
            The populated protobuf settings message.
        """
        subscriptionEntries = []
        for topicName, expression in self._subscriptionExpressions.items():
            topic = ProtoResource()
            topic.name = topicName

            filterExpression = ProtoFilterExpression()
            if expression.type == ExpressionType.Tag:
                filterExpression.type = ProtoFilterType.TAG
            elif expression.type == ExpressionType.Sql92:
                filterExpression.type = ProtoFilterType.SQL
            else:
                # The proto filter type is not set in this case; only the
                # expression text below is filled in.
                logger.warn(f"[Bug] Unrecognized filter type={expression.type} for simple consumer")
            filterExpression.expression = expression.expression

            subscriptionEntry = ProtoSubscriptionEntry()
            subscriptionEntry.topic.CopyFrom(topic)
            # Attach the filter to the entry; without this the filter built
            # above would be discarded and never reach the serialized message.
            subscriptionEntry.expression.CopyFrom(filterExpression)
            subscriptionEntries.append(subscriptionEntry)

        group = ProtoResource()
        group.name = self._group

        subscription = ProtoSubscription()
        subscription.group.CopyFrom(group)
        subscription.subscriptions.extend(subscriptionEntries)
        subscription.long_polling_timeout.CopyFrom(Duration(seconds=self._longPollingTimeout))

        settings = super().to_protobuf()
        settings.access_point.CopyFrom(self.Endpoints.to_protobuf())  # Assuming Endpoints has a to_protobuf method
        settings.client_type = ClientTypeHelper.to_protobuf(self.ClientType)
        # RequestTimeout must provide total_seconds(); it is truncated to whole seconds.
        settings.request_timeout.CopyFrom(Duration(seconds=int(self.RequestTimeout.total_seconds())))
        settings.subscription.CopyFrom(subscription)
        return settings