blob: ec42456e0211d3747a1dba39c1658d5c6df684eb [file]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Org.Apache.Rocketmq;
namespace Org.Apache.Rocketmq.Examples
{
/// <summary>
/// Example demonstrating how to use LitePushConsumer for consuming messages from lite topics.
/// Lite topics enable dynamic topic routing without pre-defining all topics.
///
/// Key Points:
/// - SubscribeLite() adds lite topics dynamically
/// - UnsubscribeLite() removes lite topics
/// - GetLiteTopicSet() returns current subscriptions
/// - BindTopic is the parent topic for all lite topics
/// </summary>
internal static class LitePushConsumerExample
{
private static readonly ILogger Logger = MqLogManager.CreateLogger(typeof(LitePushConsumerExample).FullName);
internal static async Task QuickStart()
{
// Configure client for local testing (no authentication required)
var clientConfig = new ClientConfig.Builder()
.SetEndpoints("127.0.0.1:8081")
.Build();
// Define bind topic (parent topic) and consumer group
const string bindTopic = "topic-lite";
const string consumerGroup = "GID-lite-consumer";
// Build lite push consumer
Logger.LogInformation($"Creating LitePushConsumer, bindTopic={bindTopic}, consumerGroup={consumerGroup}");
var litePushConsumer = await new LitePushConsumer.Builder()
.SetClientConfig(clientConfig)
.SetConsumerGroup(consumerGroup)
.SetBindTopic(bindTopic)
.SetMessageListener(new CustomMessageListener())
.SetMaxCacheMessageCount(1024)
.SetMaxCacheMessageSizeInBytes(64 * 1024 * 1024)
.SetConsumptionThreadCount(20)
.Build();
Logger.LogInformation($"LitePushConsumer started successfully, bindTopic={bindTopic}, consumerGroup={consumerGroup}");
try
{
// Subscribe to lite topics dynamically
var liteTopics = new[] { "order-created", "order-updated", "order-completed" };
Logger.LogInformation($"Subscribing to {liteTopics.Length} lite topics...");
foreach (var liteTopic in liteTopics)
{
await litePushConsumer.SubscribeLite(liteTopic);
Logger.LogInformation($"Subscribed to lite topic: {liteTopic}");
}
// Optionally subscribe with offset option
// await litePushConsumer.SubscribeLite("new-topic", OffsetOption.LastOffset);
// Get current lite topic set
var subscribedTopics = litePushConsumer.GetLiteTopicSet();
Logger.LogInformation($"Currently subscribed to {subscribedTopics.Count} lite topics");
// Keep the consumer running
Logger.LogInformation("LitePushConsumer is running. Press any key to exit...");
Console.ReadKey();
// Unsubscribe from a lite topic
await litePushConsumer.UnsubscribeLite("order-completed");
Logger.LogInformation("Unsubscribed from lite topic: order-completed");
}
catch (Exception ex)
{
Logger.LogError(ex, "Error occurred while running LitePushConsumer");
}
finally
{
// Shutdown the consumer
await litePushConsumer.DisposeAsync();
Logger.LogInformation("LitePushConsumer shutdown completed");
}
}
private class CustomMessageListener : IMessageListener
{
public ConsumeResult Consume(MessageView messageView)
{
var body = Encoding.UTF8.GetString(messageView.Body);
Logger.LogInformation($"Received lite message: messageId={messageView.MessageId}, " +
$"topic={messageView.Topic}, body={body}");
return ConsumeResult.SUCCESS;
}
}
}
}