AMBARI-24762. Ambari server continues to send request updates after all commands were completed. (#2448)

* AMBARI-24762. Ambari server continues to send request updates after all commands were completed. (mpapirkovskyy)

* AMBARI-24762. Ambari server continues to send request updates after all commands were completed. (mpapirkovskyy)
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
index 2dfedb2..735a774 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
@@ -895,6 +895,9 @@
         }
 
         updateRoleStats(status, roleStats.get(roleStr));
+        if (status == HostRoleStatus.FAILED) {
+          LOG.info("Role {} on host {} was failed", roleStr, host);
+        }
 
       }
     }
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/ServiceUpdateEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/ServiceUpdateEvent.java
index 34bc106..2492929 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/ServiceUpdateEvent.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/ServiceUpdateEvent.java
@@ -21,6 +21,7 @@
 import org.apache.ambari.server.state.MaintenanceState;
 import org.apache.ambari.server.state.State;
 
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
@@ -42,12 +43,17 @@
   @JsonProperty("state")
   private State state;
 
-  public ServiceUpdateEvent(String clusterName, MaintenanceState maintenanceState, String serviceName, State state) {
+  @JsonIgnore
+  private boolean stateChanged = false;
+
+  public ServiceUpdateEvent(String clusterName, MaintenanceState maintenanceState, String serviceName, State state,
+                            boolean stateChanged) {
     super(Type.SERVICE);
     this.clusterName = clusterName;
     this.maintenanceState = maintenanceState;
     this.serviceName = serviceName;
     this.state = state;
+    this.stateChanged = stateChanged;
   }
 
   public String getClusterName() {
@@ -82,6 +88,14 @@
     this.state = state;
   }
 
+  public boolean isStateChanged() {
+    return stateChanged;
+  }
+
+  public void setStateChanged(boolean stateChanged) {
+    this.stateChanged = stateChanged;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) return true;
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/services/ServiceUpdateListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/services/ServiceUpdateListener.java
index c35cc08..4cbac1f 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/services/ServiceUpdateListener.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/services/ServiceUpdateListener.java
@@ -24,18 +24,14 @@
 
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.EagerSingleton;
-import org.apache.ambari.server.controller.utilities.ServiceCalculatedStateFactory;
-import org.apache.ambari.server.controller.utilities.state.ServiceCalculatedState;
 import org.apache.ambari.server.events.HostComponentUpdate;
 import org.apache.ambari.server.events.HostComponentsUpdateEvent;
 import org.apache.ambari.server.events.MaintenanceModeEvent;
 import org.apache.ambari.server.events.ServiceUpdateEvent;
 import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
 import org.apache.ambari.server.events.publishers.STOMPUpdatePublisher;
-import org.apache.ambari.server.orm.dao.ServiceDesiredStateDAO;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.MaintenanceState;
-import org.apache.ambari.server.state.State;
 
 import com.google.common.eventbus.Subscribe;
 import com.google.inject.Inject;
@@ -45,14 +41,10 @@
 @Singleton
 @EagerSingleton
 public class ServiceUpdateListener {
-  private Map<Long, Map<String, State>> states = new HashMap<>();
 
   private STOMPUpdatePublisher STOMPUpdatePublisher;
 
   @Inject
-  private ServiceDesiredStateDAO serviceDesiredStateDAO;
-
-  @Inject
   private Provider<Clusters> m_clusters;
 
   @Inject
@@ -75,15 +67,8 @@
       Long clusterId = clusterServices.getKey();
       String clusterName = m_clusters.get().getClusterById(clusterId).getClusterName();
       for (String serviceName : clusterServices.getValue()) {
-        ServiceCalculatedState serviceCalculatedState = ServiceCalculatedStateFactory.getServiceStateProvider(serviceName);
-        State serviceState = serviceCalculatedState.getState(clusterName, serviceName);
-
-        // retrieve state from cache
-        if (states.containsKey(clusterId) && states.get(clusterId).containsKey(serviceName) && states.get(clusterId).get(serviceName).equals(serviceState)) {
-          continue;
-        }
-        states.computeIfAbsent(clusterId, c -> new HashMap<>()).put(serviceName, serviceState);
-        STOMPUpdatePublisher.publish(new ServiceUpdateEvent(clusterName, null, serviceName, serviceState));
+        STOMPUpdatePublisher.publish(new ServiceUpdateEvent(clusterName, null, serviceName, null,
+            true));
       }
     }
   }
