Fix topic name is shown as a pointer rather than string (#331)
### Motivation
This is an additional fix to #329 because I still observed logs like:
```
Closing consumer for topic 0x6000028e0648
Closing producer for topic 0x600001210b88
```
It's because `HandlerBase::topic_` field is protected and could be
accessed directly from the derived classes.
### Motivation
In `HandlerBase`, make `topic_` private and add two methods `topic()`
and `getTopicPtr()` to get the reference to the string and the shared
pointer. `getTopicPtr()` should only be called when being passed to
`MessageImpl::setTopicName`.
diff --git a/lib/BatchMessageContainerBase.cc b/lib/BatchMessageContainerBase.cc
index c1b1727..fa11a84 100644
--- a/lib/BatchMessageContainerBase.cc
+++ b/lib/BatchMessageContainerBase.cc
@@ -27,7 +27,7 @@
namespace pulsar {
BatchMessageContainerBase::BatchMessageContainerBase(const ProducerImpl& producer)
- : topicName_(producer.topic_),
+ : topicName_(producer.topic()),
producerConfig_(producer.conf_),
producerName_(producer.producerName_),
producerId_(producer.producerId_),
diff --git a/lib/BatchMessageContainerBase.h b/lib/BatchMessageContainerBase.h
index cd17d6d..ed42c07 100644
--- a/lib/BatchMessageContainerBase.h
+++ b/lib/BatchMessageContainerBase.h
@@ -90,7 +90,7 @@
protected:
// references to ProducerImpl's fields
- const std::shared_ptr<std::string> topicName_;
+ const std::string topicName_;
const ProducerConfiguration& producerConfig_;
const std::string& producerName_;
const uint64_t& producerId_;
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index d65fe54..61def68 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -174,7 +174,7 @@
const std::string& ConsumerImpl::getSubscriptionName() const { return originalSubscriptionName_; }
-const std::string& ConsumerImpl::getTopic() const { return *topic_; }
+const std::string& ConsumerImpl::getTopic() const { return topic(); }
void ConsumerImpl::start() {
HandlerBase::start();
@@ -194,7 +194,7 @@
// Initialize ackGroupingTrackerPtr_ here because the get_shared_this_ptr() was not initialized until the
// constructor completed.
- if (TopicName::get(*topic_)->isPersistent()) {
+ if (TopicName::get(topic())->isPersistent()) {
if (config_.getAckGroupingTimeMs() > 0) {
ackGroupingTrackerPtr_.reset(new AckGroupingTrackerEnabled(
connectionSupplier, requestIdSupplier, consumerId_, config_.isAckReceiptEnabled(),
@@ -249,7 +249,7 @@
ClientImplPtr client = client_.lock();
uint64_t requestId = client->newRequestId();
SharedBuffer cmd = Commands::newSubscribe(
- *topic_, subscription_, consumerId_, requestId, getSubType(), consumerName_, subscriptionMode_,
+ topic(), subscription_, consumerId_, requestId, getSubType(), consumerName_, subscriptionMode_,
subscribeMessageId, readCompacted_, config_.getProperties(), config_.getSubscriptionProperties(),
config_.getSchema(), getInitialPosition(), config_.isReplicateSubscriptionStateEnabled(),
config_.getKeySharedPolicy(), config_.getPriorityLevel());
@@ -552,7 +552,7 @@
Message m(messageId, brokerEntryMetadata, metadata, payload);
m.impl_->cnx_ = cnx.get();
- m.impl_->setTopicName(topic_);
+ m.impl_->setTopicName(getTopicPtr());
m.impl_->setRedeliveryCount(msg.redelivery_count());
if (metadata.has_schema_version()) {
@@ -1243,7 +1243,7 @@
return;
}
- LOG_INFO(getName() << "Closing consumer for topic " << topic_);
+ LOG_INFO(getName() << "Closing consumer for topic " << topic());
state_ = Closing;
incomingMessages_.close();
@@ -1764,7 +1764,7 @@
return;
}
if (result != ResultOk) {
- LOG_WARN("{" << self->topic_ << "} {" << self->subscription_ << "} {"
+ LOG_WARN("{" << self->topic() << "} {" << self->subscription_ << "} {"
<< self->consumerName_ << "} Failed to acknowledge the message {"
<< originMessageId
<< "} of the original topic but send to the DLQ successfully : "
@@ -1777,7 +1777,7 @@
}
});
} else {
- LOG_WARN("{" << self->topic_ << "} {" << self->subscription_ << "} {"
+ LOG_WARN("{" << self->topic() << "} {" << self->subscription_ << "} {"
<< self->consumerName_ << "} Failed to send DLQ message to {"
<< self->deadLetterPolicy_.getDeadLetterTopic() << "} for message id "
<< "{" << originMessageId << "} : " << res);
diff --git a/lib/HandlerBase.cc b/lib/HandlerBase.cc
index b929939..986063e 100644
--- a/lib/HandlerBase.cc
+++ b/lib/HandlerBase.cc
@@ -30,8 +30,8 @@
namespace pulsar {
HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic, const Backoff& backoff)
- : client_(client),
- topic_(std::make_shared<std::string>(topic)),
+ : topic_(std::make_shared<std::string>(topic)),
+ client_(client),
executor_(client->getIOExecutorProvider()->get()),
mutex_(),
creationTimestamp_(TimeUtils::now()),
@@ -88,7 +88,7 @@
return;
}
auto self = shared_from_this();
- client->getConnection(*topic_).addListener([this, self](Result result, const ClientConnectionPtr& cnx) {
+ client->getConnection(topic()).addListener([this, self](Result result, const ClientConnectionPtr& cnx) {
if (result == ResultOk) {
LOG_DEBUG(getName() << "Connected to broker: " << cnx->cnxString());
connectionOpened(cnx).addListener([this, self](Result result, bool) {
diff --git a/lib/HandlerBase.h b/lib/HandlerBase.h
index ad16a22..937b308 100644
--- a/lib/HandlerBase.h
+++ b/lib/HandlerBase.h
@@ -87,14 +87,18 @@
virtual const std::string& getName() const = 0;
+ const std::string& topic() const { return *topic_; }
+ const std::shared_ptr<std::string>& getTopicPtr() const { return topic_; }
+
private:
+ const std::shared_ptr<std::string> topic_;
+
void handleDisconnection(Result result, const ClientConnectionPtr& cnx);
void handleTimeout(const boost::system::error_code& ec);
protected:
ClientImplWeakPtr client_;
- const std::shared_ptr<std::string> topic_;
ExecutorServicePtr executor_;
mutable std::mutex mutex_;
std::mutex pendingReceiveMutex_;
diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc
index dd53038..5162a61 100644
--- a/lib/MultiTopicsConsumerImpl.cc
+++ b/lib/MultiTopicsConsumerImpl.cc
@@ -72,7 +72,7 @@
interceptors_(interceptors) {
std::stringstream consumerStrStream;
consumerStrStream << "[Muti Topics Consumer: "
- << "TopicName - " << topic_ << " - Subscription - " << subscriptionName << "]";
+ << "TopicName - " << topic() << " - Subscription - " << subscriptionName << "]";
consumerStr_ = consumerStrStream.str();
if (conf.getUnAckedMessagesTimeoutMs() != 0) {
@@ -312,7 +312,7 @@
}
void MultiTopicsConsumerImpl::unsubscribeAsync(ResultCallback originalCallback) {
- LOG_INFO("[ Topics Consumer " << topic_ << "," << subscriptionName_ << "] Unsubscribing");
+ LOG_INFO("[ Topics Consumer " << topic() << "," << subscriptionName_ << "] Unsubscribing");
auto callback = [this, originalCallback](Result result) {
if (result == ResultOk) {
@@ -483,7 +483,7 @@
*numberTopicPartitions_ = 0;
if (consumers.empty()) {
LOG_DEBUG("TopicsConsumer have no consumers to close "
- << " topic" << topic_ << " subscription - " << subscriptionName_);
+ << " topic" << topic() << " subscription - " << subscriptionName_);
callback(ResultAlreadyClosed);
return;
}
@@ -518,7 +518,7 @@
void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const Message& msg) {
LOG_DEBUG("Received Message from one of the topic - " << consumer.getTopic()
<< " message:" << msg.getDataAsString());
- msg.impl_->setTopicName(consumer.impl_->topic_);
+ msg.impl_->setTopicName(consumer.impl_->getTopicPtr());
Lock lock(pendingReceiveMutex_);
if (!pendingReceives_.empty()) {
@@ -744,7 +744,7 @@
}
const std::string& MultiTopicsConsumerImpl::getSubscriptionName() const { return subscriptionName_; }
-const std::string& MultiTopicsConsumerImpl::getTopic() const { return *topic_; }
+const std::string& MultiTopicsConsumerImpl::getTopic() const { return topic(); }
const std::string& MultiTopicsConsumerImpl::getName() const { return consumerStr_; }
diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc
index 41595dd..a66fbfb 100644
--- a/lib/ProducerImpl.cc
+++ b/lib/ProducerImpl.cc
@@ -58,7 +58,7 @@
partition_(partition),
producerName_(conf_.getProducerName()),
userProvidedProducerName_(false),
- producerStr_("[" + *topic_ + ", " + producerName_ + "] "),
+ producerStr_("[" + topic() + ", " + producerName_ + "] "),
producerId_(client->newProducerId()),
msgSequenceGenerator_(0),
batchTimer_(executor_->createDeadlineTimer()),
@@ -67,7 +67,7 @@
memoryLimitController_(client->getMemoryLimitController()),
chunkingEnabled_(conf_.isChunkingEnabled() && topicName.isPersistent() && !conf_.getBatchingEnabled()),
interceptors_(interceptors) {
- LOG_DEBUG("ProducerName - " << producerName_ << " Created producer on topic " << topic_
+ LOG_DEBUG("ProducerName - " << producerName_ << " Created producer on topic " << topic()
<< " id: " << producerId_);
int64_t initialSequenceId = conf.getInitialSequenceId();
@@ -93,7 +93,7 @@
if (conf_.isEncryptionEnabled()) {
std::ostringstream logCtxStream;
- logCtxStream << "[" << topic_ << ", " << producerName_ << ", " << producerId_ << "]";
+ logCtxStream << "[" << topic() << ", " << producerName_ << ", " << producerId_ << "]";
std::string logCtx = logCtxStream.str();
msgCrypto_ = std::make_shared<MessageCrypto>(logCtx, true);
msgCrypto_->addPublicKeyCipher(conf_.getEncryptionKeys(), conf_.getCryptoKeyReader());
@@ -123,7 +123,7 @@
}
}
-const std::string& ProducerImpl::getTopic() const { return *topic_; }
+const std::string& ProducerImpl::getTopic() const { return topic(); }
const std::string& ProducerImpl::getProducerName() const { return producerName_; }
@@ -148,7 +148,7 @@
ClientImplPtr client = client_.lock();
int requestId = client->newRequestId();
- SharedBuffer cmd = Commands::newProducer(*topic_, producerId_, producerName_, requestId,
+ SharedBuffer cmd = Commands::newProducer(topic(), producerId_, producerName_, requestId,
conf_.getProperties(), conf_.getSchema(), epoch_,
userProvidedProducerName_, conf_.isEncryptionEnabled(),
static_cast<proto::ProducerAccessMode>(conf_.getAccessMode()),
@@ -218,7 +218,7 @@
cnx->registerProducer(producerId_, shared_from_this());
producerName_ = responseData.producerName;
schemaVersion_ = responseData.schemaVersion;
- producerStr_ = "[" + *topic_ + ", " + producerName_ + "] ";
+ producerStr_ = "[" + topic() + ", " + producerName_ + "] ";
topicEpoch = responseData.topicEpoch;
if (lastSequenceIdPublished_ == -1 && conf_.getInitialSequenceId() == -1) {
@@ -788,7 +788,7 @@
return;
}
- LOG_INFO(getName() << "Closing producer for topic " << topic_);
+ LOG_INFO(getName() << "Closing producer for topic " << topic());
state_ = Closing;
ClientConnectionPtr cnx = getCnx().lock();