CASSANDRASC-68: Publish bytes streamed and written metrics

Patch by Saranya Krishnakumar; Reviewed by Dinesh Joshi, Yifan Cai for CASSANDRASC-68
diff --git a/.circleci/config.yml b/.circleci/config.yml
index 6611efb..013a356 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -66,8 +66,8 @@
     steps:
      - checkout
 
-     - run: BRANCHES="cassandra-4.0 cassandra-4.1" scripts/build-dtest-jars.sh
-     - run: echo 'export DTEST_JAR=$(cd dtest-jars && find . -name 'dtest-4.1*.jar' -maxdepth 1)' >> "$BASH_ENV"
+     - run: CASSANDRA_SHAS="d13b3ef61b9afbd04878c988c7b722507674228c 8666265521c97a5e726c9d38762028a14325e4dc" BRANCHES="cassandra-4.0 cassandra-4.1" scripts/build-dtest-jars.sh
+     - run: echo 'export DTEST_JAR=$(find dtest-jars -name 'dtest-4.1*.jar' -type f -maxdepth 1 -exec basename {} \;)' >> "$BASH_ENV"
      - run: ./gradlew --info check --stacktrace -Dcassandra.sidecar.versions_to_test="4.0,4.1"
 
      - store_artifacts:
@@ -85,7 +85,8 @@
     steps:
       - checkout
 
-      - run: CASSANDRA_USE_JDK11=true scripts/build-dtest-jars.sh
+      - run: CASSANDRA_SHAS="d13b3ef61b9afbd04878c988c7b722507674228c cbaef9094e83364e6812c65b8411ff7dbffaf9c6" BRANCHES="cassandra-4.0 trunk" CASSANDRA_USE_JDK11=true scripts/build-dtest-jars.sh
+      - run: echo 'export DTEST_JAR=$(find dtest-jars -name 'dtest-5.*.jar' -type f -maxdepth 1 -exec basename {} \;)' >> "$BASH_ENV"
       - run: ./gradlew --info check --stacktrace
 
       - store_artifacts:
@@ -159,3 +160,4 @@
           requires:
             - java8
             - java11
+
diff --git a/CHANGES.txt b/CHANGES.txt
index be2fd52..5453a0e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 1.0.0
 -----
+ * Publish bytes streamed and written metrics (CASSANDRASC-68)
  * Extract the in-jvm dtest template for use in other projects (CASSANDRASC-55)
  * Fix relocation of native libraries for vertx-client-shaded (CASSANDRASC-67)
  * Enrich RetriesExhaustedException to have more information for better visibility (CASSANDRASC-65)
diff --git a/build.gradle b/build.gradle
index a76ff66..6a4fe1d 100644
--- a/build.gradle
+++ b/build.gradle
@@ -53,7 +53,7 @@
 }
 
 
-ext.dtestJar = System.getenv("DTEST_JAR") ?: "dtest-5.0.jar" // trunk is currently 5.0.jar - update when trunk moves
+ext.dtestJar = System.getenv("DTEST_JAR") ?: "dtest-5.1.jar" // trunk is currently 5.0.jar - update when trunk moves
 println("Using DTest jar: ${ext.dtestJar}")
 
 // Force checkstyle, rat, and spotBugs to run before test tasks for faster feedback
diff --git a/scripts/build-dtest-jars.sh b/scripts/build-dtest-jars.sh
index 9f4d79c..07742ac 100755
--- a/scripts/build-dtest-jars.sh
+++ b/scripts/build-dtest-jars.sh
@@ -18,7 +18,8 @@
 #
 
 set -xe
-BRANCHES=${BRANCHES:-cassandra-4.0 trunk}
+BRANCHES=( ${BRANCHES:-cassandra-4.0 trunk} )
+CASSANDRA_SHAS_ARRAY=( ${CASSANDRA_SHAS} )
 REPO=${REPO:-"https://github.com/apache/cassandra.git"}
 SCRIPT_DIR=$( dirname -- "$( readlink -f -- "$0"; )"; )
 DTEST_JAR_DIR="$(dirname "${SCRIPT_DIR}/")/dtest-jars"
@@ -31,17 +32,33 @@
 REPO_HOST=$(get_hostname "${REPO}")
 ssh-keyscan "${REPO_HOST}" >> ~/.ssh/known_hosts || true
 
-for branch in $BRANCHES; do
+for index in "${!BRANCHES[@]}"; do
   cd "${BUILD_DIR}"
+  branch=${BRANCHES[$index]}
+  sha=${CASSANDRA_SHAS_ARRAY[$index]}
+  echo "index ${index} branch ${branch} sha ${sha}"
   # check out the correct cassandra version:
   if [ ! -d "${branch}" ] ; then
