| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| #include "ProducerConfig.h" |
| #include <map> |
| |
| static const std::string CFG_TOPIC = "topic"; |
| static const std::string CFG_PRODUCER_NAME = "producerName"; |
| static const std::string CFG_SEND_TIMEOUT = "sendTimeoutMs"; |
| static const std::string CFG_INIT_SEQUENCE_ID = "initialSequenceId"; |
| static const std::string CFG_MAX_PENDING = "maxPendingMessages"; |
| static const std::string CFG_MAX_PENDING_ACROSS_PARTITIONS = "maxPendingMessagesAcrossPartitions"; |
| static const std::string CFG_BLOCK_IF_QUEUE_FULL = "blockIfQueueFull"; |
| static const std::string CFG_ROUTING_MODE = "messageRoutingMode"; |
| static const std::string CFG_HASH_SCHEME = "hashingScheme"; |
| static const std::string CFG_COMPRESS_TYPE = "compressionType"; |
| static const std::string CFG_BATCH_ENABLED = "batchingEnabled"; |
| static const std::string CFG_BATCH_MAX_DELAY = "batchingMaxPublishDelayMs"; |
| static const std::string CFG_BATCH_MAX_MSG = "batchingMaxMessages"; |
| static const std::string CFG_PROPS = "properties"; |
| |
| static const std::map<std::string, pulsar_partitions_routing_mode> MESSAGE_ROUTING_MODE = { |
| {"UseSinglePartition", pulsar_UseSinglePartition}, |
| {"RoundRobinDistribution", pulsar_RoundRobinDistribution}, |
| {"CustomPartition", pulsar_CustomPartition}}; |
| |
| static const std::map<std::string, pulsar_hashing_scheme> HASHING_SCHEME = { |
| {"Murmur3_32Hash", pulsar_Murmur3_32Hash}, |
| {"BoostHash", pulsar_BoostHash}, |
| {"JavaStringHash", pulsar_JavaStringHash}, |
| }; |
| |
| static std::map<std::string, pulsar_compression_type> COMPRESSION_TYPE = {{"Zlib", pulsar_CompressionZLib}, |
| {"LZ4", pulsar_CompressionLZ4}}; |
| |
| ProducerConfig::ProducerConfig(const Napi::Object& producerConfig) : topic("") { |
| this->cProducerConfig = pulsar_producer_configuration_create(); |
| |
| if (producerConfig.Has(CFG_TOPIC) && producerConfig.Get(CFG_TOPIC).IsString()) { |
| this->topic = producerConfig.Get(CFG_TOPIC).ToString().Utf8Value(); |
| } |
| |
| if (producerConfig.Has(CFG_PRODUCER_NAME) && producerConfig.Get(CFG_PRODUCER_NAME).IsString()) { |
| std::string producerName = producerConfig.Get(CFG_PRODUCER_NAME).ToString().Utf8Value(); |
| if (!producerName.empty()) |
| pulsar_producer_configuration_set_producer_name(this->cProducerConfig, producerName.c_str()); |
| } |
| |
| if (producerConfig.Has(CFG_SEND_TIMEOUT) && producerConfig.Get(CFG_SEND_TIMEOUT).IsNumber()) { |
| int32_t sendTimeoutMs = producerConfig.Get(CFG_SEND_TIMEOUT).ToNumber().Int32Value(); |
| if (sendTimeoutMs > 0) { |
| pulsar_producer_configuration_set_send_timeout(this->cProducerConfig, sendTimeoutMs); |
| } |
| } |
| |
| if (producerConfig.Has(CFG_INIT_SEQUENCE_ID) && producerConfig.Get(CFG_INIT_SEQUENCE_ID).IsNumber()) { |
| int64_t initialSequenceId = producerConfig.Get(CFG_INIT_SEQUENCE_ID).ToNumber().Int64Value(); |
| pulsar_producer_configuration_set_initial_sequence_id(this->cProducerConfig, initialSequenceId); |
| } |
| |
| if (producerConfig.Has(CFG_MAX_PENDING) && producerConfig.Get(CFG_MAX_PENDING).IsNumber()) { |
| int32_t maxPendingMessages = producerConfig.Get(CFG_MAX_PENDING).ToNumber().Int32Value(); |
| if (maxPendingMessages > 0) { |
| pulsar_producer_configuration_set_max_pending_messages(this->cProducerConfig, maxPendingMessages); |
| } |
| } |
| |
| if (producerConfig.Has(CFG_MAX_PENDING_ACROSS_PARTITIONS) && |
| producerConfig.Get(CFG_MAX_PENDING_ACROSS_PARTITIONS).IsNumber()) { |
| int32_t maxPendingMessagesAcrossPartitions = |
| producerConfig.Get(CFG_MAX_PENDING_ACROSS_PARTITIONS).ToNumber().Int32Value(); |
| if (maxPendingMessagesAcrossPartitions > 0) { |
| pulsar_producer_configuration_set_max_pending_messages(this->cProducerConfig, |
| maxPendingMessagesAcrossPartitions); |
| } |
| } |
| |
| if (producerConfig.Has(CFG_BLOCK_IF_QUEUE_FULL) && |
| producerConfig.Get(CFG_BLOCK_IF_QUEUE_FULL).IsBoolean()) { |
| bool blockIfQueueFull = producerConfig.Get(CFG_BLOCK_IF_QUEUE_FULL).ToBoolean().Value(); |
| pulsar_producer_configuration_set_block_if_queue_full(this->cProducerConfig, blockIfQueueFull); |
| } |
| |
| if (producerConfig.Has(CFG_ROUTING_MODE) && producerConfig.Get(CFG_ROUTING_MODE).IsString()) { |
| std::string messageRoutingMode = producerConfig.Get(CFG_ROUTING_MODE).ToString().Utf8Value(); |
| if (MESSAGE_ROUTING_MODE.count(messageRoutingMode)) |
| pulsar_producer_configuration_set_partitions_routing_mode(this->cProducerConfig, |
| MESSAGE_ROUTING_MODE.at(messageRoutingMode)); |
| } |
| |
| if (producerConfig.Has(CFG_HASH_SCHEME) && producerConfig.Get(CFG_HASH_SCHEME).IsString()) { |
| std::string hashingScheme = producerConfig.Get(CFG_HASH_SCHEME).ToString().Utf8Value(); |
| if (HASHING_SCHEME.count(hashingScheme)) |
| pulsar_producer_configuration_set_hashing_scheme(this->cProducerConfig, |
| HASHING_SCHEME.at(hashingScheme)); |
| } |
| |
| if (producerConfig.Has(CFG_COMPRESS_TYPE) && producerConfig.Get(CFG_COMPRESS_TYPE).IsString()) { |
| std::string compressionType = producerConfig.Get(CFG_COMPRESS_TYPE).ToString().Utf8Value(); |
| if (COMPRESSION_TYPE.count(compressionType)) |
| pulsar_producer_configuration_set_compression_type(this->cProducerConfig, |
| COMPRESSION_TYPE.at(compressionType)); |
| } |
| |
| if (producerConfig.Has(CFG_BATCH_ENABLED) && producerConfig.Get(CFG_BATCH_ENABLED).IsBoolean()) { |
| bool batchingEnabled = producerConfig.Get(CFG_BATCH_ENABLED).ToBoolean().Value(); |
| pulsar_producer_configuration_set_batching_enabled(this->cProducerConfig, batchingEnabled); |
| } |
| |
| if (producerConfig.Has(CFG_BATCH_MAX_DELAY) && producerConfig.Get(CFG_BATCH_MAX_DELAY).IsNumber()) { |
| int64_t batchingMaxPublishDelayMs = producerConfig.Get(CFG_BATCH_MAX_DELAY).ToNumber().Int64Value(); |
| if (batchingMaxPublishDelayMs > 0) { |
| pulsar_producer_configuration_set_batching_max_publish_delay_ms(this->cProducerConfig, |
| (long)batchingMaxPublishDelayMs); |
| } |
| } |
| |
| if (producerConfig.Has(CFG_BATCH_MAX_MSG) && producerConfig.Get(CFG_BATCH_MAX_MSG).IsNumber()) { |
| uint32_t batchingMaxMessages = producerConfig.Get(CFG_BATCH_MAX_MSG).ToNumber().Uint32Value(); |
| if (batchingMaxMessages > 0) { |
| pulsar_producer_configuration_set_batching_max_messages(this->cProducerConfig, batchingMaxMessages); |
| } |
| } |
| |
| if (producerConfig.Has(CFG_PROPS) && producerConfig.Get(CFG_PROPS).IsObject()) { |
| Napi::Object propObj = producerConfig.Get(CFG_PROPS).ToObject(); |
| Napi::Array arr = propObj.GetPropertyNames(); |
| int size = arr.Length(); |
| for (int i = 0; i < size; i++) { |
| Napi::String key = arr.Get(i).ToString(); |
| Napi::String value = propObj.Get(key).ToString(); |
| pulsar_producer_configuration_set_property(this->cProducerConfig, key.Utf8Value().c_str(), |
| value.Utf8Value().c_str()); |
| } |
| } |
| } |
| |
| ProducerConfig::~ProducerConfig() { pulsar_producer_configuration_free(this->cProducerConfig); } |
| |
| pulsar_producer_configuration_t* ProducerConfig::GetCProducerConfig() { return this->cProducerConfig; } |
| |
| std::string ProducerConfig::GetTopic() { return this->topic; } |