blob: 2b250e69871e18eaac23c513af80e374599bf134 [file] [log] [blame] [view]
# PIP-363: Add callback parameters to the method: `org.apache.pulsar.client.impl.SendCallback.sendComplete`.
# Background knowledge
As introduced in [PIP-264](https://github.com/apache/pulsar/blob/master/pip/pip-264.md), Pulsar has been fully integrated into the `OpenTelemetry` system, which defines some metric specifications for [messaging systems](https://opentelemetry.io/docs/specs/semconv/messaging/messaging-metrics/#metric-messagingpublishduration).
In the current Pulsar client code, it is not possible to obtain the number of messages sent in batches(as well as some other sending data), making it impossible to implement `messaging.publish.messages` metric.
In the `opentelemetry-java-instrumentation` code, the `org.apache.pulsar.client.impl.SendCallback` interface is used to instrument data points. For specific implementation details, we can refer to [this](https://github.com/open-telemetry/opentelemetry-java-instrumentation/blob/main/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/ProducerImplInstrumentation.java#L89-L135).
# Motivation
In the current situation, `org.apache.pulsar.client.impl.ProducerImpl` does not provide a public method to obtain the `numMessagesInBatch`.
So, we can add some of `org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg`'s key data into the `org.apache.pulsar.client.impl.SendCallback.sendComplete` method.
# Detailed Design
Add callback parameters to the method: `org.apache.pulsar.client.impl.SendCallback.sendComplete`:
```java
public interface SendCallback {
/**
* invoked when send operation completes.
*
* @param e
*/
void sendComplete(Throwable e, OpSendMsgStats stats);
}
public interface OpSendMsgStats {
long getUncompressedSize();
long getSequenceId();
int getRetryCount();
long getBatchSizeByte();
int getNumMessagesInBatch();
long getHighestSequenceId();
int getTotalChunks();
int getChunkId();
}
@Builder
public class OpSendMsgStatsImpl implements OpSendMsgStats {
private long uncompressedSize;
private long sequenceId;
private int retryCount;
private long batchSizeByte;
private int numMessagesInBatch;
private long highestSequenceId;
private int totalChunks;
private int chunkId;
@Override
public long getUncompressedSize() {
return uncompressedSize;
}
@Override
public long getSequenceId() {
return sequenceId;
}
@Override
public int getRetryCount() {
return retryCount;
}
@Override
public long getBatchSizeByte() {
return batchSizeByte;
}
@Override
public int getNumMessagesInBatch() {
return numMessagesInBatch;
}
@Override
public long getHighestSequenceId() {
return highestSequenceId;
}
@Override
public int getTotalChunks() {
return totalChunks;
}
@Override
public int getChunkId() {
return chunkId;
}
}
```
# Links
<!--
Updated afterwards
-->
* Mailing List discussion thread: https://lists.apache.org/thread/8pgmsvx1bxz4z1w8prpvpnfpt1kb57c9
* Mailing List voting thread: https://lists.apache.org/thread/t0olt3722j17gjtdxqqsl3cpy104ogpr