Introduce a `countdown-ms` param in Coordinator request.

With the introduction of `timeoutSecs` for HostMaintenanceRequest
and the `CoordinatorSlaPolicy`, it will be beneficial to expose the
time remaining until forced maintenance to the Coordinator. Send
the time remaining until force task maintenance as an extra query
param to the Coordinator.

Testing Done:
./gradlew test
./build-support/jenkins/build.sh

**Tested on Vagrant**

***Logs from Coordinator***
Request received for {'task': ['devcluster/vagrant/test/coordinator/0']}
{
  "forceMaintenanceCountdownMs": "604755646",
  "task": "devcluster/vagrant/test/coordinator/0",
  "taskConfig": {
    "assignedTask": {
      "assignedPorts": {},
      "instanceId": 0,
      "slaveHost": "192.168.33.7",
      "slaveId": "f0336813-864b-4c8f-914c-80f8cef3b61d-S0",
      "task": {
      ...<SNIPPED>
}
Responded: True

Reviewed at https://reviews.apache.org/r/67657/
diff --git a/docs/features/sla-requirements.md b/docs/features/sla-requirements.md
index 555b174..2b3fa65 100644
--- a/docs/features/sla-requirements.md
+++ b/docs/features/sla-requirements.md
@@ -123,8 +123,8 @@
 (such as drains) will be required to get approval from the `Coordinator` before proceeding. The
 coordinator service needs to expose a HTTP endpoint, that can take a `task-key` param
 (`<cluster>/<role>/<env>/<name>/<instance>`) and a json body describing the task
-details and return a response json that will contain the boolean status for allowing or disallowing
-the task's removal.
+details, force maintenance countdown (ms) and other params and return a response json that will
+contain the boolean status for allowing or disallowing the task's removal.
 
 ##### Request:
 ```javascript
@@ -132,24 +132,28 @@
   ?task=<cluster>/<role>/<env>/<name>/<instance>
 
 {
-  "assignedTask": {
-    "taskId": "taskA",
-    "slaveHost": "a",
-    "task": {
-      "job": {
-        "role": "role",
-        "environment": "devel",
-        "name": "job"
+  "forceMaintenanceCountdownMs": "604755646",
+  "task": "cluster/role/devel/job/1",
+  "taskConfig": {
+    "assignedTask": {
+      "taskId": "taskA",
+      "slaveHost": "a",
+      "task": {
+        "job": {
+          "role": "role",
+          "environment": "devel",
+          "name": "job"
+        },
+        ...
       },
+      "assignedPorts": {
+        "http": 1000
+      },
+      "instanceId": 1
       ...
     },
-    "assignedPorts": {
-      "http": 1000
-    },
-    "instanceId": 1
     ...
-  },
-  ...
+  }
 }
 ```
 
diff --git a/src/main/java/org/apache/aurora/scheduler/maintenance/MaintenanceController.java b/src/main/java/org/apache/aurora/scheduler/maintenance/MaintenanceController.java
index 626a682..344a430 100644
--- a/src/main/java/org/apache/aurora/scheduler/maintenance/MaintenanceController.java
+++ b/src/main/java/org/apache/aurora/scheduler/maintenance/MaintenanceController.java
@@ -33,6 +33,7 @@
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
@@ -177,6 +178,8 @@
     @VisibleForTesting
     static final String DRAINING_MESSAGE = "Draining machine for maintenance.";
 
+    private static final String COUNTDOWN_MS_PARAM = "forceMaintenanceCountdownMs";
+
     private static final String MAINTENANCE_COUNTDOWN_STAT_NAME = "maintenance_countdown_ms";
     private static final String MISSING_MAINTENANCE_REQUEST = "missing_maintenance_request";
     private static final SlaPolicy ZERO_PERCENT_SLA = SlaPolicy.percentageSlaPolicy(
@@ -465,19 +468,23 @@
       }
 
       boolean force = false;
-      long expireMs =
-          System.currentTimeMillis() - hostMaintenanceRequest.get().getCreatedTimestampMs();
-      long maintenanceCountDownMs =
-          TimeAmount.of(hostMaintenanceRequest.get().getTimeoutSecs(), Time.SECONDS)
-              .as(Time.MILLISECONDS) - expireMs;
+      long startMs = hostMaintenanceRequest.get().getCreatedTimestampMs();
+      long timeoutMs = TimeAmount.of(
+            hostMaintenanceRequest.get().getTimeoutSecs(),
+            Time.SECONDS)
+          .as(Time.MILLISECONDS);
+      long endMs = startMs + timeoutMs;
+      long remainingMs = endMs - System.currentTimeMillis();
       maintenanceCountDownByTask.get(
           Joiner.on("_")
-              .join(MAINTENANCE_COUNTDOWN_STAT_NAME,
-                  InstanceKeys.toString(Tasks.getJob(task), Tasks.getInstanceId(task))))
-          .getAndSet(maintenanceCountDownMs);
+              .join(
+                  MAINTENANCE_COUNTDOWN_STAT_NAME,
+                  InstanceKeys.toString(Tasks.getJob(task), Tasks.getInstanceId(task))
+              )
+            )
+          .getAndSet(remainingMs);
 
-      if (hostMaintenanceRequest.get().getTimeoutSecs()
-            < TimeAmount.of(expireMs, Time.MILLISECONDS).as(Time.SECONDS)) {
+      if (remainingMs < 0) {
         LOG.warn("Maintenance request timed out for host: {} after {} secs. Forcing drain of {}.",
             host, hostMaintenanceRequest.get().getTimeoutSecs(), Tasks.id(task));
         force = true;
@@ -496,6 +503,7 @@
               Optional.empty(),
               ScheduleStatus.DRAINING,
               Optional.of(DRAINING_MESSAGE)),
+          ImmutableMap.of(COUNTDOWN_MS_PARAM, Long.toString(remainingMs)),
           force);
     }
   }
