blob: 46fa355ee2ef0a4f66bea9cf1c0d33837bae8a2f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "NamedEntity.h"
#include "LogUtils.h"
#include "PartitionedProducerImpl.h"
#include "TopicName.h"
#include <boost/format.hpp>
#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/find.hpp>
#include <memory>
#include <string>
#include <vector>
#include <iostream>
#include <sstream>
#include <algorithm>
#include <exception>
DECLARE_LOG_OBJECT()
namespace pulsar {
// Topic domain identifiers. They are used as the "<domain>://" prefix of a
// fully qualified topic name and are what TopicName::validate() accepts.
const std::string TopicDomain::Persistent = "persistent";
const std::string TopicDomain::NonPersistent = "non-persistent";

// Scoped lock type used to guard TopicName::curlHandleMutex.
using Lock = std::unique_lock<std::mutex>;

// Static members: the CURL handle used for URL-escaping names (created on
// first use by getCurlHandle()) and the mutex that serialises access to it.
CURL* TopicName::curl = nullptr;
std::mutex TopicName::curlHandleMutex;
// Returns the class-wide CURL handle, creating it on first use.
//
// The same handle is handed out on every call, so it must not be used from
// several threads at once; getEncodedName() holds curlHandleMutex while it
// calls this function and uses the handle.
// May return NULL when curl_easy_init() fails; a later call will then retry.
CURL* TopicName::getCurlHandle() {
    if (!curl) {
        curl = curl_easy_init();
    }
    return curl;
}
//********************************************************************
// Creates an uninitialised TopicName; init() fills in the actual state.
TopicName::TopicName() = default;
// Populates this object from `topicName`.
//
// Accepted inputs:
//   - a fully qualified name containing "://", which is parsed as-is;
//   - a short name "<topic>", expanded to "persistent://public/default/<topic>";
//   - a short name "<property>/<namespace>/<topic>", expanded to
//     "persistent://<property>/<namespace>/<topic>".
//
// Returns true on success. Returns false (after logging) when a short name
// has an unsupported number of components, when the cluster component is
// inconsistent with the detected V1/V2 layout, or when the local name is empty.
// Members assigned before the failure was detected keep their new values.
bool TopicName::init(const std::string& topicName) {
    topicName_ = topicName;

    // Short names carry no "<domain>://" prefix: expand them to the full form first.
    if (topicName.find("://") == std::string::npos) {
        std::vector<std::string> shortNameParts;
        boost::algorithm::split(shortNameParts, topicName, boost::algorithm::is_any_of("/"));
        switch (shortNameParts.size()) {
            case 3:
                topicName_ = TopicDomain::Persistent + "://" + shortNameParts[0] + "/" +
                             shortNameParts[1] + "/" + shortNameParts[2];
                break;
            case 1:
                topicName_ = TopicDomain::Persistent + "://public/default/" + shortNameParts[0];
                break;
            default:
                LOG_ERROR(
                    "Topic name is not valid, short topic name should be in the format of '<topic>' or "
                    "'<property>/<namespace>/<topic>' - "
                    << topicName);
                return false;
        }
    }

    isV2Topic_ = parse(topicName_, domain_, property_, cluster_, namespacePortion_, localName_);

    // A V2 name must not carry a cluster; a V1 name must carry one.
    // parse() also returns false when it could not split the name; in that case
    // it assigns nothing, so an empty cluster_ makes the V1 check below fail.
    const bool hasCluster = !cluster_.empty();
    if (isV2Topic_ && hasCluster) {
        LOG_ERROR("V2 Topic name is not valid, cluster is not empty - " << topicName_ << " : cluster "
                                                                        << cluster_);
        return false;
    }
    if (!isV2Topic_ && !hasCluster) {
        LOG_ERROR("V1 Topic name is not valid, cluster is empty - " << topicName_);
        return false;
    }

    if (localName_.empty()) {
        LOG_ERROR("Topic name is not valid, topic name is empty - " << topicName_);
        return false;
    }

    // The checks above guarantee that a V2 name has an empty cluster here,
    // so the layout flag alone selects the right NamespaceName overload.
    if (isV2Topic_) {
        namespaceName_ = NamespaceName::get(property_, namespacePortion_);
    } else {
        namespaceName_ = NamespaceName::get(property_, cluster_, namespacePortion_);
    }

    partition_ = TopicName::getPartitionIndex(localName_);
    return true;
}
// Splits a fully qualified topic name into its components.
//
// Recognised layouts (after "://" is treated like a plain '/'):
//   V2: <domain>/<property>/<namespace>/<localName>            (exactly 4 parts)
//   V1: <domain>/<property>/<cluster>/<namespace>/<localName>  (5 or more parts)
// For V1 names everything after the fourth '/' becomes the local name, so the
// local name may itself contain '/'.
//
// Output parameters are written only when the name has at least 4 parts.
// For a V2 name `cluster` is set to the empty string.
//
// Returns true for a V2 name. Returns false both for a V1 name and when the
// name has fewer than 4 parts (the latter is also logged), so callers cannot
// distinguish these two cases from the return value alone.
bool TopicName::parse(const std::string& topicName, std::string& domain, std::string& property,
                      std::string& cluster, std::string& namespacePortion, std::string& localName) {
    // Turn "<domain>://rest" into "<domain>/rest" so that every component is '/'-separated.
    std::string flattened = topicName;
    boost::replace_first(flattened, "://", "/");

    std::vector<std::string> parts;
    boost::algorithm::split(parts, flattened, boost::algorithm::is_any_of("/"));
    if (parts.size() < 4) {
        LOG_ERROR("Topic name is not valid, does not have enough parts - " << topicName);
        return false;
    }

    const bool isV2Topic = (parts.size() == 4);

    domain = parts[0];
    property = parts[1];
    if (isV2Topic) {
        // New topic name without cluster name
        cluster = "";
        namespacePortion = parts[2];
    } else {
        // Legacy topic name that includes cluster name
        cluster = parts[2];
        namespacePortion = parts[3];
    }

    // Number of '/'-terminated components that precede the local name.
    const size_t leadingComponents = isV2Topic ? 3 : 4;

    // Walk past that many '/' characters; whatever follows is the local name.
    // The size check above guarantees that each find() succeeds.
    std::string::size_type localNameStart = 0;
    for (size_t i = 0; i < leadingComponents; ++i) {
        localNameStart = flattened.find('/', localNameStart) + 1;
    }
    localName = flattened.substr(localNameStart);

    return isV2Topic;
}
std::string TopicName::getEncodedName(const std::string& nameBeforeEncoding) {
Lock lock(curlHandleMutex);
std::string nameAfterEncoding;
if (getCurlHandle()) {
char* encodedName =
curl_easy_escape(getCurlHandle(), nameBeforeEncoding.c_str(), nameBeforeEncoding.size());
if (encodedName) {
nameAfterEncoding.assign(encodedName);
curl_free(encodedName);
} else {
LOG_ERROR("Unable to encode the name using curl_easy_escape, name - " << nameBeforeEncoding);
}
} else {
LOG_ERROR("Unable to get CURL handle to encode the name - " << nameBeforeEncoding);
}
return nameAfterEncoding;
}
// Returns the layout flag stored by init(): true when the name was parsed in
// the V2 (cluster-less) layout.
bool TopicName::isV2Topic() {
    return isV2Topic_;
}
// Returns a copy of the domain component (the part before "://").
std::string TopicName::getDomain() {
    return domain_;
}
// Returns a copy of the property component of the topic name.
std::string TopicName::getProperty() {
    return property_;
}
// Returns a copy of the cluster component; empty for V2 topic names.
std::string TopicName::getCluster() {
    return cluster_;
}
// Returns a copy of the namespace component of the topic name.
std::string TopicName::getNamespacePortion() {
    return namespacePortion_;
}
// Returns a copy of the local (topic) name, without any URL-escaping applied.
std::string TopicName::getLocalName() {
    return localName_;
}
// Returns the local name passed through getEncodedName(); the result is
// empty when that encoding step fails.
std::string TopicName::getEncodedLocalName() {
    return getEncodedName(localName_);
}
// Two topic names are equal when their stored full topic-name strings are identical.
bool TopicName::operator==(const TopicName& other) {
    return topicName_ == other.topicName_;
}
// Checks that the parsed components form an acceptable topic name.
//
// Requirements:
//   - the domain is TopicDomain::Persistent or TopicDomain::NonPersistent;
//   - property, namespace and local name are non-empty;
//   - for V1 names the cluster is non-empty as well (V2 names have none);
//   - property, namespace and (V1 only) cluster pass NamedEntity::checkName().
// The local name is only required to be non-empty; it is not passed to checkName().
bool TopicName::validate() {
    const bool knownDomain =
        domain_ == TopicDomain::Persistent || domain_ == TopicDomain::NonPersistent;
    if (!knownDomain) {
        return false;
    }

    // These components are mandatory in both layouts.
    if (property_.empty() || namespacePortion_.empty() || localName_.empty()) {
        return false;
    }

    if (isV2Topic_) {
        // v2 topic format
        return NamedEntity::checkName(property_) && NamedEntity::checkName(namespacePortion_);
    }

    // v1 topic format: the cluster is mandatory
    if (cluster_.empty()) {
        return false;
    }
    return NamedEntity::checkName(property_) && NamedEntity::checkName(cluster_) &&
           NamedEntity::checkName(namespacePortion_);
}
std::shared_ptr<TopicName> TopicName::get(const std::string& topicName) {
std::shared_ptr<TopicName> ptr(new TopicName());
if (!ptr->init(topicName)) {
LOG_ERROR("Topic name initialization failed");
return std::shared_ptr<TopicName>();
}
if (ptr->validate()) {
return ptr;
} else {
LOG_ERROR("Topic name validation Failed - " << topicName);
return std::shared_ptr<TopicName>();
}
}
// TODO - for now return empty string if there's any error in format, later think about better error handling
std::string TopicName::getLookupName() {
std::stringstream ss;
std::string seperator("/");
if (isV2Topic_ && cluster_.empty()) {
ss << domain_ << seperator << property_ << seperator << namespacePortion_ << seperator
<< getEncodedLocalName();
} else {
ss << domain_ << seperator << property_ << seperator << cluster_ << seperator << namespacePortion_
<< seperator << getEncodedLocalName();
}
return ss.str();
}
std::string TopicName::toString() {
std::stringstream ss;
std::string seperator("/");
if (isV2Topic_ && cluster_.empty()) {
ss << domain_ << "://" << property_ << seperator << namespacePortion_ << seperator << localName_;
} else {
ss << domain_ << "://" << property_ << seperator << cluster_ << seperator << namespacePortion_
<< seperator << localName_;
}
return ss.str();
}
bool TopicName::isPersistent() const { return this->domain_ == TopicDomain::Persistent; }
// Returns the name of partition `partition` of this topic: the result of
// toString() followed by PartitionedProducerImpl::PARTITION_NAME_SUFFIX and
// the partition number.
const std::string TopicName::getTopicPartitionName(unsigned int partition) {
    std::stringstream nameBuilder;
    nameBuilder << toString();
    nameBuilder << PartitionedProducerImpl::PARTITION_NAME_SUFFIX;
    nameBuilder << partition;
    return nameBuilder.str();
}
// Extracts the partition index encoded at the end of a topic name.
//
// If `topic` does not contain PartitionedProducerImpl::PARTITION_NAME_SUFFIX
// the topic is not a partition and -1 is returned. Otherwise the text after
// the last '-' is parsed as a decimal integer and returned.
//
// Returns -1 when that text is not a number, does not fit in an int, or has
// any characters left over after the number (e.g. "xxx-partition-3abc").
// std::stoi on its own would stop at the first non-numeric character and
// report 3 for that input.
int TopicName::getPartitionIndex(const std::string& topic) {
    const auto& suffix = PartitionedProducerImpl::PARTITION_NAME_SUFFIX;
    if (topic.rfind(suffix) == std::string::npos) {
        return -1;
    }

    try {
        // TODO: When handling a topic name like "xxx-partition-00", it should return -1.
        // But here it returns 0, which is consistent with the Java client's behavior.
        // Another corner case: "xxx-partition--2" => 2 (not -1), because only the
        // text after the last '-' is parsed.
        const std::string indexText = topic.substr(topic.rfind('-') + 1);
        size_t parsedLength = 0;
        const int index = std::stoi(indexText, &parsedLength);
        // Reject trailing garbage: the whole tail must have been consumed.
        return parsedLength == indexText.size() ? index : -1;
    } catch (const std::exception&) {
        // std::invalid_argument (no digits) or std::out_of_range (too large).
        return -1;
    }
}
// Returns the NamespaceName object that init() obtained for this topic.
NamespaceNamePtr TopicName::getNamespaceName() {
    return namespaceName_;
}
} // namespace pulsar