blob: 6541278d9648c5b40a73fcc9515a3f200ced46bf [file] [log] [blame]
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <config.h>
#include "moduleinfo.h"
#include <epan/expert.h>
#include <epan/packet.h>
#include <epan/prefs.h>
#include <epan/proto.h>
#include <epan/column-utils.h>
#include <epan/dissectors/packet-tcp.h>
#include <epan/value_string.h>
#include <wsutil/nstime.h>
#include <glib.h>
#include <gmodule.h>
#include "PulsarApi.pb.h"
const static int PULSAR_PORT = 6650;
static int proto_pulsar = -1;
static int hf_pulsar_error = -1;
static int hf_pulsar_error_message = -1;
static int hf_pulsar_cmd_type = -1;
static int hf_pulsar_frame_size = -1;
static int hf_pulsar_cmd_size = -1;
static int hf_pulsar_client_version = -1;
static int hf_pulsar_auth_method = -1;
static int hf_pulsar_auth_data = -1;
static int hf_pulsar_protocol_version = -1;
static int hf_pulsar_server_version = -1;
static int hf_pulsar_topic = -1;
static int hf_pulsar_subscription = -1;
static int hf_pulsar_subType = -1;
static int hf_pulsar_consumer_id = -1;
static int hf_pulsar_producer_id = -1;
static int hf_pulsar_server_error = -1;
static int hf_pulsar_ack_type = -1;
static int hf_pulsar_request_id = -1;
static int hf_pulsar_consumer_name = -1;
static int hf_pulsar_producer_name = -1;
static int hf_pulsar_publish_time = -1;
static int hf_pulsar_replicated_from = -1;
static int hf_pulsar_partition_key = -1;
static int hf_pulsar_replicate_to = -1;
static int hf_pulsar_property = -1;
static int hf_pulsar_request_in = -1;
static int hf_pulsar_response_in = -1;
static int hf_pulsar_publish_latency = -1;
static int hf_pulsar_sequence_id = -1;
static int hf_pulsar_message_id = -1;
static int hf_pulsar_message_permits = -1;
static int ett_pulsar = -1;
const static int FRAME_SIZE_LEN = 4;
static pulsar::proto::BaseCommand command;
using namespace pulsar::proto;
static const value_string pulsar_cmd_names[] = { //
{ BaseCommand::CONNECT, "Connect" }, //
{ BaseCommand::CONNECTED, "Connected" }, //
{ BaseCommand::SUBSCRIBE, "Subscribe" }, //
{ BaseCommand::PRODUCER, "Producer" }, //
{ BaseCommand::SEND, "Send" }, //
{ BaseCommand::SEND_RECEIPT, "SendReceipt" }, //
{ BaseCommand::SEND_ERROR, "SendError" }, //
{ BaseCommand::MESSAGE, "Message" }, //
{ BaseCommand::ACK, "Ack" }, //
{ BaseCommand::FLOW, "Flow" }, //
{ BaseCommand::UNSUBSCRIBE, "Unsubscribe" }, //
{ BaseCommand::SUCCESS, "Success" }, //
{ BaseCommand::ERROR, "Error" }, //
{ BaseCommand::CLOSE_PRODUCER, "CloseProducer" }, //
{ BaseCommand::CLOSE_CONSUMER, "CloseConsumer" }, //
{ BaseCommand::PRODUCER_SUCCESS, "ProducerSuccess" }, //
{ BaseCommand::PING, "Ping" }, //
{ BaseCommand::PONG, "Pong" }, //
};
static const value_string auth_methods_vs[] = { { AuthMethodNone, "None" }, //
{ AuthMethodYcaV1, "YCAv1" }, //
{ AuthMethodAthens, "Athens" } //
};
static const value_string server_errors_vs[] = { { UnknownError, "UnknownError" }, //
{ MetadataError, "MetadataError" }, //
{ PersistenceError, "PersistenceError" }, //
{ AuthenticationError, "AuthenticationError" }, //
{ AuthorizationError, "AuthorizationError" }, //
{ ConsumerBusy, "ConsumerBusy" }, //
{ ServiceNotReady, "ServiceNotReady" }, //
{ ProducerBlockedQuotaExceededError, "ProducerBlockedQuotaExceededError" }, //
{ ProducerBlockedQuotaExceededException, "ProducerBlockedQuotaExceededException" } //
};
static const value_string ack_type_vs[] = { { CommandAck::Individual, "Individual" }, //
{ CommandAck::Cumulative, "Cumulative" } //
};
static const value_string protocol_version_vs[] = { { v0, "v0" }, //
{ v1, "v1" } //
};
static const value_string sub_type_names_vs[] = { //
{ CommandSubscribe::Exclusive, "Exclusive" }, //
{ CommandSubscribe::Shared, "Shared" }, //
{ CommandSubscribe::Failover, "Failover" } //
};
static const char* to_str(int value, const value_string* values) {
return val_to_str(value, values, "Unknown (%d)");
}
struct MessageIdComparator {
bool operator()(const MessageIdData& a, const MessageIdData& b) const {
if (a.ledgerid() < b.ledgerid()) {
return true;
} else if (a.ledgerid() == b.ledgerid()) {
return a.entryid() < b.entryid();
} else {
return false;
}
}
};
struct RequestData {
uint32_t requestFrame;
nstime_t requestTimestamp;
uint32_t ackFrame;
nstime_t ackTimestamp;
RequestData()
: requestFrame(UINT32_MAX),
ackFrame(UINT32_MAX) {
}
};
struct RequestResponseData : public RequestData {
uint64_t id; // producer / consumer id
};
struct ProducerData {
std::string topic;
std::string producerName;
std::map<uint64_t, RequestData> messages;
};
struct ConsumerData {
std::string topic;
std::string subscriptionName;
std::string consumerName;
CommandSubscribe::SubType subType;
// Link messages to acks for the consumer
std::map<MessageIdData, RequestData, MessageIdComparator> messages;
};
struct ConnectionState {
std::map<uint64_t, ProducerData> producers;
std::map<uint64_t, ConsumerData> consumers;
std::map<uint64_t, RequestResponseData> requests;
};
static void dissect_message_metadata(proto_tree* frame_tree, tvbuff_t *tvb, int offset,
int maxOffset) {
// Decode message metadata
uint32_t metadataSize = (uint32_t) tvb_get_ntohl(tvb, offset);
offset += 4;
if (offset + metadataSize > maxOffset) {
// Not enough data to dissect metadata
proto_tree_add_debug_text(frame_tree, "[Not enough data to dissect message metadata]");
return;
}
static MessageMetadata msgMetadata;
uint8_t* ptr = (uint8_t*) tvb_get_ptr(tvb, offset, metadataSize);
if (!msgMetadata.ParseFromArray(ptr, metadataSize)) {
proto_tree_add_boolean_format(frame_tree, hf_pulsar_error, tvb, offset, metadataSize, true,
"Error parsing protocol buffer message metadata");
return;
}
proto_item* md_tree = proto_tree_add_subtree_format(frame_tree, tvb, offset, metadataSize,
ett_pulsar,
NULL,
"Message / %s / %llu",
msgMetadata.producer_name().c_str(),
msgMetadata.sequence_id());
proto_tree_add_string(md_tree, hf_pulsar_producer_name, tvb, offset, metadataSize,
msgMetadata.producer_name().c_str());
proto_tree_add_uint64(md_tree, hf_pulsar_sequence_id, tvb, offset, metadataSize,
msgMetadata.sequence_id());
proto_tree_add_uint64(md_tree, hf_pulsar_publish_time, tvb, offset, metadataSize,
msgMetadata.publish_time());
if (msgMetadata.has_replicated_from()) {
proto_tree_add_string(md_tree, hf_pulsar_replicated_from, tvb, offset, metadataSize,
msgMetadata.replicated_from().c_str());
}
if (msgMetadata.properties_size() > 0) {
proto_item* properties_tree = proto_tree_add_subtree_format(frame_tree, tvb, offset,
metadataSize, ett_pulsar,
NULL,
"Properties");
for (int i = 0; i < msgMetadata.properties_size(); i++) {
const KeyValue& kv = msgMetadata.properties(i);
proto_tree_add_string_format(properties_tree, hf_pulsar_property, tvb, offset,
metadataSize, "", "%s : %s", kv.key().c_str(),
kv.value().c_str());
}
}
if (msgMetadata.replicate_to_size() > 0) {
proto_item* replicate_tree = proto_tree_add_subtree_format(frame_tree, tvb, offset,
metadataSize, ett_pulsar,
NULL,
"Replicate to");
for (int i = 0; i < msgMetadata.replicate_to_size(); i++) {
proto_tree_add_string_format(replicate_tree, hf_pulsar_replicated_from, tvb, offset,
metadataSize, "", "%s",
msgMetadata.replicate_to(i).c_str());
}
}
if (msgMetadata.has_partition_key()) {
proto_tree_add_string(md_tree, hf_pulsar_partition_key, tvb, offset, metadataSize,
msgMetadata.partition_key().c_str());
}
offset += metadataSize;
uint32_t payloadSize = maxOffset - offset;
proto_tree_add_subtree_format(md_tree, tvb, offset, payloadSize, ett_pulsar, NULL,
"Payload / size=%u", payloadSize);
}
void link_to_request_frame(proto_tree* cmd_tree, tvbuff_t* tvb, int offset, int size,
const RequestData& reqData) {
if (reqData.requestFrame != UINT32_MAX) {
proto_tree* item = proto_tree_add_uint(cmd_tree, hf_pulsar_request_in, tvb, offset, size,
reqData.requestFrame);
PROTO_ITEM_SET_GENERATED(item);
nstime_t latency;
nstime_delta(&latency, &reqData.ackTimestamp, &reqData.requestTimestamp);
item = proto_tree_add_time(cmd_tree, hf_pulsar_publish_latency, tvb, offset, size, &latency);
PROTO_ITEM_SET_GENERATED(item);
}
}
void link_to_response_frame(proto_tree* cmd_tree, tvbuff_t* tvb, int offset, int size,
const RequestData& reqData) {
if (reqData.ackFrame != UINT32_MAX) {
proto_tree* item = proto_tree_add_uint(cmd_tree, hf_pulsar_response_in, tvb, offset, size,
reqData.ackFrame);
PROTO_ITEM_SET_GENERATED(item);
nstime_t latency;
nstime_delta(&latency, &reqData.ackTimestamp, &reqData.requestTimestamp);
item = proto_tree_add_time(cmd_tree, hf_pulsar_publish_latency, tvb, offset, size, &latency);
PROTO_ITEM_SET_GENERATED(item);
}
}
//////////
/* This method dissects fully reassembled messages */
static int dissect_pulsar_message(tvbuff_t *tvb, packet_info* pinfo, proto_tree* tree,
void* data _U_) {
col_set_str(pinfo->cinfo, COL_PROTOCOL, "yahoo-Pulsar");
conversation_t* conversation = find_or_create_conversation(pinfo);
ConnectionState* state = (ConnectionState*) conversation_get_proto_data(conversation,
proto_pulsar);
if (state == NULL) {
state = new ConnectionState();
conversation_add_proto_data(conversation, proto_pulsar, state);
}
uint32_t offset = FRAME_SIZE_LEN;
int maxOffset = tvb_captured_length(tvb);
uint32_t cmdSize = (uint32_t) tvb_get_ntohl(tvb, offset);
offset += 4;
if (offset + cmdSize > maxOffset) {
// Not enough data to dissect
proto_tree_add_debug_text(tree, "[Not enough data to dissect command]");
return maxOffset;
}
uint8_t* ptr = (uint8_t*) tvb_get_ptr(tvb, offset, cmdSize);
if (!command.ParseFromArray(ptr, cmdSize)) {
proto_tree_add_boolean_format(tree, hf_pulsar_error, tvb, offset, cmdSize, true,
"Error parsing protocol buffer command");
return maxOffset;
}
int cmdOffset = offset;
offset += cmdSize;
col_add_str(pinfo->cinfo, COL_INFO,
val_to_str_const(command.type(), pulsar_cmd_names, "Unknown (%d)"));
proto_item* frame_tree = NULL;
proto_item* cmd_tree = NULL;
if (tree) { /* we are being asked for details */
proto_item *ti = proto_tree_add_item(tree, proto_pulsar, tvb, 0, -1, ENC_NA);
frame_tree = proto_item_add_subtree(ti, ett_pulsar);
proto_tree_add_item(frame_tree, hf_pulsar_frame_size, tvb, 0, 4, ENC_BIG_ENDIAN);
proto_tree_add_item(frame_tree, hf_pulsar_cmd_size, tvb, 4, 4, ENC_BIG_ENDIAN);
cmd_tree = proto_tree_add_subtree_format(frame_tree, tvb, 8, cmdSize, ett_pulsar,
NULL,
"Command %s",
to_str(command.type(), pulsar_cmd_names));
proto_tree_add_string(cmd_tree, hf_pulsar_cmd_type, tvb, 8, cmdSize,
to_str(command.type(), pulsar_cmd_names));
}
proto_tree* item = NULL;
switch (command.type()) {
case BaseCommand::CONNECT: {
const CommandConnect& connect = command.connect();
if (tree) {
proto_tree_add_string(cmd_tree, hf_pulsar_client_version, tvb, cmdOffset, cmdSize,
connect.client_version().c_str());
proto_tree_add_string(cmd_tree, hf_pulsar_protocol_version, tvb, cmdOffset, cmdSize,
to_str(connect.protocol_version(), protocol_version_vs));
proto_tree_add_string(cmd_tree, hf_pulsar_auth_method, tvb, cmdOffset, cmdSize,
to_str(connect.auth_method(), auth_methods_vs));
if (connect.has_auth_data()) {
proto_tree_add_string(cmd_tree, hf_pulsar_auth_data, tvb, cmdOffset, cmdSize,
connect.auth_data().c_str());
}
}
break;
}
case BaseCommand::CONNECTED: {
const CommandConnected& connected = command.connected();
if (tree) {
proto_tree_add_string(cmd_tree, hf_pulsar_server_version, tvb, cmdOffset, cmdSize,
connected.server_version().c_str());
proto_tree_add_string(cmd_tree, hf_pulsar_protocol_version, tvb, cmdOffset, cmdSize,
to_str(connected.protocol_version(), protocol_version_vs));
}
break;
}
case BaseCommand::SUBSCRIBE: {
const CommandSubscribe& subscribe = command.subscribe();
RequestData& reqData = state->requests[subscribe.request_id()];
reqData.requestFrame = pinfo->fd->num;
reqData.requestTimestamp.secs = pinfo->fd->abs_ts.secs;
reqData.requestTimestamp.nsecs = pinfo->fd->abs_ts.nsecs;
ConsumerData& consumerData = state->consumers[subscribe.consumer_id()];
consumerData.topic = subscribe.topic();
consumerData.subscriptionName = subscribe.subscription();
consumerData.consumerName = subscribe.consumer_name();
consumerData.subType = subscribe.subtype();
col_append_fstr(pinfo->cinfo, COL_INFO, " / %s / %s / %s / %s",
to_str(subscribe.subtype(), sub_type_names_vs),
subscribe.topic().c_str(), subscribe.subscription().c_str(),
subscribe.consumer_name().c_str());
if (tree) {
proto_tree_add_string(cmd_tree, hf_pulsar_topic, tvb, cmdOffset, cmdSize,
subscribe.topic().c_str());
proto_tree_add_string(cmd_tree, hf_pulsar_subscription, tvb, cmdOffset, cmdSize,
subscribe.subscription().c_str());
proto_tree_add_string(cmd_tree, hf_pulsar_subType, tvb, cmdOffset, cmdSize,
to_str(subscribe.subtype(), sub_type_names_vs));
proto_tree_add_uint64(cmd_tree, hf_pulsar_consumer_id, tvb, cmdOffset, cmdSize,
subscribe.consumer_id());
proto_tree_add_uint64(cmd_tree, hf_pulsar_request_id, tvb, cmdOffset, cmdSize,
subscribe.request_id());
proto_tree_add_string(
cmd_tree,
hf_pulsar_consumer_name,
tvb,
cmdOffset,
cmdSize,
subscribe.has_consumer_name() ?
subscribe.consumer_name().c_str() : "<none>");
}
break;
}
case BaseCommand::PRODUCER: {
const CommandProducer& producer = command.producer();
RequestResponseData& reqData = state->requests[producer.request_id()];
reqData.requestFrame = pinfo->fd->num;
reqData.requestTimestamp.secs = pinfo->fd->abs_ts.secs;
reqData.requestTimestamp.nsecs = pinfo->fd->abs_ts.nsecs;
reqData.id = producer.producer_id();
state->producers[producer.producer_id()].topic = producer.topic();
col_append_fstr(pinfo->cinfo, COL_INFO, " / %s", producer.topic().c_str());
if (tree) {
proto_tree_add_string(cmd_tree, hf_pulsar_topic, tvb, cmdOffset, cmdSize,
producer.topic().c_str());
proto_tree_add_uint64(cmd_tree, hf_pulsar_producer_id, tvb, cmdOffset, cmdSize,
producer.producer_id());
proto_tree_add_uint64(cmd_tree, hf_pulsar_request_id, tvb, cmdOffset, cmdSize,
producer.request_id());
proto_tree_add_string(
cmd_tree, hf_pulsar_producer_name, tvb, cmdOffset, cmdSize,
producer.has_producer_name() ? producer.producer_name().c_str() : "<none>");
link_to_response_frame(cmd_tree, tvb, cmdOffset, cmdSize, reqData);
}
break;
}
case BaseCommand::SEND: {
const CommandSend& send = command.send();
RequestData& data = state->producers[send.producer_id()].messages[send.sequence_id()];
data.requestFrame = pinfo->fd->num;
data.requestTimestamp.secs = pinfo->fd->abs_ts.secs;
data.requestTimestamp.nsecs = pinfo->fd->abs_ts.nsecs;
ProducerData& producerData = state->producers[send.producer_id()];
col_append_fstr(pinfo->cinfo, COL_INFO, " / %s / %llu",
producerData.producerName.c_str(), send.sequence_id());
if (tree) {
proto_tree_add_uint64(cmd_tree, hf_pulsar_producer_id, tvb, cmdOffset, cmdSize,
send.producer_id());
proto_tree_add_uint64(cmd_tree, hf_pulsar_sequence_id, tvb, cmdOffset, cmdSize,
send.sequence_id());
// Decode message metadata
dissect_message_metadata(cmd_tree, tvb, offset, maxOffset);
item = proto_tree_add_string(cmd_tree, hf_pulsar_producer_name, tvb, cmdOffset,
cmdSize, producerData.producerName.c_str());
PROTO_ITEM_SET_GENERATED(item);
item = proto_tree_add_string(cmd_tree, hf_pulsar_topic, tvb, cmdOffset, cmdSize,
producerData.topic.c_str());
PROTO_ITEM_SET_GENERATED(item);
// Pair with frame information
link_to_response_frame(cmd_tree, tvb, cmdOffset, cmdSize, data);
}
break;
}
case BaseCommand::SEND_RECEIPT: {
const CommandSendReceipt& send_receipt = command.send_receipt();
RequestData & data = state->producers[send_receipt.producer_id()].messages[send_receipt
.sequence_id()];
data.ackFrame = pinfo->fd->num;
data.ackTimestamp.secs = pinfo->fd->abs_ts.secs;
data.ackTimestamp.nsecs = pinfo->fd->abs_ts.nsecs;
ProducerData& producerData = state->producers[send_receipt.producer_id()];
col_append_fstr(pinfo->cinfo, COL_INFO, " / %s / %llu",
producerData.producerName.c_str(), send_receipt.sequence_id());
if (tree) {
proto_tree_add_uint64(cmd_tree, hf_pulsar_producer_id, tvb, cmdOffset, cmdSize,
send_receipt.producer_id());
proto_tree_add_uint64(cmd_tree, hf_pulsar_sequence_id, tvb, cmdOffset, cmdSize,
send_receipt.sequence_id());
if (send_receipt.has_message_id()) {
const MessageIdData& messageId = send_receipt.message_id();
proto_tree_add_string_format(cmd_tree, hf_pulsar_message_id, tvb, cmdOffset,
cmdSize, "", "Message Id: %llu:%llu",
messageId.ledgerid(), messageId.entryid());
}
item = proto_tree_add_string(cmd_tree, hf_pulsar_producer_name, tvb, cmdOffset,
cmdSize, producerData.producerName.c_str());
PROTO_ITEM_SET_GENERATED(item);
item = proto_tree_add_string(cmd_tree, hf_pulsar_topic, tvb, cmdOffset, cmdSize,
producerData.topic.c_str());
PROTO_ITEM_SET_GENERATED(item);
link_to_request_frame(cmd_tree, tvb, cmdOffset, cmdSize, data);
}
break;
}
case BaseCommand::SEND_ERROR: {
const CommandSendError& send_error = command.send_error();
RequestData & reqData = state->producers[send_error.producer_id()].messages[send_error
.sequence_id()];
reqData.ackFrame = pinfo->fd->num;
reqData.ackTimestamp.secs = pinfo->fd->abs_ts.secs;
reqData.ackTimestamp.nsecs = pinfo->fd->abs_ts.nsecs;
ProducerData& producerData = state->producers[send_error.producer_id()];
col_append_fstr(pinfo->cinfo, COL_INFO, " / %s / %llu",
producerData.producerName.c_str(), send_error.sequence_id());
if (tree) {
proto_tree_add_boolean_format(frame_tree, hf_pulsar_error, tvb, cmdOffset, cmdSize,
true, "Error in sending operation");
proto_tree_add_uint64(cmd_tree, hf_pulsar_producer_id, tvb, cmdOffset, cmdSize,
send_error.producer_id());
proto_tree_add_uint64(cmd_tree, hf_pulsar_sequence_id, tvb, cmdOffset, cmdSize,
send_error.sequence_id());
item = proto_tree_add_string(cmd_tree, hf_pulsar_server_error, tvb, cmdOffset, cmdSize,
to_str(send_error.error(), server_errors_vs));
item = proto_tree_add_string(cmd_tree, hf_pulsar_producer_name, tvb, cmdOffset,
cmdSize, producerData.producerName.c_str());
PROTO_ITEM_SET_GENERATED(item);
item = proto_tree_add_string(cmd_tree, hf_pulsar_topic, tvb, cmdOffset, cmdSize,
producerData.topic.c_str());
PROTO_ITEM_SET_GENERATED(item);
// Pair with frame information
link_to_request_frame(cmd_tree, tvb, cmdOffset, cmdSize, reqData);
}
break;
}
case BaseCommand::MESSAGE: {
const CommandMessage& message = command.message();
RequestData& data =
state->consumers[message.consumer_id()].messages[message.message_id()];
data.requestFrame = pinfo->fd->num;
data.requestTimestamp.secs = pinfo->fd->abs_ts.secs;
data.requestTimestamp.nsecs = pinfo->fd->abs_ts.nsecs;
const ConsumerData& consumerData = state->consumers[message.consumer_id()];
col_append_fstr(pinfo->cinfo, COL_INFO, " / %s / %llu:%llu",
consumerData.consumerName.c_str(), message.message_id().ledgerid(),
message.message_id().entryid());
if (tree) {
proto_tree_add_uint64(cmd_tree, hf_pulsar_consumer_id, tvb, cmdOffset, cmdSize,
message.consumer_id());
dissect_message_metadata(cmd_tree, tvb, offset, maxOffset);
item = proto_tree_add_string(cmd_tree, hf_pulsar_consumer_name, tvb, cmdOffset,
cmdSize, consumerData.consumerName.c_str());
PROTO_ITEM_SET_GENERATED(item);
item = proto_tree_add_string(cmd_tree, hf_pulsar_topic, tvb, cmdOffset, cmdSize,
consumerData.topic.c_str());
PROTO_ITEM_SET_GENERATED(item);
// Pair with frame information
link_to_response_frame(cmd_tree, tvb, cmdOffset, cmdSize, data);
}
break;
}
case BaseCommand::ACK: {
const CommandAck& ack = command.ack();
RequestData& data = state->consumers[ack.consumer_id()].messages[ack.message_id()];
data.ackFrame = pinfo->fd->num;
data.ackTimestamp.secs = pinfo->fd->abs_ts.secs;
data.ackTimestamp.nsecs = pinfo->fd->abs_ts.nsecs;
const ConsumerData& consumerData = state->consumers[ack.consumer_id()];
col_append_fstr(pinfo->cinfo, COL_INFO, " / %s / %llu:%llu",
consumerData.consumerName.c_str(), ack.message_id().ledgerid(),
ack.message_id().entryid());
if (tree) {
proto_tree_add_uint64(cmd_tree, hf_pulsar_consumer_id, tvb, cmdOffset, cmdSize,
ack.consumer_id());
proto_tree_add_string(cmd_tree, hf_pulsar_ack_type, tvb, cmdOffset, cmdSize,
to_str(ack.ack_type(), ack_type_vs));
item = proto_tree_add_string(cmd_tree, hf_pulsar_consumer_name, tvb, cmdOffset,
cmdSize, consumerData.consumerName.c_str());
PROTO_ITEM_SET_GENERATED(item);
item = proto_tree_add_string(cmd_tree, hf_pulsar_topic, tvb, cmdOffset, cmdSize,
consumerData.topic.c_str());
PROTO_ITEM_SET_GENERATED(item);
// Pair with frame information
link_to_request_frame(cmd_tree, tvb, cmdOffset, cmdSize, data);
}
break;
}
case BaseCommand::FLOW: {
const CommandFlow& flow = command.flow();
const ConsumerData& consumerData = state->consumers[flow.consumer_id()];
col_append_fstr(pinfo->cinfo, COL_INFO, " / %s / %d", consumerData.consumerName.c_str(),
flow.messagepermits());
if (tree) {
proto_tree_add_uint64(cmd_tree, hf_pulsar_consumer_id, tvb, cmdOffset, cmdSize,
flow.consumer_id());
proto_tree_add_uint(cmd_tree, hf_pulsar_message_permits, tvb, cmdOffset, cmdSize,
flow.messagepermits());
item = proto_tree_add_string(cmd_tree, hf_pulsar_consumer_name, tvb, cmdOffset,
cmdSize, consumerData.consumerName.c_str());
PROTO_ITEM_SET_GENERATED(item);
item = proto_tree_add_string(cmd_tree, hf_pulsar_topic, tvb, cmdOffset, cmdSize,
consumerData.topic.c_str());
PROTO_ITEM_SET_GENERATED(item);
}
break;
}
case BaseCommand::UNSUBSCRIBE: {
const CommandUnsubscribe& unsubscribe = command.unsubscribe();
RequestData& reqData = state->requests[unsubscribe.request_id()];
reqData.requestFrame = pinfo->fd->num;
reqData.requestTimestamp.secs = pinfo->fd->abs_ts.secs;
reqData.requestTimestamp.nsecs = pinfo->fd->abs_ts.nsecs;
ConsumerData& consumerData = state->consumers[unsubscribe.consumer_id()];
col_append_fstr(pinfo->cinfo, COL_INFO, " / %s / %s / %s / %s",
to_str(consumerData.subType, sub_type_names_vs),
consumerData.topic.c_str(), consumerData.subscriptionName.c_str(),
consumerData.consumerName.c_str());
if (tree) {
proto_tree_add_uint64(cmd_tree, hf_pulsar_consumer_id, tvb, cmdOffset, cmdSize,
unsubscribe.consumer_id());
proto_tree_add_uint64(cmd_tree, hf_pulsar_request_id, tvb, cmdOffset, cmdSize,
unsubscribe.request_id());
item = proto_tree_add_string(cmd_tree, hf_pulsar_topic, tvb, cmdOffset, cmdSize,
consumerData.topic.c_str());
PROTO_ITEM_IS_GENERATED(item);
item = proto_tree_add_string(cmd_tree, hf_pulsar_subscription, tvb, cmdOffset, cmdSize,
consumerData.subscriptionName.c_str());
PROTO_ITEM_IS_GENERATED(item);
proto_tree_add_string(cmd_tree, hf_pulsar_subType, tvb, cmdOffset, cmdSize,
to_str(consumerData.subType, sub_type_names_vs));
PROTO_ITEM_IS_GENERATED(item);
proto_tree_add_string(cmd_tree, hf_pulsar_consumer_name, tvb, cmdOffset, cmdSize,
consumerData.consumerName.c_str());
PROTO_ITEM_IS_GENERATED(item);
link_to_response_frame(cmd_tree, tvb, cmdOffset, cmdSize, reqData);
}
break;
}
case BaseCommand::SUCCESS: {
const CommandSuccess& success = command.success();
RequestResponseData & reqData = state->requests[success.request_id()];
reqData.ackFrame = pinfo->fd->num;
reqData.ackTimestamp.secs = pinfo->fd->abs_ts.secs;
reqData.ackTimestamp.nsecs = pinfo->fd->abs_ts.nsecs;
if (tree) {
proto_tree_add_uint64(cmd_tree, hf_pulsar_request_id, tvb, cmdOffset, cmdSize,
success.request_id());
link_to_request_frame(cmd_tree, tvb, cmdOffset, cmdSize, reqData);
}
break;
}
case BaseCommand::ERROR: {
const CommandError& error = command.error();
RequestResponseData & reqData = state->requests[error.request_id()];
reqData.ackFrame = pinfo->fd->num;
reqData.ackTimestamp.secs = pinfo->fd->abs_ts.secs;
reqData.ackTimestamp.nsecs = pinfo->fd->abs_ts.nsecs;
if (tree) {
proto_tree_add_boolean_format(frame_tree, hf_pulsar_error, tvb, cmdOffset, cmdSize,
true, "Request failed");
proto_tree_add_uint64(cmd_tree, hf_pulsar_request_id, tvb, cmdOffset, cmdSize,
error.request_id());
proto_tree_add_string(cmd_tree, hf_pulsar_server_error, tvb, cmdOffset, cmdSize,
to_str(error.error(), server_errors_vs));
proto_tree_add_string(cmd_tree, hf_pulsar_error_message, tvb, cmdOffset, cmdSize,
error.message().c_str());
link_to_request_frame(cmd_tree, tvb, cmdOffset, cmdSize, reqData);
}
break;
}
case BaseCommand::CLOSE_PRODUCER: {
const CommandCloseProducer& close_producer = command.close_producer();
RequestData& reqData = state->requests[close_producer.request_id()];
reqData.requestFrame = pinfo->fd->num;
reqData.requestTimestamp.secs = pinfo->fd->abs_ts.secs;
reqData.requestTimestamp.nsecs = pinfo->fd->abs_ts.nsecs;
ProducerData& producerData = state->producers[close_producer.producer_id()];
col_append_fstr(pinfo->cinfo, COL_INFO, " / %s", producerData.topic.c_str());
if (tree) {
proto_tree_add_uint64(cmd_tree, hf_pulsar_producer_id, tvb, cmdOffset, cmdSize,
close_producer.producer_id());
proto_tree_add_uint64(cmd_tree, hf_pulsar_request_id, tvb, cmdOffset, cmdSize,
close_producer.request_id());
item = proto_tree_add_string(cmd_tree, hf_pulsar_topic, tvb, cmdOffset, cmdSize,
producerData.topic.c_str());
PROTO_ITEM_IS_GENERATED(item);
proto_tree_add_string(cmd_tree, hf_pulsar_producer_name, tvb, cmdOffset, cmdSize,
producerData.producerName.c_str());
PROTO_ITEM_IS_GENERATED(item);
link_to_response_frame(cmd_tree, tvb, cmdOffset, cmdSize, reqData);
}
break;
}
case BaseCommand::CLOSE_CONSUMER: {
const CommandCloseConsumer& close_consumer = command.close_consumer();
RequestData& reqData = state->requests[close_consumer.request_id()];
reqData.requestFrame = pinfo->fd->num;
reqData.requestTimestamp.secs = pinfo->fd->abs_ts.secs;
reqData.requestTimestamp.nsecs = pinfo->fd->abs_ts.nsecs;
ConsumerData& consumerData = state->consumers[close_consumer.consumer_id()];
col_append_fstr(pinfo->cinfo, COL_INFO, " / %s / %s / %s / %s",
to_str(consumerData.subType, sub_type_names_vs),
consumerData.topic.c_str(), consumerData.subscriptionName.c_str(),
consumerData.consumerName.c_str());
if (tree) {
proto_tree_add_uint64(cmd_tree, hf_pulsar_consumer_id, tvb, cmdOffset, cmdSize,
close_consumer.consumer_id());
proto_tree_add_uint64(cmd_tree, hf_pulsar_request_id, tvb, cmdOffset, cmdSize,
close_consumer.request_id());
item = proto_tree_add_string(cmd_tree, hf_pulsar_topic, tvb, cmdOffset, cmdSize,
consumerData.topic.c_str());
PROTO_ITEM_IS_GENERATED(item);
item = proto_tree_add_string(cmd_tree, hf_pulsar_subscription, tvb, cmdOffset, cmdSize,
consumerData.subscriptionName.c_str());
PROTO_ITEM_IS_GENERATED(item);
proto_tree_add_string(cmd_tree, hf_pulsar_subType, tvb, cmdOffset, cmdSize,
to_str(consumerData.subType, sub_type_names_vs));
PROTO_ITEM_IS_GENERATED(item);
proto_tree_add_string(cmd_tree, hf_pulsar_consumer_name, tvb, cmdOffset, cmdSize,
consumerData.consumerName.c_str());
PROTO_ITEM_IS_GENERATED(item);
link_to_response_frame(cmd_tree, tvb, cmdOffset, cmdSize, reqData);
}
break;
}
case BaseCommand::PRODUCER_SUCCESS: {
const CommandProducerSuccess& success = command.producer_success();
RequestResponseData & reqData = state->requests[success.request_id()];
reqData.ackFrame = pinfo->fd->num;
reqData.ackTimestamp.secs = pinfo->fd->abs_ts.secs;
reqData.ackTimestamp.nsecs = pinfo->fd->abs_ts.nsecs;
uint64_t producerId = reqData.id;
ProducerData& producerData = state->producers[producerId];
producerData.producerName = success.producer_name();
if (tree) {
proto_tree_add_uint64(cmd_tree, hf_pulsar_request_id, tvb, cmdOffset, cmdSize,
success.request_id());
proto_tree_add_string(cmd_tree, hf_pulsar_producer_name, tvb, cmdOffset, cmdSize,
success.producer_name().c_str());
proto_tree* item = proto_tree_add_uint64(cmd_tree, hf_pulsar_producer_id, tvb,
cmdOffset, cmdSize, producerId);
PROTO_ITEM_SET_GENERATED(item);
item = proto_tree_add_string(cmd_tree, hf_pulsar_topic, tvb, cmdOffset, cmdSize,
producerData.topic.c_str());
PROTO_ITEM_SET_GENERATED(item);
link_to_request_frame(cmd_tree, tvb, cmdOffset, cmdSize, reqData);
}
break;
}
case BaseCommand::PING:
break;
case BaseCommand::PONG:
break;
}
return maxOffset;
}
/* determine PDU length of protocol Pulsar */
static uint32_t get_pulsar_message_len(packet_info *pinfo _U_, tvbuff_t *tvb, int offset,
void *data _U_) {
uint32_t len = (uint32_t) tvb_get_ntohl(tvb, offset);
return FRAME_SIZE_LEN + len;
}
static int dissect_pulsar(tvbuff_t *tvb, packet_info* pinfo, proto_tree* tree, void* data _U_) {
tcp_dissect_pdus(tvb, pinfo, tree, 1, FRAME_SIZE_LEN, get_pulsar_message_len, dissect_pulsar_message,
data);
return tvb_captured_length(tvb);
}
static hf_register_info hf[] = { //
{ &hf_pulsar_error, { "Error", "yahoo.pulsar.error", FT_BOOLEAN, BASE_DEC,
NULL, 0x0, NULL, HFILL } }, //
{ &hf_pulsar_error_message, { "Message", "yahoo.pulsar.error_message", FT_STRING, 0,
NULL, 0x0,
NULL, HFILL } }, //
{ &hf_pulsar_cmd_type, { "Command Type", "yahoo.pulsar.cmd.type", FT_STRING, 0,
NULL, 0x0,
NULL, HFILL } }, //
{ &hf_pulsar_frame_size, { "Frame size", "yahoo.pulsar.frame_size", FT_UINT32, BASE_DEC,
NULL, 0x0, NULL, HFILL } }, //
{ &hf_pulsar_cmd_size, { "Command size", "yahoo.pulsar.cmd_size", FT_UINT32, BASE_DEC,
NULL, 0x0,
NULL, HFILL } }, //
{ &hf_pulsar_client_version, { "Client version", "yahoo.pulsar.client_version", FT_STRING,
0,
NULL, 0x0, NULL, HFILL } }, //
{ &hf_pulsar_auth_method, { "Auth method", "yahoo.pulsar.auth_method", FT_STRING, 0, NULL,
0x0,
NULL, HFILL } }, //
{ &hf_pulsar_auth_data, { "Auth data", "yahoo.pulsar.auth_data", FT_STRING, 0,
NULL, 0x0, NULL, HFILL } }, //
{ &hf_pulsar_protocol_version, { "Protocol version", "yahoo.pulsar.protocol_version",
FT_STRING, 0,
NULL, 0x0, NULL, HFILL } },
{ &hf_pulsar_server_version, { "Server version", "yahoo.pulsar.server_version", FT_STRING,
0,
NULL, 0x0, NULL, HFILL } }, //
{ &hf_pulsar_topic, { "Topic", "yahoo.pulsar.topic", FT_STRING, 0, NULL, 0x0,
NULL, HFILL } }, //
{ &hf_pulsar_subscription, { "Subscription", "yahoo.pulsar.subscription", FT_STRING, 0,
NULL, 0x0, NULL, HFILL } }, //
{ &hf_pulsar_subType, { "Subscription type:", "yahoo.pulsar.sub_type", FT_STRING, 0, NULL,
0x0,
NULL, HFILL } }, //
{ &hf_pulsar_consumer_id, { "Consumer Id", "yahoo.pulsar.consumer_id", FT_UINT64,
BASE_DEC,
NULL, 0x0, NULL, HFILL } }, //
{ &hf_pulsar_producer_id, { "Producer Id", "yahoo.pulsar.producer_id", FT_UINT64,
BASE_DEC,
NULL, 0x0, NULL, HFILL } }, //
{ &hf_pulsar_server_error, { "Server error", "yahoo.pulsar.server_error", FT_STRING, 0,
NULL, 0x0, NULL, HFILL } }, //
{ &hf_pulsar_ack_type, { "Ack type", "yahoo.pulsar.ack_type", FT_STRING, 0,
NULL, 0x0, NULL, HFILL } }, //
{ &hf_pulsar_request_id, { "Request Id", "yahoo.pulsar.request_id", FT_UINT64, BASE_DEC,
NULL, 0x0, NULL, HFILL } }, //
{ &hf_pulsar_consumer_name, { "Consumer Name", "yahoo.pulsar.consumer_name", FT_STRING,
BASE_NONE,
NULL, 0x0, NULL, HFILL } }, //
{ &hf_pulsar_producer_name, { "Producer Name", "yahoo.pulsar.producer_name", FT_STRING,
BASE_NONE,
NULL, 0x0, NULL, HFILL } }, //
{ &hf_pulsar_sequence_id, { "Sequence Id", "yahoo.pulsar.sequence_id", FT_UINT64,
BASE_DEC,
NULL, 0x0, NULL, HFILL } }, //
{ &hf_pulsar_message_id, { "Message Id", "yahoo.pulsar.message_id", FT_STRING, BASE_NONE,
NULL, 0x0, NULL, HFILL } }, //
{ &hf_pulsar_message_permits, { "Message Permits", "yahoo.pulsar.message_permits",
FT_UINT32, BASE_DEC,
NULL, 0x0, NULL, HFILL } }, //
{ &hf_pulsar_publish_time, { "Publish time", "yahoo.pulsar.publish_time", FT_UINT64,
BASE_DEC,
NULL, 0x0, NULL, HFILL } }, //
{ &hf_pulsar_replicated_from, { "Replicated from", "yahoo.pulsar.replicated_from",
FT_STRING, BASE_NONE,
NULL, 0x0, NULL, HFILL } }, //
{ &hf_pulsar_partition_key, { "Partition key", "yahoo.pulsar.partition_key", FT_STRING,
BASE_NONE,
NULL, 0x0, NULL, HFILL } }, //
{ &hf_pulsar_replicate_to, { "Replicate to", "yahoo.pulsar.replicate_to", FT_STRING,
BASE_NONE,
NULL, 0x0, NULL, HFILL } }, //
{ &hf_pulsar_property, { "Property", "yahoo.pulsar.property", FT_STRING, BASE_NONE,
NULL, 0x0, NULL, HFILL } }, //
{ &hf_pulsar_request_in, { "Request in frame", "yahoo.pulsar.request_in", FT_FRAMENUM,
BASE_NONE,
NULL, 0, "This packet is a response to the packet with this number",
HFILL } }, //
{ &hf_pulsar_response_in, { "Response in frame", "yahoo.pulsar.response_in", FT_FRAMENUM,
BASE_NONE,
NULL, 0, "This packet will be responded in the packet with this number",
HFILL } }, //
{ &hf_pulsar_publish_latency, { "Latency", "yahoo.pulsar.publish_latency",
FT_RELATIVE_TIME, BASE_NONE, NULL, 0x0,
"How long time it took to ACK message", HFILL } }, };
////////////////
///
void proto_register_pulsar() {
// register the new protocol, protocol fields, and subtrees
proto_pulsar = proto_register_protocol("Pulsar Wire Protocol", /* name */
"Yahoo Pulsar", /* short name */
"yahoo.pulsar" /* abbrev */
);
/* Setup protocol subtree array */
static int *ett[] = { &ett_pulsar };
proto_register_field_array(proto_pulsar, hf, array_length(hf));
proto_register_subtree_array(ett, array_length(ett));
}
void proto_reg_handoff_pulsar() {
static dissector_handle_t pulsar_handle;
pulsar_handle = create_dissector_handle(&dissect_pulsar, proto_pulsar);
dissector_add_uint("tcp.port", PULSAR_PORT, pulsar_handle);
}
extern "C" {
G_MODULE_EXPORT const char* version = VERSION;
/* Start the functions we need for the plugin stuff */
G_MODULE_EXPORT void plugin_register(void) {
if (proto_pulsar == -1) {
proto_register_pulsar();
}
}
G_MODULE_EXPORT void plugin_reg_handoff(void) {
proto_reg_handoff_pulsar();
}
}