AMBARI-24762. Ambari server continues to send request updates after all commands were completed. (#2448)
* AMBARI-24762. Ambari server continues to send request updates after all commands were completed. (mpapirkovskyy)
* AMBARI-24762. Ambari server continues to send request updates after all commands were completed. (mpapirkovskyy)
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
index 2dfedb2..735a774 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
@@ -895,6 +895,9 @@
}
updateRoleStats(status, roleStats.get(roleStr));
+ if (status == HostRoleStatus.FAILED) {
+ LOG.info("Role {} on host {} was failed", roleStr, host);
+ }
}
}
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/ServiceUpdateEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/ServiceUpdateEvent.java
index 34bc106..2492929 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/ServiceUpdateEvent.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/ServiceUpdateEvent.java
@@ -21,6 +21,7 @@
import org.apache.ambari.server.state.MaintenanceState;
import org.apache.ambari.server.state.State;
+import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
@@ -42,12 +43,17 @@
@JsonProperty("state")
private State state;
- public ServiceUpdateEvent(String clusterName, MaintenanceState maintenanceState, String serviceName, State state) {
+ @JsonIgnore
+ private boolean stateChanged = false;
+
+ public ServiceUpdateEvent(String clusterName, MaintenanceState maintenanceState, String serviceName, State state,
+ boolean stateChanged) {
super(Type.SERVICE);
this.clusterName = clusterName;
this.maintenanceState = maintenanceState;
this.serviceName = serviceName;
this.state = state;
+ this.stateChanged = stateChanged;
}
public String getClusterName() {
@@ -82,6 +88,14 @@
this.state = state;
}
+ public boolean isStateChanged() {
+ return stateChanged;
+ }
+
+ public void setStateChanged(boolean stateChanged) {
+ this.stateChanged = stateChanged;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/services/ServiceUpdateListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/services/ServiceUpdateListener.java
index c35cc08..4cbac1f 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/services/ServiceUpdateListener.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/services/ServiceUpdateListener.java
@@ -24,18 +24,14 @@
import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.EagerSingleton;
-import org.apache.ambari.server.controller.utilities.ServiceCalculatedStateFactory;
-import org.apache.ambari.server.controller.utilities.state.ServiceCalculatedState;
import org.apache.ambari.server.events.HostComponentUpdate;
import org.apache.ambari.server.events.HostComponentsUpdateEvent;
import org.apache.ambari.server.events.MaintenanceModeEvent;
import org.apache.ambari.server.events.ServiceUpdateEvent;
import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
import org.apache.ambari.server.events.publishers.STOMPUpdatePublisher;
-import org.apache.ambari.server.orm.dao.ServiceDesiredStateDAO;
import org.apache.ambari.server.state.Clusters;
import org.apache.ambari.server.state.MaintenanceState;
-import org.apache.ambari.server.state.State;
import com.google.common.eventbus.Subscribe;
import com.google.inject.Inject;
@@ -45,14 +41,10 @@
@Singleton
@EagerSingleton
public class ServiceUpdateListener {
- private Map<Long, Map<String, State>> states = new HashMap<>();
private STOMPUpdatePublisher STOMPUpdatePublisher;
@Inject
- private ServiceDesiredStateDAO serviceDesiredStateDAO;
-
- @Inject
private Provider<Clusters> m_clusters;
@Inject
@@ -75,15 +67,8 @@
Long clusterId = clusterServices.getKey();
String clusterName = m_clusters.get().getClusterById(clusterId).getClusterName();
for (String serviceName : clusterServices.getValue()) {
- ServiceCalculatedState serviceCalculatedState = ServiceCalculatedStateFactory.getServiceStateProvider(serviceName);
- State serviceState = serviceCalculatedState.getState(clusterName, serviceName);
-
- // retrieve state from cache
- if (states.containsKey(clusterId) && states.get(clusterId).containsKey(serviceName) && states.get(clusterId).get(serviceName).equals(serviceState)) {
- continue;
- }
- states.computeIfAbsent(clusterId, c -> new HashMap<>()).put(serviceName, serviceState);
- STOMPUpdatePublisher.publish(new ServiceUpdateEvent(clusterName, null, serviceName, serviceState));
+ STOMPUpdatePublisher.publish(new ServiceUpdateEvent(clusterName, null, serviceName, null,
+ true));
}
}
}
@@ -99,6 +84,7 @@
MaintenanceState maintenanceState = event.getMaintenanceState();
- STOMPUpdatePublisher.publish(new ServiceUpdateEvent(clusterName, maintenanceState, serviceName, null));
+ STOMPUpdatePublisher.publish(new ServiceUpdateEvent(clusterName, maintenanceState, serviceName, null,
+ false));
}
}
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/BufferedUpdateEventPublisher.java b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/BufferedUpdateEventPublisher.java
index e02785f..25d396c 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/BufferedUpdateEventPublisher.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/BufferedUpdateEventPublisher.java
@@ -24,8 +24,6 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
import com.google.common.eventbus.EventBus;
import com.google.inject.Singleton;
@@ -34,31 +32,25 @@
public abstract class BufferedUpdateEventPublisher<T> {
private static final long TIMEOUT = 1000L;
- private final AtomicLong previousTime = new AtomicLong(0);
- private final AtomicBoolean collecting = new AtomicBoolean(false);
private final ConcurrentLinkedQueue<T> buffer = new ConcurrentLinkedQueue<>();
- private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
+
+ private ScheduledExecutorService scheduledExecutorService;
public void publish(T event, EventBus m_eventBus) {
- long eventTime = System.currentTimeMillis();
- if ((eventTime - previousTime.get() <= TIMEOUT) && !collecting.get()) {
- buffer.add(event);
- collecting.set(true);
- scheduledExecutorService.schedule(getScheduledPublisher(m_eventBus),
- TIMEOUT, TimeUnit.MILLISECONDS);
- } else if (collecting.get()) {
- buffer.add(event);
- } else {
- //TODO add logging and metrics posting
- previousTime.set(eventTime);
- m_eventBus.post(event);
+ if (scheduledExecutorService == null) {
+ scheduledExecutorService =
+ Executors.newScheduledThreadPool(1);
+ scheduledExecutorService
+ .scheduleWithFixedDelay(getScheduledPublisher(m_eventBus), TIMEOUT, TIMEOUT, TimeUnit.MILLISECONDS);
}
+ buffer.add(event);
}
- protected abstract Runnable getScheduledPublisher(EventBus m_eventBus);
+ protected MergingRunnable getScheduledPublisher(EventBus m_eventBus) {
+ return new MergingRunnable(m_eventBus);
+ }
protected List<T> retrieveBuffer() {
- resetCollecting();
List<T> bufferContent = new ArrayList<>();
while (!buffer.isEmpty()) {
bufferContent.add(buffer.poll());
@@ -66,8 +58,23 @@
return bufferContent;
}
- protected void resetCollecting() {
- previousTime.set(System.currentTimeMillis());
- collecting.set(false);
+ public abstract void mergeBufferAndPost(List<T> events, EventBus m_eventBus);
+
+ private class MergingRunnable implements Runnable {
+
+ private final EventBus m_eventBus;
+
+ public MergingRunnable(EventBus m_eventBus) {
+ this.m_eventBus = m_eventBus;
+ }
+
+ @Override
+ public final void run() {
+ List<T> events = retrieveBuffer();
+ if (events.isEmpty()) {
+ return;
+ }
+ mergeBufferAndPost(events, m_eventBus);
+ }
}
}
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/HostComponentUpdateEventPublisher.java b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/HostComponentUpdateEventPublisher.java
index f7fea1d..d9f51e8 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/HostComponentUpdateEventPublisher.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/HostComponentUpdateEventPublisher.java
@@ -31,30 +31,12 @@
public class HostComponentUpdateEventPublisher extends BufferedUpdateEventPublisher<HostComponentsUpdateEvent> {
@Override
- protected Runnable getScheduledPublisher(EventBus m_eventBus) {
- return new HostComponentsEventRunnable(m_eventBus);
- }
+ public void mergeBufferAndPost(List<HostComponentsUpdateEvent> events, EventBus m_eventBus) {
+ List<HostComponentUpdate> hostComponentUpdates = events.stream().flatMap(
+ u -> u.getHostComponentUpdates().stream()).collect(Collectors.toList());
- private class HostComponentsEventRunnable implements Runnable {
-
- private final EventBus eventBus;
-
- public HostComponentsEventRunnable(EventBus eventBus) {
- this.eventBus = eventBus;
- }
-
- @Override
- public void run() {
- List<HostComponentsUpdateEvent> hostComponentUpdateEvents = retrieveBuffer();
- if (hostComponentUpdateEvents.isEmpty()) {
- return;
- }
- List<HostComponentUpdate> hostComponentUpdates = hostComponentUpdateEvents.stream().flatMap(
- u -> u.getHostComponentUpdates().stream()).collect(Collectors.toList());
-
- HostComponentsUpdateEvent resultEvents = new HostComponentsUpdateEvent(hostComponentUpdates);
- //TODO add logging and metrics posting
- eventBus.post(resultEvents);
- }
+ HostComponentsUpdateEvent resultEvents = new HostComponentsUpdateEvent(hostComponentUpdates);
+ //TODO add logging and metrics posting
+ m_eventBus.post(resultEvents);
}
}
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/RequestUpdateEventPublisher.java b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/RequestUpdateEventPublisher.java
index e080bd9..42f22ba 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/RequestUpdateEventPublisher.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/RequestUpdateEventPublisher.java
@@ -18,9 +18,9 @@
package org.apache.ambari.server.events.publishers;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import org.apache.ambari.server.controller.internal.CalculatedStatus;
import org.apache.ambari.server.events.RequestUpdateEvent;
@@ -35,11 +35,7 @@
import com.google.inject.Singleton;
@Singleton
-public class RequestUpdateEventPublisher {
-
- private final Long TIMEOUT = 1000L;
- private ConcurrentHashMap<Long, Long> previousTime = new ConcurrentHashMap<>();
- private ConcurrentHashMap<Long, RequestUpdateEvent> buffer = new ConcurrentHashMap<>();
+public class RequestUpdateEventPublisher extends BufferedUpdateEventPublisher<RequestUpdateEvent> {
@Inject
private HostRoleCommandDAO hostRoleCommandDAO;
@@ -53,27 +49,25 @@
@Inject
private ClusterDAO clusterDAO;
- public void publish(RequestUpdateEvent event, EventBus m_eventBus) {
- Long eventTime = System.currentTimeMillis();
- Long requestId = event.getRequestId();
- if (!previousTime.containsKey(requestId)) {
- previousTime.put(requestId, 0L);
+ @Override
+ public void mergeBufferAndPost(List<RequestUpdateEvent> events, EventBus m_eventBus) {
+ Map<Long, RequestUpdateEvent> filteredRequests = new HashMap<>();
+ for (RequestUpdateEvent event : events) {
+ Long requestId = event.getRequestId();
+ if (filteredRequests.containsKey(requestId)) {
+ RequestUpdateEvent filteredRequest = filteredRequests.get(requestId);
+ filteredRequest.setEndTime(event.getEndTime());
+ filteredRequest.setRequestStatus(event.getRequestStatus());
+ filteredRequest.setRequestContext(event.getRequestContext());
+ filteredRequest.getHostRoleCommands().removeAll(event.getHostRoleCommands());
+ filteredRequest.getHostRoleCommands().addAll(event.getHostRoleCommands());
+ } else {
+ filteredRequests.put(requestId, event);
+ }
}
- if (eventTime - previousTime.get(requestId) <= TIMEOUT && !buffer.containsKey(requestId)) {
- buffer.put(event.getRequestId(), event);
- Executors.newScheduledThreadPool(1).schedule(new RequestEventRunnable(requestId, m_eventBus),
- TIMEOUT, TimeUnit.MILLISECONDS);
- } else if (buffer.containsKey(requestId)) {
- //merge available buffer content with arrived
- buffer.get(requestId).setEndTime(event.getEndTime());
- buffer.get(requestId).setRequestStatus(event.getRequestStatus());
- buffer.get(requestId).setRequestContext(event.getRequestContext());
- buffer.get(requestId).getHostRoleCommands().removeAll(event.getHostRoleCommands());
- buffer.get(requestId).getHostRoleCommands().addAll(event.getHostRoleCommands());
- } else {
- previousTime.put(requestId, eventTime);
- //TODO add logging and metrics posting
- m_eventBus.post(fillRequest(event));
+ for (RequestUpdateEvent requestUpdateEvent : filteredRequests.values()) {
+ RequestUpdateEvent filled = fillRequest(requestUpdateEvent);
+ m_eventBus.post(filled);
}
}
@@ -94,24 +88,4 @@
}
return event;
}
-
- private class RequestEventRunnable implements Runnable {
-
- private final long requestId;
- private final EventBus eventBus;
-
- public RequestEventRunnable(long requestId, EventBus eventBus) {
- this.requestId = requestId;
- this.eventBus = eventBus;
- }
-
- @Override
- public void run() {
- RequestUpdateEvent resultEvent = buffer.get(requestId);
- //TODO add logging and metrics posting
- eventBus.post(fillRequest(resultEvent));
- buffer.remove(requestId);
- previousTime.remove(requestId);
- }
- }
}
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/ServiceUpdateEventPublisher.java b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/ServiceUpdateEventPublisher.java
index 8f45859..6dfef43 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/ServiceUpdateEventPublisher.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/ServiceUpdateEventPublisher.java
@@ -19,52 +19,62 @@
package org.apache.ambari.server.events.publishers;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import org.apache.ambari.server.controller.utilities.ServiceCalculatedStateFactory;
+import org.apache.ambari.server.controller.utilities.state.ServiceCalculatedState;
import org.apache.ambari.server.events.ServiceUpdateEvent;
+import org.apache.ambari.server.state.State;
import com.google.common.eventbus.EventBus;
import com.google.inject.Singleton;
@Singleton
public class ServiceUpdateEventPublisher extends BufferedUpdateEventPublisher<ServiceUpdateEvent> {
+ private Map<String, Map<String, State>> states = new HashMap<>();
+
@Override
- protected Runnable getScheduledPublisher(EventBus m_eventBus) {
- return new ServiceEventRunnable(m_eventBus);
- }
-
- private class ServiceEventRunnable implements Runnable {
-
- private final EventBus eventBus;
-
- public ServiceEventRunnable(EventBus eventBus) {
- this.eventBus = eventBus;
- }
-
- @Override
- public void run() {
- List<ServiceUpdateEvent> serviceUpdates = retrieveBuffer();
- if (serviceUpdates.isEmpty()) {
- return;
- }
- List<ServiceUpdateEvent> filtered = new ArrayList<>();
- for (ServiceUpdateEvent event : serviceUpdates) {
- int pos = filtered.indexOf(event);
- if (pos != -1) {
- if (event.getState() != null) {
- filtered.get(pos).setState(event.getState());
- }
- if (event.getMaintenanceState() != null) {
- filtered.get(pos).setMaintenanceState(event.getMaintenanceState());
- }
- } else {
- filtered.add(event);
+ public void mergeBufferAndPost(List<ServiceUpdateEvent> events, EventBus eventBus) {
+ List<ServiceUpdateEvent> filtered = new ArrayList<>();
+ for (ServiceUpdateEvent event : events) {
+ int pos = filtered.indexOf(event);
+ if (pos != -1) {
+ if (event.isStateChanged()) {
+ filtered.get(pos).setStateChanged(true);
}
+ if (event.getMaintenanceState() != null) {
+ filtered.get(pos).setMaintenanceState(event.getMaintenanceState());
+ }
+ } else {
+ filtered.add(event);
}
- for (ServiceUpdateEvent serviceUpdateEvent : serviceUpdates) {
- eventBus.post(serviceUpdateEvent);
+ }
+ for (ServiceUpdateEvent serviceUpdateEvent : filtered) {
+ // calc state
+ if (serviceUpdateEvent.isStateChanged()) {
+ ServiceCalculatedState serviceCalculatedState =
+ ServiceCalculatedStateFactory.getServiceStateProvider(serviceUpdateEvent.getServiceName());
+ State serviceState =
+ serviceCalculatedState.getState(serviceUpdateEvent.getClusterName(), serviceUpdateEvent.getServiceName());
+
+ String serviceName = serviceUpdateEvent.getServiceName();
+ String clusterName = serviceUpdateEvent.getClusterName();
+
+ // retrieve state from cache
+ // don't send update when state was not changed and update doesn't have maintenance info
+ if (states.containsKey(clusterName) && states.get(clusterName).containsKey(serviceName)
+ && states.get(clusterName).get(serviceName).equals(serviceState)
+ && serviceUpdateEvent.getMaintenanceState() == null) {
+ continue;
+ }
+ states.computeIfAbsent(clusterName, c -> new HashMap<>()).put(serviceName, serviceState);
+ serviceUpdateEvent.setState(serviceState);
}
+
+ eventBus.post(serviceUpdateEvent);
}
}
}