Breakdown resource stats by role

Currently Aurora exports total quota and resource reservation over time. This can be very useful to see changes in trends of production and free tier capacity. One challenge (particularly in a self-serve capacity environment) is identifying and tracking where large deltas came from. This change exports both quota and resource usage per role to help with this.

Reviewed at https://reviews.apache.org/r/66806/
diff --git a/src/main/java/org/apache/aurora/scheduler/stats/ResourceCounter.java b/src/main/java/org/apache/aurora/scheduler/stats/ResourceCounter.java
index a3e9bc7..7726337 100644
--- a/src/main/java/org/apache/aurora/scheduler/stats/ResourceCounter.java
+++ b/src/main/java/org/apache/aurora/scheduler/stats/ResourceCounter.java
@@ -17,6 +17,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.stream.Collectors;
 
 import javax.inject.Inject;
 
@@ -97,6 +98,23 @@
   }
 
   /**
+   * Returns quota allocations by role (as metrics).
+   *
+   * @return A map of role to allocated quota metrics.
+   * @throws StorageException if there was a problem fetching quotas from storage.
+   */
+  public Map<String, Metric> computeQuotaAllocationByRole() throws StorageException {
+    return storage.read(storeProvider -> {
+      return storeProvider.getQuotaStore().fetchQuotas().entrySet().stream()
+          .collect(Collectors.toMap(e -> e.getKey(), e -> {
+            Metric allocation = new Metric();
+            allocation.accumulate(e.getValue());
+            return allocation;
+          }));
+    });
+  }
+
+  /**
    * Computes arbitrary resource aggregates based on a query, a filter, and a grouping function.
    *
    * @param query Query to select tasks for aggregation.
diff --git a/src/main/java/org/apache/aurora/scheduler/stats/TaskStatCalculator.java b/src/main/java/org/apache/aurora/scheduler/stats/TaskStatCalculator.java
index ac5cf24..fa99dd4 100644
--- a/src/main/java/org/apache/aurora/scheduler/stats/TaskStatCalculator.java
+++ b/src/main/java/org/apache/aurora/scheduler/stats/TaskStatCalculator.java
@@ -17,6 +17,8 @@
 
 import com.google.common.base.Joiner;
 
+import org.apache.aurora.common.inject.TimedInterceptor.Timed;
+import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.resources.ResourceType;
 import org.apache.aurora.scheduler.stats.ResourceCounter.Metric;
 import org.apache.aurora.scheduler.storage.Storage.StorageException;
@@ -49,13 +51,25 @@
     });
   }
 
+  @Timed("task_stat_calculator_run")
   @Override
   public void run() {
     try {
       for (Metric metric : resourceCounter.computeConsumptionTotals()) {
         update("resources_" + metric.type.name(), metric);
       }
+      // Export consumption per role
+      for (ResourceCounter.MetricType type: ResourceCounter.MetricType.values()) {
+        resourceCounter.computeAggregates(
+            Query.unscoped().active(),
+            type.filter,
+            (taskConfig) -> type.name() + "_" + taskConfig.getJob().getRole())
+            .forEach((name, metric) -> update("resources_per_role_" + name, metric));
+      }
+
       update("resources_allocated_quota", resourceCounter.computeQuotaAllocationTotals());
+      resourceCounter.computeQuotaAllocationByRole()
+          .forEach((role, metric) -> update("quota_per_role_" + role, metric));
     } catch (StorageException e) {
       LOG.debug("Unable to fetch metrics, storage is likely not ready.");
     }
diff --git a/src/test/java/org/apache/aurora/scheduler/stats/ResourceCounterTest.java b/src/test/java/org/apache/aurora/scheduler/stats/ResourceCounterTest.java
index a30d74e..b1c8c6d 100644
--- a/src/test/java/org/apache/aurora/scheduler/stats/ResourceCounterTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/stats/ResourceCounterTest.java
@@ -131,6 +131,22 @@
   }
 
   @Test
+  public void testComputeQuotaAllocationByRole() {
+    storage.write((NoResult.Quiet) storeProvider -> {
+      storeProvider.getQuotaStore().saveQuota("a", ResourceTestUtil.aggregate(1, 1, 1));
+      storeProvider.getQuotaStore().saveQuota("b", ResourceTestUtil.aggregate(2, 3, 4));
+    });
+
+    assertEquals(
+        ImmutableMap.of(
+            "a",
+            new Metric(TOTAL_CONSUMED, bag(1, 1, 1)),
+            "b",
+            new Metric(TOTAL_CONSUMED, bag(2, 3, 4))),
+        resourceCounter.computeQuotaAllocationByRole());
+  }
+
+  @Test
   public void testComputeAggregates() {
     insertTasks(
         task("bob", "jobA", "a",  1, GB, GB, PRODUCTION,    RUNNING,    NOT_DEDICATED),