blob: 3e9087fcf6c9df503202529723c53486f2889566 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#if WITH_WS_VERSION
#include <ws_version.h>
constexpr int kWiresharkMajorVersion = WIRESHARK_VERSION_MAJOR;
constexpr int kWiresharkMinorVersion = WIRESHARK_VERSION_MINOR;
#else
#include <config.h>
constexpr int kWiresharkMajorVersion = VERSION_MAJOR;
constexpr int kWiresharkMinorVersion = VERSION_MINOR;
#endif
#include <epan/column-utils.h>
#include <epan/dissectors/packet-tcp.h>
#include <epan/packet.h>
#include <epan/proto.h>
#include <epan/value_string.h>
#include <glib.h>
#include <wsutil/nstime.h>
#include <map>
#include "PulsarApi.pb.h"
#ifdef VERSION
#undef VERSION
#endif
/* Version number of package */
#define VERSION "0.0.1"
// Pulsar port number. NOTE(review): presumably the TCP port the dissector is registered on;
// the registration code is outside this part of the file — confirm.
const static int PULSAR_PORT = 6650;
// Protocol handle; used as the key for per-conversation state and as the top-level tree item.
static int proto_pulsar = -1;
// Header-field handles passed as the hfindex argument of the proto_tree_add_*() calls below.
// They start at -1; presumably they receive their real ids during protocol registration,
// which is not visible in this part of the file.
static int hf_pulsar_error = -1;
static int hf_pulsar_error_message = -1;
static int hf_pulsar_cmd_type = -1;
static int hf_pulsar_frame_size = -1;
static int hf_pulsar_cmd_size = -1;
static int hf_pulsar_client_version = -1;
static int hf_pulsar_auth_method = -1;
static int hf_pulsar_auth_data = -1;
static int hf_pulsar_protocol_version = -1;
static int hf_pulsar_server_version = -1;
static int hf_pulsar_topic = -1;
static int hf_pulsar_subscription = -1;
static int hf_pulsar_subType = -1;
static int hf_pulsar_consumer_id = -1;
static int hf_pulsar_producer_id = -1;
static int hf_pulsar_server_error = -1;
static int hf_pulsar_ack_type = -1;
static int hf_pulsar_request_id = -1;
static int hf_pulsar_consumer_name = -1;
static int hf_pulsar_producer_name = -1;
// Message-metadata fields (filled in by dissect_message_metadata).
static int hf_pulsar_publish_time = -1;
static int hf_pulsar_deliver_at_time = -1;
static int hf_pulsar_event_time = -1;
static int hf_pulsar_deliver_after_time = -1;
static int hf_pulsar_chunk_id = -1;
static int hf_pulsar_num_chunks_from_msg = -1;
static int hf_pulsar_uuid = -1;
static int hf_pulsar_compression_type = -1;
static int hf_pulsar_uncompressed_size = -1;
static int hf_pulsar_partition_key = -1;
static int hf_pulsar_ordering_key = -1;
static int hf_pulsar_encryption_algo = -1;
static int hf_pulsar_encryption_param = -1;
static int hf_pulsar_encryption_keys = -1;
static int hf_pulsar_replicated_from = -1;
static int hf_pulsar_replicate_to = -1;
static int hf_pulsar_property = -1;
static int hf_pulsar_txnid_least_bits = -1;
static int hf_pulsar_txnid_most_bits = -1;
// Generated fields that link a request frame with its response frame and show the latency.
static int hf_pulsar_request_in = -1;
static int hf_pulsar_response_in = -1;
static int hf_pulsar_publish_latency = -1;
static int hf_pulsar_sequence_id = -1;
static int hf_pulsar_highest_sequence_id = -1;
static int hf_pulsar_message_id = -1;
static int hf_pulsar_message_permits = -1;
// Subtree handle shared by every subtree this dissector creates.
static int ett_pulsar = -1;
// Width in bytes of the leading frame-size field; the command-size field starts right after it.
const static int FRAME_SIZE_LEN = 4;
// 16-bit magic values compared against the start of the message section in
// dissect_message_metadata to detect the optional broker-entry-metadata and CRC32C prefixes.
const static guint16 MAGIC_BROKER_ENTRY_METADATA = 0x0e02;
const static guint16 MAGIC_CRC32C = 0x0e01;
// Single BaseCommand instance that dissect_pulsar_message parses every frame into.
static pulsar::proto::BaseCommand command;
using namespace pulsar::proto;
// Display names for BaseCommand::Type values, looked up through to_str() / val_to_str().
// val_to_str() scans until it meets an entry whose string pointer is null, so the
// table MUST end with the {0, nullptr} sentinel; without it an unknown command type
// makes the lookup read past the end of the array.
static const value_string pulsar_cmd_names[] = {
    {BaseCommand::CONNECT, "Connect"},
    {BaseCommand::CONNECTED, "Connected"},
    {BaseCommand::SUBSCRIBE, "Subscribe"},
    {BaseCommand::PRODUCER, "Producer"},
    {BaseCommand::SEND, "Send"},
    {BaseCommand::SEND_RECEIPT, "SendReceipt"},
    {BaseCommand::SEND_ERROR, "SendError"},
    {BaseCommand::MESSAGE, "Message"},
    {BaseCommand::ACK, "Ack"},
    {BaseCommand::FLOW, "Flow"},
    {BaseCommand::UNSUBSCRIBE, "Unsubscribe"},
    {BaseCommand::SUCCESS, "Success"},
    {BaseCommand::ERROR, "Error"},
    {BaseCommand::CLOSE_PRODUCER, "CloseProducer"},
    {BaseCommand::CLOSE_CONSUMER, "CloseConsumer"},
    {BaseCommand::PRODUCER_SUCCESS, "ProducerSuccess"},
    {BaseCommand::PING, "Ping"},
    {BaseCommand::PONG, "Pong"},
    {BaseCommand::REDELIVER_UNACKNOWLEDGED_MESSAGES, "RedeliverUnacknowledgedMessages"},
    {BaseCommand::PARTITIONED_METADATA, "PartitionedMetadata"},
    {BaseCommand::PARTITIONED_METADATA_RESPONSE, "PartitionedMetadataResponse"},
    {BaseCommand::LOOKUP, "Lookup"},
    {BaseCommand::LOOKUP_RESPONSE, "LookupResponse"},
    {BaseCommand::CONSUMER_STATS, "ConsumerStats"},
    {BaseCommand::CONSUMER_STATS_RESPONSE, "ConsumerStatsResponse"},
    // NOTE(review): enum name taken from PulsarApi.proto — confirm against the generated header.
    {BaseCommand::REACHED_END_OF_TOPIC, "ReachedEndOfTopic"},
    {BaseCommand::SEEK, "Seek"},
    {BaseCommand::GET_LAST_MESSAGE_ID, "GetLastMessageId"},
    {BaseCommand::GET_LAST_MESSAGE_ID_RESPONSE, "GetLastMessageIdResponse"},
    {BaseCommand::ACTIVE_CONSUMER_CHANGE, "ActiveConsumerChange"},
    {BaseCommand::GET_TOPICS_OF_NAMESPACE, "GetTopicsOfNamespace"},
    {BaseCommand::GET_TOPICS_OF_NAMESPACE_RESPONSE, "GetTopicsOfNamespaceResponse"},
    {BaseCommand::GET_SCHEMA, "GetSchema"},
    {BaseCommand::GET_SCHEMA_RESPONSE, "GetSchemaResponse"},
    {BaseCommand::AUTH_CHALLENGE, "AuthChallenge"},
    {BaseCommand::AUTH_RESPONSE, "AuthResponse"},
    {BaseCommand::ACK_RESPONSE, "AckResponse"},
    {BaseCommand::GET_OR_CREATE_SCHEMA, "GetOrCreateSchema"},
    {BaseCommand::GET_OR_CREATE_SCHEMA_RESPONSE, "GetOrCreateSchemaResponse"},
    {BaseCommand::NEW_TXN, "NewTxn"},
    {BaseCommand::NEW_TXN_RESPONSE, "NewTxnResponse"},
    {BaseCommand::ADD_PARTITION_TO_TXN, "AddPartitionToTxn"},
    // NOTE(review): enum name taken from PulsarApi.proto — confirm against the generated header.
    {BaseCommand::ADD_PARTITION_TO_TXN_RESPONSE, "AddPartitionToTxnResponse"},
    {BaseCommand::ADD_SUBSCRIPTION_TO_TXN, "AddSubscriptionToTxn"},
    {BaseCommand::ADD_SUBSCRIPTION_TO_TXN_RESPONSE, "AddSubscriptionToTxnResponse"},
    {BaseCommand::END_TXN, "EndTxn"},
    {BaseCommand::END_TXN_RESPONSE, "EndTxnResponse"},
    {BaseCommand::END_TXN_ON_PARTITION, "EndTxnOnPartition"},
    {BaseCommand::END_TXN_ON_PARTITION_RESPONSE, "EndTxnOnPartitionResponse"},
    {BaseCommand::END_TXN_ON_SUBSCRIPTION, "EndTxnOnSubscription"},
    {BaseCommand::END_TXN_ON_SUBSCRIPTION_RESPONSE, "EndTxnOnSubscriptionResponse"},
    {BaseCommand::TC_CLIENT_CONNECT_REQUEST, "TcClientConnectRequest"},
    {BaseCommand::TC_CLIENT_CONNECT_RESPONSE, "TcClientConnectResponse"},
    {0, nullptr},  // sentinel required by val_to_str()
};
// Display names for the AuthMethod enum carried in CONNECT.
// Terminated with {0, nullptr} because val_to_str() scans until a null string pointer.
static const value_string auth_methods_vs[] = {
    {AuthMethodNone, "None"},      //
    {AuthMethodYcaV1, "YCAv1"},    //
    {AuthMethodAthens, "Athens"},  //
    {0, nullptr}                   // sentinel required by val_to_str()
};
// Display names for the ServerError enum reported by SEND_ERROR and ERROR commands.
// Terminated with {0, nullptr} because val_to_str() scans until a null string pointer;
// without the sentinel an unlisted error code makes the lookup run off the array.
static const value_string server_errors_vs[] = {
    {UnknownError, "UnknownError"},
    {MetadataError, "MetadataError"},
    {PersistenceError, "PersistenceError"},
    {AuthenticationError, "AuthenticationError"},
    {AuthorizationError, "AuthorizationError"},
    {ConsumerBusy, "ConsumerBusy"},
    {ServiceNotReady, "ServiceNotReady"},
    {ProducerBlockedQuotaExceededError, "ProducerBlockedQuotaExceededError"},
    {ProducerBlockedQuotaExceededException, "ProducerBlockedQuotaExceededException"},
    {ChecksumError, "ChecksumError"},
    {UnsupportedVersionError, "UnsupportedVersionError"},
    {TopicNotFound, "TopicNotFound"},
    {SubscriptionNotFound, "SubscriptionNotFound"},
    {ConsumerNotFound, "ConsumerNotFound"},
    {TooManyRequests, "TooManyRequests"},
    {TopicTerminatedError, "TopicTerminatedError"},
    {ProducerBusy, "ProducerBusy"},
    {InvalidTopicName, "InvalidTopicName"},
    {IncompatibleSchema, "IncompatibleSchema"},
    {ConsumerAssignError, "ConsumerAssignError"},
    {TransactionCoordinatorNotFound, "TransactionCoordinatorNotFound"},
    {InvalidTxnStatus, "InvalidTxnStatus"},
    {NotAllowedError, "NotAllowedError"},
    {TransactionConflict, "TransactionConflict"},
    {TransactionNotFound, "TransactionNotFound"},
    {ProducerFenced, "ProducerFenced"},
    {0, nullptr},  // sentinel required by val_to_str()
};
// Display names for CommandAck::AckType.
// Terminated with {0, nullptr} because val_to_str() scans until a null string pointer.
static const value_string ack_type_vs[] = {
    {CommandAck::Individual, "Individual"},
    {CommandAck::Cumulative, "Cumulative"},
    {0, nullptr},  // sentinel required by val_to_str()
};
// Display names for the ProtocolVersion enum exchanged in CONNECT / CONNECTED.
// Terminated with {0, nullptr} because val_to_str() scans until a null string pointer;
// a peer advertising a version newer than v19 would otherwise send the lookup past
// the end of the array.
static const value_string protocol_version_vs[] = {
    {v0, "v0"},   {v1, "v1"},   {v2, "v2"},   {v3, "v3"},   {v4, "v4"},   {v5, "v5"},   {v6, "v6"},
    {v7, "v7"},   {v8, "v8"},   {v9, "v9"},   {v10, "v10"}, {v11, "v11"}, {v12, "v12"}, {v13, "v13"},
    {v14, "v14"}, {v15, "v15"}, {v16, "v16"}, {v17, "v17"}, {v18, "v18"}, {v19, "v19"},
    {0, nullptr},  // sentinel required by val_to_str()
};
// Display names for CommandSubscribe::SubType.
// Terminated with {0, nullptr} because val_to_str() scans until a null string pointer.
static const value_string sub_type_names_vs[] = {
    {CommandSubscribe::Exclusive, "Exclusive"},
    {CommandSubscribe::Shared, "Shared"},
    {CommandSubscribe::Failover, "Failover"},
    {CommandSubscribe::Key_Shared, "Key_Shared"},
    {0, nullptr},  // sentinel required by val_to_str()
};
// Display names for the CompressionType enum found in MessageMetadata.
// Terminated with {0, nullptr} because val_to_str() scans until a null string pointer.
static const value_string compression_type_name_vs[] = {
    {CompressionType::NONE, "NONE"},
    {CompressionType::LZ4, "LZ4"},
    {CompressionType::ZLIB, "ZLIB"},
    {CompressionType::ZSTD, "ZSTD"},
    {CompressionType::SNAPPY, "SNAPPY"},
    {0, nullptr},  // sentinel required by val_to_str()
};
// Maps an enum value to its display string through val_to_str(); values missing from
// the table are rendered with the "Unknown (%d)" format.
static const char* to_str(int value, const value_string* values) {
    const char* label = val_to_str(value, values, "Unknown (%d)");
    return label;
}
// Strict weak ordering over MessageIdData so it can serve as a std::map key:
// ledger id is compared first, entry id breaks ties. No other field takes part.
struct MessageIdComparator {
    bool operator()(const MessageIdData& a, const MessageIdData& b) const {
        if (a.ledgerid() != b.ledgerid()) {
            return a.ledgerid() < b.ledgerid();
        }
        return a.entryid() < b.entryid();
    }
};
// Frame numbers and capture timestamps of a request and of the frame that answers it.
// UINT32_MAX in a frame field means "not seen yet"; the timestamps are only meaningful
// once the matching frame number has been recorded.
struct RequestData {
    uint32_t requestFrame = UINT32_MAX;
    nstime_t requestTimestamp;
    uint32_t ackFrame = UINT32_MAX;
    nstime_t ackTimestamp;

