SOLR-14749: Make sure the plugin config is reloaded on Overseer.
diff --git a/solr/core/src/java/org/apache/solr/api/ContainerPluginsRegistry.java b/solr/core/src/java/org/apache/solr/api/ContainerPluginsRegistry.java
index 8f1d386..e92f715 100644
--- a/solr/core/src/java/org/apache/solr/api/ContainerPluginsRegistry.java
+++ b/solr/core/src/java/org/apache/solr/api/ContainerPluginsRegistry.java
@@ -26,11 +26,13 @@
import java.lang.reflect.Type;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Phaser;
import java.util.function.Supplier;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.lucene.util.ResourceLoaderAware;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.request.beans.PluginMeta;
@@ -79,12 +81,30 @@
private final Map<String, ApiInfo> currentPlugins = new HashMap<>();
+ private volatile Phaser phaser; // volatile: assigned by setPhaser(), read by onChange(), with no other synchronization between them
+
@Override
public boolean onChange(Map<String, Object> properties) {
refresh();
+ Phaser localPhaser = phaser; // read the field once so the null check and arrive() below use the same instance
+ if (localPhaser != null) {
+ assert localPhaser.getRegisteredParties() == 1;
+ localPhaser.arrive(); // with exactly one registered party (asserted above), every arrival advances the phase
+ }
return false;
}
+ /**
+ * Installs a phaser that will advance phases every time {@link #onChange(Map)} is called.
+ * Useful for allowing tests to know when a new configuration is finished getting set.
+ *
+ * <p>This method registers one party on the given phaser before storing it; {@link #onChange(Map)}
+ * arrives on the stored phaser after it has refreshed, and asserts that exactly one party is registered.
+ *
+ * @param phaser the phaser to register on and store; must not be null, since {@code register()} is invoked on it
+ */
+
+ @VisibleForTesting
+ public void setPhaser(Phaser phaser) {
+ phaser.register();
+ this.phaser = phaser;
+ }
+
public void registerListener(PluginRegistryListener listener) {
listeners.add(listener);
}
diff --git a/solr/core/src/test/org/apache/solr/handler/TestContainerPlugin.java b/solr/core/src/test/org/apache/solr/handler/TestContainerPlugin.java
index 224caf7..fb63e0e 100644
--- a/solr/core/src/test/org/apache/solr/handler/TestContainerPlugin.java
+++ b/solr/core/src/test/org/apache/solr/handler/TestContainerPlugin.java
@@ -22,6 +22,8 @@
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.Callable;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.TimeUnit;
import com.google.common.collect.ImmutableMap;
import org.apache.commons.io.IOUtils;
@@ -66,10 +68,12 @@
import static org.apache.solr.filestore.TestDistribPackageStore.uploadKey;
public class TestContainerPlugin extends SolrCloudTestCase {
+ private Phaser phaser;
@Before
public void setup() {
System.setProperty("enable.packages", "true");
+ phaser = new Phaser();
}
@After
@@ -80,9 +84,14 @@
@Test
public void testApi() throws Exception {
MiniSolrCloudCluster cluster =
- configureCluster(4)
- .withJettyConfig(jetty -> jetty.enableV2(true))
- .configure();
+ configureCluster(4)
+ .withJettyConfig(jetty -> jetty.enableV2(true))
+ .configure();
+ ContainerPluginsRegistry pluginsRegistry = cluster.getOpenOverseer().getCoreContainer().getContainerPluginsRegistry();
+ pluginsRegistry.setPhaser(phaser);
+
+ int version = phaser.getPhase();
+
String errPath = "/error/details[0]/errorMessages[0]";
try {
PluginMeta plugin = new PluginMeta();
@@ -90,39 +99,43 @@
plugin.klass = C2.class.getName();
//test with an invalid class
V2Request req = new V2Request.Builder("/cluster/plugin")
- .forceV2(true)
- .POST()
- .withPayload(singletonMap("add", plugin))
- .build();
+ .forceV2(true)
+ .POST()
+ .withPayload(singletonMap("add", plugin))
+ .build();
expectError(req, cluster.getSolrClient(), errPath, "No method with @Command in class");
//test with a valid class. This should succeed now
plugin.klass = C3.class.getName();
req.process(cluster.getSolrClient());
+ version = phaser.awaitAdvanceInterruptibly(version, 10, TimeUnit.SECONDS);
+
//just check if the plugin is indeed registered
V2Request readPluginState = new V2Request.Builder("/cluster/plugin")
- .forceV2(true)
- .GET()
- .build();
+ .forceV2(true)
+ .GET()
+ .build();
V2Response rsp = readPluginState.process(cluster.getSolrClient());
assertEquals(C3.class.getName(), rsp._getStr("/plugin/testplugin/class", null));
//let's test the plugin
TestDistribPackageStore.assertResponseValues(10,
- () -> new V2Request.Builder("/plugin/my/plugin")
- .forceV2(true)
- .GET()
- .build().process(cluster.getSolrClient()),
- ImmutableMap.of("/testkey", "testval"));
+ () -> new V2Request.Builder("/plugin/my/plugin")
+ .forceV2(true)
+ .GET()
+ .build().process(cluster.getSolrClient()),
+ ImmutableMap.of("/testkey", "testval"));
//now remove the plugin
new V2Request.Builder("/cluster/plugin")
- .POST()
- .forceV2(true)
- .withPayload("{remove : testplugin}")
- .build()
- .process(cluster.getSolrClient());
+ .POST()
+ .forceV2(true)
+ .withPayload("{remove : testplugin}")
+ .build()
+ .process(cluster.getSolrClient());
+
+ version = phaser.awaitAdvanceInterruptibly(version, 10, TimeUnit.SECONDS);
//verify it is removed
rsp = readPluginState.process(cluster.getSolrClient());
@@ -138,50 +151,55 @@
plugin.pathPrefix = "my-random-prefix";
req.process(cluster.getSolrClient());
+ version = phaser.awaitAdvanceInterruptibly(version, 10, TimeUnit.SECONDS);
//let's test the plugin
TestDistribPackageStore.assertResponseValues(10,
- () -> new V2Request.Builder("/my-random-name/my/plugin")
- .forceV2(true)
- .GET()
- .build().process(cluster.getSolrClient()),
- ImmutableMap.of("/method.name", "m1"));
+ () -> new V2Request.Builder("/my-random-name/my/plugin")
+ .forceV2(true)
+ .GET()
+ .build().process(cluster.getSolrClient()),
+ ImmutableMap.of("/method.name", "m1"));
- TestDistribPackageStore.assertResponseValues(10,
- () -> new V2Request.Builder("/my-random-prefix/their/plugin")
- .forceV2(true)
- .GET()
- .build().process(cluster.getSolrClient()),
- ImmutableMap.of("/method.name", "m2"));
+ TestDistribPackageStore.assertResponseValues(10,
+ () -> new V2Request.Builder("/my-random-prefix/their/plugin")
+ .forceV2(true)
+ .GET()
+ .build().process(cluster.getSolrClient()),
+ ImmutableMap.of("/method.name", "m2"));
//now remove the plugin
new V2Request.Builder("/cluster/plugin")
- .POST()
- .forceV2(true)
- .withPayload("{remove : my-random-name}")
- .build()
- .process(cluster.getSolrClient());
+ .POST()
+ .forceV2(true)
+ .withPayload("{remove : my-random-name}")
+ .build()
+ .process(cluster.getSolrClient());
- expectFail( () -> new V2Request.Builder("/my-random-prefix/their/plugin")
- .forceV2(true)
- .GET()
- .build()
- .process(cluster.getSolrClient()));
+ version = phaser.awaitAdvanceInterruptibly(version, 10, TimeUnit.SECONDS);
+
expectFail(() -> new V2Request.Builder("/my-random-prefix/their/plugin")
- .forceV2(true)
- .GET()
- .build()
- .process(cluster.getSolrClient()));
+ .forceV2(true)
+ .GET()
+ .build()
+ .process(cluster.getSolrClient()));
+ expectFail(() -> new V2Request.Builder("/my-random-prefix/their/plugin")
+ .forceV2(true)
+ .GET()
+ .build()
+ .process(cluster.getSolrClient()));
// test ClusterSingleton plugin
plugin.name = "clusterSingleton";
plugin.klass = C6.class.getName();
req.process(cluster.getSolrClient());
+ version = phaser.awaitAdvanceInterruptibly(version, 10, TimeUnit.SECONDS);
- //just check if the plugin is indeed registered
+
+ //just check if the plugin is indeed registered
readPluginState = new V2Request.Builder("/cluster/plugin")
- .forceV2(true)
- .GET()
- .build();
+ .forceV2(true)
+ .GET()
+ .build();
rsp = readPluginState.process(cluster.getSolrClient());
assertEquals(C6.class.getName(), rsp._getStr("/plugin/clusterSingleton/class", null));
@@ -189,9 +207,9 @@
assertTrue("startCalled", C6.startCalled);
assertFalse("stopCalled", C6.stopCalled);
- assertEquals( CConfig.class, ContainerPluginsRegistry.getConfigClass(new CC()));
- assertEquals( CConfig.class, ContainerPluginsRegistry.getConfigClass(new CC1()));
- assertEquals( CConfig.class, ContainerPluginsRegistry.getConfigClass(new CC2()));
+ assertEquals(CConfig.class, ContainerPluginsRegistry.getConfigClass(new CC()));
+ assertEquals(CConfig.class, ContainerPluginsRegistry.getConfigClass(new CC1()));
+ assertEquals(CConfig.class, ContainerPluginsRegistry.getConfigClass(new CC2()));
CConfig cfg = new CConfig();
cfg.boolVal = Boolean.TRUE;
@@ -203,34 +221,38 @@
p.config = cfg;
new V2Request.Builder("/cluster/plugin")
- .forceV2(true)
- .POST()
- .withPayload(singletonMap("add", p))
- .build()
- .process(cluster.getSolrClient());
- TestDistribPackageStore.assertResponseValues(10,
- () -> new V2Request.Builder("hello/plugin")
.forceV2(true)
- .GET()
- .build().process(cluster.getSolrClient()),
- ImmutableMap.of("/config/boolVal", "true", "/config/strVal", "Something","/config/longVal", "1234" ));
+ .POST()
+ .withPayload(singletonMap("add", p))
+ .build()
+ .process(cluster.getSolrClient());
- cfg.strVal = "Something else";
- new V2Request.Builder("/cluster/plugin")
- .forceV2(true)
- .POST()
- .withPayload(singletonMap("update", p))
- .build()
- .process(cluster.getSolrClient());
+ version = phaser.awaitAdvanceInterruptibly(version, 10, TimeUnit.SECONDS);
TestDistribPackageStore.assertResponseValues(10,
- () -> new V2Request.Builder("hello/plugin")
- .forceV2(true)
- .GET()
- .build().process(cluster.getSolrClient()),
- ImmutableMap.of("/config/boolVal", "true", "/config/strVal", cfg.strVal,"/config/longVal", "1234" ));
+ () -> new V2Request.Builder("hello/plugin")
+ .forceV2(true)
+ .GET()
+ .build().process(cluster.getSolrClient()),
+ ImmutableMap.of("/config/boolVal", "true", "/config/strVal", "Something", "/config/longVal", "1234"));
- // kill the Overseer leader
+ cfg.strVal = "Something else";
+ new V2Request.Builder("/cluster/plugin")
+ .forceV2(true)
+ .POST()
+ .withPayload(singletonMap("update", p))
+ .build()
+ .process(cluster.getSolrClient());
+ version = phaser.awaitAdvanceInterruptibly(version, 10, TimeUnit.SECONDS);
+
+ TestDistribPackageStore.assertResponseValues(10,
+ () -> new V2Request.Builder("hello/plugin")
+ .forceV2(true)
+ .GET()
+ .build().process(cluster.getSolrClient()),
+ ImmutableMap.of("/config/boolVal", "true", "/config/strVal", cfg.strVal, "/config/longVal", "1234"));
+
+ // kill the Overseer leader
for (JettySolrRunner jetty : cluster.getJettySolrRunners()) {
if (!jetty.getCoreContainer().getZkController().getOverseer().isClosed()) {
cluster.stopJettySolrRunner(jetty);
@@ -244,7 +266,7 @@
}
private void expectFail(ThrowingRunnable runnable) throws Exception {
- for(int i=0;i< 20;i++) {
+ for (int i = 0; i < 20; i++) {
try {
runnable.run();
} catch (Throwable throwable) {
@@ -254,34 +276,40 @@
}
fail("should have failed with an exception");
}
+
@Test
public void testApiFromPackage() throws Exception {
MiniSolrCloudCluster cluster =
- configureCluster(4)
- .withJettyConfig(jetty -> jetty.enableV2(true))
- .configure();
+ configureCluster(4)
+ .withJettyConfig(jetty -> jetty.enableV2(true))
+ .configure();
String FILE1 = "/myplugin/v1.jar";
String FILE2 = "/myplugin/v2.jar";
+ ContainerPluginsRegistry pluginsRegistry = cluster.getOpenOverseer().getCoreContainer().getContainerPluginsRegistry();
+ pluginsRegistry.setPhaser(phaser);
+
+ int version = phaser.getPhase();
+
String errPath = "/error/details[0]/errorMessages[0]";
try {
byte[] derFile = readFile("cryptokeys/pub_key512.der");
- uploadKey(derFile, PackageStoreAPI.KEYS_DIR+"/pub_key512.der", cluster);
+ uploadKey(derFile, PackageStoreAPI.KEYS_DIR + "/pub_key512.der", cluster);
TestPackages.postFileAndWait(cluster, "runtimecode/containerplugin.v.1.jar.bin", FILE1,
- "pmrmWCDafdNpYle2rueAGnU2J6NYlcAey9mkZYbqh+5RdYo2Ln+llLF9voyRj+DDivK9GV1XdtKvD9rgCxlD7Q==");
- TestPackages.postFileAndWait(cluster, "runtimecode/containerplugin.v.2.jar.bin", FILE2,
- "StR3DmqaUSL7qjDOeVEiCqE+ouiZAkW99fsL48F9oWG047o7NGgwwZ36iGgzDC3S2tPaFjRAd9Zg4UK7OZLQzg==");
+ "pmrmWCDafdNpYle2rueAGnU2J6NYlcAey9mkZYbqh+5RdYo2Ln+llLF9voyRj+DDivK9GV1XdtKvD9rgCxlD7Q==");
+ TestPackages.postFileAndWait(cluster, "runtimecode/containerplugin.v.2.jar.bin", FILE2,
+ "StR3DmqaUSL7qjDOeVEiCqE+ouiZAkW99fsL48F9oWG047o7NGgwwZ36iGgzDC3S2tPaFjRAd9Zg4UK7OZLQzg==");
- // We have two versions of the plugin in 2 different jar files. they are already uploaded to the package store
+ // We have two versions of the plugin in 2 different jar files. they are already uploaded to the package store
Package.AddVersion add = new Package.AddVersion();
add.version = "1.0";
add.pkg = "mypkg";
add.files = singletonList(FILE1);
V2Request addPkgVersionReq = new V2Request.Builder("/cluster/package")
- .forceV2(true)
- .POST()
- .withPayload(singletonMap("add", add))
- .build();
+ .forceV2(true)
+ .POST()
+ .withPayload(singletonMap("add", add))
+ .build();
addPkgVersionReq.process(cluster.getSolrClient());
waitForAllNodesToSync(cluster, "/cluster/package", Utils.makeMap(
@@ -295,28 +323,30 @@
plugin.klass = "mypkg:org.apache.solr.handler.MyPlugin";
plugin.version = add.version;
final V2Request req1 = new V2Request.Builder("/cluster/plugin")
- .forceV2(true)
- .POST()
- .withPayload(singletonMap("add", plugin))
- .build();
+ .forceV2(true)
+ .POST()
+ .withPayload(singletonMap("add", plugin))
+ .build();
req1.process(cluster.getSolrClient());
- //verify the plugin creation
+ version = phaser.awaitAdvanceInterruptibly(version, 10, TimeUnit.SECONDS);
+
+ //verify the plugin creation
TestDistribPackageStore.assertResponseValues(10,
- () -> new V2Request.Builder("/cluster/plugin").
- GET()
- .build().process(cluster.getSolrClient()),
- ImmutableMap.of(
- "/plugin/myplugin/class", plugin.klass,
- "/plugin/myplugin/version", plugin.version
- ));
+ () -> new V2Request.Builder("/cluster/plugin").
+ GET()
+ .build().process(cluster.getSolrClient()),
+ ImmutableMap.of(
+ "/plugin/myplugin/class", plugin.klass,
+ "/plugin/myplugin/version", plugin.version
+ ));
//let's test this now
Callable<NavigableObject> invokePlugin = () -> new V2Request.Builder("/plugin/my/path")
- .forceV2(true)
- .GET()
- .build().process(cluster.getSolrClient());
+ .forceV2(true)
+ .GET()
+ .build().process(cluster.getSolrClient());
TestDistribPackageStore.assertResponseValues(10,
- invokePlugin,
- ImmutableMap.of("/myplugin.version", "1.0"));
+ invokePlugin,
+ ImmutableMap.of("/myplugin.version", "1.0"));
//now let's upload the jar file for version 2.0 of the plugin
add.version = "2.0";
@@ -326,32 +356,34 @@
//here the plugin version is updated
plugin.version = add.version;
new V2Request.Builder("/cluster/plugin")
- .forceV2(true)
- .POST()
- .withPayload(singletonMap("update", plugin))
- .build()
- .process(cluster.getSolrClient());
+ .forceV2(true)
+ .POST()
+ .withPayload(singletonMap("update", plugin))
+ .build()
+ .process(cluster.getSolrClient());
+ version = phaser.awaitAdvanceInterruptibly(version, 10, TimeUnit.SECONDS);
//now verify if it is indeed updated
TestDistribPackageStore.assertResponseValues(10,
- () -> new V2Request.Builder("/cluster/plugin").
- GET()
- .build().process(cluster.getSolrClient()),
- ImmutableMap.of(
- "/plugin/myplugin/class", plugin.klass,
- "/plugin/myplugin/version", "2.0"
- ));
+ () -> new V2Request.Builder("/cluster/plugin").
+ GET()
+ .build().process(cluster.getSolrClient()),
+ ImmutableMap.of(
+ "/plugin/myplugin/class", plugin.klass,
+ "/plugin/myplugin/version", "2.0"
+ ));
// invoke the plugin and test thye output
TestDistribPackageStore.assertResponseValues(10,
- invokePlugin,
- ImmutableMap.of("/myplugin.version", "2.0"));
+ invokePlugin,
+ ImmutableMap.of("/myplugin.version", "2.0"));
plugin.name = "plugin2";
- plugin.klass = "mypkg:"+ C5.class.getName();
+ plugin.klass = "mypkg:" + C5.class.getName();
plugin.version = "2.0";
req1.process(cluster.getSolrClient());
+ version = phaser.awaitAdvanceInterruptibly(version, 10, TimeUnit.SECONDS);
assertNotNull(C5.classData);
- assertEquals( 1452, C5.classData.limit());
+ assertEquals(1452, C5.classData.limit());
} finally {
cluster.shutdown();
}
@@ -360,14 +392,15 @@
public static class CC1 extends CC {
}
+
public static class CC2 extends CC1 {
}
+
public static class CC implements ConfigurablePlugin<CConfig> {
private CConfig cfg;
-
@Override
public void configure(CConfig cfg) {
this.cfg = cfg;
@@ -375,8 +408,8 @@
}
@EndPoint(method = GET,
- path = "/hello/plugin",
- permission = PermissionNameProvider.Name.READ_PERM)
+ path = "/hello/plugin",
+ permission = PermissionNameProvider.Name.READ_PERM)
public void m2(SolrQueryRequest req, SolrQueryResponse rsp) {
rsp.add("config", cfg);
}
@@ -436,24 +469,24 @@
public static class C5 implements ResourceLoaderAware {
static ByteBuffer classData;
- private SolrResourceLoader resourceLoader;
+ private SolrResourceLoader resourceLoader;
@Override
public void inform(ResourceLoader loader) throws IOException {
this.resourceLoader = (SolrResourceLoader) loader;
try {
InputStream is = resourceLoader.openResource("org/apache/solr/handler/MyPlugin.class");
- byte[] buf = new byte[1024*5];
+ byte[] buf = new byte[1024 * 5];
int sz = IOUtils.read(is, buf);
- classData = ByteBuffer.wrap(buf, 0,sz);
+ classData = ByteBuffer.wrap(buf, 0, sz);
} catch (IOException e) {
//do not do anything
}
}
@EndPoint(method = GET,
- path = "/$plugin-name/m2",
- permission = PermissionNameProvider.Name.COLL_READ_PERM)
+ path = "/$plugin-name/m2",
+ permission = PermissionNameProvider.Name.COLL_READ_PERM)
public void m2() {
@@ -466,18 +499,18 @@
}
@EndPoint(
- method = GET,
- path = "/plugin/my/plugin",
- permission = PermissionNameProvider.Name.COLL_READ_PERM)
+ method = GET,
+ path = "/plugin/my/plugin",
+ permission = PermissionNameProvider.Name.COLL_READ_PERM)
public class C2 {
}
@EndPoint(
- method = GET,
- path = "/plugin/my/plugin",
- permission = PermissionNameProvider.Name.COLL_READ_PERM)
+ method = GET,
+ path = "/plugin/my/plugin",
+ permission = PermissionNameProvider.Name.COLL_READ_PERM)
public static class C3 {
@Command
public void read(SolrQueryRequest req, SolrQueryResponse rsp) {
@@ -489,15 +522,15 @@
public static class C4 {
@EndPoint(method = GET,
- path = "$plugin-name/my/plugin",
- permission = PermissionNameProvider.Name.READ_PERM)
+ path = "$plugin-name/my/plugin",
+ permission = PermissionNameProvider.Name.READ_PERM)
public void m1(SolrQueryRequest req, SolrQueryResponse rsp) {
rsp.add("method.name", "m1");
}
@EndPoint(method = GET,
- path = "$path-prefix/their/plugin",
- permission = PermissionNameProvider.Name.READ_PERM)
+ path = "$path-prefix/their/plugin",
+ permission = PermissionNameProvider.Name.READ_PERM)
public void m2(SolrQueryRequest req, SolrQueryResponse rsp) {
rsp.add("method.name", "m2");
}
@@ -505,7 +538,7 @@
}
@SuppressWarnings("unchecked")
- public static void waitForAllNodesToSync(MiniSolrCloudCluster cluster, String path, Map<String,Object> expected) throws Exception {
+ public static void waitForAllNodesToSync(MiniSolrCloudCluster cluster, String path, Map<String, Object> expected) throws Exception {
for (JettySolrRunner jettySolrRunner : cluster.getJettySolrRunners()) {
String baseUrl = jettySolrRunner.getBaseUrl().toString().replace("/solr", "/api");
String url = baseUrl + path + "?wt=javabin";