DL-85: Publish both per-stream stats and aggregation stats

merge twittter's changes from Jordan Bull.

Author: Jordan Bull <jbull@twitter.com>

Reviewers: Sijie Guo <sijie@apache.org>, Franck Cuny <fcuny@apache.org>

Closes #57 from sijie/merge/DL-85
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
index de475ea..751e972 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
@@ -132,6 +132,7 @@
     // Stats
     private final StatsLogger statsLogger;
     private final StatsLogger perStreamStatsLogger;
+    private final StreamPartitionConverter streamPartitionConverter;
     private final StreamOpStats streamOpStats;
     private final Counter bulkWritePendingStat;
     private final Counter writePendingStat;
@@ -158,6 +159,7 @@
         this.perStreamStatsLogger = perStreamStatsLogger;
         this.dlsnVersion = serverConf.getDlsnVersion();
         this.serverRegionId = serverConf.getRegionId();
+        this.streamPartitionConverter = converter;
         int serverPort = serverConf.getServerPort();
         int shard = serverConf.getServerShardId();
         int numThreads = serverConf.getServerThreads();
@@ -396,8 +398,8 @@
     public Future<BulkWriteResponse> writeBulkWithContext(final String stream, List<ByteBuffer> data, WriteContext ctx) {
         bulkWritePendingStat.inc();
         receivedRecordCounter.add(data.size());
-        BulkWriteOp op = new BulkWriteOp(stream, data, statsLogger, perStreamStatsLogger, getChecksum(ctx),
-            featureChecksumDisabled, accessControlManager);
+        BulkWriteOp op = new BulkWriteOp(stream, data, statsLogger, perStreamStatsLogger, streamPartitionConverter,
+            getChecksum(ctx), featureChecksumDisabled, accessControlManager);
         executeStreamOp(op);
         return op.result().ensure(new Function0<BoxedUnit>() {
             public BoxedUnit apply() {
@@ -675,8 +677,9 @@
                        ByteBuffer data,
                        Long checksum,
                        boolean isRecordSet) {
-        return new WriteOp(stream, data, statsLogger, perStreamStatsLogger, serverConfig, dlsnVersion,
-            checksum, isRecordSet, featureChecksumDisabled, accessControlManager);
+        return new WriteOp(stream, data, statsLogger, perStreamStatsLogger, streamPartitionConverter,
+            serverConfig, dlsnVersion, checksum, isRecordSet, featureChecksumDisabled,
+            accessControlManager);
     }
 
     @VisibleForTesting
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/BulkWriteOp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/BulkWriteOp.java
index 96a37cd..c009bb9 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/BulkWriteOp.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/BulkWriteOp.java
@@ -33,6 +33,8 @@
 import com.twitter.distributedlog.exceptions.OwnershipAcquireFailedException;
 import com.twitter.distributedlog.exceptions.RequestDeniedException;
 import com.twitter.distributedlog.service.ResponseUtils;
+import com.twitter.distributedlog.service.streamset.Partition;
+import com.twitter.distributedlog.service.streamset.StreamPartitionConverter;
 import com.twitter.distributedlog.thrift.service.BulkWriteResponse;
 import com.twitter.distributedlog.thrift.service.ResponseHeader;
 import com.twitter.distributedlog.thrift.service.StatusCode;
@@ -88,6 +90,7 @@
                        List<ByteBuffer> buffers,
                        StatsLogger statsLogger,
                        StatsLogger perStreamStatsLogger,
+                       StreamPartitionConverter streamPartitionConverter,
                        Long checksum,
                        Feature checksumDisabledFeature,
                        AccessControlManager accessControlManager) {
@@ -100,6 +103,7 @@
         }
         this.payloadSize = total;
 
+        final Partition partition = streamPartitionConverter.convert(stream);
         // Write record stats
         StreamOpStats streamOpStats = new StreamOpStats(statsLogger, perStreamStatsLogger);
         this.deniedBulkWriteCounter = streamOpStats.requestDeniedCounter("bulkWrite");
@@ -107,8 +111,8 @@
         this.failureRecordCounter = streamOpStats.recordsCounter("failure");
         this.redirectRecordCounter = streamOpStats.recordsCounter("redirect");
         this.bulkWriteBytes = streamOpStats.scopedRequestCounter("bulkWrite", "bytes");
-        this.latencyStat = streamOpStats.streamRequestLatencyStat(stream, "bulkWrite");
-        this.bytes = streamOpStats.streamRequestCounter(stream, "bulkWrite", "bytes");
+        this.latencyStat = streamOpStats.streamRequestLatencyStat(partition, "bulkWrite");
+        this.bytes = streamOpStats.streamRequestCounter(partition, "bulkWrite", "bytes");
 
         this.accessControlManager = accessControlManager;
 
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java
index 04f793f..45630fe 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java
@@ -194,12 +194,12 @@
         this.dynConf = streamConf;
         StatsLogger limiterStatsLogger = BroadCastStatsLogger.two(
             streamOpStats.baseScope("stream_limiter"),
-            streamOpStats.streamRequestScope(name, "limiter"));
+            streamOpStats.streamRequestScope(partition, "limiter"));
         this.limiter = new StreamRequestLimiter(name, dynConf, limiterStatsLogger, featureRateLimitDisabled);
         this.requestTimer = requestTimer;
 
         // Stats
-        this.streamLogger = streamOpStats.streamRequestStatsLogger(name);
+        this.streamLogger = streamOpStats.streamRequestStatsLogger(partition);
         this.limiterStatLogger = streamOpStats.baseScope("request_limiter");
         this.streamExceptionStatLogger = streamLogger.scope("exceptions");
         this.serviceTimeout = streamOpStats.baseCounter("serviceTimeout");
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamOpStats.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamOpStats.java
index adf89a3..bfbc88c 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamOpStats.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamOpStats.java
@@ -17,6 +17,8 @@
  */
 package com.twitter.distributedlog.service.stream;
 
+import com.twitter.distributedlog.stats.BroadCastStatsLogger;
+import com.twitter.distributedlog.service.streamset.Partition;
 import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
@@ -81,19 +83,22 @@
         return recordsStatsLogger.getCounter(counterName);
     }
 
