Merge pull request #9305: Revert "[BEAM-7916] Add ElasticsearchIO query parameter to take a ValueProvider"

diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
index f234716..19bda80 100644
--- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
@@ -131,15 +131,9 @@
   }
 
   @Test
-  public void testReadWithQueryString() throws Exception {
+  public void testReadWithQuery() throws Exception {
     elasticsearchIOTestCommon.setPipeline(pipeline);
-    elasticsearchIOTestCommon.testReadWithQueryString();
-  }
-
-  @Test
-  public void testReadWithQueryValueProvider() throws Exception {
-    elasticsearchIOTestCommon.setPipeline(pipeline);
-    elasticsearchIOTestCommon.testReadWithQueryValueProvider();
+    elasticsearchIOTestCommon.testReadWithQuery();
   }
 
   @Test
diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
index d809cfd..05686cd 100644
--- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
@@ -122,21 +122,12 @@
   }
 
   @Test
-  public void testReadWithQueryString() throws Exception {
+  public void testReadWithQuery() throws Exception {
     // need to create the index using the helper method (not create it at first insertion)
     // for the indexSettings() to be run
     createIndex(getEsIndex());
     elasticsearchIOTestCommon.setPipeline(pipeline);
-    elasticsearchIOTestCommon.testReadWithQueryString();
-  }
-
-  @Test
-  public void testReadWithQueryValueProvider() throws Exception {
-    // need to create the index using the helper method (not create it at first insertion)
-    // for the indexSettings() to be run
-    createIndex(getEsIndex());
-    elasticsearchIOTestCommon.setPipeline(pipeline);
-    elasticsearchIOTestCommon.testReadWithQueryValueProvider();
+    elasticsearchIOTestCommon.testReadWithQuery();
   }
 
   @Test
diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
index 84696e5..6638b7d 100644
--- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
@@ -122,21 +122,12 @@
   }
 
   @Test
-  public void testReadWithQueryString() throws Exception {
+  public void testReadWithQuery() throws Exception {
     // need to create the index using the helper method (not create it at first insertion)
     // for the indexSettings() to be run
     createIndex(getEsIndex());
     elasticsearchIOTestCommon.setPipeline(pipeline);
-    elasticsearchIOTestCommon.testReadWithQueryString();
-  }
-
-  @Test
-  public void testReadWithQueryValueProvider() throws Exception {
-    // need to create the index using the helper method (not create it at first insertion)
-    // for the indexSettings() to be run
-    createIndex(getEsIndex());
-    elasticsearchIOTestCommon.setPipeline(pipeline);
-    elasticsearchIOTestCommon.testReadWithQueryValueProvider();
+    elasticsearchIOTestCommon.testReadWithQuery();
   }
 
   @Test
diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
index a39f2c7..112e0e2 100644
--- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
@@ -45,13 +45,11 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.function.BiFunction;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.RetryConfiguration.DefaultRetryPredicate;
 import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.RetryConfiguration.RetryPredicate;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.SourceTestUtils;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -198,17 +196,7 @@
     pipeline.run();
   }
 
-  void testReadWithQueryString() throws Exception {
-    testReadWithQueryInternal(Read::withQuery);
-  }
-
-  void testReadWithQueryValueProvider() throws Exception {
-    testReadWithQueryInternal(
-        (read, query) -> read.withQuery(ValueProvider.StaticValueProvider.of(query)));
-  }
-
-  private void testReadWithQueryInternal(BiFunction<Read, String, Read> queryConfigurer)
-      throws IOException {
+  void testReadWithQuery() throws Exception {
     if (!useAsITests) {
       ElasticSearchIOTestUtils.insertTestDocuments(connectionConfiguration, numDocs, restClient);
     }
@@ -224,12 +212,11 @@
             + "  }\n"
             + "}";
 
-    Read read = ElasticsearchIO.read().withConnectionConfiguration(connectionConfiguration);
-
-    read = queryConfigurer.apply(read, query);
-
-    PCollection<String> output = pipeline.apply(read);
-
+    PCollection<String> output =
+        pipeline.apply(
+            ElasticsearchIO.read()
+                .withConnectionConfiguration(connectionConfiguration)
+                .withQuery(query));
     PAssert.thatSingleton(output.apply("Count", Count.globally()))
         .isEqualTo(numDocs / NUM_SCIENTISTS);
     pipeline.run();
diff --git a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
index 41b2eb9..ec688fb 100644
--- a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
+++ b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
@@ -51,7 +51,6 @@
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -455,7 +454,7 @@
     abstract ConnectionConfiguration getConnectionConfiguration();
 
     @Nullable
-    abstract ValueProvider<String> getQuery();
+    abstract String getQuery();
 
     abstract boolean isWithMetadata();
 
@@ -469,7 +468,7 @@
     abstract static class Builder {
       abstract Builder setConnectionConfiguration(ConnectionConfiguration connectionConfiguration);
 
-      abstract Builder setQuery(ValueProvider<String> query);
+      abstract Builder setQuery(String query);
 
       abstract Builder setWithMetadata(boolean withMetadata);
 
@@ -503,20 +502,6 @@
     public Read withQuery(String query) {
       checkArgument(query != null, "query can not be null");
       checkArgument(!query.isEmpty(), "query can not be empty");
-      return withQuery(ValueProvider.StaticValueProvider.of(query));
-    }
-
-    /**
-     * Provide a {@link ValueProvider} that provides the query used while reading from
-     * Elasticsearch. This is useful for cases when the query must be dynamic.
-     *
-     * @param query the query. See <a
-     *     href="https://www.elastic.co/guide/en/elasticsearch/reference/2.4/query-dsl.html">Query
-     *     DSL</a>
-     * @return a {@link PTransform} reading data from Elasticsearch.
-     */
-    public Read withQuery(ValueProvider<String> query) {
-      checkArgument(query != null, "query can not be null");
       return builder().setQuery(query).build();
     }
 
@@ -741,7 +726,7 @@
     public boolean start() throws IOException {
       restClient = source.spec.getConnectionConfiguration().createClient();
 
-      String query = source.spec.getQuery() != null ? source.spec.getQuery().get() : null;
+      String query = source.spec.getQuery();
       if (query == null) {
         query = "{\"query\": { \"match_all\": {} }}";
       }