| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| #include "Message.h" |
| #include "MessageId.h" |
| #include <pulsar/c/message.h> |
| |
// Property names read from the JavaScript configuration object passed to
// Message::BuildMessage. The anonymous namespace keeps them local to this
// translation unit.
namespace {
const std::string CFG_DATA("data");
const std::string CFG_PROPS("properties");
const std::string CFG_EVENT_TIME("eventTimestamp");
const std::string CFG_SEQUENCE_ID("sequenceId");
const std::string CFG_PARTITION_KEY("partitionKey");
const std::string CFG_REPL_CLUSTERS("replicationClusters");
}  // namespace
| |
// Reference to the JavaScript constructor of the Message class. It is assigned
// in Message::Init and used by Message::NewInstance to create wrapper objects.
| Napi::FunctionReference Message::constructor; |
| |
| Napi::Object Message::Init(Napi::Env env, Napi::Object exports) { |
| Napi::HandleScope scope(env); |
| |
| Napi::Function func = DefineClass( |
| env, "Message", |
| {InstanceMethod("getTopicName", &Message::GetTopicName), |
| InstanceMethod("getProperties", &Message::GetProperties), InstanceMethod("getData", &Message::GetData), |
| InstanceMethod("getMessageId", &Message::GetMessageId), |
| InstanceMethod("getPublishTimestamp", &Message::GetPublishTimestamp), |
| InstanceMethod("getEventTimestamp", &Message::GetEventTimestamp), |
| InstanceMethod("getPartitionKey", &Message::GetPartitionKey)}); |
| |
| constructor = Napi::Persistent(func); |
| constructor.SuppressDestruct(); |
| |
| exports.Set("Message", func); |
| return exports; |
| } |
| |
| Napi::Object Message::NewInstance(Napi::Value arg, pulsar_message_t *cMessage) { |
| Napi::Object obj = constructor.New({}); |
| Message *msg = Unwrap(obj); |
| msg->cMessage = cMessage; |
| return obj; |
| } |
| |
| Message::Message(const Napi::CallbackInfo &info) : Napi::ObjectWrap<Message>(info), cMessage(nullptr) {} |
| |
| pulsar_message_t *Message::GetCMessage() { return this->cMessage; } |
| |
| Napi::Value Message::GetTopicName(const Napi::CallbackInfo &info) { |
| Napi::Env env = info.Env(); |
| if (!ValidateCMessage(env)) { |
| return env.Null(); |
| } |
| return Napi::String::New(env, pulsar_message_get_topic_name(this->cMessage)); |
| } |
| |
| Napi::Value Message::GetProperties(const Napi::CallbackInfo &info) { |
| Napi::Env env = info.Env(); |
| if (!ValidateCMessage(env)) { |
| return env.Null(); |
| } |
| Napi::Array arr = Napi::Array::New(env); |
| pulsar_string_map_t *cProperties = pulsar_message_get_properties(this->cMessage); |
| int size = pulsar_string_map_size(cProperties); |
| for (int i = 0; i < size; i++) { |
| arr.Set(pulsar_string_map_get_key(cProperties, i), pulsar_string_map_get_value(cProperties, i)); |
| } |
| return arr; |
| } |
| |
| Napi::Value Message::GetData(const Napi::CallbackInfo &info) { |
| Napi::Env env = info.Env(); |
| if (!ValidateCMessage(env)) { |
| return env.Null(); |
| } |
| void *data = const_cast<void *>(pulsar_message_get_data(this->cMessage)); |
| size_t size = (size_t)pulsar_message_get_length(this->cMessage); |
| return Napi::Buffer<char>::New(env, (char *)data, size); |
| } |
| |
| Napi::Value Message::GetMessageId(const Napi::CallbackInfo &info) { |
| Napi::Env env = info.Env(); |
| if (!ValidateCMessage(env)) { |
| return env.Null(); |
| } |
| return MessageId::NewInstanceFromMessage(info, this->cMessage); |
| } |
| |
| Napi::Value Message::GetEventTimestamp(const Napi::CallbackInfo &info) { |
| Napi::Env env = info.Env(); |
| if (!ValidateCMessage(env)) { |
| return env.Null(); |
| } |
| return Napi::Number::New(env, pulsar_message_get_event_timestamp(this->cMessage)); |
| } |
| |
| Napi::Value Message::GetPublishTimestamp(const Napi::CallbackInfo &info) { |
| Napi::Env env = info.Env(); |
| if (!ValidateCMessage(env)) { |
| return env.Null(); |
| } |
| return Napi::Number::New(env, pulsar_message_get_publish_timestamp(this->cMessage)); |
| } |
| |
| Napi::Value Message::GetPartitionKey(const Napi::CallbackInfo &info) { |
| Napi::Env env = info.Env(); |
| if (!ValidateCMessage(env)) { |
| return env.Null(); |
| } |
| return Napi::String::New(env, pulsar_message_get_partitionKey(this->cMessage)); |
| } |
| |
| bool Message::ValidateCMessage(Napi::Env env) { |
| if (this->cMessage) { |
| return true; |
| } else { |
| Napi::Error::New(env, "Message has not been built").ThrowAsJavaScriptException(); |
| return false; |
| } |
| } |
| |
| pulsar_message_t *Message::BuildMessage(Napi::Object conf) { |
| pulsar_message_t *cMessage = pulsar_message_create(); |
| |
| if (conf.Has(CFG_DATA) && conf.Get(CFG_DATA).IsBuffer()) { |
| Napi::Buffer<char> buf = conf.Get(CFG_DATA).As<Napi::Buffer<char>>(); |
| char *data = buf.Data(); |
| pulsar_message_set_content(cMessage, data, buf.Length()); |
| } |
| |
| if (conf.Has(CFG_PROPS) && conf.Get(CFG_PROPS).IsObject()) { |
| Napi::Object propObj = conf.Get(CFG_PROPS).ToObject(); |
| Napi::Array arr = propObj.GetPropertyNames(); |
| int size = arr.Length(); |
| for (int i = 0; i < size; i++) { |
| Napi::String key = arr.Get(i).ToString(); |
| Napi::String value = propObj.Get(key).ToString(); |
| pulsar_message_set_property(cMessage, key.Utf8Value().c_str(), value.Utf8Value().c_str()); |
| } |
| } |
| |
| if (conf.Has(CFG_EVENT_TIME) && conf.Get(CFG_EVENT_TIME).IsNumber()) { |
| int64_t eventTimestamp = conf.Get(CFG_EVENT_TIME).ToNumber().Int64Value(); |
| if (eventTimestamp >= 0) { |
| pulsar_message_set_event_timestamp(cMessage, eventTimestamp); |
| } |
| } |
| |
| if (conf.Has(CFG_SEQUENCE_ID) && conf.Get(CFG_SEQUENCE_ID).IsNumber()) { |
| Napi::Number sequenceId = conf.Get(CFG_SEQUENCE_ID).ToNumber(); |
| pulsar_message_set_sequence_id(cMessage, sequenceId.Int64Value()); |
| } |
| |
| if (conf.Has(CFG_PARTITION_KEY) && conf.Get(CFG_PARTITION_KEY).IsString()) { |
| Napi::String partitionKey = conf.Get(CFG_PARTITION_KEY).ToString(); |
| pulsar_message_set_partition_key(cMessage, partitionKey.Utf8Value().c_str()); |
| } |
| |
| if (conf.Has(CFG_REPL_CLUSTERS) && conf.Get(CFG_REPL_CLUSTERS).IsArray()) { |
| Napi::Array clusters = conf.Get(CFG_REPL_CLUSTERS).As<Napi::Array>(); |
| // Empty list means to disable replication |
| int length = clusters.Length(); |
| if (length == 0) { |
| pulsar_message_disable_replication(cMessage, 1); |
| } else { |
| char **arr = NewStringArray(length); |
| for (int i = 0; i < length; i++) { |
| SetString(arr, clusters.Get(i).ToString().Utf8Value().c_str(), i); |
| } |
| // TODO: temoporalily commented out unless 2.3.1 which includes interface change of |
| // pulsar_message_set_replication_clusters (#3729) is released |
| // pulsar_message_set_replication_clusters(cMessage, (const char **)arr, length); |
| FreeStringArray(arr, length); |
| } |
| } |
| return cMessage; |
| } |
| |
| Message::~Message() { |
| if (this->cMessage != nullptr) { |
| pulsar_message_free(this->cMessage); |
| } |
| } |