| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.sdk.io.elasticsearch; |
| |
| import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.BoundedElasticsearchSource; |
| import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.ConnectionConfiguration; |
| import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.Read; |
| import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.RetryConfiguration.DEFAULT_RETRY_PREDICATE; |
| import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.Write; |
| import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestUtils.FAMOUS_SCIENTISTS; |
| import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestUtils.NUM_SCIENTISTS; |
| import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestUtils.countByMatch; |
| import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestUtils.countByScientistName; |
| import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestUtils.refreshIndexAndGetCurrentNumDocs; |
| import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource; |
| import static org.hamcrest.Matchers.greaterThan; |
| import static org.hamcrest.Matchers.lessThan; |
| import static org.hamcrest.core.Is.isA; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertThat; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| |
| import com.fasterxml.jackson.databind.JsonNode; |
| import java.io.IOException; |
| import java.io.Serializable; |
| import java.nio.charset.StandardCharsets; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.function.BiFunction; |
| import org.apache.beam.sdk.io.BoundedSource; |
| import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.RetryConfiguration.DefaultRetryPredicate; |
| import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.RetryConfiguration.RetryPredicate; |
| import org.apache.beam.sdk.options.PipelineOptions; |
| import org.apache.beam.sdk.options.PipelineOptionsFactory; |
| import org.apache.beam.sdk.options.ValueProvider; |
| import org.apache.beam.sdk.testing.PAssert; |
| import org.apache.beam.sdk.testing.SourceTestUtils; |
| import org.apache.beam.sdk.testing.TestPipeline; |
| import org.apache.beam.sdk.transforms.Count; |
| import org.apache.beam.sdk.transforms.Create; |
| import org.apache.beam.sdk.transforms.DoFnTester; |
| import org.apache.beam.sdk.transforms.SerializableFunction; |
| import org.apache.beam.sdk.values.PCollection; |
| import org.apache.http.HttpEntity; |
| import org.apache.http.entity.ContentType; |
| import org.apache.http.nio.entity.NStringEntity; |
| import org.elasticsearch.client.Response; |
| import org.elasticsearch.client.RestClient; |
| import org.hamcrest.CustomMatcher; |
| import org.joda.time.Duration; |
| import org.junit.rules.ExpectedException; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** Common test class for {@link ElasticsearchIO}. */ |
| class ElasticsearchIOTestCommon implements Serializable { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchIOTestCommon.class); |
| |
| private static final RetryPredicate CUSTOM_RETRY_PREDICATE = new DefaultRetryPredicate(400); |
| |
| private static final int EXPECTED_RETRIES = 2; |
| private static final int MAX_ATTEMPTS = 3; |
| private static final String[] BAD_FORMATTED_DOC = {"{ \"x\" :a,\"y\":\"ab\" }"}; |
| private static final String OK_REQUEST = |
| "{ \"index\" : { \"_index\" : \"test\", \"_type\" : \"doc\", \"_id\" : \"1\" } }\n" |
| + "{ \"field1\" : 1 }\n"; |
| private static final String BAD_REQUEST = |
| "{ \"index\" : { \"_index\" : \"test\", \"_type\" : \"doc\", \"_id\" : \"1\" } }\n" |
| + "{ \"field1\" : @ }\n"; |
| |
| static String getEsIndex() { |
| return "beam" + Thread.currentThread().getId(); |
| } |
| |
| static final String ES_TYPE = "test"; |
| static final long NUM_DOCS_UTESTS = 400L; |
| static final long NUM_DOCS_ITESTS = 50000L; |
| static final float ACCEPTABLE_EMPTY_SPLITS_PERCENTAGE = 0.5f; |
| private static final long AVERAGE_DOC_SIZE = 25L; |
| |
| private static final long BATCH_SIZE = 200L; |
| private static final long BATCH_SIZE_BYTES = 2048L; |
| |
| public static final String UPDATE_INDEX = "partial_update"; |
| public static final String UPDATE_TYPE = "test"; |
| |
| private final long numDocs; |
| private final ConnectionConfiguration connectionConfiguration; |
| private final RestClient restClient; |
| private final boolean useAsITests; |
| |
| private TestPipeline pipeline; |
| private ExpectedException expectedException; |
| |
/**
 * Creates the shared test helper.
 *
 * @param connectionConfiguration connection settings passed to every read and write under test
 * @param restClient client used for direct calls to Elasticsearch
 * @param useAsITests whether the helper runs as integration tests; this selects the document
 *     count and disables the insertion of test documents before the read tests
 */
