blob: ffd1d4d309927d7617c476f05c4aed3e80193edb [file]
import logging
from typing import Optional, List
from rocketmq import (
ClientConfiguration,
Credentials,
FilterExpression,
LitePushConsumer,
Message,
MessageListener,
Producer,
PushConsumer,
)
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def _validate_non_empty(value: str, param_name: str) -> None:
"""
Validate that a string parameter is not empty.
Args:
value: String value to validate
param_name: Parameter name for error messages
Raises:
ValueError: If value is empty or whitespace-only
"""
if not value or not value.strip():
logger.warning("Invalid %s: [%s]", param_name, value)
raise ValueError(f"{param_name} cannot be empty")
def build_producer(
endpoint: str,
access_key: Optional[str] = None,
secret_key: Optional[str] = None
) -> Producer:
"""
Build and start a RocketMQ producer.
Args:
endpoint: RocketMQ service endpoint
access_key: Optional access key for authentication
secret_key: Optional secret key for authentication
Returns:
Started Producer instance
Raises:
ValueError: If required parameters are invalid
Exception: If producer fails to start
"""
# Validate required parameters
_validate_non_empty(endpoint, "Endpoint")
try:
# Create configuration with optional credentials
if access_key and secret_key:
credentials = Credentials(access_key, secret_key)
config = ClientConfiguration(endpoint, credentials)
logger.info("Using authenticated connection with credentials")
else:
config = ClientConfiguration(endpoint)
logger.info("Using unauthenticated connection (no credentials provided)")
# Initialize and start producer
producer = Producer(client_configuration=config)
producer.startup()
logger.info("Producer started successfully. Endpoint: [%s]", endpoint)
return producer
except Exception as e:
logger.error("Failed to start Producer: %s", str(e))
raise
def build_lite_push_consumer(
endpoint: str,
consumer_group: str,
topic: str,
message_listener: MessageListener,
access_key: Optional[str] = None,
secret_key: Optional[str] = None,
) -> LitePushConsumer:
"""
Build and start a RocketMQ lite push consumer.
Args:
endpoint: RocketMQ service endpoint
consumer_group: Consumer group ID
topic: Topic to subscribe to
message_listener: Custom message listener to handle incoming messages
access_key: Optional access key for authentication
secret_key: Optional secret key for authentication
Returns:
Started LitePushConsumer instance
Raises:
ValueError: If required parameters are invalid
Exception: If consumer fails to start
"""
# Validate required parameters
_validate_non_empty(endpoint, "Endpoint")
_validate_non_empty(consumer_group, "Consumer group")
_validate_non_empty(topic, "Topic")
# Validate message listener is provided
if message_listener is None:
logger.warning("Invalid message_listener: None")
raise ValueError("Message listener cannot be None")
try:
# Create configuration with optional credentials
if access_key and secret_key:
credentials = Credentials(access_key, secret_key)
config = ClientConfiguration(endpoint, credentials)
logger.info("Using authenticated connection with credentials")
else:
config = ClientConfiguration(endpoint)
logger.info("Using unauthenticated connection (no credentials provided)")
# Initialize and start lite push consumer
consumer = LitePushConsumer(
client_configuration=config,
consumer_group=consumer_group,
bind_topic=topic,
message_listener=message_listener,
)
consumer.startup()
logger.info(
"LitePushConsumer started successfully. Group: [%s], Topic: [%s], Endpoint: [%s]",
consumer_group,
topic,
endpoint,
)
return consumer
except Exception as e:
logger.error("Failed to start LitePushConsumer: %s", str(e))
raise
def build_push_consumer(
endpoint: str,
consumer_group: str,
topic: str,
message_listener: MessageListener,
access_key: Optional[str] = None,
secret_key: Optional[str] = None,
tag_expression: Optional[str] = None,
) -> PushConsumer:
"""
Build and start a RocketMQ push consumer with subscription support.
Args:
endpoint: RocketMQ service endpoint
consumer_group: Consumer group ID
topic: Topic to subscribe to
message_listener: Message listener to handle incoming messages
access_key: Optional access key for authentication
secret_key: Optional secret key for authentication
tag_expression: Optional tag filter expression (e.g., "TagA || TagB")
Returns:
Started PushConsumer instance
Raises:
ValueError: If required parameters are invalid
Exception: If consumer fails to start
"""
# Validate required parameters
_validate_non_empty(endpoint, "Endpoint")
_validate_non_empty(consumer_group, "Consumer group")
_validate_non_empty(topic, "Topic")
# Validate message listener is provided
if message_listener is None:
logger.warning("Invalid message_listener: None")
raise ValueError("Message listener cannot be None")
try:
# Create configuration with optional credentials
if access_key and secret_key:
credentials = Credentials(access_key, secret_key)
config = ClientConfiguration(endpoint, credentials)
logger.info("Using authenticated connection with credentials")
else:
config = ClientConfiguration(endpoint)
logger.info("Using unauthenticated connection (no credentials provided)")
# Initialize push consumer with subscription
consumer = PushConsumer(
client_configuration=config,
consumer_group=consumer_group,
message_listener=message_listener,
subscription={topic: FilterExpression()},
)
# Start consumer
consumer.startup()
logger.info(
"PushConsumer started successfully. Group: [%s], Topic: [%s], Endpoint: [%s]",
consumer_group,
topic,
endpoint,
)
return consumer
except Exception as e:
logger.error("Failed to start PushConsumer: %s", str(e))
raise
def build_message(
topic: str,
body: str,
keys: Optional[List[str]] = None,
tags: Optional[str] = None,
lite_topic: Optional[str] = None
) -> Message:
"""
Build a RocketMQ message.
Args:
topic: Message topic
body: Message body as string (already serialized)
keys: Optional message keys for indexing
tags: Optional message tags for filtering
lite_topic: Optional lightweight topic for session-based routing
Returns:
Configured Message instance ready to send
Raises:
ValueError: If required parameters are invalid
Exception: If message building fails
"""
# Validate required parameters
_validate_non_empty(topic, "Topic")
_validate_non_empty(body, "Body")
try:
# Encode body to UTF-8 bytes
body_bytes = body.encode("utf-8")
# Create and configure message
msg = Message()
msg.topic = topic
msg.body = body_bytes
# Set lightweight topic if provided
if lite_topic and lite_topic.strip():
msg.lite_topic = lite_topic.strip()
# Set optional properties
if keys:
msg.keys = keys
logger.debug("Set message keys: %s", keys)
if tags:
msg.tags = tags
logger.debug("Set message tags: %s", tags)
logger.debug("Message built for topic: [%s], size: %d bytes", topic, len(body_bytes))
return msg
except Exception as e:
logger.error("Failed to build message: %s", str(e))
raise