| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License") you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.nifi.elasticsearch.integration |
| |
| import groovy.json.JsonSlurper |
| import org.apache.maven.artifact.versioning.ComparableVersion |
| import org.apache.nifi.elasticsearch.DeleteOperationResponse |
| import org.apache.nifi.elasticsearch.ElasticSearchClientService |
| import org.apache.nifi.elasticsearch.ElasticSearchClientServiceImpl |
| import org.apache.nifi.elasticsearch.IndexOperationRequest |
| import org.apache.nifi.elasticsearch.IndexOperationResponse |
| import org.apache.nifi.elasticsearch.SearchResponse |
| import org.apache.nifi.elasticsearch.UpdateOperationResponse |
| import org.apache.nifi.security.util.StandardTlsConfiguration |
| import org.apache.nifi.security.util.TemporaryKeyStoreBuilder |
| import org.apache.nifi.security.util.TlsConfiguration |
| import org.apache.nifi.ssl.SSLContextService |
| import org.apache.nifi.util.StringUtils |
| import org.apache.nifi.util.TestRunner |
| import org.apache.nifi.util.TestRunners |
| import org.apache.nifi.web.util.ssl.SslContextUtils |
| import org.junit.After |
| import org.junit.Assert |
| import org.junit.Assume |
| import org.junit.Before |
| import org.junit.BeforeClass |
| import org.junit.Test |
| |
| import javax.net.ssl.SSLContext |
| |
| import static groovy.json.JsonOutput.prettyPrint |
| import static groovy.json.JsonOutput.toJson |
| import static org.hamcrest.CoreMatchers.is |
| import static org.mockito.Mockito.mock |
| import static org.mockito.Mockito.when |
| |
| class ElasticSearchClientService_IT { |
| private TestRunner runner |
| private ElasticSearchClientServiceImpl service |
| |
| static final String INDEX = "messages" |
| static final String TYPE = StringUtils.isBlank(System.getProperty("type_name")) ? null : System.getProperty("type_name") |
| |
| static final ComparableVersion VERSION = new ComparableVersion(System.getProperty("es_version", "0.0.0")) |
| static final ComparableVersion ES_7_10 = new ComparableVersion("7.10") |
| |
| static final String FLAVOUR = System.getProperty("es_flavour") |
| static final String DEFAULT = "default" |
| |
| private static TlsConfiguration generatedTlsConfiguration |
| private static TlsConfiguration truststoreTlsConfiguration |
| |
| static boolean isElasticsearchSetup() { |
| boolean setup = true |
| if (StringUtils.isBlank(System.getProperty("es_version"))) { |
| System.err.println("Cannot run Elasticsearch integration-tests: Elasticsearch version (5, 6, 7) not specified") |
| setup = false |
| } |
| |
| if (StringUtils.isBlank(System.getProperty("es_flavour"))) { |
| System.err.println("Cannot run Elasticsearch integration-tests: Elasticsearch flavour (oss, default) not specified") |
| setup = false |
| } |
| |
| return setup |
| } |
| |
| @BeforeClass |
| static void beforeAll() throws Exception { |
| Assume.assumeTrue("Elasticsearch integration-tests not setup", isElasticsearchSetup()) |
| |
| System.out.println( |
| String.format("%n%n%n%n%n%n%n%n%n%n%n%n%n%n%nTYPE: %s%nVERSION: %s%nFLAVOUR %s%n%n%n%n%n%n%n%n%n%n%n%n%n%n%n", |
| TYPE, VERSION, FLAVOUR) |
| ) |
| |
| generatedTlsConfiguration = new TemporaryKeyStoreBuilder().build() |
| truststoreTlsConfiguration = new StandardTlsConfiguration( |
| null, |
| null, |
| null, |
| generatedTlsConfiguration.getTruststorePath(), |
| generatedTlsConfiguration.getTruststorePassword(), |
| generatedTlsConfiguration.getTruststoreType() |
| ) |
| } |
| |
| @Before |
| void before() throws Exception { |
| runner = TestRunners.newTestRunner(TestControllerServiceProcessor.class) |
| service = new ElasticSearchClientServiceImpl() |
| runner.addControllerService("Client Service", service) |
| runner.setProperty(service, ElasticSearchClientService.HTTP_HOSTS, "http://localhost:9400") |
| runner.setProperty(service, ElasticSearchClientService.CONNECT_TIMEOUT, "10000") |
| runner.setProperty(service, ElasticSearchClientService.SOCKET_TIMEOUT, "60000") |
| runner.setProperty(service, ElasticSearchClientService.RETRY_TIMEOUT, "60000") |
| runner.setProperty(service, ElasticSearchClientService.SUPPRESS_NULLS, ElasticSearchClientService.ALWAYS_SUPPRESS.getValue()) |
| |
| try { |
| runner.enableControllerService(service) |
| } catch (Exception ex) { |
| ex.printStackTrace() |
| throw ex |
| } |
| } |
| |
| @After |
| void after() throws Exception { |
| service.onDisabled() |
| } |
| |
| @Test |
| void testBasicSearch() throws Exception { |
| String query = prettyPrint(toJson([ |
| size: 10, |
| query: [ |
| match_all: [:] |
| ], |
| aggs: [ |
| term_counts: [ |
| terms: [ |
| field: "msg", |
| size: 5 |
| ] |
| ] |
| ] |
| ])) |
| |
| |
| SearchResponse response = service.search(query, INDEX, TYPE, null) |
| Assert.assertNotNull("Response was null", response) |
| |
| Assert.assertEquals("Wrong count", 15, response.numberOfHits) |
| Assert.assertFalse("Timed out", response.isTimedOut()) |
| Assert.assertNotNull("Hits was null", response.getHits()) |
| Assert.assertEquals("Wrong number of hits", 10, response.hits.size()) |
| Assert.assertNotNull("Aggregations are missing", response.aggregations) |
| Assert.assertEquals("Aggregation count is wrong", 1, response.aggregations.size()) |
| Assert.assertNull("Unexpected ScrollId", response.scrollId) |
| Assert.assertNull("Unexpected Search_After", response.searchAfter) |
| Assert.assertNull("Unexpected pitId", response.pitId) |
| |
| Map termCounts = response.aggregations.get("term_counts") as Map |
| Assert.assertNotNull("Term counts was missing", termCounts) |
| def buckets = termCounts.get("buckets") |
| Assert.assertNotNull("Buckets branch was empty", buckets) |
| def expected = [ |
| "one": 1, |
| "two": 2, |
| "three": 3, |
| "four": 4, |
| "five": 5 |
| ] |
| |
| buckets.each { aggRes -> |
| def key = aggRes["key"] |
| def docCount = aggRes["doc_count"] |
| Assert.assertEquals("${key} did not match.", expected[key as String], docCount) |
| } |
| } |
| |
| @Test |
| void testBasicSearchRequestParameters() throws Exception { |
| String query = prettyPrint(toJson([ |
| size: 10, |
| query: [ |
| match_all: [:] |
| ], |
| aggs: [ |
| term_counts: [ |
| terms: [ |
| field: "msg", |
| size: 5 |
| ] |
| ] |
| ] |
| ])) |
| |
| |
| SearchResponse response = service.search(query, "messages", TYPE, [preference: "_local"]) |
| Assert.assertNotNull("Response was null", response) |
| |
| Assert.assertEquals("Wrong count", 15, response.numberOfHits) |
| Assert.assertFalse("Timed out", response.isTimedOut()) |
| Assert.assertNotNull("Hits was null", response.getHits()) |
| Assert.assertEquals("Wrong number of hits", 10, response.hits.size()) |
| Assert.assertNotNull("Aggregations are missing", response.aggregations) |
| Assert.assertEquals("Aggregation count is wrong", 1, response.aggregations.size()) |
| |
| Map termCounts = response.aggregations.get("term_counts") as Map |
| Assert.assertNotNull("Term counts was missing", termCounts) |
| def buckets = termCounts.get("buckets") |
| Assert.assertNotNull("Buckets branch was empty", buckets) |
| def expected = [ |
| "one": 1, |
| "two": 2, |
| "three": 3, |
| "four": 4, |
| "five": 5 |
| ] |
| |
| buckets.each { aggRes -> |
| String key = aggRes["key"] |
| def docCount = aggRes["doc_count"] |
| Assert.assertEquals("${key} did not match.", expected[key], docCount) |
| } |
| } |
| |
| @Test |
| void testSearchWarnings() { |
| String query |
| String type = TYPE |
| if (VERSION.toString().startsWith("7.")) { |
| // querying with _type in ES 7.x is deprecated |
| query = prettyPrint(toJson([size: 1, query: [match_all: [:]]])) |
| type = "a-type" |
| } else if (VERSION.toString().startsWith("6.")) { |
| // "query_string" query option "all_fields" in ES 6.x is deprecated |
| query = prettyPrint(toJson([size: 1, query: [query_string: [query: 1, all_fields: true]]])) |
| } else { |
| // "mlt" query in ES 5.x is deprecated |
| query = prettyPrint(toJson([size: 1, query: [mlt: [fields: ["msg"], like: 1]]])) |
| } |
| final SearchResponse response = service.search(query, INDEX, type, null) |
| Assert.assertTrue("Missing warnings", !response.warnings.isEmpty()) |
| } |
| |
| @Test |
| void testScroll() { |
| final String query = prettyPrint(toJson([ |
| size: 2, |
| query: [ match_all: [:] ], |
| aggs: [ term_counts: [ terms: [ field: "msg", size: 5 ] ] ] |
| ])) |
| |
| // initiate the scroll |
| final SearchResponse response = service.search(query, INDEX, TYPE, Collections.singletonMap("scroll", "10s")) |
| Assert.assertNotNull("Response was null", response) |
| |
| Assert.assertEquals("Wrong count", 15, response.numberOfHits) |
| Assert.assertFalse("Timed out", response.isTimedOut()) |
| Assert.assertNotNull("Hits was null", response.getHits()) |
| Assert.assertEquals("Wrong number of hits", 2, response.hits.size()) |
| Assert.assertNotNull("Aggregations are missing", response.aggregations) |
| Assert.assertEquals("Aggregation count is wrong", 1, response.aggregations.size()) |
| Assert.assertNotNull("ScrollId missing", response.scrollId) |
| Assert.assertNull("Unexpected Search_After", response.searchAfter) |
| Assert.assertNull("Unexpected pitId", response.pitId) |
| |
| final Map termCounts = response.aggregations.get("term_counts") as Map |
| Assert.assertNotNull("Term counts was missing", termCounts) |
| Assert.assertEquals("Buckets count is wrong", 5, (termCounts.get("buckets") as List).size()) |
| |
| // scroll the next page |
| final SearchResponse scrollResponse = service.scroll(prettyPrint((toJson([scroll_id: response.scrollId, scroll: "10s"])))) |
| Assert.assertNotNull("Scroll Response was null", scrollResponse) |
| |
| Assert.assertEquals("Wrong count", 15, scrollResponse.numberOfHits) |
| Assert.assertFalse("Timed out", scrollResponse.isTimedOut()) |
| Assert.assertNotNull("Hits was null", scrollResponse.getHits()) |
| Assert.assertEquals("Wrong number of hits", 2, scrollResponse.hits.size()) |
| Assert.assertNotNull("Aggregations missing", scrollResponse.aggregations) |
| Assert.assertEquals("Aggregation count is wrong", 0, scrollResponse.aggregations.size()) |
| Assert.assertNotNull("ScrollId missing", scrollResponse.scrollId) |
| Assert.assertNull("Unexpected Search_After", scrollResponse.searchAfter) |
| Assert.assertNull("Unexpected pitId", scrollResponse.pitId) |
| |
| Assert.assertNotEquals("Same results", scrollResponse.hits, response.hits) |
| |
| // delete the scroll |
| DeleteOperationResponse deleteResponse = service.deleteScroll(scrollResponse.scrollId) |
| Assert.assertNotNull("Delete Response was null", deleteResponse) |
| Assert.assertTrue(deleteResponse.took > 0) |
| |
| // delete scroll again (should now be unknown but the 404 caught and ignored) |
| deleteResponse = service.deleteScroll(scrollResponse.scrollId) |
| Assert.assertNotNull("Delete Response was null", deleteResponse) |
| Assert.assertEquals(0L, deleteResponse.took) |
| } |
| |
| @Test |
| void testSearchAfter() { |
| final Map<String, Object> queryMap = [ |
| size: 2, |
| query: [ match_all: [:] ], |
| aggs: [ term_counts: [ terms: [ field: "msg", size: 5 ] ] ], |
| sort: [[ msg: "desc" ]] |
| ] |
| final String query = prettyPrint(toJson(queryMap)) |
| |
| // search first page |
| final SearchResponse response = service.search(query, INDEX, TYPE, null) |
| Assert.assertNotNull("Response was null", response) |
| |
| Assert.assertEquals("Wrong count", 15, response.numberOfHits) |
| Assert.assertFalse("Timed out", response.isTimedOut()) |
| Assert.assertNotNull("Hits was null", response.getHits()) |
| Assert.assertEquals("Wrong number of hits", 2, response.hits.size()) |
| Assert.assertNotNull("Aggregations missing", response.aggregations) |
| Assert.assertEquals("Aggregation count is wrong", 1, response.aggregations.size()) |
| Assert.assertNull("Unexpected ScrollId", response.scrollId) |
| Assert.assertNotNull("Search_After missing", response.searchAfter) |
| Assert.assertNull("Unexpected pitId", response.pitId) |
| |
| final Map termCounts = response.aggregations.get("term_counts") as Map |
| Assert.assertNotNull("Term counts was missing", termCounts) |
| Assert.assertEquals("Buckets count is wrong", 5, (termCounts.get("buckets") as List).size()) |
| |
| // search the next page |
| queryMap.search_after = new JsonSlurper().parseText(response.searchAfter) as Serializable |
| queryMap.remove("aggs") |
| final String secondPage = prettyPrint(toJson(queryMap)) |
| final SearchResponse secondResponse = service.search(secondPage, INDEX, TYPE, null) |
| Assert.assertNotNull("Second Response was null", secondResponse) |
| |
| Assert.assertEquals("Wrong count", 15, secondResponse.numberOfHits) |
| Assert.assertFalse("Timed out", secondResponse.isTimedOut()) |
| Assert.assertNotNull("Hits was null", secondResponse.getHits()) |
| Assert.assertEquals("Wrong number of hits", 2, secondResponse.hits.size()) |
| Assert.assertNotNull("Aggregations missing", secondResponse.aggregations) |
| Assert.assertEquals("Aggregation count is wrong", 0, secondResponse.aggregations.size()) |
| Assert.assertNull("Unexpected ScrollId", secondResponse.scrollId) |
| Assert.assertNotNull("Unexpected Search_After", secondResponse.searchAfter) |
| Assert.assertNull("Unexpected pitId", secondResponse.pitId) |
| |
| Assert.assertNotEquals("Same results", secondResponse.hits, response.hits) |
| } |
| |
| @Test |
| void testPointInTime() { |
| // Point in Time only available in 7.10+ with XPack enabled |
| Assume.assumeTrue("Requires version 7.10+", VERSION >= ES_7_10) |
| Assume.assumeThat("Requires XPack features", FLAVOUR, is(DEFAULT)) |
| |
| // initialise |
| final String pitId = service.initialisePointInTime(INDEX, "10s") |
| |
| final Map<String, Object> queryMap = [ |
| size: 2, |
| query: [ match_all: [:] ], |
| aggs: [ term_counts: [ terms: [ field: "msg", size: 5 ] ] ], |
| sort: [[ msg: "desc" ]], |
| pit: [ id: pitId, keep_alive: "10s" ] |
| ] |
| final String query = prettyPrint(toJson(queryMap)) |
| |
| // search first page |
| final SearchResponse response = service.search(query, null, TYPE, null) |
| Assert.assertNotNull("Response was null", response) |
| |
| Assert.assertEquals("Wrong count", 15, response.numberOfHits) |
| Assert.assertFalse("Timed out", response.isTimedOut()) |
| Assert.assertNotNull("Hits was null", response.getHits()) |
| Assert.assertEquals("Wrong number of hits", 2, response.hits.size()) |
| Assert.assertNotNull("Aggregations missing", response.aggregations) |
| Assert.assertEquals("Aggregation count is wrong", 1, response.aggregations.size()) |
| Assert.assertNull("Unexpected ScrollId", response.scrollId) |
| Assert.assertNotNull("Unexpected Search_After", response.searchAfter) |
| Assert.assertNotNull("pitId missing", response.pitId) |
| |
| final Map termCounts = response.aggregations.get("term_counts") as Map |
| Assert.assertNotNull("Term counts was missing", termCounts) |
| Assert.assertEquals("Buckets count is wrong", 5, (termCounts.get("buckets") as List).size()) |
| |
| // search the next page |
| queryMap.search_after = new JsonSlurper().parseText(response.searchAfter) as Serializable |
| queryMap.remove("aggs") |
| final String secondPage = prettyPrint(toJson(queryMap)) |
| final SearchResponse secondResponse = service.search(secondPage, null, TYPE, null) |
| Assert.assertNotNull("Second Response was null", secondResponse) |
| |
| Assert.assertEquals("Wrong count", 15, secondResponse.numberOfHits) |
| Assert.assertFalse("Timed out", secondResponse.isTimedOut()) |
| Assert.assertNotNull("Hits was null", secondResponse.getHits()) |
| Assert.assertEquals("Wrong number of hits", 2, secondResponse.hits.size()) |
| Assert.assertNotNull("Aggregations missing", secondResponse.aggregations) |
| Assert.assertEquals("Aggregation count is wrong", 0, secondResponse.aggregations.size()) |
| Assert.assertNull("Unexpected ScrollId", secondResponse.scrollId) |
| Assert.assertNotNull("Unexpected Search_After", secondResponse.searchAfter) |
| Assert.assertNotNull("pitId missing", secondResponse.pitId) |
| |
| Assert.assertNotEquals("Same results", secondResponse.hits, response.hits) |
| |
| // delete pitId |
| DeleteOperationResponse deleteResponse = service.deletePointInTime(pitId) |
| Assert.assertNotNull("Delete Response was null", deleteResponse) |
| Assert.assertTrue(deleteResponse.took > 0) |
| |
| // delete pitId again (should now be unknown but the 404 caught and ignored) |
| deleteResponse = service.deletePointInTime(pitId) |
| Assert.assertNotNull("Delete Response was null", deleteResponse) |
| Assert.assertEquals(0L, deleteResponse.took) |
| } |
| |
| @Test |
| void testDeleteByQuery() throws Exception { |
| String query = prettyPrint(toJson([ |
| query: [ |
| match: [ |
| msg: "five" |
| ] |
| ] |
| ])) |
| DeleteOperationResponse response = service.deleteByQuery(query, INDEX, TYPE, null) |
| Assert.assertNotNull(response) |
| Assert.assertTrue(response.getTook() > 0) |
| } |
| |
| @Test |
| void testDeleteByQueryRequestParameters() throws Exception { |
| String query = prettyPrint(toJson([ |
| query: [ |
| match: [ |
| msg: "six" |
| ] |
| ] |
| ])) |
| DeleteOperationResponse response = service.deleteByQuery(query, INDEX, TYPE, [refresh: "true"]) |
| Assert.assertNotNull(response) |
| Assert.assertTrue(response.getTook() > 0) |
| } |
| |
| @Test |
| void testUpdateByQuery() throws Exception { |
| String query = prettyPrint(toJson([ |
| query: [ |
| match: [ |
| msg: "four" |
| ] |
| ] |
| ])) |
| UpdateOperationResponse response = service.updateByQuery(query, INDEX, TYPE, null) |
| Assert.assertNotNull(response) |
| Assert.assertTrue(response.getTook() > 0) |
| } |
| |
| @Test |
| void testUpdateByQueryRequestParameters() throws Exception { |
| String query = prettyPrint(toJson([ |
| query: [ |
| match: [ |
| msg: "four" |
| ] |
| ] |
| ])) |
| UpdateOperationResponse response = service.updateByQuery(query, INDEX, TYPE, [refresh: "true", slices: "1"]) |
| Assert.assertNotNull(response) |
| Assert.assertTrue(response.getTook() > 0) |
| } |
| |
| @Test |
| void testDeleteById() throws Exception { |
| final String ID = "1" |
| final def originalDoc = service.get(INDEX, TYPE, ID, null) |
| DeleteOperationResponse response = service.deleteById(INDEX, TYPE, ID, null) |
| Assert.assertNotNull(response) |
| Assert.assertTrue(response.getTook() > 0) |
| def doc = service.get(INDEX, TYPE, ID, null) |
| Assert.assertNull(doc) |
| doc = service.get(INDEX, TYPE, "2", null) |
| Assert.assertNotNull(doc) |
| |
| // replace the deleted doc |
| service.add(new IndexOperationRequest(INDEX, TYPE, "1", originalDoc, IndexOperationRequest.Operation.Index), null) |
| waitForIndexRefresh() // (affects later tests using _search or _bulk) |
| } |
| |
| @Test |
| void testGet() throws IOException { |
| Map old |
| 1.upto(15) { index -> |
| String id = String.valueOf(index) |
| def doc = service.get(INDEX, TYPE, id, null) |
| Assert.assertNotNull("Doc was null", doc) |
| Assert.assertNotNull("${doc.toString()}\t${doc.keySet().toString()}", doc.get("msg")) |
| old = doc |
| } |
| } |
| |
| @Test |
| void testSSL() { |
| final String serviceIdentifier = SSLContextService.class.getName() |
| final SSLContextService sslContext = mock(SSLContextService.class) |
| when(sslContext.getIdentifier()).thenReturn(serviceIdentifier) |
| |
| final SSLContext clientSslContext = SslContextUtils.createSslContext(truststoreTlsConfiguration) |
| when(sslContext.createContext()).thenReturn(clientSslContext) |
| when(sslContext.createTlsConfiguration()).thenReturn(truststoreTlsConfiguration) |
| |
| runner.addControllerService(serviceIdentifier, sslContext) |
| runner.enableControllerService(sslContext) |
| |
| runner.disableControllerService(service) |
| runner.setProperty(service, ElasticSearchClientService.PROP_SSL_CONTEXT_SERVICE, serviceIdentifier) |
| runner.enableControllerService(service) |
| |
| runner.assertValid(service) |
| } |
| |
| @Test |
| void testNullSuppression() { |
| Map<String, Object> doc = new HashMap<String, Object>(){{ |
| put("msg", "test") |
| put("is_null", null) |
| put("is_empty", "") |
| put("is_blank", " ") |
| put("empty_nested", Collections.emptyMap()) |
| put("empty_array", Collections.emptyList()) |
| }} |
| |
| // index with nulls |
| suppressNulls(false) |
| IndexOperationResponse response = service.bulk([new IndexOperationRequest("nulls", TYPE, "1", doc, IndexOperationRequest.Operation.Index)], null) |
| Assert.assertNotNull(response) |
| Assert.assertTrue(response.getTook() > 0) |
| waitForIndexRefresh() |
| |
| Map<String, Object> result = service.get("nulls", TYPE, "1", null) |
| Assert.assertEquals(doc, result) |
| |
| // suppress nulls |
| suppressNulls(true) |
| response = service.bulk([new IndexOperationRequest("nulls", TYPE, "2", doc, IndexOperationRequest.Operation.Index)], null) |
| Assert.assertNotNull(response) |
| Assert.assertTrue(response.getTook() > 0) |
| waitForIndexRefresh() |
| |
| result = service.get("nulls", TYPE, "2", null) |
| Assert.assertTrue("Non-nulls (present): " + result.toString(), result.keySet().containsAll(["msg", "is_blank"])) |
| Assert.assertFalse("is_null (should be omitted): " + result.toString(), result.keySet().contains("is_null")) |
| Assert.assertFalse("is_empty (should be omitted): " + result.toString(), result.keySet().contains("is_empty")) |
| Assert.assertFalse("empty_nested (should be omitted): " + result.toString(), result.keySet().contains("empty_nested")) |
| Assert.assertFalse("empty_array (should be omitted): " + result.toString(), result.keySet().contains("empty_array")) |
| } |
| |
| private void suppressNulls(final boolean suppressNulls) { |
| runner.setProperty(TestControllerServiceProcessor.CLIENT_SERVICE, "Client Service") |
| runner.disableControllerService(service) |
| runner.setProperty(service, ElasticSearchClientService.SUPPRESS_NULLS, suppressNulls ? ElasticSearchClientService.ALWAYS_SUPPRESS.getValue() : ElasticSearchClientService.NEVER_SUPPRESS.getValue()) |
| runner.enableControllerService(service) |
| runner.assertValid() |
| } |
| |
| @Test |
| void testBulkAddTwoIndexes() throws Exception { |
| List<IndexOperationRequest> payload = new ArrayList<>() |
| for (int x = 0; x < 20; x++) { |
| String index = x % 2 == 0 ? "bulk_a": "bulk_b" |
| payload.add(new IndexOperationRequest(index, TYPE, String.valueOf(x), new HashMap<String, Object>(){{ |
| put("msg", "test") |
| }}, IndexOperationRequest.Operation.Index)) |
| } |
| for (int x = 0; x < 5; x++) { |
| payload.add(new IndexOperationRequest("bulk_c", TYPE, String.valueOf(x), new HashMap<String, Object>(){{ |
| put("msg", "test") |
| }}, IndexOperationRequest.Operation.Index)) |
| } |
| IndexOperationResponse response = service.bulk(payload, [refresh: "true"]) |
| Assert.assertNotNull(response) |
| Assert.assertTrue(response.getTook() > 0) |
| waitForIndexRefresh() |
| |
| /* |
| * Now, check to ensure that both indexes got populated appropriately. |
| */ |
| String query = "{ \"query\": { \"match_all\": {}}}" |
| Long indexA = service.count(query, "bulk_a", TYPE, null) |
| Long indexB = service.count(query, "bulk_b", TYPE, null) |
| Long indexC = service.count(query, "bulk_c", TYPE, null) |
| |
| Assert.assertNotNull(indexA) |
| Assert.assertNotNull(indexB) |
| Assert.assertNotNull(indexC) |
| Assert.assertEquals(indexA, indexB) |
| Assert.assertEquals(10, indexA.intValue()) |
| Assert.assertEquals(10, indexB.intValue()) |
| Assert.assertEquals(5, indexC.intValue()) |
| |
| Long total = service.count(query, "bulk_*", TYPE, null) |
| Assert.assertNotNull(total) |
| Assert.assertEquals(25, total.intValue()) |
| } |
| |
| @Test |
| void testBulkRequestParameters() throws Exception { |
| List<IndexOperationRequest> payload = new ArrayList<>() |
| for (int x = 0; x < 20; x++) { |
| String index = x % 2 == 0 ? "bulk_a": "bulk_b" |
| payload.add(new IndexOperationRequest(index, TYPE, String.valueOf(x), new HashMap<String, Object>(){{ |
| put("msg", "test") |
| }}, IndexOperationRequest.Operation.Index)) |
| } |
| for (int x = 0; x < 5; x++) { |
| payload.add(new IndexOperationRequest("bulk_c", TYPE, String.valueOf(x), new HashMap<String, Object>(){{ |
| put("msg", "test") |
| }}, IndexOperationRequest.Operation.Index)) |
| } |
| IndexOperationResponse response = service.bulk(payload, [refresh: "true"]) |
| Assert.assertNotNull(response) |
| Assert.assertTrue(response.getTook() > 0) |
| |
| /* |
| * Now, check to ensure that both indexes got populated and refreshed appropriately. |
| */ |
| String query = "{ \"query\": { \"match_all\": {}}}" |
| Long indexA = service.count(query, "bulk_a", TYPE, null) |
| Long indexB = service.count(query, "bulk_b", TYPE, null) |
| Long indexC = service.count(query, "bulk_c", TYPE, null) |
| |
| Assert.assertNotNull(indexA) |
| Assert.assertNotNull(indexB) |
| Assert.assertNotNull(indexC) |
| Assert.assertEquals(indexA, indexB) |
| Assert.assertEquals(10, indexA.intValue()) |
| Assert.assertEquals(10, indexB.intValue()) |
| Assert.assertEquals(5, indexC.intValue()) |
| |
| Long total = service.count(query, "bulk_*", TYPE, null) |
| Assert.assertNotNull(total) |
| Assert.assertEquals(25, total.intValue()) |
| } |
| |
| @Test |
| void testUpdateAndUpsert() { |
| final String TEST_ID = "update-test" |
| Map<String, Object> doc = new HashMap<>() |
| doc.put("msg", "Buongiorno, mondo") |
| service.add(new IndexOperationRequest(INDEX, TYPE, TEST_ID, doc, IndexOperationRequest.Operation.Index), [refresh: "true"]) |
| Map<String, Object> result = service.get(INDEX, TYPE, TEST_ID, null) |
| Assert.assertEquals("Not the same", doc, result) |
| |
| Map<String, Object> updates = new HashMap<>() |
| updates.put("from", "john.smith") |
| Map<String, Object> merged = new HashMap<>() |
| merged.putAll(updates) |
| merged.putAll(doc) |
| IndexOperationRequest request = new IndexOperationRequest(INDEX, TYPE, TEST_ID, updates, IndexOperationRequest.Operation.Update) |
| service.add(request, [refresh: "true"]) |
| result = service.get(INDEX, TYPE, TEST_ID, null) |
| Assert.assertTrue(result.containsKey("from")) |
| Assert.assertTrue(result.containsKey("msg")) |
| Assert.assertEquals("Not the same after update.", merged, result) |
| |
| final String UPSERTED_ID = "upsert-ftw" |
| Map<String, Object> upsertItems = new HashMap<>() |
| upsertItems.put("upsert_1", "hello") |
| upsertItems.put("upsert_2", 1) |
| upsertItems.put("upsert_3", true) |
| request = new IndexOperationRequest(INDEX, TYPE, UPSERTED_ID, upsertItems, IndexOperationRequest.Operation.Upsert) |
| service.add(request, [refresh: "true"]) |
| result = service.get(INDEX, TYPE, UPSERTED_ID, null) |
| Assert.assertEquals(upsertItems, result) |
| |
| List<IndexOperationRequest> deletes = new ArrayList<>() |
| deletes.add(new IndexOperationRequest(INDEX, TYPE, TEST_ID, null, IndexOperationRequest.Operation.Delete)) |
| deletes.add(new IndexOperationRequest(INDEX, TYPE, UPSERTED_ID, null, IndexOperationRequest.Operation.Delete)) |
| Assert.assertFalse(service.bulk(deletes, [refresh: "true"]).hasErrors()) |
| waitForIndexRefresh() // wait 1s for index refresh (doesn't prevent GET but affects later tests using _search or _bulk) |
| Assert.assertNull(service.get(INDEX, TYPE, TEST_ID, null)) |
| Assert.assertNull(service.get(INDEX, TYPE, UPSERTED_ID, null)) |
| } |
| |
| @Test |
| void testGetBulkResponsesWithErrors() { |
| def ops = [ |
| new IndexOperationRequest(INDEX, TYPE, "1", [ "msg": "one", intField: 1], IndexOperationRequest.Operation.Index), // OK |
| new IndexOperationRequest(INDEX, TYPE, "2", [ "msg": "two", intField: 1], IndexOperationRequest.Operation.Create), // already exists |
| new IndexOperationRequest(INDEX, TYPE, "1", [ "msg": "one", intField: "notaninteger"], IndexOperationRequest.Operation.Index) // can't parse int field |
| ] |
| def response = service.bulk(ops, [refresh: "true"]) |
| assert response.hasErrors() |
| assert response.items.findAll { |
| def key = it.keySet().stream().findFirst().get() |
| it[key].containsKey("error") |
| }.size() == 2 |
| } |
| |
| private static void waitForIndexRefresh() { |
| Thread.sleep(1000) |
| } |
| } |