ElasticsearchIOTestCommon(
    ConnectionConfiguration connectionConfiguration, RestClient restClient, boolean useAsITests) {
  this.useAsITests = useAsITests;
  if (useAsITests) {
    this.numDocs = NUM_DOCS_ITESTS;
  } else {
    this.numDocs = NUM_DOCS_UTESTS;
  }
  this.restClient = restClient;
  this.connectionConfiguration = connectionConfiguration;
}
| |
| // lazy init of the test rules (cannot be static) |
| void setPipeline(TestPipeline pipeline) { |
| this.pipeline = pipeline; |
| } |
| |
| void setExpectedException(ExpectedException expectedException) { |
| this.expectedException = expectedException; |
| } |
| |
| void testSplit(final int desiredBundleSizeBytes) throws Exception { |
| if (!useAsITests) { |
| ElasticsearchIOTestUtils.insertTestDocuments(connectionConfiguration, numDocs, restClient); |
| } |
| PipelineOptions options = PipelineOptionsFactory.create(); |
| Read read = ElasticsearchIO.read().withConnectionConfiguration(connectionConfiguration); |
| BoundedElasticsearchSource initialSource = |
| new BoundedElasticsearchSource(read, null, null, null); |
| List<? extends BoundedSource<String>> splits = |
| initialSource.split(desiredBundleSizeBytes, options); |
| SourceTestUtils.assertSourcesEqualReferenceSource(initialSource, splits, options); |
| long indexSize = BoundedElasticsearchSource.estimateIndexSize(connectionConfiguration); |
| |
| int expectedNumSources; |
| if (desiredBundleSizeBytes == 0) { |
| // desiredBundleSize is ignored because in ES 2.x there is no way to split shards. |
| // 5 is the number of ES shards |
| // (By default, each index in Elasticsearch is allocated 5 primary shards) |
| expectedNumSources = 5; |
| } else { |
| float expectedNumSourcesFloat = (float) indexSize / desiredBundleSizeBytes; |
| expectedNumSources = (int) Math.ceil(expectedNumSourcesFloat); |
| } |
| assertEquals("Wrong number of splits", expectedNumSources, splits.size()); |
| |
| int emptySplits = 0; |
| for (BoundedSource<String> subSource : splits) { |
| if (readFromSource(subSource, options).isEmpty()) { |
| emptySplits += 1; |
| } |
| } |
| assertThat( |
| "There are too many empty splits, parallelism is sub-optimal", |
| emptySplits, |
| lessThan((int) (ACCEPTABLE_EMPTY_SPLITS_PERCENTAGE * splits.size()))); |
| } |
| |
| void testSizes() throws Exception { |
| if (!useAsITests) { |
| ElasticsearchIOTestUtils.insertTestDocuments(connectionConfiguration, numDocs, restClient); |
| } |
| PipelineOptions options = PipelineOptionsFactory.create(); |
| Read read = ElasticsearchIO.read().withConnectionConfiguration(connectionConfiguration); |
| BoundedElasticsearchSource initialSource = |
| new BoundedElasticsearchSource(read, null, null, null); |
| // can't use equal assert as Elasticsearch indexes never have same size |
| // (due to internal Elasticsearch implementation) |
| long estimatedSize = initialSource.getEstimatedSizeBytes(options); |
| LOG.info("Estimated size: {}", estimatedSize); |
| assertThat("Wrong estimated size", estimatedSize, greaterThan(AVERAGE_DOC_SIZE * numDocs)); |
| } |
| |
| void testRead() throws Exception { |
| if (!useAsITests) { |
| ElasticsearchIOTestUtils.insertTestDocuments(connectionConfiguration, numDocs, restClient); |
| } |
| |
| PCollection<String> output = |
| pipeline.apply( |
| ElasticsearchIO.read() |
| .withConnectionConfiguration(connectionConfiguration) |
| // set to default value, useful just to test parameter passing. |
| .withScrollKeepalive("5m") |
| // set to default value, useful just to test parameter passing. |
| .withBatchSize(100L)); |
| PAssert.thatSingleton(output.apply("Count", Count.globally())).isEqualTo(numDocs); |
| pipeline.run(); |
| } |
| |
| void testReadWithQueryString() throws Exception { |
| testReadWithQueryInternal(Read::withQuery); |
| } |
| |
| void testReadWithQueryValueProvider() throws Exception { |
| testReadWithQueryInternal( |
| (read, query) -> read.withQuery(ValueProvider.StaticValueProvider.of(query))); |
| } |
| |
| private void testReadWithQueryInternal(BiFunction<Read, String, Read> queryConfigurer) |
| throws IOException { |
| if (!useAsITests) { |
| ElasticsearchIOTestUtils.insertTestDocuments(connectionConfiguration, numDocs, restClient); |
| } |
| |
| String query = |
| "{\n" |
| + " \"query\": {\n" |
| + " \"match\" : {\n" |
| + " \"scientist\" : {\n" |
| + " \"query\" : \"Einstein\"\n" |
| + " }\n" |
| + " }\n" |
| + " }\n" |
| + "}"; |
| |
| Read read = ElasticsearchIO.read().withConnectionConfiguration(connectionConfiguration); |
| |
| read = queryConfigurer.apply(read, query); |
| |
| PCollection<String> output = pipeline.apply(read); |
| |
| PAssert.thatSingleton(output.apply("Count", Count.globally())) |
| .isEqualTo(numDocs / NUM_SCIENTISTS); |
| pipeline.run(); |
| } |
| |
| /** Test reading metadata by reading back the id of a document after writing it. */ |
| void testReadWithMetadata() throws Exception { |
| if (!useAsITests) { |
| ElasticsearchIOTestUtils.insertTestDocuments(connectionConfiguration, 1, restClient); |
| } |
| |
| PCollection<String> output = |
| pipeline.apply( |
| ElasticsearchIO.read() |
| .withConnectionConfiguration(connectionConfiguration) |
| .withMetadata()); |
| PAssert.that(output).satisfies(new ContainsStringCheckerFn("\"_id\":\"0\"")); |
| pipeline.run(); |
| } |
| |
| void testWrite() throws Exception { |
| Write write = ElasticsearchIO.write().withConnectionConfiguration(connectionConfiguration); |
| executeWriteTest(write); |
| } |
| |
| void testWriteWithErrors() throws Exception { |
| Write write = |
| ElasticsearchIO.write() |
| .withConnectionConfiguration(connectionConfiguration) |
| .withMaxBatchSize(BATCH_SIZE); |
| List<String> input = |
| ElasticsearchIOTestUtils.createDocuments( |
| numDocs, ElasticsearchIOTestUtils.InjectionMode.INJECT_SOME_INVALID_DOCS); |
| expectedException.expect(isA(IOException.class)); |
| expectedException.expectMessage( |
| new CustomMatcher<String>("RegExp matcher") { |
| @Override |
| public boolean matches(Object o) { |
| String message = (String) o; |
| // This regexp tests that 2 malformed documents are actually in error |
| // and that the message contains their IDs. |
| // It also ensures that root reason, root error type, |
| // caused by reason and caused by error type are present in message. |
| // To avoid flakiness of the test in case of Elasticsearch error message change, |
| // only "failed to parse" root reason is matched, |
| // the other messages are matched using .+ |
| return message.matches( |
| "(?is).*Error writing to Elasticsearch, some elements could not be inserted" |
| + ".*Document id .+: failed to parse \\(.+\\).*Caused by: .+ \\(.+\\).*" |
| + "Document id .+: failed to parse \\(.+\\).*Caused by: .+ \\(.+\\).*"); |
| } |
| }); |
| // write bundles size is the runner decision, we cannot force a bundle size, |
| // so we test the Writer as a DoFn outside of a runner. |
| try (DoFnTester<String, Void> fnTester = DoFnTester.of(new Write.WriteFn(write))) { |
| // inserts into Elasticsearch |
| fnTester.processBundle(input); |
| } |
| } |
| |
| void testWriteWithMaxBatchSize() throws Exception { |
| Write write = |
| ElasticsearchIO.write() |
| .withConnectionConfiguration(connectionConfiguration) |
| .withMaxBatchSize(BATCH_SIZE); |
| // write bundles size is the runner decision, we cannot force a bundle size, |
| // so we test the Writer as a DoFn outside of a runner. |
| try (DoFnTester<String, Void> fnTester = DoFnTester.of(new Write.WriteFn(write))) { |
| List<String> input = |
| ElasticsearchIOTestUtils.createDocuments( |
| numDocs, ElasticsearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS); |
| long numDocsProcessed = 0; |
| long numDocsInserted = 0; |
| for (String document : input) { |
| fnTester.processElement(document); |
| numDocsProcessed++; |
| // test every 100 docs to avoid overloading ES |
| if ((numDocsProcessed % 100) == 0) { |
| // force the index to upgrade after inserting for the inserted docs |
| // to be searchable immediately |
| long currentNumDocs = |
| refreshIndexAndGetCurrentNumDocs(connectionConfiguration, restClient); |
| if ((numDocsProcessed % BATCH_SIZE) == 0) { |
| /* bundle end */ |
| assertEquals( |
| "we are at the end of a bundle, we should have inserted all processed documents", |
| numDocsProcessed, |
| currentNumDocs); |
| numDocsInserted = currentNumDocs; |
| } else { |
| /* not bundle end */ |
| assertEquals( |
| "we are not at the end of a bundle, we should have inserted no more documents", |
| numDocsInserted, |
| currentNumDocs); |
| } |
| } |
| } |
| } |
| } |
| |
| void testWriteWithMaxBatchSizeBytes() throws Exception { |
| Write write = |
| ElasticsearchIO.write() |
| .withConnectionConfiguration(connectionConfiguration) |
| .withMaxBatchSizeBytes(BATCH_SIZE_BYTES); |
| // write bundles size is the runner decision, we cannot force a bundle size, |
| // so we test the Writer as a DoFn outside of a runner. |
| try (DoFnTester<String, Void> fnTester = DoFnTester.of(new Write.WriteFn(write))) { |
| List<String> input = |
| ElasticsearchIOTestUtils.createDocuments( |
| numDocs, ElasticsearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS); |
| long numDocsProcessed = 0; |
| long sizeProcessed = 0; |
| long numDocsInserted = 0; |
| long batchInserted = 0; |
| for (String document : input) { |
| fnTester.processElement(document); |
| numDocsProcessed++; |
| sizeProcessed += document.getBytes(StandardCharsets.UTF_8).length; |
| // test every 40 docs to avoid overloading ES |
| if ((numDocsProcessed % 40) == 0) { |
| // force the index to upgrade after inserting for the inserted docs |
| // to be searchable immediately |
| long currentNumDocs = |
| refreshIndexAndGetCurrentNumDocs(connectionConfiguration, restClient); |
| if (sizeProcessed / BATCH_SIZE_BYTES > batchInserted) { |
| /* bundle end */ |
| assertThat( |
| "we have passed a bundle size, we should have inserted some documents", |
| currentNumDocs, |
| greaterThan(numDocsInserted)); |
| numDocsInserted = currentNumDocs; |
| batchInserted = (sizeProcessed / BATCH_SIZE_BYTES); |
| } else { |
| /* not bundle end */ |
| assertEquals( |
| "we are not at the end of a bundle, we should have inserted no more documents", |
| numDocsInserted, |
| currentNumDocs); |
| } |
| } |
| } |
| } |
| } |
| |
| /** Extracts the name field from the JSON document. */ |
| private static class ExtractValueFn implements Write.FieldValueExtractFn { |
| private final String fieldName; |
| |
| private ExtractValueFn(String fieldName) { |
| this.fieldName = fieldName; |
| } |
| |
| @Override |
| public String apply(JsonNode input) { |
| return input.path(fieldName).asText(); |
| } |
| } |
| |
| /** |
| * Tests that when using the scientist name as the document identifier only as many documents as |
| * scientists are created, since subsequent calls with the same name invoke updates. |
| */ |
| void testWriteWithIdFn() throws Exception { |
| List<String> data = |
| ElasticsearchIOTestUtils.createDocuments( |
| numDocs, ElasticsearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS); |
| pipeline |
| .apply(Create.of(data)) |
| .apply( |
| ElasticsearchIO.write() |
| .withConnectionConfiguration(connectionConfiguration) |
| .withIdFn(new ExtractValueFn("scientist"))); |
| pipeline.run(); |
| |
| long currentNumDocs = refreshIndexAndGetCurrentNumDocs(connectionConfiguration, restClient); |
| assertEquals(NUM_SCIENTISTS, currentNumDocs); |
| |
| int count = countByScientistName(connectionConfiguration, restClient, "Einstein"); |
| assertEquals(1, count); |
| } |
| |
| /** |
| * Tests that documents are dynamically routed to different indexes and not the one specified in |
| * the configuration. Documents should be routed to an index named the same as the scientist in |
| * the document. Multiple indexes adds significant work to the ES server and even passing moderate |
| * number of docs can overload the bulk queue and workers. The post explains more |
| * https://www.elastic.co/blog/why-am-i-seeing-bulk-rejections-in-my-elasticsearch-cluster. |
| * Therefore limit to a small number of docs to test routing behavior only. |
| */ |
| void testWriteWithIndexFn() throws Exception { |
| long docsPerScientist = 10; // very conservative |
| long adjustedNumDocs = docsPerScientist * FAMOUS_SCIENTISTS.length; |
| |
| List<String> data = |
| ElasticsearchIOTestUtils.createDocuments( |
| adjustedNumDocs, ElasticsearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS); |
| pipeline |
| .apply(Create.of(data)) |
| .apply( |
| ElasticsearchIO.write() |
| .withConnectionConfiguration(connectionConfiguration) |
| .withIndexFn(new ExtractValueFn("scientist"))); |
| pipeline.run(); |
| |
| // verify counts on each index |
| for (String scientist : FAMOUS_SCIENTISTS) { |
| String index = scientist.toLowerCase(); |
| long count = |
| refreshIndexAndGetCurrentNumDocs(restClient, index, connectionConfiguration.getType()); |
| assertEquals(scientist + " index holds incorrect count", docsPerScientist, count); |
| } |
| } |
| |
| /** Returns TYPE_0 or TYPE_1 based on the modulo 2 of the hash of the named field. */ |
| static class Modulo2ValueFn implements Write.FieldValueExtractFn { |
| private final String fieldName; |
| |
| Modulo2ValueFn(String fieldName) { |
| this.fieldName = fieldName; |
| } |
| |
| @Override |
| public String apply(JsonNode input) { |
| return "TYPE_" + input.path(fieldName).asText().hashCode() % 2; |
| } |
| } |
| |
| /** |
| * Tests that documents are dynamically routed to different types and not the type that is given |
| * in the configuration. Documents should be routed to the a type of type_0 or type_1 using a |
| * modulo approach of the explicit id. |
| * |
| * <p>This test does not work with ES 6 because ES 6 does not allow one mapping has more than 1 |
| * type |
| */ |
| void testWriteWithTypeFn2x5x() throws Exception { |
| // defensive coding: this test requires an even number of docs |
| long adjustedNumDocs = (numDocs & 1) == 0 ? numDocs : numDocs + 1; |
| |
| List<String> data = |
| ElasticsearchIOTestUtils.createDocuments( |
| adjustedNumDocs, ElasticsearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS); |
| pipeline |
| .apply(Create.of(data)) |
| .apply( |
| ElasticsearchIO.write() |
| .withConnectionConfiguration(connectionConfiguration) |
| .withTypeFn(new Modulo2ValueFn("id"))); |
| pipeline.run(); |
| |
| for (int i = 0; i < 2; i++) { |
| String type = "TYPE_" + i; |
| long count = |
| refreshIndexAndGetCurrentNumDocs(restClient, connectionConfiguration.getIndex(), type); |
| assertEquals(type + " holds incorrect count", adjustedNumDocs / 2, count); |
| } |
| } |
| |
| /** |
| * Tests that documents are correctly routed when index, type and document ID functions are |
| * provided to overwrite the defaults of using the configuration and auto-generation of the |
| * document IDs by Elasticsearch. The scientist name is used for the index, type and document ID. |
| * As a result there should be only a single document in each index/type. |
| */ |
| void testWriteWithFullAddressing() throws Exception { |
| List<String> data = |
| ElasticsearchIOTestUtils.createDocuments( |
| numDocs, ElasticsearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS); |
| pipeline |
| .apply(Create.of(data)) |
| .apply( |
| ElasticsearchIO.write() |
| .withConnectionConfiguration(connectionConfiguration) |
| .withIdFn(new ExtractValueFn("id")) |
| .withIndexFn(new ExtractValueFn("scientist")) |
| .withTypeFn(new Modulo2ValueFn("scientist"))); |
| pipeline.run(); |
| |
| for (String scientist : FAMOUS_SCIENTISTS) { |
| String index = scientist.toLowerCase(); |
| for (int i = 0; i < 2; i++) { |
| String type = "TYPE_" + scientist.hashCode() % 2; |
| long count = refreshIndexAndGetCurrentNumDocs(restClient, index, type); |
| assertEquals("Incorrect count for " + index + "/" + type, numDocs / NUM_SCIENTISTS, count); |
| } |
| } |
| } |
| |
| /** |
| * Tests partial updates by adding a group field to each document in the standard test set. The |
| * group field is populated as the modulo 2 of the document id allowing for a test to ensure the |
| * documents are split into 2 groups. |
| */ |
| void testWritePartialUpdate() throws Exception { |
| if (!useAsITests) { |
| ElasticsearchIOTestUtils.insertTestDocuments(connectionConfiguration, numDocs, restClient); |
| } |
| |
| // defensive coding to ensure our initial state is as expected |
| |
| long currentNumDocs = refreshIndexAndGetCurrentNumDocs(connectionConfiguration, restClient); |
| assertEquals(numDocs, currentNumDocs); |
| |
| // partial documents containing the ID and group only |
| List<String> data = new ArrayList<>(); |
| for (int i = 0; i < numDocs; i++) { |
| data.add(String.format("{\"id\" : %s, \"group\" : %s}", i, i % 2)); |
| } |
| |
| pipeline |
| .apply(Create.of(data)) |
| .apply( |
| ElasticsearchIO.write() |
| .withConnectionConfiguration(connectionConfiguration) |
| .withIdFn(new ExtractValueFn("id")) |
| .withUsePartialUpdate(true)); |
| pipeline.run(); |
| |
| currentNumDocs = refreshIndexAndGetCurrentNumDocs(connectionConfiguration, restClient); |
| |
| // check we have not unwittingly modified existing behaviour |
| assertEquals(numDocs, currentNumDocs); |
| assertEquals( |
| numDocs / NUM_SCIENTISTS, |
| countByScientistName(connectionConfiguration, restClient, "Einstein")); |
| |
| // Partial update assertions |
| assertEquals(numDocs / 2, countByMatch(connectionConfiguration, restClient, "group", "0")); |
| assertEquals(numDocs / 2, countByMatch(connectionConfiguration, restClient, "group", "1")); |
| } |
| |
| /** Tests partial updates with errors by adding some invalid info to test set. */ |
| void testWritePartialUpdateWithErrors() throws Exception { |
| // put a mapping to simulate error of insertion |
| ElasticsearchIOTestUtils.setIndexMapping(connectionConfiguration, restClient); |
| |
| if (!useAsITests) { |
| ElasticsearchIOTestUtils.insertTestDocuments(connectionConfiguration, numDocs, restClient); |
| } |
| |
| // try to partial update a document with an incompatible date format for the age to generate |
| // an update error |
| List<String> data = new ArrayList<>(); |
| data.add("{\"id\" : 1, \"age\" : \"2018-08-10:00:00\"}"); |
| |
| try { |
| pipeline |
| .apply(Create.of(data)) |
| .apply( |
| ElasticsearchIO.write() |
| .withConnectionConfiguration(connectionConfiguration) |
| .withIdFn(new ExtractValueFn("id")) |
| .withUsePartialUpdate(true)); |
| pipeline.run(); |
| } catch (Exception e) { |
| boolean matches = |
| e.getLocalizedMessage() |
| .matches( |
| "(?is).*Error writing to Elasticsearch, some elements could not be inserted:" |
| + ".*Document id .+: failed to parse .*Caused by: .*" |
| + ".*For input string: \"2018-08-10:00:00\".*"); |
| |
| assertTrue(matches); |
| } |
| } |
| |
| /** |
| * Function for checking if any string in iterable contains expected substring. Fails if no match |
| * is found. |
| */ |
| private static class ContainsStringCheckerFn |
| implements SerializableFunction<Iterable<String>, Void> { |
| |
| private String expectedSubString; |
| |
| ContainsStringCheckerFn(String expectedSubString) { |
| this.expectedSubString = expectedSubString; |
| } |
| |
| @Override |
| public Void apply(Iterable<String> input) { |
| for (String s : input) { |
| if (s.contains(expectedSubString)) { |
| return null; |
| } |
| } |
| fail("No string found containing " + expectedSubString); |
| return null; |
| } |
| } |
| |
| /** Test that the default predicate correctly parses chosen error code. */ |
| void testDefaultRetryPredicate(RestClient restClient) throws IOException { |
| |
| HttpEntity entity1 = new NStringEntity(BAD_REQUEST, ContentType.APPLICATION_JSON); |
| Response response1 = |
| restClient.performRequest("POST", "/_bulk", Collections.emptyMap(), entity1); |
| assertTrue(CUSTOM_RETRY_PREDICATE.test(response1.getEntity())); |
| |
| HttpEntity entity2 = new NStringEntity(OK_REQUEST, ContentType.APPLICATION_JSON); |
| Response response2 = |
| restClient.performRequest("POST", "/_bulk", Collections.emptyMap(), entity2); |
| assertFalse(DEFAULT_RETRY_PREDICATE.test(response2.getEntity())); |
| } |
| |
| /** |
| * Test that retries are invoked when Elasticsearch returns a specific error code. We invoke this |
| * by issuing corrupt data and retrying on the `400` error code. Normal behaviour is to retry on |
| * `429` only but that is difficult to simulate reliably. The logger is used to verify expected |
| * behavior. |
| */ |
| void testWriteRetry() throws Throwable { |
| expectedException.expectCause(isA(IOException.class)); |
| // max attempt is 3, but retry is 2 which excludes 1st attempt when error was identified and |
| // retry started. |
| expectedException.expectMessage( |
| String.format(ElasticsearchIO.Write.WriteFn.RETRY_FAILED_LOG, EXPECTED_RETRIES)); |
| |
| ElasticsearchIO.Write write = |
| ElasticsearchIO.write() |
| .withConnectionConfiguration(connectionConfiguration) |
| .withRetryConfiguration( |
| ElasticsearchIO.RetryConfiguration.create(MAX_ATTEMPTS, Duration.millis(35000)) |
| .withRetryPredicate(CUSTOM_RETRY_PREDICATE)); |
| pipeline.apply(Create.of(Arrays.asList(BAD_FORMATTED_DOC))).apply(write); |
| |
| pipeline.run(); |
| } |
| |
| void testWriteRetryValidRequest() throws Exception { |
| Write write = |
| ElasticsearchIO.write() |
| .withConnectionConfiguration(connectionConfiguration) |
| .withRetryConfiguration( |
| ElasticsearchIO.RetryConfiguration.create(MAX_ATTEMPTS, Duration.millis(35000)) |
| .withRetryPredicate(CUSTOM_RETRY_PREDICATE)); |
| executeWriteTest(write); |
| } |
| |
| private void executeWriteTest(ElasticsearchIO.Write write) throws Exception { |
| List<String> data = |
| ElasticsearchIOTestUtils.createDocuments( |
| numDocs, ElasticsearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS); |
| pipeline.apply(Create.of(data)).apply(write); |
| pipeline.run(); |
| |
| long currentNumDocs = refreshIndexAndGetCurrentNumDocs(connectionConfiguration, restClient); |
| assertEquals(numDocs, currentNumDocs); |
| |
| int count = countByScientistName(connectionConfiguration, restClient, "Einstein"); |
| assertEquals(numDocs / NUM_SCIENTISTS, count); |
| } |
| } |