| # PIP-363: Add callback parameters to the method: `org.apache.pulsar.client.impl.SendCallback.sendComplete`. |
| |
| # Background knowledge |
| |
| |
| As introduced in [PIP-264](https://github.com/apache/pulsar/blob/master/pip/pip-264.md), Pulsar has been integrated with `OpenTelemetry`, which defines metric conventions for [messaging systems](https://opentelemetry.io/docs/specs/semconv/messaging/messaging-metrics/#metric-messagingpublishduration). |
| |
| The current Pulsar client code does not expose the number of messages sent in a batch (or several other send-related values), which makes it impossible to implement the `messaging.publish.messages` metric. |
| |
| The `opentelemetry-java-instrumentation` project uses the `org.apache.pulsar.client.impl.SendCallback` interface to record data points. For implementation details, see [`ProducerImplInstrumentation`](https://github.com/open-telemetry/opentelemetry-java-instrumentation/blob/main/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/ProducerImplInstrumentation.java#L89-L135). |
| |
| # Motivation |
| |
| |
| Currently, `org.apache.pulsar.client.impl.ProducerImpl` does not provide a public method to obtain `numMessagesInBatch`. |
| |
| This PIP therefore proposes passing some of the key data held by `org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg` to the `org.apache.pulsar.client.impl.SendCallback.sendComplete` method. |
| |
| # Detailed Design |
| |
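| Add an `OpSendMsgStats` parameter to the `org.apache.pulsar.client.impl.SendCallback.sendComplete` method, and introduce the `OpSendMsgStats` interface together with its `OpSendMsgStatsImpl` implementation: |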
| |
| ```java |
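| public interface SendCallback { |
| |
|     /** |
|      * Invoked when the send operation completes. |
|      * |
|      * @param e     the failure cause, or null if the send succeeded |
|      * @param stats statistics of the completed send operation |
|      */ |
|     void sendComplete(Throwable e, OpSendMsgStats stats); |
| } |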
| |
| public interface OpSendMsgStats { |
|     long getUncompressedSize(); |
| |
|     long getSequenceId(); |
| |
|     int getRetryCount(); |
| |
|     long getBatchSizeByte(); |
| |
|     int getNumMessagesInBatch(); |
| |
|     long getHighestSequenceId(); |
| |
|     int getTotalChunks(); |
| |
|     int getChunkId(); |
| } |
| |
| @Builder |
| public class OpSendMsgStatsImpl implements OpSendMsgStats { |
|     private long uncompressedSize; |
|     private long sequenceId; |
|     private int retryCount; |
|     private long batchSizeByte; |
|     private int numMessagesInBatch; |
|     private long highestSequenceId; |
|     private int totalChunks; |
|     private int chunkId; |
| |
|     @Override |
|     public long getUncompressedSize() { |
|         return uncompressedSize; |
|     } |
| |
|     @Override |
|     public long getSequenceId() { |
|         return sequenceId; |
|     } |
| |
|     @Override |
|     public int getRetryCount() { |
|         return retryCount; |
|     } |
| |
|     @Override |
|     public long getBatchSizeByte() { |
|         return batchSizeByte; |
|     } |
| |
|     @Override |
|     public int getNumMessagesInBatch() { |
|         return numMessagesInBatch; |
|     } |
| |
|     @Override |
|     public long getHighestSequenceId() { |
|         return highestSequenceId; |
|     } |
| |
|     @Override |
|     public int getTotalChunks() { |
|         return totalChunks; |
|     } |
| |
|     @Override |
|     public int getChunkId() { |
|         return chunkId; |
|     } |
| } |
| ``` |
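
As a rough illustration of how an instrumentation layer might consume the extended callback, the sketch below tallies messages from `getNumMessagesInBatch()`, the value a `messaging.publish.messages` counter would need. It is self-contained: the nested `SendCallback` and `OpSendMsgStats` types are local stand-ins that mirror only part of the proposed interfaces, and `PublishMetricsSketch`, `CountingCallback`, and the `stats(...)` helper are hypothetical names, not Pulsar or OpenTelemetry APIs. Treating a null `stats` or a non-positive count as a single message is an assumption made here for safety, not documented behavior.

```java
import java.util.concurrent.atomic.AtomicLong;

public class PublishMetricsSketch {

    /** Local stand-in for a subset of the proposed OpSendMsgStats (not the Pulsar type). */
    public interface OpSendMsgStats {
        int getNumMessagesInBatch();
        long getBatchSizeByte();
    }

    /** Local stand-in for the proposed two-argument SendCallback. */
    public interface SendCallback {
        void sendComplete(Throwable e, OpSendMsgStats stats);
    }

    /** Hypothetical callback that tallies messages by send outcome. */
    public static final class CountingCallback implements SendCallback {
        public final AtomicLong publishedMessages = new AtomicLong();
        public final AtomicLong failedMessages = new AtomicLong();

        @Override
        public void sendComplete(Throwable e, OpSendMsgStats stats) {
            // Assumption: fall back to one message if stats is missing or reports no count.
            int count = (stats == null) ? 1 : Math.max(1, stats.getNumMessagesInBatch());
            if (e == null) {
                publishedMessages.addAndGet(count);
            } else {
                failedMessages.addAndGet(count);
            }
        }
    }

    /** Hypothetical helper that builds an immutable stats value for the demo. */
    public static OpSendMsgStats stats(final int numMessages, final long batchSizeByte) {
        return new OpSendMsgStats() {
            @Override public int getNumMessagesInBatch() { return numMessages; }
            @Override public long getBatchSizeByte() { return batchSizeByte; }
        };
    }

    public static void main(String[] args) {
        CountingCallback callback = new CountingCallback();
        // One successful batch of five messages, one failed batch of three.
        callback.sendComplete(null, stats(5, 1024L));
        callback.sendComplete(new RuntimeException("send failed"), stats(3, 512L));
        System.out.println("published=" + callback.publishedMessages.get()
                + " failed=" + callback.failedMessages.get());
    }
}
```

A real integration would presumably forward these counts to a metrics API rather than keep them in `AtomicLong` fields. The counters are used here only to keep the sketch free of external dependencies.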
| |
| # Links |
| |
| * Mailing List discussion thread: https://lists.apache.org/thread/8pgmsvx1bxz4z1w8prpvpnfpt1kb57c9 |
| * Mailing List voting thread: https://lists.apache.org/thread/t0olt3722j17gjtdxqqsl3cpy104ogpr |