blob: 7d258b4fdef42ebb5a54e8b1428f9b63bd559c8f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections.Generic;
using rmq = Apache.Rocketmq.V2;
namespace Org.Apache.Rocketmq
{
/// <summary>
/// Picks candidate message queues for publishing from a topic route.
/// Queues whose permission is <c>Unspecified</c> or <c>Read</c> are excluded;
/// the remaining queues are kept sorted and handed out in round-robin order,
/// at most one queue per broker for each selection.
/// </summary>
public class PublishLoadBalancer
{
    // Current snapshot of publishable queues. Replaced wholesale by Update,
    // never mutated in place after it has been assigned.
    private List<rmq::MessageQueue> _messageQueues;

    // Monotonically increasing cursor; mapped onto the queue list with a modulo in Select.
    private int _roundRobinIndex;

    /// <summary>
    /// Builds a load balancer from the given route and starts the round-robin
    /// cursor at a random position within the publishable queues.
    /// </summary>
    /// <param name="route">Topic route whose message queues are considered.</param>
    /// <exception cref="ArgumentNullException"><paramref name="route"/> is null.</exception>
    public PublishLoadBalancer(TopicRouteData route)
    {
        _messageQueues = BuildCandidates(route);

        // Random starting offset so that separate instances do not all begin at the same queue.
        Random random = new Random();
        _roundRobinIndex = random.Next(0, _messageQueues.Count);
    }

    /// <summary>
    /// Replaces the current queue snapshot with the publishable queues of
    /// <paramref name="route"/>. The round-robin cursor is left untouched.
    /// </summary>
    /// <param name="route">Fresh topic route.</param>
    /// <exception cref="ArgumentNullException"><paramref name="route"/> is null.</exception>
    public void Update(TopicRouteData route)
    {
        _messageQueues = BuildCandidates(route);
    }

    /// <summary>
    /// Returns up to <paramref name="maxAttemptTimes"/> queues, each hosted on a
    /// different broker, starting from the next round-robin position.
    /// </summary>
    /// <param name="maxAttemptTimes">
    /// Upper bound on the number of queues returned. The bound is checked only
    /// after a queue has been added, so a value of zero or less still yields
    /// one queue when any is available.
    /// </param>
    /// <returns>
    /// The selected queues; an empty list when there is no publishable queue.
    /// </returns>
    public List<rmq::MessageQueue> Select(int maxAttemptTimes)
    {
        List<rmq::MessageQueue> result = new List<rmq::MessageQueue>();

        // Work on a local reference so that a concurrent Update swapping the
        // field cannot change the list in the middle of the scan.
        List<rmq::MessageQueue> all = _messageQueues;
        if (0 == all.Count)
        {
            return result;
        }

        // NOTE(review): this increment is not atomic; confirm whether callers
        // serialize Select or whether Interlocked.Increment is warranted.
        int start = ++_roundRobinIndex;
        int found = 0;
        for (int i = 0; i < all.Count; i++)
        {
            // Masking with int.MaxValue clears the sign bit, keeping the index
            // non-negative once the cursor has wrapped around.
            int idx = ((start + i) & int.MaxValue) % all.Count;
            rmq::MessageQueue candidate = all[idx];
            if (!Accept(result, candidate))
            {
                continue;
            }

            result.Add(candidate);
            if (++found >= maxAttemptTimes)
            {
                break;
            }
        }

        return result;
    }

    /// <summary>
    /// Extracts the publishable queues from <paramref name="route"/> and sorts
    /// them with <c>Utilities.CompareMessageQueue</c>. Shared by the constructor
    /// and <see cref="Update"/> so both always apply the same filter and order.
    /// </summary>
    private static List<rmq::MessageQueue> BuildCandidates(TopicRouteData route)
    {
        if (route == null)
        {
            throw new ArgumentNullException(nameof(route));
        }

        List<rmq::MessageQueue> candidates = new List<rmq::MessageQueue>();
        foreach (var messageQueue in route.MessageQueues)
        {
            // NOTE(review): only Unspecified and Read are rejected here; confirm
            // whether any other non-writable permission value should be skipped too.
            if (rmq::Permission.Unspecified == messageQueue.Permission
                || rmq::Permission.Read == messageQueue.Permission)
            {
                continue;
            }

            candidates.Add(messageQueue);
        }

        // An explicit comparison is required: the parameterless List<T>.Sort()
        // relies on the default comparer and throws InvalidOperationException
        // when the element type does not implement IComparable.
        candidates.Sort(Utilities.CompareMessageQueue);
        return candidates;
    }

    /// <summary>
    /// Accepts a queue only if none of the already selected queues has a broker
    /// equal to this queue's broker (as defined by the broker's <c>Equals</c>).
    /// </summary>
    private static bool Accept(List<rmq::MessageQueue> existing, rmq::MessageQueue messageQueue)
    {
        foreach (var item in existing)
        {
            if (item.Broker.Equals(messageQueue.Broker))
            {
                return false;
            }
        }

        return true;
    }
}
}