Add tests to cover ClientRequest metrics

patch by Adam Holmberg; reviewed by Andrés de la Peña for CASSANDRA-16183
diff --git a/client_request_metrics_test.py b/client_request_metrics_test.py
new file mode 100644
index 0000000..6c5ef4d
--- /dev/null
+++ b/client_request_metrics_test.py
@@ -0,0 +1,787 @@
+from abc import ABC, abstractmethod
+from collections import defaultdict
+from functools import partial
+from itertools import repeat
+import pytest
+import time
+
+from dtest import Tester, create_ks
+from tools.jmxutils import (JolokiaAgent, make_mbean)
+
+from cassandra import (ReadFailure, ReadTimeout, Unavailable,
+                       WriteFailure, WriteTimeout,
+                       ConsistencyLevel as CL)
+from cassandra.concurrent import execute_concurrent_with_args
+from cassandra.marshal import int32_pack
+from cassandra.policies import FallthroughRetryPolicy
+from cassandra.query import SimpleStatement
+
+since = pytest.mark.since
+
+jmx = None
+
+KEYSPACE = 'ks'
+FAIL_WRITE_KEYSPACE = 'fail_keyspace'
+VIEW_KEYSPACE = 'view_keyspace'
+TABLE = 't'
+VIEW = 'mv'
+TOMBSTONE_FAILURE_THRESHOLD = 20
+TOMBSTONE_FAIL_KEY = 10000001
+JVM_ARGS = [f"-Dcassandra.test.fail_writes_ks={FAIL_WRITE_KEYSPACE}"]
+
+
+class NoException(BaseException):
+    pass
+
+
+@since('4.0')
+class TestClientRequestMetrics(Tester):
+
+    @pytest.fixture(autouse=True)
+    def fixture_add_additional_log_patterns(self, fixture_dtest_setup):
+        fixture_dtest_setup.ignore_log_patterns = (
+            'Testing write failures',  # The error to simulate a write failure
+            'ERROR WRITE_FAILURE',  # Logged in DEBUG mode for write failures
+            f"Scanned over {TOMBSTONE_FAILURE_THRESHOLD + 1} tombstones during query"  # Caused by the read failure tests
+        )
+
+    def setup_once(self):
+        cluster = self.cluster
+        cluster.set_configuration_options({'read_request_timeout_in_ms': 3000,
+                                           'write_request_timeout_in_ms': 3000,
+                                           'phi_convict_threshold': 12,
+                                           'tombstone_failure_threshold': TOMBSTONE_FAILURE_THRESHOLD,
+                                           'enable_materialized_views': 'true'})
+        cluster.populate(2, debug=True)
+        cluster.start(jvm_args=JVM_ARGS)
+        node1 = cluster.nodelist()[0]
+
+        global jmx
+        jmx = JolokiaAgent(node1)
+        jmx.start()
+
+        s = self.session = self.patient_exclusive_cql_connection(node1, retry_policy=FallthroughRetryPolicy(), request_timeout=30)
+        for k in [KEYSPACE, FAIL_WRITE_KEYSPACE]:
+            create_ks(s, k, 2)
+            s.execute(f"CREATE TABLE {k}.{TABLE} (k int, c int, v int, PRIMARY KEY (k,c))")
+
+        create_ks(s, VIEW_KEYSPACE, 1)
+        s.execute(f"CREATE TABLE {VIEW_KEYSPACE}.{TABLE} (k int, c int, v int, PRIMARY KEY (k,c))")
+        s.execute(f"CREATE MATERIALIZED VIEW {VIEW_KEYSPACE}.{VIEW} AS SELECT * FROM {TABLE} WHERE c IS NOT NULL AND k IS NOT NULL PRIMARY KEY (c,k);")
+
+        # Here we're doing a series of deletions in order to create enough tombstones to exceed the configured fail threshold.
+        # This partition will be used to test read failures.
+        for c in range(TOMBSTONE_FAILURE_THRESHOLD + 1):
+            self.session.execute(f"DELETE FROM {KEYSPACE}.{TABLE} WHERE k={TOMBSTONE_FAIL_KEY} AND c={c}")
+
+        node1.watch_log_for("Created default superuser role 'cassandra'")  # don't race with async default role creation, which creates a write
+        node1.watch_log_for('Completed submission of build tasks for any materialized views defined at startup', filename='debug.log')  # view builds cause background reads
+
+    def test_client_request_metrics(self):
+        # this is written as a single test method in order to reuse the same cluster for all tests
+        # setup_once configures and starts the cluster with all schema and preconditions required by all tests.
+        self.setup_once()
+
+        self.write_nominal()
+        self.read_nominal()
+
+        self.write_failures()
+        self.write_unavailables()
+        self.write_timeouts()
+
+        self.read_failures()
+        self.read_unavailables()
+        self.read_timeouts()
+
+        self.range_slice_failures()
+        self.range_slice_unavailables()
+        self.range_slice_timeouts()
+
+        self.view_writes()
+
+        self.cas_read()
+        self.cas_read_contention()
+        self.cas_read_failures()
+        self.cas_read_unavailables()
+        self.cas_read_timeouts()
+
+        self.cas_write()
+        self.cas_write_contention()
+        self.cas_write_unavailables()
+        self.cas_write_timeouts()
+        self.cas_write_condition_not_met()
+
+    def write_nominal(self):
+        query_count = 5
+        global_diff, cl_diff = self.validate_nominal('Write',
+                                                     WriteMetrics,
+                                                     SimpleStatement(
+                                                         f"INSERT INTO {KEYSPACE}.{TABLE} (k,c) VALUES (0,0)",
+                                                         consistency_level=CL.ONE),
+                                                     query_count)
+
+        assert global_diff['MutationSizeHistogram.Count'] == query_count
+
+    def read_nominal(self):
+        query_count = 5
+        self.validate_nominal('Read',
+                              ClientRequestMetrics,
+                              SimpleStatement(f"SELECT k FROM {KEYSPACE}.{TABLE} WHERE k=0 AND c=0",
+                                              consistency_level=CL.LOCAL_ONE),
+                              query_count)
+
+    def validate_nominal(self, global_scope, metric_class, statement, query_count):
+        query_cl = statement.consistency_level
+        cl_scope = scope_for_cl(global_scope, query_cl)
+        # Validate other CLs metrics are not changing. We're not messing with Quorum because of the async write coming from
+        # role creation at startup.
+        # If the test takes long enough, it's possible to see some of the time-based metrics shift.
+        other_cls = [c for c in CL.value_to_name if c not in (query_cl, CL.QUORUM)]
+        global_baseline = metric_class(global_scope)
+        cl_baseline = metric_class(cl_scope)
+        other_baselines = [metric_class(scope_for_cl(global_scope, c)) for c in other_cls]
+
+        global_baseline.validate()
+        cl_baseline.validate()
+        for b in other_baselines:
+            b.validate()
+
+        for _ in range(query_count):
+            self.session.execute(statement)
+
+        global_updated = metric_class(global_scope)
+        cl_updated = metric_class(cl_scope)
+        global_updated.validate()
+        cl_updated.validate()
+
+        global_diff = global_updated.diff(global_baseline)
+        cl_diff = cl_updated.diff(cl_baseline)
+        for diff in [global_diff, cl_diff]:
+            assert diff['TotalLatency.Count'] > 0, diff.scope
+            assert diff['Latency.Count'] == query_count, diff.scope
+
+        other_updated = [metric_class(scope_for_cl(global_scope, c)) for c in other_cls]
+        for updated, baseline in zip(other_updated, other_baselines):
+            assert 'Latency.Count' not in updated.diff(baseline), updated.scope
+
+        return global_diff, cl_diff
+
+    def write_failures(self):
+        query_count = 2
+        diff = self.write_failures_variant('Write', 'WHERE k=0 AND c=0',
+                                           query_count,
+                                           self.validate_exception_metric_with_cl,
+                                           WriteMetrics)
+        assert diff['MutationSizeHistogram.Count'] == query_count
+
+    def write_failures_variant(self, scope, constraint, query_count, validator, metric_class):
+        query_cl = CL.ONE
+        diff = validator(scope,
+                         metric_class,
+                         SimpleStatement(
+                             f"UPDATE {FAIL_WRITE_KEYSPACE}.{TABLE} SET v=0 {constraint}",
+                             consistency_level=query_cl),
+                         query_count,
+                         'Failures',
+                         WriteFailure
+                         )
+        return diff
+
+    def write_unavailables(self):
+        # THREE will be unavailable since RF=2
+        query_cl = CL.THREE
+        query_count = 5
+        diff = self.validate_exception_metric_with_cl('Write',
+                                                      WriteMetrics,
+                                                      SimpleStatement(
+                                                          f"UPDATE {KEYSPACE}.{TABLE} SET v=0 WHERE k=0 AND c=0",
+                                                          consistency_level=query_cl),
+                                                      query_count,
+                                                      'Unavailables',
+                                                      Unavailable
+                                                      )
+        assert diff['MutationSizeHistogram.Count'] == query_count
+
+    def write_timeouts(self):
+        query_count = 2
+        diff = self.write_timeouts_variant('Write', 'WHERE k=0 AND c=0',
+                                           query_count,
+                                           self.validate_exception_metric_with_cl,
+                                           WriteMetrics)
+        assert diff['MutationSizeHistogram.Count'] == query_count  # only done in this variant because CAS times out the request before mutation size is known
+
+    def write_timeouts_variant(self, scope, constraint, query_count, validator, metric_class):
+        query_cl = CL.TWO
+        node2 = self.cluster.nodelist()[1]
+        node2.pause()
+        diff = validator(scope,
+                         metric_class,
+                         SimpleStatement(
+                             f"UPDATE {KEYSPACE}.{TABLE} SET v=0 {constraint}",
+                             consistency_level=query_cl),
+                         query_count,
+                         'Timeouts',
+                         WriteTimeout
+                         )
+        node2.resume()
+        return diff
+
+    def read_failures(self):
+        self.read_failures_variant('Read', f"WHERE k={TOMBSTONE_FAIL_KEY}",
+                                   CL.TWO,
+                                   self.validate_exception_metric_with_cl,
+                                   ClientRequestMetrics)
+
+    def read_unavailables(self):
+        self.read_unavailables_variant('Read', 'WHERE k=0 AND c=0',
+                                       self.validate_exception_metric_with_cl)
+
+    def read_unavailables_variant(self, scope, constraint, validator):
+        query_cl = CL.THREE
+        validator(scope,
+                  ClientRequestMetrics,
+                  SimpleStatement(f"SELECT k FROM {KEYSPACE}.{TABLE} {constraint}",
+                                  consistency_level=query_cl),
+                  5,
+                  'Unavailables',
+                  Unavailable
+                  )
+
+    def read_timeouts(self):
+        self.read_timeouts_variant('Read', 'WHERE k=0',
+                                   CL.TWO,
+                                   self.validate_exception_metric_with_cl,
+                                   ClientRequestMetrics)
+
+    def read_timeouts_variant(self, scope, constraint, query_cl, validator, metric_class):
+        node2 = self.cluster.nodelist()[1]
+        node2.pause()
+        validator(scope,
+                  metric_class,
+                  SimpleStatement(f"SELECT k FROM {KEYSPACE}.{TABLE} {constraint}",
+                                  consistency_level=query_cl),
+                  1,
+                  'Timeouts',
+                  ReadTimeout
+                  )
+        node2.resume()
+
+    def range_slice_failures(self):
+        self.read_failures_variant('RangeSlice', '',
+                                   CL.ONE,
+                                   self.validate_metric,
+                                   ClientRequestMetrics)
+
+    def range_slice_unavailables(self):
+        self.read_unavailables_variant('RangeSlice', '',
+                                       self.validate_metric)
+
+    def range_slice_timeouts(self):
+        self.read_timeouts_variant('RangeSlice', f" WHERE TOKEN(k) < TOKEN({TOMBSTONE_FAIL_KEY})",
+                                   CL.TWO,
+                                   self.validate_metric,
+                                   ClientRequestMetrics)
+
+    def view_writes(self):
+        # we need to know where the base table and MV replicas are going to have predictable metrics
+        def ip_of_key(ks, key):
+            hosts = self.session.cluster.metadata.get_replicas(ks, int32_pack(key))
+            host = hosts[0]
+            return host.address
+
+        def key_for(ks, ip):
+            return next(x for x in range(100000) if ip_of_key(ks, x) == ip)
+
+        # base partition and mv partition on this node
+        ks = VIEW_KEYSPACE
+        query_count = 5
+        key = key_for(ks, '127.0.0.1')
+        diff = self.run_collect_view_write_metrics(SimpleStatement(f"INSERT INTO {ks}.{TABLE} (k,c) VALUES ({key},{key})", consistency_level=CL.ONE),
+                                                   query_count)
+        assert diff['Latency.Count'] == query_count
+        assert diff['TotalLatency.Count'] > 0
+
+        # base partition this node, mv partition remote
+        key = key_for(ks, '127.0.0.1')
+        key2 = key_for(ks, '127.0.0.2')
+        diff = self.run_collect_view_write_metrics(SimpleStatement(f"INSERT INTO {ks}.{TABLE} (k,c) VALUES ({key},{key2})", consistency_level=CL.ONE),
+                                                   query_count)
+        assert diff['Latency.Count'] == query_count
+        assert diff['TotalLatency.Count'] > 0
+        assert diff['ViewReplicasAttempted.Count'] == query_count
+        assert diff['ViewReplicasSuccess.Count'] == query_count
+        assert diff['ViewWriteLatency.Count'] == query_count
+
+        # base partition and mv both remote
+        key = key_for(ks, '127.0.0.2')
+        key2 = key_for(ks, '127.0.0.2')
+        diff = self.run_collect_view_write_metrics(SimpleStatement(f"INSERT INTO {ks}.{TABLE} (k,c) VALUES ({key},{key2})", consistency_level=CL.ONE),
+                                                   query_count)
+        assert 'Latency.Count' not in diff
+
+    def run_collect_view_write_metrics(self, statement, query_count):
+        scope = 'ViewWrite'
+        baseline = ViewWriteMetrics(scope)
+        baseline.validate()
+        for _ in range(query_count):
+            self.session.execute(statement)
+
+        # These metrics are not updated synchronously with the request, so we have to use the deterministic 'Count'
+        # to watch for them to settle.
+        sample = ViewWriteMetrics(scope)
+        diff = sample.diff(baseline)
+        while diff and 'Latency.Count' in diff:
+            time.sleep(0.5)
+            last = sample
+            sample = ViewWriteMetrics(scope)
+            diff = sample.diff(last)
+
+        sample.validate()
+
+        return sample.diff(baseline)
+
+    def cas_read(self):
+        self.validate_metric('CASRead',
+                             CASClientRequestMetrics,
+                             SimpleStatement(f"SELECT k FROM {KEYSPACE}.{TABLE} WHERE k=0",
+                                             consistency_level=CL.SERIAL),
+                             2)
+
+    def cas_read_contention(self):
+        self.cas_contention(partial(CASClientRequestMetrics, 'CASRead'),
+                            SimpleStatement(f"SELECT k FROM {KEYSPACE}.{TABLE} WHERE k=0",
+                                            consistency_level=CL.SERIAL))
+
+    def cas_contention(self, metric_factory, statement):
+
+        query_count = 20
+
+        def sample():
+            baseline = metric_factory()
+            baseline.validate()
+
+            execute_concurrent_with_args(self.session,
+                                         statement,
+                                         repeat([], query_count), raise_on_first_error=False)
+
+            updated = metric_factory()
+            updated.validate()
+
+            return updated.diff(baseline)
+
+        for _ in range(10):
+            diff = sample()
+            if 'ContentionHistogram.Count' in diff:
+                break
+
+        assert diff['Latency.Count'] == query_count
+        assert diff['TotalLatency.Count'] > 0
+        assert 0 < diff['ContentionHistogram.Count'] <= query_count
+
+    def cas_read_failures(self):
+        self.read_failures_variant('CASRead', f"WHERE k={TOMBSTONE_FAIL_KEY}",
+                                   CL.SERIAL,
+                                   self.validate_metric,
+                                   CASClientRequestMetrics)
+
+    def read_failures_variant(self, scope, constraint, query_cl, validator, metric_class):
+        validator(scope,
+                  metric_class,
+                  SimpleStatement(f"SELECT k FROM {KEYSPACE}.{TABLE} {constraint}",
+                                  consistency_level=query_cl),
+                  5,
+                  'Failures',
+                  ReadFailure
+                  )
+
+    def cas_read_unavailables(self):
+        ks = KEYSPACE
+        self.cas_unavailables_variant('CASRead',
+                                      CASClientRequestMetrics,
+                                      SimpleStatement(f"SELECT k FROM {ks}.{TABLE} WHERE k=0 AND c=0",
+                                                      consistency_level=CL.SERIAL)
+                                      )
+
+    def cas_unavailables_variant(self, scope, metric_class, statement):
+        # can't use the other variant because we actually need to set a sane CL and stop a node for unavailable.
+        query_count = 5
+        node2 = self.cluster.nodelist()[1]
+        node2.stop()
+        self.validate_metric(scope,
+                             metric_class,
+                             statement,
+                             query_count,
+                             'Unavailables',
+                             Unavailable
+                             )
+        node2.start(jvm_args=JVM_ARGS)
+
+    def cas_read_timeouts(self):
+        self.read_timeouts_variant('CASRead', 'WHERE k=0',
+                                   CL.SERIAL,
+                                   self.validate_metric,
+                                   CASClientRequestMetrics)
+
+    def cas_write(self):
+        self.validate_metric('CASWrite',
+                             CASClientWriteRequestMetrics,
+                             SimpleStatement(f"INSERT INTO {KEYSPACE}.{TABLE} (k,c) VALUES (0,0) IF NOT EXISTS",
+                                             consistency_level=CL.ONE),
+                             2)
+
+    def cas_write_contention(self):
+        self.cas_contention(partial(CASClientWriteRequestMetrics, 'CASWrite'),
+                            SimpleStatement(
+                                f"INSERT INTO {KEYSPACE}.{TABLE} (k,c) VALUES ({new_key()},0) IF NOT EXISTS",
+                                consistency_level=CL.TWO))
+
+    def cas_write_failures(self):
+        query_count = 2
+        diff = self.write_failures_variant('CASWrite', f"WHERE k={new_key()} AND c=0 IF v!=0",
+                                           query_count,
+                                           self.validate_metric,
+                                           CASClientWriteRequestMetrics)
+        # The way we're failing writes causes a StorageProxy::cas to throw before the metric is incremented on each
+        # request after the first one.  We find the previous ballot in-progress and fail trying to commit it.
+        assert diff['MutationSizeHistogram.Count'] == 1
+        assert diff['UnfinishedCommit.Count'] == query_count - 1
+
+    def cas_write_unavailables(self):
+        ks = KEYSPACE
+        self.cas_unavailables_variant('CASWrite',
+                                      CASClientWriteRequestMetrics,
+                                      SimpleStatement(f"UPDATE {ks}.{TABLE} SET v=2 WHERE k=0 AND c=0 IF v!=0",
+                                                      consistency_level=CL.TWO)
+                                      )
+
+    def cas_write_timeouts(self):
+        self.write_timeouts_variant('CASWrite', 'WHERE k=0 AND c=0 IF v!=0',
+                                    2,
+                                    self.validate_metric,
+                                    CASClientWriteRequestMetrics)
+
+    def cas_write_condition_not_met(self):
+        scope = 'CASWrite'
+        baseline = CASClientWriteRequestMetrics(scope)
+        key = new_key()
+        query_count = 5
+        for _ in range(query_count):
+            self.session.execute(f"UPDATE {KEYSPACE}.{TABLE} SET v=0 WHERE k={key} AND c=0 IF v!=0")
+
+        updated = CASClientWriteRequestMetrics(scope)
+        diff = updated.diff(baseline)
+        assert diff['ConditionNotMet.Count'] == query_count - 1
+
+    def validate_exception_metric_with_cl(self, global_scope, metric_class, statement, query_count, secondary_meter, expected_exception):
+        query_cl = statement.consistency_level
+        cl_scope = scope_for_cl(global_scope, query_cl)
+
+        cl_baseline = metric_class(cl_scope)
+        cl_baseline.validate()
+
+        core_diff = self.validate_metric(global_scope, metric_class, statement, query_count, secondary_meter, expected_exception)
+
+        cl_updated = metric_class(cl_scope)
+        cl_diff = cl_updated.diff(cl_baseline)
+        assert cl_diff[f"{secondary_meter}.Count"] == query_count
+        assert cl_diff[f"{secondary_meter}.MeanRate"] > 0
+
+        return core_diff
+
+    def validate_metric(self, scope, metric_class, statement, query_count, secondary_meter=None, expected_exception=NoException):
+        baseline = metric_class(scope)
+
+        for _ in range(query_count):
+            try:
+                self.session.execute(statement)
+                if expected_exception != NoException:
+                    assert False, f"Request did not raise expected exception: {expected_exception}"
+            except expected_exception:
+                pass
+
+        updated = metric_class(scope)
+        diff = updated.diff(baseline)
+        assert diff['Latency.Count'] == query_count
+        if secondary_meter:
+            assert diff[f"{secondary_meter}.Count"] == query_count
+            assert diff[f"{secondary_meter}.MeanRate"] > 0
+
+        return diff
+
+
+##############
+# Utilities
+
+class AbstractPropertyValues(ABC):
+    def __init__(self, scope, name):
+        self.scope = scope
+        self.name = name
+        self.mbean = make_mbean('metrics', type='ClientRequest', scope=scope, name=name)
+        self.values = {}
+        self.init()
+
+    @abstractmethod
+    def init(self):
+        pass
+
+    @abstractmethod
+    def validate(self):
+        pass
+
+    def load(self, attr):
+        self.values[attr] = jmx.read_attribute(self.mbean, attr)
+
+    def diff(self, other):
+        d = {}
+        for k, v in self.values.items():
+            if 'RecentValues' in k:
+                continue
+            v2 = other.values[k]
+            if v != v2:
+                key = f"{self.name}.{k}"
+                d[key] = diff_value(v, v2)
+        return d
+
+
+class Counter(AbstractPropertyValues):
+    def init(self):
+        self.load("Count")
+
+    def validate(self):
+        v = self.values['Count']
+        assert isinstance(v, int), self.mbean
+        assert v >= 0, self.mbean
+
+
+class Meter(AbstractPropertyValues):
+    def init(self):
+        for a in ["Count",
+                  "MeanRate",
+                  "OneMinuteRate",
+                  "FiveMinuteRate",
+                  "FifteenMinuteRate",
+                  "RateUnit"]:
+            self.load(a)
+
+    def validate(self):
+        assert self.values['RateUnit'] == 'events/second'
+        for k, v in self.values.items():
+            if k == 'RateUnit':
+                continue
+            is_non_negative(k, v)
+
+
+stat_words = [
+    'Min',
+    'Max',
+    'Mean',
+    'StdDev',
+    '50thPercentile',
+    '75thPercentile',
+    '95thPercentile',
+    '98thPercentile',
+    '99thPercentile',
+    '999thPercentile',
+    'RecentValues']
+
+
+def validate_stat_values(prefix, values):
+    sample_count = values['Count']
+    if sample_count:
+        validate_sane_latency(prefix, values)
+    else:
+        validate_zero_latency(prefix, values)
+
+
+def validate_sane_latency(prefix, values):
+    validators = defaultdict(lambda: is_positive)
+    validators['RecentValues'] = is_histo_list
+    validators['StdDev'] = is_non_negative
+    validators['Min'] = is_non_negative
+
+    for k, v in ((k, v) for k, v in values.items() if k in stat_words):
+        validators[k](f"{prefix}.{k}", v)
+    assert values['Min'] <= values['Max'], prefix
+    assert values['Min'] <= values['Mean'], prefix
+    assert values['Mean'] <= values['Max'], prefix
+
+    last_pct = values['50thPercentile']
+    for s in ['75th', '95th', '98th', '99th', '999th']:
+        this_pct = values[f"{s}Percentile"]
+        assert this_pct >= last_pct, prefix + ' ' + s
+        last_pct = this_pct
+
+
+def validate_zero_latency(prefix, values):
+    validators = defaultdict(lambda: is_zero)
+    validators['RecentValues'] = is_zero_list
+    validators['Mean'] = is_none
+    validators['DurationUnit'] = is_microseconds
+    for k, v in values.items():
+        validators[k](f"{prefix}{k}", v)
+
+
+class Histogram(AbstractPropertyValues):
+    def init(self):
+        for a in stat_words:
+            self.load(a)
+        self.load('Count')
+
+    def validate(self):
+        validate_stat_values(self.mbean, self.values)
+        is_non_negative(self.mbean, self.values['Count'])
+
+
+class LatencyMetricsTimer(Counter):
+
+    def init(self):
+        super(LatencyMetricsTimer, self).init()
+        for a in stat_words:
+            self.load(a)
+        self.load('DurationUnit')
+
+    def validate(self):
+        validate_stat_values(self.mbean, self.values)
+        is_microseconds(self.mbean, self.values['DurationUnit'])
+
+
+class LatencyMetrics(object):
+    def __init__(self, scope):
+        self.scope = scope
+        self.values = {'TotalLatency': Counter(scope, 'TotalLatency'),
+                       'Latency': LatencyMetricsTimer(scope, 'Latency')}
+
+    def diff(self, other):
+        d = {}
+        for k, v in self.values.items():
+            d.update(other.values[k].diff(v))
+        return d
+
+    def validate(self):
+        for v in self.values.values():
+            v.validate()
+
+
+class ClientRequestMetrics(LatencyMetrics):
+    error_words = ['Failures', 'Timeouts', 'Unavailables']
+
+    def __init__(self, scope):
+        super(ClientRequestMetrics, self).__init__(scope)
+        for mb in self.error_words:
+            self.values[mb] = Meter(scope, mb)
+
+
+class WriteMetrics(ClientRequestMetrics):
+    def __init__(self, scope):
+        super(WriteMetrics, self).__init__(scope)
+        self.values['MutationSizeHistogram'] = Histogram(scope, 'MutationSizeHistogram')
+
+
+class ViewWriteMetrics(ClientRequestMetrics):
+    def __init__(self, scope):
+        super(ViewWriteMetrics, self).__init__(scope)
+        self.values['ViewReplicasAttempted'] = Counter(scope, 'ViewReplicasAttempted')
+        self.values['ViewReplicasSuccess'] = Counter(scope, 'ViewReplicasSuccess')
+        self.values['ViewWriteLatency'] = LatencyMetricsTimer(scope, 'ViewWriteLatency')
+
+    def validate(self):
+        super(ViewWriteMetrics, self).validate()
+        self.values['ViewReplicasAttempted'].validate()
+        self.values['ViewReplicasSuccess'].validate()
+        self.values['ViewWriteLatency'].validate()
+
+
+class CASClientRequestMetrics(ClientRequestMetrics):
+    def __init__(self, scope):
+        super(CASClientRequestMetrics, self).__init__(scope)
+        self.values['ContentionHistogram'] = Histogram(scope, 'ContentionHistogram')
+        self.values['UnfinishedCommit'] = Counter(scope, 'UnfinishedCommit')
+        self.values['UnknownResult'] = Meter(scope, 'UnknownResult')
+
+    def validate(self):
+        super(CASClientRequestMetrics, self).validate()
+        self.values['ContentionHistogram'].validate()
+        self.values['UnfinishedCommit'].validate()
+        self.values['UnknownResult'].validate()
+
+
+class CASClientWriteRequestMetrics(CASClientRequestMetrics):
+    def __init__(self, scope):
+        super(CASClientWriteRequestMetrics, self).__init__(scope)
+        self.values['MutationSizeHistogram'] = Histogram(scope, 'MutationSizeHistogram')
+        self.values['ConditionNotMet'] = Counter(scope, 'ConditionNotMet')
+
+    def validate(self):
+        super(CASClientWriteRequestMetrics, self).validate()
+        self.values['MutationSizeHistogram'].validate()
+        self.values['ConditionNotMet'].validate()
+
+
+def diff_num(v1, v2):
+    return v2 - v1
+
+
+def diff_str(v1, v2):
+    return f"'{v1}' --> '{v2}'"
+
+
+diff_function = {
+    int: diff_num,
+    float: diff_num,
+    list: diff_str,
+    str: diff_str,
+}
+
+
+def diff_value(v1, v2):
+    if v1 is None or v2 is None:  # Before it's set, Mean "null" is returned as None
+        return diff_str(v1, v2)
+    return diff_function[type(v1)](v1, v2)
+
+
+def scope_for_cl(scope, cl):
+    return scope + '-' + CL.value_to_name[cl]
+
+
+def is_zero(k, v):
+    assert v == 0, k
+
+
+def is_positive(k, v):
+    assert v > 0, k
+
+
+def is_non_negative(k, v):
+    assert v >= 0, k
+
+
+def is_none(k, v):
+    assert v is None, k
+
+
+def is_microseconds(k, v):
+    assert v == 'microseconds', k
+
+
+def is_zero_list(k, l):
+    assert not any(l), k
+
+
+def is_nonzero_list(k, l):
+    assert any(l), k
+
+
+def is_histo_list(k, l):
+    # since these values change on sampling, we can only generally verify it takes the proper form
+    # There are in-tree unit tests around ClearableHistogram and DecayingEstimatedHistogramReservoir
+    assert len(l) == 165, k
+    assert all(isinstance(i, int) for i in l), k
+
+
+last_key = 0
+
+
+def new_key():
+    global last_key
+    last_key += 1
+    return last_key