blob: 33be374750d56aee0d83a82d846f3e4b41a23800 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.elasticsearch
import org.apache.nifi.util.MockFlowFile
import org.apache.nifi.util.TestRunner
import org.junit.Assert
import org.junit.Test
/**
 * Shared test cases for processors built on {@code AbstractByQueryElasticsearch}.
 *
 * Concrete subclasses supply the processor under test (via {@link #setupRunner()}),
 * the names of the attributes the processor writes, and a way to make the test
 * client service fail (via {@link #expectError}).
 */
abstract class AbstractByQueryElasticsearchTest {
    private static final String INDEX = "test_idx"
    private static final String TYPE = "test_type"
    private static final String CLIENT_NAME = "clientService"
    /** Query body reused by every test that does not need Expression Language in the query. */
    private static final String MATCH_ALL_QUERY = "{ \"query\": { \"match_all\": {} }}"

    /** Client service registered by {@link #initClient}; recreated for every runner. */
    private TestElasticsearchClientService client

    /** @return name of the flowfile attribute expected to hold the executed query */
    abstract String queryAttr()

    /** @return name of the flowfile attribute checked against "100" by {@link #postTest} */
    abstract String tookAttr()

    /** @return name of the flowfile attribute expected to be set on flowfiles routed to failure */
    abstract String errorAttr()

    /** @return a fresh runner wrapping the processor under test */
    abstract TestRunner setupRunner()

    /**
     * Puts the supplied client service into a state where the processor run is
     * expected to route the flowfile to failure (see {@link #testErrorAttribute}).
     *
     * @param client the client service created by {@link #initClient}
     */
    abstract void expectError(final TestElasticsearchClientService client)

    /**
     * Creates the test client service, registers and enables it on the runner and
     * points the processor's client-service property at it.
     *
     * @param runner runner to attach the client service to
     */
    private void initClient(TestRunner runner) throws Exception {
        // NOTE(review): the meaning of the boolean constructor argument is not visible here - confirm.
        client = new TestElasticsearchClientService(true)
        runner.addControllerService(CLIENT_NAME, client)
        runner.enableControllerService(client)
        runner.setProperty(AbstractByQueryElasticsearch.CLIENT_SERVICE, CLIENT_NAME)
    }

    /**
     * Builds a runner with the properties every test sets: optionally the query,
     * then index, type and the query output attribute (same order the tests used
     * when they configured the runner inline).
     *
     * @param query value for the QUERY property, or {@code null} to leave it unset
     * @return the configured runner; the client service is NOT yet attached
     */
    private TestRunner newConfiguredRunner(String query = null) {
        final TestRunner runner = setupRunner()
        if (query != null) {
            runner.setProperty(AbstractByQueryElasticsearch.QUERY, query)
        }
        runner.setProperty(AbstractByQueryElasticsearch.INDEX, INDEX)
        runner.setProperty(AbstractByQueryElasticsearch.TYPE, TYPE)
        runner.setProperty(AbstractByQueryElasticsearch.QUERY_ATTRIBUTE, queryAttr())
        return runner
    }

    /**
     * Verifies the outcome of a successful run: nothing on failure, exactly one
     * flowfile on success, carrying the "took" and query attributes.
     *
     * @param runner     runner that has already been run
     * @param queryParam query text expected in the query attribute
     */
    private void postTest(TestRunner runner, String queryParam) {
        runner.assertTransferCount(AbstractByQueryElasticsearch.REL_FAILURE, 0)
        runner.assertTransferCount(AbstractByQueryElasticsearch.REL_SUCCESS, 1)

        final MockFlowFile result = runner.getFlowFilesForRelationship(AbstractByQueryElasticsearch.REL_SUCCESS).get(0)
        final String took = result.getAttribute(tookAttr())
        final String executedQuery = result.getAttribute(queryAttr())

        Assert.assertNotNull(took)
        // JUnit's contract is (expected, actual); keeping that order makes failure messages accurate.
        // NOTE(review): "100" is presumably the value reported by the test client service - confirm.
        Assert.assertEquals("100", took)
        Assert.assertNotNull(executedQuery)
        Assert.assertEquals(queryParam, executedQuery)
    }

    /** Asserts that exactly the "refresh" and "slices" request parameters reached the client service. */
    private void assertRefreshAndSlicesParameters() {
        Assert.assertEquals(2, client.getRequestParameters().size())
        Assert.assertEquals("true", client.getRequestParameters().get("refresh"))
        Assert.assertEquals("auto", client.getRequestParameters().get("slices"))
    }

