Merge branch 'master' into jira/solr-14749
diff --git a/solr/core/src/java/org/apache/solr/api/AnnotatedApi.java b/solr/core/src/java/org/apache/solr/api/AnnotatedApi.java
index fd77413..1558f1c 100644
--- a/solr/core/src/java/org/apache/solr/api/AnnotatedApi.java
+++ b/solr/core/src/java/org/apache/solr/api/AnnotatedApi.java
@@ -85,9 +85,18 @@
   }
 
   public static List<Api> getApis(Object obj) {
-    return getApis(obj.getClass(), obj);
+    return getApis(obj.getClass(), obj, true);
   }
-  public static List<Api> getApis(Class<? extends Object> theClass , Object obj)  {
+
+  /**
+   * Get a list of Api-s supported by this class.
+   * @param theClass class
+   * @param obj object of this class (may be null)
+   * @param required if true then an exception is thrown if no Api-s can be retrieved, if false
+   *                then absence of Api-s is silently ignored.
+   * @return list of discovered Api-s
+   */
+  public static List<Api> getApis(Class<? extends Object> theClass , Object obj, boolean required)  {
     Class<?> klas = null;
     try {
       klas = MethodHandles.publicLookup().accessClass(theClass);
@@ -122,7 +131,7 @@
         SpecProvider specProvider = readSpec(endPoint, Collections.singletonList(m));
         apis.add(new AnnotatedApi(specProvider, endPoint, Collections.singletonMap("", cmd), null));
       }
-      if (apis.isEmpty()) {
+      if (required && apis.isEmpty()) {
         throw new RuntimeException("Invalid Class : " + klas.getName() + " No @EndPoints");
       }
 
diff --git a/solr/core/src/java/org/apache/solr/api/CustomContainerPlugins.java b/solr/core/src/java/org/apache/solr/api/CustomContainerPlugins.java
index 93de2e3..ec3e4e3 100644
--- a/solr/core/src/java/org/apache/solr/api/CustomContainerPlugins.java
+++ b/solr/core/src/java/org/apache/solr/api/CustomContainerPlugins.java
@@ -33,6 +33,9 @@
 import org.apache.lucene.util.ResourceLoaderAware;
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.request.beans.PluginMeta;
+import org.apache.solr.cloud.ClusterSingleton;
+import org.apache.solr.cluster.events.ClusterEvent;
+import org.apache.solr.cluster.events.ClusterEventListener;
 import org.apache.solr.common.MapWriter;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.annotation.JsonProperty;
@@ -68,6 +71,7 @@
     refresh();
     return false;
   }
+
   public CustomContainerPlugins(CoreContainer coreContainer, ApiBag apiBag) {
     this.coreContainer = coreContainer;
     this.containerApiBag = apiBag;
@@ -78,6 +82,10 @@
     currentPlugins.forEach(ew.getBiConsumer());
   }
 
+  public synchronized ApiInfo getPlugin(String name) {
+    return currentPlugins.get(name); // plain lookup; synchronized like refresh(), which mutates currentPlugins
+  }
+
   public synchronized void refresh() {
     Map<String, Object> pluginInfos = null;
     try {
@@ -107,6 +115,7 @@
       if (e.getValue() == Diff.REMOVED) {
         ApiInfo apiInfo = currentPlugins.remove(e.getKey());
         if (apiInfo == null) continue;
+        handleClusterSingleton(null, apiInfo);
         for (ApiHolder holder : apiInfo.holders) {
           Api old = containerApiBag.unregister(holder.api.getEndPoint().method()[0],
               getActualPath(apiInfo, holder.api.getEndPoint().path()[0]));
@@ -136,6 +145,7 @@
             containerApiBag.register(holder, getTemplateVars(apiInfo.info));
           }
           currentPlugins.put(e.getKey(), apiInfo);
+          handleClusterSingleton(apiInfo, null);
         } else {
           //this plugin is being updated
           ApiInfo old = currentPlugins.put(e.getKey(), apiInfo);
@@ -143,6 +153,7 @@
             //register all new paths
             containerApiBag.register(holder, getTemplateVars(apiInfo.info));
           }
+          handleClusterSingleton(apiInfo, old);
           if (old != null) {
             //this is an update of the plugin. But, it is possible that
             // some paths are remved in the newer version of the plugin
@@ -163,6 +174,47 @@
     }
   }
 
+  private void handleClusterSingleton(ApiInfo newApiInfo, ApiInfo oldApiInfo) {
+    if (newApiInfo != null) {
+      // plugin added or updated: register its instance as a singleton and/or an event listener
+      Object instance = newApiInfo.getInstance();
+      if (instance instanceof ClusterSingleton) {
+        ClusterSingleton singleton = (ClusterSingleton) instance;
+        coreContainer.getClusterSingletons().getSingletons().put(singleton.getName(), singleton);
+        // easy check to see if we should immediately start this singleton
+        if (coreContainer.getClusterEventProducer() != null &&
+            coreContainer.getClusterEventProducer().isRunning()) {
+          try {
+            singleton.start();
+          } catch (Exception exc) {
+            log.warn("Exception starting ClusterSingleton {}", newApiInfo, exc);
+          }
+        }
+      }
+      if (instance instanceof ClusterEventListener) {
+        // XXX nocommit obtain a list of supported event types from the config
+        ClusterEvent.EventType[] types = ClusterEvent.EventType.values();
+        try {
+          coreContainer.getClusterEventProducer().registerListener((ClusterEventListener) instance, types);
+        } catch (Exception exc) {
+          log.warn("Exception adding ClusterEventListener {}", newApiInfo, exc);
+        }
+      }
+    }
+    if (oldApiInfo != null) {
+      // plugin removed or replaced: stop the old instance, drop only ITS mapping (new one may share the name)
+      Object instance = oldApiInfo.getInstance();
+      if (instance instanceof ClusterSingleton) {
+        ClusterSingleton singleton = (ClusterSingleton) instance;
+        singleton.stop();
+        coreContainer.getClusterSingletons().getSingletons().remove(singleton.getName(), singleton);
+      }
+      if (instance instanceof ClusterEventListener && coreContainer.getClusterEventProducer() != null) {
+        coreContainer.getClusterEventProducer().unregisterListener((ClusterEventListener) instance);
+      }
+    }
+  }
+
   private static String getActualPath(ApiInfo apiInfo, String path) {
     path = path.replaceAll("\\$path-prefix", apiInfo.info.pathPrefix);
     path = path.replaceAll("\\$plugin-name", apiInfo.info.name);
@@ -222,6 +274,9 @@
       return null;
     }
 
+    public Object getInstance() {
+      return instance; // the plugin object; callers test it for ClusterSingleton / ClusterEventListener
+    }
 
     @SuppressWarnings({"unchecked","rawtypes"})
     public ApiInfo(PluginMeta info, List<String> errs) {
@@ -268,14 +323,14 @@
       }
 
       try {
-        List<Api> apis = AnnotatedApi.getApis(klas, null);
+        List<Api> apis = AnnotatedApi.getApis(klas, null, false);
         for (Object api : apis) {
           EndPoint endPoint = ((AnnotatedApi) api).getEndPoint();
           if (endPoint.path().length > 1 || endPoint.method().length > 1) {
             errs.add("Only one HTTP method and url supported for each API");
           }
           if (endPoint.method().length != 1 || endPoint.path().length != 1) {
-            errs.add("The @EndPint must have exactly one method and path attributes");
+            errs.add("The @EndPoint must have exactly one method and path attributes");
           }
           List<String> pathSegments = StrUtils.splitSmart(endPoint.path()[0], '/', true);
           PathTrie.replaceTemplates(pathSegments, getTemplateVars(info));
@@ -320,7 +375,7 @@
         }
       }
       this.holders = new ArrayList<>();
-      for (Api api : AnnotatedApi.getApis(instance)) {
+      for (Api api : AnnotatedApi.getApis(instance.getClass(), instance, false)) {
         holders.add(new ApiHolder((AnnotatedApi) api));
       }
     }
diff --git a/solr/core/src/java/org/apache/solr/cloud/ClusterSingleton.java b/solr/core/src/java/org/apache/solr/cloud/ClusterSingleton.java
new file mode 100644
index 0000000..1ae1eed
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/ClusterSingleton.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud;
+
+/**
+ * Intended for {@link org.apache.solr.core.CoreContainer} plugins of which only
+ * one instance should be active in the whole cluster at any time.
+ * <p>Components that implement this interface are always in one of two states:
+ * <ul>
+ *   <li>STOPPED - the default state. The component is idle and does not perform
+ *   any functions. It should also avoid holding any resources.</li>
+ *   <li>RUNNING - the component is active.</li>
+ * </ul>
+ * <p>Components must be prepared to switch between these states multiple times
+ * during their life-cycle.</p>
+ * <p>Implementation detail: currently these plugins are instantiated on all nodes
+ * but they are started only on the Overseer leader, and stopped when the current
+ * node loses its leadership.</p>
+ */
+public interface ClusterSingleton {
+
+  /**
+   * Returns the unique name of this singleton. It is used as the registration key.
+   */
+  String getName();
+
+  /**
+   * Start the operation of the component. On return the component is assumed
+   * to be in the RUNNING state.
+   * @throws Exception on startup errors; in that case the component should
+   * revert to the STOPPED state.
+   */
+  void start() throws Exception;
+
+  /**
+   * Returns true if the component is in the RUNNING state, false if it is STOPPED.
+   */
+  boolean isRunning();
+
+  /**
+   * Stop the operation of the component. On return the component is assumed
+   * to be in the STOPPED state, in which it should not hold on to any resources
+   * (see the description of the STOPPED state in the interface documentation).
+   */
+  void stop();
+}
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index bb405ad..2465f8a 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -29,6 +29,8 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.function.BiConsumer;
 
 import org.apache.lucene.util.Version;
@@ -47,6 +49,7 @@
 import org.apache.solr.cloud.overseer.SliceMutator;
 import org.apache.solr.cloud.overseer.ZkStateWriter;
 import org.apache.solr.cloud.overseer.ZkWriteCommand;
+import org.apache.solr.cluster.events.ClusterEventListener;
 import org.apache.solr.common.AlreadyClosedException;
 import org.apache.solr.common.SolrCloseable;
 import org.apache.solr.common.SolrException;
@@ -656,6 +659,8 @@
       }
     });
 