diff --git a/src/main/java/org/apache/aurora/scheduler/sla/SlaManager.java b/src/main/java/org/apache/aurora/scheduler/sla/SlaManager.java
index 9c5caf4..35ca771 100644
--- a/src/main/java/org/apache/aurora/scheduler/sla/SlaManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/sla/SlaManager.java
@@ -33,6 +33,8 @@
 import com.google.common.util.concurrent.AbstractIdleService;
 import com.google.common.util.concurrent.Striped;
 import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
 import com.google.inject.Inject;
 
 import org.apache.aurora.common.inject.TimedInterceptor.Timed;
@@ -266,13 +268,15 @@
    * @param task Task whose SLA is to checked.
    * @param slaPolicy {@link ICoordinatorSlaPolicy} to use for checking SLA.
    * @param work {@link Storage.MutateWork} to perform, if SLA is satisfied.
+   * @param params dictionary of key-value pairs to send in json body to Coordinator
    * @param <T> The type of result the {@link Storage.MutateWork} produces.
    * @param <E> The type of exception the {@link Storage.MutateWork} throw.
    */
   private <T, E extends Exception> void askCoordinatorThenAct(
       IScheduledTask task,
       ICoordinatorSlaPolicy slaPolicy,
-      Storage.MutateWork<T, E> work) {
+      Storage.MutateWork<T, E> work,
+      Map<String, String> params) {
 
     String taskKey = getTaskKey(task);
 
@@ -287,7 +291,7 @@
             taskKey);
         attemptsCounter.incrementAndGet();
 
