[pulsar-io-elastic-search] NumberOfReplicas can be 0 (#4328)
Motivation
Replicating shards in Elasticsearch is not mandatory, so we should have a way to set numberOfReplicas to 0 in sink configuration.
Modifications
Change condition on indexNumberOfReplicas to accept 0 value
Update error message
diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java
index 1268f19..a9ee622 100644
--- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java
+++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java
@@ -111,11 +111,11 @@
}
if (indexNumberOfShards < 1) {
- throw new IllegalArgumentException("indexNumberOfShards must be a positive integer");
+ throw new IllegalArgumentException("indexNumberOfShards must be a strictly positive integer");
}
- if (indexNumberOfReplicas < 1) {
+ if (indexNumberOfReplicas < 0) {
throw new IllegalArgumentException("indexNumberOfReplicas must be a positive integer");
}
}
-}
\ No newline at end of file
+}
diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfigTests.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfigTests.java
index 65b6c22..63fbe16 100644
--- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfigTests.java
+++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfigTests.java
@@ -70,6 +70,19 @@
config.validate();
}
+ @Test
+ public final void zeroReplicasValidateTest() throws IOException {
+ Map<String, Object> map = new HashMap<String, Object> ();
+ map.put("elasticSearchUrl", "http://localhost:90902");
+ map.put("indexName", "myIndex");
+ map.put("username", "racerX");
+ map.put("password", "go-speedie-go");
+ map.put("indexNumberOfReplicas", "0");
+
+ ElasticSearchConfig config = ElasticSearchConfig.load(map);
+ config.validate();
+ }
+
@Test(expectedExceptions = IllegalArgumentException.class,
expectedExceptionsMessageRegExp = "Required property not set.")
public final void missingRequiredPropertiesTest() throws IOException {
@@ -81,7 +94,7 @@
}
@Test(expectedExceptions = IllegalArgumentException.class,
- expectedExceptionsMessageRegExp = "indexNumberOfShards must be a positive integer")
+ expectedExceptionsMessageRegExp = "indexNumberOfShards must be a strictly positive integer")
public final void invalidPropertyValueTest() throws IOException {
Map<String, Object> map = new HashMap<String, Object> ();
map.put("elasticSearchUrl", "http://localhost:90902");