blob: 7eceba657cf1ea344564f7f7e668068f238797af [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "utils.h"
template<typename T>
struct ListenerWrapper {
PyObject* _pyListener;
ListenerWrapper(py::object pyListener) :
_pyListener(pyListener.ptr()) {
Py_XINCREF(_pyListener);
}
ListenerWrapper(const ListenerWrapper& other) {
_pyListener = other._pyListener;
Py_XINCREF(_pyListener);
}
ListenerWrapper& operator=(const ListenerWrapper& other) {
_pyListener = other._pyListener;
Py_XINCREF(_pyListener);
return *this;
}
virtual ~ListenerWrapper() {
Py_XDECREF(_pyListener);
}
void operator()(T consumer, const Message& msg) {
PyGILState_STATE state = PyGILState_Ensure();
try {
py::call<void>(_pyListener, py::object(&consumer), py::object(&msg));
} catch (py::error_already_set e) {
PyErr_Print();
}
PyGILState_Release(state);
}
};
static ConsumerConfiguration& ConsumerConfiguration_setMessageListener(ConsumerConfiguration& conf,
py::object pyListener) {
conf.setMessageListener(ListenerWrapper<Consumer>(pyListener));
return conf;
}
static ReaderConfiguration& ReaderConfiguration_setReaderListener(ReaderConfiguration& conf,
py::object pyListener) {
conf.setReaderListener(ListenerWrapper<Reader>(pyListener));
return conf;
}
static ClientConfiguration& ClientConfiguration_setAuthentication(ClientConfiguration& conf,
py::object authentication) {
AuthenticationWrapper wrapper = py::extract<AuthenticationWrapper>(authentication);
conf.setAuth(wrapper.auth);
return conf;
}
void export_config() {
using namespace boost::python;
class_<ClientConfiguration>("ClientConfiguration")
.def("authentication", &ClientConfiguration_setAuthentication, return_self<>())
.def("operation_timeout_seconds", &ClientConfiguration::getOperationTimeoutSeconds)
.def("operation_timeout_seconds", &ClientConfiguration::setOperationTimeoutSeconds, return_self<>())
.def("io_threads", &ClientConfiguration::getIOThreads)
.def("io_threads", &ClientConfiguration::setIOThreads, return_self<>())
.def("message_listener_threads", &ClientConfiguration::getMessageListenerThreads)
.def("message_listener_threads", &ClientConfiguration::setMessageListenerThreads, return_self<>())
.def("concurrent_lookup_requests", &ClientConfiguration::getConcurrentLookupRequest)
.def("concurrent_lookup_requests", &ClientConfiguration::setConcurrentLookupRequest, return_self<>())
.def("log_conf_file_path", &ClientConfiguration::getLogConfFilePath, return_value_policy<copy_const_reference>())
.def("log_conf_file_path", &ClientConfiguration::setLogConfFilePath, return_self<>())
.def("use_tls", &ClientConfiguration::isUseTls)
.def("use_tls", &ClientConfiguration::setUseTls, return_self<>())
.def("tls_trust_certs_file_path", &ClientConfiguration::getTlsTrustCertsFilePath)
.def("tls_trust_certs_file_path", &ClientConfiguration::setTlsTrustCertsFilePath, return_self<>())
.def("tls_allow_insecure_connection", &ClientConfiguration::isTlsAllowInsecureConnection)
.def("tls_allow_insecure_connection", &ClientConfiguration::setTlsAllowInsecureConnection, return_self<>())
;
class_<ProducerConfiguration>("ProducerConfiguration")
.def("producer_name", &ProducerConfiguration::getProducerName, return_value_policy<copy_const_reference>())
.def("producer_name", &ProducerConfiguration::setProducerName, return_self<>())
.def("send_timeout_millis", &ProducerConfiguration::getSendTimeout)
.def("send_timeout_millis", &ProducerConfiguration::setSendTimeout, return_self<>())
.def("initial_sequence_id", &ProducerConfiguration::getInitialSequenceId)
.def("initial_sequence_id", &ProducerConfiguration::setInitialSequenceId, return_self<>())
.def("compression_type", &ProducerConfiguration::getCompressionType)
.def("compression_type", &ProducerConfiguration::setCompressionType, return_self<>())
.def("max_pending_messages", &ProducerConfiguration::getMaxPendingMessages)
.def("max_pending_messages", &ProducerConfiguration::setMaxPendingMessages, return_self<>())
.def("block_if_queue_full", &ProducerConfiguration::getBlockIfQueueFull)
.def("block_if_queue_full", &ProducerConfiguration::setBlockIfQueueFull, return_self<>())
.def("partitions_routing_mode", &ProducerConfiguration::getPartitionsRoutingMode)
.def("partitions_routing_mode", &ProducerConfiguration::setPartitionsRoutingMode, return_self<>())
.def("batching_enabled", &ProducerConfiguration::getBatchingEnabled, return_value_policy<copy_const_reference>())
.def("batching_enabled", &ProducerConfiguration::setBatchingEnabled, return_self<>())
.def("batching_max_messages", &ProducerConfiguration::getBatchingMaxMessages, return_value_policy<copy_const_reference>())
.def("batching_max_messages", &ProducerConfiguration::setBatchingMaxMessages, return_self<>())
.def("batching_max_allowed_size_in_bytes", &ProducerConfiguration::getBatchingMaxAllowedSizeInBytes, return_value_policy<copy_const_reference>())
.def("batching_max_allowed_size_in_bytes", &ProducerConfiguration::setBatchingMaxAllowedSizeInBytes, return_self<>())
.def("batching_max_publish_delay_ms", &ProducerConfiguration::getBatchingMaxPublishDelayMs, return_value_policy<copy_const_reference>())
.def("batching_max_publish_delay_ms", &ProducerConfiguration::setBatchingMaxPublishDelayMs, return_self<>())
;
class_<ConsumerConfiguration>("ConsumerConfiguration")
.def("consumer_type", &ConsumerConfiguration::getConsumerType)
.def("consumer_type", &ConsumerConfiguration::setConsumerType, return_self<>())
.def("message_listener", &ConsumerConfiguration_setMessageListener, return_self<>())
.def("receiver_queue_size", &ConsumerConfiguration::getReceiverQueueSize)
.def("receiver_queue_size", &ConsumerConfiguration::setReceiverQueueSize)
.def("consumer_name", &ConsumerConfiguration::getConsumerName, return_value_policy<copy_const_reference>())
.def("consumer_name", &ConsumerConfiguration::setConsumerName)
.def("unacked_messages_timeout_ms", &ConsumerConfiguration::getUnAckedMessagesTimeoutMs)
.def("unacked_messages_timeout_ms", &ConsumerConfiguration::setUnAckedMessagesTimeoutMs)
.def("broker_consumer_stats_cache_time_ms", &ConsumerConfiguration::getBrokerConsumerStatsCacheTimeInMs)
.def("broker_consumer_stats_cache_time_ms", &ConsumerConfiguration::setBrokerConsumerStatsCacheTimeInMs)
;
class_<ReaderConfiguration>("ReaderConfiguration")
.def("message_listener", &ReaderConfiguration_setReaderListener, return_self<>())
.def("receiver_queue_size", &ReaderConfiguration::getReceiverQueueSize)
.def("receiver_queue_size", &ReaderConfiguration::setReceiverQueueSize)
.def("reader_name", &ReaderConfiguration::getReaderName, return_value_policy<copy_const_reference>())
.def("reader_name", &ReaderConfiguration::setReaderName)
;
}