-    git clone --depth 1 --single-branch --branch "${branch}" "${REPO}" "${branch}"
-    cd "${branch}"
+    if [ -n "${sha}" ] ; then
+      mkdir -p "${branch}"
+      cd "${branch}"
+      git init
+      git remote add upstream "${REPO}"
+      git fetch --depth=1 upstream "${sha}"
+      git reset --hard FETCH_HEAD
+    else
+      git clone --depth 1 --single-branch --branch "${branch}" "${REPO}" "${branch}"
+      cd "${branch}"
+    fi
   else
     cd "${branch}"
-    git pull
+    if [ -z "${sha}" ] ; then
+      git pull
+    fi
   fi
-  git checkout "${branch}"
+  if [ -z "${sha}" ] ; then
+    git checkout "${branch}"
+  fi
   git clean -fd
   CASSANDRA_VERSION=$(cat build.xml | grep 'property name="base.version"' | awk -F "\"" '{print $4}')
   # Loop to prevent failure due to maven-ant-tasks not downloading a jar.
diff --git a/src/main/java/org/apache/cassandra/sidecar/MainModule.java b/src/main/java/org/apache/cassandra/sidecar/MainModule.java
index 97b6554..66b2c81 100644
--- a/src/main/java/org/apache/cassandra/sidecar/MainModule.java
+++ b/src/main/java/org/apache/cassandra/sidecar/MainModule.java
@@ -64,6 +64,7 @@
 import org.apache.cassandra.sidecar.routes.sstableuploads.SSTableCleanupHandler;
 import org.apache.cassandra.sidecar.routes.sstableuploads.SSTableImportHandler;
 import org.apache.cassandra.sidecar.routes.sstableuploads.SSTableUploadHandler;
+import org.apache.cassandra.sidecar.stats.SidecarStats;
 import org.apache.cassandra.sidecar.utils.ChecksumVerifier;
 import org.apache.cassandra.sidecar.utils.MD5ChecksumVerifier;
 import org.apache.cassandra.sidecar.utils.TimeProvider;
@@ -304,4 +305,11 @@
     {
         return new SidecarVersionProvider("/sidecar.version");
     }
+
+    @Provides
+    @Singleton
+    public SidecarStats sidecarStats()
+    {
+        return SidecarStats.INSTANCE;
+    }
 }
diff --git a/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandler.java b/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandler.java
index 5497b4d..4a55101 100644
--- a/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandler.java
+++ b/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandler.java
@@ -39,6 +39,8 @@
 import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
 import org.apache.cassandra.sidecar.data.SSTableUploadRequest;
 import org.apache.cassandra.sidecar.routes.AbstractHandler;
+import org.apache.cassandra.sidecar.stats.SSTableStats;
+import org.apache.cassandra.sidecar.stats.SidecarStats;
 import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
 import org.apache.cassandra.sidecar.utils.SSTableUploader;
 import org.apache.cassandra.sidecar.utils.SSTableUploadsPathBuilder;
@@ -57,6 +59,7 @@
     private final SSTableUploader uploader;
     private final SSTableUploadsPathBuilder uploadPathBuilder;
     private final ConcurrencyLimiter limiter;
