Adding flag to enable SLA-aware killing for non-production workloads. (#67)

Added a flag (-sla_aware_kill_non_prod) that allows operators to enable SLA-aware killing for non-production tasks.
The flag is disabled by default.
diff --git a/src/main/java/org/apache/aurora/scheduler/sla/SlaManager.java b/src/main/java/org/apache/aurora/scheduler/sla/SlaManager.java
index 35ca771..3e7d4ac 100644
--- a/src/main/java/org/apache/aurora/scheduler/sla/SlaManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/sla/SlaManager.java
@@ -98,6 +98,11 @@
   @interface MinRequiredInstances { }
 
   @VisibleForTesting
+  @Qualifier
+  @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+  @interface SlaAwareKillNonProd { }
+
+  @VisibleForTesting
   static final String TASK_PARAM = "task";
 
   private static final String ATTEMPTS_STAT_NAME = "sla_coordinator_attempts";
@@ -113,6 +118,7 @@
   private final Striped<Lock> lock;
   private final int minRequiredInstances;
   private final TierManager tierManager;
+  private final boolean slaAwareKillNonProd;
 
   private final AtomicLong attemptsCounter;
   private final AtomicLong successCounter;
@@ -131,7 +137,8 @@
              IServerInfo serverInfo,
              @HttpClient AsyncHttpClient httpClient,
              TierManager tierManager,
-             StatsProvider statsProvider) {
+             StatsProvider statsProvider,
+             @SlaAwareKillNonProd boolean slaAwareKillNonProd) {
 
     this.executor = requireNonNull(executor);
     this.storage = requireNonNull(storage);
@@ -169,6 +176,7 @@
           }
         }
     );
+    this.slaAwareKillNonProd = slaAwareKillNonProd;
   }
 
   private long getSlaDuration(ISlaPolicy slaPolicy) {
@@ -445,8 +453,8 @@
   }
 
   private boolean skipSla(IScheduledTask task, long numActive) {
-    if (!tierManager.getTier(task.getAssignedTask().getTask()).isPreemptible()
-        && !tierManager.getTier(task.getAssignedTask().getTask()).isRevocable()) {
+    if (slaAwareKillNonProd
+        || tierManager.getTier(task.getAssignedTask().getTask()).isProduction()) {
       return numActive < minRequiredInstances;
     }
     return true;
diff --git a/src/main/java/org/apache/aurora/scheduler/sla/SlaModule.java b/src/main/java/org/apache/aurora/scheduler/sla/SlaModule.java
index 27bbaa8..3149bf1 100644
--- a/src/main/java/org/apache/aurora/scheduler/sla/SlaModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/sla/SlaModule.java
@@ -39,6 +39,7 @@
 import org.apache.aurora.scheduler.config.validators.PositiveAmount;
 import org.apache.aurora.scheduler.sla.MetricCalculator.MetricCalculatorSettings;
 import org.apache.aurora.scheduler.sla.MetricCalculator.MetricCategory;
+import org.apache.aurora.scheduler.sla.SlaManager.SlaAwareKillNonProd;
 import org.asynchttpclient.AsyncHttpClient;
 import org.asynchttpclient.DefaultAsyncHttpClientConfig;
 import org.asynchttpclient.channel.DefaultKeepAliveStrategy;
@@ -101,6 +102,11 @@
             + "This does not apply to jobs that have a CoordinatorSlaPolicy."
     )
     public TimeAmount maxSlaDuration = new TimeAmount(2, Time.HOURS);
+
+    @Parameter(names = "-sla_aware_kill_non_prod",
+        description = "Enables SLA awareness for drain and update for non-production tasks",
+        arity = 1)
+    public boolean slaAwareKillNonProd = false;
   }
 
   @VisibleForTesting
@@ -149,6 +155,10 @@
         .annotatedWith(SlaManager.MinRequiredInstances.class)
         .toInstance(options.minRequiredInstances);
 
+    bind(new TypeLiteral<Boolean>() { })
+        .annotatedWith(SlaAwareKillNonProd.class)
+        .toInstance(options.slaAwareKillNonProd);
+
     bind(new TypeLiteral<Integer>() { })
         .annotatedWith(SlaManager.MaxParallelCoordinators.class)
         .toInstance(options.maxParallelCoordinators);
diff --git a/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java b/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java
index 5e34680..a7ca6e8 100644
--- a/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java
@@ -241,6 +241,7 @@
     expected.sla.slaProdMetrics = ImmutableList.of(MetricCategory.JOB_UPTIMES);
     expected.sla.slaNonProdMetrics = ImmutableList.of(MetricCategory.JOB_UPTIMES);
     expected.sla.slaRefreshInterval = TEST_TIME;
+    expected.sla.slaAwareKillNonProd = true;
     expected.webhook.webhookConfigFile = tempFile;
     expected.scheduler.maxRegistrationDelay = TEST_TIME;
     expected.scheduler.maxLeadingDuration = TEST_TIME;
@@ -326,6 +327,7 @@
         "-sla_aware_action_max_batch_size=42",
         "-sla_aware_kill_retry_min_delay=42days",
         "-sla_aware_kill_retry_max_delay=42days",
+        "-sla_aware_kill_non_prod=true",
         "-task_assigner_modules=org.apache.aurora.scheduler.config.CommandLineTest$NoopModule",
         "-dlog_snapshot_interval=42days",
         "-dlog_max_entry_size=42GB",
diff --git a/src/test/java/org/apache/aurora/scheduler/sla/SlaManagerTest.java b/src/test/java/org/apache/aurora/scheduler/sla/SlaManagerTest.java
index 6881678..02ec650 100644
--- a/src/test/java/org/apache/aurora/scheduler/sla/SlaManagerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/sla/SlaManagerTest.java
@@ -135,6 +135,10 @@
                 .annotatedWith(SlaManager.MinRequiredInstances.class)
                 .toInstance(2);
 
+            bind(new TypeLiteral<Boolean>() { })
+                .annotatedWith(SlaManager.SlaAwareKillNonProd.class)
+                .toInstance(false);
+
             bind(new TypeLiteral<Integer>() { })
                 .annotatedWith(SlaManager.MaxParallelCoordinators.class)
                 .toInstance(10);