blob: 2dee1a1183d81482bf72901a84e78c95f92a517f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "utils.h"
#include <pulsar/ConsoleLoggerFactory.h>
#include "lib/Utils.h"
#include <memory>
template <typename T>
struct ListenerWrapper {
PyObject* _pyListener;
ListenerWrapper(py::object pyListener) : _pyListener(pyListener.ptr()) { Py_XINCREF(_pyListener); }
ListenerWrapper(const ListenerWrapper& other) {
_pyListener = other._pyListener;
Py_XINCREF(_pyListener);
}
ListenerWrapper& operator=(const ListenerWrapper& other) {
_pyListener = other._pyListener;
Py_XINCREF(_pyListener);
return *this;
}
virtual ~ListenerWrapper() { Py_XDECREF(_pyListener); }
void operator()(T consumer, const Message& msg) {
PyGILState_STATE state = PyGILState_Ensure();
try {
py::call<void>(_pyListener, py::object(&consumer), py::object(&msg));
} catch (const py::error_already_set& e) {
PyErr_Print();
}
PyGILState_Release(state);
}
};
/**
 * Registers a Python callable as the message listener of `conf`.
 *
 * The callable is wrapped in a ListenerWrapper<Consumer> before being handed
 * to ConsumerConfiguration::setMessageListener.
 *
 * @return `conf`, so the binding can hand the same object back to the caller
 */
static ConsumerConfiguration& ConsumerConfiguration_setMessageListener(ConsumerConfiguration& conf,
                                                                       py::object pyListener) {
    ListenerWrapper<Consumer> listener(pyListener);
    conf.setMessageListener(listener);
    return conf;
}
/**
 * Registers a Python callable as the reader listener of `conf`.
 *
 * The callable is wrapped in a ListenerWrapper<Reader> before being handed
 * to ReaderConfiguration::setReaderListener.
 *
 * @return `conf`, so the binding can hand the same object back to the caller
 */
static ReaderConfiguration& ReaderConfiguration_setReaderListener(ReaderConfiguration& conf,
                                                                  py::object pyListener) {
    ListenerWrapper<Reader> listener(pyListener);
    conf.setReaderListener(listener);
    return conf;
}
/**
 * Extracts an AuthenticationWrapper from the Python object `authentication`
 * and passes its `auth` member to ClientConfiguration::setAuth.
 *
 * @return `conf`, so the binding can hand the same object back to the caller
 */
static ClientConfiguration& ClientConfiguration_setAuthentication(ClientConfiguration& conf,
                                                                  py::object authentication) {
    py::extract<AuthenticationWrapper> extractor(authentication);
    AuthenticationWrapper wrapper = extractor;
    conf.setAuth(wrapper.auth);
    return conf;
}
/**
 * Extracts a CryptoKeyReaderWrapper from the Python object `cryptoKeyReader`
 * and installs its `cryptoKeyReader` member on the consumer configuration.
 *
 * @return `conf`, so the binding can hand the same object back to the caller
 */
static ConsumerConfiguration& ConsumerConfiguration_setCryptoKeyReader(ConsumerConfiguration& conf,
                                                                       py::object cryptoKeyReader) {
    py::extract<CryptoKeyReaderWrapper> extractor(cryptoKeyReader);
    CryptoKeyReaderWrapper readerWrapper = extractor;
    conf.setCryptoKeyReader(readerWrapper.cryptoKeyReader);
    return conf;
}
/**
 * Extracts a CryptoKeyReaderWrapper from the Python object `cryptoKeyReader`
 * and installs its `cryptoKeyReader` member on the producer configuration.
 *
 * @return `conf`, so the binding can hand the same object back to the caller
 */