@@ -99,6 +84,7 @@
 
     MaintenanceState maintenanceState = event.getMaintenanceState();
 
-    STOMPUpdatePublisher.publish(new ServiceUpdateEvent(clusterName, maintenanceState, serviceName, null));
+    STOMPUpdatePublisher.publish(new ServiceUpdateEvent(clusterName, maintenanceState, serviceName, null,
+        false));
   }
 }
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/BufferedUpdateEventPublisher.java b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/BufferedUpdateEventPublisher.java
index e02785f..25d396c 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/BufferedUpdateEventPublisher.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/BufferedUpdateEventPublisher.java
@@ -24,8 +24,6 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.eventbus.EventBus;
 import com.google.inject.Singleton;
@@ -34,31 +32,44 @@
 public abstract class BufferedUpdateEventPublisher<T> {
 
   private static final long TIMEOUT = 1000L;
-  private final AtomicLong previousTime = new AtomicLong(0);
-  private final AtomicBoolean collecting = new AtomicBoolean(false);
   private final ConcurrentLinkedQueue<T> buffer = new ConcurrentLinkedQueue<>();
-  private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
+
+  // Started by the first publish() call; accessed only inside synchronized (this).
+  private ScheduledExecutorService scheduledExecutorService;
 
+  /**
+   * Buffers an event for the next periodic flush.
+   * <p>
+   * The first call starts a single-threaded scheduler that runs the task returned by
+   * {@link #getScheduledPublisher(EventBus)} every {@code TIMEOUT} milliseconds. Only the
+   * event bus given to that first call is handed to the task.
+   *
+   * @param event      the event to buffer
+   * @param m_eventBus the bus passed to the flush task on first use
+   */
   public void publish(T event, EventBus m_eventBus) {
-    long eventTime = System.currentTimeMillis();
-    if ((eventTime - previousTime.get() <= TIMEOUT) && !collecting.get()) {
-      buffer.add(event);
-      collecting.set(true);
-      scheduledExecutorService.schedule(getScheduledPublisher(m_eventBus),
-          TIMEOUT, TimeUnit.MILLISECONDS);
-    } else if (collecting.get()) {
-      buffer.add(event);
-    } else {
-      //TODO add logging and metrics posting
-      previousTime.set(eventTime);
-      m_eventBus.post(event);
+    // Unsynchronized, two racing first calls could each start their own scheduler.
+    synchronized (this) {
+      if (scheduledExecutorService == null) {
+        scheduledExecutorService = Executors.newScheduledThreadPool(1);
+        scheduledExecutorService.scheduleWithFixedDelay(
+            getScheduledPublisher(m_eventBus), TIMEOUT, TIMEOUT, TimeUnit.MILLISECONDS);
+      }
     }
+    buffer.add(event);
   }
 
-  protected abstract Runnable getScheduledPublisher(EventBus m_eventBus);
+  /**
+   * Creates the task that the scheduler runs to flush the buffer.
+   *
+   * @param m_eventBus the bus the task posts to
+   * @return a new flush task bound to {@code m_eventBus}
+   */
+  protected MergingRunnable getScheduledPublisher(EventBus m_eventBus) {
+    return new MergingRunnable(m_eventBus);
+  }
 
   protected List<T> retrieveBuffer() {
-    resetCollecting();
     List<T> bufferContent = new ArrayList<>();
     while (!buffer.isEmpty()) {
       bufferContent.add(buffer.poll());
@@ -66,8 +58,42 @@
     return bufferContent;
   }
 
-  protected void resetCollecting() {
-    previousTime.set(System.currentTimeMillis());
-    collecting.set(false);
+  /**
+   * Merges the events collected since the previous flush and posts the result.
+   * Called by the periodic flush task only when the drained buffer is not empty.
+   *
+   * @param events     the drained buffer content
+   * @param m_eventBus the bus to post merged events to
+   */
+  public abstract void mergeBufferAndPost(List<T> events, EventBus m_eventBus);
+
+  /**
+   * Periodic task that drains the buffer and hands its content to
+   * {@link #mergeBufferAndPost(List, EventBus)}.
+   */
+  private class MergingRunnable implements Runnable {
+
+    private final EventBus m_eventBus;
+
+    public MergingRunnable(EventBus m_eventBus) {
+      this.m_eventBus = m_eventBus;
+    }
+
+    @Override
+    public final void run() {
+      try {
+        List<T> events = retrieveBuffer();
+        if (!events.isEmpty()) {
+          mergeBufferAndPost(events, m_eventBus);
+        }
+      } catch (RuntimeException e) {
+        // A task scheduled with scheduleWithFixedDelay is never run again once one run throws,
+        // which would silently stop every later update. Report the failure through the
+        // thread's uncaught-exception handler and stay scheduled; the events drained for
+        // this run are lost.
+        Thread current = Thread.currentThread();
+        current.getUncaughtExceptionHandler().uncaughtException(current, e);
+      }
+    }
   }
 }
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/HostComponentUpdateEventPublisher.java b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/HostComponentUpdateEventPublisher.java
index f7fea1d..d9f51e8 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/HostComponentUpdateEventPublisher.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/HostComponentUpdateEventPublisher.java
@@ -31,30 +31,12 @@
 public class HostComponentUpdateEventPublisher extends BufferedUpdateEventPublisher<HostComponentsUpdateEvent> {
 
   @Override
-  protected Runnable getScheduledPublisher(EventBus m_eventBus) {
-    return new HostComponentsEventRunnable(m_eventBus);
-  }
+  public void mergeBufferAndPost(List<HostComponentsUpdateEvent> events, EventBus m_eventBus) {
+    List<HostComponentUpdate> hostComponentUpdates = events.stream().flatMap(
+        u -> u.getHostComponentUpdates().stream()).collect(Collectors.toList());
 
-  private class HostComponentsEventRunnable implements Runnable {
-
-    private final EventBus eventBus;
-
-    public HostComponentsEventRunnable(EventBus eventBus) {
-      this.eventBus = eventBus;
-    }
-
-    @Override
-    public void run() {
-      List<HostComponentsUpdateEvent> hostComponentUpdateEvents = retrieveBuffer();
-      if (hostComponentUpdateEvents.isEmpty()) {
-        return;
-      }
-      List<HostComponentUpdate> hostComponentUpdates = hostComponentUpdateEvents.stream().flatMap(
-          u -> u.getHostComponentUpdates().stream()).collect(Collectors.toList());
-
-      HostComponentsUpdateEvent resultEvents = new HostComponentsUpdateEvent(hostComponentUpdates);
-      //TODO add logging and metrics posting
-      eventBus.post(resultEvents);
-    }
+    HostComponentsUpdateEvent resultEvents = new HostComponentsUpdateEvent(hostComponentUpdates);
+    //TODO add logging and metrics posting
+    m_eventBus.post(resultEvents);
   }
 }
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/RequestUpdateEventPublisher.java b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/RequestUpdateEventPublisher.java
index e080bd9..42f22ba 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/RequestUpdateEventPublisher.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/RequestUpdateEventPublisher.java
@@ -18,9 +18,9 @@
 
 package org.apache.ambari.server.events.publishers;
 
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 import org.apache.ambari.server.controller.internal.CalculatedStatus;
 import org.apache.ambari.server.events.RequestUpdateEvent;
@@ -35,11 +35,7 @@
 import com.google.inject.Singleton;
 
 @Singleton
-public class RequestUpdateEventPublisher {
-
-  private final Long TIMEOUT = 1000L;
-  private ConcurrentHashMap<Long, Long> previousTime = new ConcurrentHashMap<>();
-  private ConcurrentHashMap<Long, RequestUpdateEvent> buffer = new ConcurrentHashMap<>();
+public class RequestUpdateEventPublisher extends BufferedUpdateEventPublisher<RequestUpdateEvent> {
 
   @Inject
   private HostRoleCommandDAO hostRoleCommandDAO;
@@ -53,27 +49,25 @@
   @Inject
   private ClusterDAO clusterDAO;
 
-  public void publish(RequestUpdateEvent event, EventBus m_eventBus) {
-    Long eventTime = System.currentTimeMillis();
-    Long requestId = event.getRequestId();
-    if (!previousTime.containsKey(requestId)) {
-      previousTime.put(requestId, 0L);
+  @Override
+  public void mergeBufferAndPost(List<RequestUpdateEvent> events, EventBus m_eventBus) {
+    Map<Long, RequestUpdateEvent> filteredRequests = new HashMap<>();
+    for (RequestUpdateEvent event : events) {
+      Long requestId = event.getRequestId();
+      if (filteredRequests.containsKey(requestId)) {
+        RequestUpdateEvent filteredRequest = filteredRequests.get(requestId);
+        filteredRequest.setEndTime(event.getEndTime());
+        filteredRequest.setRequestStatus(event.getRequestStatus());
+        filteredRequest.setRequestContext(event.getRequestContext());
+        filteredRequest.getHostRoleCommands().removeAll(event.getHostRoleCommands());
+        filteredRequest.getHostRoleCommands().addAll(event.getHostRoleCommands());
+      } else {
+        filteredRequests.put(requestId, event);
+      }
     }
-    if (eventTime - previousTime.get(requestId) <= TIMEOUT && !buffer.containsKey(requestId)) {
-      buffer.put(event.getRequestId(), event);
-      Executors.newScheduledThreadPool(1).schedule(new RequestEventRunnable(requestId, m_eventBus),
-          TIMEOUT, TimeUnit.MILLISECONDS);
-    } else if (buffer.containsKey(requestId)) {
-      //merge available buffer content with arrived
-      buffer.get(requestId).setEndTime(event.getEndTime());
-      buffer.get(requestId).setRequestStatus(event.getRequestStatus());
-      buffer.get(requestId).setRequestContext(event.getRequestContext());
-      buffer.get(requestId).getHostRoleCommands().removeAll(event.getHostRoleCommands());
-      buffer.get(requestId).getHostRoleCommands().addAll(event.getHostRoleCommands());
-    } else {
-      previousTime.put(requestId, eventTime);
-      //TODO add logging and metrics posting
-      m_eventBus.post(fillRequest(event));
+    for (RequestUpdateEvent requestUpdateEvent : filteredRequests.values()) {
+      RequestUpdateEvent filled = fillRequest(requestUpdateEvent);
+      m_eventBus.post(filled);
     }
   }
 
@@ -94,24 +88,4 @@
     }
     return event;
   }
-
-  private class RequestEventRunnable implements Runnable {
-
-    private final long requestId;
-    private final EventBus eventBus;
-
-    public RequestEventRunnable(long requestId, EventBus eventBus) {
-      this.requestId = requestId;
-      this.eventBus = eventBus;
-    }
-
-    @Override
-    public void run() {
-      RequestUpdateEvent resultEvent = buffer.get(requestId);
-      //TODO add logging and metrics posting
-      eventBus.post(fillRequest(resultEvent));
-      buffer.remove(requestId);
-      previousTime.remove(requestId);
-    }
-  }
 }
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/ServiceUpdateEventPublisher.java b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/ServiceUpdateEventPublisher.java
index 8f45859..6dfef43 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/ServiceUpdateEventPublisher.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/ServiceUpdateEventPublisher.java
@@ -19,52 +19,74 @@
 package org.apache.ambari.server.events.publishers;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
+import org.apache.ambari.server.controller.utilities.ServiceCalculatedStateFactory;
+import org.apache.ambari.server.controller.utilities.state.ServiceCalculatedState;
 import org.apache.ambari.server.events.ServiceUpdateEvent;
+import org.apache.ambari.server.state.State;
 
 import com.google.common.eventbus.EventBus;
 import com.google.inject.Singleton;
 
 @Singleton
 public class ServiceUpdateEventPublisher extends BufferedUpdateEventPublisher<ServiceUpdateEvent> {
+  // Last state sent for each service: cluster name -> service name -> state.
+  private final Map<String, Map<String, State>> states = new HashMap<>();
 
+  /**
+   * Collapses buffered events that are equal to each other into one event, recalculates the
+   * state of services flagged as changed and posts the resulting events.
+   * <p>
+   * A state-change event is dropped when the recalculated state equals the last state sent
+   * for that service and the event carries no maintenance state.
+   *
+   * @param events   the buffered events to merge
+   * @param eventBus the bus to post to
+   */
   @Override
-  protected Runnable getScheduledPublisher(EventBus m_eventBus) {
-    return new ServiceEventRunnable(m_eventBus);
-  }
-
-  private class ServiceEventRunnable implements Runnable {
-
-    private final EventBus eventBus;
-
-    public ServiceEventRunnable(EventBus eventBus) {
-      this.eventBus = eventBus;
-    }
-
-    @Override
-    public void run() {
-      List<ServiceUpdateEvent> serviceUpdates = retrieveBuffer();
-      if (serviceUpdates.isEmpty()) {
-        return;
-      }
-      List<ServiceUpdateEvent> filtered = new ArrayList<>();
-      for (ServiceUpdateEvent event : serviceUpdates) {
-        int pos = filtered.indexOf(event);
-        if (pos != -1) {
-          if (event.getState() != null) {
-            filtered.get(pos).setState(event.getState());
-          }
-          if (event.getMaintenanceState() != null) {
-            filtered.get(pos).setMaintenanceState(event.getMaintenanceState());
-          }
-        } else {
-          filtered.add(event);
-        }
-      }
-      for (ServiceUpdateEvent serviceUpdateEvent : serviceUpdates) {
-        eventBus.post(serviceUpdateEvent);
-      }
-    }
+  public void mergeBufferAndPost(List<ServiceUpdateEvent> events, EventBus eventBus) {
+    List<ServiceUpdateEvent> filtered = new ArrayList<>();
+    for (ServiceUpdateEvent event : events) {
+      int pos = filtered.indexOf(event);
+      if (pos == -1) {
+        filtered.add(event);
+        continue;
+      }
+      // Fold the duplicate into the first equal event of this batch.
+      ServiceUpdateEvent merged = filtered.get(pos);
+      if (event.isStateChanged()) {
+        merged.setStateChanged(true);
+      }
+      if (event.getMaintenanceState() != null) {
+        merged.setMaintenanceState(event.getMaintenanceState());
+      }
+    }
+    for (ServiceUpdateEvent serviceUpdateEvent : filtered) {
+      if (serviceUpdateEvent.isStateChanged()) {
+        String clusterName = serviceUpdateEvent.getClusterName();
+        String serviceName = serviceUpdateEvent.getServiceName();
+        ServiceCalculatedState serviceCalculatedState =
+            ServiceCalculatedStateFactory.getServiceStateProvider(serviceName);
+        State serviceState = serviceCalculatedState.getState(clusterName, serviceName);
+
+        // One lookup per level; a missing or null cached state never matches, so it cannot
+        // cause a NullPointerException here.
+        Map<String, State> clusterStates =
+            states.computeIfAbsent(clusterName, c -> new HashMap<>());
+        State cachedState = clusterStates.get(serviceName);
+        // don't send update when state was not changed and update doesn't have maintenance info
+        if (cachedState != null && cachedState.equals(serviceState)
+            && serviceUpdateEvent.getMaintenanceState() == null) {
+          continue;
+        }
+        clusterStates.put(serviceName, serviceState);
+        serviceUpdateEvent.setState(serviceState);
+      }
+
+      eventBus.post(serviceUpdateEvent);
+    }
   }
 }