blob: feb3ab00da93cf2ffb2ec80b9f489a2452e8d2a8 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "RoundRobinMessageRouter.h"
#include "TimeUtils.h"
#include <boost/random/mersenne_twister.hpp>
#include <boost/random/uniform_int_distribution.hpp>
namespace pulsar {
RoundRobinMessageRouter::RoundRobinMessageRouter(ProducerConfiguration::HashingScheme hashingScheme,
bool batchingEnabled, uint32_t maxBatchingMessages,
uint32_t maxBatchingSize,
boost::posix_time::time_duration maxBatchingDelay)
: MessageRouterBase(hashingScheme),
batchingEnabled_(batchingEnabled),
lastPartitionChange_(TimeUtils::currentTimeMillis()),
msgCounter_(0),
cumulativeBatchSize_(0),
maxBatchingMessages_(maxBatchingMessages),
maxBatchingSize_(maxBatchingSize),
maxBatchingDelay_(maxBatchingDelay) {
boost::random::mt19937 rng(time(nullptr));
boost::random::uniform_int_distribution<int> dist;
currentPartitionCursor_ = dist(rng);
}
RoundRobinMessageRouter::~RoundRobinMessageRouter() {}
// override
int RoundRobinMessageRouter::getPartition(const Message& msg, const TopicMetadata& topicMetadata) {
if (topicMetadata.getNumPartitions() == 1) {
// When there are no partitions, don't even bother
return 0;
}
// if message has a key, hash the key and return the partition
if (msg.hasPartitionKey()) {
return hash->makeHash(msg.getPartitionKey()) % topicMetadata.getNumPartitions();
}
if (!batchingEnabled_) {
// If there's no batching, do the round-robin at the message scope
// as there is no gain otherwise.
return currentPartitionCursor_++ % topicMetadata.getNumPartitions();
}
// If there's no key, we do round-robin across partition, sticking with a given
// partition for a certain amount of messages or volume buffered or the max delay to batch is reached so
// that we ensure having a decent amount of batching of the messages. Note that it is possible that we
// skip more than one partition if multiple goroutines increment currentPartitionCursor at the same time.
// If that happens it shouldn't be a problem because we only want to spread the data on different
// partitions but not necessarily in a specific sequence.
uint32_t messageSize = msg.getLength();
uint32_t messageCount = msgCounter_;
uint32_t batchSize = cumulativeBatchSize_;
int64_t lastPartitionChange = lastPartitionChange_;
int64_t now = TimeUtils::currentTimeMillis();
if (messageCount >= maxBatchingMessages_ || (messageSize >= maxBatchingSize_ - batchSize) ||
(now - lastPartitionChange >= maxBatchingDelay_.total_milliseconds())) {
uint32_t currentPartitionCursor = ++currentPartitionCursor_;
lastPartitionChange_ = now;
cumulativeBatchSize_ = messageSize;
msgCounter_ = 1;
return currentPartitionCursor % topicMetadata.getNumPartitions();
}
++msgCounter_;
cumulativeBatchSize_ += messageSize;
return currentPartitionCursor_ % topicMetadata.getNumPartitions();
}
} // namespace pulsar