Merge pull request #9305: Revert "[BEAM-7916] Add ElasticsearchIO query parameter to take a ValueProvider"
diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
index f234716..19bda80 100644
--- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
@@ -131,15 +131,9 @@
}
@Test
- public void testReadWithQueryString() throws Exception {
+ public void testReadWithQuery() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
- elasticsearchIOTestCommon.testReadWithQueryString();
- }
-
- @Test
- public void testReadWithQueryValueProvider() throws Exception {
- elasticsearchIOTestCommon.setPipeline(pipeline);
- elasticsearchIOTestCommon.testReadWithQueryValueProvider();
+ elasticsearchIOTestCommon.testReadWithQuery();
}
@Test
diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
index d809cfd..05686cd 100644
--- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
@@ -122,21 +122,12 @@
}
@Test
- public void testReadWithQueryString() throws Exception {
+ public void testReadWithQuery() throws Exception {
// need to create the index using the helper method (not create it at first insertion)
// for the indexSettings() to be run
createIndex(getEsIndex());
elasticsearchIOTestCommon.setPipeline(pipeline);
- elasticsearchIOTestCommon.testReadWithQueryString();
- }
-
- @Test
- public void testReadWithQueryValueProvider() throws Exception {
- // need to create the index using the helper method (not create it at first insertion)
- // for the indexSettings() to be run
- createIndex(getEsIndex());
- elasticsearchIOTestCommon.setPipeline(pipeline);
- elasticsearchIOTestCommon.testReadWithQueryValueProvider();
+ elasticsearchIOTestCommon.testReadWithQuery();
}
@Test
diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
index 84696e5..6638b7d 100644
--- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
@@ -122,21 +122,12 @@
}
@Test
- public void testReadWithQueryString() throws Exception {
+ public void testReadWithQuery() throws Exception {
// need to create the index using the helper method (not create it at first insertion)
// for the indexSettings() to be run
createIndex(getEsIndex());
elasticsearchIOTestCommon.setPipeline(pipeline);
- elasticsearchIOTestCommon.testReadWithQueryString();
- }
-
- @Test
- public void testReadWithQueryValueProvider() throws Exception {
- // need to create the index using the helper method (not create it at first insertion)
- // for the indexSettings() to be run
- createIndex(getEsIndex());
- elasticsearchIOTestCommon.setPipeline(pipeline);
- elasticsearchIOTestCommon.testReadWithQueryValueProvider();
+ elasticsearchIOTestCommon.testReadWithQuery();
}
@Test
diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
index a39f2c7..112e0e2 100644
--- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
@@ -45,13 +45,11 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-import java.util.function.BiFunction;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.RetryConfiguration.DefaultRetryPredicate;
import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.RetryConfiguration.RetryPredicate;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.SourceTestUtils;
import org.apache.beam.sdk.testing.TestPipeline;
@@ -198,17 +196,7 @@
pipeline.run();
}
- void testReadWithQueryString() throws Exception {
- testReadWithQueryInternal(Read::withQuery);
- }
-
- void testReadWithQueryValueProvider() throws Exception {
- testReadWithQueryInternal(
- (read, query) -> read.withQuery(ValueProvider.StaticValueProvider.of(query)));
- }
-
- private void testReadWithQueryInternal(BiFunction<Read, String, Read> queryConfigurer)
- throws IOException {
+ void testReadWithQuery() throws Exception {
if (!useAsITests) {
ElasticSearchIOTestUtils.insertTestDocuments(connectionConfiguration, numDocs, restClient);
}
@@ -224,12 +212,11 @@
+ " }\n"
+ "}";
- Read read = ElasticsearchIO.read().withConnectionConfiguration(connectionConfiguration);
-
- read = queryConfigurer.apply(read, query);
-
- PCollection<String> output = pipeline.apply(read);
-
+ PCollection<String> output =
+ pipeline.apply(
+ ElasticsearchIO.read()
+ .withConnectionConfiguration(connectionConfiguration)
+ .withQuery(query));
PAssert.thatSingleton(output.apply("Count", Count.globally()))
.isEqualTo(numDocs / NUM_SCIENTISTS);
pipeline.run();
diff --git a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
index 41b2eb9..ec688fb 100644
--- a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
+++ b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
@@ -51,7 +51,6 @@
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
@@ -455,7 +454,7 @@
abstract ConnectionConfiguration getConnectionConfiguration();
@Nullable
- abstract ValueProvider<String> getQuery();
+ abstract String getQuery();
abstract boolean isWithMetadata();
@@ -469,7 +468,7 @@
abstract static class Builder {
abstract Builder setConnectionConfiguration(ConnectionConfiguration connectionConfiguration);
- abstract Builder setQuery(ValueProvider<String> query);
+ abstract Builder setQuery(String query);
abstract Builder setWithMetadata(boolean withMetadata);
@@ -503,20 +502,6 @@
public Read withQuery(String query) {
checkArgument(query != null, "query can not be null");
checkArgument(!query.isEmpty(), "query can not be empty");
- return withQuery(ValueProvider.StaticValueProvider.of(query));
- }
-
- /**
- * Provide a {@link ValueProvider} that provides the query used while reading from
- * Elasticsearch. This is useful for cases when the query must be dynamic.
- *
- * @param query the query. See <a
- * href="https://www.elastic.co/guide/en/elasticsearch/reference/2.4/query-dsl.html">Query
- * DSL</a>
- * @return a {@link PTransform} reading data from Elasticsearch.
- */
- public Read withQuery(ValueProvider<String> query) {
- checkArgument(query != null, "query can not be null");
return builder().setQuery(query).build();
}
@@ -741,7 +726,7 @@
public boolean start() throws IOException {
restClient = source.spec.getConnectionConfiguration().createClient();
- String query = source.spec.getQuery() != null ? source.spec.getQuery().get() : null;
+ String query = source.spec.getQuery();
if (query == null) {
query = "{\"query\": { \"match_all\": {} }}";
}