// blob: 154efa02e397b74b9e737f18c6ee75302140bade [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using rmq = Apache.Rocketmq.V2;
using System.Threading.Tasks;
using System.Collections.Concurrent;
using System.Threading;
using Grpc.Core;
using System.Collections.Generic;
using System.Linq;
using Google.Protobuf.WellKnownTypes;
namespace Org.Apache.Rocketmq
{
public class SimpleConsumer : Client
{
/// <summary>
/// Creates a simple consumer for the given consumer group.
/// </summary>
/// <param name="accessPoint">Access point forwarded to the base <c>Client</c> constructor.</param>
/// <param name="resourceNamespace">Resource namespace forwarded to the base <c>Client</c> constructor.</param>
/// <param name="group">Consumer group name placed into every request built by this consumer.</param>
public SimpleConsumer(AccessPoint accessPoint, string resourceNamespace, string group)
    : base(accessPoint, resourceNamespace)
{
    _group = group;
    _fifo = false;

    // Both maps start empty: subscriptions are filled by Subscribe(),
    // assignments by ScanLoadAssignments().
    _topicAssignments = new ConcurrentDictionary<string, List<rmq.Assignment>>();
    _subscriptions = new ConcurrentDictionary<string, rmq.SubscriptionEntry>();
}
/// <summary>
/// Fills <paramref name="settings"/> with the simple-consumer specific parts: the client type,
/// the consumer group and every subscription entry currently registered.
/// </summary>
/// <param name="settings">Settings message to populate; the base implementation is applied first.</param>
public override void BuildClientSetting(rmq::Settings settings)
{
    base.BuildClientSetting(settings);

    settings.ClientType = rmq::ClientType.SimpleConsumer;
    settings.Subscription = new rmq::Subscription
    {
        Group = new rmq::Resource
        {
            Name = _group,
            ResourceNamespace = ResourceNamespace
        }
    };

    // Copy every registered subscription entry into the outgoing settings.
    foreach (var registered in _subscriptions)
    {
        settings.Subscription.Subscriptions.Add(registered.Value);
    }
}
/// <summary>
/// Starts the base client, registers the recurring queue-assignment scan and performs one
/// initial scan before returning.
/// </summary>
/// <returns>A task that completes once the initial assignment scan has finished.</returns>
public override async Task Start()
{
    await base.Start();

    // Scan load assignments periodically. Each invocation performs exactly one scan;
    // looping inside the callback would re-issue queries back-to-back with no pause.
    // NOTE(review): assumes schedule(...) re-invokes the callback every 30 seconds until
    // the token is cancelled - confirm against Client.schedule.
    schedule(async () =>
    {
        try
        {
            await ScanLoadAssignments();
        }
        catch (Exception e)
        {
            // The lambda may be bound to a void-returning delegate, in which case an
            // escaping exception could not be observed by any caller. Log and carry on;
            // the next scheduled run retries.
            Logger.Warn($"Failed to scan load assignments. Group={_group}, Cause={e}");
        }
    }, 30, _scanAssignmentCts.Token);

    await ScanLoadAssignments();
}
/// <summary>
/// Stops the periodic assignment scan, shuts down the base client and then notifies
/// client termination, logging a warning when that notification reports failure.
/// </summary>
/// <returns>A task that completes when shutdown and the termination notification are done.</returns>
public override async Task Shutdown()
{
    // Cancel the token handed to schedule(...) in Start() so the assignment scan
    // does not keep running after shutdown.
    _scanAssignmentCts.Cancel();

    await base.Shutdown();

    var notified = await NotifyClientTermination();
    if (!notified)
    {
        Logger.Warn("Failed to NotifyClientTermination");
    }
}
/// <summary>
/// Queries the load assignments of every subscribed topic in parallel and stores each
/// non-empty result in <c>_topicAssignments</c>, replacing any previous entry for that topic.
/// Topics whose query yields null or an empty list keep their previous assignments and are
/// reported with a warning.
/// </summary>
/// <returns>A task that completes when all queries have finished and results are stored.</returns>
private async Task ScanLoadAssignments()
{
    // topics[n] is the topic whose query is pending in tasks[n].
    var topics = new List<string>();
    var tasks = new List<Task<List<rmq::Assignment>>>();

    foreach (var sub in _subscriptions)
    {
        var request = new rmq::QueryAssignmentRequest
        {
            Topic = new rmq::Resource
            {
                ResourceNamespace = ResourceNamespace,
                Name = sub.Key
            },
            Group = new rmq::Resource
            {
                Name = _group,
                ResourceNamespace = ResourceNamespace
            },
            Endpoints = new rmq::Endpoints
            {
                Scheme = rmq.AddressScheme.Ipv4
            }
        };
        request.Endpoints.Addresses.Add(new rmq::Address
        {
            Host = _accessPoint.Host,
            Port = _accessPoint.Port
        });

        var metadata = new Metadata();
        Signature.sign(this, metadata);

        topics.Add(sub.Key);
        tasks.Add(Manager.QueryLoadAssignment(_accessPoint.TargetUrl(), metadata, request, TimeSpan.FromSeconds(3)));
    }

    if (0 == tasks.Count)
    {
        // Nothing subscribed yet, hence nothing to query.
        return;
    }

    // Task.WhenAll preserves input order, so results[n] belongs to topics[n].
    List<rmq.Assignment>[] results = await Task.WhenAll(tasks);
    for (var n = 0; n < results.Length; n++)
    {
        var topic = topics[n];
        var assignments = results[n];
        if (null == assignments || 0 == assignments.Count)
        {
            Logger.Warn($"Failed to acquire assignments. Topic={topic}, Group={_group}");
            continue;
        }

        Logger.Debug($"Assignments received. Topic={topic}, Group={_group}");
        _topicAssignments.AddOrUpdate(topic, assignments, (t, prev) => assignments);
    }
}
/// <summary>
/// Stamps a heartbeat request with the simple-consumer client type and this consumer's group.
/// </summary>
/// <param name="request">Heartbeat request to populate.</param>
protected override void PrepareHeartbeatData(rmq::HeartbeatRequest request)
{
    var group = new rmq::Resource
    {
        Name = _group,
        ResourceNamespace = ResourceNamespace
    };

    request.ClientType = rmq::ClientType.SimpleConsumer;
    request.Group = group;
}
/// <summary>
/// Registers (or replaces) the subscription for <paramref name="topic"/> and marks the topic
/// as a topic of interest.
/// </summary>
/// <param name="topic">Topic name; also used as the key of the subscription table.</param>
/// <param name="filterType">Type of the filter expression.</param>
/// <param name="expression">Filter expression text.</param>
public void Subscribe(string topic, rmq::FilterType filterType, string expression)
{
    var subscription = new rmq::SubscriptionEntry
    {
        Topic = new rmq::Resource
        {
            Name = topic,
            ResourceNamespace = ResourceNamespace
        },
        Expression = new rmq::FilterExpression
        {
            Type = filterType,
            Expression = expression
        }
    };

    // The newest entry always wins over a previously registered one for the same topic.
    _subscriptions.AddOrUpdate(topic, subscription, (key, existing) => subscription);
    AddTopicOfInterest(topic);
}
/// <summary>
/// Applies settings received for this client: delegates to the base implementation and
/// records whether the consumer group is FIFO.
/// </summary>
/// <param name="settings">Settings message that was received.</param>
public override void OnSettingsReceived(rmq.Settings settings)
{
    base.OnSettingsReceived(settings);

    // Protobuf message-typed properties read as null when the field is unset, so guard
    // before dereferencing instead of failing with a NullReferenceException.
    var subscription = settings.Subscription;
    if (subscription != null && subscription.Fifo)
    {
        _fifo = true;
        Logger.Info($"#OnSettingsReceived: Group {_group} is FIFO");
    }
}
/// <summary>
/// Receives up to <paramref name="batchSize"/> messages from the next queue chosen by
/// <c>NextQueue()</c>.
/// </summary>
/// <param name="batchSize">Batch size placed into the receive request.</param>
/// <param name="timeout">Timeout handed to <c>Manager.ReceiveMessage</c>.</param>
/// <returns>
/// The messages returned by <c>Manager.ReceiveMessage</c>, or an empty list when no queue
/// is currently available.
/// </returns>
public async Task<List<Message>> Receive(int batchSize, TimeSpan timeout)
{
    var queue = NextQueue();
    if (queue == null)
    {
        Logger.Debug("NextQueue returned null");
        return new List<Message>();
    }

    // Work on a copy of the queue descriptor so the shared assignment is left untouched.
    var queueCopy = new rmq.MessageQueue();
    queueCopy.MergeFrom(queue);

    var request = new rmq.ReceiveMessageRequest
    {
        Group = new rmq.Resource
        {
            ResourceNamespace = ResourceNamespace,
            Name = _group
        },
        MessageQueue = queueCopy,
        BatchSize = batchSize,
        // Client is responsible of extending message invisibility duration
        AutoRenew = false
    };

    var targetUrl = Utilities.TargetUrl(queue);
    var metadata = new Metadata();
    Signature.sign(this, metadata);

    var messages = await Manager.ReceiveMessage(targetUrl, metadata, request, timeout);
    return messages;
}
/// <summary>
/// Acknowledges a single message to the host it was received from.
/// </summary>
/// <param name="message">
/// Message to acknowledge; its topic, message id, receipt handle and source host are read.
/// </param>
/// <returns>A task that completes when <c>Manager.Ack</c> has finished.</returns>
public async Task Ack(Message message)
{
    var request = new rmq.AckMessageRequest
    {
        Group = new rmq.Resource
        {
            ResourceNamespace = ResourceNamespace,
            Name = _group
        },
        Topic = new rmq.Resource
        {
            ResourceNamespace = ResourceNamespace,
            Name = message.Topic
        }
    };
    request.Entries.Add(new rmq.AckMessageEntry
    {
        MessageId = message.MessageId,
        ReceiptHandle = message._receiptHandle
    });

    // The request is sent to the host recorded on the message.
    var sourceHost = message._sourceHost;
    var metadata = new Metadata();
    Signature.sign(this, metadata);

    await Manager.Ack(sourceHost, metadata, request, RequestTimeout);
}
/// <summary>
/// Requests a new invisible duration for a previously received message.
/// </summary>
/// <param name="message">
/// Message to update; its topic, message id, receipt handle and source host are read.
/// </param>
/// <param name="invisibleDuration">Invisible duration placed into the request.</param>
/// <returns>A task that completes when <c>Manager.ChangeInvisibleDuration</c> has finished.</returns>
public async Task ChangeInvisibleDuration(Message message, TimeSpan invisibleDuration)
{
    var request = new rmq.ChangeInvisibleDurationRequest
    {
        Group = new rmq.Resource
        {
            ResourceNamespace = ResourceNamespace,
            Name = _group
        },
        Topic = new rmq.Resource
        {
            ResourceNamespace = ResourceNamespace,
            Name = message.Topic
        },
        ReceiptHandle = message._receiptHandle,
        MessageId = message.MessageId,
        InvisibleDuration = Duration.FromTimeSpan(invisibleDuration)
    };

    // The request is sent to the host recorded on the message.
    var sourceHost = message._sourceHost;
    var metadata = new Metadata();
    Signature.sign(this, metadata);

    await Manager.ChangeInvisibleDuration(sourceHost, metadata, request, RequestTimeout);
}
/// <summary>
/// Picks the next message queue to receive from, rotating first over the topics that have
/// assignments and then over the assignments of the chosen topic. The rotation cursors are
/// kept per thread.
/// </summary>
/// <returns>
/// The message queue of the selected assignment, or null when no topic has assignments or the
/// selected topic has no usable assignment list.
/// </returns>
private rmq.MessageQueue NextQueue()
{
    if (_topicAssignments.IsEmpty)
    {
        return null;
    }

    // Take a single snapshot of the keys so the modulo and the element lookup agree
    // on the number of topics even if the dictionary changes concurrently.
    var topics = _topicAssignments.Keys;
    if (0 == topics.Count)
    {
        return null;
    }

    UInt32 topicSeq = CurrentTopicSequence.Value;
    CurrentTopicSequence.Value = topicSeq + 1;
    var topicIndex = (int)(topicSeq % (UInt32)topics.Count);
    var topic = topics.Skip(topicIndex).First();

    UInt32 queueSeq = CurrentQueueSequence.Value;
    CurrentQueueSequence.Value = queueSeq + 1;

    List<rmq.Assignment> assignments;
    if (!_topicAssignments.TryGetValue(topic, out assignments))
    {
        return null;
    }

    // An empty list would make the modulo below divide by zero.
    if (null == assignments || 0 == assignments.Count)
    {
        return null;
    }

    var idx = (int)(queueSeq % (UInt32)assignments.Count);
    return assignments[idx].MessageQueue;
}
// Per-thread rotation cursors used by NextQueue(). Value tracking across threads is not
// enabled because the Values property is never read in this class.
private readonly ThreadLocal<UInt32> CurrentTopicSequence = new ThreadLocal<UInt32>();
private readonly ThreadLocal<UInt32> CurrentQueueSequence = new ThreadLocal<UInt32>();

// Consumer group name placed into every request built by this consumer.
private readonly string _group;

// Set to true in OnSettingsReceived when the received settings mark the group as FIFO.
private bool _fifo;

// Topic name -> subscription entry, written by Subscribe().
private readonly ConcurrentDictionary<string, rmq::SubscriptionEntry> _subscriptions;

// Topic name -> latest non-empty assignment list, written by ScanLoadAssignments().
private readonly ConcurrentDictionary<string, List<rmq.Assignment>> _topicAssignments;

// Token source whose token is handed to the periodic assignment scan in Start().
private readonly CancellationTokenSource _scanAssignmentCts = new CancellationTokenSource();
}
}