Reduce helix controller log and minor code improve (#2102)

Turn down a few log level to DEBUG and reduce duplicated logs
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index bda56ba..c2af5d1 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -138,7 +138,7 @@
       String resourceName = message.getResourceName();
       Resource resource = resourceMap.get(resourceName);
       if (resource == null) {
-        LogUtil.logInfo(LOG, _eventId, String.format(
+        LogUtil.logDebug(LOG, _eventId, String.format(
             "Ignore a pending relay message %s for a non-exist resource %s and partition %s",
             message.getMsgId(), resourceName, message.getPartitionName()));
         continue;
@@ -156,7 +156,7 @@
             cache.addStaleMessage(instanceName, message);
           }
         } else {
-          LogUtil.logInfo(LOG, _eventId, String
+          LogUtil.logDebug(LOG, _eventId, String
               .format("Ignore a pending message %s for a non-exist resource %s and partition %s",
                   message.getMsgId(), resourceName, message.getPartitionName()));
         }
@@ -168,7 +168,7 @@
             if (partition != null) {
               setMessageState(currentStateOutput, resourceName, partition, instanceName, message);
             } else {
-              LogUtil.logInfo(LOG, _eventId, String.format(
+              LogUtil.logDebug(LOG, _eventId, String.format(
                   "Ignore a pending message %s for a non-exist resource %s and partition %s",
                   message.getMsgId(), resourceName, message.getPartitionName()));
             }
@@ -193,7 +193,7 @@
       String resourceName = message.getResourceName();
       Resource resource = resourceMap.get(resourceName);
       if (resource == null) {
-        LogUtil.logInfo(LOG, _eventId, String.format(
+        LogUtil.logDebug(LOG, _eventId, String.format(
             "Ignore a pending relay message %s for a non-exist resource %s and partition %s",
             message.getMsgId(), resourceName, message.getPartitionName()));
         continue;
@@ -205,7 +205,7 @@
         if (partition != null) {
           currentStateOutput.setPendingRelayMessage(resourceName, partition, instanceName, message);
         } else {
-          LogUtil.logInfo(LOG, _eventId, String.format(
+          LogUtil.logDebug(LOG, _eventId, String.format(
               "Ignore a pending relay message %s for a non-exist resource %s and partition %s",
               message.getMsgId(), resourceName, message.getPartitionName()));
         }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
index 7981302..8a7ae52 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
@@ -325,13 +325,13 @@
                 .getResourceName() + "." + partition.getPartitionName() + " from " + currentState
                 + " to " + nextState + ", isRelay: " + pendingMessage.isRelayMessage());
       } else if (currentState.equalsIgnoreCase(pendingState)) {
-        LogUtil.logInfo(logger, _eventId,
+        LogUtil.logDebug(logger, _eventId,
             "Message hasn't been removed for " + instanceName + " to transit " + resource
                 .getResourceName() + "." + partition.getPartitionName() + " to " + pendingState
                 + ", desiredState: " + desiredState + ", isRelay: " + pendingMessage
                 .isRelayMessage());
       } else {
-        LogUtil.logInfo(logger, _eventId,
+        LogUtil.logDebug(logger, _eventId,
             "IdealState changed before state transition completes for " + resource.getResourceName()
                 + "." + partition.getPartitionName() + " on " + instanceName + ", pendingState: "
                 + pendingState + ", currentState: " + currentState + ", nextState: " + nextState
diff --git a/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java b/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
index d8b4820..39ea600 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
@@ -419,7 +419,6 @@
     }
     LOG.info(
         "AssignableInstanceManager updated AssignableInstances due to LiveInstance/InstanceConfig change.");
-
     computeGlobalThreadBasedCapacity();
   }
 
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
index 4912169..e32f34c 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
@@ -21,7 +21,6 @@
 
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
-import java.util.Arrays;
 import java.util.Date;
 import java.util.HashSet;
 import java.util.List;
@@ -75,7 +74,7 @@
     // Clean up if workflow marked for deletion
     TargetState targetState = workflowCfg.getTargetState();
     if (targetState == TargetState.DELETE) {
-      LOG.info("Workflow is marked as deleted {} cleaning up the workflow context.", workflow);
+      LOG.debug("Workflow is marked as deleted {} cleaning up the workflow context.", workflow);
       updateInflightJobs(workflow, workflowCtx, currentStateOutput, bestPossibleOutput);
       cleanupWorkflow(workflow);
       return;
@@ -119,7 +118,7 @@
 
     // Step 4: Handle finished workflows
     if (workflowCtx.getFinishTime() != WorkflowContext.UNFINISHED) {
-      LOG.info("Workflow {} is finished.", workflow);
+      LOG.debug("Workflow {} is finished.", workflow);
       updateInflightJobs(workflow, workflowCtx, currentStateOutput, bestPossibleOutput);
       long expiryTime = workflowCfg.getExpiry();
       // Check if this workflow has been finished past its expiry.
@@ -150,9 +149,9 @@
     // For workflows that have already reached final states, STOP should not take into effect.
     if (!TaskConstants.FINAL_STATES.contains(workflowCtx.getWorkflowState())
         && TargetState.STOP.equals(targetState)) {
-      LOG.info("Workflow {} is marked as stopped. Workflow state is {}", workflow,
-          workflowCtx.getWorkflowState());
-      if (isWorkflowStopped(workflowCtx, workflowCfg)) {
+      if (isWorkflowStopped(workflowCtx, workflowCfg) && workflowCtx.getWorkflowState() != TaskState.STOPPED) {
+        LOG.debug("Workflow {} is marked as stopped. Workflow state is {}", workflow,
+            workflowCtx.getWorkflowState());
         workflowCtx.setWorkflowState(TaskState.STOPPED);
         _clusterDataCache.updateWorkflowContext(workflow, workflowCtx);
       }
diff --git a/helix-core/src/main/java/org/apache/helix/task/assigner/ThreadCountBasedTaskAssigner.java b/helix-core/src/main/java/org/apache/helix/task/assigner/ThreadCountBasedTaskAssigner.java
index 74d6253..fb8d869 100644
--- a/helix-core/src/main/java/org/apache/helix/task/assigner/ThreadCountBasedTaskAssigner.java
+++ b/helix-core/src/main/java/org/apache/helix/task/assigner/ThreadCountBasedTaskAssigner.java
@@ -27,6 +27,7 @@
 import java.util.Map;
 import java.util.PriorityQueue;
 
+import java.util.Set;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.task.AssignableInstanceManager;
 import org.apache.helix.task.TaskConfig;
@@ -74,19 +75,16 @@
   public Map<String, TaskAssignResult> assignTasks(
       AssignableInstanceManager assignableInstanceManager, Collection<String> instances,
       Iterable<TaskConfig> tasks, String quotaType) {
-    Iterable<AssignableInstance> assignableInstances = new HashSet<>();
+    Set<AssignableInstance> assignableInstances = new HashSet<>();
     // Only add the AssignableInstances that are also in instances
     for (String instance : instances) {
-      ((HashSet<AssignableInstance>) assignableInstances)
-          .add(assignableInstanceManager.getAssignableInstance(instance));
+      assignableInstances.add(assignableInstanceManager.getAssignableInstance(instance));
     }
 
     if (tasks == null || !tasks.iterator().hasNext()) {
-      logger.warn("No task to assign!");
       return Collections.emptyMap();
     }
-    if (assignableInstances == null || !assignableInstances.iterator().hasNext()) {
-      logger.warn("No instance to assign!");
+    if (assignableInstances.isEmpty()) {
       return buildNoInstanceAssignment(tasks, quotaType);
     }
     if (quotaType == null || quotaType.equals("") || quotaType.equals("null")) {
@@ -148,7 +146,7 @@
     return result;
   }
 
-  private class AssignableInstanceComparator implements Comparator<AssignableInstance> {
+  private static class AssignableInstanceComparator implements Comparator<AssignableInstance> {
 
     /**
      * Resource type this comparator needs to compare