+    startClusterSingletons();
+
     assert ObjectReleaseTracker.track(this);
   }
 
@@ -775,6 +780,59 @@
     }
   }
 
+  /**
+   * Start {@link ClusterSingleton} plugins when we become the leader (waits up to 60 s for readiness).
+   */
+  public void startClusterSingletons() {
+    CoreContainer.ClusterSingletons singletons = getCoreContainer().getClusterSingletons();
+    final Runnable initializer = () -> {
+      if (isClosed()) {
+        return;
+      }
+      try {
+        singletons.waitUntilReady(60, TimeUnit.SECONDS);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        log.warn("Interrupted initialization of ClusterSingleton-s");
+        return;
+      } catch (TimeoutException te) {
+        log.warn("Timed out during initialization of ClusterSingleton-s");
+        return;
+      }
+      singletons.getSingletons().forEach((name, singleton) -> {
+        try {
+          singleton.start();
+          if (singleton instanceof ClusterEventListener) {
+            getCoreContainer().getClusterEventProducer().registerListener((ClusterEventListener) singleton);
+          }
+        } catch (Exception e) {
+          log.warn("Exception starting ClusterSingleton {}", singleton, e);
+        }
+      });
+    };
+    if (!singletons.isReady()) { // not ready yet (first startup): wait in the background, not on the caller
+      getCoreContainer().runAsync(initializer);
+    } else {
+      initializer.run();
+    }
+  }
+
+  /**
+   * Stop all registered {@link ClusterSingleton} plugins when we lose leadership.
+   */
+  private void stopClusterSingletons() {
+    CoreContainer.ClusterSingletons singletons = getCoreContainer().getClusterSingletons();
+    if (singletons == null) { // no singleton registry available - nothing to stop
+      return;
+    }
+    singletons.getSingletons().forEach((name, singleton) -> {
+      if (singleton instanceof ClusterEventListener) { // detach from the event producer before stopping
+        getCoreContainer().getClusterEventProducer().unregisterListener((ClusterEventListener) singleton);
+      }
+      singleton.stop();
+    });
+  }
+
   public Stats getStats() {
     return stats;
   }
@@ -814,9 +872,14 @@
     if (this.id != null) {
       log.info("Overseer (id={}) closing", id);
     }
+    // stop singletons only on the leader
+    if (!this.closed) {
+      stopClusterSingletons();
+    }
     this.closed = true;
     doClose();
 