    // Kept user-provided so the timestamps stay untouched on construction,
    // exactly as with the member-initializer-list form.
    RequestData() {}
};
// RequestData for request-id based exchanges, extended with the id of the
// producer or consumer the request refers to.
struct RequestResponseData : RequestData {
    // producer / consumer id
    uint64_t id;
};
// Per-producer state remembered for one connection (value type of ConnectionState::producers).
struct ProducerData {
// Topic copied from the PRODUCER command.
std::string topic;
// Name shown next to SEND / SEND_RECEIPT / SEND_ERROR frames.
// NOTE(review): no assignment is visible in this part of the file — presumably set by a
// handler further down; confirm.
std::string producerName;
// Links each SEND to its receipt/error; keyed by the message sequence id.
std::map<uint64_t, RequestData> messages;
};
// Per-consumer state remembered for one connection (value type of ConnectionState::consumers).
// topic, subscriptionName, consumerName and subType are copied from the SUBSCRIBE command.
struct ConsumerData {
std::string topic;
std::string subscriptionName;
std::string consumerName;
CommandSubscribe::SubType subType;
// Link messages to acks for the consumer
// (keyed by message id, ordered by MessageIdComparator).
std::map<MessageIdData, RequestData, MessageIdComparator> messages;
};
// State kept per conversation; dissect_pulsar_message allocates one instance and attaches
// it to the conversation under proto_pulsar.
struct ConnectionState {
// Keyed by producer id.
std::map<uint64_t, ProducerData> producers;
// Keyed by consumer id.
std::map<uint64_t, ConsumerData> consumers;
// Keyed by request id; pairs request commands with their SUCCESS / ERROR responses.
std::map<uint64_t, RequestResponseData> requests;
};
/**
 * Dissects the message section that follows the command of a SEND / MESSAGE frame:
 * an optional broker-entry-metadata prefix, an optional CRC32C prefix, the
 * size-prefixed MessageMetadata protobuf and finally the payload.
 *
 * @param frame_tree tree the "MessageMetadata" subtree (or a parse error item) is added to
 * @param tvb        buffer holding the whole frame
 * @param offset     offset in tvb at which the message section starts
 * @param maxOffset  end of the usable data in tvb (callers pass the captured length)
 *
 * All sizes are read from the wire and therefore untrusted: each one is validated
 * against the bytes that remain before it is used to advance the offset. The function
 * returns without adding anything when the section is truncated.
 */