-        if (coordinatorAllows(task, taskKey, slaPolicy)) {
+        if (coordinatorAllows(task, taskKey, slaPolicy, params)) {
           LOG.info("Performing work after coordinator: {} approval for task: {}",
               slaPolicy.getCoordinatorUrl(),
               taskKey);
@@ -322,14 +326,22 @@
   private boolean coordinatorAllows(
       IScheduledTask task,
       String taskKey,
-      ICoordinatorSlaPolicy slaPolicy)
+      ICoordinatorSlaPolicy slaPolicy,
+      Map<String, String> params)
       throws InterruptedException, ExecutionException, TException {
 
     LOG.info("Checking coordinator: {} for task: {}", slaPolicy.getCoordinatorUrl(), taskKey);
 
+    String taskConfig = new TSerializer(new TSimpleJSONProtocol.Factory())
+        .toString(task.newBuilder());
+    JsonObject jsonBody = new JsonObject();
+    jsonBody.add("taskConfig", new JsonParser().parse(taskConfig));
+    jsonBody.addProperty(TASK_PARAM, taskKey);
+    params.forEach(jsonBody::addProperty);
+
     Response response = httpClient.preparePost(slaPolicy.getCoordinatorUrl())
         .setQueryParams(ImmutableList.of(new Param(TASK_PARAM, taskKey)))
-        .setBody(new TSerializer(new TSimpleJSONProtocol.Factory()).toString(task.newBuilder()))
+        .setBody(new Gson().toJson(jsonBody))
         .execute()
         .get();
 
@@ -391,6 +403,7 @@
    * @param task Task whose SLA is to be checked.
    * @param slaPolicy {@link ISlaPolicy} to use.
    * @param work {@link Storage.MutateWork} to perform, if SLA is satisfied.
+   * @param params dictionary of key-value pairs to send in json body to Coordinator
    * @param force boolean to indicate if work should be performed without checking SLA.
    * @param <T> The type of result the {@link Storage.MutateWork} produces.
    * @param <E> The type of exception the {@link Storage.MutateWork} throw.
@@ -401,6 +414,7 @@
       IScheduledTask task,
       ISlaPolicy slaPolicy,
       Storage.MutateWork<T, E> work,
+      Map<String, String> params,
       boolean force) throws E {
 
     if (force) {
@@ -417,7 +431,8 @@
       executor.execute(() -> askCoordinatorThenAct(
           task,
           slaPolicy.getCoordinatorSlaPolicy(),
-          work));
+          work,
+          params));
     } else {
       // verify sla and perform work if satisfied
       storage.write(store -> {
diff --git a/src/test/java/org/apache/aurora/scheduler/maintenance/MaintenanceControllerImplTest.java b/src/test/java/org/apache/aurora/scheduler/maintenance/MaintenanceControllerImplTest.java
index c9390df..1a62f5a 100644
--- a/src/test/java/org/apache/aurora/scheduler/maintenance/MaintenanceControllerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/maintenance/MaintenanceControllerImplTest.java
@@ -17,6 +17,7 @@
 import java.util.Set;
 import java.util.concurrent.Executor;
 
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.inject.AbstractModule;
@@ -174,6 +175,7 @@
         IHostMaintenanceRequest.build(new HostMaintenanceRequest()
             .setHost(HOST_A)
             .setCreatedTimestampMs(System.currentTimeMillis())
+            .setTimeoutSecs(7200)
             .setDefaultSlaPolicy(SLA_POLICY));
 
     storageUtil.hostMaintenanceStore.saveHostMaintenanceRequest(
@@ -294,6 +296,7 @@
         IHostMaintenanceRequest.build(new HostMaintenanceRequest()
             .setHost(HOST_A)
             .setCreatedTimestampMs(System.currentTimeMillis())
+            .setTimeoutSecs(7200)
             .setDefaultSlaPolicy(SLA_POLICY));
     expect(storageUtil.hostMaintenanceStore.getHostMaintenanceRequest(HOST_A))
         .andReturn(Optional.of(maintenanceRequest)).times(2);
@@ -385,6 +388,7 @@
         IHostMaintenanceRequest.build(new HostMaintenanceRequest()
             .setHost(HOST_A)
             .setCreatedTimestampMs(System.currentTimeMillis())
+            .setTimeoutSecs(7200)
             .setDefaultSlaPolicy(SLA_POLICY));
     expect(storageUtil.hostMaintenanceStore.getHostMaintenanceRequest(HOST_A))
         .andReturn(Optional.of(maintenanceRequest)).times(1);
@@ -402,6 +406,7 @@
         eq(task),
         eq(ISlaPolicy.build(SLA_POLICY)),
         anyObject(Storage.MutateWork.class),
+        anyObject(ImmutableMap.class),
         eq(force));
   }
 
diff --git a/src/test/java/org/apache/aurora/scheduler/sla/SlaManagerTest.java b/src/test/java/org/apache/aurora/scheduler/sla/SlaManagerTest.java
index 759a1bc..6881678 100644
--- a/src/test/java/org/apache/aurora/scheduler/sla/SlaManagerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/sla/SlaManagerTest.java
@@ -15,6 +15,7 @@
 
 import java.io.IOException;
 import java.net.URLEncoder;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ScheduledExecutorService;
@@ -25,7 +26,11 @@
 import javax.servlet.http.HttpServletResponse;
 
 import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
 import com.google.inject.AbstractModule;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
@@ -207,6 +212,7 @@
         storeProvider -> storeProvider
             .getUnsafeTaskStore()
             .fetchTask(task1.getAssignedTask().getTaskId()),
+        ImmutableMap.of(),
         false);
   }
 
