blob: e3eeb7d5ee463cd86e13b2961217c239e2f10958 [file] [log] [blame]
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "DestinationName.h"
#include "NamedEntity.h"
#include "LogUtils.h"
#include "PartitionedProducerImpl.h"
#include <boost/format.hpp>
#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/find.hpp>
#include <boost/make_shared.hpp>
#include <vector>
#include <iostream>
#include <sstream>
#include <algorithm>
#include <exception>
DECLARE_LOG_OBJECT()
namespace pulsar {
typedef boost::unique_lock<boost::mutex> Lock;
// static members
CURL* DestinationName::curl = NULL;
boost::mutex DestinationName::curlHandleMutex;
CURL* DestinationName::getCurlHandle() {
if (curl == NULL) {
// this handle can not be shared across threads, so had to get here everytime
curl = curl_easy_init();
}
return curl;
}
//********************************************************************
DestinationName::DestinationName() {
}
bool DestinationName::init(const std::string& destinationName) {
destination_ = destinationName;
if(destinationName.find("://") == std::string::npos){
LOG_ERROR("Destination Name Invalid, domain not present - " << destinationName);
return false;
}
parse(destination_, domain_, property_, cluster_, namespacePortion_, localName_);
if(localName_.empty()) {
LOG_ERROR("Destination Name is not valid, topic name is empty - " << destination_);
return false;
}
namespaceName_ = NamespaceName::get(property_, cluster_, namespacePortion_);
return true;
}
void DestinationName::parse(const std::string& destinationName,
std::string& domain,
std::string& property,
std::string& cluster,
std::string& namespacePortion,
std::string& localName) {
std::string destinationNameCopy = destinationName;
boost::replace_first(destinationNameCopy, "://", "/");
std::vector<std::string> pathTokens;
boost::algorithm::split(pathTokens, destinationNameCopy, boost::algorithm::is_any_of("/"));
if (pathTokens.size() < 5) {
LOG_ERROR("Destination Name Invalid, does not have enough parts - " << destinationName);
return;
}
domain = pathTokens[0];
property = pathTokens[1];
cluster = pathTokens[2];
namespacePortion = pathTokens[3];
size_t slashIndex = -1;
//find four '/', whatever is left is topic local name
for(int i=0; i < 4; i++) {
slashIndex = destinationNameCopy.find('/', slashIndex + 1);
}
// get index to next char to '/'
slashIndex++;
localName = destinationNameCopy.substr( slashIndex, (destinationNameCopy.size() - slashIndex));
}
std::string DestinationName::getEncodedName(const std::string& nameBeforeEncoding) {
Lock lock(curlHandleMutex);
std::string nameAfterEncoding;
if(getCurlHandle()) {
char *encodedName = curl_easy_escape(getCurlHandle(), nameBeforeEncoding.c_str(), nameBeforeEncoding.size());
if(encodedName) {
nameAfterEncoding.assign(encodedName);
curl_free(encodedName);
} else {
LOG_ERROR("Unable to encode the name using curl_easy_escape, name - " << nameBeforeEncoding);
}
} else {
LOG_ERROR("Unable to get CURL handle to encode the name - " << nameBeforeEncoding);
}
return nameAfterEncoding;
}
std::string DestinationName::getDomain() {
return domain_;
}
std::string DestinationName::getProperty() {
return property_;
}
std::string DestinationName::getCluster() {
return cluster_;
}
std::string DestinationName::getNamespacePortion() {
return namespacePortion_;
}
std::string DestinationName::getLocalName() {
return localName_;
}
std::string DestinationName::getEncodedLocalName() {
return getEncodedName(localName_);
}
bool DestinationName::operator ==(const DestinationName& other) {
return (this->destination_.compare(other.destination_) == 0);
}
bool DestinationName::validateDestination() {
// check domain matches to "persistent", in future check "memory" when server is ready
if (domain_.compare("persistent") != 0) {
return false;
}
if (!property_.empty() && !cluster_.empty() && !namespacePortion_.empty() && !localName_.empty()) {
return NamedEntity::checkName(property_) && NamedEntity::checkName(cluster_)
&& NamedEntity::checkName(namespacePortion_);
} else {
return false;
}
}
boost::shared_ptr<DestinationName> DestinationName::get(const std::string& destination) {
boost::shared_ptr<DestinationName> ptr(new DestinationName());
if (!ptr->init(destination)) {
LOG_ERROR("Destination Name Initialization failed");
return boost::shared_ptr<DestinationName>();
}
if (ptr->validateDestination()) {
return ptr;
} else {
LOG_ERROR("Destination Name Validation Failed");
return boost::shared_ptr<DestinationName>();
}
}
//TODO - for now return empty string if there's any error in format, later think about better error handling
std::string DestinationName::getLookupName() {
std::stringstream ss;
std::string seperator("/");
ss << domain_ << seperator << property_ << seperator << cluster_ << seperator
<< namespacePortion_ << seperator << getEncodedLocalName();
return ss.str();
}
std::string DestinationName::toString() {
std::stringstream ss;
std::string seperator("/");
ss << domain_ << "://" << property_ << seperator << cluster_ << seperator
<< namespacePortion_ << seperator << localName_;
return ss.str();
}
const std::string DestinationName::getTopicPartitionName(unsigned int partition) {
std::stringstream topicPartitionName;
// make this topic name as well
topicPartitionName << toString()<< PartitionedProducerImpl::PARTITION_NAME_SUFFIX << partition ;
return topicPartitionName.str();
}
} //namespace pulsar