static void dissect_message_metadata(proto_tree* frame_tree, tvbuff_t* tvb, int offset, int maxOffset) {
    if (tvb_get_ntohs(tvb, offset) == MAGIC_BROKER_ENTRY_METADATA) {
        // Layout: magic (2 bytes) | size (4 bytes) | broker entry metadata (size bytes)
        offset += 2;
        const guint32 brokerEntryMetadataSize = tvb_get_ntohl(tvb, offset);
        offset += 4;  // the size field itself is 4 bytes wide
        if (offset > maxOffset || brokerEntryMetadataSize > (guint32)(maxOffset - offset)) {
            // Truncated or bogus size: skipping would move the offset outside the buffer
#ifdef DEBUG
            proto_tree_add_debug_text(frame_tree, "[DEBUG] Not enough data to skip broker entry metadata");
#endif
            return;
        }
        offset += (int)brokerEntryMetadataSize;
#ifdef DEBUG
        proto_tree_add_debug_text(frame_tree, "[DEBUG] MAGIC_BROKER_ENTRY_METADATA %u",
                                  brokerEntryMetadataSize);
#endif
    }

    if (tvb_get_ntohs(tvb, offset) == MAGIC_CRC32C) {
#ifdef DEBUG
        // The checksum sits right after the 2-byte magic
        auto checksum = tvb_get_ntohl(tvb, offset + 2);
        proto_tree_add_debug_text(frame_tree, "[DEBUG] CRC32C %u", checksum);
#endif
        // Skip CRC32C Magic (2) and CRC32C checksum (4)
        offset += 2;
        offset += 4;
    }

    // Decode message metadata
    const guint32 metadataSize = tvb_get_ntohl(tvb, offset);
    offset += 4;
#ifdef DEBUG
    proto_tree_add_debug_text(frame_tree, "[DEBUG] MetadataSize %d, maxOffset %d", metadataSize, maxOffset);
#endif
    // Compare against the remaining byte count instead of computing offset + size,
    // which is evaluated in unsigned arithmetic and wraps around for huge sizes.
    if (offset > maxOffset || metadataSize > (guint32)(maxOffset - offset)) {
        // Not enough data to dissect metadata
#ifdef DEBUG
        proto_tree_add_debug_text(frame_tree, "[DEBUG] Not enough data to dissect message metadata");
#endif
        return;
    }
    // Safe narrowing: metadataSize <= maxOffset - offset <= INT_MAX
    const int metadataLen = (int)metadataSize;

    // Reused across calls; ParseFromArray replaces the previous content
    static MessageMetadata msgMetadata;
    auto ptr = tvb_get_ptr(tvb, offset, metadataLen);
    if (!msgMetadata.ParseFromArray(ptr, metadataLen)) {
        proto_tree_add_boolean_format(frame_tree, hf_pulsar_error, tvb, offset, metadataLen, true,
                                      "Error parsing protocol buffer message metadata");
        return;
    }
#ifdef DEBUG
    proto_tree_add_debug_text(frame_tree, "[DEBUG] MessageMetadata Utf8DebugString : %s",
                              msgMetadata.Utf8DebugString().c_str());
    proto_tree_add_debug_text(frame_tree, "[DEBUG] MessageMetadata SerializeAsString : %s",
                              msgMetadata.SerializeAsString().c_str());
#endif

    // Every field below highlights the whole metadata blob, since the individual
    // protobuf field offsets are not known after parsing.
    proto_tree* md_tree =
        proto_tree_add_subtree_format(frame_tree, tvb, offset, metadataLen, ett_pulsar, nullptr,
                                      "MessageMetadata / %s / %" G_GINT64_MODIFIER "u",
                                      msgMetadata.producer_name().c_str(), msgMetadata.sequence_id());
    proto_tree_add_string(md_tree, hf_pulsar_producer_name, tvb, offset, metadataLen,
                          msgMetadata.producer_name().c_str());

    // IDs
    proto_tree_add_uint64(md_tree, hf_pulsar_sequence_id, tvb, offset, metadataLen,
                          msgMetadata.sequence_id());
    if (msgMetadata.has_highest_sequence_id()) {
        proto_tree_add_uint64(md_tree, hf_pulsar_highest_sequence_id, tvb, offset, metadataLen,
                              msgMetadata.highest_sequence_id());
    }
    if (msgMetadata.has_chunk_id()) {
        proto_tree_add_uint(md_tree, hf_pulsar_chunk_id, tvb, offset, metadataLen, msgMetadata.chunk_id());
        if (msgMetadata.has_num_chunks_from_msg()) {
            proto_tree_add_uint(md_tree, hf_pulsar_num_chunks_from_msg, tvb, offset, metadataLen,
                                msgMetadata.num_chunks_from_msg());
        }
    }
    if (msgMetadata.has_uuid()) {
        proto_tree_add_string(md_tree, hf_pulsar_uuid, tvb, offset, metadataLen, msgMetadata.uuid().c_str());
    }

    // Times
    proto_tree_add_uint64(md_tree, hf_pulsar_publish_time, tvb, offset, metadataLen,
                          msgMetadata.publish_time());
    if (msgMetadata.has_deliver_at_time()) {
        proto_tree_add_uint64(md_tree, hf_pulsar_deliver_at_time, tvb, offset, metadataLen,
                              msgMetadata.deliver_at_time());
        proto_tree_add_uint64(md_tree, hf_pulsar_deliver_after_time, tvb, offset, metadataLen,
                              msgMetadata.deliver_at_time() - msgMetadata.publish_time());
    }
    if (msgMetadata.has_event_time()) {
        proto_tree_add_uint64(md_tree, hf_pulsar_event_time, tvb, offset, metadataLen,
                              msgMetadata.event_time());
    }

    // Compression
    if (msgMetadata.has_compression()) {
        proto_tree_add_string(md_tree, hf_pulsar_compression_type, tvb, offset, metadataLen,
                              to_str(msgMetadata.compression(), compression_type_name_vs));
    }
    if (msgMetadata.has_uncompressed_size()) {
        proto_tree_add_uint(md_tree, hf_pulsar_uncompressed_size, tvb, offset, metadataLen,
                            msgMetadata.uncompressed_size());
    }

    // Keys
    if (msgMetadata.has_partition_key()) {
        proto_tree_add_string(md_tree, hf_pulsar_partition_key, tvb, offset, metadataLen,
                              msgMetadata.partition_key().c_str());
    }
    if (msgMetadata.has_ordering_key()) {
        proto_tree_add_string(md_tree, hf_pulsar_ordering_key, tvb, offset, metadataLen,
                              msgMetadata.ordering_key().c_str());
    }

    // Encryption
    if (msgMetadata.has_encryption_algo()) {
        proto_tree_add_string(md_tree, hf_pulsar_encryption_algo, tvb, offset, metadataLen,
                              msgMetadata.encryption_algo().c_str());
    }
    if (msgMetadata.has_encryption_param()) {
        proto_tree_add_string(md_tree, hf_pulsar_encryption_param, tvb, offset, metadataLen,
                              msgMetadata.encryption_param().c_str());
    }
    if (msgMetadata.encryption_keys_size() > 0) {
        proto_tree* encryption_keys_tree = proto_tree_add_subtree_format(
            md_tree, tvb, offset, msgMetadata.encryption_param().size(), ett_pulsar, nullptr,
            "EncryptionParam / %s", msgMetadata.encryption_algo().c_str());
        for (int i = 0; i < msgMetadata.encryption_keys_size(); i++) {
            const auto& encryption_key = msgMetadata.encryption_keys(i);
            proto_tree_add_string_format(
                encryption_keys_tree, hf_pulsar_encryption_keys, tvb, offset, metadataLen, "", "%s : %s",
                encryption_key.has_key() ? encryption_key.key().c_str() : "<none>",
                encryption_key.has_value() ? encryption_key.value().c_str() : "<none>");
        }
    }

    // Properties
    if (msgMetadata.properties_size() > 0) {
        proto_tree* properties_tree = proto_tree_add_subtree_format(md_tree, tvb, offset, metadataLen,
                                                                    ett_pulsar, nullptr, "Properties");
        for (int i = 0; i < msgMetadata.properties_size(); i++) {
            const KeyValue& kv = msgMetadata.properties(i);
            proto_tree_add_string_format(properties_tree, hf_pulsar_property, tvb, offset, metadataLen, "",
                                         "%s : %s", kv.key().c_str(), kv.value().c_str());
        }
    }

    // Replication
    if (msgMetadata.has_replicated_from()) {
        proto_tree_add_string(md_tree, hf_pulsar_replicated_from, tvb, offset, metadataLen,
                              msgMetadata.replicated_from().c_str());
    }
    if (msgMetadata.replicate_to_size() > 0) {
        proto_tree* replicate_tree = proto_tree_add_subtree_format(md_tree, tvb, offset, metadataLen,
                                                                   ett_pulsar, nullptr, "Replicate to");
        for (int i = 0; i < msgMetadata.replicate_to_size(); i++) {
            // Destinations belong under the replicate_to field, not replicated_from.
            // NOTE(review): relies on hf_pulsar_replicate_to being registered alongside the
            // other fields; the registration table is outside this part of the file — confirm.
            proto_tree_add_string_format(replicate_tree, hf_pulsar_replicate_to, tvb, offset, metadataLen,
                                         "", "%s", msgMetadata.replicate_to(i).c_str());
        }
    }

    // Transaction
    if (msgMetadata.has_txnid_least_bits()) {
        proto_tree_add_uint64(md_tree, hf_pulsar_txnid_least_bits, tvb, offset, metadataLen,
                              msgMetadata.txnid_least_bits());
    }
    if (msgMetadata.has_txnid_most_bits()) {
        proto_tree_add_uint64(md_tree, hf_pulsar_txnid_most_bits, tvb, offset, metadataLen,
                              msgMetadata.txnid_most_bits());
    }

    // Payloads: whatever follows the metadata up to maxOffset (non-negative thanks to the
    // bounds check above)
    offset += metadataLen;
    uint32_t payloadSize = maxOffset - offset;
    proto_tree_add_subtree_format(md_tree, tvb, offset, payloadSize, ett_pulsar, nullptr, "Payload / size=%u",
                                  payloadSize);
}
/**
 * Adds, on a response frame, a generated reference to the frame that carried the
 * request plus the elapsed time between the two (ack timestamp minus request timestamp).
 * Nothing is added while the request frame is still unknown (UINT32_MAX).
 */