+
     assert ObjectReleaseTracker.release(this);
   }
 
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/ClusterEvent.java b/solr/core/src/java/org/apache/solr/cluster/events/ClusterEvent.java
new file mode 100644
index 0000000..2dc7a32
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cluster/events/ClusterEvent.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cluster.events;
+
+import org.apache.solr.common.MapWriter;
+
+import java.io.IOException;
+import java.time.Instant;
+
+/**
+ * Cluster-level event. Its default map form consists of the event type and timestamp.
+ */
+public interface ClusterEvent extends MapWriter {
+
+  enum EventType {
+    /** One or more nodes went down. */
+    NODES_DOWN,
+    /** One or more nodes came up. */
+    NODES_UP,
+    /** One or more collections have been added. */
+    COLLECTIONS_ADDED,
+    /** One or more collections have been removed. */
+    COLLECTIONS_REMOVED,
+    /** One or more replicas went down. */
+    REPLICAS_DOWN,
+    /** Cluster properties have changed. */
+    CLUSTER_PROPERTIES_CHANGED
+    // other types? e.g. Overseer leader change, shard leader change,
+    // node overload (e.g. CPU / MEM circuit breakers tripped)?
+  }
+
+  /** Get the type of this event. */
+  EventType getType();
+
+  /** Get event timestamp. This is the instant when the event was generated (not necessarily when
+   * the underlying condition first occurred). */
+  Instant getTimestamp();
+
+  default void writeMap(EntryWriter ew) throws IOException {
+    ew.put("type", getType());
+    ew.put("timestamp", getTimestamp().toEpochMilli()); // written as epoch milliseconds
+  }
+}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventListener.java b/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventListener.java
new file mode 100644
index 0000000..592f118
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventListener.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cluster.events;
+
+import org.apache.solr.cloud.ClusterSingleton;
+
+/**
+ * Components that want to be notified of cluster-wide events should implement this interface.
+ *
+ * XXX should this work only for ClusterSingleton-s? some types of events may be
+ * XXX difficult (or pointless) to propagate to every node.
+ */
+public interface ClusterEventListener extends ClusterSingleton {
+
+  /**
+   * Handle the event. Implementations should be non-blocking - if any long
+   * processing is needed it should be performed asynchronously.
+   * @param event cluster event to handle
+   */
+  void onEvent(ClusterEvent event);
+
+}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventProducer.java b/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventProducer.java
new file mode 100644
index 0000000..1c2327b
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventProducer.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cluster.events;
+
+import org.apache.solr.cloud.ClusterSingleton;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Component that produces {@link ClusterEvent} instances and passes them to registered listeners.
+ */
+public interface ClusterEventProducer extends ClusterSingleton {
+
+  String PLUGIN_NAME = "clusterEventProducer"; // value returned by the default getName()
+
+  default String getName() {
+    return PLUGIN_NAME;
+  }
+
+  /**
+   * Returns a modifiable map of event types and listeners to process events
+   * of a given type. The default methods of this interface read and modify this map directly.
+   */
+  Map<ClusterEvent.EventType, Set<ClusterEventListener>> getEventListeners();
+
+  /**
+   * Register an event listener for processing the specified event types.
+   * @param listener non-null listener. If the same instance of the listener is
+   *                 already registered it will be ignored.
+   * @param eventTypes event types that this listener is being registered for.
+   *                   If this is null or empty then all types will be used.
+   */
+  default void registerListener(ClusterEventListener listener, ClusterEvent.EventType... eventTypes) throws Exception {
+    Objects.requireNonNull(listener);
+    if (eventTypes == null || eventTypes.length == 0) {
+      eventTypes = ClusterEvent.EventType.values();
+    }
+    for (ClusterEvent.EventType type : eventTypes) {
+      Set<ClusterEventListener> perType = getEventListeners().computeIfAbsent(type, t -> ConcurrentHashMap.newKeySet());
+      perType.add(listener);
+    }
+  }
+
+  /**
+   * Unregister an event listener from every event type it is registered for.
+   * @param listener non-null listener.
+   */
+  default void unregisterListener(ClusterEventListener listener) {
+    Objects.requireNonNull(listener);
+    getEventListeners().forEach((type, listeners) -> {
+      listeners.remove(listener);
+    });
+  }
+
+  /**
+   * Unregister an event listener for specified event types.
+   * @param listener non-null listener.
+   * @param eventTypes event types from which the listener will be unregistered. If this
+   *                   is null or empty then all event types will be used
+   */
+  default void unregisterListener(ClusterEventListener listener, ClusterEvent.EventType... eventTypes) {
+    Objects.requireNonNull(listener);
+    if (eventTypes == null || eventTypes.length == 0) {
+      eventTypes = ClusterEvent.EventType.values();
+    }
+    for (ClusterEvent.EventType type : eventTypes) {
+      getEventListeners()
+          .getOrDefault(type, Collections.emptySet())
+          .remove(listener);
+    }
+  }
+
+  /**
+   * Fire an event. This method will call registered listeners that subscribed to the
+   * type of event being passed. Listeners are invoked on the calling thread.
+   * @param event cluster event
+   */
+  default void fireEvent(ClusterEvent event) {
+    getEventListeners().getOrDefault(event.getType(), Collections.emptySet())
+        .forEach(listener -> listener.onEvent(event));
+  }
+}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/ClusterPropertiesChangedEvent.java b/solr/core/src/java/org/apache/solr/cluster/events/ClusterPropertiesChangedEvent.java
new file mode 100644
index 0000000..ad9c0b8
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cluster/events/ClusterPropertiesChangedEvent.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cluster.events;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Event generated when {@link org.apache.solr.common.cloud.ZkStateReader#CLUSTER_PROPS} is modified.
+ */
+public interface ClusterPropertiesChangedEvent extends ClusterEvent {
+
+  @Override
+  default EventType getType() {
+    return EventType.CLUSTER_PROPERTIES_CHANGED;
+  }
+
+  Map<String, Object> getNewClusterProperties(); // presumably the properties as they are after the change
+
+  @Override
+  default void writeMap(EntryWriter ew) throws IOException {
+    ClusterEvent.super.writeMap(ew); // common fields first: type and timestamp
+    ew.put("newClusterProperties", getNewClusterProperties());
+  }
+}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/CollectionsAddedEvent.java b/solr/core/src/java/org/apache/solr/cluster/events/CollectionsAddedEvent.java
new file mode 100644
index 0000000..78046f8
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cluster/events/CollectionsAddedEvent.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cluster.events;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Event generated when some collections have been added.
+ */
+public interface CollectionsAddedEvent extends ClusterEvent {
+
+  @Override
+  default EventType getType() {
+    return EventType.COLLECTIONS_ADDED;
+  }
+
+  Iterator<String> getCollectionNames(); // names of the collections that have been added
+
+  @Override
+  default void writeMap(EntryWriter ew) throws IOException {
+    ClusterEvent.super.writeMap(ew); // common fields first: type and timestamp
+    ew.put("collectionNames", getCollectionNames());
+  }
+}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/CollectionsRemovedEvent.java b/solr/core/src/java/org/apache/solr/cluster/events/CollectionsRemovedEvent.java
new file mode 100644
index 0000000..a93be4c
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cluster/events/CollectionsRemovedEvent.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cluster.events;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Event generated when some collections have been removed.
+ */
+public interface CollectionsRemovedEvent extends ClusterEvent {
+
+  @Override
+  default EventType getType() {
+    return EventType.COLLECTIONS_REMOVED;
+  }
+
+  Iterator<String> getCollectionNames(); // names of the collections that have been removed
+
+  @Override
+  default void writeMap(EntryWriter ew) throws IOException {
+    ClusterEvent.super.writeMap(ew); // common fields first: type and timestamp
+    ew.put("collectionNames", getCollectionNames());
+  }
+}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/NodesDownEvent.java b/solr/core/src/java/org/apache/solr/cluster/events/NodesDownEvent.java
new file mode 100644
index 0000000..5001ccb
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cluster/events/NodesDownEvent.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cluster.events;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Event generated when some nodes went down.
+ */
+public interface NodesDownEvent extends ClusterEvent {
+
+  @Override
+  default EventType getType() {
+    return EventType.NODES_DOWN;
+  }
+
+  Iterator<String> getNodeNames(); // names of the nodes that went down
+
+  @Override
+  default void writeMap(EntryWriter ew) throws IOException {
+    ClusterEvent.super.writeMap(ew); // common fields first: type and timestamp
+    ew.put("nodeNames", getNodeNames());
+  }
+}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/NodesUpEvent.java b/solr/core/src/java/org/apache/solr/cluster/events/NodesUpEvent.java
new file mode 100644
index 0000000..fa08f85
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cluster/events/NodesUpEvent.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cluster.events;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Event generated when some nodes came up.
+ */
+public interface NodesUpEvent extends ClusterEvent {
+
+  @Override
+  default EventType getType() {
+    return EventType.NODES_UP;
+  }
+
+  Iterator<String> getNodeNames(); // names of the nodes that came up
+
+  @Override
+  default void writeMap(EntryWriter ew) throws IOException {
+    ClusterEvent.super.writeMap(ew); // common fields first: type and timestamp
+    ew.put("nodeNames", getNodeNames());
+  }
+}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/ReplicasDownEvent.java b/solr/core/src/java/org/apache/solr/cluster/events/ReplicasDownEvent.java
new file mode 100644
index 0000000..1d3ce9b
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cluster/events/ReplicasDownEvent.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cluster.events;
+
+import org.apache.solr.common.cloud.Replica;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Event generated when some replicas went down.
+ */
+public interface ReplicasDownEvent extends ClusterEvent {
+
+  @Override
+  default EventType getType() {
+    return EventType.REPLICAS_DOWN;
+  }
+
+  Iterator<Replica> getReplicas(); // the replicas that went down
+
+  @Override
+  default void writeMap(EntryWriter ew) throws IOException {
+    ClusterEvent.super.writeMap(ew); // common fields first: type and timestamp
+    ew.put("replicas", getReplicas());
+  }
+}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/impl/ClusterEventProducerImpl.java b/solr/core/src/java/org/apache/solr/cluster/events/impl/ClusterEventProducerImpl.java
new file mode 100644
index 0000000..034fa8a
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cluster/events/impl/ClusterEventProducerImpl.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cluster.events.impl;
+
+import java.lang.invoke.MethodHandles;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.solr.cloud.ZkController;
+import org.apache.solr.cluster.events.ClusterPropertiesChangedEvent;
+import org.apache.solr.cluster.events.ClusterEvent;
+import org.apache.solr.cluster.events.ClusterEventListener;
+import org.apache.solr.cluster.events.ClusterEventProducer;
+import org.apache.solr.cloud.ClusterSingleton;
+import org.apache.solr.cluster.events.CollectionsAddedEvent;
+import org.apache.solr.cluster.events.CollectionsRemovedEvent;
+import org.apache.solr.cluster.events.NodesDownEvent;
+import org.apache.solr.cluster.events.NodesUpEvent;
+import org.apache.solr.common.cloud.CloudCollectionsListener;
+import org.apache.solr.common.cloud.ClusterPropertiesListener;
+import org.apache.solr.common.cloud.LiveNodesListener;
+import org.apache.solr.core.CoreContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of {@link ClusterEventProducer}.
+ * <h3>Implementation notes</h3>
+ * <p>For each cluster event, the relevant listeners are always invoked sequentially
+ * (not in parallel) and in arbitrary order. This means that if any listener blocks the
+ * processing, other listeners may be invoked much later or not at all.</p>
+ */
+public class ClusterEventProducerImpl implements ClusterEventProducer, ClusterSingleton {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private final Map<ClusterEvent.EventType, Set<ClusterEventListener>> listeners = new HashMap<>();
+  private CoreContainer coreContainer;
+  private LiveNodesListener liveNodesListener;
+  private CloudCollectionsListener cloudCollectionsListener;
+  private ClusterPropertiesListener clusterPropertiesListener;
+  private ZkController zkController;
+  private volatile boolean running;
+
+  private final Set<ClusterEvent.EventType> supportedEvents =
+      new HashSet<>(Arrays.asList(
+          ClusterEvent.EventType.NODES_DOWN,
+          ClusterEvent.EventType.NODES_UP,
+          ClusterEvent.EventType.COLLECTIONS_ADDED,
+          ClusterEvent.EventType.COLLECTIONS_REMOVED,
+          ClusterEvent.EventType.CLUSTER_PROPERTIES_CHANGED
+      ));
+
+  public ClusterEventProducerImpl(CoreContainer coreContainer) {
+    this.coreContainer = coreContainer;
+  }
+
+  // ClusterSingleton lifecycle methods
+  @Override
+  public void start() {
+    if (coreContainer == null) {
+      liveNodesListener = null;
+      cloudCollectionsListener = null;
+      clusterPropertiesListener = null;
+      return;
+    }
+    this.zkController = this.coreContainer.getZkController();
+
+    // clean up any previous instances
+    doStop();
+
+    // register liveNodesListener
+    liveNodesListener = (oldNodes, newNodes) -> {
+      // already closed but still registered
+      if (!running) {
+        // remove the listener
+        return true;
+      }
+      // spurious event, ignore but keep listening
+      if (oldNodes.equals(newNodes)) {
+        return false;
+      }
+      final Instant now = Instant.now();
+      final Set<String> downNodes = new HashSet<>(oldNodes);
+      downNodes.removeAll(newNodes);
+      if (!downNodes.isEmpty()) {
+        fireEvent(new NodesDownEvent() {
+          @Override
+          public Iterator<String> getNodeNames() {
+            return downNodes.iterator();
+          }
+
+          @Override
+          public Instant getTimestamp() {
+            return now;
+          }
+        });
+      }
+      final Set<String> upNodes = new HashSet<>(newNodes);
+      upNodes.removeAll(oldNodes);
+      if (!upNodes.isEmpty()) {
+        fireEvent(new NodesUpEvent() {
+          @Override
+          public Iterator<String> getNodeNames() {
+            return upNodes.iterator();
+          }
+
+          @Override
+          public Instant getTimestamp() {
+            return now;
+          }
+        });
+      }
+      return false;
+    };
+    zkController.zkStateReader.registerLiveNodesListener(liveNodesListener);
+
+    cloudCollectionsListener = ((oldCollections, newCollections) -> {
+      if (oldCollections.equals(newCollections)) {
+        return;
+      }
+      final Instant now = Instant.now();
+      final Set<String> removed = new HashSet<>(oldCollections);
+      removed.removeAll(newCollections);
+      if (!removed.isEmpty()) {
+        fireEvent(new CollectionsRemovedEvent() {
+          @Override
+          public Iterator<String> getCollectionNames() {
+            return removed.iterator();
+          }
+
+          @Override
+          public Instant getTimestamp() {
+            return now;
+          }
+        });
+      }
+      final Set<String> added = new HashSet<>(newCollections);
+      added.removeAll(oldCollections);
+      if (!added.isEmpty()) {
+        fireEvent(new CollectionsAddedEvent() {
+          @Override
+          public Iterator<String> getCollectionNames() {
+            return added.iterator();
+          }
+
+          @Override
+          public Instant getTimestamp() {
+            return now;
+          }
+        });
+      }
+    });
+    zkController.zkStateReader.registerCloudCollectionsListener(cloudCollectionsListener);
+
+    clusterPropertiesListener = (newProperties) -> {
+      fireEvent(new ClusterPropertiesChangedEvent() {
+        final Instant now = Instant.now();
+        @Override
+        public Map<String, Object> getNewClusterProperties() {
+          return newProperties;
+        }
+
+        @Override
+        public Instant getTimestamp() {
+          return now;
+        }
+      });
+      return false;
+    };
+    zkController.zkStateReader.registerClusterPropertiesListener(clusterPropertiesListener);
+
+    // XXX register collection state listener?
+    // XXX not sure how to efficiently monitor for REPLICAS_DOWN events
+
+    running = true;
+  }
+
+  @Override
+  public boolean isRunning() {
+    return running;
+  }
+
+  @Override
+  public void stop() {
+    doStop();
+    running = false;
+  }
+
+  private void doStop() {
+    if (liveNodesListener != null) {
+      zkController.zkStateReader.removeLiveNodesListener(liveNodesListener);
+    }
+    if (cloudCollectionsListener != null) {
+      zkController.zkStateReader.removeCloudCollectionsListener(cloudCollectionsListener);
+    }
+    if (clusterPropertiesListener != null) {
+      zkController.zkStateReader.removeClusterPropertiesListener(clusterPropertiesListener);
+    }
+    liveNodesListener = null;
+    cloudCollectionsListener = null;
+    clusterPropertiesListener = null;
+  }
+
+  @Override
+  public void registerListener(ClusterEventListener listener, ClusterEvent.EventType... eventTypes) throws Exception {
+    try {
+      for (ClusterEvent.EventType type : eventTypes) {
+        if (!supportedEvents.contains(type)) {
+          log.warn("event type {} not supported yet.", type);
+        }
+      }
+    } catch (Throwable e) {
+      throw new Exception(e);
+    }
+    ClusterEventProducer.super.registerListener(listener, eventTypes);
+  }
+
+  @Override
+  public Map<ClusterEvent.EventType, Set<ClusterEventListener>> getEventListeners() {
+    return listeners;
+  }
+}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/impl/CollectionsRepairEventListener.java b/solr/core/src/java/org/apache/solr/cluster/events/impl/CollectionsRepairEventListener.java
new file mode 100644
index 0000000..42dcde3
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cluster/events/impl/CollectionsRepairEventListener.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cluster.events.impl;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.cloud.SolrCloudManager;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.cloud.api.collections.Assign;
+import org.apache.solr.cluster.events.ClusterEvent;
+import org.apache.solr.cluster.events.ClusterEventListener;
+import org.apache.solr.cluster.events.NodesDownEvent;
+import org.apache.solr.cluster.events.ReplicasDownEvent;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.ReplicaPosition;
+import org.apache.solr.core.CoreContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is an illustration of how to re-implement the combination of the 8x
+ * NodeLostTrigger and AutoAddReplicasPlanAction to maintain the collection's replication factor.
+ * <p>NOTE: there's no support for 'waitFor' yet.</p>
+ * <p>NOTE 2: this functionality would probably be more reliable if it were also executed as a
+ * periodically scheduled check - both as a reactive (listener) and proactive (scheduled) measure.</p>
+ */
+public class CollectionsRepairEventListener implements ClusterEventListener {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  public static final String PLUGIN_NAME = "collectionsRepairListener";
+  private static final String ASYNC_ID_PREFIX = "_async_" + PLUGIN_NAME;
+  private static final AtomicInteger counter = new AtomicInteger();
+
+  private final SolrClient solrClient;
+  private final SolrCloudManager solrCloudManager;
+
+  private volatile boolean running = false;
+
+  public CollectionsRepairEventListener(CoreContainer cc) {
+    this.solrClient = cc.getSolrClientCache().getCloudSolrClient(cc.getZkController().getZkClient().getZkServerAddress());
+    this.solrCloudManager = cc.getZkController().getSolrCloudManager();
+  }
+
+  @Override
+  public String getName() {
+    return PLUGIN_NAME;
+  }
+
+  @Override
+  public void onEvent(ClusterEvent event) {
+    if (!isRunning()) {
+      // ignore the event
+      return;
+    }
+    switch (event.getType()) {
+      case NODES_DOWN:
+        handleNodesDown((NodesDownEvent) event);
+        break;
+      case REPLICAS_DOWN:
+        handleReplicasDown((ReplicasDownEvent) event);
+        break;
+      default:
+        log.warn("Unsupported event {}, ignoring...", event);
+    }
+  }
+
+  /** Computes new positions for replicas hosted on the lost nodes and sends an ADDREPLICA request for each. */
+  private void handleNodesDown(NodesDownEvent event) {
+    // collect all lost replicas
+    // collection / positions (accumulated across all affected shards of the collection)
+    Map<String, List<ReplicaPosition>> newPositions = new HashMap<>();
+    try {
+      ClusterState clusterState = solrCloudManager.getClusterStateProvider().getClusterState();
+      Set<String> lostNodeNames = new HashSet<>();
+      event.getNodeNames().forEachRemaining(lostNodeNames::add);
+      clusterState.forEachCollection(coll -> {
+        // shard / type / count of replicas that were located on the lost nodes
+        Map<String, Map<Replica.Type, AtomicInteger>> lostReplicas = new HashMap<>();
+        coll.forEachReplica((shard, replica) -> {
+          if (lostNodeNames.contains(replica.getNodeName())) {
+            lostReplicas.computeIfAbsent(shard, s -> new HashMap<>())
+                .computeIfAbsent(replica.type, t -> new AtomicInteger())
+                .incrementAndGet();
+          }
+        });
+        Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(solrCloudManager, clusterState, coll);
+        lostReplicas.forEach((shard, types) -> {
+          Assign.AssignRequestBuilder assignRequestBuilder = new Assign.AssignRequestBuilder()
+              .forCollection(coll.getName())
+              .forShard(Collections.singletonList(shard));
+          types.forEach((type, count) -> {
+            switch (type) {
+              case NRT:
+                assignRequestBuilder.assignNrtReplicas(count.get());
+                break;
+              case PULL:
+                assignRequestBuilder.assignPullReplicas(count.get());
+                break;
+              case TLOG:
+                assignRequestBuilder.assignTlogReplicas(count.get());
+                break;
+            }
+          });
+          Assign.AssignRequest assignRequest = assignRequestBuilder.build();
+          try {
+            List<ReplicaPosition> positions = assignStrategy.assign(solrCloudManager, assignRequest);
+            newPositions.computeIfAbsent(coll.getName(), name -> new ArrayList<>()).addAll(positions);
+          } catch (Exception e) {
+            log.warn("Exception computing positions for {}/{}: {}", coll.getName(), shard, e);
+          }
+        });
+      });
+    } catch (IOException e) {
+      log.warn("Exception getting cluster state", e);
+      return;
+    }
+
+    // send ADDREPLICA admin requests for each lost replica
+    // XXX should we use 'async' for that, to avoid blocking here?
+    List<CollectionAdminRequest.AddReplica> addReplicas = new ArrayList<>();
+    newPositions.forEach((collection, positions) -> {
+      positions.forEach(position -> {
+        CollectionAdminRequest.AddReplica addReplica = CollectionAdminRequest
+            .addReplicaToShard(collection, position.shard, position.type);
+        addReplica.setNode(position.node);
+        addReplica.setAsyncId(ASYNC_ID_PREFIX + counter.incrementAndGet());
+        addReplicas.add(addReplica);
+      });
+    });
+    addReplicas.forEach(addReplica -> {
+      try {
+        solrClient.request(addReplica);
+      } catch (Exception e) {
+        log.warn("Exception calling ADDREPLICA {}: {}", addReplica.getParams().toQueryString(), e);
+      }
+    });
+
+    // ... and DELETEREPLICA for lost ones?
+  }
+
+  private void handleReplicasDown(ReplicasDownEvent event) {
+    // compute new placements for all replicas that went down
+    // send ADDREPLICA admin request for each lost replica
+  }
+
+  @Override
+  public void start() throws Exception {
+    running = true;
+  }
+
+  @Override
+  public boolean isRunning() {
+    return running;
+  }
+
+  @Override
+  public void stop() {
+    running = false;
+  }
+}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/impl/package-info.java b/solr/core/src/java/org/apache/solr/cluster/events/impl/package-info.java
new file mode 100644
index 0000000..2c115b6
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cluster/events/impl/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+/** 
+ * Default implementation of {@link org.apache.solr.cluster.events.ClusterEventProducer}.
+ */
+package org.apache.solr.cluster.events.impl;
+
+
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/package-info.java b/solr/core/src/java/org/apache/solr/cluster/events/package-info.java
new file mode 100644
index 0000000..a334a00
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cluster/events/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+/** 
+ * Interfaces defining support for cluster-level event generation and processing.
+ */
+package org.apache.solr.cluster.events;
+
+
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index bcc2039..1427dad 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -39,9 +39,12 @@
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.function.Supplier;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -68,8 +71,13 @@
 import org.apache.solr.client.solrj.io.SolrClientCache;
 import org.apache.solr.client.solrj.util.SolrIdentifierValidator;
 import org.apache.solr.cloud.CloudDescriptor;
+import org.apache.solr.cloud.ClusterSingleton;
 import org.apache.solr.cloud.OverseerTaskQueue;
 import org.apache.solr.cloud.ZkController;
+import org.apache.solr.cluster.events.ClusterEvent;
+import org.apache.solr.cluster.events.ClusterEventListener;
+import org.apache.solr.cluster.events.ClusterEventProducer;
+import org.apache.solr.cluster.events.impl.ClusterEventProducerImpl;
 import org.apache.solr.common.AlreadyClosedException;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
@@ -168,6 +176,72 @@
     }
   }
 
