Fix topic name is shown as a pointer rather than string (#331)

### Motivation

This is an additional fix to #329 because I still observed logs like:

```
Closing consumer for topic 0x6000028e0648
Closing producer for topic 0x600001210b88
```

It's because `HandlerBase::topic_` field is protected and could be
accessed directly from the derived classes.

### Motivation

In `HandlerBase`, make `topic_` private and add two methods `topic()`
and `getTopicPtr()` to get the reference to the string and the shared
pointer. `getTopicPtr()` should only be called when being passed to
`MessageImpl::setTopicName`.
diff --git a/lib/BatchMessageContainerBase.cc b/lib/BatchMessageContainerBase.cc
index c1b1727..fa11a84 100644
--- a/lib/BatchMessageContainerBase.cc
+++ b/lib/BatchMessageContainerBase.cc
@@ -27,7 +27,7 @@
 namespace pulsar {
 
 BatchMessageContainerBase::BatchMessageContainerBase(const ProducerImpl& producer)
-    : topicName_(producer.topic_),
+    : topicName_(producer.topic()),
       producerConfig_(producer.conf_),
       producerName_(producer.producerName_),
       producerId_(producer.producerId_),
diff --git a/lib/BatchMessageContainerBase.h b/lib/BatchMessageContainerBase.h
index cd17d6d..ed42c07 100644
--- a/lib/BatchMessageContainerBase.h
+++ b/lib/BatchMessageContainerBase.h
@@ -90,7 +90,7 @@
 
    protected:
     // references to ProducerImpl's fields
-    const std::shared_ptr<std::string> topicName_;
+    const std::string topicName_;
     const ProducerConfiguration& producerConfig_;
     const std::string& producerName_;
     const uint64_t& producerId_;
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index d65fe54..61def68 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -174,7 +174,7 @@
 
 const std::string& ConsumerImpl::getSubscriptionName() const { return originalSubscriptionName_; }
 
-const std::string& ConsumerImpl::getTopic() const { return *topic_; }
+const std::string& ConsumerImpl::getTopic() const { return topic(); }
 
 void ConsumerImpl::start() {
     HandlerBase::start();
@@ -194,7 +194,7 @@
 
     // Initialize ackGroupingTrackerPtr_ here because the get_shared_this_ptr() was not initialized until the
     // constructor completed.
-    if (TopicName::get(*topic_)->isPersistent()) {
+    if (TopicName::get(topic())->isPersistent()) {
         if (config_.getAckGroupingTimeMs() > 0) {
             ackGroupingTrackerPtr_.reset(new AckGroupingTrackerEnabled(
                 connectionSupplier, requestIdSupplier, consumerId_, config_.isAckReceiptEnabled(),
@@ -249,7 +249,7 @@
     ClientImplPtr client = client_.lock();
     uint64_t requestId = client->newRequestId();
     SharedBuffer cmd = Commands::newSubscribe(
-        *topic_, subscription_, consumerId_, requestId, getSubType(), consumerName_, subscriptionMode_,
+        topic(), subscription_, consumerId_, requestId, getSubType(), consumerName_, subscriptionMode_,
         subscribeMessageId, readCompacted_, config_.getProperties(), config_.getSubscriptionProperties(),
         config_.getSchema(), getInitialPosition(), config_.isReplicateSubscriptionStateEnabled(),
         config_.getKeySharedPolicy(), config_.getPriorityLevel());
@@ -552,7 +552,7 @@
 
     Message m(messageId, brokerEntryMetadata, metadata, payload);
     m.impl_->cnx_ = cnx.get();
-    m.impl_->setTopicName(topic_);
+    m.impl_->setTopicName(getTopicPtr());
     m.impl_->setRedeliveryCount(msg.redelivery_count());
 
     if (metadata.has_schema_version()) {
@@ -1243,7 +1243,7 @@
         return;
     }
 
-    LOG_INFO(getName() << "Closing consumer for topic " << topic_);
+    LOG_INFO(getName() << "Closing consumer for topic " << topic());
     state_ = Closing;
     incomingMessages_.close();
 
@@ -1764,7 +1764,7 @@
                             return;
                         }
                         if (result != ResultOk) {
-                            LOG_WARN("{" << self->topic_ << "} {" << self->subscription_ << "} {"
+                            LOG_WARN("{" << self->topic() << "} {" << self->subscription_ << "} {"
                                          << self->consumerName_ << "} Failed to acknowledge the message {"
                                          << originMessageId
                                          << "} of the original topic but send to the DLQ successfully : "
@@ -1777,7 +1777,7 @@
                         }
                     });
                 } else {
-                    LOG_WARN("{" << self->topic_ << "} {" << self->subscription_ << "} {"
+                    LOG_WARN("{" << self->topic() << "} {" << self->subscription_ << "} {"
                                  << self->consumerName_ << "} Failed to send DLQ message to {"
                                  << self->deadLetterPolicy_.getDeadLetterTopic() << "} for message id "
                                  << "{" << originMessageId << "} : " << res);
diff --git a/lib/HandlerBase.cc b/lib/HandlerBase.cc
index b929939..986063e 100644
--- a/lib/HandlerBase.cc
+++ b/lib/HandlerBase.cc
@@ -30,8 +30,8 @@
 namespace pulsar {
 
 HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic, const Backoff& backoff)
-    : client_(client),
-      topic_(std::make_shared<std::string>(topic)),
+    : topic_(std::make_shared<std::string>(topic)),
+      client_(client),
       executor_(client->getIOExecutorProvider()->get()),
       mutex_(),
       creationTimestamp_(TimeUtils::now()),
@@ -88,7 +88,7 @@
         return;
     }
     auto self = shared_from_this();
-    client->getConnection(*topic_).addListener([this, self](Result result, const ClientConnectionPtr& cnx) {
+    client->getConnection(topic()).addListener([this, self](Result result, const ClientConnectionPtr& cnx) {
         if (result == ResultOk) {
             LOG_DEBUG(getName() << "Connected to broker: " << cnx->cnxString());
             connectionOpened(cnx).addListener([this, self](Result result, bool) {
diff --git a/lib/HandlerBase.h b/lib/HandlerBase.h
index ad16a22..937b308 100644
--- a/lib/HandlerBase.h
+++ b/lib/HandlerBase.h
@@ -87,14 +87,18 @@
 
     virtual const std::string& getName() const = 0;
 
+    const std::string& topic() const { return *topic_; }
+    const std::shared_ptr<std::string>& getTopicPtr() const { return topic_; }
+
    private:
+    const std::shared_ptr<std::string> topic_;
+
     void handleDisconnection(Result result, const ClientConnectionPtr& cnx);
 
     void handleTimeout(const boost::system::error_code& ec);
 
    protected:
     ClientImplWeakPtr client_;
-    const std::shared_ptr<std::string> topic_;
     ExecutorServicePtr executor_;
     mutable std::mutex mutex_;
     std::mutex pendingReceiveMutex_;
diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc
index dd53038..5162a61 100644
--- a/lib/MultiTopicsConsumerImpl.cc
+++ b/lib/MultiTopicsConsumerImpl.cc
@@ -72,7 +72,7 @@
       interceptors_(interceptors) {
     std::stringstream consumerStrStream;
     consumerStrStream << "[Muti Topics Consumer: "
-                      << "TopicName - " << topic_ << " - Subscription - " << subscriptionName << "]";
+                      << "TopicName - " << topic() << " - Subscription - " << subscriptionName << "]";
     consumerStr_ = consumerStrStream.str();
 
     if (conf.getUnAckedMessagesTimeoutMs() != 0) {
@@ -312,7 +312,7 @@
 }
 
 void MultiTopicsConsumerImpl::unsubscribeAsync(ResultCallback originalCallback) {
-    LOG_INFO("[ Topics Consumer " << topic_ << "," << subscriptionName_ << "] Unsubscribing");
+    LOG_INFO("[ Topics Consumer " << topic() << "," << subscriptionName_ << "] Unsubscribing");
 
     auto callback = [this, originalCallback](Result result) {
         if (result == ResultOk) {
@@ -483,7 +483,7 @@
     *numberTopicPartitions_ = 0;
     if (consumers.empty()) {
         LOG_DEBUG("TopicsConsumer have no consumers to close "
-                  << " topic" << topic_ << " subscription - " << subscriptionName_);
+                  << " topic" << topic() << " subscription - " << subscriptionName_);
         callback(ResultAlreadyClosed);
         return;
     }
@@ -518,7 +518,7 @@
 void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const Message& msg) {
     LOG_DEBUG("Received Message from one of the topic - " << consumer.getTopic()
                                                           << " message:" << msg.getDataAsString());
-    msg.impl_->setTopicName(consumer.impl_->topic_);
+    msg.impl_->setTopicName(consumer.impl_->getTopicPtr());
 
     Lock lock(pendingReceiveMutex_);
     if (!pendingReceives_.empty()) {
@@ -744,7 +744,7 @@
 }
 const std::string& MultiTopicsConsumerImpl::getSubscriptionName() const { return subscriptionName_; }
 
-const std::string& MultiTopicsConsumerImpl::getTopic() const { return *topic_; }
+const std::string& MultiTopicsConsumerImpl::getTopic() const { return topic(); }
 
 const std::string& MultiTopicsConsumerImpl::getName() const { return consumerStr_; }
 
diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc
index 41595dd..a66fbfb 100644
--- a/lib/ProducerImpl.cc
+++ b/lib/ProducerImpl.cc
@@ -58,7 +58,7 @@
       partition_(partition),
       producerName_(conf_.getProducerName()),
       userProvidedProducerName_(false),
-      producerStr_("[" + *topic_ + ", " + producerName_ + "] "),
+      producerStr_("[" + topic() + ", " + producerName_ + "] "),
       producerId_(client->newProducerId()),
       msgSequenceGenerator_(0),
       batchTimer_(executor_->createDeadlineTimer()),
@@ -67,7 +67,7 @@
       memoryLimitController_(client->getMemoryLimitController()),
       chunkingEnabled_(conf_.isChunkingEnabled() && topicName.isPersistent() && !conf_.getBatchingEnabled()),
       interceptors_(interceptors) {
-    LOG_DEBUG("ProducerName - " << producerName_ << " Created producer on topic " << topic_
+    LOG_DEBUG("ProducerName - " << producerName_ << " Created producer on topic " << topic()
                                 << " id: " << producerId_);
 
     int64_t initialSequenceId = conf.getInitialSequenceId();
@@ -93,7 +93,7 @@
 
     if (conf_.isEncryptionEnabled()) {
         std::ostringstream logCtxStream;
-        logCtxStream << "[" << topic_ << ", " << producerName_ << ", " << producerId_ << "]";
+        logCtxStream << "[" << topic() << ", " << producerName_ << ", " << producerId_ << "]";
         std::string logCtx = logCtxStream.str();
         msgCrypto_ = std::make_shared<MessageCrypto>(logCtx, true);
         msgCrypto_->addPublicKeyCipher(conf_.getEncryptionKeys(), conf_.getCryptoKeyReader());
@@ -123,7 +123,7 @@
     }
 }
 
-const std::string& ProducerImpl::getTopic() const { return *topic_; }
+const std::string& ProducerImpl::getTopic() const { return topic(); }
 
 const std::string& ProducerImpl::getProducerName() const { return producerName_; }
 
@@ -148,7 +148,7 @@
     ClientImplPtr client = client_.lock();
     int requestId = client->newRequestId();
 
-    SharedBuffer cmd = Commands::newProducer(*topic_, producerId_, producerName_, requestId,
+    SharedBuffer cmd = Commands::newProducer(topic(), producerId_, producerName_, requestId,
                                              conf_.getProperties(), conf_.getSchema(), epoch_,
                                              userProvidedProducerName_, conf_.isEncryptionEnabled(),
                                              static_cast<proto::ProducerAccessMode>(conf_.getAccessMode()),
@@ -218,7 +218,7 @@
         cnx->registerProducer(producerId_, shared_from_this());
         producerName_ = responseData.producerName;
         schemaVersion_ = responseData.schemaVersion;
-        producerStr_ = "[" + *topic_ + ", " + producerName_ + "] ";
+        producerStr_ = "[" + topic() + ", " + producerName_ + "] ";
         topicEpoch = responseData.topicEpoch;
 
         if (lastSequenceIdPublished_ == -1 && conf_.getInitialSequenceId() == -1) {
@@ -788,7 +788,7 @@
 
         return;
     }
-    LOG_INFO(getName() << "Closing producer for topic " << topic_);
+    LOG_INFO(getName() << "Closing producer for topic " << topic());
     state_ = Closing;
 
     ClientConnectionPtr cnx = getCnx().lock();