void link_to_request_frame(proto_tree* cmd_tree, tvbuff_t* tvb, int offset, int size,
                           const RequestData& reqData) {
    if (reqData.requestFrame == UINT32_MAX) {
        return;
    }

    proto_tree* frameRef =
        proto_tree_add_uint(cmd_tree, hf_pulsar_request_in, tvb, offset, size, reqData.requestFrame);
    PROTO_ITEM_SET_GENERATED(frameRef);

    nstime_t elapsed;
    nstime_delta(&elapsed, &reqData.ackTimestamp, &reqData.requestTimestamp);

    proto_tree* latencyItem =
        proto_tree_add_time(cmd_tree, hf_pulsar_publish_latency, tvb, offset, size, &elapsed);
    PROTO_ITEM_SET_GENERATED(latencyItem);
}
/**
 * Adds, on a request frame, a generated reference to the frame that carried the
 * response plus the elapsed time between the two (ack timestamp minus request timestamp).
 * Nothing is added while the response frame is still unknown (UINT32_MAX).
 */
void link_to_response_frame(proto_tree* cmd_tree, tvbuff_t* tvb, int offset, int size,
                            const RequestData& reqData) {
    if (reqData.ackFrame == UINT32_MAX) {
        return;
    }

    proto_tree* frameRef =
        proto_tree_add_uint(cmd_tree, hf_pulsar_response_in, tvb, offset, size, reqData.ackFrame);
    PROTO_ITEM_SET_GENERATED(frameRef);

    nstime_t elapsed;
    nstime_delta(&elapsed, &reqData.ackTimestamp, &reqData.requestTimestamp);

    proto_tree* latencyItem =
        proto_tree_add_time(cmd_tree, hf_pulsar_publish_latency, tvb, offset, size, &elapsed);
    PROTO_ITEM_SET_GENERATED(latencyItem);
}
//////////
/* This method dissects fully reassembled messages */
static int dissect_pulsar_message(tvbuff_t* tvb, packet_info* pinfo, proto_tree* tree, void* data _U_) {
uint32_t offset = FRAME_SIZE_LEN;
int maxOffset = tvb_captured_length(tvb);
auto cmdSize = (uint32_t)tvb_get_ntohl(tvb, offset);
offset += 4;
if (offset + cmdSize > maxOffset) {
// Not enough data to dissect
#ifdef DEBUG
proto_tree_add_debug_text(tree, "[Debug] Not enough data to dissect command");
#endif
return maxOffset;
}
col_set_str(pinfo->cinfo, COL_PROTOCOL, "Pulsar");
conversation_t* conversation = find_or_create_conversation(pinfo);
auto state = (ConnectionState*)conversation_get_proto_data(conversation, proto_pulsar);
if (state == nullptr) {
state = new ConnectionState();
conversation_add_proto_data(conversation, proto_pulsar, state);
}
auto ptr = (uint8_t*)tvb_get_ptr(tvb, offset, cmdSize);
if (!command.ParseFromArray(ptr, cmdSize)) {
proto_tree_add_boolean_format(tree, hf_pulsar_error, tvb, offset, cmdSize, true,
"Error parsing protocol buffer command");
return maxOffset;
}
int cmdOffset = offset;
offset += cmdSize;
col_add_str(pinfo->cinfo, COL_INFO, to_str(command.type(), pulsar_cmd_names));
proto_item* frame_tree = nullptr;
proto_item* cmd_tree = nullptr;
if (tree) { /* we are being asked for details */
proto_item* ti = proto_tree_add_item(tree, proto_pulsar, tvb, 0, -1, ENC_NA);
frame_tree = proto_item_add_subtree(ti, ett_pulsar);
proto_tree_add_item(frame_tree, hf_pulsar_frame_size, tvb, 0, 4, ENC_BIG_ENDIAN);
proto_tree_add_item(frame_tree, hf_pulsar_cmd_size, tvb, 4, 4, ENC_BIG_ENDIAN);
cmd_tree = proto_tree_add_subtree_format(frame_tree, tvb, 8, cmdSize, ett_pulsar, nullptr,
"Command %s", to_str(command.type(), pulsar_cmd_names));
proto_tree_add_string(cmd_tree, hf_pulsar_cmd_type, tvb, 8, cmdSize,
to_str(command.type(), pulsar_cmd_names));
}
switch (command.type()) {
case BaseCommand::CONNECT: {
const CommandConnect& connect = command.connect();
if (tree) {
proto_tree_add_string(cmd_tree, hf_pulsar_client_version, tvb, cmdOffset, cmdSize,
connect.client_version().c_str());
proto_tree_add_string(cmd_tree, hf_pulsar_protocol_version, tvb, cmdOffset, cmdSize,
to_str(connect.protocol_version(), protocol_version_vs));
proto_tree_add_string(cmd_tree, hf_pulsar_auth_method, tvb, cmdOffset, cmdSize,
to_str(connect.auth_method(), auth_methods_vs));
if (connect.has_auth_data()) {
proto_tree_add_string(cmd_tree, hf_pulsar_auth_data, tvb, cmdOffset, cmdSize,
connect.auth_data().c_str());
}
}
break;
}
case BaseCommand::CONNECTED: {
const CommandConnected& connected = command.connected();
if (tree) {
proto_tree_add_string(cmd_tree, hf_pulsar_server_version, tvb, cmdOffset, cmdSize,
connected.server_version().c_str());
proto_tree_add_string(cmd_tree, hf_pulsar_protocol_version, tvb, cmdOffset, cmdSize,
to_str(connected.protocol_version(), protocol_version_vs));
}
break;
}
case BaseCommand::SUBSCRIBE: {
const CommandSubscribe& subscribe = command.subscribe();
RequestData& reqData = state->requests[subscribe.request_id()];
reqData.requestFrame = pinfo->fd->num;
reqData.requestTimestamp.secs = pinfo->fd->abs_ts.secs;
reqData.requestTimestamp.nsecs = pinfo->fd->abs_ts.nsecs;
ConsumerData& consumerData = state->consumers[subscribe.consumer_id()];
consumerData.topic = subscribe.topic();
consumerData.subscriptionName = subscribe.subscription();
consumerData.consumerName = subscribe.consumer_name();
consumerData.subType = subscribe.subtype();
col_append_fstr(pinfo->cinfo, COL_INFO, " / %s / %s / %s / %s",
to_str(subscribe.subtype(), sub_type_names_vs), subscribe.topic().c_str(),
subscribe.subscription().c_str(), subscribe.consumer_name().c_str());
if (tree) {
proto_tree_add_string(cmd_tree, hf_pulsar_topic, tvb, cmdOffset, cmdSize,
subscribe.topic().c_str());
proto_tree_add_string(cmd_tree, hf_pulsar_subscription, tvb, cmdOffset, cmdSize,
subscribe.subscription().c_str());
proto_tree_add_string(cmd_tree, hf_pulsar_subType, tvb, cmdOffset, cmdSize,
to_str(subscribe.subtype(), sub_type_names_vs));
proto_tree_add_uint64(cmd_tree, hf_pulsar_consumer_id, tvb, cmdOffset, cmdSize,
subscribe.consumer_id());
proto_tree_add_uint64(cmd_tree, hf_pulsar_request_id, tvb, cmdOffset, cmdSize,
subscribe.request_id());
proto_tree_add_string(
cmd_tree, hf_pulsar_consumer_name, tvb, cmdOffset, cmdSize,
subscribe.has_consumer_name() ? subscribe.consumer_name().c_str() : "<none>");
}
break;
}
case BaseCommand::PRODUCER: {
const CommandProducer& producer = command.producer();
RequestResponseData& reqData = state->requests[producer.request_id()];
reqData.requestFrame = pinfo->fd->num;
reqData.requestTimestamp.secs = pinfo->fd->abs_ts.secs;
reqData.requestTimestamp.nsecs = pinfo->fd->abs_ts.nsecs;
reqData.id = producer.producer_id();
state->producers[producer.producer_id()].topic = producer.topic();
col_append_fstr(pinfo->cinfo, COL_INFO, " / %s", producer.topic().c_str());
if (tree) {
proto_tree_add_string(cmd_tree, hf_pulsar_topic, tvb, cmdOffset, cmdSize,
producer.topic().c_str());
proto_tree_add_uint64(cmd_tree, hf_pulsar_producer_id, tvb, cmdOffset, cmdSize,
producer.producer_id());
proto_tree_add_uint64(cmd_tree, hf_pulsar_request_id, tvb, cmdOffset, cmdSize,
producer.request_id());
proto_tree_add_string(
cmd_tree, hf_pulsar_producer_name, tvb, cmdOffset, cmdSize,
producer.has_producer_name() ? producer.producer_name().c_str() : "<none>");
link_to_response_frame(cmd_tree, tvb, cmdOffset, cmdSize, reqData);
}
break;
}
case BaseCommand::SEND: {
const CommandSend& send = command.send();
RequestData& reqData = state->producers[send.producer_id()].messages[send.sequence_id()];
reqData.requestFrame = pinfo->fd->num;
reqData.requestTimestamp.secs = pinfo->fd->abs_ts.secs;
reqData.requestTimestamp.nsecs = pinfo->fd->abs_ts.nsecs;
ProducerData& producerData = state->producers[send.producer_id()];
col_append_fstr(pinfo->cinfo, COL_INFO, " / %s / %" G_GINT64_MODIFIER "u",
producerData.producerName.c_str(), send.sequence_id());
if (tree) {
proto_tree_add_uint64(cmd_tree, hf_pulsar_producer_id, tvb, cmdOffset, cmdSize,
send.producer_id());
proto_tree_add_uint64(cmd_tree, hf_pulsar_sequence_id, tvb, cmdOffset, cmdSize,
send.sequence_id());
// Decode message metadata
dissect_message_metadata(cmd_tree, tvb, offset, maxOffset);
auto item = proto_tree_add_string(cmd_tree, hf_pulsar_producer_name, tvb, cmdOffset, cmdSize,
producerData.producerName.c_str());
PROTO_ITEM_SET_GENERATED(item);
item = proto_tree_add_string(cmd_tree, hf_pulsar_topic, tvb, cmdOffset, cmdSize,
producerData.topic.c_str());
PROTO_ITEM_SET_GENERATED(item);
// Pair with frame information
link_to_response_frame(cmd_tree, tvb, cmdOffset, cmdSize, reqData);
}
break;
}
case BaseCommand::SEND_RECEIPT: {
const CommandSendReceipt& send_receipt = command.send_receipt();
RequestData& reqData =
state->producers[send_receipt.producer_id()].messages[send_receipt.sequence_id()];
reqData.ackFrame = pinfo->fd->num;
reqData.ackTimestamp.secs = pinfo->fd->abs_ts.secs;
reqData.ackTimestamp.nsecs = pinfo->fd->abs_ts.nsecs;
ProducerData& producerData = state->producers[send_receipt.producer_id()];
col_append_fstr(pinfo->cinfo, COL_INFO, " / %s / %" G_GINT64_MODIFIER "u",
producerData.producerName.c_str(), send_receipt.sequence_id());
if (tree) {
proto_tree_add_uint64(cmd_tree, hf_pulsar_producer_id, tvb, cmdOffset, cmdSize,
send_receipt.producer_id());
proto_tree_add_uint64(cmd_tree, hf_pulsar_sequence_id, tvb, cmdOffset, cmdSize,
send_receipt.sequence_id());
if (send_receipt.has_message_id()) {
const MessageIdData& messageId = send_receipt.message_id();
proto_tree_add_string_format(cmd_tree, hf_pulsar_message_id, tvb, cmdOffset, cmdSize, "",
"Message Id: %" G_GINT64_MODIFIER "u:%" G_GINT64_MODIFIER
"u",
messageId.ledgerid(), messageId.entryid());
}
auto item = proto_tree_add_string(cmd_tree, hf_pulsar_producer_name, tvb, cmdOffset, cmdSize,
producerData.producerName.c_str());
PROTO_ITEM_SET_GENERATED(item);
item = proto_tree_add_string(cmd_tree, hf_pulsar_topic, tvb, cmdOffset, cmdSize,
producerData.topic.c_str());
PROTO_ITEM_SET_GENERATED(item);
link_to_request_frame(cmd_tree, tvb, cmdOffset, cmdSize, reqData);
}
break;
}
case BaseCommand::SEND_ERROR: {
const CommandSendError& send_error = command.send_error();
RequestData& reqData =
state->producers[send_error.producer_id()].messages[send_error.sequence_id()];
reqData.ackFrame = pinfo->fd->num;
reqData.ackTimestamp.secs = pinfo->fd->abs_ts.secs;
reqData.ackTimestamp.nsecs = pinfo->fd->abs_ts.nsecs;
ProducerData& producerData = state->producers[send_error.producer_id()];
col_append_fstr(pinfo->cinfo, COL_INFO, " / %s / %" G_GINT64_MODIFIER "u",
producerData.producerName.c_str(), send_error.sequence_id());
if (tree) {
proto_tree_add_boolean_format(frame_tree, hf_pulsar_error, tvb, cmdOffset, cmdSize, true,
"Error in sending operation");
proto_tree_add_uint64(cmd_tree, hf_pulsar_producer_id, tvb, cmdOffset, cmdSize,
send_error.producer_id());
proto_tree_add_uint64(cmd_tree, hf_pulsar_sequence_id, tvb, cmdOffset, cmdSize,
send_error.sequence_id());
auto item = proto_tree_add_string(cmd_tree, hf_pulsar_server_error, tvb, cmdOffset, cmdSize,
to_str(send_error.error(), server_errors_vs));
PROTO_ITEM_SET_GENERATED(item);
item = proto_tree_add_string(cmd_tree, hf_pulsar_producer_name, tvb, cmdOffset, cmdSize,
producerData.producerName.c_str());
PROTO_ITEM_SET_GENERATED(item);
item = proto_tree_add_string(cmd_tree, hf_pulsar_topic, tvb, cmdOffset, cmdSize,
producerData.topic.c_str());
PROTO_ITEM_SET_GENERATED(item);
// Pair with frame information
link_to_request_frame(cmd_tree, tvb, cmdOffset, cmdSize, reqData);
}
break;
}
case BaseCommand::MESSAGE: {
const CommandMessage& message = command.message();
state->consumers[message.consumer_id()].messages[message.message_id()];
RequestData& reqData = state->consumers[message.consumer_id()].messages[message.message_id()];
reqData.requestFrame = pinfo->fd->num;
reqData.requestTimestamp.secs = pinfo->fd->abs_ts.secs;
reqData.requestTimestamp.nsecs = pinfo->fd->abs_ts.nsecs;
const ConsumerData& consumerData = state->consumers[message.consumer_id()];
col_append_fstr(pinfo->cinfo, COL_INFO, " / %s / %" G_GINT64_MODIFIER "u:%" G_GINT64_MODIFIER "u",
consumerData.consumerName.c_str(), message.message_id().ledgerid(),
message.message_id().entryid());
if (tree) {
proto_tree_add_uint64(cmd_tree, hf_pulsar_consumer_id, tvb, cmdOffset, cmdSize,
message.consumer_id());
dissect_message_metadata(cmd_tree, tvb, offset, maxOffset);
auto item = proto_tree_add_string(cmd_tree, hf_pulsar_consumer_name, tvb, cmdOffset, cmdSize,
consumerData.consumerName.c_str());
PROTO_ITEM_SET_GENERATED(item);
item = proto_tree_add_string(cmd_tree, hf_pulsar_topic, tvb, cmdOffset, cmdSize,
consumerData.topic.c_str());
PROTO_ITEM_SET_GENERATED(item);
// Pair with frame information
link_to_response_frame(cmd_tree, tvb, cmdOffset, cmdSize, reqData);
}
break;
}
case BaseCommand::ACK: {
const CommandAck& ack = command.ack();
RequestData& reqData = state->consumers[ack.consumer_id()].messages[ack.message_id().Get(0)];
reqData.ackFrame = pinfo->fd->num;
reqData.ackTimestamp.secs = pinfo->fd->abs_ts.secs;
reqData.ackTimestamp.nsecs = pinfo->fd->abs_ts.nsecs;
const ConsumerData& consumerData = state->consumers[ack.consumer_id()];
col_append_fstr(pinfo->cinfo, COL_INFO, " / %s / %" G_GINT64_MODIFIER "u:%" G_GINT64_MODIFIER "u",
consumerData.consumerName.c_str(), ack.message_id().Get(0).ledgerid(),
ack.message_id().Get(0).entryid());
if (tree) {
proto_tree_add_uint64(cmd_tree, hf_pulsar_consumer_id, tvb, cmdOffset, cmdSize,
ack.consumer_id());
proto_tree_add_string(cmd_tree, hf_pulsar_ack_type, tvb, cmdOffset, cmdSize,
to_str(ack.ack_type(), ack_type_vs));
auto item = proto_tree_add_string(cmd_tree, hf_pulsar_consumer_name, tvb, cmdOffset, cmdSize,
consumerData.consumerName.c_str());
PROTO_ITEM_SET_GENERATED(item);
item = proto_tree_add_string(cmd_tree, hf_pulsar_topic, tvb, cmdOffset, cmdSize,
consumerData.topic.c_str());
PROTO_ITEM_SET_GENERATED(item);
// Pair with frame information
link_to_request_frame(cmd_tree, tvb, cmdOffset, cmdSize, reqData);
}
break;
}
case BaseCommand::FLOW: {
const CommandFlow& flow = command.flow();
const ConsumerData& consumerData = state->consumers[flow.consumer_id()];
col_append_fstr(pinfo->cinfo, COL_INFO, " / %s / %d", consumerData.consumerName.c_str(),
flow.messagepermits());
if (tree) {
proto_tree_add_uint64(cmd_tree, hf_pulsar_consumer_id, tvb, cmdOffset, cmdSize,
flow.consumer_id());
proto_tree_add_uint(cmd_tree, hf_pulsar_message_permits, tvb, cmdOffset, cmdSize,
flow.messagepermits());
auto item = proto_tree_add_string(cmd_tree, hf_pulsar_consumer_name, tvb, cmdOffset, cmdSize,
consumerData.consumerName.c_str());
PROTO_ITEM_SET_GENERATED(item);
item = proto_tree_add_string(cmd_tree, hf_pulsar_topic, tvb, cmdOffset, cmdSize,
consumerData.topic.c_str());
PROTO_ITEM_SET_GENERATED(item);
}
break;
}
case BaseCommand::UNSUBSCRIBE: {
const CommandUnsubscribe& unsubscribe = command.unsubscribe();
RequestData& reqData = state->requests[unsubscribe.request_id()];
reqData.requestFrame = pinfo->fd->num;
reqData.requestTimestamp.secs = pinfo->fd->abs_ts.secs;
reqData.requestTimestamp.nsecs = pinfo->fd->abs_ts.nsecs;
ConsumerData& consumerData = state->consumers[unsubscribe.consumer_id()];
col_append_fstr(pinfo->cinfo, COL_INFO, " / %s / %s / %s / %s",
to_str(consumerData.subType, sub_type_names_vs), consumerData.topic.c_str(),
consumerData.subscriptionName.c_str(), consumerData.consumerName.c_str());
if (tree) {
proto_tree_add_uint64(cmd_tree, hf_pulsar_consumer_id, tvb, cmdOffset, cmdSize,
unsubscribe.consumer_id());
proto_tree_add_uint64(cmd_tree, hf_pulsar_request_id, tvb, cmdOffset, cmdSize,
unsubscribe.request_id());
auto item = proto_tree_add_string(cmd_tree, hf_pulsar_topic, tvb, cmdOffset, cmdSize,
consumerData.topic.c_str());
PROTO_ITEM_IS_GENERATED(item);
item = proto_tree_add_string(cmd_tree, hf_pulsar_subscription, tvb, cmdOffset, cmdSize,
consumerData.subscriptionName.c_str());
PROTO_ITEM_IS_GENERATED(item);
proto_tree_add_string(cmd_tree, hf_pulsar_subType, tvb, cmdOffset, cmdSize,
to_str(consumerData.subType, sub_type_names_vs));
PROTO_ITEM_IS_GENERATED(item);
proto_tree_add_string(cmd_tree, hf_pulsar_consumer_name, tvb, cmdOffset, cmdSize,
consumerData.consumerName.c_str());
PROTO_ITEM_IS_GENERATED(item);
link_to_response_frame(cmd_tree, tvb, cmdOffset, cmdSize, reqData);
}
break;
}
case BaseCommand::SUCCESS: {
const CommandSuccess& success = command.success();
RequestResponseData& reqData = state->requests[success.request_id()];
reqData.ackFrame = pinfo->fd->num;
reqData.ackTimestamp.secs = pinfo->fd->abs_ts.secs;
reqData.ackTimestamp.nsecs = pinfo->fd->abs_ts.nsecs;
if (tree) {
proto_tree_add_uint64(cmd_tree, hf_pulsar_request_id, tvb, cmdOffset, cmdSize,
success.request_id());
link_to_request_frame(cmd_tree, tvb, cmdOffset, cmdSize, reqData);
}
break;
}
case BaseCommand::ERROR: {
const CommandError& error = command.error();
RequestResponseData& reqData = state->requests[error.request_id()];
reqData.ackFrame = pinfo->fd->num;
reqData.ackTimestamp.secs = pinfo->fd->abs_ts.secs;
reqData.ackTimestamp.nsecs = pinfo->fd->abs_ts.nsecs;
if (tree) {
proto_tree_add_boolean_format(frame_tree, hf_pulsar_error, tvb, cmdOffset, cmdSize, true,
"Request failed");
proto_tree_add_uint64(cmd_tree, hf_pulsar_request_id, tvb, cmdOffset, cmdSize,
error.request_id());
proto_tree_add_string(cmd_tree, hf_pulsar_server_error, tvb, cmdOffset, cmdSize,
to_str(error.error(), server_errors_vs));
proto_tree_add_string(cmd_tree, hf_pulsar_error_message, tvb, cmdOffset, cmdSize,
error.message().c_str());
link_to_request_frame(cmd_tree, tvb, cmdOffset, cmdSize, reqData);
}
break;
}
case BaseCommand::CLOSE_PRODUCER: {
const CommandCloseProducer& close_producer = command.close_producer();
RequestData& reqData = state->requests[close_producer.request_id()];
reqData.requestFrame = pinfo->fd->num;
reqData.requestTimestamp.secs = pinfo->fd->abs_ts.secs;
reqData.requestTimestamp.nsecs = pinfo->fd->abs_ts.nsecs;
ProducerData& producerData = state->producers[close_producer.producer_id()];
col_append_fstr(pinfo->cinfo, COL_INFO, " / %s", producerData.topic.c_str());
if (tree) {
proto_tree_add_uint64(cmd_tree, hf_pulsar_producer_id, tvb, cmdOffset, cmdSize,
close_producer.producer_id());
proto_tree_add_uint64(cmd_tree, hf_pulsar_request_id, tvb, cmdOffset, cmdSize,
close_producer.request_id());
auto item = proto_tree_add_string(cmd_tree, hf_pulsar_topic, tvb, cmdOffset, cmdSize,
producerData.topic.c_str());
PROTO_ITEM_IS_GENERATED(item);
proto_tree_add_string(cmd_tree, hf_pulsar_producer_name, tvb, cmdOffset, cmdSize,
producerData.producerName.c_str());
PROTO_ITEM_IS_GENERATED(item);
link_to_response_frame(cmd_tree, tvb, cmdOffset, cmdSize, reqData);
}
break;
}
case BaseCommand::CLOSE_CONSUMER: {
const CommandCloseConsumer& close_consumer = command.close_consumer();
RequestData& reqData = state->requests[close_consumer.request_id()];
reqData.requestFrame = pinfo->fd->num;
reqData.requestTimestamp.secs = pinfo->fd->abs_ts.secs;
reqData.requestTimestamp.nsecs = pinfo->fd->abs_ts.nsecs;
ConsumerData& consumerData = state->consumers[close_consumer.consumer_id()];
col_append_fstr(pinfo->cinfo, COL_INFO, " / %s / %s / %s / %s",
to_str(consumerData.subType, sub_type_names_vs), consumerData.topic.c_str(),
consumerData.subscriptionName.c_str(), consumerData.consumerName.c_str());
if (tree) {
proto_tree_add_uint64(cmd_tree, hf_pulsar_consumer_id, tvb, cmdOffset, cmdSize,
close_consumer.consumer_id());
proto_tree_add_uint64(cmd_tree, hf_pulsar_request_id, tvb, cmdOffset, cmdSize,
close_consumer.request_id());
auto item = proto_tree_add_string(cmd_tree, hf_pulsar_topic, tvb, cmdOffset, cmdSize,
consumerData.topic.c_str());
PROTO_ITEM_IS_GENERATED(item);
item = proto_tree_add_string(cmd_tree, hf_pulsar_subscription, tvb, cmdOffset, cmdSize,
consumerData.subscriptionName.c_str());
PROTO_ITEM_IS_GENERATED(item);
proto_tree_add_string(cmd_tree, hf_pulsar_subType, tvb, cmdOffset, cmdSize,
to_str(consumerData.subType, sub_type_names_vs));
PROTO_ITEM_IS_GENERATED(item);
proto_tree_add_string(cmd_tree, hf_pulsar_consumer_name, tvb, cmdOffset, cmdSize,
consumerData.consumerName.c_str());
PROTO_ITEM_IS_GENERATED(item);
link_to_response_frame(cmd_tree, tvb, cmdOffset, cmdSize, reqData);
}
break;
}
case BaseCommand::PRODUCER_SUCCESS: {
const CommandProducerSuccess& success = command.producer_success();
RequestResponseData& reqData = state->requests[success.request_id()];
reqData.ackFrame = pinfo->fd->num;
reqData.ackTimestamp.secs = pinfo->fd->abs_ts.secs;
reqData.ackTimestamp.nsecs = pinfo->fd->abs_ts.nsecs;
uint64_t producerId = reqData.id;
ProducerData& producerData = state->producers[producerId];
producerData.producerName = success.producer_name();
if (tree) {
proto_tree_add_uint64(cmd_tree, hf_pulsar_request_id, tvb, cmdOffset, cmdSize,
success.request_id());
proto_tree_add_string(cmd_tree, hf_pulsar_producer_name, tvb, cmdOffset, cmdSize,
success.producer_name().c_str());
auto item = proto_tree_add_uint64(cmd_tree, hf_pulsar_producer_id, tvb, cmdOffset, cmdSize,
producerId);
PROTO_ITEM_SET_GENERATED(item);
item = proto_tree_add_string(cmd_tree, hf_pulsar_topic, tvb, cmdOffset, cmdSize,
producerData.topic.c_str());
PROTO_ITEM_SET_GENERATED(item);
link_to_request_frame(cmd_tree, tvb, cmdOffset, cmdSize, reqData);
}
break;
}
case BaseCommand::PING:
break;
case BaseCommand::PONG:
break;
case BaseCommand::REDELIVER_UNACKNOWLEDGED_MESSAGES:
break;
case BaseCommand::PARTITIONED_METADATA:
break;
case BaseCommand::PARTITIONED_METADATA_RESPONSE:
break;
case BaseCommand::LOOKUP:
break;
case BaseCommand::LOOKUP_RESPONSE:
break;
case BaseCommand::CONSUMER_STATS:
break;
case BaseCommand::CONSUMER_STATS_RESPONSE:
break;
case BaseCommand::REACHED_END_OF_TOPIC:
break;
case BaseCommand::SEEK:
break;
case BaseCommand::GET_LAST_MESSAGE_ID:
break;
case BaseCommand::GET_LAST_MESSAGE_ID_RESPONSE:
break;
case BaseCommand::ACTIVE_CONSUMER_CHANGE:
break;
case BaseCommand::GET_TOPICS_OF_NAMESPACE:
break;
case BaseCommand::GET_TOPICS_OF_NAMESPACE_RESPONSE:
break;
case BaseCommand::GET_SCHEMA:
break;
case BaseCommand::GET_SCHEMA_RESPONSE:
break;
case BaseCommand::AUTH_CHALLENGE:
break;
case BaseCommand::AUTH_RESPONSE:
break;
case BaseCommand::ACK_RESPONSE:
break;
case BaseCommand::GET_OR_CREATE_SCHEMA:
break;
case BaseCommand::GET_OR_CREATE_SCHEMA_RESPONSE:
break;
case BaseCommand::NEW_TXN:
break;
case BaseCommand::NEW_TXN_RESPONSE:
break;
case BaseCommand::ADD_PARTITION_TO_TXN:
break;
case BaseCommand::ADD_PARTITION_TO_TXN_RESPONSE:
break;
case BaseCommand::ADD_SUBSCRIPTION_TO_TXN:
break;
case BaseCommand::ADD_SUBSCRIPTION_TO_TXN_RESPONSE:
break;
case BaseCommand::END_TXN:
break;
case BaseCommand::END_TXN_RESPONSE:
break;
case BaseCommand::END_TXN_ON_PARTITION:
break;
case BaseCommand::END_TXN_ON_PARTITION_RESPONSE:
break;
case BaseCommand::END_TXN_ON_SUBSCRIPTION:
break;
case BaseCommand::END_TXN_ON_SUBSCRIPTION_RESPONSE:
break;
case BaseCommand::TC_CLIENT_CONNECT_REQUEST:
break;
case BaseCommand::TC_CLIENT_CONNECT_RESPONSE:
break;
case BaseCommand::WATCH_TOPIC_LIST:
break;
case BaseCommand::WATCH_TOPIC_LIST_SUCCESS:
break;
case BaseCommand::WATCH_TOPIC_UPDATE:
break;
case BaseCommand::WATCH_TOPIC_LIST_CLOSE:
break;
}
return maxOffset;
}
/*
 * Length callback handed to tcp_dissect_pdus().
 *
 * Reads the 32-bit big-endian size field located at `offset` in `tvb` and
 * returns that value plus FRAME_SIZE_LEN, i.e. the size of the whole PDU
 * including the size field itself. `pinfo` and `data` are not used.
 */
