blob: 64a8e7bbaa3e313bfda6facd31cbf3d00483111f [file]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "utils.h"
#include <pulsar/AutoClusterFailover.h>
#include <pulsar/ServiceInfoProvider.h>
#include <chrono>
#include <memory>
#include <pybind11/functional.h>
#include <pybind11/pybind11.h>
#include <pybind11/stl.h>
namespace py = pybind11;
static ServiceInfo unwrapPythonServiceInfo(const py::handle& object) {
auto serviceInfoObject = py::reinterpret_borrow<py::object>(object);
try {
return serviceInfoObject.cast<ServiceInfo>();
} catch (const py::cast_error&) {
}
if (py::hasattr(serviceInfoObject, "_service_info")) {
try {
return serviceInfoObject.attr("_service_info").cast<ServiceInfo>();
} catch (const py::cast_error&) {
}
}
throw py::value_error("Expected a pulsar.ServiceInfo or _pulsar.ServiceInfo instance");
}
class PythonServiceInfoProvider : public ServiceInfoProvider {
public:
explicit PythonServiceInfoProvider(py::object provider) : provider_(std::move(provider)) {}
~PythonServiceInfoProvider() override {
if (!Py_IsInitialized()) {
return;
}
py::gil_scoped_acquire acquire;
try {
if (py::hasattr(provider_, "close")) {
provider_.attr("close")();
}
} catch (const py::error_already_set&) {
PyErr_Print();
}
}
ServiceInfo initialServiceInfo() override {
py::gil_scoped_acquire acquire;
return unwrapPythonServiceInfo(provider_.attr("initial_service_info")());
}
void initialize(std::function<void(ServiceInfo)> onServiceInfoUpdate) override {
py::gil_scoped_acquire acquire;
provider_.attr("initialize")(py::cpp_function(
[onServiceInfoUpdate = std::move(onServiceInfoUpdate)](py::object serviceInfo) mutable {
onServiceInfoUpdate(unwrapPythonServiceInfo(serviceInfo));
}));
}
private:
py::object provider_;
};
Producer Client_createProducer(Client& client, const std::string& topic, const ProducerConfiguration& conf) {
return waitForAsyncValue<Producer>(
[&](CreateProducerCallback callback) { client.createProducerAsync(topic, conf, callback); });
}
void Client_createProducerAsync(Client& client, const std::string& topic, ProducerConfiguration conf,
CreateProducerCallback callback) {
py::gil_scoped_release release;
client.createProducerAsync(topic, conf, callback);
}
Consumer Client_subscribe(Client& client, const std::string& topic, const std::string& subscriptionName,
const ConsumerConfiguration& conf) {
return waitForAsyncValue<Consumer>(
[&](SubscribeCallback callback) { client.subscribeAsync(topic, subscriptionName, conf, callback); });
}
Consumer Client_subscribe_topics(Client& client, const std::vector<std::string>& topics,
const std::string& subscriptionName, const ConsumerConfiguration& conf) {
return waitForAsyncValue<Consumer>(
[&](SubscribeCallback callback) { client.subscribeAsync(topics, subscriptionName, conf, callback); });
}
Consumer Client_subscribe_pattern(Client& client, const std::string& topic_pattern,
const std::string& subscriptionName, const ConsumerConfiguration& conf) {
return waitForAsyncValue<Consumer>([&](SubscribeCallback callback) {
client.subscribeWithRegexAsync(topic_pattern, subscriptionName, conf, callback);
});
}
Reader Client_createReader(Client& client, const std::string& topic, const MessageId& startMessageId,
const ReaderConfiguration& conf) {
return waitForAsyncValue<Reader>(
[&](ReaderCallback callback) { client.createReaderAsync(topic, startMessageId, conf, callback); });
}
std::vector<std::string> Client_getTopicPartitions(Client& client, const std::string& topic) {
return waitForAsyncValue<std::vector<std::string>>(
[&](GetPartitionsCallback callback) { client.getPartitionsForTopicAsync(topic, callback); });
}
void Client_getTopicPartitionsAsync(Client& client, const std::string& topic,
GetPartitionsCallback callback) {
py::gil_scoped_release release;
client.getPartitionsForTopicAsync(topic, callback);
}
SchemaInfo Client_getSchemaInfo(Client& client, const std::string& topic, int64_t version) {
return waitForAsyncValue<SchemaInfo>([&](std::function<void(Result, const SchemaInfo&)> callback) {
client.getSchemaInfoAsync(topic, version, callback);
});
}
std::shared_ptr<Client> Client_createAutoClusterFailover(ServiceInfo primary,
std::vector<ServiceInfo> secondary,
int64_t checkIntervalMs, uint32_t failoverThreshold,
uint32_t switchBackThreshold,
const ClientConfiguration& conf) {
AutoClusterFailover::Config autoClusterFailoverConfig(std::move(primary), std::move(secondary));
autoClusterFailoverConfig.checkInterval = std::chrono::milliseconds(checkIntervalMs);
autoClusterFailoverConfig.failoverThreshold = failoverThreshold;
autoClusterFailoverConfig.switchBackThreshold = switchBackThreshold;
return std::make_shared<Client>(
Client::create(std::make_unique<AutoClusterFailover>(std::move(autoClusterFailoverConfig)), conf));
}
std::shared_ptr<Client> Client_createServiceInfoProvider(py::object provider,
const ClientConfiguration& conf) {
return std::make_shared<Client>(
Client::create(std::make_unique<PythonServiceInfoProvider>(std::move(provider)), conf));
}
void Client_close(Client& client) {
waitForAsyncResult([&](ResultCallback callback) { client.closeAsync(callback); });
}
void Client_closeAsync(Client& client, ResultCallback callback) {
py::gil_scoped_release release;
client.closeAsync(callback);
}
void Client_subscribeAsync(Client& client, const std::string& topic, const std::string& subscriptionName,
ConsumerConfiguration conf, SubscribeCallback callback) {
py::gil_scoped_release release;
client.subscribeAsync(topic, subscriptionName, conf, callback);
}
void Client_subscribeAsync_topics(Client& client, const std::vector<std::string>& topics,
const std::string& subscriptionName, ConsumerConfiguration conf,
SubscribeCallback callback) {
py::gil_scoped_release release;
client.subscribeAsync(topics, subscriptionName, conf, callback);
}
void Client_subscribeAsync_pattern(Client& client, const std::string& topic_pattern,
const std::string& subscriptionName, ConsumerConfiguration conf,
SubscribeCallback callback) {
py::gil_scoped_release release;
client.subscribeWithRegexAsync(topic_pattern, subscriptionName, conf, callback);
}
void export_client(py::module_& m) {
py::class_<Client, std::shared_ptr<Client>>(m, "Client")
.def(py::init<const std::string&, const ClientConfiguration&>())
.def_static("create_auto_cluster_failover", &Client_createAutoClusterFailover, py::arg("primary"),
py::arg("secondary"), py::arg("check_interval_ms"), py::arg("failover_threshold"),
py::arg("switch_back_threshold"), py::arg("client_configuration"))
.def_static("create_service_info_provider", &Client_createServiceInfoProvider, py::arg("provider"),
py::arg("client_configuration"))
.def("create_producer", &Client_createProducer)
.def("create_producer_async", &Client_createProducerAsync)
.def("subscribe", &Client_subscribe)
.def("subscribe_topics", &Client_subscribe_topics)
.def("subscribe_pattern", &Client_subscribe_pattern)
.def("create_reader", &Client_createReader)
.def("create_table_view",
[](Client& client, const std::string& topic, const TableViewConfiguration& config) {
return waitForAsyncValue<TableView>([&](TableViewCallback callback) {
client.createTableViewAsync(topic, config, callback);
});
})
.def("get_topic_partitions", &Client_getTopicPartitions)
.def("get_service_info", &Client::getServiceInfo)
.def("get_schema_info", &Client_getSchemaInfo)
.def("close", &Client_close)
.def("close_async", &Client_closeAsync)
.def("get_topic_partitions_async", &Client_getTopicPartitionsAsync)
.def("subscribe_async", &Client_subscribeAsync)
.def("subscribe_async_topics", &Client_subscribeAsync_topics)
.def("subscribe_async_pattern", &Client_subscribeAsync_pattern)
.def("shutdown", &Client::shutdown);
}