@@ -245,6 +251,7 @@
         storeProvider -> storeProvider
             .getUnsafeTaskStore()
             .fetchTask(task1.getAssignedTask().getTaskId()),
+        ImmutableMap.of(),
         false);
   }
 
@@ -279,6 +286,7 @@
         storeProvider -> storeProvider
             .getUnsafeTaskStore()
             .fetchTask(task1.getAssignedTask().getTaskId()),
+        ImmutableMap.of(),
         false);
   }
 
@@ -310,6 +318,7 @@
         storeProvider -> storeProvider
             .getUnsafeTaskStore()
             .fetchTask(task1.getAssignedTask().getTaskId()),
+        ImmutableMap.of(),
         false);
   }
 
@@ -341,6 +350,7 @@
         storeProvider -> storeProvider
             .getUnsafeTaskStore()
             .fetchTask(task1.getAssignedTask().getTaskId()),
+        ImmutableMap.of(),
         false);
   }
 
@@ -372,6 +382,7 @@
         storeProvider -> storeProvider
             .getUnsafeTaskStore()
             .fetchTask(task1.getAssignedTask().getTaskId()),
+        ImmutableMap.of(),
         false);
   }
 
@@ -403,6 +414,7 @@
         storeProvider -> storeProvider
             .getUnsafeTaskStore()
             .fetchTask(task1.getAssignedTask().getTaskId()),
+        ImmutableMap.of(),
         false);
   }
 
@@ -427,6 +439,7 @@
         storeProvider -> storeProvider
             .getUnsafeTaskStore()
             .fetchTask(task1.getAssignedTask().getTaskId()),
+        ImmutableMap.of(),
         true);
   }
 
@@ -462,6 +475,7 @@
         storeProvider -> storeProvider
             .getUnsafeTaskStore()
             .fetchTask(task1.getAssignedTask().getTaskId()),
+        ImmutableMap.of(),
         false);
   }
 
@@ -503,6 +517,7 @@
         storeProvider -> storeProvider
             .getUnsafeTaskStore()
             .fetchTask(task1.getAssignedTask().getTaskId()),
+        ImmutableMap.of(),
         false);
   }
 
@@ -540,6 +555,7 @@
         storeProvider -> storeProvider
             .getUnsafeTaskStore()
             .fetchTask(task1.getAssignedTask().getTaskId()),
+        ImmutableMap.of(),
         false);
   }
 
@@ -571,6 +587,7 @@
         storeProvider -> storeProvider
             .getUnsafeTaskStore()
             .fetchTask(task1.getAssignedTask().getTaskId()),
+        ImmutableMap.of(),
         false);
   }
 
@@ -602,6 +619,7 @@
         storeProvider -> storeProvider
             .getUnsafeTaskStore()
             .fetchTask(task1.getAssignedTask().getTaskId()),
+        ImmutableMap.of(),
         false);
   }
 
@@ -633,6 +651,7 @@
         storeProvider -> storeProvider
             .getUnsafeTaskStore()
             .fetchTask(task1.getAssignedTask().getTaskId()),
+        ImmutableMap.of(),
         false);
   }
 
@@ -665,6 +684,7 @@
         storeProvider -> storeProvider
             .getUnsafeTaskStore()
             .fetchTask(task1.getAssignedTask().getTaskId()),
+        ImmutableMap.of(),
         false);
   }
 
@@ -690,6 +710,7 @@
         storeProvider -> storeProvider
             .getUnsafeTaskStore()
             .fetchTask(task1.getAssignedTask().getTaskId()),
+        ImmutableMap.of(),
         true);
   }
 
@@ -724,6 +745,52 @@
                 .fetchTask(task1.getAssignedTask().getTaskId());
             return null;
           },