static uint32_t get_pulsar_message_len(packet_info* pinfo _U_, tvbuff_t* tvb, int offset, void* data _U_) {
    const uint32_t payloadSize = tvb_get_ntohl(tvb, offset);
    return FRAME_SIZE_LEN + payloadSize;
}
/*
 * Top-level dissector callback for the Pulsar protocol.
 *
 * Delegates to tcp_dissect_pdus(), which uses get_pulsar_message_len() to find
 * PDU boundaries and invokes dissect_pulsar_message() for each PDU. `data` is
 * forwarded untouched to those callbacks.
 *
 * Returns the captured length of `tvb`, i.e. the whole buffer is claimed.
 */
static int dissect_pulsar(tvbuff_t* tvb, packet_info* pinfo, proto_tree* tree, void* data _U_) {
    // Non-zero enables desegmentation of PDUs that span several TCP segments.
    const int desegment = 1;

    tcp_dissect_pdus(tvb, pinfo, tree, desegment, FRAME_SIZE_LEN, get_pulsar_message_len,
                     dissect_pulsar_message, data);

    return tvb_captured_length(tvb);
}
/*
 * Header-field table for the Pulsar protocol; passed to
 * proto_register_field_array() by proto_register_pulsar().
 *
 * Each entry binds one hf_pulsar_* index to its display label, its display
 * filter name ("apache.pulsar.*"), its field type and its display base.
 *
 * Conventions used in this table:
 *  - Non-integer fields (strings, booleans without a bitmask, frame numbers,
 *    relative times) use BASE_NONE as the display value.
 *  - Labels carry no trailing punctuation, because Wireshark appends ": <value>"
 *    itself when rendering the item.
 *  - The trailing "//" markers keep the formatter from joining entries.
 */
