Replace Stream iteration with for-loop for StorageProxy::updateCoordinatorWriteLatencyTableMetric

patch by Samuel Lightfoot; reviewed by Stefan Miklosovic, Brandon Williams for CASSANDRA-19676
diff --git a/CHANGES.txt b/CHANGES.txt
index 26e0297..0379e6f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.0-beta2
+ * Replace Stream iteration with for-loop for StorageProxy::updateCoordinatorWriteLatencyTableMetric (CASSANDRA-19676)
  * Enforce metric naming contract if scope is used in a metric name (CASSANDRA-19619)
  * Avoid reading of the same IndexInfo from disk many times for a large partition (CASSANDRA-19557)
  * Resolve the oldest hints just from descriptors and current writer if available (CASSANDRA-19600)
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 7bf8227..bb6dc8d 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -1265,10 +1265,16 @@
         {
             //We could potentially pass a callback into performWrite. And add callback provision for mutateCounter or mutateAtomically (sendToHintedEndPoints)
             //However, Trade off between write metric per CF accuracy vs performance hit due to callbacks. Similar issue exists with CoordinatorReadLatency metric.
-            mutations.stream()
-                     .flatMap(m -> m.getTableIds().stream().map(tableId -> Keyspace.open(m.getKeyspaceName()).getColumnFamilyStore(tableId)))
-                     .distinct()
-                     .forEach(store -> store.metric.coordinatorWriteLatency.update(latency, TimeUnit.NANOSECONDS));
+            Set<ColumnFamilyStore> uniqueColumnFamilyStores = new HashSet<>();
+            for (IMutation mutation : mutations)
+            {
+                for (TableId tableId : mutation.getTableIds())
+                {
+                    ColumnFamilyStore store = Keyspace.open(mutation.getKeyspaceName()).getColumnFamilyStore(tableId);
+                    if (uniqueColumnFamilyStores.add(store))
+                        store.metric.coordinatorWriteLatency.update(latency, NANOSECONDS);
+                }
+            }
         }
         catch (Exception ex)
         {