-    public StatsLogger streamRequestStatsLogger(String streamName) {
-        return streamStatsLogger.scope(streamName);
+    public StatsLogger streamRequestStatsLogger(Partition partition) {
+        return BroadCastStatsLogger.masterslave(
+            streamStatsLogger.scope(partition.getStream()).scope("partition")
+                .scope(partition.getPaddedId()), streamStatsLogger.scope(partition.getStream())
+                .scope("aggregate"));
     }
 
-    public StatsLogger streamRequestScope(String streamName, String scopeName) {
-        return streamRequestStatsLogger(streamName).scope(scopeName);
+    public StatsLogger streamRequestScope(Partition partition, String scopeName) {
+        return streamRequestStatsLogger(partition).scope(scopeName);
     }
 
-    public OpStatsLogger streamRequestLatencyStat(String streamName, String opName) {
-        return streamRequestStatsLogger(streamName).getOpStatsLogger(opName);
+    public OpStatsLogger streamRequestLatencyStat(Partition partition, String opName) {
+        return streamRequestStatsLogger(partition).getOpStatsLogger(opName);
     }
 
-    public Counter streamRequestCounter(String streamName, String opName, String counterName) {
-        return streamRequestScope(streamName, opName).getCounter(counterName);
+    public Counter streamRequestCounter(Partition partition, String opName, String counterName) {
+        return streamRequestScope(partition, opName).getCounter(counterName);
     }
 }
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/WriteOp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/WriteOp.java
index c74f2cd..e9f2f4e 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/WriteOp.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/WriteOp.java
@@ -26,6 +26,8 @@
 import com.twitter.distributedlog.exceptions.DLException;
 import com.twitter.distributedlog.exceptions.RequestDeniedException;
 import com.twitter.distributedlog.service.ResponseUtils;
+import com.twitter.distributedlog.service.streamset.Partition;
+import com.twitter.distributedlog.service.streamset.StreamPartitionConverter;
 import com.twitter.distributedlog.thrift.service.WriteResponse;
 import com.twitter.distributedlog.thrift.service.ResponseHeader;
 import com.twitter.distributedlog.thrift.service.StatusCode;
@@ -67,6 +69,7 @@
                    ByteBuffer data,
                    StatsLogger statsLogger,
                    StatsLogger perStreamStatsLogger,
+                   StreamPartitionConverter streamPartitionConverter,
                    ServerConfiguration conf,
                    byte dlsnVersion,
                    Long checksum,
@@ -78,14 +81,15 @@
         data.get(payload);
         this.isRecordSet = isRecordSet;
 
+        final Partition partition = streamPartitionConverter.convert(stream);
         StreamOpStats streamOpStats = new StreamOpStats(statsLogger, perStreamStatsLogger);
         this.successRecordCounter = streamOpStats.recordsCounter("success");
         this.failureRecordCounter = streamOpStats.recordsCounter("failure");
         this.redirectRecordCounter = streamOpStats.recordsCounter("redirect");
         this.deniedWriteCounter = streamOpStats.requestDeniedCounter("write");
         this.writeBytes = streamOpStats.scopedRequestCounter("write", "bytes");
-        this.latencyStat = streamOpStats.streamRequestLatencyStat(stream, "write");
-        this.bytes = streamOpStats.streamRequestCounter(stream, "write", "bytes");
+        this.latencyStat = streamOpStats.streamRequestLatencyStat(partition, "write");
+        this.bytes = streamOpStats.streamRequestCounter(partition, "write", "bytes");
 
         this.dlsnVersion = dlsnVersion;
         this.accessControlManager = accessControlManager;
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/Partition.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/Partition.java
index f207eee..d199f88 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/Partition.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/Partition.java
@@ -57,6 +57,14 @@
         return id;
     }
 
+    /**
+     * Get the 6 digit 0 padded id of this partition as a String.
+     * @return partition id
+     */
+    public String getPaddedId() {
+        return String.format("%06d", getId());
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServer.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServer.java
index 94e8755..63723ef 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServer.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServer.java
@@ -106,9 +106,9 @@
         int numRead = 0;
         LogRecord r = reader.readNext(false);
         while (null != r) {
-            int i = Integer.parseInt(new String(r.getPayload()));
-            assertEquals(numRead + 1, i);
             ++numRead;
+            int i = Integer.parseInt(new String(r.getPayload()));
+            assertEquals(numRead, i);
             r = reader.readNext(false);
         }
         assertEquals(10, numRead);
@@ -121,7 +121,7 @@
      */
     @Test(timeout = 60000)
     public void testChecksumFlag() throws Exception {
-        String name = "dlserver-basic-write";
+        String name = "testChecksumFlag";
         LocalRoutingService routingService = LocalRoutingService.newBuilder().build();
         routingService.addHost(name, dlServer.getAddress());
         DistributedLogClientBuilder dlClientBuilder = DistributedLogClientBuilder.newBuilder()
@@ -134,7 +134,7 @@
                 .connectionTimeout(Duration.fromSeconds(1))
                 .requestTimeout(Duration.fromSeconds(60)))
             .checksum(false);
-        DistributedLogClient dlClient = (DistributedLogClientImpl) dlClientBuilder.build();
+        DistributedLogClient dlClient = dlClientBuilder.build();
         Await.result(dlClient.write(name, ByteBuffer.wrap(("1").getBytes())));
         dlClient.close();
 
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java
index ed456b9..17fae4a 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java
@@ -600,6 +600,7 @@
             ByteBuffer.wrap("test".getBytes()),
             new NullStatsLogger(),
             new NullStatsLogger(),
+            new IdentityStreamPartitionConverter(),
             new ServerConfiguration(),
             (byte)0,
             checksum,
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/stream/TestStreamOp.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/stream/TestStreamOp.java
index 93d900f..41b4c69 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/stream/TestStreamOp.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/stream/TestStreamOp.java
@@ -24,7 +24,7 @@
 import com.twitter.distributedlog.exceptions.InternalServerException;
 import com.twitter.distributedlog.service.ResponseUtils;
 import com.twitter.distributedlog.service.config.ServerConfiguration;
-import com.twitter.distributedlog.service.stream.WriteOp;
+import com.twitter.distributedlog.service.streamset.IdentityStreamPartitionConverter;
 import com.twitter.distributedlog.thrift.service.StatusCode;
 import com.twitter.distributedlog.thrift.service.WriteResponse;
 import com.twitter.distributedlog.util.Sequencer;
@@ -67,6 +67,7 @@
             ByteBuffer.wrap("test".getBytes()),
             new NullStatsLogger(),
             new NullStatsLogger(),
+            new IdentityStreamPartitionConverter(),
             new ServerConfiguration(),
             (byte)0,
             null,