static hf_register_info hf[] = {
    //
    /* ---- Frame / command envelope and errors ---- */
    {&hf_pulsar_error,
     {"Error", "apache.pulsar.error", FT_BOOLEAN, BASE_NONE, NULL, 0x0, NULL, HFILL}},  //
    {&hf_pulsar_error_message,
     {"Message", "apache.pulsar.error_message", FT_STRING, BASE_NONE, NULL, 0x0, NULL, HFILL}},  //
    {&hf_pulsar_cmd_type,
     {"Command Type", "apache.pulsar.cmd.type", FT_STRING, BASE_NONE, NULL, 0x0, NULL, HFILL}},  //
    {&hf_pulsar_frame_size,
     {"Frame size", "apache.pulsar.frame_size", FT_UINT32, BASE_DEC, NULL, 0x0, NULL, HFILL}},  //
    {&hf_pulsar_cmd_size,
     {"Command size", "apache.pulsar.cmd_size", FT_UINT32, BASE_DEC, NULL, 0x0, NULL, HFILL}},  //

    /* ---- Connection handshake ---- */
    {&hf_pulsar_client_version,
     {"Client version", "apache.pulsar.client_version", FT_STRING, BASE_NONE, NULL, 0x0, NULL, HFILL}},  //
    {&hf_pulsar_auth_method,
     {"Auth method", "apache.pulsar.auth_method", FT_STRING, BASE_NONE, NULL, 0x0, NULL, HFILL}},  //
    {&hf_pulsar_auth_data,
     {"Auth data", "apache.pulsar.auth_data", FT_STRING, BASE_NONE, NULL, 0x0, NULL, HFILL}},  //
    {&hf_pulsar_protocol_version,
     {"Protocol version", "apache.pulsar.protocol_version", FT_STRING, BASE_NONE, NULL, 0x0, NULL,
      HFILL}},  //
    {&hf_pulsar_server_version,
     {"Server version", "apache.pulsar.server_version", FT_STRING, BASE_NONE, NULL, 0x0, NULL, HFILL}},  //

    /* ---- Topics, subscriptions, producers and consumers ---- */
    {&hf_pulsar_topic,
     {"Topic", "apache.pulsar.topic", FT_STRING, BASE_NONE, NULL, 0x0, NULL, HFILL}},  //
    {&hf_pulsar_subscription,
     {"Subscription", "apache.pulsar.subscription", FT_STRING, BASE_NONE, NULL, 0x0, NULL, HFILL}},  //
    {&hf_pulsar_subType,
     {"Subscription type", "apache.pulsar.sub_type", FT_STRING, BASE_NONE, NULL, 0x0, NULL, HFILL}},  //
    {&hf_pulsar_consumer_id,
     {"Consumer Id", "apache.pulsar.consumer_id", FT_UINT64, BASE_DEC, NULL, 0x0, NULL, HFILL}},  //
    {&hf_pulsar_producer_id,
     {"Producer Id", "apache.pulsar.producer_id", FT_UINT64, BASE_DEC, NULL, 0x0, NULL, HFILL}},  //
    {&hf_pulsar_server_error,
     {"Server error", "apache.pulsar.server_error", FT_STRING, BASE_NONE, NULL, 0x0, NULL, HFILL}},  //
    {&hf_pulsar_ack_type,
     {"Ack type", "apache.pulsar.ack_type", FT_STRING, BASE_NONE, NULL, 0x0, NULL, HFILL}},  //
    {&hf_pulsar_request_id,
     {"Request Id", "apache.pulsar.request_id", FT_UINT64, BASE_DEC, NULL, 0x0, NULL, HFILL}},  //
    {&hf_pulsar_consumer_name,
     {"Consumer Name", "apache.pulsar.consumer_name", FT_STRING, BASE_NONE, NULL, 0x0, NULL, HFILL}},  //
    {&hf_pulsar_producer_name,
     {"Producer Name", "apache.pulsar.producer_name", FT_STRING, BASE_NONE, NULL, 0x0, NULL, HFILL}},  //

    /* ---- Message identification and flow control ---- */
    {&hf_pulsar_sequence_id,
     {"Sequence Id", "apache.pulsar.sequence_id", FT_UINT64, BASE_DEC, NULL, 0x0, NULL, HFILL}},  //
    {&hf_pulsar_highest_sequence_id,
     {"Highest Sequence Id", "apache.pulsar.highest_sequence_id", FT_UINT64, BASE_DEC, NULL, 0x0, NULL,
      HFILL}},  //
    {&hf_pulsar_uuid,
     {"UUID", "apache.pulsar.uuid", FT_STRING, BASE_NONE, NULL, 0x0, NULL, HFILL}},  //
    {&hf_pulsar_message_id,
     {"Message Id", "apache.pulsar.message_id", FT_STRING, BASE_NONE, NULL, 0x0, NULL, HFILL}},  //
    {&hf_pulsar_message_permits,
     {"Message Permits", "apache.pulsar.message_permits", FT_UINT32, BASE_DEC, NULL, 0x0, NULL,
      HFILL}},  //

    /* ---- Message metadata ---- */
    {&hf_pulsar_publish_time,
     {"Publish Time", "apache.pulsar.publish_time", FT_UINT64, BASE_DEC, NULL, 0x0, NULL, HFILL}},  //
    {&hf_pulsar_deliver_at_time,
     {"Deliver At Time", "apache.pulsar.deliver_at_time", FT_UINT64, BASE_DEC, NULL, 0x0, NULL,
      HFILL}},  //
    {&hf_pulsar_event_time,
     {"Event Time", "apache.pulsar.event_time", FT_UINT64, BASE_DEC, NULL, 0x0, NULL, HFILL}},  //
    {&hf_pulsar_deliver_after_time,
     {"Deliver After Time", "apache.pulsar.deliver_after_time", FT_UINT64, BASE_DEC, NULL, 0x0, NULL,
      HFILL}},  //
    {&hf_pulsar_chunk_id,
     {"Chunk Id", "apache.pulsar.chunk_id", FT_UINT32, BASE_DEC, NULL, 0x0, NULL, HFILL}},  //
    {&hf_pulsar_num_chunks_from_msg,
     {"Num Chunks From Msg", "apache.pulsar.num_chunks_from_msg", FT_UINT32, BASE_DEC, NULL, 0x0, NULL,
      HFILL}},  //
    {&hf_pulsar_compression_type,
     {"Compression Type", "apache.pulsar.compression_type", FT_STRING, BASE_NONE, NULL, 0x0, NULL,
      HFILL}},  //
    {&hf_pulsar_uncompressed_size,
     {"Uncompressed Size", "apache.pulsar.uncompressed_size", FT_UINT32, BASE_DEC, NULL, 0x0, NULL,
      HFILL}},  //
    {&hf_pulsar_replicated_from,
     {"Replicated from", "apache.pulsar.replicated_from", FT_STRING, BASE_NONE, NULL, 0x0, NULL,
      HFILL}},  //
    {&hf_pulsar_partition_key,
     {"Partition Key", "apache.pulsar.partition_key", FT_STRING, BASE_NONE, NULL, 0x0, NULL, HFILL}},  //
    {&hf_pulsar_ordering_key,
     {"Ordering Key", "apache.pulsar.ordering_key", FT_STRING, BASE_NONE, NULL, 0x0, NULL, HFILL}},  //
    {&hf_pulsar_encryption_algo,
     {"Encryption Algo", "apache.pulsar.encryption_algo", FT_STRING, BASE_NONE, NULL, 0x0, NULL,
      HFILL}},  //
    {&hf_pulsar_encryption_param,
     {"Encryption Param", "apache.pulsar.encryption_param", FT_STRING, BASE_NONE, NULL, 0x0, NULL,
      HFILL}},  //
    {&hf_pulsar_replicate_to,
     {"Replicate to", "apache.pulsar.replicate_to", FT_STRING, BASE_NONE, NULL, 0x0, NULL, HFILL}},  //
    {&hf_pulsar_property,
     {"Property", "apache.pulsar.property", FT_STRING, BASE_NONE, NULL, 0x0, NULL, HFILL}},  //
    {&hf_pulsar_encryption_keys,
     {"Encryption Keys", "apache.pulsar.encryption_keys", FT_STRING, BASE_NONE, NULL, 0x0, NULL,
      HFILL}},  //

    /* ---- Transactions ---- */
    {&hf_pulsar_txnid_least_bits,
     {"TxnId Least Bits", "apache.pulsar.txnid_least_bits", FT_UINT64, BASE_DEC, NULL, 0x0, NULL,
      HFILL}},  //
    {&hf_pulsar_txnid_most_bits,
     {"TxnId Most Bits", "apache.pulsar.txnid_most_bits", FT_UINT64, BASE_DEC, NULL, 0x0, NULL,
      HFILL}},  //

    /* ---- Request/response correlation ---- */
    {&hf_pulsar_request_in,
     {"Request in frame", "apache.pulsar.request_in", FT_FRAMENUM, BASE_NONE, NULL, 0,
      "This packet is a response to the packet with this number", HFILL}},  //
    {&hf_pulsar_response_in,
     {"Response in frame", "apache.pulsar.response_in", FT_FRAMENUM, BASE_NONE, NULL, 0,
      "This packet will be responded in the packet with this number", HFILL}},  //
    {&hf_pulsar_publish_latency,
     {"Latency", "apache.pulsar.publish_latency", FT_RELATIVE_TIME, BASE_NONE, NULL, 0x0,
      "How long time it took to ACK message", HFILL}},
};
/*
 * Protocol registration routine, installed as the plugin's register_protoinfo
 * callback by plugin_register().
 *
 * Steps, in order:
 *   1. register the protocol itself and remember its id in proto_pulsar;
 *   2. register the header fields (hf) and the subtree index (ett_pulsar);
 *   3. create a dissector handle for dissect_pulsar() and bind it to
 *      TCP port PULSAR_PORT;
 *   4. additionally register the same handle as a post-dissector.
 */
