DL-85: Publish both per-stream stats and aggregation stats
merge twittter's changes from Jordan Bull.
Author: Jordan Bull <jbull@twitter.com>
Reviewers: Sijie Guo <sijie@apache.org>, Franck Cuny <fcuny@apache.org>
Closes #57 from sijie/merge/DL-85
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
index de475ea..751e972 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
@@ -132,6 +132,7 @@
// Stats
private final StatsLogger statsLogger;
private final StatsLogger perStreamStatsLogger;
+ private final StreamPartitionConverter streamPartitionConverter;
private final StreamOpStats streamOpStats;
private final Counter bulkWritePendingStat;
private final Counter writePendingStat;
@@ -158,6 +159,7 @@
this.perStreamStatsLogger = perStreamStatsLogger;
this.dlsnVersion = serverConf.getDlsnVersion();
this.serverRegionId = serverConf.getRegionId();
+ this.streamPartitionConverter = converter;
int serverPort = serverConf.getServerPort();
int shard = serverConf.getServerShardId();
int numThreads = serverConf.getServerThreads();
@@ -396,8 +398,8 @@
public Future<BulkWriteResponse> writeBulkWithContext(final String stream, List<ByteBuffer> data, WriteContext ctx) {
bulkWritePendingStat.inc();
receivedRecordCounter.add(data.size());
- BulkWriteOp op = new BulkWriteOp(stream, data, statsLogger, perStreamStatsLogger, getChecksum(ctx),
- featureChecksumDisabled, accessControlManager);
+ BulkWriteOp op = new BulkWriteOp(stream, data, statsLogger, perStreamStatsLogger, streamPartitionConverter,
+ getChecksum(ctx), featureChecksumDisabled, accessControlManager);
executeStreamOp(op);
return op.result().ensure(new Function0<BoxedUnit>() {
public BoxedUnit apply() {
@@ -675,8 +677,9 @@
ByteBuffer data,
Long checksum,
boolean isRecordSet) {
- return new WriteOp(stream, data, statsLogger, perStreamStatsLogger, serverConfig, dlsnVersion,
- checksum, isRecordSet, featureChecksumDisabled, accessControlManager);
+ return new WriteOp(stream, data, statsLogger, perStreamStatsLogger, streamPartitionConverter,
+ serverConfig, dlsnVersion, checksum, isRecordSet, featureChecksumDisabled,
+ accessControlManager);
}
@VisibleForTesting
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/BulkWriteOp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/BulkWriteOp.java
index 96a37cd..c009bb9 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/BulkWriteOp.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/BulkWriteOp.java
@@ -33,6 +33,8 @@
import com.twitter.distributedlog.exceptions.OwnershipAcquireFailedException;
import com.twitter.distributedlog.exceptions.RequestDeniedException;
import com.twitter.distributedlog.service.ResponseUtils;
+import com.twitter.distributedlog.service.streamset.Partition;
+import com.twitter.distributedlog.service.streamset.StreamPartitionConverter;
import com.twitter.distributedlog.thrift.service.BulkWriteResponse;
import com.twitter.distributedlog.thrift.service.ResponseHeader;
import com.twitter.distributedlog.thrift.service.StatusCode;
@@ -88,6 +90,7 @@
List<ByteBuffer> buffers,
StatsLogger statsLogger,
StatsLogger perStreamStatsLogger,
+ StreamPartitionConverter streamPartitionConverter,
Long checksum,
Feature checksumDisabledFeature,
AccessControlManager accessControlManager) {
@@ -100,6 +103,7 @@
}
this.payloadSize = total;
+ final Partition partition = streamPartitionConverter.convert(stream);
// Write record stats
StreamOpStats streamOpStats = new StreamOpStats(statsLogger, perStreamStatsLogger);
this.deniedBulkWriteCounter = streamOpStats.requestDeniedCounter("bulkWrite");
@@ -107,8 +111,8 @@
this.failureRecordCounter = streamOpStats.recordsCounter("failure");
this.redirectRecordCounter = streamOpStats.recordsCounter("redirect");
this.bulkWriteBytes = streamOpStats.scopedRequestCounter("bulkWrite", "bytes");
- this.latencyStat = streamOpStats.streamRequestLatencyStat(stream, "bulkWrite");
- this.bytes = streamOpStats.streamRequestCounter(stream, "bulkWrite", "bytes");
+ this.latencyStat = streamOpStats.streamRequestLatencyStat(partition, "bulkWrite");
+ this.bytes = streamOpStats.streamRequestCounter(partition, "bulkWrite", "bytes");
this.accessControlManager = accessControlManager;
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java
index 04f793f..45630fe 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java
@@ -194,12 +194,12 @@
this.dynConf = streamConf;
StatsLogger limiterStatsLogger = BroadCastStatsLogger.two(
streamOpStats.baseScope("stream_limiter"),
- streamOpStats.streamRequestScope(name, "limiter"));
+ streamOpStats.streamRequestScope(partition, "limiter"));
this.limiter = new StreamRequestLimiter(name, dynConf, limiterStatsLogger, featureRateLimitDisabled);
this.requestTimer = requestTimer;
// Stats
- this.streamLogger = streamOpStats.streamRequestStatsLogger(name);
+ this.streamLogger = streamOpStats.streamRequestStatsLogger(partition);
this.limiterStatLogger = streamOpStats.baseScope("request_limiter");
this.streamExceptionStatLogger = streamLogger.scope("exceptions");
this.serviceTimeout = streamOpStats.baseCounter("serviceTimeout");
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamOpStats.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamOpStats.java
index adf89a3..bfbc88c 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamOpStats.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamOpStats.java
@@ -17,6 +17,8 @@
*/
package com.twitter.distributedlog.service.stream;
+import com.twitter.distributedlog.stats.BroadCastStatsLogger;
+import com.twitter.distributedlog.service.streamset.Partition;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
@@ -81,19 +83,22 @@
return recordsStatsLogger.getCounter(counterName);
}
- public StatsLogger streamRequestStatsLogger(String streamName) {
- return streamStatsLogger.scope(streamName);
+ public StatsLogger streamRequestStatsLogger(Partition partition) {
+ return BroadCastStatsLogger.masterslave(
+ streamStatsLogger.scope(partition.getStream()).scope("partition")
+ .scope(partition.getPaddedId()), streamStatsLogger.scope(partition.getStream())
+ .scope("aggregate"));
}
- public StatsLogger streamRequestScope(String streamName, String scopeName) {
- return streamRequestStatsLogger(streamName).scope(scopeName);
+ public StatsLogger streamRequestScope(Partition partition, String scopeName) {
+ return streamRequestStatsLogger(partition).scope(scopeName);
}
- public OpStatsLogger streamRequestLatencyStat(String streamName, String opName) {
- return streamRequestStatsLogger(streamName).getOpStatsLogger(opName);
+ public OpStatsLogger streamRequestLatencyStat(Partition partition, String opName) {
+ return streamRequestStatsLogger(partition).getOpStatsLogger(opName);
}
- public Counter streamRequestCounter(String streamName, String opName, String counterName) {
- return streamRequestScope(streamName, opName).getCounter(counterName);
+ public Counter streamRequestCounter(Partition partition, String opName, String counterName) {
+ return streamRequestScope(partition, opName).getCounter(counterName);
}
}
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/WriteOp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/WriteOp.java
index c74f2cd..e9f2f4e 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/WriteOp.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/WriteOp.java
@@ -26,6 +26,8 @@
import com.twitter.distributedlog.exceptions.DLException;
import com.twitter.distributedlog.exceptions.RequestDeniedException;
import com.twitter.distributedlog.service.ResponseUtils;
+import com.twitter.distributedlog.service.streamset.Partition;
+import com.twitter.distributedlog.service.streamset.StreamPartitionConverter;
import com.twitter.distributedlog.thrift.service.WriteResponse;
import com.twitter.distributedlog.thrift.service.ResponseHeader;
import com.twitter.distributedlog.thrift.service.StatusCode;
@@ -67,6 +69,7 @@
ByteBuffer data,
StatsLogger statsLogger,
StatsLogger perStreamStatsLogger,
+ StreamPartitionConverter streamPartitionConverter,
ServerConfiguration conf,
byte dlsnVersion,
Long checksum,
@@ -78,14 +81,15 @@
data.get(payload);
this.isRecordSet = isRecordSet;
+ final Partition partition = streamPartitionConverter.convert(stream);
StreamOpStats streamOpStats = new StreamOpStats(statsLogger, perStreamStatsLogger);
this.successRecordCounter = streamOpStats.recordsCounter("success");
this.failureRecordCounter = streamOpStats.recordsCounter("failure");
this.redirectRecordCounter = streamOpStats.recordsCounter("redirect");
this.deniedWriteCounter = streamOpStats.requestDeniedCounter("write");
this.writeBytes = streamOpStats.scopedRequestCounter("write", "bytes");
- this.latencyStat = streamOpStats.streamRequestLatencyStat(stream, "write");
- this.bytes = streamOpStats.streamRequestCounter(stream, "write", "bytes");
+ this.latencyStat = streamOpStats.streamRequestLatencyStat(partition, "write");
+ this.bytes = streamOpStats.streamRequestCounter(partition, "write", "bytes");
this.dlsnVersion = dlsnVersion;
this.accessControlManager = accessControlManager;
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/Partition.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/Partition.java
index f207eee..d199f88 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/Partition.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/Partition.java
@@ -57,6 +57,14 @@
return id;
}
+ /**
+ * Get the 6 digit 0 padded id of this partition as a String.
+ * @return partition id
+ */
+ public String getPaddedId() {
+ return String.format("%06d", getId());
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServer.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServer.java
index 94e8755..63723ef 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServer.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServer.java
@@ -106,9 +106,9 @@
int numRead = 0;
LogRecord r = reader.readNext(false);
while (null != r) {
- int i = Integer.parseInt(new String(r.getPayload()));
- assertEquals(numRead + 1, i);
++numRead;
+ int i = Integer.parseInt(new String(r.getPayload()));
+ assertEquals(numRead, i);
r = reader.readNext(false);
}
assertEquals(10, numRead);
@@ -121,7 +121,7 @@
*/
@Test(timeout = 60000)
public void testChecksumFlag() throws Exception {
- String name = "dlserver-basic-write";
+ String name = "testChecksumFlag";
LocalRoutingService routingService = LocalRoutingService.newBuilder().build();
routingService.addHost(name, dlServer.getAddress());
DistributedLogClientBuilder dlClientBuilder = DistributedLogClientBuilder.newBuilder()
@@ -134,7 +134,7 @@
.connectionTimeout(Duration.fromSeconds(1))
.requestTimeout(Duration.fromSeconds(60)))
.checksum(false);
- DistributedLogClient dlClient = (DistributedLogClientImpl) dlClientBuilder.build();
+ DistributedLogClient dlClient = dlClientBuilder.build();
Await.result(dlClient.write(name, ByteBuffer.wrap(("1").getBytes())));
dlClient.close();
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java
index ed456b9..17fae4a 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java
@@ -600,6 +600,7 @@
ByteBuffer.wrap("test".getBytes()),
new NullStatsLogger(),
new NullStatsLogger(),
+ new IdentityStreamPartitionConverter(),
new ServerConfiguration(),
(byte)0,
checksum,
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/stream/TestStreamOp.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/stream/TestStreamOp.java
index 93d900f..41b4c69 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/stream/TestStreamOp.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/stream/TestStreamOp.java
@@ -24,7 +24,7 @@
import com.twitter.distributedlog.exceptions.InternalServerException;
import com.twitter.distributedlog.service.ResponseUtils;
import com.twitter.distributedlog.service.config.ServerConfiguration;
-import com.twitter.distributedlog.service.stream.WriteOp;
+import com.twitter.distributedlog.service.streamset.IdentityStreamPartitionConverter;
import com.twitter.distributedlog.thrift.service.StatusCode;
import com.twitter.distributedlog.thrift.service.WriteResponse;
import com.twitter.distributedlog.util.Sequencer;
@@ -67,6 +67,7 @@
ByteBuffer.wrap("test".getBytes()),
new NullStatsLogger(),
new NullStatsLogger(),
+ new IdentityStreamPartitionConverter(),
new ServerConfiguration(),
(byte)0,
null,