static ProducerConfiguration& ProducerConfiguration_setCryptoKeyReader(ProducerConfiguration& conf,
                                                                       py::object cryptoKeyReader) {
    py::extract<CryptoKeyReaderWrapper> extractor(cryptoKeyReader);
    CryptoKeyReaderWrapper readerWrapper = extractor;
    conf.setCryptoKeyReader(readerWrapper.cryptoKeyReader);
    return conf;
}
/**
 * Extracts a CryptoKeyReaderWrapper from the Python object `cryptoKeyReader`
 * and installs its `cryptoKeyReader` member on the reader configuration.
 *
 * @return `conf`, so the binding can hand the same object back to the caller
 */
static ReaderConfiguration& ReaderConfiguration_setCryptoKeyReader(ReaderConfiguration& conf,
                                                                   py::object cryptoKeyReader) {
    py::extract<CryptoKeyReaderWrapper> extractor(cryptoKeyReader);
    CryptoKeyReaderWrapper readerWrapper = extractor;
    conf.setCryptoKeyReader(readerWrapper.cryptoKeyReader);
    return conf;
}
class LoggerWrapper : public Logger {
PyObject* const _pyLogger;
const int _pythonLogLevel;
const std::unique_ptr<Logger> _fallbackLogger;
static constexpr int _getLogLevelValue(Level level) { return 10 + (level * 10); }
public:
LoggerWrapper(PyObject* pyLogger, int pythonLogLevel, Logger* fallbackLogger)
: _pyLogger(pyLogger), _pythonLogLevel(pythonLogLevel), _fallbackLogger(fallbackLogger) {
Py_XINCREF(_pyLogger);
}
LoggerWrapper(const LoggerWrapper&) = delete;
LoggerWrapper(LoggerWrapper&&) noexcept = delete;
LoggerWrapper& operator=(const LoggerWrapper&) = delete;
LoggerWrapper& operator=(LoggerWrapper&&) = delete;
virtual ~LoggerWrapper() { Py_XDECREF(_pyLogger); }
bool isEnabled(Level level) { return _getLogLevelValue(level) >= _pythonLogLevel; }
void log(Level level, int line, const std::string& message) {
if (!Py_IsInitialized()) {
// Python logger is unavailable - fallback to console logger
_fallbackLogger->log(level, line, message);
} else {
PyGILState_STATE state = PyGILState_Ensure();
try {
switch (level) {
case Logger::LEVEL_DEBUG:
py::call_method<void>(_pyLogger, "debug", message.c_str());
break;
case Logger::LEVEL_INFO:
py::call_method<void>(_pyLogger, "info", message.c_str());
break;
case Logger::LEVEL_WARN:
py::call_method<void>(_pyLogger, "warning", message.c_str());
break;
case Logger::LEVEL_ERROR:
py::call_method<void>(_pyLogger, "error", message.c_str());
break;
}
} catch (const py::error_already_set& e) {
_fallbackLogger->log(level, line, message);
}
PyGILState_Release(state);
}
}
};
class LoggerWrapperFactory : public LoggerFactory {
std::unique_ptr<LoggerFactory> _fallbackLoggerFactory{new ConsoleLoggerFactory};
PyObject* _pyLogger;
Optional<int> _pythonLogLevel{Optional<int>::empty()};
void initializePythonLogLevel() {
PyGILState_STATE state = PyGILState_Ensure();
try {
int level = py::call_method<int>(_pyLogger, "getEffectiveLevel");
_pythonLogLevel = Optional<int>::of(level);
} catch (const py::error_already_set& e) {
// Failed to get log level from _pyLogger, set it to empty to fallback to _fallbackLogger
_pythonLogLevel = Optional<int>::empty();
}
PyGILState_Release(state);
}
public:
LoggerWrapperFactory(py::object pyLogger) {
_pyLogger = pyLogger.ptr();
Py_XINCREF(_pyLogger);
initializePythonLogLevel();
}
virtual ~LoggerWrapperFactory() { Py_XDECREF(_pyLogger); }
Logger* getLogger(const std::string& fileName) {
const auto fallbackLogger = _fallbackLoggerFactory->getLogger(fileName);
if (_pythonLogLevel.is_present()) {
return new LoggerWrapper(_pyLogger, _pythonLogLevel.value(), fallbackLogger);
} else {
return fallbackLogger;
}
}
};
/**
 * Installs a LoggerWrapperFactory built around the Python object `logger`
 * on the client configuration.
 *
 * The factory is heap-allocated and its pointer handed to
 * ClientConfiguration::setLogger.
 *
 * @return `conf`, so the binding can hand the same object back to the caller
 */