    /** QUERY property unset; the query is supplied as flowfile content and no request parameters are sent. */
    @Test
    void testWithFlowfileInput() throws Exception {
        final TestRunner runner = newConfiguredRunner()
        initClient(runner)
        runner.assertValid()
        runner.enqueue(MATCH_ALL_QUERY)
        runner.run()

        postTest(runner, MATCH_ALL_QUERY)
        Assert.assertTrue(client.getRequestParameters().isEmpty())
    }

    /**
     * Query supplied as flowfile content, plus two dynamic properties; "slices" is
     * an expression resolved from a flowfile attribute.
     */
    @Test
    void testWithFlowfileInputAndRequestParameters() throws Exception {
        final TestRunner runner = newConfiguredRunner()
        runner.setProperty("refresh", "true")
        runner.setProperty("slices", '${slices}')
        initClient(runner)
        runner.assertValid()
        runner.enqueue(MATCH_ALL_QUERY, [slices: "auto"])
        runner.run()

        postTest(runner, MATCH_ALL_QUERY)
        assertRefreshAndSlicesParameters()
    }

    /**
     * Query supplied through the QUERY property: first with an expression that is
     * resolved from a flowfile attribute, then as a literal with the incoming
     * connection disabled.
     */
    @Test
    void testWithQuery() throws Exception {
        // The escaped \$ keeps "${field.name}" as literal text in the property value.
        final String query = "{\n" +
                "\t\"query\": {\n" +
                "\t\t\"match\": {\n" +
                "\t\t\t\"\${field.name}.keyword\": \"test\"\n" +
                "\t\t}\n" +
                "\t}\n" +
                "}"
        final Map<String, String> attrs = ["field.name": "test_field"]

        final TestRunner runner = newConfiguredRunner(query)
        initClient(runner)
        runner.assertValid()
        runner.enqueue("", attrs)
        runner.run()
        postTest(runner, query.replace('${field.name}', "test_field"))

        // Second pass on the same runner: literal query, no upstream connection.
        runner.clearTransferState()
        final String query2 = "{\n" +
                "\t\"query\": {\n" +
                "\t\t\"match\": {\n" +
                "\t\t\t\"test_field.keyword\": \"test\"\n" +
                "\t\t}\n" +
                "\t}\n" +
                "}"
        runner.setProperty(AbstractByQueryElasticsearch.QUERY, query2)
        runner.setIncomingConnection(false)
        runner.assertValid()
        runner.run()
        postTest(runner, query2)
    }

    /** After {@link #expectError}, the flowfile goes to failure and carries the error attribute. */
    @Test
    void testErrorAttribute() throws Exception {
        final TestRunner runner = newConfiguredRunner(MATCH_ALL_QUERY)
        initClient(runner)
        expectError(client)
        runner.assertValid()
        runner.enqueue("")
        runner.run()

        runner.assertTransferCount(AbstractByQueryElasticsearch.REL_SUCCESS, 0)
        runner.assertTransferCount(AbstractByQueryElasticsearch.REL_FAILURE, 1)
        final MockFlowFile failed = runner.getFlowFilesForRelationship(AbstractByQueryElasticsearch.REL_FAILURE).get(0)
        Assert.assertNotNull(failed.getAttribute(errorAttr()))
    }

    /**
     * Runs without enqueuing a flowfile and without disabling the incoming
     * connection; nothing may be transferred to either relationship.
     */
    @Test
    void testInputHandling() {
        final TestRunner runner = newConfiguredRunner(MATCH_ALL_QUERY)
        initClient(runner)
        runner.assertValid()
        runner.run()

        runner.assertTransferCount(AbstractByQueryElasticsearch.REL_SUCCESS, 0)
        runner.assertTransferCount(AbstractByQueryElasticsearch.REL_FAILURE, 0)
    }

    /** With the incoming connection disabled, the QUERY property alone drives a successful run. */
    @Test
    void testNoInputHandling() {
        final TestRunner runner = newConfiguredRunner(MATCH_ALL_QUERY)
        initClient(runner)
        runner.setIncomingConnection(false)
        runner.assertValid()
        runner.run()

        postTest(runner, MATCH_ALL_QUERY)
        Assert.assertTrue(client.getRequestParameters().isEmpty())
    }

    /**
     * Incoming connection disabled, plus two dynamic properties; "slices" is an
     * expression resolved from a runner variable because there is no flowfile.
     */
    @Test
    void testNoInputHandlingWithRequestParameters() {
        final TestRunner runner = newConfiguredRunner(MATCH_ALL_QUERY)
        runner.setProperty("refresh", "true")
        runner.setProperty("slices", '${slices}')
        runner.setVariable("slices", "auto")
        initClient(runner)
        runner.setIncomingConnection(false)
        runner.assertValid()
        runner.run()

        postTest(runner, MATCH_ALL_QUERY)
        assertRefreshAndSlicesParameters()
    }
}