| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| #include <config.h> |
| #include "moduleinfo.h" |
| #include <epan/expert.h> |
| #include <epan/packet.h> |
| #include <epan/prefs.h> |
| #include <epan/proto.h> |
| #include <epan/column-utils.h> |
| #include <epan/dissectors/packet-tcp.h> |
| #include <epan/value_string.h> |
| #include <wsutil/nstime.h> |
| #include <glib.h> |
| #include <gmodule.h> |
| |
| #include "PulsarApi.pb.h" |
| |
| const static int PULSAR_PORT = 6650; |
| |
| static int proto_pulsar = -1; |
| static int hf_pulsar_error = -1; |
| static int hf_pulsar_error_message = -1; |
| static int hf_pulsar_cmd_type = -1; |
| static int hf_pulsar_frame_size = -1; |
| static int hf_pulsar_cmd_size = -1; |
| |
| static int hf_pulsar_client_version = -1; |
| static int hf_pulsar_auth_method = -1; |
| static int hf_pulsar_auth_data = -1; |
| static int hf_pulsar_protocol_version = -1; |
| static int hf_pulsar_server_version = -1; |
| |
| static int hf_pulsar_topic = -1; |
| static int hf_pulsar_subscription = -1; |
| static int hf_pulsar_subType = -1; |
| static int hf_pulsar_consumer_id = -1; |
| static int hf_pulsar_producer_id = -1; |
| static int hf_pulsar_server_error = -1; |
| static int hf_pulsar_ack_type = -1; |
| static int hf_pulsar_request_id = -1; |
| static int hf_pulsar_consumer_name = -1; |
| static int hf_pulsar_producer_name = -1; |
| |
| static int hf_pulsar_publish_time = -1; |
| static int hf_pulsar_replicated_from = -1; |
| static int hf_pulsar_partition_key = -1; |
| static int hf_pulsar_replicate_to = -1; |
| static int hf_pulsar_property = -1; |
| |
| static int hf_pulsar_request_in = -1; |
| static int hf_pulsar_response_in = -1; |
| static int hf_pulsar_publish_latency = -1; |
| |
| static int hf_pulsar_sequence_id = -1; |
| static int hf_pulsar_message_id = -1; |
| static int hf_pulsar_message_permits = -1; |
| |
| static int ett_pulsar = -1; |
| |
| const static int FRAME_SIZE_LEN = 4; |
| |
| static pulsar::proto::BaseCommand command; |
| |
| using namespace pulsar::proto; |
| |
| static const value_string pulsar_cmd_names[] = { // |
| { BaseCommand::CONNECT, "Connect" }, // |
| { BaseCommand::CONNECTED, "Connected" }, // |
| { BaseCommand::SUBSCRIBE, "Subscribe" }, // |
| { BaseCommand::PRODUCER, "Producer" }, // |
| { BaseCommand::SEND, "Send" }, // |
| { BaseCommand::SEND_RECEIPT, "SendReceipt" }, // |
| { BaseCommand::SEND_ERROR, "SendError" }, // |
| { BaseCommand::MESSAGE, "Message" }, // |
| { BaseCommand::ACK, "Ack" }, // |
| { BaseCommand::FLOW, "Flow" }, // |
| { BaseCommand::UNSUBSCRIBE, "Unsubscribe" }, // |
| { BaseCommand::SUCCESS, "Success" }, // |
| { BaseCommand::ERROR, "Error" }, // |
| { BaseCommand::CLOSE_PRODUCER, "CloseProducer" }, // |
| { BaseCommand::CLOSE_CONSUMER, "CloseConsumer" }, // |
| { BaseCommand::PRODUCER_SUCCESS, "ProducerSuccess" }, // |
| { BaseCommand::PING, "Ping" }, // |
| { BaseCommand::PONG, "Pong" }, // |
| }; |
| |
| static const value_string auth_methods_vs[] = { { AuthMethodNone, "None" }, // |
| { AuthMethodYcaV1, "YCAv1" }, // |
| { AuthMethodAthens, "Athens" } // |
| }; |
| |
| static const value_string server_errors_vs[] = { { UnknownError, "UnknownError" }, // |
| { MetadataError, "MetadataError" }, // |
| { PersistenceError, "PersistenceError" }, // |
| { AuthenticationError, "AuthenticationError" }, // |
| { AuthorizationError, "AuthorizationError" }, // |
| { ConsumerBusy, "ConsumerBusy" }, // |
| { ServiceNotReady, "ServiceNotReady" }, // |
| { ProducerBlockedQuotaExceededError, "ProducerBlockedQuotaExceededError" }, // |
| { ProducerBlockedQuotaExceededException, "ProducerBlockedQuotaExceededException" } // |
| }; |
| |
| static const value_string ack_type_vs[] = { { CommandAck::Individual, "Individual" }, // |
| { CommandAck::Cumulative, "Cumulative" } // |
| }; |
| |
| static const value_string protocol_version_vs[] = { { v0, "v0" }, // |
| { v1, "v1" } // |
| }; |
| |
| static const value_string sub_type_names_vs[] = { // |
| { CommandSubscribe::Exclusive, "Exclusive" }, // |
| { CommandSubscribe::Shared, "Shared" }, // |
| { CommandSubscribe::Failover, "Failover" } // |
| }; |
| |
| static const char* to_str(int value, const value_string* values) { |
| return val_to_str(value, values, "Unknown (%d)"); |
| } |
| |
| struct MessageIdComparator { |
| bool operator()(const MessageIdData& a, const MessageIdData& b) const { |
| if (a.ledgerid() < b.ledgerid()) { |
| return true; |
| } else if (a.ledgerid() == b.ledgerid()) { |
| return a.entryid() < b.entryid(); |
| } else { |
| return false; |
| } |
| } |
| }; |
| |
| struct RequestData { |
| uint32_t requestFrame; |
| nstime_t requestTimestamp; |
| uint32_t ackFrame; |
| nstime_t ackTimestamp; |
| |
| RequestData() |
| : requestFrame(UINT32_MAX), |
| ackFrame(UINT32_MAX) { |
| } |
| }; |
| |
| struct RequestResponseData : public RequestData { |
| uint64_t id; // producer / consumer id |
| }; |
| |
| struct ProducerData { |
| std::string topic; |
| std::string producerName; |
| |
| std::map<uint64_t, RequestData> messages; |
| }; |
| |
| struct ConsumerData { |
| std::string topic; |
| std::string subscriptionName; |
| std::string consumerName; |
| CommandSubscribe::SubType subType; |
| |
| // Link messages to acks for the consumer |
| std::map<MessageIdData, RequestData, MessageIdComparator> messages; |
| }; |
| |
| struct ConnectionState { |
| std::map<uint64_t, ProducerData> producers; |
| std::map<uint64_t, ConsumerData> consumers; |
| std::map<uint64_t, RequestResponseData> requests; |
| }; |
| |
| static void dissect_message_metadata(proto_tree* frame_tree, tvbuff_t *tvb, int offset, |
| int maxOffset) { |
| // Decode message metadata |
| uint32_t metadataSize = (uint32_t) tvb_get_ntohl(tvb, offset); |
| offset += 4; |
| |
| if (offset + metadataSize > maxOffset) { |
| // Not enough data to dissect metadata |
| proto_tree_add_debug_text(frame_tree, "[Not enough data to dissect message metadata]"); |
| return; |
| } |
| |
| static MessageMetadata msgMetadata; |
| uint8_t* ptr = (uint8_t*) tvb_get_ptr(tvb, offset, metadataSize); |
| |
| if (!msgMetadata.ParseFromArray(ptr, metadataSize)) { |
| proto_tree_add_boolean_format(frame_tree, hf_pulsar_error, tvb, offset, metadataSize, true, |
| "Error parsing protocol buffer message metadata"); |
| return; |
| } |
| |
| proto_item* md_tree = proto_tree_add_subtree_format(frame_tree, tvb, offset, metadataSize, |
| ett_pulsar, |
| NULL, |
| "Message / %s / %llu", |
| msgMetadata.producer_name().c_str(), |
| msgMetadata.sequence_id()); |
| proto_tree_add_string(md_tree, hf_pulsar_producer_name, tvb, offset, metadataSize, |
| msgMetadata.producer_name().c_str()); |
| proto_tree_add_uint64(md_tree, hf_pulsar_sequence_id, tvb, offset, metadataSize, |
| msgMetadata.sequence_id()); |
| proto_tree_add_uint64(md_tree, hf_pulsar_publish_time, tvb, offset, metadataSize, |
| msgMetadata.publish_time()); |
| if (msgMetadata.has_replicated_from()) { |
| proto_tree_add_string(md_tree, hf_pulsar_replicated_from, tvb, offset, metadataSize, |
| msgMetadata.replicated_from().c_str()); |
| } |
| |
| if (msgMetadata.properties_size() > 0) { |
| proto_item* properties_tree = proto_tree_add_subtree_format(frame_tree, tvb, offset, |
| metadataSize, ett_pulsar, |
| NULL, |
| "Properties"); |
| for (int i = 0; i < msgMetadata.properties_size(); i++) { |
| const KeyValue& kv = msgMetadata.properties(i); |
| proto_tree_add_string_format(properties_tree, hf_pulsar_property, tvb, offset, |
| metadataSize, "", "%s : %s", kv.key().c_str(), |
| kv.value().c_str()); |
| } |
| } |
| |
| if (msgMetadata.replicate_to_size() > 0) { |
| proto_item* replicate_tree = proto_tree_add_subtree_format(frame_tree, tvb, offset, |
| metadataSize, ett_pulsar, |
| NULL, |
| "Replicate to"); |
| for (int i = 0; i < msgMetadata.replicate_to_size(); i++) { |
| proto_tree_add_string_format(replicate_tree, hf_pulsar_replicated_from, tvb, offset, |
| metadataSize, "", "%s", |
| msgMetadata.replicate_to(i).c_str()); |
| } |
| } |
| |
| if (msgMetadata.has_partition_key()) { |
| proto_tree_add_string(md_tree, hf_pulsar_partition_key, tvb, offset, metadataSize, |
| msgMetadata.partition_key().c_str()); |
| } |
| |
| offset += metadataSize; |
| uint32_t payloadSize = maxOffset - offset; |
| proto_tree_add_subtree_format(md_tree, tvb, offset, payloadSize, ett_pulsar, NULL, |
| "Payload / size=%u", payloadSize); |
| } |
| |
| void link_to_request_frame(proto_tree* cmd_tree, tvbuff_t* tvb, int offset, int size, |
| const RequestData& reqData) { |
| if (reqData.requestFrame != UINT32_MAX) { |
| proto_tree* item = proto_tree_add_uint(cmd_tree, hf_pulsar_request_in, tvb, offset, size, |
| reqData.requestFrame); |
| PROTO_ITEM_SET_GENERATED(item); |
| |
| nstime_t latency; |
| nstime_delta(&latency, &reqData.ackTimestamp, &reqData.requestTimestamp); |
| item = proto_tree_add_time(cmd_tree, hf_pulsar_publish_latency, tvb, offset, size, &latency); |
| PROTO_ITEM_SET_GENERATED(item); |
| } |
| } |
| |
| void link_to_response_frame(proto_tree* cmd_tree, tvbuff_t* tvb, int offset, int size, |
| const RequestData& reqData) { |
| if (reqData.ackFrame != UINT32_MAX) { |
| proto_tree* item = proto_tree_add_uint(cmd_tree, hf_pulsar_response_in, tvb, offset, size, |
| reqData.ackFrame); |
| PROTO_ITEM_SET_GENERATED(item); |
| |
| nstime_t latency; |
| nstime_delta(&latency, &reqData.ackTimestamp, &reqData.requestTimestamp); |
| item = proto_tree_add_time(cmd_tree, hf_pulsar_publish_latency, tvb, offset, size, &latency); |
| PROTO_ITEM_SET_GENERATED(item); |
| } |
| } |
| |
| ////////// |
| |
| /* This method dissects fully reassembled messages */ |
| static int dissect_pulsar_message(tvbuff_t *tvb, packet_info* pinfo, proto_tree* tree, |
| void* data _U_) { |
| col_set_str(pinfo->cinfo, COL_PROTOCOL, "yahoo-Pulsar"); |
| |
| conversation_t* conversation = find_or_create_conversation(pinfo); |
| ConnectionState* state = (ConnectionState*) conversation_get_proto_data(conversation, |
| proto_pulsar); |
| if (state == NULL) { |
| state = new ConnectionState(); |
| conversation_add_proto_data(conversation, proto_pulsar, state); |
| } |
| |
| uint32_t offset = FRAME_SIZE_LEN; |
| int maxOffset = tvb_captured_length(tvb); |
| |
| uint32_t cmdSize = (uint32_t) tvb_get_ntohl(tvb, offset); |
| offset += 4; |
| |
| if (offset + cmdSize > maxOffset) { |
| // Not enough data to dissect |
| proto_tree_add_debug_text(tree, "[Not enough data to dissect command]"); |
| return maxOffset; |
| } |
| |
| uint8_t* ptr = (uint8_t*) tvb_get_ptr(tvb, offset, cmdSize); |
| if (!command.ParseFromArray(ptr, cmdSize)) { |
| proto_tree_add_boolean_format(tree, hf_pulsar_error, tvb, offset, cmdSize, true, |
| "Error parsing protocol buffer command"); |
| return maxOffset; |
| } |
| |
| int cmdOffset = offset; |
| offset += cmdSize; |
| |
| col_add_str(pinfo->cinfo, COL_INFO, |
| val_to_str_const(command.type(), pulsar_cmd_names, "Unknown (%d)")); |
| |
| proto_item* frame_tree = NULL; |
| proto_item* cmd_tree = NULL; |
| if (tree) { /* we are being asked for details */ |
| proto_item *ti = proto_tree_add_item(tree, proto_pulsar, tvb, 0, -1, ENC_NA); |
| frame_tree = proto_item_add_subtree(ti, ett_pulsar); |
| proto_tree_add_item(frame_tree, hf_pulsar_frame_size, tvb, 0, 4, ENC_BIG_ENDIAN); |
| proto_tree_add_item(frame_tree, hf_pulsar_cmd_size, tvb, 4, 4, ENC_BIG_ENDIAN); |
| cmd_tree = proto_tree_add_subtree_format(frame_tree, tvb, 8, cmdSize, ett_pulsar, |
| NULL, |
| "Command %s", |
| to_str(command.type(), pulsar_cmd_names)); |
| proto_tree_add_string(cmd_tree, hf_pulsar_cmd_type, tvb, 8, cmdSize, |
| to_str(command.type(), pulsar_cmd_names)); |
| } |
| |
| proto_tree* item = NULL; |
| |
| switch (command.type()) { |
| case BaseCommand::CONNECT: { |
| const CommandConnect& connect = command.connect(); |
| if (tree) { |
| proto_tree_add_string(cmd_tree, hf_pulsar_client_version, tvb, cmdOffset, cmdSize, |
| connect.client_version().c_str()); |
| proto_tree_add_string(cmd_tree, hf_pulsar_protocol_version, tvb, cmdOffset, cmdSize, |
| to_str(connect.protocol_version(), protocol_version_vs)); |
| proto_tree_add_string(cmd_tree, hf_pulsar_auth_method, tvb, cmdOffset, cmdSize, |
| to_str(connect.auth_method(), auth_methods_vs)); |
| if (connect.has_auth_data()) { |
| proto_tree_add_string(cmd_tree, hf_pulsar_auth_data, tvb, cmdOffset, cmdSize, |
| connect.auth_data().c_str()); |
| } |
| } |
| break; |
| } |
| case BaseCommand::CONNECTED: { |
| const CommandConnected& connected = command.connected(); |
| if (tree) { |
| proto_tree_add_string(cmd_tree, hf_pulsar_server_version, tvb, cmdOffset, cmdSize, |
| connected.server_version().c_str()); |
| proto_tree_add_string(cmd_tree, hf_pulsar_protocol_version, tvb, cmdOffset, cmdSize, |
| to_str(connected.protocol_version(), protocol_version_vs)); |
| } |
| break; |
| } |
| case BaseCommand::SUBSCRIBE: { |
| const CommandSubscribe& subscribe = command.subscribe(); |
| RequestData& reqData = state->requests[subscribe.request_id()]; |
| reqData.requestFrame = pinfo->fd->num; |
| reqData.requestTimestamp.secs = pinfo->fd->abs_ts.secs; |
| reqData.requestTimestamp.nsecs = pinfo->fd->abs_ts.nsecs; |
| |
| ConsumerData& consumerData = state->consumers[subscribe.consumer_id()]; |
| consumerData.topic = subscribe.topic(); |
| consumerData.subscriptionName = subscribe.subscription(); |
| consumerData.consumerName = subscribe.consumer_name(); |
| consumerData.subType = subscribe.subtype(); |
| |
| col_append_fstr(pinfo->cinfo, COL_INFO, " / %s / %s / %s / %s", |
| to_str(subscribe.subtype(), sub_type_names_vs), |
| subscribe.topic().c_str(), subscribe.subscription().c_str(), |
| subscribe.consumer_name().c_str()); |
| |
| if (tree) { |
| proto_tree_add_string(cmd_tree, hf_pulsar_topic, tvb, cmdOffset, cmdSize, |
| subscribe.topic().c_str()); |
| proto_tree_add_string(cmd_tree, hf_pulsar_subscription, tvb, cmdOffset, cmdSize, |
| subscribe.subscription().c_str()); |
| proto_tree_add_string(cmd_tree, hf_pulsar_subType, tvb, cmdOffset, cmdSize, |
| to_str(subscribe.subtype(), sub_type_names_vs)); |
| proto_tree_add_uint64(cmd_tree, hf_pulsar_consumer_id, tvb, cmdOffset, cmdSize, |
| subscribe.consumer_id()); |
| proto_tree_add_uint64(cmd_tree, hf_pulsar_request_id, tvb, cmdOffset, cmdSize, |
| subscribe.request_id()); |
| proto_tree_add_string( |
| cmd_tree, |
| hf_pulsar_consumer_name, |
| tvb, |
| cmdOffset, |
| cmdSize, |
| subscribe.has_consumer_name() ? |
| subscribe.consumer_name().c_str() : "<none>"); |
| } |
| break; |
| } |
| case BaseCommand::PRODUCER: { |
| const CommandProducer& producer = command.producer(); |
| RequestResponseData& reqData = state->requests[producer.request_id()]; |
| reqData.requestFrame = pinfo->fd->num; |
| reqData.requestTimestamp.secs = pinfo->fd->abs_ts.secs; |
| reqData.requestTimestamp.nsecs = pinfo->fd->abs_ts.nsecs; |
| reqData.id = producer.producer_id(); |
| |
| state->producers[producer.producer_id()].topic = producer.topic(); |
| |
| col_append_fstr(pinfo->cinfo, COL_INFO, " / %s", producer.topic().c_str()); |
| |
| if (tree) { |
| proto_tree_add_string(cmd_tree, hf_pulsar_topic, tvb, cmdOffset, cmdSize, |
| producer.topic().c_str()); |
| proto_tree_add_uint64(cmd_tree, hf_pulsar_producer_id, tvb, cmdOffset, cmdSize, |
| producer.producer_id()); |
| proto_tree_add_uint64(cmd_tree, hf_pulsar_request_id, tvb, cmdOffset, cmdSize, |
| producer.request_id()); |
| proto_tree_add_string( |
| cmd_tree, hf_pulsar_producer_name, tvb, cmdOffset, cmdSize, |
| producer.has_producer_name() ? producer.producer_name().c_str() : "<none>"); |
| |
| link_to_response_frame(cmd_tree, tvb, cmdOffset, cmdSize, reqData); |
| } |
| break; |
| } |
| case BaseCommand::SEND: { |
| const CommandSend& send = command.send(); |
| RequestData& data = state->producers[send.producer_id()].messages[send.sequence_id()]; |
| data.requestFrame = pinfo->fd->num; |
| data.requestTimestamp.secs = pinfo->fd->abs_ts.secs; |
| data.requestTimestamp.nsecs = pinfo->fd->abs_ts.nsecs; |
| |
| ProducerData& producerData = state->producers[send.producer_id()]; |
| |
| col_append_fstr(pinfo->cinfo, COL_INFO, " / %s / %llu", |
| producerData.producerName.c_str(), send.sequence_id()); |
| |
| if (tree) { |
| proto_tree_add_uint64(cmd_tree, hf_pulsar_producer_id, tvb, cmdOffset, cmdSize, |
| send.producer_id()); |
| proto_tree_add_uint64(cmd_tree, hf_pulsar_sequence_id, tvb, cmdOffset, cmdSize, |
| send.sequence_id()); |
| |
| // Decode message metadata |
| dissect_message_metadata(cmd_tree, tvb, offset, maxOffset); |
| |
| item = proto_tree_add_string(cmd_tree, hf_pulsar_producer_name, tvb, cmdOffset, |
| cmdSize, producerData.producerName.c_str()); |
| PROTO_ITEM_SET_GENERATED(item); |
| |
| item = proto_tree_add_string(cmd_tree, hf_pulsar_topic, tvb, cmdOffset, cmdSize, |
| producerData.topic.c_str()); |
| PROTO_ITEM_SET_GENERATED(item); |
| |
| // Pair with frame information |
| link_to_response_frame(cmd_tree, tvb, cmdOffset, cmdSize, data); |
| } |
| break; |
| } |
| case BaseCommand::SEND_RECEIPT: { |
| const CommandSendReceipt& send_receipt = command.send_receipt(); |
| RequestData & data = state->producers[send_receipt.producer_id()].messages[send_receipt |
| .sequence_id()]; |
| data.ackFrame = pinfo->fd->num; |
| data.ackTimestamp.secs = pinfo->fd->abs_ts.secs; |
| data.ackTimestamp.nsecs = pinfo->fd->abs_ts.nsecs; |
| |
| ProducerData& producerData = state->producers[send_receipt.producer_id()]; |
| col_append_fstr(pinfo->cinfo, COL_INFO, " / %s / %llu", |
| producerData.producerName.c_str(), send_receipt.sequence_id()); |
| |
| if (tree) { |
| proto_tree_add_uint64(cmd_tree, hf_pulsar_producer_id, tvb, cmdOffset, cmdSize, |
| send_receipt.producer_id()); |
| proto_tree_add_uint64(cmd_tree, hf_pulsar_sequence_id, tvb, cmdOffset, cmdSize, |
| send_receipt.sequence_id()); |
| if (send_receipt.has_message_id()) { |
| const MessageIdData& messageId = send_receipt.message_id(); |
| proto_tree_add_string_format(cmd_tree, hf_pulsar_message_id, tvb, cmdOffset, |
| cmdSize, "", "Message Id: %llu:%llu", |
| messageId.ledgerid(), messageId.entryid()); |
| } |
| |
| item = proto_tree_add_string(cmd_tree, hf_pulsar_producer_name, tvb, cmdOffset, |
| cmdSize, producerData.producerName.c_str()); |
| PROTO_ITEM_SET_GENERATED(item); |
| |
| item = proto_tree_add_string(cmd_tree, hf_pulsar_topic, tvb, cmdOffset, cmdSize, |
| producerData.topic.c_str()); |
| PROTO_ITEM_SET_GENERATED(item); |
| |
| link_to_request_frame(cmd_tree, tvb, cmdOffset, cmdSize, data); |
| } |
| break; |
| } |
| case BaseCommand::SEND_ERROR: { |
| const CommandSendError& send_error = command.send_error(); |
| RequestData & reqData = state->producers[send_error.producer_id()].messages[send_error |
| .sequence_id()]; |
| reqData.ackFrame = pinfo->fd->num; |
| reqData.ackTimestamp.secs = pinfo->fd->abs_ts.secs; |
| reqData.ackTimestamp.nsecs = pinfo->fd->abs_ts.nsecs; |
| |
| ProducerData& producerData = state->producers[send_error.producer_id()]; |
| col_append_fstr(pinfo->cinfo, COL_INFO, " / %s / %llu", |
| producerData.producerName.c_str(), send_error.sequence_id()); |
| |
| if (tree) { |
| proto_tree_add_boolean_format(frame_tree, hf_pulsar_error, tvb, cmdOffset, cmdSize, |
| true, "Error in sending operation"); |
| proto_tree_add_uint64(cmd_tree, hf_pulsar_producer_id, tvb, cmdOffset, cmdSize, |
| send_error.producer_id()); |
| proto_tree_add_uint64(cmd_tree, hf_pulsar_sequence_id, tvb, cmdOffset, cmdSize, |
| send_error.sequence_id()); |
| |
| item = proto_tree_add_string(cmd_tree, hf_pulsar_server_error, tvb, cmdOffset, cmdSize, |
| to_str(send_error.error(), server_errors_vs)); |
| |
| item = proto_tree_add_string(cmd_tree, hf_pulsar_producer_name, tvb, cmdOffset, |
| cmdSize, producerData.producerName.c_str()); |
| PROTO_ITEM_SET_GENERATED(item); |
| |
| item = proto_tree_add_string(cmd_tree, hf_pulsar_topic, tvb, cmdOffset, cmdSize, |
| producerData.topic.c_str()); |
| PROTO_ITEM_SET_GENERATED(item); |
| |
| // Pair with frame information |
| link_to_request_frame(cmd_tree, tvb, cmdOffset, cmdSize, reqData); |
| } |
| break; |
| } |
| case BaseCommand::MESSAGE: { |
| const CommandMessage& message = command.message(); |
| RequestData& data = |
| state->consumers[message.consumer_id()].messages[message.message_id()]; |
| data.requestFrame = pinfo->fd->num; |
| data.requestTimestamp.secs = pinfo->fd->abs_ts.secs; |
| data.requestTimestamp.nsecs = pinfo->fd->abs_ts.nsecs; |
| |
| const ConsumerData& consumerData = state->consumers[message.consumer_id()]; |
| |
| col_append_fstr(pinfo->cinfo, COL_INFO, " / %s / %llu:%llu", |
| consumerData.consumerName.c_str(), message.message_id().ledgerid(), |
| message.message_id().entryid()); |
| |
| if (tree) { |
| proto_tree_add_uint64(cmd_tree, hf_pulsar_consumer_id, tvb, cmdOffset, cmdSize, |
| message.consumer_id()); |
| |
| dissect_message_metadata(cmd_tree, tvb, offset, maxOffset); |
| |
| item = proto_tree_add_string(cmd_tree, hf_pulsar_consumer_name, tvb, cmdOffset, |
| cmdSize, consumerData.consumerName.c_str()); |
| PROTO_ITEM_SET_GENERATED(item); |
| |
| item = proto_tree_add_string(cmd_tree, hf_pulsar_topic, tvb, cmdOffset, cmdSize, |
| consumerData.topic.c_str()); |
| PROTO_ITEM_SET_GENERATED(item); |
| |
| // Pair with frame information |
| link_to_response_frame(cmd_tree, tvb, cmdOffset, cmdSize, data); |
| } |
| break; |
| } |
| case BaseCommand::ACK: { |
| const CommandAck& ack = command.ack(); |
| RequestData& data = state->consumers[ack.consumer_id()].messages[ack.message_id()]; |
| data.ackFrame = pinfo->fd->num; |
| data.ackTimestamp.secs = pinfo->fd->abs_ts.secs; |
| data.ackTimestamp.nsecs = pinfo->fd->abs_ts.nsecs; |
| |
| const ConsumerData& consumerData = state->consumers[ack.consumer_id()]; |
| |
| col_append_fstr(pinfo->cinfo, COL_INFO, " / %s / %llu:%llu", |
| consumerData.consumerName.c_str(), ack.message_id().ledgerid(), |
| ack.message_id().entryid()); |
| |
| if (tree) { |
| proto_tree_add_uint64(cmd_tree, hf_pulsar_consumer_id, tvb, cmdOffset, cmdSize, |
| ack.consumer_id()); |
| proto_tree_add_string(cmd_tree, hf_pulsar_ack_type, tvb, cmdOffset, cmdSize, |
| to_str(ack.ack_type(), ack_type_vs)); |
| |
| item = proto_tree_add_string(cmd_tree, hf_pulsar_consumer_name, tvb, cmdOffset, |
| cmdSize, consumerData.consumerName.c_str()); |
| PROTO_ITEM_SET_GENERATED(item); |
| |
| item = proto_tree_add_string(cmd_tree, hf_pulsar_topic, tvb, cmdOffset, cmdSize, |
| consumerData.topic.c_str()); |
| PROTO_ITEM_SET_GENERATED(item); |
| |
| // Pair with frame information |
| link_to_request_frame(cmd_tree, tvb, cmdOffset, cmdSize, data); |
| } |
| break; |
| } |
| case BaseCommand::FLOW: { |
| const CommandFlow& flow = command.flow(); |
| const ConsumerData& consumerData = state->consumers[flow.consumer_id()]; |
| |
| col_append_fstr(pinfo->cinfo, COL_INFO, " / %s / %d", consumerData.consumerName.c_str(), |
| flow.messagepermits()); |
| |
| if (tree) { |
| proto_tree_add_uint64(cmd_tree, hf_pulsar_consumer_id, tvb, cmdOffset, cmdSize, |
| flow.consumer_id()); |
| proto_tree_add_uint(cmd_tree, hf_pulsar_message_permits, tvb, cmdOffset, cmdSize, |
| flow.messagepermits()); |
| |
| item = proto_tree_add_string(cmd_tree, hf_pulsar_consumer_name, tvb, cmdOffset, |
| cmdSize, consumerData.consumerName.c_str()); |
| PROTO_ITEM_SET_GENERATED(item); |
| |
| item = proto_tree_add_string(cmd_tree, hf_pulsar_topic, tvb, cmdOffset, cmdSize, |
| consumerData.topic.c_str()); |
| PROTO_ITEM_SET_GENERATED(item); |
| } |
| break; |
| } |
| case BaseCommand::UNSUBSCRIBE: { |
| const CommandUnsubscribe& unsubscribe = command.unsubscribe(); |
| RequestData& reqData = state->requests[unsubscribe.request_id()]; |
| reqData.requestFrame = pinfo->fd->num; |
| reqData.requestTimestamp.secs = pinfo->fd->abs_ts.secs; |
| reqData.requestTimestamp.nsecs = pinfo->fd->abs_ts.nsecs; |
| |
| ConsumerData& consumerData = state->consumers[unsubscribe.consumer_id()]; |
| |
| col_append_fstr(pinfo->cinfo, COL_INFO, " / %s / %s / %s / %s", |
| to_str(consumerData.subType, sub_type_names_vs), |
| consumerData.topic.c_str(), consumerData.subscriptionName.c_str(), |
| consumerData.consumerName.c_str()); |
| |
| if (tree) { |
| proto_tree_add_uint64(cmd_tree, hf_pulsar_consumer_id, tvb, cmdOffset, cmdSize, |
| unsubscribe.consumer_id()); |
| proto_tree_add_uint64(cmd_tree, hf_pulsar_request_id, tvb, cmdOffset, cmdSize, |
| unsubscribe.request_id()); |
| item = proto_tree_add_string(cmd_tree, hf_pulsar_topic, tvb, cmdOffset, cmdSize, |
| consumerData.topic.c_str()); |
| PROTO_ITEM_IS_GENERATED(item); |
| item = proto_tree_add_string(cmd_tree, hf_pulsar_subscription, tvb, cmdOffset, cmdSize, |
| consumerData.subscriptionName.c_str()); |
| PROTO_ITEM_IS_GENERATED(item); |
| proto_tree_add_string(cmd_tree, hf_pulsar_subType, tvb, cmdOffset, cmdSize, |
| to_str(consumerData.subType, sub_type_names_vs)); |
| PROTO_ITEM_IS_GENERATED(item); |
| |
| proto_tree_add_string(cmd_tree, hf_pulsar_consumer_name, tvb, cmdOffset, cmdSize, |
| consumerData.consumerName.c_str()); |
| PROTO_ITEM_IS_GENERATED(item); |
| |
| link_to_response_frame(cmd_tree, tvb, cmdOffset, cmdSize, reqData); |
| } |
| break; |
| } |
| |
| case BaseCommand::SUCCESS: { |
| const CommandSuccess& success = command.success(); |
| RequestResponseData & reqData = state->requests[success.request_id()]; |
| reqData.ackFrame = pinfo->fd->num; |
| reqData.ackTimestamp.secs = pinfo->fd->abs_ts.secs; |
| reqData.ackTimestamp.nsecs = pinfo->fd->abs_ts.nsecs; |
| |
| if (tree) { |
| proto_tree_add_uint64(cmd_tree, hf_pulsar_request_id, tvb, cmdOffset, cmdSize, |
| success.request_id()); |
| |
| link_to_request_frame(cmd_tree, tvb, cmdOffset, cmdSize, reqData); |
| } |
| break; |
| } |
| case BaseCommand::ERROR: { |
| const CommandError& error = command.error(); |
| RequestResponseData & reqData = state->requests[error.request_id()]; |
| reqData.ackFrame = pinfo->fd->num; |
| reqData.ackTimestamp.secs = pinfo->fd->abs_ts.secs; |
| reqData.ackTimestamp.nsecs = pinfo->fd->abs_ts.nsecs; |
| |
| if (tree) { |
| proto_tree_add_boolean_format(frame_tree, hf_pulsar_error, tvb, cmdOffset, cmdSize, |
| true, "Request failed"); |
| proto_tree_add_uint64(cmd_tree, hf_pulsar_request_id, tvb, cmdOffset, cmdSize, |
| error.request_id()); |
| proto_tree_add_string(cmd_tree, hf_pulsar_server_error, tvb, cmdOffset, cmdSize, |
| to_str(error.error(), server_errors_vs)); |
| proto_tree_add_string(cmd_tree, hf_pulsar_error_message, tvb, cmdOffset, cmdSize, |
| error.message().c_str()); |
| |
| link_to_request_frame(cmd_tree, tvb, cmdOffset, cmdSize, reqData); |
| } |
| break; |
| } |
| case BaseCommand::CLOSE_PRODUCER: { |
| const CommandCloseProducer& close_producer = command.close_producer(); |
| RequestData& reqData = state->requests[close_producer.request_id()]; |
| reqData.requestFrame = pinfo->fd->num; |
| reqData.requestTimestamp.secs = pinfo->fd->abs_ts.secs; |
| reqData.requestTimestamp.nsecs = pinfo->fd->abs_ts.nsecs; |
| |
| ProducerData& producerData = state->producers[close_producer.producer_id()]; |
| |
| col_append_fstr(pinfo->cinfo, COL_INFO, " / %s", producerData.topic.c_str()); |
| |
| if (tree) { |
| proto_tree_add_uint64(cmd_tree, hf_pulsar_producer_id, tvb, cmdOffset, cmdSize, |
| close_producer.producer_id()); |
| proto_tree_add_uint64(cmd_tree, hf_pulsar_request_id, tvb, cmdOffset, cmdSize, |
| close_producer.request_id()); |
| item = proto_tree_add_string(cmd_tree, hf_pulsar_topic, tvb, cmdOffset, cmdSize, |
| producerData.topic.c_str()); |
| PROTO_ITEM_IS_GENERATED(item); |
| |
| proto_tree_add_string(cmd_tree, hf_pulsar_producer_name, tvb, cmdOffset, cmdSize, |
| producerData.producerName.c_str()); |
| PROTO_ITEM_IS_GENERATED(item); |
| |
| link_to_response_frame(cmd_tree, tvb, cmdOffset, cmdSize, reqData); |
| } |
| break; |
| } |
| |
| case BaseCommand::CLOSE_CONSUMER: { |
| const CommandCloseConsumer& close_consumer = command.close_consumer(); |
| RequestData& reqData = state->requests[close_consumer.request_id()]; |
| reqData.requestFrame = pinfo->fd->num; |
| reqData.requestTimestamp.secs = pinfo->fd->abs_ts.secs; |
| reqData.requestTimestamp.nsecs = pinfo->fd->abs_ts.nsecs; |
| |
| ConsumerData& consumerData = state->consumers[close_consumer.consumer_id()]; |
| |
| col_append_fstr(pinfo->cinfo, COL_INFO, " / %s / %s / %s / %s", |
| to_str(consumerData.subType, sub_type_names_vs), |
| consumerData.topic.c_str(), consumerData.subscriptionName.c_str(), |
| consumerData.consumerName.c_str()); |
| |
| if (tree) { |
| proto_tree_add_uint64(cmd_tree, hf_pulsar_consumer_id, tvb, cmdOffset, cmdSize, |
| close_consumer.consumer_id()); |
| proto_tree_add_uint64(cmd_tree, hf_pulsar_request_id, tvb, cmdOffset, cmdSize, |
| close_consumer.request_id()); |
| item = proto_tree_add_string(cmd_tree, hf_pulsar_topic, tvb, cmdOffset, cmdSize, |
| consumerData.topic.c_str()); |
| PROTO_ITEM_IS_GENERATED(item); |
| item = proto_tree_add_string(cmd_tree, hf_pulsar_subscription, tvb, cmdOffset, cmdSize, |
| consumerData.subscriptionName.c_str()); |
| PROTO_ITEM_IS_GENERATED(item); |
| proto_tree_add_string(cmd_tree, hf_pulsar_subType, tvb, cmdOffset, cmdSize, |
| to_str(consumerData.subType, sub_type_names_vs)); |
| PROTO_ITEM_IS_GENERATED(item); |
| |
| proto_tree_add_string(cmd_tree, hf_pulsar_consumer_name, tvb, cmdOffset, cmdSize, |
| consumerData.consumerName.c_str()); |
| PROTO_ITEM_IS_GENERATED(item); |
| |
| link_to_response_frame(cmd_tree, tvb, cmdOffset, cmdSize, reqData); |
| } |
| break; |
| } |
| |
| case BaseCommand::PRODUCER_SUCCESS: { |
| const CommandProducerSuccess& success = command.producer_success(); |
| RequestResponseData & reqData = state->requests[success.request_id()]; |
| reqData.ackFrame = pinfo->fd->num; |
| reqData.ackTimestamp.secs = pinfo->fd->abs_ts.secs; |
| reqData.ackTimestamp.nsecs = pinfo->fd->abs_ts.nsecs; |
| uint64_t producerId = reqData.id; |
| ProducerData& producerData = state->producers[producerId]; |
| producerData.producerName = success.producer_name(); |
| |
| if (tree) { |
| proto_tree_add_uint64(cmd_tree, hf_pulsar_request_id, tvb, cmdOffset, cmdSize, |
| success.request_id()); |
| proto_tree_add_string(cmd_tree, hf_pulsar_producer_name, tvb, cmdOffset, cmdSize, |
| success.producer_name().c_str()); |
| |
| proto_tree* item = proto_tree_add_uint64(cmd_tree, hf_pulsar_producer_id, tvb, |
| cmdOffset, cmdSize, producerId); |
| PROTO_ITEM_SET_GENERATED(item); |
| |
| item = proto_tree_add_string(cmd_tree, hf_pulsar_topic, tvb, cmdOffset, cmdSize, |
| producerData.topic.c_str()); |
| PROTO_ITEM_SET_GENERATED(item); |
| |
| link_to_request_frame(cmd_tree, tvb, cmdOffset, cmdSize, reqData); |
| } |
| break; |
| } |
| case BaseCommand::PING: |
| break; |
| case BaseCommand::PONG: |
| break; |
| } |
| |
| return maxOffset; |
| } |
| |
| /* determine PDU length of protocol Pulsar */ |
| static uint32_t get_pulsar_message_len(packet_info *pinfo _U_, tvbuff_t *tvb, int offset, |
| void *data _U_) { |
| uint32_t len = (uint32_t) tvb_get_ntohl(tvb, offset); |
| return FRAME_SIZE_LEN + len; |
| } |
| |
| static int dissect_pulsar(tvbuff_t *tvb, packet_info* pinfo, proto_tree* tree, void* data _U_) { |
| tcp_dissect_pdus(tvb, pinfo, tree, 1, FRAME_SIZE_LEN, get_pulsar_message_len, dissect_pulsar_message, |
| data); |
| return tvb_captured_length(tvb); |
| } |
| |
| static hf_register_info hf[] = { // |
| { &hf_pulsar_error, { "Error", "yahoo.pulsar.error", FT_BOOLEAN, BASE_DEC, |
| NULL, 0x0, NULL, HFILL } }, // |
| { &hf_pulsar_error_message, { "Message", "yahoo.pulsar.error_message", FT_STRING, 0, |
| NULL, 0x0, |
| NULL, HFILL } }, // |
| { &hf_pulsar_cmd_type, { "Command Type", "yahoo.pulsar.cmd.type", FT_STRING, 0, |
| NULL, 0x0, |
| NULL, HFILL } }, // |
| { &hf_pulsar_frame_size, { "Frame size", "yahoo.pulsar.frame_size", FT_UINT32, BASE_DEC, |
| NULL, 0x0, NULL, HFILL } }, // |
| { &hf_pulsar_cmd_size, { "Command size", "yahoo.pulsar.cmd_size", FT_UINT32, BASE_DEC, |
| NULL, 0x0, |
| NULL, HFILL } }, // |
| |
| { &hf_pulsar_client_version, { "Client version", "yahoo.pulsar.client_version", FT_STRING, |
| 0, |
| NULL, 0x0, NULL, HFILL } }, // |
| { &hf_pulsar_auth_method, { "Auth method", "yahoo.pulsar.auth_method", FT_STRING, 0, NULL, |
| 0x0, |
| NULL, HFILL } }, // |
| { &hf_pulsar_auth_data, { "Auth data", "yahoo.pulsar.auth_data", FT_STRING, 0, |
| NULL, 0x0, NULL, HFILL } }, // |
| { &hf_pulsar_protocol_version, { "Protocol version", "yahoo.pulsar.protocol_version", |
| FT_STRING, 0, |
| NULL, 0x0, NULL, HFILL } }, |
| |
| { &hf_pulsar_server_version, { "Server version", "yahoo.pulsar.server_version", FT_STRING, |
| 0, |
| NULL, 0x0, NULL, HFILL } }, // |
| { &hf_pulsar_topic, { "Topic", "yahoo.pulsar.topic", FT_STRING, 0, NULL, 0x0, |
| NULL, HFILL } }, // |
| { &hf_pulsar_subscription, { "Subscription", "yahoo.pulsar.subscription", FT_STRING, 0, |
| NULL, 0x0, NULL, HFILL } }, // |
| { &hf_pulsar_subType, { "Subscription type:", "yahoo.pulsar.sub_type", FT_STRING, 0, NULL, |
| 0x0, |
| NULL, HFILL } }, // |
| { &hf_pulsar_consumer_id, { "Consumer Id", "yahoo.pulsar.consumer_id", FT_UINT64, |
| BASE_DEC, |
| NULL, 0x0, NULL, HFILL } }, // |
| { &hf_pulsar_producer_id, { "Producer Id", "yahoo.pulsar.producer_id", FT_UINT64, |
| BASE_DEC, |
| NULL, 0x0, NULL, HFILL } }, // |
| |
| { &hf_pulsar_server_error, { "Server error", "yahoo.pulsar.server_error", FT_STRING, 0, |
| NULL, 0x0, NULL, HFILL } }, // |
| { &hf_pulsar_ack_type, { "Ack type", "yahoo.pulsar.ack_type", FT_STRING, 0, |
| NULL, 0x0, NULL, HFILL } }, // |
| |
| { &hf_pulsar_request_id, { "Request Id", "yahoo.pulsar.request_id", FT_UINT64, BASE_DEC, |
| NULL, 0x0, NULL, HFILL } }, // |
| { &hf_pulsar_consumer_name, { "Consumer Name", "yahoo.pulsar.consumer_name", FT_STRING, |
| BASE_NONE, |
| NULL, 0x0, NULL, HFILL } }, // |
| { &hf_pulsar_producer_name, { "Producer Name", "yahoo.pulsar.producer_name", FT_STRING, |
| BASE_NONE, |
| NULL, 0x0, NULL, HFILL } }, // |
| |
| { &hf_pulsar_sequence_id, { "Sequence Id", "yahoo.pulsar.sequence_id", FT_UINT64, |
| BASE_DEC, |
| NULL, 0x0, NULL, HFILL } }, // |
| { &hf_pulsar_message_id, { "Message Id", "yahoo.pulsar.message_id", FT_STRING, BASE_NONE, |
| NULL, 0x0, NULL, HFILL } }, // |
| { &hf_pulsar_message_permits, { "Message Permits", "yahoo.pulsar.message_permits", |
| FT_UINT32, BASE_DEC, |
| NULL, 0x0, NULL, HFILL } }, // |
| |
| { &hf_pulsar_publish_time, { "Publish time", "yahoo.pulsar.publish_time", FT_UINT64, |
| BASE_DEC, |
| NULL, 0x0, NULL, HFILL } }, // |
| |
| { &hf_pulsar_replicated_from, { "Replicated from", "yahoo.pulsar.replicated_from", |
| FT_STRING, BASE_NONE, |
| NULL, 0x0, NULL, HFILL } }, // |
| { &hf_pulsar_partition_key, { "Partition key", "yahoo.pulsar.partition_key", FT_STRING, |
| BASE_NONE, |
| NULL, 0x0, NULL, HFILL } }, // |
| { &hf_pulsar_replicate_to, { "Replicate to", "yahoo.pulsar.replicate_to", FT_STRING, |
| BASE_NONE, |
| NULL, 0x0, NULL, HFILL } }, // |
| { &hf_pulsar_property, { "Property", "yahoo.pulsar.property", FT_STRING, BASE_NONE, |
| NULL, 0x0, NULL, HFILL } }, // |
| |
| { &hf_pulsar_request_in, { "Request in frame", "yahoo.pulsar.request_in", FT_FRAMENUM, |
| BASE_NONE, |
| NULL, 0, "This packet is a response to the packet with this number", |
| HFILL } }, // |
| { &hf_pulsar_response_in, { "Response in frame", "yahoo.pulsar.response_in", FT_FRAMENUM, |
| BASE_NONE, |
| NULL, 0, "This packet will be responded in the packet with this number", |
| HFILL } }, // |
| { &hf_pulsar_publish_latency, { "Latency", "yahoo.pulsar.publish_latency", |
| FT_RELATIVE_TIME, BASE_NONE, NULL, 0x0, |
| "How long time it took to ACK message", HFILL } }, }; |
| |
| //////////////// |
| /// |
| void proto_register_pulsar() { |
| // register the new protocol, protocol fields, and subtrees |
| |
| proto_pulsar = proto_register_protocol("Pulsar Wire Protocol", /* name */ |
| "Yahoo Pulsar", /* short name */ |
| "yahoo.pulsar" /* abbrev */ |
| ); |
| |
| /* Setup protocol subtree array */ |
| static int *ett[] = { &ett_pulsar }; |
| |
| proto_register_field_array(proto_pulsar, hf, array_length(hf)); |
| proto_register_subtree_array(ett, array_length(ett)); |
| } |
| |
| void proto_reg_handoff_pulsar() { |
| static dissector_handle_t pulsar_handle; |
| |
| pulsar_handle = create_dissector_handle(&dissect_pulsar, proto_pulsar); |
| dissector_add_uint("tcp.port", PULSAR_PORT, pulsar_handle); |
| } |
| |
| extern "C" { |
| |
| G_MODULE_EXPORT const char* version = VERSION; |
| |
| /* Start the functions we need for the plugin stuff */ |
| G_MODULE_EXPORT void plugin_register(void) { |
| if (proto_pulsar == -1) { |
| proto_register_pulsar(); |
| } |
| } |
| |
| G_MODULE_EXPORT void plugin_reg_handoff(void) { |
| proto_reg_handoff_pulsar(); |
| } |
| |
| } |