blob: b93380bc7afb9f45e451e091c38a19063139b0ca [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "utils.h"
#include <datetime.h>
#include <boost/python/suite/indexing/map_indexing_suite.hpp>
#include <boost/python/suite/indexing/vector_indexing_suite.hpp>
/**
 * Produce the textual form of a MessageId by writing it through its
 * stream insertion operator. Exposed to Python as MessageId.__str__.
 */
std::string MessageId_str(const MessageId& msgId) {
    std::ostringstream out;
    out << msgId;
    return out.str();
}
// Free-function wrappers around the MessageId comparison operators so that
// each one can be bound to the matching Python rich-comparison method.

/** Python __eq__: true when both ids compare equal. */
bool MessageId_eq(const MessageId& a, const MessageId& b) {
    return a == b;
}

/** Python __ne__: true when the ids compare unequal. */
bool MessageId_ne(const MessageId& a, const MessageId& b) {
    return a != b;
}

/** Python __lt__: true when `a` orders strictly before `b`. */
bool MessageId_lt(const MessageId& a, const MessageId& b) {
    return a < b;
}

/** Python __le__: true when `a` orders before or equal to `b`. */
bool MessageId_le(const MessageId& a, const MessageId& b) {
    return a <= b;
}

/** Python __gt__: true when `a` orders strictly after `b`. */
bool MessageId_gt(const MessageId& a, const MessageId& b) {
    return a > b;
}

/** Python __ge__: true when `a` orders after or equal to `b`. */
bool MessageId_ge(const MessageId& a, const MessageId& b) {
    return a >= b;
}
/**
 * Serialize a MessageId and hand the raw bytes back to Python as a
 * `bytes` object.
 *
 * The handle<> wrapper takes ownership of the new reference returned by
 * PyBytes_FromStringAndSize.
 */
boost::python::object MessageId_serialize(const MessageId& msgId) {
    std::string buffer;
    msgId.serialize(buffer);

    boost::python::handle<> bytes(PyBytes_FromStringAndSize(buffer.data(), buffer.size()));
    return boost::python::object(bytes);
}
/**
 * Produce the textual form of a Message by writing it through its
 * stream insertion operator. Exposed to Python as Message.__str__.
 */
std::string Message_str(const Message& msg) {
    std::ostringstream out;
    out << msg;
    return out.str();
}
/**
 * Return the message payload to Python as a `bytes` object.
 *
 * The payload is copied: PyBytes_FromStringAndSize creates a new bytes
 * object from the (pointer, length) pair reported by the message, and the
 * handle<> wrapper takes ownership of that new reference.
 */
boost::python::object Message_data(const Message& msg) {
    // Named cast instead of a C-style cast: only the pointee type is
    // reinterpreted as bytes, constness is left untouched.
    const char* const payload = static_cast<const char*>(msg.getData());
    return boost::python::object(
        boost::python::handle<>(PyBytes_FromStringAndSize(payload, msg.getLength())));
}
/**
 * Build a fresh Python dict holding a copy of every key/value pair in the
 * message's properties and return it.
 */
boost::python::object Message_properties(const Message& msg) {
    boost::python::dict result;

    const auto& properties = msg.getProperties();
    for (const auto& entry : properties) {
        result[entry.first] = entry.second;
    }

    return boost::python::object(std::move(result));
}
/**
 * Return a copy of the message's topic name.
 *
 * Returning by value gives Boost.Python an owned std::string to convert.
 * The name is copied directly; streaming it through a std::stringstream
 * first yields the same characters at the cost of an extra buffer.
 */
std::string Topic_name_str(const Message& msg) { return msg.getTopicName(); }
/**
 * Return a copy of the message's schema version.
 *
 * Returning by value gives Boost.Python an owned std::string to convert.
 * The value is copied directly; streaming it through a std::stringstream
 * first yields the same characters at the cost of an extra buffer.
 */
std::string schema_version_str(const Message& msg) { return msg.getSchemaVersion(); }
/**
 * Free-function accessor for the id of a message; returns the reference
 * obtained from Message::getMessageId() unchanged.
 */
const MessageId& Message_getMessageId(const Message& msg) {
    return msg.getMessageId();
}
/**
 * Set the "deliver after" delay on a MessageBuilder from a Python
 * datetime.timedelta.
 *
 * @param builder   builder whose delivery delay is set
 * @param obj_delta Python object that is reinterpreted as a timedelta; it is
 *                  not type-checked here, so the caller must pass a real
 *                  datetime.timedelta instance
 *
 * The delay is converted to whole milliseconds (truncating toward zero).
 */
void deliverAfter(MessageBuilder* const builder, PyObject* obj_delta) {
    PyDateTime_Delta const* pydelta = reinterpret_cast<PyDateTime_Delta*>(obj_delta);

    // A timedelta is stored normalised: only `days` can be negative, while
    // `seconds` and `microseconds` are always non-negative. The total is
    // therefore the plain signed sum of the three fields. Negating after
    // summing with abs(days) would give e.g. -172799s for timedelta(seconds=-1).
    const long long days = pydelta->days;

    const std::chrono::milliseconds duration = std::chrono::duration_cast<std::chrono::milliseconds>(
        std::chrono::hours(24) * days + std::chrono::seconds(pydelta->seconds) +
        std::chrono::microseconds(pydelta->microseconds));

    builder->setDeliverAfter(duration);
}
/**
 * Register the message-related types with Boost.Python: MessageBuilder,
 * MessageStringMap, MessageId, Message, MessageBatch and Messages.
 */
void export_message() {
    using namespace boost::python;

    // Load the datetime C API; deliverAfter() reads PyDateTime_Delta fields.
    PyDateTime_IMPORT;

    // Pin down the setContent overload that takes a const std::string&.
    MessageBuilder& (MessageBuilder::*MessageBuilderSetContentString)(const std::string&) =
        &MessageBuilder::setContent;

    // Builder methods use return_self<> so calls can be chained from Python.
    class_<MessageBuilder, boost::noncopyable>("MessageBuilder")
        .def("content", MessageBuilderSetContentString, return_self<>())
        .def("property", &MessageBuilder::setProperty, return_self<>())
        .def("properties", &MessageBuilder::setProperties, return_self<>())
        .def("sequence_id", &MessageBuilder::setSequenceId, return_self<>())
        .def("deliver_after", &deliverAfter, return_self<>())
        .def("deliver_at", &MessageBuilder::setDeliverAt, return_self<>())
        .def("partition_key", &MessageBuilder::setPartitionKey, return_self<>())
        .def("event_timestamp", &MessageBuilder::setEventTimestamp, return_self<>())
        .def("replication_clusters", &MessageBuilder::setReplicationClusters, return_self<>())
        .def("disable_replication", &MessageBuilder::disableReplication, return_self<>())
        .def("build", &MessageBuilder::build);

    class_<Message::StringMap>("MessageStringMap").def(map_indexing_suite<Message::StringMap>());

    // Function-local statics give make_getter() stable addresses to expose
    // as the `earliest` / `latest` static properties below.
    static const MessageId& earliestMessageId = MessageId::earliest();
    static const MessageId& latestMessageId = MessageId::latest();

    class_<MessageId>("MessageId")
        .def(init<int32_t, int64_t, int64_t, int32_t>())
        .def("__str__", &MessageId_str)
        .def("__eq__", &MessageId_eq)
        .def("__ne__", &MessageId_ne)
        .def("__le__", &MessageId_le)
        .def("__lt__", &MessageId_lt)
        .def("__ge__", &MessageId_ge)
        .def("__gt__", &MessageId_gt)
        .def("ledger_id", &MessageId::ledgerId)
        .def("entry_id", &MessageId::entryId)
        .def("batch_index", &MessageId::batchIndex)
        .def("partition", &MessageId::partition)
        .add_static_property("earliest", make_getter(&earliestMessageId))
        .add_static_property("latest", make_getter(&latestMessageId))
        .def("serialize", &MessageId_serialize)
        .def("deserialize", &MessageId::deserialize)
        .staticmethod("deserialize");

    // Accessors returning const references are bound with
    // copy_const_reference, so Python receives a copy of the value.
    class_<Message>("Message")
        .def("properties", &Message_properties)
        .def("data", &Message_data)
        .def("length", &Message::getLength)
        .def("partition_key", &Message::getPartitionKey, return_value_policy<copy_const_reference>())
        .def("publish_timestamp", &Message::getPublishTimestamp)
        .def("event_timestamp", &Message::getEventTimestamp)
        .def("message_id", &Message_getMessageId, return_value_policy<copy_const_reference>())
        .def("__str__", &Message_str)
        .def("topic_name", &Topic_name_str)
        .def("redelivery_count", &Message::getRedeliveryCount)
        .def("schema_version", &schema_version_str);

    // Pin down the parseFrom overload that takes (const std::string&, uint32_t).
    MessageBatch& (MessageBatch::*MessageBatchParseFromString)(const std::string& payload,
                                                               uint32_t batchSize) = &MessageBatch::parseFrom;

    class_<MessageBatch>("MessageBatch")
        .def("with_message_id", &MessageBatch::withMessageId, return_self<>())
        .def("parse_from", MessageBatchParseFromString, return_self<>())
        .def("messages", &MessageBatch::messages, return_value_policy<copy_const_reference>());

    class_<std::vector<Message> >("Messages").def(vector_indexing_suite<std::vector<Message> >());
}