blob: d828d729cc70831bb57f1f9ae2efdd026162ee90 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.yahoo.sketches.quantiles.DoublesSketch;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import java.io.IOException;
import java.text.DecimalFormat;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import org.apache.pulsar.client.api.ProducerStats;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ProducerStatsRecorderImpl implements ProducerStatsRecorder {
private static final long serialVersionUID = 1L;
private transient TimerTask stat;
private transient Timeout statTimeout;
private transient ProducerImpl<?> producer;
private transient PulsarClientImpl pulsarClient;
private long oldTime;
private long statsIntervalSeconds;
private final LongAdder numMsgsSent;
private final LongAdder numBytesSent;
private final LongAdder numSendFailed;
private final LongAdder numAcksReceived;
private final LongAdder totalMsgsSent;
private final LongAdder totalBytesSent;
private final LongAdder totalSendFailed;
private final LongAdder totalAcksReceived;
private static final DecimalFormat DEC = new DecimalFormat("0.000");
private static final DecimalFormat THROUGHPUT_FORMAT = new DecimalFormat("0.00");
private final transient DoublesSketch ds;
private final transient DoublesSketch batchSizeDs;
private final transient DoublesSketch msgSizeDs;
private volatile double sendMsgsRate;
private volatile double sendBytesRate;
private volatile double[] latencyPctValues = new double[PERCENTILES.length];
private volatile double[] batchSizePctValues = new double[PERCENTILES.length];
private volatile double[] msgSizePctValues = new double[PERCENTILES.length];
private static final double[] PERCENTILES = { 0.5, 0.75, 0.95, 0.99, 0.999, 1.0 };
public ProducerStatsRecorderImpl() {
numMsgsSent = new LongAdder();
numBytesSent = new LongAdder();
numSendFailed = new LongAdder();
numAcksReceived = new LongAdder();
totalMsgsSent = new LongAdder();
totalBytesSent = new LongAdder();
totalSendFailed = new LongAdder();
totalAcksReceived = new LongAdder();
ds = DoublesSketch.builder().build(256);
batchSizeDs = DoublesSketch.builder().build(256);
msgSizeDs = DoublesSketch.builder().build(256);
}
public ProducerStatsRecorderImpl(PulsarClientImpl pulsarClient, ProducerConfigurationData conf,
ProducerImpl<?> producer) {
this.pulsarClient = pulsarClient;
this.statsIntervalSeconds = pulsarClient.getConfiguration().getStatsIntervalSeconds();
this.producer = producer;
numMsgsSent = new LongAdder();
numBytesSent = new LongAdder();
numSendFailed = new LongAdder();
numAcksReceived = new LongAdder();
totalMsgsSent = new LongAdder();
totalBytesSent = new LongAdder();
totalSendFailed = new LongAdder();
totalAcksReceived = new LongAdder();
ds = DoublesSketch.builder().build(256);
batchSizeDs = DoublesSketch.builder().build(256);
msgSizeDs = DoublesSketch.builder().build(256);
init(conf);
}
private void init(ProducerConfigurationData conf) {
ObjectWriter w = ObjectMapperFactory.getMapperWithIncludeAlways().writer()
.without(SerializationFeature.FAIL_ON_EMPTY_BEANS);
try {
log.info("Starting Pulsar producer perf with config: {}", w.writeValueAsString(conf));
log.info("Pulsar client config: {}", w.writeValueAsString(pulsarClient.getConfiguration()));
} catch (IOException e) {
log.error("Failed to dump config info", e);
}
stat = (timeout) -> {
if (timeout.isCancelled()) {
return;
}
try {
updateStats();
} catch (Exception e) {
log.error("[{}] [{}]: {}", producer.getTopic(), producer.getProducerName(), e.getMessage());
} finally {
// schedule the next stat info
statTimeout = pulsarClient.timer().newTimeout(stat, statsIntervalSeconds, TimeUnit.SECONDS);
}
};
oldTime = System.nanoTime();
statTimeout = pulsarClient.timer().newTimeout(stat, statsIntervalSeconds, TimeUnit.SECONDS);
}
Timeout getStatTimeout() {
return statTimeout;
}
protected void updateStats() {
long now = System.nanoTime();
double elapsed = (now - oldTime) / 1e9;
oldTime = now;
long currentNumMsgsSent = numMsgsSent.sumThenReset();
long currentNumBytesSent = numBytesSent.sumThenReset();
long currentNumSendFailedMsgs = numSendFailed.sumThenReset();
long currentNumAcksReceived = numAcksReceived.sumThenReset();
totalMsgsSent.add(currentNumMsgsSent);
totalBytesSent.add(currentNumBytesSent);
totalSendFailed.add(currentNumSendFailedMsgs);
totalAcksReceived.add(currentNumAcksReceived);
synchronized (ds) {
latencyPctValues = ds.getQuantiles(PERCENTILES);
ds.reset();
}
synchronized (batchSizeDs) {
batchSizePctValues = batchSizeDs.getQuantiles(PERCENTILES);
batchSizeDs.reset();
}
synchronized (msgSizeDs) {
msgSizePctValues = msgSizeDs.getQuantiles(PERCENTILES);
msgSizeDs.reset();
}
sendMsgsRate = currentNumMsgsSent / elapsed;
sendBytesRate = currentNumBytesSent / elapsed;
if ((currentNumMsgsSent | currentNumSendFailedMsgs | currentNumAcksReceived
| currentNumMsgsSent) != 0) {
for (int i = 0; i < latencyPctValues.length; i++) {
if (Double.isNaN(latencyPctValues[i])) {
latencyPctValues[i] = 0;
}
}
log.info("[{}] [{}] --- Publish throughput: {} msg/s --- {} Mbit/s --- "
+ "Latency: med: {} ms - 95pct: {} ms - 99pct: {} ms - 99.9pct: {} ms - max: {} ms --- "
+ "BatchSize: med: {} - 95pct: {} - 99pct: {} - 99.9pct: {} - max: {} --- "
+ "MsgSize: med: {} bytes - 95pct: {} bytes - 99pct: {} bytes - 99.9pct: {} bytes "
+ "- max: {} bytes --- "
+ "Ack received rate: {} ack/s --- Failed messages: {} --- Pending messages: {}",
producer.getTopic(),
producer.getProducerName(),
THROUGHPUT_FORMAT.format(sendMsgsRate),
THROUGHPUT_FORMAT.format(sendBytesRate / 1024 / 1024 * 8),
DEC.format(latencyPctValues[0]), DEC.format(latencyPctValues[2]),
DEC.format(latencyPctValues[3]), DEC.format(latencyPctValues[4]),
DEC.format(latencyPctValues[5]),
DEC.format(batchSizePctValues[0]), DEC.format(batchSizePctValues[2]),
DEC.format(batchSizePctValues[3]), DEC.format(batchSizePctValues[4]),
DEC.format(batchSizePctValues[5]),
DEC.format(msgSizePctValues[0]), DEC.format(msgSizePctValues[2]),
DEC.format(msgSizePctValues[3]), DEC.format(msgSizePctValues[4]),
DEC.format(msgSizePctValues[5]),
THROUGHPUT_FORMAT.format(currentNumAcksReceived / elapsed), currentNumSendFailedMsgs,
getPendingQueueSize());
}
}
@Override
public void updateNumMsgsSent(long numMsgs, long totalMsgsSize) {
numMsgsSent.add(numMsgs);
numBytesSent.add(totalMsgsSize);
synchronized (batchSizeDs) {
batchSizeDs.update(numMsgs);
}
synchronized (msgSizeDs) {
msgSizeDs.update(totalMsgsSize);
}
}
@Override
public void incrementSendFailed() {
numSendFailed.increment();
}
@Override
public void incrementSendFailed(long numMsgs) {
numSendFailed.add(numMsgs);
}
@Override
public void incrementNumAcksReceived(long latencyNs) {
numAcksReceived.increment();
synchronized (ds) {
ds.update(TimeUnit.NANOSECONDS.toMillis(latencyNs));
}
}
void reset() {
numMsgsSent.reset();
numBytesSent.reset();
numSendFailed.reset();
numAcksReceived.reset();
totalMsgsSent.reset();
totalBytesSent.reset();
totalSendFailed.reset();
totalAcksReceived.reset();
}
void updateCumulativeStats(ProducerStats stats) {
if (stats == null) {
return;
}
numMsgsSent.add(stats.getNumMsgsSent());
numBytesSent.add(stats.getNumBytesSent());
numSendFailed.add(stats.getNumSendFailed());
numAcksReceived.add(stats.getNumAcksReceived());
totalMsgsSent.add(stats.getTotalMsgsSent());
totalBytesSent.add(stats.getTotalBytesSent());
totalSendFailed.add(stats.getTotalSendFailed());
totalAcksReceived.add(stats.getTotalAcksReceived());
}
@Override
public long getNumMsgsSent() {
return numMsgsSent.longValue();
}
@Override
public long getNumBytesSent() {
return numBytesSent.longValue();
}
@Override
public long getNumSendFailed() {
return numSendFailed.longValue();
}
@Override
public long getNumAcksReceived() {
return numAcksReceived.longValue();
}
public long getTotalMsgsSent() {
return totalMsgsSent.longValue();
}
public long getTotalBytesSent() {
return totalBytesSent.longValue();
}
public long getTotalSendFailed() {
return totalSendFailed.longValue();
}
public long getTotalAcksReceived() {
return totalAcksReceived.longValue();
}
@Override
public double getSendMsgsRate() {
return sendMsgsRate;
}
@Override
public double getSendBytesRate() {
return sendBytesRate;
}
@Override
public double getSendLatencyMillis50pct() {
return latencyPctValues[0];
}
@Override
public double getSendLatencyMillis75pct() {
return latencyPctValues[1];
}
@Override
public double getSendLatencyMillis95pct() {
return latencyPctValues[2];
}
@Override
public double getSendLatencyMillis99pct() {
return latencyPctValues[3];
}
@Override
public double getSendLatencyMillis999pct() {
return latencyPctValues[4];
}
@Override
public double getSendLatencyMillisMax() {
return latencyPctValues[5];
}
@Override
public int getPendingQueueSize() {
return producer.getPendingQueueSize();
}
public void cancelStatsTimeout() {
this.updateStats();
if (statTimeout != null) {
statTimeout.cancel();
statTimeout = null;
}
}
private static final Logger log = LoggerFactory.getLogger(ProducerStatsRecorderImpl.class);
}