+    private final SSTableStats stats;
 
     /**
      * Constructs a handler with the provided params.
@@ -68,6 +71,7 @@
      * @param uploadPathBuilder a class that provides SSTableUploads directories
      * @param executorPools     executor pools for blocking executions
      * @param validator         a validator instance to validate Cassandra-specific input
+     * @param sidecarStats      an interface holding all stats related to main sidecar process
      */
     @Inject
     protected SSTableUploadHandler(Vertx vertx,
@@ -76,7 +80,8 @@
                                    SSTableUploader uploader,
                                    SSTableUploadsPathBuilder uploadPathBuilder,
                                    ExecutorPools executorPools,
-                                   CassandraInputValidator validator)
+                                   CassandraInputValidator validator,
+                                   SidecarStats sidecarStats)
     {
         super(metadataFetcher, executorPools, validator);
         this.fs = vertx.fileSystem();
@@ -84,6 +89,7 @@
         this.uploader = uploader;
         this.uploadPathBuilder = uploadPathBuilder;
         this.limiter = new ConcurrencyLimiter(configuration::getConcurrentUploadsLimit);
+        this.stats = sidecarStats.ssTableStats();
     }
 
     /**
@@ -124,6 +130,7 @@
             logger.info("Successfully uploaded SSTable component for request={}, remoteAddress={}, " +
                         "instance={}, sizeInBytes={}, serviceTimeMillis={}",
                         request, remoteAddress, host, fileProps.size(), serviceTimeMillis);
+            stats.onBytesUploaded(fileProps.size());
             context.json(new SSTableUploadResponse(request.uploadId(), fileProps.size(), serviceTimeMillis));
         })
         .onFailure(cause -> processFailure(cause, context, host, remoteAddress, request));
diff --git a/src/main/java/org/apache/cassandra/sidecar/stats/SSTableStats.java b/src/main/java/org/apache/cassandra/sidecar/stats/SSTableStats.java
new file mode 100644
index 0000000..55921e8
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/sidecar/stats/SSTableStats.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.stats;
+
+/**
+ * Interface to hold statistics specific to SSTable related operations
+ */
+public interface SSTableStats
+{
+    SSTableStats INSTANCE = new SSTableStats()
+    {
+    };
+
+    default void onBytesUploaded(long byteCount)
+    {
+        // no op
+    }
+
+    default void onBytesStreamed(long byteCount)
+    {
+        // no op
+    }
+}
diff --git a/src/main/java/org/apache/cassandra/sidecar/stats/SidecarStats.java b/src/main/java/org/apache/cassandra/sidecar/stats/SidecarStats.java
new file mode 100644
index 0000000..a387c79
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/sidecar/stats/SidecarStats.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.stats;
+
+/**
+ * Interface to collect statistics related to main sidecar process
+ */
+public interface SidecarStats
+{
+    SidecarStats INSTANCE = new SidecarStats()
+    {
+    };
+
+    default SSTableStats ssTableStats()
+    {
+        return SSTableStats.INSTANCE;
+    }
+}
diff --git a/src/main/java/org/apache/cassandra/sidecar/utils/FileStreamer.java b/src/main/java/org/apache/cassandra/sidecar/utils/FileStreamer.java
index 8a66eec..369d238 100644
--- a/src/main/java/org/apache/cassandra/sidecar/utils/FileStreamer.java
+++ b/src/main/java/org/apache/cassandra/sidecar/utils/FileStreamer.java
@@ -21,6 +21,7 @@
 import java.time.Duration;
 import java.time.Instant;
 import java.util.concurrent.TimeUnit;
+import static java.util.concurrent.TimeUnit.MICROSECONDS;
 
 import com.google.common.util.concurrent.SidecarRateLimiter;
 import org.slf4j.Logger;
@@ -36,8 +37,9 @@
 import org.apache.cassandra.sidecar.common.utils.HttpRange;
 import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
 import org.apache.cassandra.sidecar.models.HttpResponse;
+import org.apache.cassandra.sidecar.stats.SSTableStats;
+import org.apache.cassandra.sidecar.stats.SidecarStats;
 
-import static java.util.concurrent.TimeUnit.MICROSECONDS;
 import static io.netty.handler.codec.http.HttpResponseStatus.REQUESTED_RANGE_NOT_SATISFIABLE;
 import static io.netty.handler.codec.http.HttpResponseStatus.TOO_MANY_REQUESTS;
 
@@ -53,13 +55,16 @@
     private final ExecutorPools executorPools;
     private final Configuration config;
     private final SidecarRateLimiter rateLimiter;
+    private final SSTableStats stats;
 
     @Inject
-    public FileStreamer(ExecutorPools executorPools, Configuration config, SidecarRateLimiter rateLimiter)
+    public FileStreamer(ExecutorPools executorPools, Configuration config, SidecarRateLimiter rateLimiter,
+                        SidecarStats stats)
     {
         this.executorPools = executorPools;
         this.config = config;
         this.rateLimiter = rateLimiter;
+        this.stats = stats.ssTableStats();
     }
 
     /**
@@ -123,6 +128,7 @@
                                {
                                    LOGGER.debug("Streamed file {} successfully to client {}. Instance: {}", filename,
                                                 response.remoteAddress(), response.host());
+                                   stats.onBytesStreamed(fileLength);
                                    promise.complete();
                                })
                     .onFailure(promise::fail);
diff --git a/src/test/integration/org/apache/cassandra/testing/TestVersionSupplier.java b/src/test/integration/org/apache/cassandra/testing/TestVersionSupplier.java
index 46c7958..1f69472 100644
--- a/src/test/integration/org/apache/cassandra/testing/TestVersionSupplier.java
+++ b/src/test/integration/org/apache/cassandra/testing/TestVersionSupplier.java
@@ -35,7 +35,7 @@
     Stream<TestVersion> testVersions()
     {
         // By default, we test 2 versions that will exercise oldest and newest supported versions
-        String versions = System.getProperty("cassandra.sidecar.versions_to_test", "4.0,5.0");
+        String versions = System.getProperty("cassandra.sidecar.versions_to_test", "4.0,5.1");
         return Arrays.stream(versions.split(",")).map(String::trim).map(TestVersion::new);
     }
 }