Export number of tasks lost per dedicated role.

When there are 100s of dedicated roles in a cluster
the task_LOST_<job> metric is not enough. Introduce
per dedicated role metric for easier diagnosis.

Testing Done:
./gradlew test

**Tested on Vagrant**
tasks_lost_dedicated____web.multi 0
tasks_lost_dedicated_vagrant 2

Reviewed at https://reviews.apache.org/r/67638/
diff --git a/src/main/java/org/apache/aurora/scheduler/TaskVars.java b/src/main/java/org/apache/aurora/scheduler/TaskVars.java
index ee20ed3..8bb5f3e 100644
--- a/src/main/java/org/apache/aurora/scheduler/TaskVars.java
+++ b/src/main/java/org/apache/aurora/scheduler/TaskVars.java
@@ -17,7 +17,7 @@
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
-
+import java.util.stream.StreamSupport;
 import javax.inject.Inject;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -28,7 +28,6 @@
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
-import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
@@ -114,6 +113,11 @@
   }
 
   @VisibleForTesting
+  static String dedicatedRoleStatName(String role) {
+    return "tasks_lost_dedicated_" + role.replace("*", "_");
+  }
+
+  @VisibleForTesting
   static String jobStatName(IScheduledTask task, ScheduleStatus status) {
     return String.format(
         "tasks_%s_%s",
@@ -144,20 +148,18 @@
     if (Strings.isNullOrEmpty(task.getAssignedTask().getSlaveHost())) {
       rack = Optional.empty();
     } else {
-      rack = storage.read(storeProvider -> {
-        Optional<IAttribute> rack1 = FluentIterable
-            .from(AttributeStore.Util.attributesOrNone(storeProvider, host))
-            .firstMatch(IS_RACK)
-            .toJavaUtil();
-        return rack1.map(ATTR_VALUE);
-      });
+      rack = storage.read(storeProvider ->
+          StreamSupport.stream(
+              AttributeStore.Util.attributesOrNone(storeProvider, host).spliterator(),
+              false)
+            .filter(IS_RACK)
+            .findFirst()
+            .map(ATTR_VALUE));
     }
 
     // Always dummy-read the lost-tasks-per-rack stat. This ensures that there is at least a zero
     // exported for all racks.
-    if (rack.isPresent()) {
-      counters.getUnchecked(rackStatName(rack.get()));
-    }
+    rack.ifPresent(s -> counters.getUnchecked(rackStatName(s)));
 
     if (newState == ScheduleStatus.LOST) {
       if (rack.isPresent()) {
@@ -168,6 +170,32 @@
     }
   }
 
+  private void updateDedicatedCounters(IScheduledTask task, ScheduleStatus newState) {
+    final String host = task.getAssignedTask().getSlaveHost();
+    ImmutableSet<String> dedicatedRoles;
+    if (Strings.isNullOrEmpty(host)) {
+      dedicatedRoles = ImmutableSet.of();
+    } else {
+      dedicatedRoles = storage.read(store ->
+          StreamSupport.stream(
+                AttributeStore.Util.attributesOrNone(store, host).spliterator(),
+                false)
+              .filter(attr -> "dedicated".equals(attr.getName()))
+              .findFirst()
+              .map(IAttribute::getValues)
+              .orElse(ImmutableSet.of())
+      );
+    }
+
+    // Always dummy-read the lost-tasks-per-role stat. This ensures that there is at least a zero
+    // exported for all roles.
+    dedicatedRoles.forEach(s -> counters.getUnchecked(dedicatedRoleStatName(s)));
+
+    if (newState == ScheduleStatus.LOST) {
+      dedicatedRoles.forEach(s -> counters.getUnchecked(dedicatedRoleStatName(s)).increment());
+    }
+  }
+
   private void updateJobCounters(IScheduledTask task, ScheduleStatus newState) {
     if (TRACKED_JOB_STATES.contains(newState)) {
       untrackedCounters.getUnchecked(jobStatName(task, newState)).increment();
@@ -186,6 +214,7 @@
 
     updateRackCounters(task, task.getStatus());
     updateJobCounters(task, task.getStatus());
+    updateDedicatedCounters(task, task.getStatus());
   }
 
   @Override
diff --git a/src/test/java/org/apache/aurora/scheduler/TaskVarsTest.java b/src/test/java/org/apache/aurora/scheduler/TaskVarsTest.java
index 6321ec0..2bae69d 100644
--- a/src/test/java/org/apache/aurora/scheduler/TaskVarsTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/TaskVarsTest.java
@@ -53,6 +53,7 @@
 import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
 import static org.apache.aurora.scheduler.TaskVars.VETO_GROUPS_TO_COUNTERS;
 import static org.apache.aurora.scheduler.TaskVars.VETO_TYPE_TO_COUNTERS;
+import static org.apache.aurora.scheduler.TaskVars.dedicatedRoleStatName;
 import static org.apache.aurora.scheduler.TaskVars.jobStatName;
 import static org.apache.aurora.scheduler.TaskVars.rackStatName;
 import static org.easymock.EasyMock.expect;
@@ -185,8 +186,11 @@
     expectStatusCountersInitialized();
 
     IScheduledTask taskA = makeTask(JOB_A, INIT);
-    expectGetHostRack("hostA", "rackA").atLeastOnce();
+    expectGetHostAttributes("hostA", "rackA", ImmutableSet.of("role-1/job-1", "role.2/job.2"))
+        .atLeastOnce();
     expectStatExport(rackStatName("rackA"));
+    expectStatExport(dedicatedRoleStatName("role-1/job-1"));
+    expectStatExport(dedicatedRoleStatName("role.2/job.2"));
 
     replayAndBuild();
     schedulerActivated();
@@ -204,6 +208,8 @@
     assertEquals(0, getValue(RUNNING));
     assertEquals(1, getValue(FINISHED));
     assertEquals(0, getValue(rackStatName("rackA")));
+    assertEquals(0, getValue(dedicatedRoleStatName("role-1/job-1")));
+    assertEquals(0, getValue(dedicatedRoleStatName("role.2/job.2")));
     vars.tasksDeleted(new TasksDeleted(ImmutableSet.of(
         IScheduledTask.build(taskA.newBuilder().setStatus(FINISHED)))));
     assertAllZero();
@@ -260,10 +266,12 @@
   @Test
   public void testLoadsFromStorage() {
     expectStatusCountersInitialized();
-    expectGetHostRack("hostA", "rackA").atLeastOnce();
-    expectGetHostRack("hostB", "rackB").atLeastOnce();
+    expectGetHostAttributes("hostA", "rackA", ImmutableSet.of("role-1/job-1")).atLeastOnce();
+    expectGetHostAttributes("hostB", "rackB", ImmutableSet.of("role.2/job.2")).atLeastOnce();
     expectStatExport(rackStatName("rackA"));
     expectStatExport(rackStatName("rackB"));
+    expectStatExport(dedicatedRoleStatName("role-1/job-1"));
+    expectStatExport(dedicatedRoleStatName("role.2/job.2"));
 
     IScheduledTask failedTask = makeTask(JOB_B, FAILED, "hostB");
     expectStatExport(jobStatName(failedTask, FAILED), untrackedProvider);
@@ -282,14 +290,27 @@
     assertEquals(1, getValue(FAILED));
     assertEquals(0, getValue(rackStatName("rackA")));
     assertEquals(0, getValue(rackStatName("rackB")));
+    assertEquals(0, getValue(dedicatedRoleStatName("role-1/job-1")));
+    assertEquals(0, getValue(dedicatedRoleStatName("role.2/job.2")));
     assertEquals(1, getValue(jobStatName(failedTask, FAILED)));
   }
 
-  private IExpectationSetters<?> expectGetHostRack(String host, String rackToReturn) {
+  private IExpectationSetters<?> expectGetHostAttributes(
+      String host,
+      String rackToReturn) {
+    return expectGetHostAttributes(host, rackToReturn, ImmutableSet.of());
+  }
+
+  private IExpectationSetters<?> expectGetHostAttributes(
+      String host,
+      String rackToReturn,
+      ImmutableSet<String> dedicatedAttrs) {
+
     IHostAttributes attributes = IHostAttributes.build(new HostAttributes()
         .setHost(host)
         .setAttributes(ImmutableSet.of(
-            new Attribute().setName("rack").setValues(ImmutableSet.of(rackToReturn)))));
+            new Attribute().setName("rack").setValues(ImmutableSet.of(rackToReturn)),
+            new Attribute().setName("dedicated").setValues(dedicatedAttrs))));
     return expect(storageUtil.attributeStore.getHostAttributes(host))
         .andReturn(Optional.of(attributes));
   }
@@ -297,12 +318,14 @@
   @Test
   public void testLostCounters() {
     expectStatusCountersInitialized();
-    expectGetHostRack("host1", "rackA").atLeastOnce();
-    expectGetHostRack("host2", "rackB").atLeastOnce();
-    expectGetHostRack("host3", "rackB").atLeastOnce();
+    expectGetHostAttributes("host1", "rackA", ImmutableSet.of("role-1/job-1")).atLeastOnce();
+    expectGetHostAttributes("host2", "rackB", ImmutableSet.of("role.2/job.2")).atLeastOnce();
+    expectGetHostAttributes("host3", "rackB").atLeastOnce();
 
     expectStatExport(rackStatName("rackA"));
     expectStatExport(rackStatName("rackB"));
+    expectStatExport(dedicatedRoleStatName("role-1/job-1"));
+    expectStatExport(dedicatedRoleStatName("role.2/job.2"));
 
     IScheduledTask a = makeTask(JOB_A, RUNNING, "host1");
     IScheduledTask b = makeTask(JOB_B, RUNNING, "host2");
@@ -325,6 +348,9 @@
     assertEquals(2, getValue(rackStatName("rackA")));
     assertEquals(2, getValue(rackStatName("rackB")));
 
+    assertEquals(2, getValue(dedicatedRoleStatName("role-1/job-1")));
+    assertEquals(1, getValue(dedicatedRoleStatName("role.2/job.2")));
+
     assertEquals(1, getValue(jobStatName(a, LOST)));
     assertEquals(1, getValue(jobStatName(b, LOST)));
     assertEquals(2, getValue(jobStatName(c, LOST)));
@@ -335,6 +361,8 @@
     expectStatusCountersInitialized();
     expect(storageUtil.attributeStore.getHostAttributes("a"))
         .andReturn(Optional.empty());
+    expect(storageUtil.attributeStore.getHostAttributes("a"))
+        .andReturn(Optional.empty());
 
     IScheduledTask a = makeTask(JOB_A, RUNNING, "a");
     expectStatExport(jobStatName(a, LOST), untrackedProvider);