blob: cec9d3bf63b080117246d18c06db7369f96bf1d7 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "Commands.h"
#include <pulsar/MessageBuilder.h>
#include <pulsar/MessageIdBuilder.h>
#include <pulsar/Schema.h>
#include <pulsar/Version.h>
#include <algorithm>
#include <mutex>
#include "BatchMessageAcker.h"
#include "BatchedMessageIdImpl.h"
#include "BitSet.h"
#include "ChunkMessageIdImpl.h"
#include "LogUtils.h"
#include "MessageImpl.h"
#include "PulsarApi.pb.h"
#include "Url.h"
#include "checksum/ChecksumProvider.h"
using namespace pulsar;
namespace pulsar {
DECLARE_LOG_OBJECT();
using proto::AuthData;
using proto::BaseCommand;
using proto::CommandAck;
using proto::CommandAuthResponse;
using proto::CommandCloseConsumer;
using proto::CommandCloseProducer;
using proto::CommandConnect;
using proto::CommandConsumerStats;
using proto::CommandFlow;
using proto::CommandGetLastMessageId;
using proto::CommandGetTopicsOfNamespace;
using proto::CommandLookupTopic;
using proto::CommandPartitionedTopicMetadata;
using proto::CommandProducer;
using proto::CommandRedeliverUnacknowledgedMessages;
using proto::CommandSeek;
using proto::CommandSend;
using proto::CommandSubscribe;
using proto::CommandUnsubscribe;
using proto::FeatureFlags;
using proto::IntRange;
using proto::KeySharedMeta;
using proto::MessageIdData;
using proto::ProtocolVersion_MAX;
using proto::SingleMessageMetadata;
static inline bool isBuiltInSchema(SchemaType schemaType) {
switch (schemaType) {
case STRING:
case JSON:
case AVRO:
case PROTOBUF:
case PROTOBUF_NATIVE:
case KEY_VALUE:
return true;
default:
return false;
}
}
static inline proto::Schema_Type getSchemaType(SchemaType type) {
switch (type) {
case SchemaType::NONE:
return proto::Schema_Type_None;
case STRING:
return proto::Schema_Type_String;
case JSON:
return proto::Schema_Type_Json;
case PROTOBUF:
return proto::Schema_Type_Protobuf;
case AVRO:
return proto::Schema_Type_Avro;
case PROTOBUF_NATIVE:
return proto::Schema_Type_ProtobufNative;
case KEY_VALUE:
return proto::Schema_Type_KeyValue;
default:
return proto::Schema_Type_None;
}
}
static proto::Schema* getSchema(const SchemaInfo& schemaInfo) {
proto::Schema* schema = proto::Schema().New();
schema->set_name(schemaInfo.getName());
schema->set_schema_data(schemaInfo.getSchema());
schema->set_type(getSchemaType(schemaInfo.getSchemaType()));
for (const auto& kv : schemaInfo.getProperties()) {
proto::KeyValue* keyValue = proto::KeyValue().New();
keyValue->set_key(kv.first);
keyValue->set_value(kv.second);
schema->mutable_properties()->AddAllocated(keyValue);
}
return schema;
}
SharedBuffer Commands::writeMessageWithSize(const BaseCommand& cmd) {
size_t cmdSize = cmd.ByteSize();
size_t frameSize = 4 + cmdSize;
size_t bufferSize = 4 + frameSize;
SharedBuffer buffer = SharedBuffer::allocate(bufferSize);
buffer.writeUnsignedInt(frameSize);
buffer.writeUnsignedInt(cmdSize);
cmd.SerializeToArray(buffer.mutableData(), cmdSize);
buffer.bytesWritten(cmdSize);
return buffer;
}
SharedBuffer Commands::newPartitionMetadataRequest(const std::string& topic, uint64_t requestId) {
static BaseCommand cmd;
static std::mutex mutex;
std::lock_guard<std::mutex> lock(mutex);
cmd.set_type(BaseCommand::PARTITIONED_METADATA);
CommandPartitionedTopicMetadata* partitionMetadata = cmd.mutable_partitionmetadata();
partitionMetadata->set_topic(topic);
partitionMetadata->set_request_id(requestId);
const SharedBuffer buffer = writeMessageWithSize(cmd);
cmd.clear_partitionmetadata();
return buffer;
}
SharedBuffer Commands::newLookup(const std::string& topic, const bool authoritative, uint64_t requestId,
const std::string& listenerName) {
static BaseCommand cmd;
static std::mutex mutex;
std::lock_guard<std::mutex> lock(mutex);
cmd.set_type(BaseCommand::LOOKUP);
CommandLookupTopic* lookup = cmd.mutable_lookuptopic();
lookup->set_topic(topic);
lookup->set_authoritative(authoritative);
lookup->set_request_id(requestId);
lookup->set_advertised_listener_name(listenerName);
const SharedBuffer buffer = writeMessageWithSize(cmd);
cmd.clear_lookuptopic();
return buffer;
}
SharedBuffer Commands::newGetSchema(const std::string& topic, const std::string& version,
uint64_t requestId) {
static BaseCommand cmd;
static std::mutex mutex;
std::lock_guard<std::mutex> lock(mutex);
cmd.set_type(BaseCommand::GET_SCHEMA);
auto getSchema = cmd.mutable_getschema();
getSchema->set_topic(topic);
getSchema->set_request_id(requestId);
if (!version.empty()) {
getSchema->set_schema_version(version);
}
const SharedBuffer buffer = writeMessageWithSize(cmd);
cmd.clear_getschema();
return buffer;
}
SharedBuffer Commands::newConsumerStats(uint64_t consumerId, uint64_t requestId) {
static BaseCommand cmd;
static std::mutex mutex;
std::lock_guard<std::mutex> lock(mutex);
cmd.set_type(BaseCommand::CONSUMER_STATS);
CommandConsumerStats* consumerStats = cmd.mutable_consumerstats();
consumerStats->set_consumer_id(consumerId);
consumerStats->set_request_id(requestId);
const SharedBuffer buffer = writeMessageWithSize(cmd);
cmd.clear_consumerstats();
return buffer;
}
PairSharedBuffer Commands::newSend(SharedBuffer& headers, BaseCommand& cmd, uint64_t producerId,
uint64_t sequenceId, ChecksumType checksumType,
const proto::MessageMetadata& metadata, const SharedBuffer& payload) {
cmd.set_type(BaseCommand::SEND);
CommandSend* send = cmd.mutable_send();
send->set_producer_id(producerId);
send->set_sequence_id(sequenceId);
if (metadata.has_num_messages_in_batch()) {
send->set_num_messages(metadata.num_messages_in_batch());
}
if (metadata.has_chunk_id()) {
send->set_is_chunk(true);
}
// / Wire format
// [TOTAL_SIZE] [CMD_SIZE][CMD] [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [PAYLOAD]
int cmdSize = cmd.ByteSize();
int msgMetadataSize = metadata.ByteSize();
int payloadSize = payload.readableBytes();
int magicAndChecksumLength = (Crc32c == (checksumType)) ? (2 + 4 /* magic + checksumLength*/) : 0;
bool includeChecksum = magicAndChecksumLength > 0;
int headerContentSize =
4 + cmdSize + magicAndChecksumLength + 4 + msgMetadataSize; // cmdLength + cmdSize + magicLength +
// checksumSize + msgMetadataLength + msgMetadataSize
int totalSize = headerContentSize + payloadSize;
int checksumReaderIndex = -1;
headers.reset();
assert(headers.writableBytes() >= (4 + headerContentSize)); // totalSize + headerLength
headers.writeUnsignedInt(totalSize); // External frame
// Write cmd
headers.writeUnsignedInt(cmdSize);
cmd.SerializeToArray(headers.mutableData(), cmdSize);
headers.bytesWritten(cmdSize);
// Create checksum placeholder
if (includeChecksum) {
headers.writeUnsignedShort(magicCrc32c);
checksumReaderIndex = headers.writerIndex();
headers.skipBytes(checksumSize); // skip 4 bytes of checksum
}
// Write metadata
headers.writeUnsignedInt(msgMetadataSize);
metadata.SerializeToArray(headers.mutableData(), msgMetadataSize);
headers.bytesWritten(msgMetadataSize);
PairSharedBuffer composite;
composite.set(0, headers);
composite.set(1, payload);
// Write checksum at created checksum-placeholder
if (includeChecksum) {
int writeIndex = headers.writerIndex();
int metadataStartIndex = checksumReaderIndex + checksumSize;
uint32_t metadataChecksum =
computeChecksum(0, headers.data() + metadataStartIndex, (writeIndex - metadataStartIndex));
uint32_t computedChecksum =
computeChecksum(metadataChecksum, payload.data(), payload.readableBytes());
// set computed checksum
headers.setWriterIndex(checksumReaderIndex);
headers.writeUnsignedInt(computedChecksum);
headers.setWriterIndex(writeIndex);
}
cmd.clear_send();
return composite;
}
SharedBuffer Commands::newConnect(const AuthenticationPtr& authentication, const std::string& logicalAddress,
bool connectingThroughProxy, const std::string& clientVersion,
Result& result) {
BaseCommand cmd;
cmd.set_type(BaseCommand::CONNECT);
CommandConnect* connect = cmd.mutable_connect();
connect->set_client_version(clientVersion);
connect->set_auth_method_name(authentication->getAuthMethodName());
connect->set_protocol_version(ProtocolVersion_MAX);
FeatureFlags* flags = connect->mutable_feature_flags();
flags->set_supports_auth_refresh(true);
if (connectingThroughProxy) {
Url logicalAddressUrl;
Url::parse(logicalAddress, logicalAddressUrl);
connect->set_proxy_to_broker_url(logicalAddressUrl.hostPort());
}
AuthenticationDataPtr authDataContent;
result = authentication->getAuthData(authDataContent);
if (result != ResultOk) {
return SharedBuffer{};
}
if (authDataContent->hasDataFromCommand()) {
connect->set_auth_data(authDataContent->getCommandData());
}
return writeMessageWithSize(cmd);
}
SharedBuffer Commands::newAuthResponse(const AuthenticationPtr& authentication, Result& result) {
BaseCommand cmd;
cmd.set_type(BaseCommand::AUTH_RESPONSE);
CommandAuthResponse* authResponse = cmd.mutable_authresponse();
authResponse->set_client_version(std::string("Pulsar-CPP-v") + PULSAR_VERSION_STR);
AuthData* authData = authResponse->mutable_response();
authData->set_auth_method_name(authentication->getAuthMethodName());
AuthenticationDataPtr authDataContent;
result = authentication->getAuthData(authDataContent);
if (result != ResultOk) {
return SharedBuffer{};
}
if (authDataContent->hasDataFromCommand()) {
authData->set_auth_data(authDataContent->getCommandData());
}
return writeMessageWithSize(cmd);
}
SharedBuffer Commands::newSubscribe(const std::string& topic, const std::string& subscription,
uint64_t consumerId, uint64_t requestId, CommandSubscribe_SubType subType,
const std::string& consumerName, SubscriptionMode subscriptionMode,
boost::optional<MessageId> startMessageId, bool readCompacted,
const std::map<std::string, std::string>& metadata,
const std::map<std::string, std::string>& subscriptionProperties,
const SchemaInfo& schemaInfo,
CommandSubscribe_InitialPosition subscriptionInitialPosition,
bool replicateSubscriptionState, KeySharedPolicy keySharedPolicy,
int priorityLevel) {
BaseCommand cmd;
cmd.set_type(BaseCommand::SUBSCRIBE);
CommandSubscribe* subscribe = cmd.mutable_subscribe();
subscribe->set_topic(topic);
subscribe->set_subscription(subscription);
subscribe->set_subtype(static_cast<proto::CommandSubscribe_SubType>(subType));
subscribe->set_consumer_id(consumerId);
subscribe->set_request_id(requestId);
subscribe->set_consumer_name(consumerName);
subscribe->set_durable(subscriptionMode == SubscriptionModeDurable);
subscribe->set_read_compacted(readCompacted);
subscribe->set_initialposition(
static_cast<proto::CommandSubscribe_InitialPosition>(subscriptionInitialPosition));
subscribe->set_replicate_subscription_state(replicateSubscriptionState);
subscribe->set_priority_level(priorityLevel);
if (isBuiltInSchema(schemaInfo.getSchemaType())) {
subscribe->set_allocated_schema(getSchema(schemaInfo));
}
if (startMessageId) {
MessageIdData& messageIdData = *subscribe->mutable_start_message_id();
messageIdData.set_ledgerid(startMessageId.value().ledgerId());
messageIdData.set_entryid(startMessageId.value().entryId());
if (startMessageId.value().batchIndex() != -1) {
messageIdData.set_batch_index(startMessageId.value().batchIndex());
}
}
for (std::map<std::string, std::string>::const_iterator it = metadata.begin(); it != metadata.end();
it++) {
proto::KeyValue* keyValue = proto::KeyValue().New();
keyValue->set_key(it->first);
keyValue->set_value(it->second);
subscribe->mutable_metadata()->AddAllocated(keyValue);
}
for (const auto& subscriptionProperty : subscriptionProperties) {
proto::KeyValue* keyValue = proto::KeyValue().New();
keyValue->set_key(subscriptionProperty.first);
keyValue->set_value(subscriptionProperty.second);
subscribe->mutable_subscription_properties()->AddAllocated(keyValue);
}
if (subType == CommandSubscribe_SubType_Key_Shared) {
KeySharedMeta& ksm = *subscribe->mutable_keysharedmeta();
switch (keySharedPolicy.getKeySharedMode()) {
case pulsar::AUTO_SPLIT:
ksm.set_keysharedmode(proto::KeySharedMode::AUTO_SPLIT);
break;
case pulsar::STICKY:
ksm.set_keysharedmode(proto::KeySharedMode::STICKY);
for (StickyRange range : keySharedPolicy.getStickyRanges()) {
IntRange* intRange = IntRange().New();
intRange->set_start(range.first);
intRange->set_end(range.second);
ksm.mutable_hashranges()->AddAllocated(intRange);
}
}
ksm.set_allowoutoforderdelivery(keySharedPolicy.isAllowOutOfOrderDelivery());
}
return writeMessageWithSize(cmd);
}
SharedBuffer Commands::newUnsubscribe(uint64_t consumerId, uint64_t requestId) {
BaseCommand cmd;
cmd.set_type(BaseCommand::UNSUBSCRIBE);
CommandUnsubscribe* unsubscribe = cmd.mutable_unsubscribe();
unsubscribe->set_consumer_id(consumerId);
unsubscribe->set_request_id(requestId);
return writeMessageWithSize(cmd);
}
SharedBuffer Commands::newProducer(const std::string& topic, uint64_t producerId,
const std::string& producerName, uint64_t requestId,
const std::map<std::string, std::string>& metadata,
const SchemaInfo& schemaInfo, uint64_t epoch,
bool userProvidedProducerName, bool encrypted,
ProducerAccessMode accessMode, boost::optional<uint64_t> topicEpoch,
const std::string& initialSubscriptionName) {
BaseCommand cmd;
cmd.set_type(BaseCommand::PRODUCER);
CommandProducer* producer = cmd.mutable_producer();
producer->set_topic(topic);
producer->set_producer_id(producerId);
producer->set_request_id(requestId);
producer->set_epoch(epoch);
producer->set_user_provided_producer_name(userProvidedProducerName);
producer->set_encrypted(encrypted);
producer->set_producer_access_mode(static_cast<proto::ProducerAccessMode>(accessMode));
if (topicEpoch) {
producer->set_topic_epoch(topicEpoch.value());
}
if (!initialSubscriptionName.empty()) {
producer->set_initial_subscription_name(initialSubscriptionName);
}
for (std::map<std::string, std::string>::const_iterator it = metadata.begin(); it != metadata.end();
it++) {
proto::KeyValue* keyValue = proto::KeyValue().New();
keyValue->set_key(it->first);
keyValue->set_value(it->second);
producer->mutable_metadata()->AddAllocated(keyValue);
}
if (isBuiltInSchema(schemaInfo.getSchemaType())) {
producer->set_allocated_schema(getSchema(schemaInfo));
}
if (!producerName.empty()) {
producer->set_producer_name(producerName);
}
return writeMessageWithSize(cmd);
}
static void configureCommandAck(CommandAck* ack, uint64_t consumerId, int64_t ledgerId, int64_t entryId,
const BitSet& ackSet, CommandAck_AckType ackType) {
ack->set_consumer_id(consumerId);
ack->set_ack_type(static_cast<proto::CommandAck_AckType>(ackType));
auto* msgId = ack->add_message_id();
msgId->set_ledgerid(ledgerId);
msgId->set_entryid(entryId);
for (auto x : ackSet) {
msgId->add_ack_set(x);
}
}
SharedBuffer Commands::newAck(uint64_t consumerId, int64_t ledgerId, int64_t entryId, const BitSet& ackSet,
CommandAck_AckType ackType) {
BaseCommand cmd;
cmd.set_type(BaseCommand::ACK);
configureCommandAck(cmd.mutable_ack(), consumerId, ledgerId, entryId, ackSet, ackType);
return writeMessageWithSize(cmd);
}
SharedBuffer Commands::newAck(uint64_t consumerId, int64_t ledgerId, int64_t entryId, const BitSet& ackSet,
CommandAck_AckType ackType, CommandAck_ValidationError validationError) {
BaseCommand cmd;
cmd.set_type(BaseCommand::ACK);
CommandAck* ack = cmd.mutable_ack();
ack->set_validation_error((proto::CommandAck_ValidationError)validationError);
configureCommandAck(ack, consumerId, ledgerId, entryId, ackSet, ackType);
return writeMessageWithSize(cmd);
}
SharedBuffer Commands::newAck(uint64_t consumerId, int64_t ledgerId, int64_t entryId, const BitSet& ackSet,
CommandAck_AckType ackType, uint64_t requestId) {
BaseCommand cmd;
cmd.set_type(BaseCommand::ACK);
CommandAck* ack = cmd.mutable_ack();
ack->set_request_id(requestId);
configureCommandAck(ack, consumerId, ledgerId, entryId, ackSet, ackType);
return writeMessageWithSize(cmd);
}
static void configureCommandAck(CommandAck* ack, uint64_t consumerId, const std::set<MessageId>& msgIds) {
ack->set_consumer_id(consumerId);
ack->set_ack_type(proto::CommandAck_AckType_Individual);
for (const auto& msgId : msgIds) {
auto newMsgId = ack->add_message_id();
newMsgId->set_ledgerid(msgId.ledgerId());
newMsgId->set_entryid(msgId.entryId());
for (auto x : Commands::getMessageIdImpl(msgId)->getBitSet()) {
newMsgId->add_ack_set(x);
}
}
}
SharedBuffer Commands::newMultiMessageAck(uint64_t consumerId, const std::set<MessageId>& msgIds) {
BaseCommand cmd;
cmd.set_type(BaseCommand::ACK);
configureCommandAck(cmd.mutable_ack(), consumerId, msgIds);
return writeMessageWithSize(cmd);
}
SharedBuffer Commands::newMultiMessageAck(uint64_t consumerId, const std::set<MessageId>& msgIds,
uint64_t requestId) {
BaseCommand cmd;
cmd.set_type(BaseCommand::ACK);
CommandAck* ack = cmd.mutable_ack();
ack->set_request_id(requestId);
configureCommandAck(ack, consumerId, msgIds);
return writeMessageWithSize(cmd);
}
SharedBuffer Commands::newFlow(uint64_t consumerId, uint32_t messagePermits) {
BaseCommand cmd;
cmd.set_type(BaseCommand::FLOW);
CommandFlow* flow = cmd.mutable_flow();
flow->set_consumer_id(consumerId);
flow->set_messagepermits(messagePermits);
return writeMessageWithSize(cmd);
}
SharedBuffer Commands::newCloseProducer(uint64_t producerId, uint64_t requestId) {
BaseCommand cmd;
cmd.set_type(BaseCommand::CLOSE_PRODUCER);
CommandCloseProducer* close = cmd.mutable_close_producer();
close->set_producer_id(producerId);
close->set_request_id(requestId);
return writeMessageWithSize(cmd);
}
SharedBuffer Commands::newCloseConsumer(uint64_t consumerId, uint64_t requestId) {
BaseCommand cmd;
cmd.set_type(BaseCommand::CLOSE_CONSUMER);
CommandCloseConsumer* close = cmd.mutable_close_consumer();
close->set_consumer_id(consumerId);
close->set_request_id(requestId);
return writeMessageWithSize(cmd);
}
SharedBuffer Commands::newPing() {
BaseCommand cmd;
cmd.set_type(BaseCommand::PING);
cmd.mutable_ping();
return writeMessageWithSize(cmd);
}
SharedBuffer Commands::newPong() {
BaseCommand cmd;
cmd.set_type(BaseCommand::PONG);
cmd.mutable_pong();
return writeMessageWithSize(cmd);
}
SharedBuffer Commands::newRedeliverUnacknowledgedMessages(uint64_t consumerId,
const std::set<MessageId>& messageIds) {
BaseCommand cmd;
cmd.set_type(BaseCommand::REDELIVER_UNACKNOWLEDGED_MESSAGES);
CommandRedeliverUnacknowledgedMessages* command = cmd.mutable_redeliverunacknowledgedmessages();
command->set_consumer_id(consumerId);
for (const auto& msgId : messageIds) {
MessageIdData* msgIdData = command->add_message_ids();
msgIdData->set_ledgerid(msgId.ledgerId());
msgIdData->set_entryid(msgId.entryId());
}
return writeMessageWithSize(cmd);
}
SharedBuffer Commands::newSeek(uint64_t consumerId, uint64_t requestId, const MessageId& messageId) {
BaseCommand cmd;
cmd.set_type(BaseCommand::SEEK);
CommandSeek* commandSeek = cmd.mutable_seek();
commandSeek->set_consumer_id(consumerId);
commandSeek->set_request_id(requestId);
MessageIdData& messageIdData = *commandSeek->mutable_message_id();
auto chunkMsgId = std::dynamic_pointer_cast<ChunkMessageIdImpl>(messageId.impl_);
if (chunkMsgId) {
auto firstId = chunkMsgId->getFirstChunkMessageId();
messageIdData.set_ledgerid(firstId->ledgerId_);
messageIdData.set_entryid(firstId->entryId_);
} else {
messageIdData.set_ledgerid(messageId.ledgerId());
messageIdData.set_entryid(messageId.entryId());
}
return writeMessageWithSize(cmd);
}
SharedBuffer Commands::newSeek(uint64_t consumerId, uint64_t requestId, uint64_t timestamp) {
BaseCommand cmd;
cmd.set_type(BaseCommand::SEEK);
CommandSeek* commandSeek = cmd.mutable_seek();
commandSeek->set_consumer_id(consumerId);
commandSeek->set_request_id(requestId);
commandSeek->set_message_publish_time(timestamp);
return writeMessageWithSize(cmd);
}
SharedBuffer Commands::newGetLastMessageId(uint64_t consumerId, uint64_t requestId) {
BaseCommand cmd;
cmd.set_type(BaseCommand::GET_LAST_MESSAGE_ID);
CommandGetLastMessageId* getLastMessageId = cmd.mutable_getlastmessageid();
getLastMessageId->set_consumer_id(consumerId);
getLastMessageId->set_request_id(requestId);
const SharedBuffer buffer = writeMessageWithSize(cmd);
cmd.clear_getlastmessageid();
return buffer;
}
SharedBuffer Commands::newGetTopicsOfNamespace(const std::string& nsName,
CommandGetTopicsOfNamespace_Mode mode, uint64_t requestId) {
BaseCommand cmd;
cmd.set_type(BaseCommand::GET_TOPICS_OF_NAMESPACE);
CommandGetTopicsOfNamespace* getTopics = cmd.mutable_gettopicsofnamespace();
getTopics->set_request_id(requestId);
getTopics->set_namespace_(nsName);
getTopics->set_mode(static_cast<proto::CommandGetTopicsOfNamespace_Mode>(mode));
const SharedBuffer buffer = writeMessageWithSize(cmd);
cmd.clear_gettopicsofnamespace();
return buffer;
}
std::string Commands::messageType(BaseCommand_Type type) {
switch (type) {
case BaseCommand::CONNECT:
return "CONNECT";
break;
case BaseCommand::CONNECTED:
return "CONNECTED";
break;
case BaseCommand::SUBSCRIBE:
return "SUBSCRIBE";
break;
case BaseCommand::PRODUCER:
return "PRODUCER";
break;
case BaseCommand::SEND:
return "SEND";
break;
case BaseCommand::SEND_RECEIPT:
return "SEND_RECEIPT";
break;
case BaseCommand::SEND_ERROR:
return "SEND_ERROR";
break;
case BaseCommand::MESSAGE:
return "MESSAGE";
break;
case BaseCommand::ACK:
return "ACK";
break;
case BaseCommand::FLOW:
return "FLOW";
break;
case BaseCommand::UNSUBSCRIBE:
return "UNSUBSCRIBE";
break;
case BaseCommand::SUCCESS:
return "SUCCESS";
break;
case BaseCommand::ERROR:
return "ERROR";
break;
case BaseCommand::CLOSE_PRODUCER:
return "CLOSE_PRODUCER";
break;
case BaseCommand::CLOSE_CONSUMER:
return "CLOSE_CONSUMER";
break;
case BaseCommand::PRODUCER_SUCCESS:
return "PRODUCER_SUCCESS";
break;
case BaseCommand::PING:
return "PING";
break;
case BaseCommand::PONG:
return "PONG";
break;
case BaseCommand::PARTITIONED_METADATA:
return "PARTITIONED_METADATA";
break;
case BaseCommand::PARTITIONED_METADATA_RESPONSE:
return "PARTITIONED_METADATA_RESPONSE";
break;
case BaseCommand::REDELIVER_UNACKNOWLEDGED_MESSAGES:
return "REDELIVER_UNACKNOWLEDGED_MESSAGES";
break;
case BaseCommand::LOOKUP:
return "LOOKUP";
break;
case BaseCommand::LOOKUP_RESPONSE:
return "LOOKUP_RESPONSE";
break;
case BaseCommand::CONSUMER_STATS:
return "CONSUMER_STATS";
break;
case BaseCommand::CONSUMER_STATS_RESPONSE:
return "CONSUMER_STATS_RESPONSE";
break;
case BaseCommand::REACHED_END_OF_TOPIC:
return "REACHED_END_OF_TOPIC";
break;
case BaseCommand::SEEK:
return "SEEK";
break;
case BaseCommand::ACTIVE_CONSUMER_CHANGE:
return "ACTIVE_CONSUMER_CHANGE";
break;
case BaseCommand::GET_LAST_MESSAGE_ID:
return "GET_LAST_MESSAGE_ID";
break;
case BaseCommand::GET_LAST_MESSAGE_ID_RESPONSE:
return "GET_LAST_MESSAGE_ID_RESPONSE";
break;
case BaseCommand::GET_TOPICS_OF_NAMESPACE:
return "GET_TOPICS_OF_NAMESPACE";
break;
case BaseCommand::GET_TOPICS_OF_NAMESPACE_RESPONSE:
return "GET_TOPICS_OF_NAMESPACE_RESPONSE";
break;
case BaseCommand::GET_SCHEMA:
return "GET_SCHEMA";
break;
case BaseCommand::GET_SCHEMA_RESPONSE:
return "GET_SCHEMA_RESPONSE";
break;
case BaseCommand::AUTH_CHALLENGE:
return "AUTH_CHALLENGE";
break;
case BaseCommand::AUTH_RESPONSE:
return "AUTH_RESPONSE";
break;
case BaseCommand::ACK_RESPONSE:
return "ACK_RESPONSE";
break;
case BaseCommand::GET_OR_CREATE_SCHEMA:
return "GET_OR_CREATE_SCHEMA";
case BaseCommand::GET_OR_CREATE_SCHEMA_RESPONSE:
return "GET_OR_CREATE_SCHEMA_RESPONSE";
case BaseCommand::NEW_TXN:
return "NEW_TXN";
break;
case BaseCommand::NEW_TXN_RESPONSE:
return "NEW_TXN_RESPONSE";
break;
case BaseCommand::ADD_PARTITION_TO_TXN:
return "ADD_PARTITION_TO_TXN";
break;
case BaseCommand::ADD_PARTITION_TO_TXN_RESPONSE:
return "ADD_PARTITION_TO_TXN_RESPONSE";
break;
case BaseCommand::ADD_SUBSCRIPTION_TO_TXN:
return "ADD_SUBSCRIPTION_TO_TXN";
break;
case BaseCommand::ADD_SUBSCRIPTION_TO_TXN_RESPONSE:
return "ADD_SUBSCRIPTION_TO_TXN_RESPONSE";
break;
case BaseCommand::END_TXN:
return "END_TXN";
break;
case BaseCommand::END_TXN_RESPONSE:
return "END_TXN_RESPONSE";
break;
case BaseCommand::END_TXN_ON_PARTITION:
return "END_TXN_ON_PARTITION";
break;
case BaseCommand::END_TXN_ON_PARTITION_RESPONSE:
return "END_TXN_ON_PARTITION_RESPONSE";
break;
case BaseCommand::END_TXN_ON_SUBSCRIPTION:
return "END_TXN_ON_SUBSCRIPTION";
break;
case BaseCommand::END_TXN_ON_SUBSCRIPTION_RESPONSE:
return "END_TXN_ON_SUBSCRIPTION_RESPONSE";
break;
case BaseCommand::TC_CLIENT_CONNECT_REQUEST:
return "TC_CLIENT_CONNECT_REQUEST";
case BaseCommand::TC_CLIENT_CONNECT_RESPONSE:
return "TC_CLIENT_CONNECT_RESPONSE";
break;
case BaseCommand::WATCH_TOPIC_LIST:
return "WATCH_TOPIC_LIST";
break;
case BaseCommand::WATCH_TOPIC_LIST_SUCCESS:
return "WATCH_TOPIC_LIST_SUCCESS";
break;
case BaseCommand::WATCH_TOPIC_UPDATE:
return "WATCH_TOPIC_UPDATE";
break;
case BaseCommand::WATCH_TOPIC_LIST_CLOSE:
return "WATCH_TOPIC_LIST_CLOSE";
break;
};
BOOST_THROW_EXCEPTION(std::logic_error("Invalid BaseCommand enumeration value"));
}
void Commands::initBatchMessageMetadata(const Message& msg, pulsar::proto::MessageMetadata& batchMetadata) {
// metadata has already been set in ProducerImpl::setMessageMetadata
const proto::MessageMetadata& metadata = msg.impl_->metadata;
// required fields
batchMetadata.set_producer_name(metadata.producer_name());
batchMetadata.set_sequence_id(metadata.sequence_id());
batchMetadata.set_publish_time(metadata.publish_time());
// optional fields
if (metadata.has_partition_key()) {
batchMetadata.set_partition_key(metadata.partition_key());
}
if (metadata.has_ordering_key()) {
batchMetadata.set_ordering_key(metadata.ordering_key());
}
if (metadata.has_replicated_from()) {
batchMetadata.set_replicated_from(metadata.replicated_from());
}
if (metadata.replicate_to_size() > 0) {
for (int i = 0; i < metadata.replicate_to_size(); i++) {
batchMetadata.add_replicate_to(metadata.replicate_to(i));
}
}
if (metadata.has_schema_version()) {
batchMetadata.set_schema_version(metadata.schema_version());
}
}
uint64_t Commands::serializeSingleMessageInBatchWithPayload(const Message& msg, SharedBuffer& batchPayLoad,
unsigned long maxMessageSizeInBytes) {
const auto& msgMetadata = msg.impl_->metadata;
SingleMessageMetadata metadata;
if (msgMetadata.has_partition_key()) {
metadata.set_partition_key(msgMetadata.partition_key());
}
if (msgMetadata.has_ordering_key()) {
metadata.set_ordering_key(msgMetadata.ordering_key());
}
metadata.mutable_properties()->Reserve(msgMetadata.properties_size());
for (int i = 0; i < msgMetadata.properties_size(); i++) {
auto keyValue = proto::KeyValue().New();
*keyValue = msgMetadata.properties(i);
metadata.mutable_properties()->AddAllocated(keyValue);
}
if (msgMetadata.has_event_time()) {
metadata.set_event_time(msgMetadata.event_time());
}
if (msgMetadata.has_sequence_id()) {
metadata.set_sequence_id(msgMetadata.sequence_id());
}
// Format of batch message
// Each Message = [METADATA_SIZE][METADATA] [PAYLOAD]
int payloadSize = msg.impl_->payload.readableBytes();
metadata.set_payload_size(payloadSize);
int msgMetadataSize = metadata.ByteSize();
unsigned long requiredSpace = sizeof(uint32_t) + msgMetadataSize + payloadSize;
if (batchPayLoad.writableBytes() <= sizeof(uint32_t) + msgMetadataSize + payloadSize) {
LOG_DEBUG("remaining size of batchPayLoad buffer ["
<< batchPayLoad.writableBytes() << "] can't accomodate new payload [" << requiredSpace
<< "] - expanding the batchPayload buffer");
uint32_t new_size =
std::min(batchPayLoad.readableBytes() * 2, static_cast<uint32_t>(maxMessageSizeInBytes));
new_size = std::max(new_size, batchPayLoad.readableBytes() + static_cast<uint32_t>(requiredSpace));
SharedBuffer buffer = SharedBuffer::allocate(new_size);
// Adding batch created so far
buffer.write(batchPayLoad.data(), batchPayLoad.readableBytes());
batchPayLoad = buffer;
}
// Adding the new message
batchPayLoad.writeUnsignedInt(msgMetadataSize);
metadata.SerializeToArray(batchPayLoad.mutableData(), msgMetadataSize);
batchPayLoad.bytesWritten(msgMetadataSize);
batchPayLoad.write(msg.impl_->payload.data(), payloadSize);
return msgMetadata.sequence_id();
}
Message Commands::deSerializeSingleMessageInBatch(Message& batchedMessage, int32_t batchIndex,
int32_t batchSize, const BatchMessageAckerPtr& acker) {
SharedBuffer& uncompressedPayload = batchedMessage.impl_->payload;
// Format of batch message
// Each Message = [METADATA_SIZE][METADATA] [PAYLOAD]
const int& singleMetaSize = uncompressedPayload.readUnsignedInt();
SingleMessageMetadata metadata;
metadata.ParseFromArray(uncompressedPayload.data(), singleMetaSize);
uncompressedPayload.consume(singleMetaSize);
const int& payloadSize = metadata.payload_size();
// Get a slice of size payloadSize from offset readIndex_
SharedBuffer payload = uncompressedPayload.slice(0, payloadSize);
uncompressedPayload.consume(payloadSize);
const MessageId& m = batchedMessage.impl_->messageId;
auto messageId = MessageIdBuilder::from(m).batchIndex(batchIndex).batchSize(batchSize).build();
auto batchedMessageId = std::make_shared<BatchedMessageIdImpl>(*(messageId.impl_), acker);
Message singleMessage(MessageId{batchedMessageId}, batchedMessage.impl_->metadata, payload, metadata,
batchedMessage.impl_->topicName_);
singleMessage.impl_->cnx_ = batchedMessage.impl_->cnx_;
return singleMessage;
}
MessageIdImplPtr Commands::getMessageIdImpl(const MessageId& messageId) { return messageId.impl_; }
bool Commands::peerSupportsGetLastMessageId(int32_t peerVersion) { return peerVersion >= proto::v12; }
bool Commands::peerSupportsActiveConsumerListener(int32_t peerVersion) { return peerVersion >= proto::v12; }
bool Commands::peerSupportsMultiMessageAcknowledgement(int32_t peerVersion) {
return peerVersion >= proto::v12;
}
bool Commands::peerSupportsJsonSchemaAvroFormat(int32_t peerVersion) { return peerVersion >= proto::v13; }
bool Commands::peerSupportsGetOrCreateSchema(int32_t peerVersion) { return peerVersion >= proto::v15; }
} // namespace pulsar
/* namespace pulsar */