+          ImmutableMap.of(),
+          false);
+    }
+
+    // wait until we are sure that the server has responded
+    coordinatorResponded.await();
+    workCalled.await();
+
+    assertEquals(0, coordinatorResponded.getCount());
+    // check the work was called
+    assertEquals(0, workCalled.getCount());
+  }
+
+  /**
+   * Verifies that SLA check passes and the supplied {@link Storage.MutateWork} gets executed
+   * for a job when {@link CoordinatorSlaPolicy#coordinatorUrl} responds
+   * with {@code {"drain": true}} when supplied with extra params.
+   */
+  @Test
+  public void testCheckCoordinatorWithExtraParamsSlaPassesThenActs() throws Exception {
+    IScheduledTask task1 = makeTask("taskA", 1, RUNNING);
+    CountDownLatch workCalled = new CountDownLatch(1);
+
+    Map<String, String> extraParams = ImmutableMap.of("someKey", "someValue");
+    jettyServer.setHandler(mockCoordinatorResponse(task1, "{\"drain\": true}", extraParams));
+    jettyServer.start();
+
+    // expect that the fetchTask in the work is called, after sla check passes
+    expect(storageUtil.taskStore.fetchTask(task1.getAssignedTask().getTaskId()))
+        .andReturn(Optional.of(task1));
+
+    control.replay();
+
+    while (!coordinatorResponded.await(100, TimeUnit.MILLISECONDS)) {
+      slaManager.checkSlaThenAct(
+          task1,
+          createCoordinatorSlaPolicy(),
+          storeProvider -> {
+            // set the marker to indicate that we performed the work
+            workCalled.countDown();
+            storeProvider
+                .getUnsafeTaskStore()
+                .fetchTask(task1.getAssignedTask().getTaskId());
+            return null;
+          },
+          extraParams,
           false);
     }
 
@@ -767,6 +834,7 @@
                 .fetchTask(task1.getAssignedTask().getTaskId());
             return null;
           },
+          ImmutableMap.of(),
           false);
     }
 
@@ -804,6 +872,7 @@
                 .fetchTask(task1.getAssignedTask().getTaskId());
             return null;
           },
+          ImmutableMap.of(),
           false);
     }
 
@@ -847,6 +916,7 @@
               .fetchTask(task1.getAssignedTask().getTaskId());
           return null;
         },
+        ImmutableMap.of(),
         true);
 
     workCalled.await();
@@ -884,6 +954,7 @@
                 .fetchTask(task1.getAssignedTask().getTaskId());
             return null;
           },
+          ImmutableMap.of(),
           false);
     }
 
@@ -925,6 +996,7 @@
               .fetchTask(task1.getAssignedTask().getTaskId());
           return null;
         },
+        ImmutableMap.of(),
         false);
 
     try {
@@ -987,6 +1059,7 @@
                 LOG.info("Finished action1 for task:{}", slaManager.getTaskKey(task1));
                 return null;
               },
+              ImmutableMap.of(),
               false);
         }
       } catch (InterruptedException e) {
@@ -1019,6 +1092,7 @@
                 LOG.info("Finished action2 for task:{}", slaManager.getTaskKey(task2));
                 return null;
               },
+              ImmutableMap.of(),
               false);
         }
       } catch (InterruptedException e) {
@@ -1104,6 +1178,7 @@
                 LOG.info("Finished action for task:{}", slaManager.getTaskKey(task1));
                 return null;
               },
+              ImmutableMap.of(),
               false);
         }
       } catch (InterruptedException e) {
@@ -1137,6 +1212,7 @@
                 LOG.info("Finished action for task:{}", slaManager.getTaskKey(task2));
                 return null;
               },
+              ImmutableMap.of(),
               false);
         }
       } catch (InterruptedException e) {
@@ -1178,6 +1254,13 @@
   private AbstractHandler mockCoordinatorResponse(
       IScheduledTask task,
       String pollResponse) {
+    return mockCoordinatorResponse(task, pollResponse, ImmutableMap.of());
+  }
+
+  private AbstractHandler mockCoordinatorResponse(
+      IScheduledTask task,
+      String pollResponse,
+      Map<String, String> params) {
 
     return new AbstractHandler() {
       @Override
@@ -1191,8 +1274,15 @@
           String query = Joiner
               .on("=")
               .join(SlaManager.TASK_PARAM, URLEncoder.encode(taskKey, "UTF-8"));
-          String body = new TSerializer(new TSimpleJSONProtocol.Factory())
+
+          String taskConfig = new TSerializer(new TSimpleJSONProtocol.Factory())
               .toString(task.newBuilder());
+          JsonObject jsonBody = new JsonObject();
+          jsonBody.add("taskConfig", new JsonParser().parse(taskConfig));
+          jsonBody.addProperty(SlaManager.TASK_PARAM, taskKey);
+          params.forEach(jsonBody::addProperty);
+          String body = new Gson().toJson(jsonBody);
+
           if (request.getQueryString().equals(query)
               && request.getReader().lines().collect(Collectors.joining()).equals(body)) {
             createResponse(baseRequest, response, pollResponse);