void proto_register_pulsar() {
    static dissector_handle_t pulsar_handle;
    /* Subtree (expansion state) indices owned by this protocol. */
    static int* ett[] = {&ett_pulsar};

    proto_pulsar = proto_register_protocol("Pulsar Wire Protocol", /* name */
                                           "Apache Pulsar",        /* short name */
                                           "apache.pulsar"         /* abbrev */
    );

    proto_register_field_array(proto_pulsar, hf, array_length(hf));
    proto_register_subtree_array(ett, array_length(ett));

    pulsar_handle = create_dissector_handle(&dissect_pulsar, proto_pulsar);
    dissector_add_uint("tcp.port", PULSAR_PORT, pulsar_handle);

    // NOTE(review): a post-dissector is presumably invoked for every frame, not only
    // for traffic on PULSAR_PORT -- confirm this registration is intentional.
    register_postdissector(pulsar_handle);
}
extern "C" {
extern __attribute__((unused)) WS_DLL_PUBLIC_DEF const gchar plugin_version[] = VERSION;
extern __attribute__((unused)) WS_DLL_PUBLIC_DEF const int plugin_want_major = kWiresharkMajorVersion;
extern __attribute__((unused)) WS_DLL_PUBLIC_DEF const int plugin_want_minor = kWiresharkMinorVersion;
WS_DLL_PUBLIC void plugin_register(void);
__attribute__((unused)) static void proto_reg_handoff_pulsar(void) {}
/* Start the functions we need for the plugin stuff */
void plugin_register(void) {
static proto_plugin plug;
plug.register_protoinfo = proto_register_pulsar;
plug.register_handoff = proto_reg_handoff_pulsar; /* or nullptr */
proto_register_plugin(&plug);
}
}