blob: 0ac7c4f9fe63c580bd15ce25a0ea4865d6d0cdec [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cluster.events;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.V2Request;
import org.apache.solr.client.solrj.request.beans.PluginMeta;
import org.apache.solr.client.solrj.response.V2Response;
import org.apache.solr.cloud.ClusterSingleton;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.cluster.events.impl.DefaultClusterEventProducer;
import org.apache.solr.cluster.events.impl.DelegatingClusterEventProducer;
import org.apache.solr.common.cloud.ClusterProperties;
import org.apache.solr.common.util.Utils;
import org.apache.solr.util.LogLevel;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import static java.util.Collections.singletonMap;
import static org.apache.solr.client.solrj.SolrRequest.METHOD.GET;
import static org.apache.solr.client.solrj.SolrRequest.METHOD.POST;
/**
*
*/
@LogLevel("org.apache.solr.cluster.events=DEBUG")
public class ClusterEventProducerTest extends SolrCloudTestCase {
private AllEventsListener eventsListener;
private Phaser phaser;
@BeforeClass
public static void setupCluster() throws Exception {
configureCluster(3)
.addConfig("conf", TEST_PATH().resolve("configsets").resolve("cloud-minimal").resolve("conf"))
.configure();
}
@Before
public void setUp() throws Exception {
System.setProperty("enable.packages", "true");
super.setUp();
cluster.deleteAllCollections();
eventsListener = new AllEventsListener();
cluster.getOpenOverseer().getCoreContainer().getClusterEventProducer().registerListener(eventsListener);
ClusterEventProducer clusterEventProducer = cluster.getOpenOverseer().getCoreContainer().getClusterEventProducer();
assertTrue("not a delegating producer? " + clusterEventProducer.getClass(),
clusterEventProducer instanceof DelegatingClusterEventProducer);
DelegatingClusterEventProducer wrapper = (DelegatingClusterEventProducer) clusterEventProducer;
phaser = new Phaser();
wrapper.setDelegationPhaser(phaser);
}
@After
public void teardown() throws Exception {
System.clearProperty("enable.packages");
if (eventsListener != null) {
cluster.getOpenOverseer().getCoreContainer().getClusterEventProducer().unregisterListener(eventsListener);
eventsListener.events.clear();
}
V2Request readPluginState = new V2Request.Builder("/cluster/plugin")
.forceV2(true)
.withMethod(GET)
.build();
V2Response rsp = readPluginState.process(cluster.getSolrClient());
if (rsp._getStr("/plugin/" + ClusterEventProducer.PLUGIN_NAME + "/class", null) != null) {
V2Request req = new V2Request.Builder("/cluster/plugin")
.withMethod(POST)
.withPayload(Collections.singletonMap("remove", ClusterEventProducer.PLUGIN_NAME))
.build();
req.process(cluster.getSolrClient());
}
}
@Test
public void testEvents() throws Exception {
int version = phaser.getPhase();
PluginMeta plugin = new PluginMeta();
plugin.klass = DefaultClusterEventProducer.class.getName();
plugin.name = ClusterEventProducer.PLUGIN_NAME;
V2Request req = new V2Request.Builder("/cluster/plugin")
.withMethod(POST)
.withPayload(Collections.singletonMap("add", plugin))
.build();
V2Response rsp = req.process(cluster.getSolrClient());
assertEquals(0, rsp.getStatus());
phaser.awaitAdvanceInterruptibly(version, 10, TimeUnit.SECONDS);
// NODES_DOWN
eventsListener.setExpectedType(ClusterEvent.EventType.NODES_DOWN);
// don't kill Overseer
JettySolrRunner nonOverseerJetty = null;
for (JettySolrRunner jetty : cluster.getJettySolrRunners()) {
if (cluster.getOpenOverseer().getCoreContainer().getZkController().getNodeName().equals(jetty.getNodeName())) {
continue;
} else {
nonOverseerJetty = jetty;
break;
}
}
String nodeName = nonOverseerJetty.getNodeName();
cluster.stopJettySolrRunner(nonOverseerJetty);
cluster.waitForJettyToStop(nonOverseerJetty);
eventsListener.waitForExpectedEvent(30);
assertNotNull("should be NODES_DOWN events", eventsListener.events.get(ClusterEvent.EventType.NODES_DOWN));
List<ClusterEvent> events = eventsListener.events.get(ClusterEvent.EventType.NODES_DOWN);
assertEquals("should be one NODES_DOWN event", 1, events.size());
ClusterEvent event = events.get(0);
assertEquals("should be NODES_DOWN event type", ClusterEvent.EventType.NODES_DOWN, event.getType());
NodesDownEvent nodesDown = (NodesDownEvent) event;
assertEquals("should be node " + nodeName, nodeName, nodesDown.getNodeNames().next());
// NODES_UP
eventsListener.setExpectedType(ClusterEvent.EventType.NODES_UP);
JettySolrRunner newNode = cluster.startJettySolrRunner();
cluster.waitForNode(newNode, 60);
eventsListener.waitForExpectedEvent(30);
assertNotNull("should be NODES_UP events", eventsListener.events.get(ClusterEvent.EventType.NODES_UP));
events = eventsListener.events.get(ClusterEvent.EventType.NODES_UP);
assertEquals("should be one NODES_UP event", 1, events.size());
event = events.get(0);
assertEquals("should be NODES_UP event type", ClusterEvent.EventType.NODES_UP, event.getType());
NodesUpEvent nodesUp = (NodesUpEvent) event;
assertEquals("should be node " + newNode.getNodeName(), newNode.getNodeName(), nodesUp.getNodeNames().next());
// COLLECTIONS_ADDED
eventsListener.setExpectedType(ClusterEvent.EventType.COLLECTIONS_ADDED);
String collection = "testNodesEvent_collection";
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collection, "conf", 1, 1);
cluster.getSolrClient().request(create);
cluster.waitForActiveCollection(collection, 1, 1);
eventsListener.waitForExpectedEvent(30);
assertNotNull("should be COLLECTIONS_ADDED events", eventsListener.events.get(ClusterEvent.EventType.COLLECTIONS_ADDED));
events = eventsListener.events.get(ClusterEvent.EventType.COLLECTIONS_ADDED);
assertEquals("should be one COLLECTIONS_ADDED event", 1, events.size());
event = events.get(0);
assertEquals("should be COLLECTIONS_ADDED event type", ClusterEvent.EventType.COLLECTIONS_ADDED, event.getType());
CollectionsAddedEvent collectionsAdded = (CollectionsAddedEvent) event;
assertEquals("should be collection " + collection, collection, collectionsAdded.getCollectionNames().next());
// COLLECTIONS_REMOVED
eventsListener.setExpectedType(ClusterEvent.EventType.COLLECTIONS_REMOVED);
CollectionAdminRequest.Delete delete = CollectionAdminRequest.deleteCollection(collection);
cluster.getSolrClient().request(delete);
eventsListener.waitForExpectedEvent(30);
assertNotNull("should be COLLECTIONS_REMOVED events", eventsListener.events.get(ClusterEvent.EventType.COLLECTIONS_REMOVED));
events = eventsListener.events.get(ClusterEvent.EventType.COLLECTIONS_REMOVED);
assertEquals("should be one COLLECTIONS_REMOVED event", 1, events.size());
event = events.get(0);
assertEquals("should be COLLECTIONS_REMOVED event type", ClusterEvent.EventType.COLLECTIONS_REMOVED, event.getType());
CollectionsRemovedEvent collectionsRemoved = (CollectionsRemovedEvent) event;
assertEquals("should be collection " + collection, collection, collectionsRemoved.getCollectionNames().next());
// CLUSTER_CONFIG_CHANGED
eventsListener.events.clear();
eventsListener.setExpectedType(ClusterEvent.EventType.CLUSTER_PROPERTIES_CHANGED);
ClusterProperties clusterProperties = new ClusterProperties(cluster.getZkClient());
Map<String, Object> oldProps = new HashMap<>(clusterProperties.getClusterProperties());
clusterProperties.setClusterProperty("ext.foo", "bar");
eventsListener.waitForExpectedEvent(30);
assertNotNull("should be CLUSTER_CONFIG_CHANGED events", eventsListener.events.get(ClusterEvent.EventType.CLUSTER_PROPERTIES_CHANGED));
events = eventsListener.events.get(ClusterEvent.EventType.CLUSTER_PROPERTIES_CHANGED);
assertEquals("should be one CLUSTER_CONFIG_CHANGED event", 1, events.size());
event = events.get(0);
assertEquals("should be CLUSTER_CONFIG_CHANGED event type", ClusterEvent.EventType.CLUSTER_PROPERTIES_CHANGED, event.getType());
ClusterPropertiesChangedEvent propertiesChanged = (ClusterPropertiesChangedEvent) event;
Map<String, Object> newProps = propertiesChanged.getNewClusterProperties();
assertEquals("new properties wrong value of the 'ext.foo' property: " + newProps,
"bar", newProps.get("ext.foo"));
// unset the property
eventsListener.events.clear();
eventsListener.setExpectedType(ClusterEvent.EventType.CLUSTER_PROPERTIES_CHANGED);
clusterProperties.setClusterProperty("ext.foo", null);
eventsListener.waitForExpectedEvent(30);
assertNotNull("should be CLUSTER_CONFIG_CHANGED events", eventsListener.events.get(ClusterEvent.EventType.CLUSTER_PROPERTIES_CHANGED));
events = eventsListener.events.get(ClusterEvent.EventType.CLUSTER_PROPERTIES_CHANGED);
assertEquals("should be one CLUSTER_CONFIG_CHANGED event", 1, events.size());
event = events.get(0);
assertEquals("should be CLUSTER_CONFIG_CHANGED event type", ClusterEvent.EventType.CLUSTER_PROPERTIES_CHANGED, event.getType());
propertiesChanged = (ClusterPropertiesChangedEvent) event;
assertEquals("new properties should not have 'ext.foo' property: " + propertiesChanged.getNewClusterProperties(),
null, propertiesChanged.getNewClusterProperties().get("ext.foo"));
}
private static CountDownLatch dummyEventLatch = new CountDownLatch(1);
private static ClusterEvent lastEvent = null;
public static class DummyEventListener implements ClusterEventListener, ClusterSingleton {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
State state = State.STOPPED;
@Override
public void onEvent(ClusterEvent event) {
if (state != State.RUNNING) {
if (log.isDebugEnabled()) {
log.debug("skipped event, not running: {}", event);
}
return;
}
if (event.getType() == ClusterEvent.EventType.COLLECTIONS_ADDED ||
event.getType() == ClusterEvent.EventType.COLLECTIONS_REMOVED) {
if (log.isDebugEnabled()) {
log.debug("recorded event {}", Utils.toJSONString(event));
}
lastEvent = event;
dummyEventLatch.countDown();
} else {
if (log.isDebugEnabled()) {
log.debug("skipped event, wrong type: {}", event.getType());
}
}
}
@Override
public String getName() {
return "dummy";
}
@Override
public void start() throws Exception {
if (log.isDebugEnabled()) {
log.debug("starting {}", Integer.toHexString(hashCode()));
}
state = State.RUNNING;
}
@Override
public State getState() {
return state;
}
@Override
public void stop() {
if (log.isDebugEnabled()) {
log.debug("stopping {}", Integer.toHexString(hashCode()));
}
state = State.STOPPED;
}
@Override
public void close() throws IOException {
if (log.isDebugEnabled()) {
log.debug("closing {}", Integer.toHexString(hashCode()));
}
}
}
@Test
public void testListenerPlugins() throws Exception {
int version = phaser.getPhase();
PluginMeta plugin = new PluginMeta();
plugin.klass = DefaultClusterEventProducer.class.getName();
plugin.name = ClusterEventProducer.PLUGIN_NAME;
V2Request req = new V2Request.Builder("/cluster/plugin")
.withMethod(POST)
.withPayload(Collections.singletonMap("add", plugin))
.build();
V2Response rsp = req.process(cluster.getSolrClient());
assertEquals(0, rsp.getStatus());
version = phaser.awaitAdvanceInterruptibly(version, 10, TimeUnit.SECONDS);
plugin = new PluginMeta();
plugin.name = "testplugin";
plugin.klass = DummyEventListener.class.getName();
req = new V2Request.Builder("/cluster/plugin")
.forceV2(true)
.withMethod(POST)
.withPayload(singletonMap("add", plugin))
.build();
rsp = req.process(cluster.getSolrClient());
//just check if the plugin is indeed registered
V2Request readPluginState = new V2Request.Builder("/cluster/plugin")
.forceV2(true)
.withMethod(GET)
.build();
rsp = readPluginState.process(cluster.getSolrClient());
assertEquals(DummyEventListener.class.getName(), rsp._getStr("/plugin/testplugin/class", null));
String collection = "testListenerPlugins_collection";
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collection, "conf", 1, 1);
cluster.getSolrClient().request(create);
cluster.waitForActiveCollection(collection, 1, 1);
boolean await = dummyEventLatch.await(30, TimeUnit.SECONDS);
if (!await) {
fail("Timed out waiting for COLLECTIONS_ADDED event, " + collection);
}
assertNotNull("lastEvent should be COLLECTIONS_ADDED", lastEvent);
assertEquals("lastEvent should be COLLECTIONS_ADDED", ClusterEvent.EventType.COLLECTIONS_ADDED, lastEvent.getType());
// verify timestamp
Instant now = Instant.now();
assertTrue("timestamp of the event is in the future", now.isAfter(lastEvent.getTimestamp()));
assertEquals(collection, ((CollectionsAddedEvent)lastEvent).getCollectionNames().next());
dummyEventLatch = new CountDownLatch(1);
lastEvent = null;
CollectionAdminRequest.Delete delete = CollectionAdminRequest.deleteCollection(collection);
cluster.getSolrClient().request(delete);
await = dummyEventLatch.await(30, TimeUnit.SECONDS);
if (!await) {
fail("Timed out waiting for COLLECTIONS_REMOVED event, " + collection);
}
assertNotNull("lastEvent should be COLLECTIONS_REMOVED", lastEvent);
assertEquals("lastEvent should be COLLECTIONS_REMOVED", ClusterEvent.EventType.COLLECTIONS_REMOVED, lastEvent.getType());
// verify timestamp
now = Instant.now();
assertTrue("timestamp of the event is in the future", now.isAfter(lastEvent.getTimestamp()));
assertEquals(collection, ((CollectionsRemovedEvent)lastEvent).getCollectionNames().next());
// test changing the ClusterEventProducer plugin dynamically
// remove the plugin (a NoOpProducer will be used instead)
req = new V2Request.Builder("/cluster/plugin")
.withMethod(POST)
.withPayload(Collections.singletonMap("remove", ClusterEventProducer.PLUGIN_NAME))
.build();
req.process(cluster.getSolrClient());
version = phaser.awaitAdvanceInterruptibly(version, 10, TimeUnit.SECONDS);
dummyEventLatch = new CountDownLatch(1);
lastEvent = null;
// should not receive any events now
cluster.getSolrClient().request(create);
cluster.waitForActiveCollection(collection, 1, 1);
await = dummyEventLatch.await(5, TimeUnit.SECONDS);
if (await) {
fail("should not receive any events but got " + lastEvent);
}
// reinstall the plugin
plugin = new PluginMeta();
plugin.klass = DefaultClusterEventProducer.class.getName();
plugin.name = ClusterEventProducer.PLUGIN_NAME;
req = new V2Request.Builder("/cluster/plugin")
.withMethod(POST)
.withPayload(Collections.singletonMap("add", plugin))
.build();
rsp = req.process(cluster.getSolrClient());
assertEquals(0, rsp.getStatus());
phaser.awaitAdvanceInterruptibly(version, 10, TimeUnit.SECONDS);
dummyEventLatch = new CountDownLatch(1);
lastEvent = null;
cluster.getSolrClient().request(delete);
await = dummyEventLatch.await(30, TimeUnit.SECONDS);
if (!await) {
fail("Timed out waiting for COLLECTIONS_REMOVED event, " + collection);
}
assertNotNull("lastEvent should be COLLECTIONS_REMOVED", lastEvent);
assertEquals("lastEvent should be COLLECTIONS_REMOVED", ClusterEvent.EventType.COLLECTIONS_REMOVED, lastEvent.getType());
}
}