| # PIP-204: Extensions for broker interceptor |
| |
| Currently, we have a reconciliation system that compares the messages(or entries) produced and consumed at a minute level to indicate whether all the data has been consumed completely by the consumers. |
| |
| We want to track the message(or entry) properties (including the timestamp and msgSize) for messages has been persistent to bookie and messages that have been consumed & acked. Also, we want to track the event of the producer and consumer closed. |
| |
| A good way to achieve this is using the `BrokerInterceptor ` to interceptor the message at certain points. So we want to expand some interfaces for `BrokerInterceptor ` like what #12858 did. |
| |
| ### Goal |
| |
| 1. Get timestamp and size for the entry which has been persistent to bookie |
| 2. trace the producer or consumer closed |
| 3. Get the timestamp and size for entry that has been consumed and acked(which is already supported by BrokerInterceptor) |
| |
| ### API Changes |
| |
| interceptor message when broker received a send request to get the timestamp and size |
| |
| ```java |
| /** |
| * Interceptor message when broker received a send request |
| * |
| * @param headersAndPayload entry's header and payload |
| * @param publishContext Publish Context |
| */ |
| default void onMessagePublish(Producer producer, |
| ByteBuf headersAndPayload, |
| Topic.PublishContext publishContext) { |
| |
| } |
| ``` |
| |
| Add interfaces for a producer or consumer closed |
| |
| ```JAVA |
| /** |
| * Called by the broker when a producer is closed. |
| * |
| * @param cnx client Connection |
| * @param producer Consumer object |
| * @param metadata A map of metadata |
| */ |
| default void producerClosed(ServerCnx cnx, |
| Producer producer, |
| Map<String, String> metadata) { |
| } |
| |
| /** |
| * Called by the broker when a consumer is closed. |
| * |
| * @param cnx client Connection |
| * @param consumer Consumer object |
| * @param metadata A map of metadata |
| */ |
| default void consumerClosed(ServerCnx cnx, |
| Consumer consumer, |
| Map<String, String> metadata) { |
| } |
| ``` |
| |
| expand the `beforeSendMessage` to support consumer |
| |
| ```JAVA |
| /** |
| * Intercept messages before sending them to the consumers. |
| * |
| * @param subscription pulsar subscription |
| * @param entry entry |
| * @param ackSet entry ack bitset. it is either <tt>null</tt> or an array of long-based bitsets. |
| * @param msgMetadata message metadata. The message metadata will be recycled after this call. |
| * @param consumer consumer. Consumer which entry are sent to. |
| */ |
| default void beforeSendMessage(Subscription subscription, |
| Entry entry, |
| long[] ackSet, |
| MessageMetadata msgMetadata, |
| Consumer consumer) { |
| } |
| ``` |
| |
| |
| |
| ### Implementation |
| |
| First, change the APIs as described above. |
| |
| Then implements all the newly added interfaces in `BrokerInterceptors.java` and `BrokerInterceptorWithClassLoader` . |
| |
| Set all interested properties(like timestamp and msgSize) to `MessagePublishContext` in `onMessagePublish` interface before persistent entry to bookie. |
| |
| When after entry persistent to the bookie, invoke `BrokerInterceptor.messageProduced()` and get the properties from the `MessagePublishContext` for reconciliation. |
| |
| For consumption, record the interested properties before sending to the consumer by `beforeSendMessage`, and then at the point f message acked, getting all the properties for reconciliation. |