static ClientConfiguration& ClientConfiguration_setLogger(ClientConfiguration& conf, py::object logger) {
    LoggerWrapperFactory* const loggerFactory = new LoggerWrapperFactory(logger);
    conf.setLogger(loggerFactory);
    return conf;
}
/**
 * Registers the Boost.Python classes ClientConfiguration, ProducerConfiguration,
 * ConsumerConfiguration and ReaderConfiguration.
 *
 * Most Python method names are registered twice, once bound to the C++ getter and
 * once to the C++ setter; Boost.Python picks the overload from the arguments of the
 * call. Registrations carrying return_self<>() return the object the method was
 * called on, and return_value_policy<copy_const_reference>() returns a copy of the
 * value behind the const reference produced by the getter.
 */
void export_config() {
using namespace boost::python;
// Client-level settings. Authentication and logger go through the file-local
// helper functions because they take Python objects.
class_<ClientConfiguration>("ClientConfiguration")
.def("authentication", &ClientConfiguration_setAuthentication, return_self<>())
.def("operation_timeout_seconds", &ClientConfiguration::getOperationTimeoutSeconds)
.def("operation_timeout_seconds", &ClientConfiguration::setOperationTimeoutSeconds, return_self<>())
.def("connection_timeout", &ClientConfiguration::getConnectionTimeout)
.def("connection_timeout", &ClientConfiguration::setConnectionTimeout, return_self<>())
.def("io_threads", &ClientConfiguration::getIOThreads)
.def("io_threads", &ClientConfiguration::setIOThreads, return_self<>())
.def("message_listener_threads", &ClientConfiguration::getMessageListenerThreads)
.def("message_listener_threads", &ClientConfiguration::setMessageListenerThreads, return_self<>())
.def("concurrent_lookup_requests", &ClientConfiguration::getConcurrentLookupRequest)
.def("concurrent_lookup_requests", &ClientConfiguration::setConcurrentLookupRequest, return_self<>())
.def("log_conf_file_path", &ClientConfiguration::getLogConfFilePath,
return_value_policy<copy_const_reference>())
.def("log_conf_file_path", &ClientConfiguration::setLogConfFilePath, return_self<>())
.def("use_tls", &ClientConfiguration::isUseTls)
.def("use_tls", &ClientConfiguration::setUseTls, return_self<>())
.def("tls_trust_certs_file_path", &ClientConfiguration::getTlsTrustCertsFilePath,
return_value_policy<copy_const_reference>())
.def("tls_trust_certs_file_path", &ClientConfiguration::setTlsTrustCertsFilePath, return_self<>())
.def("tls_allow_insecure_connection", &ClientConfiguration::isTlsAllowInsecureConnection)
.def("tls_allow_insecure_connection", &ClientConfiguration::setTlsAllowInsecureConnection,
return_self<>())
// Only the setter is registered under this name
.def("tls_validate_hostname", &ClientConfiguration::setValidateHostName, return_self<>())
.def("set_logger", &ClientConfiguration_setLogger, return_self<>());
// Producer settings. "property", "encryption_key" and "crypto_key_reader" are
// registered as setters only.
class_<ProducerConfiguration>("ProducerConfiguration")
.def("producer_name", &ProducerConfiguration::getProducerName,
return_value_policy<copy_const_reference>())
.def("producer_name", &ProducerConfiguration::setProducerName, return_self<>())
.def("schema", &ProducerConfiguration::getSchema, return_value_policy<copy_const_reference>())
.def("schema", &ProducerConfiguration::setSchema, return_self<>())
.def("send_timeout_millis", &ProducerConfiguration::getSendTimeout)
.def("send_timeout_millis", &ProducerConfiguration::setSendTimeout, return_self<>())
.def("initial_sequence_id", &ProducerConfiguration::getInitialSequenceId)
.def("initial_sequence_id", &ProducerConfiguration::setInitialSequenceId, return_self<>())
.def("compression_type", &ProducerConfiguration::getCompressionType)
.def("compression_type", &ProducerConfiguration::setCompressionType, return_self<>())
.def("max_pending_messages", &ProducerConfiguration::getMaxPendingMessages)
.def("max_pending_messages", &ProducerConfiguration::setMaxPendingMessages, return_self<>())
.def("max_pending_messages_across_partitions",
&ProducerConfiguration::getMaxPendingMessagesAcrossPartitions)
.def("max_pending_messages_across_partitions",
&ProducerConfiguration::setMaxPendingMessagesAcrossPartitions, return_self<>())
.def("block_if_queue_full", &ProducerConfiguration::getBlockIfQueueFull)
.def("block_if_queue_full", &ProducerConfiguration::setBlockIfQueueFull, return_self<>())
.def("partitions_routing_mode", &ProducerConfiguration::getPartitionsRoutingMode)
.def("partitions_routing_mode", &ProducerConfiguration::setPartitionsRoutingMode, return_self<>())
.def("lazy_start_partitioned_producers", &ProducerConfiguration::getLazyStartPartitionedProducers)
.def("lazy_start_partitioned_producers", &ProducerConfiguration::setLazyStartPartitionedProducers,
return_self<>())
.def("batching_enabled", &ProducerConfiguration::getBatchingEnabled,
return_value_policy<copy_const_reference>())
.def("batching_enabled", &ProducerConfiguration::setBatchingEnabled, return_self<>())
.def("batching_max_messages", &ProducerConfiguration::getBatchingMaxMessages,
return_value_policy<copy_const_reference>())
.def("batching_max_messages", &ProducerConfiguration::setBatchingMaxMessages, return_self<>())
.def("batching_max_allowed_size_in_bytes", &ProducerConfiguration::getBatchingMaxAllowedSizeInBytes,
return_value_policy<copy_const_reference>())
.def("batching_max_allowed_size_in_bytes", &ProducerConfiguration::setBatchingMaxAllowedSizeInBytes,
return_self<>())
.def("batching_max_publish_delay_ms", &ProducerConfiguration::getBatchingMaxPublishDelayMs,
return_value_policy<copy_const_reference>())
.def("batching_max_publish_delay_ms", &ProducerConfiguration::setBatchingMaxPublishDelayMs,
return_self<>())
.def("property", &ProducerConfiguration::setProperty, return_self<>())
.def("batching_type", &ProducerConfiguration::setBatchingType, return_self<>())
.def("batching_type", &ProducerConfiguration::getBatchingType)
.def("encryption_key", &ProducerConfiguration::addEncryptionKey, return_self<>())
.def("crypto_key_reader", &ProducerConfiguration_setCryptoKeyReader, return_self<>());
// Consumer settings.
// NOTE(review): many setters below are registered without return_self<>(), unlike
// the client/producer setters above - presumably the underlying C++ setters return
// void; confirm against ConsumerConfiguration.h before changing.
class_<ConsumerConfiguration>("ConsumerConfiguration")
.def("consumer_type", &ConsumerConfiguration::getConsumerType)
.def("consumer_type", &ConsumerConfiguration::setConsumerType, return_self<>())
.def("schema", &ConsumerConfiguration::getSchema, return_value_policy<copy_const_reference>())
.def("schema", &ConsumerConfiguration::setSchema, return_self<>())
.def("message_listener", &ConsumerConfiguration_setMessageListener, return_self<>())
.def("receiver_queue_size", &ConsumerConfiguration::getReceiverQueueSize)
.def("receiver_queue_size", &ConsumerConfiguration::setReceiverQueueSize)
.def("max_total_receiver_queue_size_across_partitions",
&ConsumerConfiguration::getMaxTotalReceiverQueueSizeAcrossPartitions)
.def("max_total_receiver_queue_size_across_partitions",
&ConsumerConfiguration::setMaxTotalReceiverQueueSizeAcrossPartitions)
.def("consumer_name", &ConsumerConfiguration::getConsumerName,
return_value_policy<copy_const_reference>())
.def("consumer_name", &ConsumerConfiguration::setConsumerName)
.def("unacked_messages_timeout_ms", &ConsumerConfiguration::getUnAckedMessagesTimeoutMs)
.def("unacked_messages_timeout_ms", &ConsumerConfiguration::setUnAckedMessagesTimeoutMs)
.def("negative_ack_redelivery_delay_ms", &ConsumerConfiguration::getNegativeAckRedeliveryDelayMs)
.def("negative_ack_redelivery_delay_ms", &ConsumerConfiguration::setNegativeAckRedeliveryDelayMs)
.def("broker_consumer_stats_cache_time_ms",
&ConsumerConfiguration::getBrokerConsumerStatsCacheTimeInMs)
.def("broker_consumer_stats_cache_time_ms",
&ConsumerConfiguration::setBrokerConsumerStatsCacheTimeInMs)
.def("pattern_auto_discovery_period", &ConsumerConfiguration::getPatternAutoDiscoveryPeriod)
.def("pattern_auto_discovery_period", &ConsumerConfiguration::setPatternAutoDiscoveryPeriod)
.def("read_compacted", &ConsumerConfiguration::isReadCompacted)
.def("read_compacted", &ConsumerConfiguration::setReadCompacted)
.def("property", &ConsumerConfiguration::setProperty, return_self<>())
.def("subscription_initial_position", &ConsumerConfiguration::getSubscriptionInitialPosition)
.def("subscription_initial_position", &ConsumerConfiguration::setSubscriptionInitialPosition)
.def("crypto_key_reader", &ConsumerConfiguration_setCryptoKeyReader, return_self<>())
.def("replicate_subscription_state_enabled",
&ConsumerConfiguration::setReplicateSubscriptionStateEnabled)
.def("replicate_subscription_state_enabled",
&ConsumerConfiguration::isReplicateSubscriptionStateEnabled);
// Reader settings. As for the consumer, several setters are registered without
// return_self<>().
class_<ReaderConfiguration>("ReaderConfiguration")
.def("reader_listener", &ReaderConfiguration_setReaderListener, return_self<>())
.def("schema", &ReaderConfiguration::getSchema, return_value_policy<copy_const_reference>())
.def("schema", &ReaderConfiguration::setSchema, return_self<>())
.def("receiver_queue_size", &ReaderConfiguration::getReceiverQueueSize)
.def("receiver_queue_size", &ReaderConfiguration::setReceiverQueueSize)
.def("reader_name", &ReaderConfiguration::getReaderName, return_value_policy<copy_const_reference>())
.def("reader_name", &ReaderConfiguration::setReaderName)
.def("subscription_role_prefix", &ReaderConfiguration::getSubscriptionRolePrefix,
return_value_policy<copy_const_reference>())
.def("subscription_role_prefix", &ReaderConfiguration::setSubscriptionRolePrefix)
.def("read_compacted", &ReaderConfiguration::isReadCompacted)
.def("read_compacted", &ReaderConfiguration::setReadCompacted)
.def("crypto_key_reader", &ReaderConfiguration_setCryptoKeyReader, return_self<>());
}