Merge branch 'master' into jira/solr-14749
diff --git a/solr/core/src/java/org/apache/solr/api/AnnotatedApi.java b/solr/core/src/java/org/apache/solr/api/AnnotatedApi.java
index fd77413..1558f1c 100644
--- a/solr/core/src/java/org/apache/solr/api/AnnotatedApi.java
+++ b/solr/core/src/java/org/apache/solr/api/AnnotatedApi.java
@@ -85,9 +85,18 @@
}
public static List<Api> getApis(Object obj) {
- return getApis(obj.getClass(), obj);
+ return getApis(obj.getClass(), obj, true);
}
- public static List<Api> getApis(Class<? extends Object> theClass , Object obj) {
+
+ /**
+ * Get a list of Api-s supported by this class.
+ * @param theClass class
+ * @param obj object of this class (may be null)
+ * @param required if true then an exception is thrown if no Api-s can be retrieved, if false
+ * then absence of Api-s is silently ignored.
+ * @return list of discovered Api-s
+ */
+ public static List<Api> getApis(Class<? extends Object> theClass , Object obj, boolean required) {
Class<?> klas = null;
try {
klas = MethodHandles.publicLookup().accessClass(theClass);
@@ -122,7 +131,7 @@
SpecProvider specProvider = readSpec(endPoint, Collections.singletonList(m));
apis.add(new AnnotatedApi(specProvider, endPoint, Collections.singletonMap("", cmd), null));
}
- if (apis.isEmpty()) {
+ if (required && apis.isEmpty()) {
throw new RuntimeException("Invalid Class : " + klas.getName() + " No @EndPoints");
}
diff --git a/solr/core/src/java/org/apache/solr/api/CustomContainerPlugins.java b/solr/core/src/java/org/apache/solr/api/CustomContainerPlugins.java
index 93de2e3..ec3e4e3 100644
--- a/solr/core/src/java/org/apache/solr/api/CustomContainerPlugins.java
+++ b/solr/core/src/java/org/apache/solr/api/CustomContainerPlugins.java
@@ -33,6 +33,9 @@
import org.apache.lucene.util.ResourceLoaderAware;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.request.beans.PluginMeta;
+import org.apache.solr.cloud.ClusterSingleton;
+import org.apache.solr.cluster.events.ClusterEvent;
+import org.apache.solr.cluster.events.ClusterEventListener;
import org.apache.solr.common.MapWriter;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.annotation.JsonProperty;
@@ -68,6 +71,7 @@
refresh();
return false;
}
+
public CustomContainerPlugins(CoreContainer coreContainer, ApiBag apiBag) {
this.coreContainer = coreContainer;
this.containerApiBag = apiBag;
@@ -78,6 +82,10 @@
currentPlugins.forEach(ew.getBiConsumer());
}
+ public synchronized ApiInfo getPlugin(String name) {
+ return currentPlugins.get(name);
+ }
+
public synchronized void refresh() {
Map<String, Object> pluginInfos = null;
try {
@@ -107,6 +115,7 @@
if (e.getValue() == Diff.REMOVED) {
ApiInfo apiInfo = currentPlugins.remove(e.getKey());
if (apiInfo == null) continue;
+ handleClusterSingleton(null, apiInfo);
for (ApiHolder holder : apiInfo.holders) {
Api old = containerApiBag.unregister(holder.api.getEndPoint().method()[0],
getActualPath(apiInfo, holder.api.getEndPoint().path()[0]));
@@ -136,6 +145,7 @@
containerApiBag.register(holder, getTemplateVars(apiInfo.info));
}
currentPlugins.put(e.getKey(), apiInfo);
+ handleClusterSingleton(apiInfo, null);
} else {
//this plugin is being updated
ApiInfo old = currentPlugins.put(e.getKey(), apiInfo);
@@ -143,6 +153,7 @@
//register all new paths
containerApiBag.register(holder, getTemplateVars(apiInfo.info));
}
+ handleClusterSingleton(apiInfo, old);
if (old != null) {
//this is an update of the plugin. But, it is possible that
// some paths are remved in the newer version of the plugin
@@ -163,6 +174,47 @@
}
}
+ private void handleClusterSingleton(ApiInfo newApiInfo, ApiInfo oldApiInfo) {
+ if (newApiInfo != null) {
+ // register new api
+ Object instance = newApiInfo.getInstance();
+ if (instance instanceof ClusterSingleton) {
+ ClusterSingleton singleton = (ClusterSingleton) instance;
+ coreContainer.getClusterSingletons().getSingletons().put(singleton.getName(), singleton);
+ // easy check to see if we should immediately start this singleton
+ if (coreContainer.getClusterEventProducer() != null &&
+ coreContainer.getClusterEventProducer().isRunning()) {
+ try {
+ singleton.start();
+ } catch (Exception exc) {
+ log.warn("Exception starting ClusterSingleton {}: {}", newApiInfo, exc);
+ }
+ }
+ }
+ if (instance instanceof ClusterEventListener) {
+ // XXX nocommit obtain a list of supported event types from the config
+ ClusterEvent.EventType[] types = ClusterEvent.EventType.values();
+ try {
+ coreContainer.getClusterEventProducer().registerListener((ClusterEventListener) instance, types);
+ } catch (Exception exc) {
+ log.warn("Exception adding ClusterEventListener {}: {}", newApiInfo, exc);
+ }
+ }
+ }
+ if (oldApiInfo != null) {
+ // stop & unregister the old api
+ Object instance = oldApiInfo.getInstance();
+ if (instance instanceof ClusterSingleton) {
+ ClusterSingleton singleton = (ClusterSingleton) instance;
+ singleton.stop();
+ coreContainer.getClusterSingletons().getSingletons().remove(singleton.getName());
+ }
+ if (instance instanceof ClusterEventListener) {
+ coreContainer.getClusterEventProducer().unregisterListener((ClusterEventListener) instance);
+ }
+ }
+ }
+
private static String getActualPath(ApiInfo apiInfo, String path) {
path = path.replaceAll("\\$path-prefix", apiInfo.info.pathPrefix);
path = path.replaceAll("\\$plugin-name", apiInfo.info.name);
@@ -222,6 +274,9 @@
return null;
}
+ public Object getInstance() {
+ return instance;
+ }
@SuppressWarnings({"unchecked","rawtypes"})
public ApiInfo(PluginMeta info, List<String> errs) {
@@ -268,14 +323,14 @@
}
try {
- List<Api> apis = AnnotatedApi.getApis(klas, null);
+ List<Api> apis = AnnotatedApi.getApis(klas, null, false);
for (Object api : apis) {
EndPoint endPoint = ((AnnotatedApi) api).getEndPoint();
if (endPoint.path().length > 1 || endPoint.method().length > 1) {
errs.add("Only one HTTP method and url supported for each API");
}
if (endPoint.method().length != 1 || endPoint.path().length != 1) {
- errs.add("The @EndPint must have exactly one method and path attributes");
+ errs.add("The @EndPoint must have exactly one method and path attributes");
}
List<String> pathSegments = StrUtils.splitSmart(endPoint.path()[0], '/', true);
PathTrie.replaceTemplates(pathSegments, getTemplateVars(info));
@@ -320,7 +375,7 @@
}
}
this.holders = new ArrayList<>();
- for (Api api : AnnotatedApi.getApis(instance)) {
+ for (Api api : AnnotatedApi.getApis(instance.getClass(), instance, false)) {
holders.add(new ApiHolder((AnnotatedApi) api));
}
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/ClusterSingleton.java b/solr/core/src/java/org/apache/solr/cloud/ClusterSingleton.java
new file mode 100644
index 0000000..1ae1eed
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/ClusterSingleton.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud;
+
+/**
+ * Intended for {@link org.apache.solr.core.CoreContainer} plugins that should be
+ * enabled only one instance per cluster.
+ * <p>Components that implement this interface are always in one of two states:
+ * <ul>
+ * <li>STOPPED - the default state. The component is idle and does not perform
+ * any functions. It should also avoid holding any resources.</li>
+ * <li>RUNNING - the component is active.</li>
+ * </ul>
+ * <p>Components must be prepared to change these states multiple times in their
+ * life-cycle.</p>
+ * <p>Implementation detail: currently these plugins are instantiated on all nodes
+ * but they are started only on the Overseer leader, and stopped when the current
+ * node loses its leadership.</p>
+ */
+public interface ClusterSingleton {
+
+ /**
+ * Unique name of this singleton. Used for registration.
+ */
+ String getName();
+
+ /**
+ * Start the operation of the component. On return the component is assumed
+ * to be in the RUNNING state.
+ * @throws Exception on startup errors. The component should revert to the
+ * STOPPED state.
+ */
+ void start() throws Exception;
+
+ /**
+ * Returns true if the component is in the RUNNING state, false otherwise.
+ */
+ boolean isRunning();
+
+ /**
+ * Stop the operation of the component. On return the component is assumed
+ * to be in the STOPPED state. Components should also avoid holding any resources
+ * in the STOPPED state.
+ */
+ void stop();
+}
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index bb405ad..2465f8a 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -29,6 +29,8 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import org.apache.lucene.util.Version;
@@ -47,6 +49,7 @@
import org.apache.solr.cloud.overseer.SliceMutator;
import org.apache.solr.cloud.overseer.ZkStateWriter;
import org.apache.solr.cloud.overseer.ZkWriteCommand;
+import org.apache.solr.cluster.events.ClusterEventListener;
import org.apache.solr.common.AlreadyClosedException;
import org.apache.solr.common.SolrCloseable;
import org.apache.solr.common.SolrException;
@@ -656,6 +659,8 @@
}
});
+ startClusterSingletons();
+
assert ObjectReleaseTracker.track(this);
}
@@ -775,6 +780,59 @@
}
}
+ /**
+ * Start {@link ClusterSingleton} plugins when we become the leader.
+ */
+ public void startClusterSingletons() {
+ CoreContainer.ClusterSingletons singletons = getCoreContainer().getClusterSingletons();
+ final Runnable initializer = () -> {
+ if (isClosed()) {
+ return;
+ }
+ try {
+ singletons.waitUntilReady(60, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ log.warn("Interrupted initialization of ClusterSingleton-s");
+ return;
+ } catch (TimeoutException te) {
+ log.warn("Timed out during initialization of ClusterSingleton-s");
+ return;
+ }
+ singletons.getSingletons().forEach((name, singleton) -> {
+ try {
+ singleton.start();
+ if (singleton instanceof ClusterEventListener) {
+ getCoreContainer().getClusterEventProducer().registerListener((ClusterEventListener) singleton);
+ }
+ } catch (Exception e) {
+ log.warn("Exception starting ClusterSingleton {}: {}", singleton, e);
+ }
+ });
+ };
+ if (singletons.isReady()) {
+ // wait until all singleton-s are ready for the first startup
+ getCoreContainer().runAsync(initializer);
+ } else {
+ initializer.run();
+ }
+ }
+
+ /**
+ * Stop {@link ClusterSingleton} plugins when we lose leadership.
+ */
+ private void stopClusterSingletons() {
+ CoreContainer.ClusterSingletons singletons = getCoreContainer().getClusterSingletons();
+ if (singletons == null) {
+ return;
+ }
+ singletons.getSingletons().forEach((name, singleton) -> {
+ if (singleton instanceof ClusterEventListener) {
+ getCoreContainer().getClusterEventProducer().unregisterListener((ClusterEventListener) singleton);
+ }
+ singleton.stop();
+ });
+ }
+
public Stats getStats() {
return stats;
}
@@ -814,9 +872,14 @@
if (this.id != null) {
log.info("Overseer (id={}) closing", id);
}
+ // stop singletons only on the leader
+ if (!this.closed) {
+ stopClusterSingletons();
+ }
this.closed = true;
doClose();
+
assert ObjectReleaseTracker.release(this);
}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/ClusterEvent.java b/solr/core/src/java/org/apache/solr/cluster/events/ClusterEvent.java
new file mode 100644
index 0000000..2dc7a32
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cluster/events/ClusterEvent.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cluster.events;
+
+import org.apache.solr.common.MapWriter;
+
+import java.io.IOException;
+import java.time.Instant;
+
+/**
+ * Cluster-level event.
+ */
+public interface ClusterEvent extends MapWriter {
+
+ enum EventType {
+ /** One or more nodes went down. */
+ NODES_DOWN,
+ /** One or more nodes went up. */
+ NODES_UP,
+ /** One or more collections have been added. */
+ COLLECTIONS_ADDED,
+ /** One or more collections have been removed. */
+ COLLECTIONS_REMOVED,
+ /** One or more replicas went down. */
+ REPLICAS_DOWN,
+ /** Cluster properties have changed. */
+ CLUSTER_PROPERTIES_CHANGED
+ // other types? eg. Overseer leader change, shard leader change,
+ // node overload (eg. CPU / MEM circuit breakers tripped)?
+ }
+
+ /** Get event type. */
+ EventType getType();
+
+ /** Get event timestamp. This is the instant when the event was generated (not necessarily when
+ * the underlying condition first occurred). */
+ Instant getTimestamp();
+
+ default void writeMap(EntryWriter ew) throws IOException {
+ ew.put("type", getType());
+ ew.put("timestamp", getTimestamp().toEpochMilli());
+ }
+}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventListener.java b/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventListener.java
new file mode 100644
index 0000000..592f118
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventListener.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cluster.events;
+
+import org.apache.solr.cloud.ClusterSingleton;
+
+/**
+ * Components that want to be notified of cluster-wide events should use this.
+ *
+ * XXX should this work only for ClusterSingleton-s? some types of events may be
+ * XXX difficult (or pointless) to propagate to every node.
+ */
+public interface ClusterEventListener extends ClusterSingleton {
+
+ /**
+ * Handle the event. Implementations should be non-blocking - if any long
+ * processing is needed it should be performed asynchronously.
+ * @param event cluster event
+ */
+ void onEvent(ClusterEvent event);
+
+}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventProducer.java b/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventProducer.java
new file mode 100644
index 0000000..1c2327b
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventProducer.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cluster.events;
+
+import org.apache.solr.cloud.ClusterSingleton;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Component that produces {@link ClusterEvent} instances.
+ */
+public interface ClusterEventProducer extends ClusterSingleton {
+
+ String PLUGIN_NAME = "clusterEventProducer";
+
+ default String getName() {
+ return PLUGIN_NAME;
+ }
+
+ /**
+ * Returns a modifiable map of event types and listeners to process events
+ * of a given type.
+ */
+ Map<ClusterEvent.EventType, Set<ClusterEventListener>> getEventListeners();
+
+ /**
+ * Register an event listener for processing the specified event types.
+ * @param listener non-null listener. If the same instance of the listener is
+ * already registered it will be ignored.
+ * @param eventTypes non-empty array of event types that this listener
+ * is being registered for. If this is null or empty then all types will be used.
+ */
+ default void registerListener(ClusterEventListener listener, ClusterEvent.EventType... eventTypes) throws Exception {
+ Objects.requireNonNull(listener);
+ if (eventTypes == null || eventTypes.length == 0) {
+ eventTypes = ClusterEvent.EventType.values();
+ }
+ for (ClusterEvent.EventType type : eventTypes) {
+ Set<ClusterEventListener> perType = getEventListeners().computeIfAbsent(type, t -> ConcurrentHashMap.newKeySet());
+ perType.add(listener);
+ }
+ }
+
+ /**
+ * Unregister an event listener.
+ * @param listener non-null listener.
+ */
+ default void unregisterListener(ClusterEventListener listener) {
+ Objects.requireNonNull(listener);
+ getEventListeners().forEach((type, listeners) -> {
+ listeners.remove(listener);
+ });
+ }
+
+ /**
+ * Unregister an event listener for specified event types.
+ * @param listener non-null listener.
+ * @param eventTypes event types from which the listener will be unregistered. If this
+ * is null or empty then all event types will be used
+ */
+ default void unregisterListener(ClusterEventListener listener, ClusterEvent.EventType... eventTypes) {
+ Objects.requireNonNull(listener);
+ if (eventTypes == null || eventTypes.length == 0) {
+ eventTypes = ClusterEvent.EventType.values();
+ }
+ for (ClusterEvent.EventType type : eventTypes) {
+ getEventListeners()
+ .getOrDefault(type, Collections.emptySet())
+ .remove(listener);
+ }
+ }
+
+ /**
+ * Fire an event. This method will call registered listeners that subscribed to the
+ * type of event being passed.
+ * @param event cluster event
+ */
+ default void fireEvent(ClusterEvent event) {
+ getEventListeners().getOrDefault(event.getType(), Collections.emptySet())
+ .forEach(listener -> listener.onEvent(event));
+ }
+}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/ClusterPropertiesChangedEvent.java b/solr/core/src/java/org/apache/solr/cluster/events/ClusterPropertiesChangedEvent.java
new file mode 100644
index 0000000..ad9c0b8
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cluster/events/ClusterPropertiesChangedEvent.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cluster.events;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Event generated when {@link org.apache.solr.common.cloud.ZkStateReader#CLUSTER_PROPS} is modified.
+ */
+public interface ClusterPropertiesChangedEvent extends ClusterEvent {
+
+ @Override
+ default EventType getType() {
+ return EventType.CLUSTER_PROPERTIES_CHANGED;
+ }
+
+ Map<String, Object> getNewClusterProperties();
+
+ @Override
+ default void writeMap(EntryWriter ew) throws IOException {
+ ClusterEvent.super.writeMap(ew);
+ ew.put("newClusterProperties", getNewClusterProperties());
+ }
+}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/CollectionsAddedEvent.java b/solr/core/src/java/org/apache/solr/cluster/events/CollectionsAddedEvent.java
new file mode 100644
index 0000000..78046f8
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cluster/events/CollectionsAddedEvent.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cluster.events;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Event generated when some collections have been added.
+ */
+public interface CollectionsAddedEvent extends ClusterEvent {
+
+ @Override
+ default EventType getType() {
+ return EventType.COLLECTIONS_ADDED;
+ }
+
+ Iterator<String> getCollectionNames();
+
+ @Override
+ default void writeMap(EntryWriter ew) throws IOException {
+ ClusterEvent.super.writeMap(ew);
+ ew.put("collectionNames", getCollectionNames());
+ }
+}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/CollectionsRemovedEvent.java b/solr/core/src/java/org/apache/solr/cluster/events/CollectionsRemovedEvent.java
new file mode 100644
index 0000000..a93be4c
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cluster/events/CollectionsRemovedEvent.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cluster.events;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Event generated when some collections have been removed.
+ */
+public interface CollectionsRemovedEvent extends ClusterEvent {
+
+ @Override
+ default EventType getType() {
+ return EventType.COLLECTIONS_REMOVED;
+ }
+
+ Iterator<String> getCollectionNames();
+
+ @Override
+ default void writeMap(EntryWriter ew) throws IOException {
+ ClusterEvent.super.writeMap(ew);
+ ew.put("collectionNames", getCollectionNames());
+ }
+}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/NodesDownEvent.java b/solr/core/src/java/org/apache/solr/cluster/events/NodesDownEvent.java
new file mode 100644
index 0000000..5001ccb
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cluster/events/NodesDownEvent.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cluster.events;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Event generated when some nodes went down.
+ */
+public interface NodesDownEvent extends ClusterEvent {
+
+ @Override
+ default EventType getType() {
+ return EventType.NODES_DOWN;
+ }
+
+ Iterator<String> getNodeNames();
+
+ @Override
+ default void writeMap(EntryWriter ew) throws IOException {
+ ClusterEvent.super.writeMap(ew);
+ ew.put("nodeNames", getNodeNames());
+ }
+}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/NodesUpEvent.java b/solr/core/src/java/org/apache/solr/cluster/events/NodesUpEvent.java
new file mode 100644
index 0000000..fa08f85
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cluster/events/NodesUpEvent.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cluster.events;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Event generated when some nodes went up.
+ */
+public interface NodesUpEvent extends ClusterEvent {
+
+ @Override
+ default EventType getType() {
+ return EventType.NODES_UP;
+ }
+
+ Iterator<String> getNodeNames();
+
+ @Override
+ default void writeMap(EntryWriter ew) throws IOException {
+ ClusterEvent.super.writeMap(ew);
+ ew.put("nodeNames", getNodeNames());
+ }
+}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/ReplicasDownEvent.java b/solr/core/src/java/org/apache/solr/cluster/events/ReplicasDownEvent.java
new file mode 100644
index 0000000..1d3ce9b
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cluster/events/ReplicasDownEvent.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cluster.events;
+
+import org.apache.solr.common.cloud.Replica;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Event generated when some replicas went down.
+ */
+public interface ReplicasDownEvent extends ClusterEvent {
+
+ @Override
+ default EventType getType() {
+ return EventType.REPLICAS_DOWN;
+ }
+
+ Iterator<Replica> getReplicas();
+
+ @Override
+ default void writeMap(EntryWriter ew) throws IOException {
+ ClusterEvent.super.writeMap(ew);
+ ew.put("replicas", getReplicas());
+ }
+}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/impl/ClusterEventProducerImpl.java b/solr/core/src/java/org/apache/solr/cluster/events/impl/ClusterEventProducerImpl.java
new file mode 100644
index 0000000..034fa8a
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cluster/events/impl/ClusterEventProducerImpl.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cluster.events.impl;
+
+import java.lang.invoke.MethodHandles;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.solr.cloud.ZkController;
+import org.apache.solr.cluster.events.ClusterPropertiesChangedEvent;
+import org.apache.solr.cluster.events.ClusterEvent;
+import org.apache.solr.cluster.events.ClusterEventListener;
+import org.apache.solr.cluster.events.ClusterEventProducer;
+import org.apache.solr.cloud.ClusterSingleton;
+import org.apache.solr.cluster.events.CollectionsAddedEvent;
+import org.apache.solr.cluster.events.CollectionsRemovedEvent;
+import org.apache.solr.cluster.events.NodesDownEvent;
+import org.apache.solr.cluster.events.NodesUpEvent;
+import org.apache.solr.common.cloud.CloudCollectionsListener;
+import org.apache.solr.common.cloud.ClusterPropertiesListener;
+import org.apache.solr.common.cloud.LiveNodesListener;
+import org.apache.solr.core.CoreContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of {@link ClusterEventProducer}.
+ * <h3>Implementation notes</h3>
+ * <p>For each cluster event relevant listeners are always invoked sequentially
+ * (not in parallel) and in arbitrary order. This means that if any listener blocks the
+ * processing other listeners may be invoked much later or not at all.</p>
+ */
+public class ClusterEventProducerImpl implements ClusterEventProducer, ClusterSingleton {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private final Map<ClusterEvent.EventType, Set<ClusterEventListener>> listeners = new HashMap<>();
+ private CoreContainer coreContainer;
+ private LiveNodesListener liveNodesListener;
+ private CloudCollectionsListener cloudCollectionsListener;
+ private ClusterPropertiesListener clusterPropertiesListener;
+ private ZkController zkController;
+ private volatile boolean running;
+
+ private final Set<ClusterEvent.EventType> supportedEvents =
+ new HashSet<>(Arrays.asList(
+ ClusterEvent.EventType.NODES_DOWN,
+ ClusterEvent.EventType.NODES_UP,
+ ClusterEvent.EventType.COLLECTIONS_ADDED,
+ ClusterEvent.EventType.COLLECTIONS_REMOVED,
+ ClusterEvent.EventType.CLUSTER_PROPERTIES_CHANGED
+ ));
+
+ public ClusterEventProducerImpl(CoreContainer coreContainer) {
+ this.coreContainer = coreContainer;
+ }
+
+ // ClusterSingleton lifecycle methods
+ @Override
+ public void start() {
+ if (coreContainer == null) {
+ liveNodesListener = null;
+ cloudCollectionsListener = null;
+ clusterPropertiesListener = null;
+ return;
+ }
+ this.zkController = this.coreContainer.getZkController();
+
+ // clean up any previous instances
+ doStop();
+
+ // register liveNodesListener
+ liveNodesListener = (oldNodes, newNodes) -> {
+ // already closed but still registered
+ if (!running) {
+ // remove the listener
+ return true;
+ }
+ // spurious event, ignore but keep listening
+ if (oldNodes.equals(newNodes)) {
+ return false;
+ }
+ final Instant now = Instant.now();
+ final Set<String> downNodes = new HashSet<>(oldNodes);
+ downNodes.removeAll(newNodes);
+ if (!downNodes.isEmpty()) {
+ fireEvent(new NodesDownEvent() {
+ @Override
+ public Iterator<String> getNodeNames() {
+ return downNodes.iterator();
+ }
+
+ @Override
+ public Instant getTimestamp() {
+ return now;
+ }
+ });
+ }
+ final Set<String> upNodes = new HashSet<>(newNodes);
+ upNodes.removeAll(oldNodes);
+ if (!upNodes.isEmpty()) {
+ fireEvent(new NodesUpEvent() {
+ @Override
+ public Iterator<String> getNodeNames() {
+ return upNodes.iterator();
+ }
+
+ @Override
+ public Instant getTimestamp() {
+ return now;
+ }
+ });
+ }
+ return false;
+ };
+ zkController.zkStateReader.registerLiveNodesListener(liveNodesListener);
+
+ cloudCollectionsListener = ((oldCollections, newCollections) -> {
+ if (oldCollections.equals(newCollections)) {
+ return;
+ }
+ final Instant now = Instant.now();
+ final Set<String> removed = new HashSet<>(oldCollections);
+ removed.removeAll(newCollections);
+ if (!removed.isEmpty()) {
+ fireEvent(new CollectionsRemovedEvent() {
+ @Override
+ public Iterator<String> getCollectionNames() {
+ return removed.iterator();
+ }
+
+ @Override
+ public Instant getTimestamp() {
+ return now;
+ }
+ });
+ }
+ final Set<String> added = new HashSet<>(newCollections);
+ added.removeAll(oldCollections);
+ if (!added.isEmpty()) {
+ fireEvent(new CollectionsAddedEvent() {
+ @Override
+ public Iterator<String> getCollectionNames() {
+ return added.iterator();
+ }
+
+ @Override
+ public Instant getTimestamp() {
+ return now;
+ }
+ });
+ }
+ });
+ zkController.zkStateReader.registerCloudCollectionsListener(cloudCollectionsListener);
+
+ clusterPropertiesListener = (newProperties) -> {
+ fireEvent(new ClusterPropertiesChangedEvent() {
+ final Instant now = Instant.now();
+ @Override
+ public Map<String, Object> getNewClusterProperties() {
+ return newProperties;
+ }
+
+ @Override
+ public Instant getTimestamp() {
+ return now;
+ }
+ });
+ return false;
+ };
+ zkController.zkStateReader.registerClusterPropertiesListener(clusterPropertiesListener);
+
+ // XXX register collection state listener?
+ // XXX not sure how to efficiently monitor for REPLICA_DOWN events
+
+ running = true;
+ }
+
+ @Override
+ public boolean isRunning() {
+ return running;
+ }
+
+ @Override
+ public void stop() {
+ doStop();
+ running = false;
+ }
+
+ private void doStop() {
+ if (liveNodesListener != null) {
+ zkController.zkStateReader.removeLiveNodesListener(liveNodesListener);
+ }
+ if (cloudCollectionsListener != null) {
+ zkController.zkStateReader.removeCloudCollectionsListener(cloudCollectionsListener);
+ }
+ if (clusterPropertiesListener != null) {
+ zkController.zkStateReader.removeClusterPropertiesListener(clusterPropertiesListener);
+ }
+ liveNodesListener = null;
+ cloudCollectionsListener = null;
+ clusterPropertiesListener = null;
+ }
+
+ @Override
+ public void registerListener(ClusterEventListener listener, ClusterEvent.EventType... eventTypes) throws Exception {
+ try {
+ for (ClusterEvent.EventType type : eventTypes) {
+ if (!supportedEvents.contains(type)) {
+ log.warn("event type {} not supported yet.", type);
+ }
+ }
+ } catch (Throwable e) {
+ throw new Exception(e);
+ }
+ ClusterEventProducer.super.registerListener(listener, eventTypes);
+ }
+
+ @Override
+ public Map<ClusterEvent.EventType, Set<ClusterEventListener>> getEventListeners() {
+ return listeners;
+ }
+}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/impl/CollectionsRepairEventListener.java b/solr/core/src/java/org/apache/solr/cluster/events/impl/CollectionsRepairEventListener.java
new file mode 100644
index 0000000..42dcde3
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cluster/events/impl/CollectionsRepairEventListener.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cluster.events.impl;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.cloud.SolrCloudManager;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.cloud.api.collections.Assign;
+import org.apache.solr.cluster.events.ClusterEvent;
+import org.apache.solr.cluster.events.ClusterEventListener;
+import org.apache.solr.cluster.events.NodesDownEvent;
+import org.apache.solr.cluster.events.ReplicasDownEvent;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.ReplicaPosition;
+import org.apache.solr.core.CoreContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is an illustration how to re-implement the combination of 8x
+ * NodeLostTrigger and AutoAddReplicasPlanAction to maintain the collection's replication factor.
+ * <p>NOTE: there's no support for 'waitFor' yet.</p>
+ * <p>NOTE 2: this functionality would be probably more reliable when executed also as a
+ * periodically scheduled check - both as a reactive (listener) and proactive (scheduled) measure.</p>
+ */
+public class CollectionsRepairEventListener implements ClusterEventListener {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ public static final String PLUGIN_NAME = "collectionsRepairListener";
+ private static final String ASYNC_ID_PREFIX = "_async_" + PLUGIN_NAME;
+ private static final AtomicInteger counter = new AtomicInteger();
+
+ private final SolrClient solrClient;
+ private final SolrCloudManager solrCloudManager;
+
+ private volatile boolean running = false;
+
+ public CollectionsRepairEventListener(CoreContainer cc) {
+ this.solrClient = cc.getSolrClientCache().getCloudSolrClient(cc.getZkController().getZkClient().getZkServerAddress());
+ this.solrCloudManager = cc.getZkController().getSolrCloudManager();
+ }
+
+ @Override
+ public String getName() {
+ return PLUGIN_NAME;
+ }
+
+ @Override
+ public void onEvent(ClusterEvent event) {
+ if (!isRunning()) {
+ // ignore the event
+ return;
+ }
+ switch (event.getType()) {
+ case NODES_DOWN:
+ handleNodesDown((NodesDownEvent) event);
+ break;
+ case REPLICAS_DOWN:
+ handleReplicasDown((ReplicasDownEvent) event);
+ break;
+ default:
+ log.warn("Unsupported event {}, ignoring...", event);
+ }
+ }
+
+ private void handleNodesDown(NodesDownEvent event) {
+ // collect all lost replicas
+ // collection / positions
+ Map<String, List<ReplicaPosition>> newPositions = new HashMap<>();
+ try {
+ ClusterState clusterState = solrCloudManager.getClusterStateProvider().getClusterState();
+ Set<String> lostNodeNames = new HashSet<>();
+ event.getNodeNames().forEachRemaining(lostNodeNames::add);
+ clusterState.forEachCollection(coll -> {
+ // shard / type / count
+ Map<String, Map<Replica.Type, AtomicInteger>> lostReplicas = new HashMap<>();
+ coll.forEachReplica((shard, replica) -> {
+ if (lostNodeNames.contains(replica.getNodeName())) {
+ lostReplicas.computeIfAbsent(shard, s -> new HashMap<>())
+ .computeIfAbsent(replica.type, t -> new AtomicInteger())
+ .incrementAndGet();
+ }
+ });
+ Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(solrCloudManager, clusterState, coll);
+ lostReplicas.forEach((shard, types) -> {
+ Assign.AssignRequestBuilder assignRequestBuilder = new Assign.AssignRequestBuilder()
+ .forCollection(coll.getName())
+ .forShard(Collections.singletonList(shard));
+ types.forEach((type, count) -> {
+ switch (type) {
+ case NRT:
+ assignRequestBuilder.assignNrtReplicas(count.get());
+ break;
+ case PULL:
+ assignRequestBuilder.assignPullReplicas(count.get());
+ break;
+ case TLOG:
+ assignRequestBuilder.assignTlogReplicas(count.get());
+ break;
+ }
+ });
+ Assign.AssignRequest assignRequest = assignRequestBuilder.build();
+ try {
+ List<ReplicaPosition> positions = assignStrategy.assign(solrCloudManager, assignRequest);
+ newPositions.put(coll.getName(), positions);
+ } catch (Exception e) {
+ log.warn("Exception computing positions for {}/{}: {}", coll.getName(), shard, e);
+ return;
+ }
+ });
+ });
+ } catch (IOException e) {
+ log.warn("Exception getting cluster state", e);
+ return;
+ }
+
+ // send ADDREPLICA admin requests for each lost replica
+ // XXX should we use 'async' for that, to avoid blocking here?
+ List<CollectionAdminRequest.AddReplica> addReplicas = new ArrayList<>();
+ newPositions.forEach((collection, positions) -> {
+ positions.forEach(position -> {
+ CollectionAdminRequest.AddReplica addReplica = CollectionAdminRequest
+ .addReplicaToShard(collection, position.shard, position.type);
+ addReplica.setNode(position.node);
+ addReplica.setAsyncId(ASYNC_ID_PREFIX + counter.incrementAndGet());
+ addReplicas.add(addReplica);
+ });
+ });
+ addReplicas.forEach(addReplica -> {
+ try {
+ solrClient.request(addReplica);
+ } catch (Exception e) {
+ log.warn("Exception calling ADDREPLICA {}: {}", addReplica.getParams().toQueryString(), e);
+ }
+ });
+
+ // ... and DELETERPLICA for lost ones?
+ }
+
+ private void handleReplicasDown(ReplicasDownEvent event) {
+ // compute new placements for all replicas that went down
+ // send ADDREPLICA admin request for each lost replica
+ }
+
+ @Override
+ public void start() throws Exception {
+ running = true;
+ }
+
+ @Override
+ public boolean isRunning() {
+ return running;
+ }
+
+ @Override
+ public void stop() {
+ running = false;
+ }
+}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/impl/package-info.java b/solr/core/src/java/org/apache/solr/cluster/events/impl/package-info.java
new file mode 100644
index 0000000..2c115b6
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cluster/events/impl/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Default implementation of {@link org.apache.solr.cluster.events.ClusterEventProducer}.
+ */
+package org.apache.solr.cluster.events.impl;
+
+
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/package-info.java b/solr/core/src/java/org/apache/solr/cluster/events/package-info.java
new file mode 100644
index 0000000..a334a00
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cluster/events/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Interfaces defining support for cluster-level event generation and processing.
+ */
+package org.apache.solr.cluster.events;
+
+
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index bcc2039..1427dad 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -39,9 +39,12 @@
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import com.google.common.annotations.VisibleForTesting;
@@ -68,8 +71,13 @@
import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.util.SolrIdentifierValidator;
import org.apache.solr.cloud.CloudDescriptor;
+import org.apache.solr.cloud.ClusterSingleton;
import org.apache.solr.cloud.OverseerTaskQueue;
import org.apache.solr.cloud.ZkController;
+import org.apache.solr.cluster.events.ClusterEvent;
+import org.apache.solr.cluster.events.ClusterEventListener;
+import org.apache.solr.cluster.events.ClusterEventProducer;
+import org.apache.solr.cluster.events.impl.ClusterEventProducerImpl;
import org.apache.solr.common.AlreadyClosedException;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
@@ -168,6 +176,72 @@
}
}
+ public static class ClusterSingletons {
+ private Map<String, ClusterSingleton> singletonMap = new ConcurrentHashMap<>();
+ // we use this latch to delay the initial startup of singletons, due to
+ // the leader election occurring in parallel with the rest of the load() method.
+ private CountDownLatch readyLatch = new CountDownLatch(1);
+
+ public Map<String, ClusterSingleton> getSingletons() {
+ return singletonMap;
+ }
+
+ public boolean isReady() {
+ return readyLatch.getCount() > 0;
+ }
+
+ public void setReady() {
+ readyLatch.countDown();
+ }
+
+ public void waitUntilReady(long timeout, TimeUnit timeUnit)
+ throws InterruptedException, TimeoutException {
+ boolean await = readyLatch.await(timeout, timeUnit);
+ if (!await) {
+ throw new TimeoutException("Timed out waiting for ClusterSingletons to become ready.");
+ }
+ }
+ }
+
+ /**
+ * This class helps in handling the initial registration of plugin-based listeners,
+ * when both the final {@link ClusterEventProducer} implementation and listeners
+ * are configured using plugins.
+ */
+ public static class InitialClusterEventProducer implements ClusterEventProducer {
+ Map<ClusterEvent.EventType, Set<ClusterEventListener>> initialListeners = new HashMap<>();
+
+ @Override
+ public Map<ClusterEvent.EventType, Set<ClusterEventListener>> getEventListeners() {
+ return initialListeners;
+ }
+
+ public void transferListeners(ClusterEventProducer target) {
+ initialListeners.forEach((type, listeners) -> {
+ listeners.forEach(listener -> {
+ try {
+ target.registerListener(listener, type);
+ } catch (Exception e) {
+ log.warn("Unable to register event listener for type {}: {}", type, e);
+ }
+ });
+ });
+ }
+
+ @Override
+ public void start() throws Exception {
+ }
+
+ @Override
+ public boolean isRunning() {
+ return false;
+ }
+
+ @Override
+ public void stop() {
+ }
+ }
+
private volatile PluginBag<SolrRequestHandler> containerHandlers = new PluginBag<>(SolrRequestHandler.class, null);
/**
@@ -243,8 +317,11 @@
private volatile SolrClientCache solrClientCache;
+ private volatile ClusterEventProducer clusterEventProducer = new InitialClusterEventProducer();
+
private final ObjectCache objectCache = new ObjectCache();
+ private final ClusterSingletons clusterSingletons = new ClusterSingletons();
private PackageStoreAPI packageStoreAPI;
private PackageLoader packageLoader;
@@ -892,7 +969,31 @@
ContainerPluginsApi containerPluginsApi = new ContainerPluginsApi(this);
containerHandlers.getApiBag().registerObject(containerPluginsApi.readAPI);
containerHandlers.getApiBag().registerObject(containerPluginsApi.editAPI);
+
+ // init ClusterSingleton-s
+
+ // register the handlers that are also ClusterSingleton
+ containerHandlers.keySet().forEach(handlerName -> {
+ SolrRequestHandler handler = containerHandlers.get(handlerName);
+ if (handler instanceof ClusterSingleton) {
+ clusterSingletons.singletonMap.put(handlerName, (ClusterSingleton) handler);
+ }
+ });
+ // create the ClusterEventProducer
+ InitialClusterEventProducer initialClusterEventProducer = (InitialClusterEventProducer) clusterEventProducer;
+ CustomContainerPlugins.ApiInfo clusterEventProducerInfo = customContainerPlugins.getPlugin(ClusterEventProducer.PLUGIN_NAME);
+ if (clusterEventProducerInfo != null) {
+ clusterEventProducer = (ClusterEventProducer) clusterEventProducerInfo.getInstance();
+ } else {
+ clusterEventProducer = new ClusterEventProducerImpl(this);
+ clusterSingletons.singletonMap.put(ClusterEventProducer.PLUGIN_NAME, clusterEventProducer);
+ }
+ // transfer those listeners that were already registered to the initial impl
+ initialClusterEventProducer.transferListeners(clusterEventProducer);
+
+ clusterSingletons.setReady();
zkSys.getZkController().checkOverseerDesignate();
+
}
// This is a bit redundant but these are two distinct concepts for all they're accomplished at the same time.
status |= LOAD_COMPLETE | INITIAL_CORE_LOAD_COMPLETE;
@@ -2090,6 +2191,14 @@
return customContainerPlugins;
}
+ public ClusterSingletons getClusterSingletons() {
+ return clusterSingletons;
+ }
+
+ public ClusterEventProducer getClusterEventProducer() {
+ return clusterEventProducer;
+ }
+
static {
ExecutorUtil.addThreadLocalProvider(SolrRequestInfo.getInheritableThreadLocalProvider());
}
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/ContainerPluginsApi.java b/solr/core/src/java/org/apache/solr/handler/admin/ContainerPluginsApi.java
index 0c7a487..ad95423 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/ContainerPluginsApi.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/ContainerPluginsApi.java
@@ -51,7 +51,7 @@
public class ContainerPluginsApi {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- public static final String PLUGIN = "plugin";
+ public static final String PLUGINS = "plugin";
private final Supplier<SolrZkClient> zkClientSupplier;
private final CoreContainer coreContainer;
public final Read readAPI = new Read();
@@ -67,7 +67,7 @@
path = "/cluster/plugin",
permission = PermissionNameProvider.Name.COLL_READ_PERM)
public void list(SolrQueryRequest req, SolrQueryResponse rsp) throws IOException {
- rsp.add(PLUGIN, plugins(zkClientSupplier));
+ rsp.add(PLUGINS, plugins(zkClientSupplier));
}
}
@@ -151,7 +151,7 @@
SolrZkClient zkClient = zkClientSupplier.get();
try {
Map<String, Object> clusterPropsJson = (Map<String, Object>) Utils.fromJSON(zkClient.getData(ZkStateReader.CLUSTER_PROPS, null, new Stat(), true));
- return (Map<String, Object>) clusterPropsJson.computeIfAbsent(PLUGIN, o -> new LinkedHashMap<>());
+ return (Map<String, Object>) clusterPropsJson.computeIfAbsent(PLUGINS, o -> new LinkedHashMap<>());
} catch (KeeperException.NoNodeException e) {
return new LinkedHashMap<>();
} catch (KeeperException | InterruptedException e) {
@@ -165,9 +165,9 @@
zkClientSupplier.get().atomicUpdate(ZkStateReader.CLUSTER_PROPS, bytes -> {
Map rawJson = bytes == null ? new LinkedHashMap() :
(Map) Utils.fromJSON(bytes);
- Map pluginsModified = modifier.apply((Map) rawJson.computeIfAbsent(PLUGIN, o -> new LinkedHashMap<>()));
+ Map pluginsModified = modifier.apply((Map) rawJson.computeIfAbsent(PLUGINS, o -> new LinkedHashMap<>()));
if (pluginsModified == null) return null;
- rawJson.put(PLUGIN, pluginsModified);
+ rawJson.put(PLUGINS, pluginsModified);
return Utils.toJSON(rawJson);
});
} catch (KeeperException | InterruptedException e) {
diff --git a/solr/core/src/java/org/apache/solr/packagemanager/PackageManager.java b/solr/core/src/java/org/apache/solr/packagemanager/PackageManager.java
index 424f604..044cf8c 100644
--- a/solr/core/src/java/org/apache/solr/packagemanager/PackageManager.java
+++ b/solr/core/src/java/org/apache/solr/packagemanager/PackageManager.java
@@ -52,6 +52,7 @@
import org.apache.solr.common.util.Pair;
import org.apache.solr.common.util.Utils;
import org.apache.solr.filestore.DistribPackageStore;
+import org.apache.solr.handler.admin.ContainerPluginsApi;
import org.apache.solr.packagemanager.SolrPackage.Command;
import org.apache.solr.packagemanager.SolrPackage.Manifest;
import org.apache.solr.packagemanager.SolrPackage.Plugin;
@@ -231,7 +232,7 @@
}
}
@SuppressWarnings({"unchecked"})
- Map<String, Object> clusterPlugins = (Map<String, Object>) result.getOrDefault("plugin", Collections.emptyMap());
+ Map<String, Object> clusterPlugins = (Map<String, Object>) result.getOrDefault(ContainerPluginsApi.PLUGINS, Collections.emptyMap());
for (String key : clusterPlugins.keySet()) {
// Map<String, String> pluginMeta = (Map<String, String>) clusterPlugins.get(key);
PluginMeta pluginMeta;
diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
index b6372a0..f6c07c1 100644
--- a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
@@ -17,13 +17,7 @@
package org.apache.solr.cloud;
import static org.apache.solr.cloud.AbstractDistribZkTestBase.verifyReplicaStatus;
-import static org.mockito.Mockito.anyBoolean;
-import static org.mockito.Mockito.anyInt;
-import static org.mockito.Mockito.anyString;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
@@ -56,6 +50,7 @@
import org.apache.solr.cloud.overseer.NodeMutator;
import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.cloud.overseer.ZkWriteCommand;
+import org.apache.solr.cluster.events.impl.ClusterEventProducerImpl;
import org.apache.solr.common.AlreadyClosedException;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
@@ -117,7 +112,6 @@
private static SolrZkClient zkClient;
-
private volatile boolean testDone = false;
private final List<ZkController> zkControllers = Collections.synchronizedList(new ArrayList<>());
@@ -127,7 +121,6 @@
private final List<HttpShardHandlerFactory> httpShardHandlerFactorys = Collections.synchronizedList(new ArrayList<>());
private final List<UpdateShardHandler> updateShardHandlers = Collections.synchronizedList(new ArrayList<>());
private final List<CloudSolrClient> solrClients = Collections.synchronizedList(new ArrayList<>());
-
private static final String COLLECTION = SolrTestCaseJ4.DEFAULT_TEST_COLLECTION_NAME;
public static class MockZKController{
@@ -306,6 +299,7 @@
@Before
public void setUp() throws Exception {
testDone = false;
+
super.setUp();
}
@@ -322,6 +316,7 @@
}
server = null;
+
}
@After
@@ -1428,11 +1423,24 @@
Mockito.withSettings().defaultAnswer(Mockito.CALLS_REAL_METHODS));
when(mockAlwaysUpCoreContainer.isShutDown()).thenReturn(testDone); // Allow retry on session expiry
when(mockAlwaysUpCoreContainer.getResourceLoader()).thenReturn(new SolrResourceLoader());
+ CoreContainer.ClusterSingletons singletons = new CoreContainer.ClusterSingletons();
+ // don't wait for all singletons
+ singletons.setReady();
+ FieldSetter.setField(mockAlwaysUpCoreContainer, CoreContainer.class.getDeclaredField("clusterSingletons"), singletons);
+ ClusterEventProducerImpl clusterEventProducer = new ClusterEventProducerImpl(mockAlwaysUpCoreContainer);
+ when(mockAlwaysUpCoreContainer.getClusterEventProducer()).thenReturn(clusterEventProducer);
FieldSetter.setField(zkController, ZkController.class.getDeclaredField("zkClient"), zkClient);
FieldSetter.setField(zkController, ZkController.class.getDeclaredField("cc"), mockAlwaysUpCoreContainer);
when(zkController.getCoreContainer()).thenReturn(mockAlwaysUpCoreContainer);
when(zkController.getZkClient()).thenReturn(zkClient);
when(zkController.getZkStateReader()).thenReturn(reader);
+ // primitive support for CC.runAsync
+ doAnswer(invocable -> {
+ Runnable r = invocable.getArgument(0);
+ Thread t = new Thread(r);
+ t.start();
+ return null;
+ }).when(mockAlwaysUpCoreContainer).runAsync(any(Runnable.class));
when(zkController.getLeaderProps(anyString(), anyString(), anyInt())).thenCallRealMethod();
when(zkController.getLeaderProps(anyString(), anyString(), anyInt(), anyBoolean())).thenCallRealMethod();
diff --git a/solr/core/src/test/org/apache/solr/cluster/events/AllEventsListener.java b/solr/core/src/test/org/apache/solr/cluster/events/AllEventsListener.java
new file mode 100644
index 0000000..8da6bea
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cluster/events/AllEventsListener.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cluster.events;
+
+import org.junit.Assert;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ *
+ */
+public class AllEventsListener implements ClusterEventListener {
+ CountDownLatch eventLatch = new CountDownLatch(1);
+ ClusterEvent.EventType expectedType;
+ Map<ClusterEvent.EventType, List<ClusterEvent>> events = new HashMap<>();
+
+ @Override
+ public String getName() {
+ return "allEventsListener";
+ }
+
+ @Override
+ public void onEvent(ClusterEvent event) {
+ events.computeIfAbsent(event.getType(), type -> new ArrayList<>()).add(event);
+ if (event.getType() == expectedType) {
+ eventLatch.countDown();
+ }
+ }
+
+ public void setExpectedType(ClusterEvent.EventType expectedType) {
+ this.expectedType = expectedType;
+ eventLatch = new CountDownLatch(1);
+ }
+
+ public void waitForExpectedEvent(int timeoutSeconds) throws InterruptedException {
+ boolean await = eventLatch.await(timeoutSeconds, TimeUnit.SECONDS);
+ if (!await) {
+ Assert.fail("Timed out waiting for expected event " + expectedType);
+ }
+ }
+
+ @Override
+ public void start() throws Exception {
+
+ }
+
+ @Override
+ public boolean isRunning() {
+ return false;
+ }
+
+ @Override
+ public void stop() {
+
+ }
+}
diff --git a/solr/core/src/test/org/apache/solr/cluster/events/ClusterEventProducerTest.java b/solr/core/src/test/org/apache/solr/cluster/events/ClusterEventProducerTest.java
new file mode 100644
index 0000000..ea6c5f5
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cluster/events/ClusterEventProducerTest.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cluster.events;
+
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.V2Request;
+import org.apache.solr.client.solrj.request.beans.PluginMeta;
+import org.apache.solr.client.solrj.response.V2Response;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.cloud.ClusterProperties;
+import org.apache.solr.common.util.Utils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static java.util.Collections.singletonMap;
+import static org.apache.solr.client.solrj.SolrRequest.METHOD.GET;
+import static org.apache.solr.client.solrj.SolrRequest.METHOD.POST;
+
+/**
+ *
+ */
+public class ClusterEventProducerTest extends SolrCloudTestCase {
+
+ private AllEventsListener eventsListener;
+
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ configureCluster(3)
+ .addConfig("conf", TEST_PATH().resolve("configsets").resolve("cloud-minimal").resolve("conf"))
+ .configure();
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ System.setProperty("enable.packages", "true");
+ super.setUp();
+ cluster.deleteAllCollections();
+ eventsListener = new AllEventsListener();
+ cluster.getOpenOverseer().getCoreContainer().getClusterEventProducer().registerListener(eventsListener);
+ }
+
+ @After
+ public void teardown() {
+ System.clearProperty("enable.packages");
+ if (eventsListener != null) {
+ cluster.getOpenOverseer().getCoreContainer().getClusterEventProducer().unregisterListener(eventsListener);
+ }
+ }
+
+ @Test
+ public void testEvents() throws Exception {
+
+ // NODES_DOWN
+
+ eventsListener.setExpectedType(ClusterEvent.EventType.NODES_DOWN);
+
+ // don't kill Overseer
+ JettySolrRunner nonOverseerJetty = null;
+ for (JettySolrRunner jetty : cluster.getJettySolrRunners()) {
+ if (cluster.getOpenOverseer().getCoreContainer().getZkController().getNodeName().equals(jetty.getNodeName())) {
+ continue;
+ }
+ nonOverseerJetty = jetty;
+ break;
+ }
+ String nodeName = nonOverseerJetty.getNodeName();
+ cluster.stopJettySolrRunner(nonOverseerJetty);
+ cluster.waitForJettyToStop(nonOverseerJetty);
+ eventsListener.waitForExpectedEvent(10);
+ assertNotNull("should be NODES_DOWN events", eventsListener.events.get(ClusterEvent.EventType.NODES_DOWN));
+ List<ClusterEvent> events = eventsListener.events.get(ClusterEvent.EventType.NODES_DOWN);
+ assertEquals("should be one NODES_DOWN event", 1, events.size());
+ ClusterEvent event = events.get(0);
+ assertEquals("should be NODES_DOWN event type", ClusterEvent.EventType.NODES_DOWN, event.getType());
+ NodesDownEvent nodesDown = (NodesDownEvent) event;
+ assertEquals("should be node " + nodeName, nodeName, nodesDown.getNodeNames().next());
+
+ // NODES_UP
+ eventsListener.setExpectedType(ClusterEvent.EventType.NODES_UP);
+ JettySolrRunner newNode = cluster.startJettySolrRunner();
+ cluster.waitForNode(newNode, 60);
+ eventsListener.waitForExpectedEvent(10);
+ assertNotNull("should be NODES_UP events", eventsListener.events.get(ClusterEvent.EventType.NODES_UP));
+ events = eventsListener.events.get(ClusterEvent.EventType.NODES_UP);
+ assertEquals("should be one NODES_UP event", 1, events.size());
+ event = events.get(0);
+ assertEquals("should be NODES_UP event type", ClusterEvent.EventType.NODES_UP, event.getType());
+ NodesUpEvent nodesUp = (NodesUpEvent) event;
+ assertEquals("should be node " + newNode.getNodeName(), newNode.getNodeName(), nodesUp.getNodeNames().next());
+
+ // COLLECTIONS_ADDED
+ eventsListener.setExpectedType(ClusterEvent.EventType.COLLECTIONS_ADDED);
+ String collection = "testNodesEvent_collection";
+ CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collection, "conf", 1, 1);
+ cluster.getSolrClient().request(create);
+ cluster.waitForActiveCollection(collection, 1, 1);
+ eventsListener.waitForExpectedEvent(10);
+ assertNotNull("should be COLLECTIONS_ADDED events", eventsListener.events.get(ClusterEvent.EventType.COLLECTIONS_ADDED));
+ events = eventsListener.events.get(ClusterEvent.EventType.COLLECTIONS_ADDED);
+ assertEquals("should be one COLLECTIONS_ADDED event", 1, events.size());
+ event = events.get(0);
+ assertEquals("should be COLLECTIONS_ADDED event type", ClusterEvent.EventType.COLLECTIONS_ADDED, event.getType());
+ CollectionsAddedEvent collectionsAdded = (CollectionsAddedEvent) event;
+ assertEquals("should be collection " + collection, collection, collectionsAdded.getCollectionNames().next());
+
+ // COLLECTIONS_REMOVED
+ eventsListener.setExpectedType(ClusterEvent.EventType.COLLECTIONS_REMOVED);
+ CollectionAdminRequest.Delete delete = CollectionAdminRequest.deleteCollection(collection);
+ cluster.getSolrClient().request(delete);
+ eventsListener.waitForExpectedEvent(10);
+ assertNotNull("should be COLLECTIONS_REMOVED events", eventsListener.events.get(ClusterEvent.EventType.COLLECTIONS_REMOVED));
+ events = eventsListener.events.get(ClusterEvent.EventType.COLLECTIONS_REMOVED);
+ assertEquals("should be one COLLECTIONS_REMOVED event", 1, events.size());
+ event = events.get(0);
+ assertEquals("should be COLLECTIONS_REMOVED event type", ClusterEvent.EventType.COLLECTIONS_REMOVED, event.getType());
+ CollectionsRemovedEvent collectionsRemoved = (CollectionsRemovedEvent) event;
+ assertEquals("should be collection " + collection, collection, collectionsRemoved.getCollectionNames().next());
+
+ // CLUSTER_CONFIG_CHANGED
+ eventsListener.setExpectedType(ClusterEvent.EventType.CLUSTER_PROPERTIES_CHANGED);
+ ClusterProperties clusterProperties = new ClusterProperties(cluster.getZkClient());
+ Map<String, Object> oldProps = new HashMap<>(clusterProperties.getClusterProperties());
+ clusterProperties.setClusterProperty("ext.foo", "bar");
+ eventsListener.waitForExpectedEvent(10);
+ assertNotNull("should be CLUSTER_CONFIG_CHANGED events", eventsListener.events.get(ClusterEvent.EventType.CLUSTER_PROPERTIES_CHANGED));
+ events = eventsListener.events.get(ClusterEvent.EventType.CLUSTER_PROPERTIES_CHANGED);
+ assertEquals("should be one CLUSTER_CONFIG_CHANGED event", 1, events.size());
+ event = events.get(0);
+ assertEquals("should be CLUSTER_CONFIG_CHANGED event type", ClusterEvent.EventType.CLUSTER_PROPERTIES_CHANGED, event.getType());
+ ClusterPropertiesChangedEvent propertiesChanged = (ClusterPropertiesChangedEvent) event;
+ Map<String, Object> newProps = propertiesChanged.getNewClusterProperties();
+ assertEquals("new properties wrong value of the 'ext.foo' property: " + newProps,
+ "bar", newProps.get("ext.foo"));
+
+ // unset the property
+ eventsListener.setExpectedType(ClusterEvent.EventType.CLUSTER_PROPERTIES_CHANGED);
+ clusterProperties.setClusterProperty("ext.foo", null);
+ eventsListener.waitForExpectedEvent(10);
+ assertNotNull("should be CLUSTER_CONFIG_CHANGED events", eventsListener.events.get(ClusterEvent.EventType.CLUSTER_PROPERTIES_CHANGED));
+ events = eventsListener.events.get(ClusterEvent.EventType.CLUSTER_PROPERTIES_CHANGED);
+ assertEquals("should be two CLUSTER_CONFIG_CHANGED events", 2, events.size());
+ event = events.get(1);
+ assertEquals("should be CLUSTER_CONFIG_CHANGED event type", ClusterEvent.EventType.CLUSTER_PROPERTIES_CHANGED, event.getType());
+ propertiesChanged = (ClusterPropertiesChangedEvent) event;
+ assertEquals("new properties should not have 'ext.foo' property: " + propertiesChanged.getNewClusterProperties(),
+ null, propertiesChanged.getNewClusterProperties().get("ext.foo"));
+
+ }
+
+ private static CountDownLatch dummyEventLatch = new CountDownLatch(1);
+ private static ClusterEvent lastEvent = null;
+
+ public static class DummyEventListener implements ClusterEventListener {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ boolean running = false;
+ @Override
+ public void onEvent(ClusterEvent event) {
+ if (!running) {
+ log.debug("skipped event, not running: {}", event);
+ return;
+ }
+ if (event.getType() == ClusterEvent.EventType.COLLECTIONS_ADDED ||
+ event.getType() == ClusterEvent.EventType.COLLECTIONS_REMOVED) {
+ log.debug("recorded event {}", Utils.toJSONString(event));
+ lastEvent = event;
+ dummyEventLatch.countDown();
+ } else {
+ log.debug("skipped event, wrong type: {}", event.getType());
+ }
+ }
+
+ @Override
+ public String getName() {
+ return "dummy";
+ }
+
+ @Override
+ public void start() throws Exception {
+ log.debug("starting {}", Integer.toHexString(hashCode()));
+ running = true;
+ }
+
+ @Override
+ public boolean isRunning() {
+ return running;
+ }
+
+ @Override
+ public void stop() {
+ log.debug("stopping {}", Integer.toHexString(hashCode()));
+ running = false;
+ }
+ }
+
+ @Test
+ public void testListenerPlugins() throws Exception {
+ PluginMeta plugin = new PluginMeta();
+ plugin.name = "testplugin";
+ plugin.klass = DummyEventListener.class.getName();
+ V2Request req = new V2Request.Builder("/cluster/plugin")
+ .forceV2(true)
+ .withMethod(POST)
+ .withPayload(singletonMap("add", plugin))
+ .build();
+ V2Response rsp = req.process(cluster.getSolrClient());
+ //just check if the plugin is indeed registered
+ V2Request readPluginState = new V2Request.Builder("/cluster/plugin")
+ .forceV2(true)
+ .withMethod(GET)
+ .build();
+ rsp = readPluginState.process(cluster.getSolrClient());
+ assertEquals(DummyEventListener.class.getName(), rsp._getStr("/plugin/testplugin/class", null));
+
+ String collection = "testListenerPlugins_collection";
+ CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collection, "conf", 1, 1);
+ cluster.getSolrClient().request(create);
+ cluster.waitForActiveCollection(collection, 1, 1);
+ boolean await = dummyEventLatch.await(30, TimeUnit.SECONDS);
+ if (!await) {
+ fail("Timed out waiting for COLLECTIONS_ADDED event, " + collection);
+ }
+ assertNotNull("lastEvent should be COLLECTIONS_ADDED", lastEvent);
+ assertEquals("lastEvent should be COLLECTIONS_ADDED", ClusterEvent.EventType.COLLECTIONS_ADDED, lastEvent.getType());
+ // verify timestamp
+ Instant now = Instant.now();
+ assertTrue("timestamp of the event is in the future", now.isAfter(lastEvent.getTimestamp()));
+ assertEquals(collection, ((CollectionsAddedEvent)lastEvent).getCollectionNames().next());
+
+ dummyEventLatch = new CountDownLatch(1);
+ lastEvent = null;
+
+ CollectionAdminRequest.Delete delete = CollectionAdminRequest.deleteCollection(collection);
+ cluster.getSolrClient().request(delete);
+ await = dummyEventLatch.await(30, TimeUnit.SECONDS);
+ if (!await) {
+ fail("Timed out waiting for COLLECTIONS_REMOVED event, " + collection);
+ }
+ assertNotNull("lastEvent should be COLLECTIONS_REMOVED", lastEvent);
+ assertEquals("lastEvent should be COLLECTIONS_REMOVED", ClusterEvent.EventType.COLLECTIONS_REMOVED, lastEvent.getType());
+ // verify timestamp
+ now = Instant.now();
+ assertTrue("timestamp of the event is in the future", now.isAfter(lastEvent.getTimestamp()));
+ assertEquals(collection, ((CollectionsRemovedEvent)lastEvent).getCollectionNames().next());
+ }
+}
diff --git a/solr/core/src/test/org/apache/solr/cluster/events/impl/CollectionsRepairEventListenerTest.java b/solr/core/src/test/org/apache/solr/cluster/events/impl/CollectionsRepairEventListenerTest.java
new file mode 100644
index 0000000..a764051
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cluster/events/impl/CollectionsRepairEventListenerTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cluster.events.impl;
+
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.cluster.events.AllEventsListener;
+import org.apache.solr.cluster.events.ClusterEvent;
+import org.apache.solr.cluster.events.ClusterEventListener;
+import org.apache.solr.core.CoreContainer;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ *
+ */
+public class CollectionsRepairEventListenerTest extends SolrCloudTestCase {
+
+ public static class CollectionsRepairWrapperListener implements ClusterEventListener {
+ final CollectionsRepairEventListener delegate;
+
+ CountDownLatch completed = new CountDownLatch(1);
+
+ CollectionsRepairWrapperListener(CoreContainer cc) throws Exception {
+ delegate = new CollectionsRepairEventListener(cc);
+ }
+
+ @Override
+ public void onEvent(ClusterEvent event) {
+ delegate.onEvent(event);
+ completed.countDown();
+ }
+
+ @Override
+ public String getName() {
+ return "wrapperListener";
+ }
+
+ @Override
+ public void start() throws Exception {
+ delegate.start();
+ }
+
+ @Override
+ public boolean isRunning() {
+ return delegate.isRunning();
+ }
+
+ @Override
+ public void stop() {
+ delegate.stop();
+ }
+ }
+
+ private static AllEventsListener eventsListener = new AllEventsListener();
+ private static CollectionsRepairWrapperListener repairListener;
+
+ private static int NUM_NODES = 3;
+
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ configureCluster(NUM_NODES)
+ .addConfig("conf", TEST_PATH().resolve("configsets").resolve("cloud-minimal").resolve("conf"))
+ .configure();
+ CoreContainer cc = cluster.getOpenOverseer().getCoreContainer();
+ cc.getClusterEventProducer()
+ .registerListener(eventsListener, ClusterEvent.EventType.values());
+ repairListener = new CollectionsRepairWrapperListener(cc);
+ cc.getClusterEventProducer()
+ .registerListener(repairListener, ClusterEvent.EventType.NODES_DOWN);
+ repairListener.start();
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ cluster.deleteAllCollections();
+ }
+
+ @Test
+ public void testCollectionRepair() throws Exception {
+ eventsListener.setExpectedType(ClusterEvent.EventType.COLLECTIONS_ADDED);
+ String collection = "testCollectionRepair_collection";
+ CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collection, "conf", 1, 3);
+ cluster.getSolrClient().request(create);
+ cluster.waitForActiveCollection(collection, 1, 3);
+ eventsListener.waitForExpectedEvent(10);
+ eventsListener.setExpectedType(ClusterEvent.EventType.NODES_DOWN);
+
+ // don't kill Overseer
+ JettySolrRunner nonOverseerJetty = null;
+ for (JettySolrRunner jetty : cluster.getJettySolrRunners()) {
+ if (cluster.getOpenOverseer().getCoreContainer().getZkController().getNodeName().equals(jetty.getNodeName())) {
+ continue;
+ }
+ nonOverseerJetty = jetty;
+ break;
+ }
+ String nodeName = nonOverseerJetty.getNodeName();
+ cluster.stopJettySolrRunner(nonOverseerJetty);
+ cluster.waitForJettyToStop(nonOverseerJetty);
+ eventsListener.waitForExpectedEvent(10);
+ cluster.waitForActiveCollection(collection, 1, 2);
+
+ // wait for completed processing in the repair listener
+ boolean await = repairListener.completed.await(60, TimeUnit.SECONDS);
+ if (!await) {
+ fail("Timeout waiting for the processing to complete");
+ }
+ cluster.waitForActiveCollection(collection, 1, 3);
+ }
+}
diff --git a/solr/core/src/test/org/apache/solr/handler/TestContainerPlugin.java b/solr/core/src/test/org/apache/solr/handler/TestContainerPlugin.java
index 4c37c17..bd9bf7c 100644
--- a/solr/core/src/test/org/apache/solr/handler/TestContainerPlugin.java
+++ b/solr/core/src/test/org/apache/solr/handler/TestContainerPlugin.java
@@ -92,8 +92,8 @@
expectError(req, cluster.getSolrClient(), errPath, "No method with @Command in class");
//test with an invalid class
- plugin.klass = C1.class.getName();
- expectError(req, cluster.getSolrClient(), errPath, "No @EndPoints");
+// plugin.klass = C1.class.getName();
+// expectError(req, cluster.getSolrClient(), errPath, "No @EndPoints");
//test with a valid class. This should succeed now
plugin.klass = C3.class.getName();