+  public static class ClusterSingletons {
+    private Map<String, ClusterSingleton> singletonMap = new ConcurrentHashMap<>();
+    // we use this latch to delay the initial startup of singletons, due to
+    // the leader election occurring in parallel with the rest of the load() method.
+    private CountDownLatch readyLatch = new CountDownLatch(1);
+
+    public Map<String, ClusterSingleton> getSingletons() {
+      return singletonMap;
+    }
+
+    public boolean isReady() {
+      return readyLatch.getCount() == 0; // ready only once setReady() has counted the latch down to zero
+    }
+
+    public void setReady() {
+      readyLatch.countDown();
+    }
+
+    public void waitUntilReady(long timeout, TimeUnit timeUnit)
+        throws InterruptedException, TimeoutException {
+      boolean await = readyLatch.await(timeout, timeUnit);
+      if (!await) {
+        throw new TimeoutException("Timed out waiting for ClusterSingletons to become ready.");
+      }
+    }
+  }
+
+  /**
+   * This class helps in handling the initial registration of plugin-based listeners,
+   * when both the final {@link ClusterEventProducer} implementation and listeners
+   * are configured using plugins.
+   */
+  public static class InitialClusterEventProducer implements ClusterEventProducer {
+    Map<ClusterEvent.EventType, Set<ClusterEventListener>> initialListeners = new HashMap<>();
+
+    @Override
+    public Map<ClusterEvent.EventType, Set<ClusterEventListener>> getEventListeners() {
+      return initialListeners;
+    }
+
+    public void transferListeners(ClusterEventProducer target) {
+      initialListeners.forEach((type, listeners) -> {
+        listeners.forEach(listener -> {
+          try {
+            target.registerListener(listener, type);
+          } catch (Exception e) {
+            log.warn("Unable to register event listener for type {}: {}", type, e);
+          }
+        });
+      });
+    }
+
+    @Override
+    public void start() throws Exception {
+    }
+
+    @Override
+    public boolean isRunning() {
+      return false;
+    }
+
+    @Override
+    public void stop() {
+    }
+  }
+
   private volatile PluginBag<SolrRequestHandler> containerHandlers = new PluginBag<>(SolrRequestHandler.class, null);
 
   /**
@@ -243,8 +317,11 @@
 
   private volatile SolrClientCache solrClientCache;
 
+  private volatile ClusterEventProducer clusterEventProducer = new InitialClusterEventProducer();
+
   private final ObjectCache objectCache = new ObjectCache();
 
+  private final ClusterSingletons clusterSingletons = new ClusterSingletons();
   private PackageStoreAPI packageStoreAPI;
   private PackageLoader packageLoader;
 
@@ -892,7 +969,31 @@
       ContainerPluginsApi containerPluginsApi = new ContainerPluginsApi(this);
       containerHandlers.getApiBag().registerObject(containerPluginsApi.readAPI);
       containerHandlers.getApiBag().registerObject(containerPluginsApi.editAPI);
+
+      // init ClusterSingleton-s
+
+      // register the handlers that are also ClusterSingleton
+      containerHandlers.keySet().forEach(handlerName -> {
+        SolrRequestHandler handler = containerHandlers.get(handlerName);
+        if (handler instanceof ClusterSingleton) {
+          clusterSingletons.singletonMap.put(handlerName, (ClusterSingleton) handler);
+        }
+      });
+      // create the ClusterEventProducer
+      InitialClusterEventProducer initialClusterEventProducer = (InitialClusterEventProducer) clusterEventProducer;
+      CustomContainerPlugins.ApiInfo clusterEventProducerInfo = customContainerPlugins.getPlugin(ClusterEventProducer.PLUGIN_NAME);
+      if (clusterEventProducerInfo != null) {
+        clusterEventProducer = (ClusterEventProducer) clusterEventProducerInfo.getInstance();
+      } else {
+        clusterEventProducer = new ClusterEventProducerImpl(this);
+        clusterSingletons.singletonMap.put(ClusterEventProducer.PLUGIN_NAME, clusterEventProducer);
+      }
+      // transfer those listeners that were already registered to the initial impl
+      initialClusterEventProducer.transferListeners(clusterEventProducer);
+
+      clusterSingletons.setReady();
       zkSys.getZkController().checkOverseerDesignate();
+
     }
     // This is a bit redundant but these are two distinct concepts for all they're accomplished at the same time.
     status |= LOAD_COMPLETE | INITIAL_CORE_LOAD_COMPLETE;
@@ -2090,6 +2191,14 @@
     return customContainerPlugins;
   }
 
+  public ClusterSingletons getClusterSingletons() {
+    return clusterSingletons;
+  }
+
+  public ClusterEventProducer getClusterEventProducer() {
+    return clusterEventProducer;
+  }
+
   static {
     ExecutorUtil.addThreadLocalProvider(SolrRequestInfo.getInheritableThreadLocalProvider());
   }
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/ContainerPluginsApi.java b/solr/core/src/java/org/apache/solr/handler/admin/ContainerPluginsApi.java
index 0c7a487..ad95423 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/ContainerPluginsApi.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/ContainerPluginsApi.java
@@ -51,7 +51,7 @@
 public class ContainerPluginsApi {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-  public static final String PLUGIN = "plugin";
+  public static final String PLUGINS = "plugin";
   private final Supplier<SolrZkClient> zkClientSupplier;
   private final CoreContainer coreContainer;
   public final Read readAPI = new Read();
@@ -67,7 +67,7 @@
         path = "/cluster/plugin",
         permission = PermissionNameProvider.Name.COLL_READ_PERM)
     public void list(SolrQueryRequest req, SolrQueryResponse rsp) throws IOException {
-      rsp.add(PLUGIN, plugins(zkClientSupplier));
+      rsp.add(PLUGINS, plugins(zkClientSupplier));
     }
   }
 
@@ -151,7 +151,7 @@
     SolrZkClient zkClient = zkClientSupplier.get();
     try {
       Map<String, Object> clusterPropsJson = (Map<String, Object>) Utils.fromJSON(zkClient.getData(ZkStateReader.CLUSTER_PROPS, null, new Stat(), true));
-      return (Map<String, Object>) clusterPropsJson.computeIfAbsent(PLUGIN, o -> new LinkedHashMap<>());
+      return (Map<String, Object>) clusterPropsJson.computeIfAbsent(PLUGINS, o -> new LinkedHashMap<>());
     } catch (KeeperException.NoNodeException e) {
       return new LinkedHashMap<>();
     } catch (KeeperException | InterruptedException e) {
@@ -165,9 +165,9 @@
       zkClientSupplier.get().atomicUpdate(ZkStateReader.CLUSTER_PROPS, bytes -> {
         Map rawJson = bytes == null ? new LinkedHashMap() :
             (Map) Utils.fromJSON(bytes);
-        Map pluginsModified = modifier.apply((Map) rawJson.computeIfAbsent(PLUGIN, o -> new LinkedHashMap<>()));
+        Map pluginsModified = modifier.apply((Map) rawJson.computeIfAbsent(PLUGINS, o -> new LinkedHashMap<>()));
         if (pluginsModified == null) return null;
-        rawJson.put(PLUGIN, pluginsModified);
+        rawJson.put(PLUGINS, pluginsModified);
         return Utils.toJSON(rawJson);
       });
     } catch (KeeperException | InterruptedException e) {
diff --git a/solr/core/src/java/org/apache/solr/packagemanager/PackageManager.java b/solr/core/src/java/org/apache/solr/packagemanager/PackageManager.java
index 424f604..044cf8c 100644
--- a/solr/core/src/java/org/apache/solr/packagemanager/PackageManager.java
+++ b/solr/core/src/java/org/apache/solr/packagemanager/PackageManager.java
@@ -52,6 +52,7 @@
 import org.apache.solr.common.util.Pair;
 import org.apache.solr.common.util.Utils;
 import org.apache.solr.filestore.DistribPackageStore;
+import org.apache.solr.handler.admin.ContainerPluginsApi;
 import org.apache.solr.packagemanager.SolrPackage.Command;
 import org.apache.solr.packagemanager.SolrPackage.Manifest;
 import org.apache.solr.packagemanager.SolrPackage.Plugin;
@@ -231,7 +232,7 @@
       }
     }
     @SuppressWarnings({"unchecked"})
-    Map<String, Object> clusterPlugins = (Map<String, Object>) result.getOrDefault("plugin", Collections.emptyMap());
+    Map<String, Object> clusterPlugins = (Map<String, Object>) result.getOrDefault(ContainerPluginsApi.PLUGINS, Collections.emptyMap());
     for (String key : clusterPlugins.keySet()) {
       // Map<String, String> pluginMeta = (Map<String, String>) clusterPlugins.get(key);
       PluginMeta pluginMeta;
diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
index b6372a0..f6c07c1 100644
--- a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
@@ -17,13 +17,7 @@
 package org.apache.solr.cloud;
 
 import static org.apache.solr.cloud.AbstractDistribZkTestBase.verifyReplicaStatus;
-import static org.mockito.Mockito.anyBoolean;
-import static org.mockito.Mockito.anyInt;
-import static org.mockito.Mockito.anyString;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
 
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
@@ -56,6 +50,7 @@
 import org.apache.solr.cloud.overseer.NodeMutator;
 import org.apache.solr.cloud.overseer.OverseerAction;
 import org.apache.solr.cloud.overseer.ZkWriteCommand;
+import org.apache.solr.cluster.events.impl.ClusterEventProducerImpl;
 import org.apache.solr.common.AlreadyClosedException;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.ClusterState;
@@ -117,7 +112,6 @@
 
   private static SolrZkClient zkClient;
 
-
   private volatile boolean testDone = false;
 
   private final List<ZkController> zkControllers = Collections.synchronizedList(new ArrayList<>());
@@ -127,7 +121,6 @@
   private final List<HttpShardHandlerFactory> httpShardHandlerFactorys = Collections.synchronizedList(new ArrayList<>());
   private final List<UpdateShardHandler> updateShardHandlers = Collections.synchronizedList(new ArrayList<>());
   private final List<CloudSolrClient> solrClients = Collections.synchronizedList(new ArrayList<>());
-
   private static final String COLLECTION = SolrTestCaseJ4.DEFAULT_TEST_COLLECTION_NAME;
 
   public static class MockZKController{
@@ -306,6 +299,7 @@
   @Before
   public void setUp() throws Exception {
     testDone = false;
+
     super.setUp();
   }
 
@@ -322,6 +316,7 @@
     }
 
     server = null;
+
   }
 
   @After
@@ -1428,11 +1423,24 @@
         Mockito.withSettings().defaultAnswer(Mockito.CALLS_REAL_METHODS));
     when(mockAlwaysUpCoreContainer.isShutDown()).thenReturn(testDone);  // Allow retry on session expiry
     when(mockAlwaysUpCoreContainer.getResourceLoader()).thenReturn(new SolrResourceLoader());
+    CoreContainer.ClusterSingletons singletons = new CoreContainer.ClusterSingletons();
+    // don't wait for all singletons
+    singletons.setReady();
+    FieldSetter.setField(mockAlwaysUpCoreContainer, CoreContainer.class.getDeclaredField("clusterSingletons"), singletons);
+    ClusterEventProducerImpl clusterEventProducer = new ClusterEventProducerImpl(mockAlwaysUpCoreContainer);
+    when(mockAlwaysUpCoreContainer.getClusterEventProducer()).thenReturn(clusterEventProducer);
     FieldSetter.setField(zkController, ZkController.class.getDeclaredField("zkClient"), zkClient);
     FieldSetter.setField(zkController, ZkController.class.getDeclaredField("cc"), mockAlwaysUpCoreContainer);
     when(zkController.getCoreContainer()).thenReturn(mockAlwaysUpCoreContainer);
     when(zkController.getZkClient()).thenReturn(zkClient);
     when(zkController.getZkStateReader()).thenReturn(reader);
+    // primitive support for CC.runAsync
+    doAnswer(invocable -> {
+      Runnable r = invocable.getArgument(0);
+      Thread t = new Thread(r);
+      t.start();
+      return null;
+    }).when(mockAlwaysUpCoreContainer).runAsync(any(Runnable.class));
 
     when(zkController.getLeaderProps(anyString(), anyString(), anyInt())).thenCallRealMethod();
     when(zkController.getLeaderProps(anyString(), anyString(), anyInt(), anyBoolean())).thenCallRealMethod();
diff --git a/solr/core/src/test/org/apache/solr/cluster/events/AllEventsListener.java b/solr/core/src/test/org/apache/solr/cluster/events/AllEventsListener.java
new file mode 100644
index 0000000..8da6bea
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cluster/events/AllEventsListener.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cluster.events;
+
+import org.junit.Assert;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test listener that records every event it receives, grouped by type, and lets a test wait for one expected type.
+ */
+public class AllEventsListener implements ClusterEventListener {
+  volatile CountDownLatch eventLatch = new CountDownLatch(1); // volatile: replaced in setExpectedType, awaited unlocked
+  volatile ClusterEvent.EventType expectedType; // event type that releases eventLatch; null until setExpectedType
+  Map<ClusterEvent.EventType, List<ClusterEvent>> events = new HashMap<>(); // received events by type, arrival order
+
+  @Override
+  public String getName() {
+    return "allEventsListener";
+  }
+
+  @Override
+  public synchronized void onEvent(ClusterEvent event) { // synchronized: see a consistent expectedType/eventLatch pair
+    events.computeIfAbsent(event.getType(), type -> new ArrayList<>()).add(event);
+    if (event.getType() == expectedType) {
+      eventLatch.countDown();
+    }
+  }
+  /** Sets the event type to wait for and re-arms the latch; atomic with respect to {@link #onEvent}. */
+  public synchronized void setExpectedType(ClusterEvent.EventType expectedType) {
+    this.expectedType = expectedType;
+    eventLatch = new CountDownLatch(1);
+  }
+  /** Blocks until an event of the expected type arrives, failing the test after {@code timeoutSeconds}. */
+  public void waitForExpectedEvent(int timeoutSeconds) throws InterruptedException {
+    boolean await = eventLatch.await(timeoutSeconds, TimeUnit.SECONDS);
+    if (!await) {
+      Assert.fail("Timed out waiting for expected event " + expectedType);
+    }
+  }
+
+  @Override
+  public void start() throws Exception {
+    // nothing to start
+  }
+
+  @Override
+  public boolean isRunning() {
+    return false;
+  }
+
+  @Override
+  public void stop() {
+    // nothing to stop
+  }
+}
diff --git a/solr/core/src/test/org/apache/solr/cluster/events/ClusterEventProducerTest.java b/solr/core/src/test/org/apache/solr/cluster/events/ClusterEventProducerTest.java
new file mode 100644
index 0000000..ea6c5f5
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cluster/events/ClusterEventProducerTest.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cluster.events;
+
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.V2Request;
+import org.apache.solr.client.solrj.request.beans.PluginMeta;
+import org.apache.solr.client.solrj.response.V2Response;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.cloud.ClusterProperties;
+import org.apache.solr.common.util.Utils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static java.util.Collections.singletonMap;
+import static org.apache.solr.client.solrj.SolrRequest.METHOD.GET;
+import static org.apache.solr.client.solrj.SolrRequest.METHOD.POST;
+
+/**
+ * Tests that cluster changes produce events delivered to registered listeners and to listeners loaded as plugins.
+ */
+public class ClusterEventProducerTest extends SolrCloudTestCase {
+
+  private AllEventsListener eventsListener;
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    configureCluster(3)
+        .addConfig("conf", TEST_PATH().resolve("configsets").resolve("cloud-minimal").resolve("conf"))
+        .configure();
+  }
+
+  @Before
+  public void setUp() throws Exception  {
+    System.setProperty("enable.packages", "true");
+    super.setUp();
+    cluster.deleteAllCollections();
+    eventsListener = new AllEventsListener();
+    cluster.getOpenOverseer().getCoreContainer().getClusterEventProducer().registerListener(eventsListener);
+  }
+
+  @After
+  public void teardown() {
+    System.clearProperty("enable.packages");
+    if (eventsListener != null) {
+      cluster.getOpenOverseer().getCoreContainer().getClusterEventProducer().unregisterListener(eventsListener);
+    }
+  }
+  /** Triggers each kind of cluster change in turn and checks the event recorded by the registered listener. */
+  @Test
+  public void testEvents() throws Exception {
+
+    // NODES_DOWN
+
+    eventsListener.setExpectedType(ClusterEvent.EventType.NODES_DOWN);
+
+    // don't kill Overseer
+    JettySolrRunner nonOverseerJetty = null;
+    for (JettySolrRunner jetty : cluster.getJettySolrRunners()) {
+      if (cluster.getOpenOverseer().getCoreContainer().getZkController().getNodeName().equals(jetty.getNodeName())) {
+        continue;
+      }
+      nonOverseerJetty = jetty;
+      break;
+    }
+    String nodeName = nonOverseerJetty.getNodeName();
+    cluster.stopJettySolrRunner(nonOverseerJetty);
+    cluster.waitForJettyToStop(nonOverseerJetty);
+    eventsListener.waitForExpectedEvent(10);
+    assertNotNull("should be NODES_DOWN events", eventsListener.events.get(ClusterEvent.EventType.NODES_DOWN));
+    List<ClusterEvent> events = eventsListener.events.get(ClusterEvent.EventType.NODES_DOWN);
+    assertEquals("should be one NODES_DOWN event", 1, events.size());
+    ClusterEvent event = events.get(0);
+    assertEquals("should be NODES_DOWN event type", ClusterEvent.EventType.NODES_DOWN, event.getType());
+    NodesDownEvent nodesDown = (NodesDownEvent) event;
+    assertEquals("should be node " + nodeName, nodeName, nodesDown.getNodeNames().next());
+
+    // NODES_UP
+    eventsListener.setExpectedType(ClusterEvent.EventType.NODES_UP);
+    JettySolrRunner newNode = cluster.startJettySolrRunner();
+    cluster.waitForNode(newNode, 60);
+    eventsListener.waitForExpectedEvent(10);
+    assertNotNull("should be NODES_UP events", eventsListener.events.get(ClusterEvent.EventType.NODES_UP));
+    events = eventsListener.events.get(ClusterEvent.EventType.NODES_UP);
+    assertEquals("should be one NODES_UP event", 1, events.size());
+    event = events.get(0);
+    assertEquals("should be NODES_UP event type", ClusterEvent.EventType.NODES_UP, event.getType());
+    NodesUpEvent nodesUp = (NodesUpEvent) event;
+    assertEquals("should be node " + newNode.getNodeName(), newNode.getNodeName(), nodesUp.getNodeNames().next());
+
+    // COLLECTIONS_ADDED
+    eventsListener.setExpectedType(ClusterEvent.EventType.COLLECTIONS_ADDED);
+    String collection = "testNodesEvent_collection";
+    CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collection, "conf", 1, 1);
+    cluster.getSolrClient().request(create);
+    cluster.waitForActiveCollection(collection, 1, 1);
+    eventsListener.waitForExpectedEvent(10);
+    assertNotNull("should be COLLECTIONS_ADDED events", eventsListener.events.get(ClusterEvent.EventType.COLLECTIONS_ADDED));
+    events = eventsListener.events.get(ClusterEvent.EventType.COLLECTIONS_ADDED);
+    assertEquals("should be one COLLECTIONS_ADDED event", 1, events.size());
+    event = events.get(0);
+    assertEquals("should be COLLECTIONS_ADDED event type", ClusterEvent.EventType.COLLECTIONS_ADDED, event.getType());
+    CollectionsAddedEvent collectionsAdded = (CollectionsAddedEvent) event;
+    assertEquals("should be collection " + collection, collection, collectionsAdded.getCollectionNames().next());
+
+    // COLLECTIONS_REMOVED
+    eventsListener.setExpectedType(ClusterEvent.EventType.COLLECTIONS_REMOVED);
+    CollectionAdminRequest.Delete delete = CollectionAdminRequest.deleteCollection(collection);
+    cluster.getSolrClient().request(delete);
+    eventsListener.waitForExpectedEvent(10);
+    assertNotNull("should be COLLECTIONS_REMOVED events", eventsListener.events.get(ClusterEvent.EventType.COLLECTIONS_REMOVED));
+    events = eventsListener.events.get(ClusterEvent.EventType.COLLECTIONS_REMOVED);
+    assertEquals("should be one COLLECTIONS_REMOVED event", 1, events.size());
+    event = events.get(0);
+    assertEquals("should be COLLECTIONS_REMOVED event type", ClusterEvent.EventType.COLLECTIONS_REMOVED, event.getType());
+    CollectionsRemovedEvent collectionsRemoved = (CollectionsRemovedEvent) event;
+    assertEquals("should be collection " + collection, collection, collectionsRemoved.getCollectionNames().next());
+
+    // CLUSTER_PROPERTIES_CHANGED
+    eventsListener.setExpectedType(ClusterEvent.EventType.CLUSTER_PROPERTIES_CHANGED);
+    ClusterProperties clusterProperties = new ClusterProperties(cluster.getZkClient());
+    Map<String, Object> oldProps = new HashMap<>(clusterProperties.getClusterProperties()); // snapshot before the change; not asserted on
+    clusterProperties.setClusterProperty("ext.foo", "bar");
+    eventsListener.waitForExpectedEvent(10);
+    assertNotNull("should be CLUSTER_PROPERTIES_CHANGED events", eventsListener.events.get(ClusterEvent.EventType.CLUSTER_PROPERTIES_CHANGED));
+    events = eventsListener.events.get(ClusterEvent.EventType.CLUSTER_PROPERTIES_CHANGED);
+    assertEquals("should be one CLUSTER_PROPERTIES_CHANGED event", 1, events.size());
+    event = events.get(0);
+    assertEquals("should be CLUSTER_PROPERTIES_CHANGED event type", ClusterEvent.EventType.CLUSTER_PROPERTIES_CHANGED, event.getType());
+    ClusterPropertiesChangedEvent propertiesChanged = (ClusterPropertiesChangedEvent) event;
+    Map<String, Object> newProps = propertiesChanged.getNewClusterProperties();
+    assertEquals("new properties wrong value of the 'ext.foo' property: " + newProps,
+        "bar", newProps.get("ext.foo"));
+
+    // unset the property
+    eventsListener.setExpectedType(ClusterEvent.EventType.CLUSTER_PROPERTIES_CHANGED);
+    clusterProperties.setClusterProperty("ext.foo", null);
+    eventsListener.waitForExpectedEvent(10);
+    assertNotNull("should be CLUSTER_PROPERTIES_CHANGED events", eventsListener.events.get(ClusterEvent.EventType.CLUSTER_PROPERTIES_CHANGED));
+    events = eventsListener.events.get(ClusterEvent.EventType.CLUSTER_PROPERTIES_CHANGED);
+    assertEquals("should be two CLUSTER_PROPERTIES_CHANGED events", 2, events.size());
+    event = events.get(1);
+    assertEquals("should be CLUSTER_PROPERTIES_CHANGED event type", ClusterEvent.EventType.CLUSTER_PROPERTIES_CHANGED, event.getType());
+    propertiesChanged = (ClusterPropertiesChangedEvent) event;
+    assertNull("new properties should not have 'ext.foo' property: " + propertiesChanged.getNewClusterProperties(),
+        propertiesChanged.getNewClusterProperties().get("ext.foo"));
+
+  }
+
+  private static volatile CountDownLatch dummyEventLatch = new CountDownLatch(1); // volatile: re-armed by the test, counted down by the listener
+  private static volatile ClusterEvent lastEvent = null; // last collection event recorded by DummyEventListener
+  /** Listener loaded by class name through the plugin API; records collection added/removed events while running. */
+  public static class DummyEventListener implements ClusterEventListener {
+    private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+    boolean running = false;
+    @Override
+    public void onEvent(ClusterEvent event) {
+      if (!running) {
+        log.debug("skipped event, not running: {}", event);
+        return;
+      }
+      if (event.getType() == ClusterEvent.EventType.COLLECTIONS_ADDED ||
+          event.getType() == ClusterEvent.EventType.COLLECTIONS_REMOVED) {
+        log.debug("recorded event {}", Utils.toJSONString(event));
+        lastEvent = event;
+        dummyEventLatch.countDown();
+      } else {
+        log.debug("skipped event, wrong type: {}", event.getType());
+      }
+    }
+
+    @Override
+    public String getName() {
+      return "dummy";
+    }
+
+    @Override
+    public void start() throws Exception {
+      log.debug("starting {}", Integer.toHexString(hashCode()));
+      running = true;
+    }
+
+    @Override
+    public boolean isRunning() {
+      return running;
+    }
+
+    @Override
+    public void stop() {
+      log.debug("stopping {}", Integer.toHexString(hashCode()));
+      running = false;
+    }
+  }
+  /** Registers {@link DummyEventListener} via /cluster/plugin and checks it sees collection add and remove events. */
+  @Test
+  public void testListenerPlugins() throws Exception {
+    PluginMeta plugin = new PluginMeta();
+    plugin.name = "testplugin";
+    plugin.klass = DummyEventListener.class.getName();
+    V2Request req = new V2Request.Builder("/cluster/plugin")
+        .forceV2(true)
+        .withMethod(POST)
+        .withPayload(singletonMap("add", plugin))
+        .build();
+    V2Response rsp = req.process(cluster.getSolrClient());
+    //just check if the plugin is indeed registered
+    V2Request readPluginState = new V2Request.Builder("/cluster/plugin")
+        .forceV2(true)
+        .withMethod(GET)
+        .build();
+    rsp = readPluginState.process(cluster.getSolrClient());
+    assertEquals(DummyEventListener.class.getName(), rsp._getStr("/plugin/testplugin/class", null));
+
+    String collection = "testListenerPlugins_collection";
+    CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collection, "conf", 1, 1);
+    cluster.getSolrClient().request(create);
+    cluster.waitForActiveCollection(collection, 1, 1);
+    boolean await = dummyEventLatch.await(30, TimeUnit.SECONDS);
+    if (!await) {
+      fail("Timed out waiting for COLLECTIONS_ADDED event, " + collection);
+    }
+    assertNotNull("lastEvent should be COLLECTIONS_ADDED", lastEvent);
+    assertEquals("lastEvent should be COLLECTIONS_ADDED", ClusterEvent.EventType.COLLECTIONS_ADDED, lastEvent.getType());
+    // verify timestamp: equal instants are acceptable, only a timestamp later than "now" is an error
+    Instant now = Instant.now();
+    assertFalse("timestamp of the event is in the future", lastEvent.getTimestamp().isAfter(now));
+    assertEquals(collection, ((CollectionsAddedEvent)lastEvent).getCollectionNames().next());
+
+    dummyEventLatch = new CountDownLatch(1);
+    lastEvent = null;
+
+    CollectionAdminRequest.Delete delete = CollectionAdminRequest.deleteCollection(collection);
+    cluster.getSolrClient().request(delete);
+    await = dummyEventLatch.await(30, TimeUnit.SECONDS);
+    if (!await) {
+      fail("Timed out waiting for COLLECTIONS_REMOVED event, " + collection);
+    }
+    assertNotNull("lastEvent should be COLLECTIONS_REMOVED", lastEvent);
+    assertEquals("lastEvent should be COLLECTIONS_REMOVED", ClusterEvent.EventType.COLLECTIONS_REMOVED, lastEvent.getType());
+    // verify timestamp: equal instants are acceptable, only a timestamp later than "now" is an error
+    now = Instant.now();
+    assertFalse("timestamp of the event is in the future", lastEvent.getTimestamp().isAfter(now));
+    assertEquals(collection, ((CollectionsRemovedEvent)lastEvent).getCollectionNames().next());
+  }
+}
diff --git a/solr/core/src/test/org/apache/solr/cluster/events/impl/CollectionsRepairEventListenerTest.java b/solr/core/src/test/org/apache/solr/cluster/events/impl/CollectionsRepairEventListenerTest.java
new file mode 100644
index 0000000..a764051
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cluster/events/impl/CollectionsRepairEventListenerTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cluster.events.impl;
+
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.cluster.events.AllEventsListener;
+import org.apache.solr.cluster.events.ClusterEvent;
+import org.apache.solr.cluster.events.ClusterEventListener;
+import org.apache.solr.core.CoreContainer;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tests collection repair on NODES_DOWN: after a node is stopped the collection must become active again with the counts it was created with.
+ */
+public class CollectionsRepairEventListenerTest extends SolrCloudTestCase {
+  /** Delegates to a {@link CollectionsRepairEventListener} and signals once the delegate has handled an event. */
+  public static class CollectionsRepairWrapperListener implements ClusterEventListener {
+    final CollectionsRepairEventListener delegate;
+
+    final CountDownLatch completed = new CountDownLatch(1); // released after the first event has been handled
+
+    CollectionsRepairWrapperListener(CoreContainer cc) throws Exception {
+      delegate = new CollectionsRepairEventListener(cc);
+    }
+
+    @Override
+    public void onEvent(ClusterEvent event) {
+      delegate.onEvent(event);
+      completed.countDown(); // not reached if the delegate throws
+    }
+
+    @Override
+    public String getName() {
+      return "wrapperListener";
+    }
+
+    @Override
+    public void start() throws Exception {
+      delegate.start();
+    }
+
+    @Override
+    public boolean isRunning() {
+      return delegate.isRunning();
+    }
+
+    @Override
+    public void stop() {
+      delegate.stop();
+    }
+  }
+
+  private static AllEventsListener eventsListener = new AllEventsListener();
+  private static CollectionsRepairWrapperListener repairListener;
+
+  private static final int NUM_NODES = 3;
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    configureCluster(NUM_NODES)
+        .addConfig("conf", TEST_PATH().resolve("configsets").resolve("cloud-minimal").resolve("conf"))
+        .configure();
+    CoreContainer cc = cluster.getOpenOverseer().getCoreContainer();
+    cc.getClusterEventProducer()
+        .registerListener(eventsListener, ClusterEvent.EventType.values());
+    repairListener = new CollectionsRepairWrapperListener(cc);
+    cc.getClusterEventProducer()
+        .registerListener(repairListener, ClusterEvent.EventType.NODES_DOWN);
+    repairListener.start();
+  }
+
+  @Before
+  public void setUp() throws Exception  {
+    super.setUp();
+    cluster.deleteAllCollections();
+  }
+  /** Creates a collection, stops a non-Overseer node and waits for the repair listener to finish processing. */
+  @Test
+  public void testCollectionRepair() throws Exception {
+    eventsListener.setExpectedType(ClusterEvent.EventType.COLLECTIONS_ADDED);
+    String collection = "testCollectionRepair_collection";
+    CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collection, "conf", 1, 3);
+    cluster.getSolrClient().request(create);
+    cluster.waitForActiveCollection(collection, 1, 3);
+    eventsListener.waitForExpectedEvent(10);
+    eventsListener.setExpectedType(ClusterEvent.EventType.NODES_DOWN);
+
+    // don't kill Overseer
+    JettySolrRunner nonOverseerJetty = null;
+    for (JettySolrRunner jetty : cluster.getJettySolrRunners()) {
+      if (cluster.getOpenOverseer().getCoreContainer().getZkController().getNodeName().equals(jetty.getNodeName())) {
+        continue;
+      }
+      nonOverseerJetty = jetty;
+      break;
+    }
+    assertNotNull("no non-Overseer node found to stop", nonOverseerJetty);
+    cluster.stopJettySolrRunner(nonOverseerJetty);
+    cluster.waitForJettyToStop(nonOverseerJetty);
+    eventsListener.waitForExpectedEvent(10);
+    cluster.waitForActiveCollection(collection, 1, 2);
+
+    // wait for completed processing in the repair listener
+    boolean await = repairListener.completed.await(60, TimeUnit.SECONDS);
+    if (!await) {
+      fail("Timeout waiting for the processing to complete");
+    }
+    cluster.waitForActiveCollection(collection, 1, 3);
+  }
+}
diff --git a/solr/core/src/test/org/apache/solr/handler/TestContainerPlugin.java b/solr/core/src/test/org/apache/solr/handler/TestContainerPlugin.java
index 4c37c17..bd9bf7c 100644
--- a/solr/core/src/test/org/apache/solr/handler/TestContainerPlugin.java
+++ b/solr/core/src/test/org/apache/solr/handler/TestContainerPlugin.java
@@ -92,8 +92,8 @@
       expectError(req, cluster.getSolrClient(), errPath, "No method with @Command in class");
 
       //test with an invalid class
-      plugin.klass = C1.class.getName();
-      expectError(req, cluster.getSolrClient(), errPath, "No @EndPoints");
+//      plugin.klass = C1.class.getName();
+//      expectError(req, cluster.getSolrClient(), errPath, "No @EndPoints");
 
       //test with a valid class. This should succeed now
       plugin.klass = C3.class.getName();