blob: 34ed699cd39b0da1b4bb1da0ccdd28aea445e03c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.elasticsearch;
import static org.junit.Assert.assertNotNull;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.action.bulk.BulkAction;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexAction;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.support.AdapterActionFuture;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.transport.ReceiveTimeoutTransportException;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
public class TestPutElasticsearch5 {
/** Stream over the {@code DocumentExample.json} classpath resource; opened anew before every test. */
private InputStream docExample;
/** Runner assigned by the unit tests below and cleared again after each test. */
private TestRunner runner;

/**
 * Opens the sample document from the test classpath before each test.
 *
 * @throws IOException kept in the signature for compatibility; the current body does not throw it
 */
@Before
public void setUp() throws IOException {
    final ClassLoader classloader = Thread.currentThread().getContextClassLoader();
    docExample = classloader.getResourceAsStream("DocumentExample.json");
    // getResourceAsStream signals a missing resource by returning null instead of throwing;
    // fail here with a clear message rather than with an obscure error inside a test body.
    assertNotNull("DocumentExample.json was not found on the test classpath", docExample);
}
/**
 * Releases per-test state: drops the runner and closes the sample document stream that
 * {@code setUp()} opened, so the stream is not leaked from one test to the next.
 *
 * @throws IOException if closing the sample document stream fails
 */
@After
public void teardown() throws IOException {
    // Clear the runner first so it is reset even if closing the stream throws.
    runner = null;
    if (docExample != null) {
        docExample.close();
        docExample = null;
    }
}
/**
 * Happy path: with a mocked client that reports no bulk failures, a single document carrying a
 * {@code doc_id} attribute is expected on the success relationship with that attribute intact.
 * Along the way the test asserts that the runner only becomes valid once the ID attribute is set.
 */
@Test
public void testPutElasticSearchOnTrigger() throws IOException {
    final PutElasticsearch5TestProcessor processor = new PutElasticsearch5TestProcessor(false); // no failures
    runner = TestRunners.newTestRunner(processor);

    // Transport client settings
    runner.setProperty(AbstractElasticsearch5TransportClientProcessor.CLUSTER_NAME, "elasticsearch");
    runner.setProperty(AbstractElasticsearch5TransportClientProcessor.HOSTS, "127.0.0.1:9300");
    runner.setProperty(AbstractElasticsearch5TransportClientProcessor.PING_TIMEOUT, "5s");
    runner.setProperty(AbstractElasticsearch5TransportClientProcessor.SAMPLER_INTERVAL, "5s");

    // Processor settings; validity is checked after each step
    runner.setProperty(PutElasticsearch5.INDEX, "doc");
    runner.assertNotValid();
    runner.setProperty(PutElasticsearch5.TYPE, "status");
    runner.setProperty(PutElasticsearch5.BATCH_SIZE, "1");
    runner.assertNotValid();
    runner.setProperty(PutElasticsearch5.ID_ATTRIBUTE, "doc_id");
    runner.assertValid();

    final HashMap<String, String> attributes = new HashMap<String, String>();
    attributes.put("doc_id", "28039652140");
    runner.enqueue(docExample, attributes);
    runner.run(1, true, true);

    runner.assertAllFlowFilesTransferred(PutElasticsearch5.REL_SUCCESS, 1);
    final List<MockFlowFile> succeeded = runner.getFlowFilesForRelationship(PutElasticsearch5.REL_SUCCESS);
    final MockFlowFile indexed = succeeded.get(0);
    assertNotNull(indexed);
    indexed.assertAttributeEquals("doc_id", "28039652140");
}
/**
 * Same scenario as the basic on-trigger test, but the transport client properties are given as
 * expression-language references whose values are supplied through runner variables.
 */
@Test
public void testPutElasticSearchOnTriggerEL() throws IOException {
    final PutElasticsearch5TestProcessor processor = new PutElasticsearch5TestProcessor(false); // no failures
    runner = TestRunners.newTestRunner(processor);

    // Transport client settings expressed as variable references
    runner.setProperty(AbstractElasticsearch5TransportClientProcessor.CLUSTER_NAME, "${cluster.name}");
    runner.setProperty(AbstractElasticsearch5TransportClientProcessor.HOSTS, "${hosts}");
    runner.setProperty(AbstractElasticsearch5TransportClientProcessor.PING_TIMEOUT, "${ping.timeout}");
    runner.setProperty(AbstractElasticsearch5TransportClientProcessor.SAMPLER_INTERVAL, "${sampler.interval}");

    // Processor settings; validity is checked after each step
    runner.setProperty(PutElasticsearch5.INDEX, "doc");
    runner.assertNotValid();
    runner.setProperty(PutElasticsearch5.TYPE, "status");
    runner.setProperty(PutElasticsearch5.BATCH_SIZE, "1");
    runner.assertNotValid();
    runner.setProperty(PutElasticsearch5.ID_ATTRIBUTE, "doc_id");
    runner.assertValid();

    // Values behind the variable references above
    runner.setVariable("cluster.name", "elasticsearch");
    runner.setVariable("hosts", "127.0.0.1:9300");
    runner.setVariable("ping.timeout", "5s");
    runner.setVariable("sampler.interval", "5s");

    final HashMap<String, String> attributes = new HashMap<String, String>();
    attributes.put("doc_id", "28039652140");
    runner.enqueue(docExample, attributes);
    runner.run(1, true, true);

    runner.assertAllFlowFilesTransferred(PutElasticsearch5.REL_SUCCESS, 1);
    final List<MockFlowFile> succeeded = runner.getFlowFilesForRelationship(PutElasticsearch5.REL_SUCCESS);
    final MockFlowFile indexed = succeeded.get(0);
    assertNotNull(indexed);
    indexed.assertAttributeEquals("doc_id", "28039652140");
}
/**
 * The processor is configured to read the document id from {@code doc_id2}, but the flow file
 * only carries {@code doc_id}; the flow file is expected on the failure relationship.
 */
@Test
public void testPutElasticSearchOnTriggerBadDocIdentifier() throws IOException {
    final PutElasticsearch5TestProcessor processor = new PutElasticsearch5TestProcessor(false); // no failures
    runner = TestRunners.newTestRunner(processor);

    // Transport client settings
    runner.setProperty(AbstractElasticsearch5TransportClientProcessor.CLUSTER_NAME, "elasticsearch");
    runner.setProperty(AbstractElasticsearch5TransportClientProcessor.HOSTS, "127.0.0.1:9300");
    runner.setProperty(AbstractElasticsearch5TransportClientProcessor.PING_TIMEOUT, "5s");
    runner.setProperty(AbstractElasticsearch5TransportClientProcessor.SAMPLER_INTERVAL, "5s");

    // Processor settings; validity is checked after each step
    runner.setProperty(PutElasticsearch5.INDEX, "doc");
    runner.assertNotValid();
    runner.setProperty(PutElasticsearch5.TYPE, "status");
    runner.setProperty(PutElasticsearch5.BATCH_SIZE, "1");
    runner.assertNotValid();
    // Deliberately names an attribute the flow file below does not have
    runner.setProperty(PutElasticsearch5.ID_ATTRIBUTE, "doc_id2");
    runner.assertValid();

    final HashMap<String, String> attributes = new HashMap<String, String>();
    attributes.put("doc_id", "28039652140");
    runner.enqueue(docExample, attributes);
    runner.run(1, true, true);

    runner.assertAllFlowFilesTransferred(PutElasticsearch5.REL_FAILURE, 1);
}
/**
 * With a mocked bulk response that reports failures, the single enqueued document is expected
 * on the failure relationship with its {@code doc_id} attribute unchanged.
 */
@Test
public void testPutElasticSearchOnTriggerWithFailures() throws IOException {
    final PutElasticsearch5TestProcessor processor = new PutElasticsearch5TestProcessor(true); // simulate failures
    runner = TestRunners.newTestRunner(processor);

    // Transport client settings
    runner.setProperty(AbstractElasticsearch5TransportClientProcessor.CLUSTER_NAME, "elasticsearch");
    runner.setProperty(AbstractElasticsearch5TransportClientProcessor.HOSTS, "127.0.0.1:9300");
    runner.setProperty(AbstractElasticsearch5TransportClientProcessor.PING_TIMEOUT, "5s");
    runner.setProperty(AbstractElasticsearch5TransportClientProcessor.SAMPLER_INTERVAL, "5s");

    // Processor settings
    runner.setProperty(PutElasticsearch5.INDEX, "doc");
    runner.setProperty(PutElasticsearch5.TYPE, "status");
    runner.setProperty(PutElasticsearch5.BATCH_SIZE, "1");
    runner.setProperty(PutElasticsearch5.ID_ATTRIBUTE, "doc_id");

    final HashMap<String, String> attributes = new HashMap<String, String>();
    attributes.put("doc_id", "28039652140");
    runner.enqueue(docExample, attributes);
    runner.run(1, true, true);

    runner.assertAllFlowFilesTransferred(PutElasticsearch5.REL_FAILURE, 1);
    final List<MockFlowFile> failed = runner.getFlowFilesForRelationship(PutElasticsearch5.REL_FAILURE);
    final MockFlowFile rejected = failed.get(0);
    assertNotNull(rejected);
    rejected.assertAttributeEquals("doc_id", "28039652140");
}
/**
 * Checks how exceptions raised by the bulk request's {@code execute()} call are routed.
 * <p>
 * {@link NoNodeAvailableException}, {@link ElasticsearchTimeoutException},
 * {@link ReceiveTimeoutTransportException} and {@link NodeClosedException} are each expected to
 * send the flow file to the retry relationship; {@link ElasticsearchParseException} is expected
 * to send it to failure.
 * <p>
 * The assertions reference {@code PutElasticsearch5.REL_RETRY}, the relationship of the
 * processor under test, rather than the constant of the unrelated fetch processor.
 */
@Test
public void testPutElasticsearch5OnTriggerWithExceptions() throws IOException {
    PutElasticsearch5TestProcessor processor = new PutElasticsearch5TestProcessor(false);
    runner = TestRunners.newTestRunner(processor);
    runner.setProperty(AbstractElasticsearch5TransportClientProcessor.CLUSTER_NAME, "elasticsearch");
    runner.setProperty(AbstractElasticsearch5TransportClientProcessor.HOSTS, "127.0.0.1:9300");
    runner.setProperty(AbstractElasticsearch5TransportClientProcessor.PING_TIMEOUT, "5s");
    runner.setProperty(AbstractElasticsearch5TransportClientProcessor.SAMPLER_INTERVAL, "5s");
    runner.setProperty(PutElasticsearch5.INDEX, "doc");
    runner.setProperty(PutElasticsearch5.TYPE, "status");
    runner.setProperty(PutElasticsearch5.ID_ATTRIBUTE, "doc_id");

    // No Node Available exception
    processor.setExceptionToThrow(new NoNodeAvailableException("test"));
    enqueueDocumentWithId("28039652140");
    runner.run(1, true, true);
    runner.assertAllFlowFilesTransferred(PutElasticsearch5.REL_RETRY, 1);
    runner.clearTransferState();

    // Elasticsearch5 Timeout exception
    processor.setExceptionToThrow(new ElasticsearchTimeoutException("test"));
    enqueueDocumentWithId("28039652141");
    runner.run(1, true, true);
    runner.assertAllFlowFilesTransferred(PutElasticsearch5.REL_RETRY, 1);
    runner.clearTransferState();

    // Receive Timeout Transport exception
    processor.setExceptionToThrow(new ReceiveTimeoutTransportException(mock(StreamInput.class)));
    enqueueDocumentWithId("28039652142");
    runner.run(1, true, true);
    runner.assertAllFlowFilesTransferred(PutElasticsearch5.REL_RETRY, 1);
    runner.clearTransferState();

    // Node Closed exception
    processor.setExceptionToThrow(new NodeClosedException(mock(StreamInput.class)));
    enqueueDocumentWithId("28039652143");
    runner.run(1, true, true);
    runner.assertAllFlowFilesTransferred(PutElasticsearch5.REL_RETRY, 1);
    runner.clearTransferState();

    // Elasticsearch5 Parse exception
    processor.setExceptionToThrow(new ElasticsearchParseException("test"));
    enqueueDocumentWithId("28039652144");
    runner.run(1, true, true);
    // This test generates an exception on execute(), routes to failure
    runner.assertTransferCount(PutElasticsearch5.REL_FAILURE, 1);
}

/**
 * Enqueues the sample document on the current runner with the given {@code doc_id} attribute.
 * <p>
 * NOTE(review): {@code docExample} is one stream shared by every call within a test, so
 * presumably only the first enqueued flow file carries the document content. The exception
 * scenarios above raise their exception from {@code execute()}, so content should not matter
 * there; confirm before relying on this helper where it does.
 *
 * @param docId value to store in the flow file's {@code doc_id} attribute
 */
private void enqueueDocumentWithId(final String docId) {
    final HashMap<String, String> attributes = new HashMap<String, String>();
    attributes.put("doc_id", docId);
    runner.enqueue(docExample, attributes);
}
/**
 * Enqueues the sample document without any attributes while the processor expects a
 * {@code doc_id} attribute; the flow file is expected on the failure relationship.
 */
@Test
public void testPutElasticsearch5OnTriggerWithNoIdAttribute() throws IOException {
    final PutElasticsearch5TestProcessor processor = new PutElasticsearch5TestProcessor(true); // simulate failures
    runner = TestRunners.newTestRunner(processor);

    // Transport client settings
    runner.setProperty(AbstractElasticsearch5TransportClientProcessor.CLUSTER_NAME, "elasticsearch");
    runner.setProperty(AbstractElasticsearch5TransportClientProcessor.HOSTS, "127.0.0.1:9300");
    runner.setProperty(AbstractElasticsearch5TransportClientProcessor.PING_TIMEOUT, "5s");
    runner.setProperty(AbstractElasticsearch5TransportClientProcessor.SAMPLER_INTERVAL, "5s");

    // Processor settings
    runner.setProperty(PutElasticsearch5.INDEX, "doc");
    runner.setProperty(PutElasticsearch5.TYPE, "status");
    runner.setProperty(PutElasticsearch5.BATCH_SIZE, "1");
    runner.setProperty(PutElasticsearch5.ID_ATTRIBUTE, "doc_id");

    // No attribute map: the flow file has no doc_id
    runner.enqueue(docExample);
    runner.run(1, true, true);

    runner.assertAllFlowFilesTransferred(PutElasticsearch5.REL_FAILURE, 1);
    final List<MockFlowFile> failed = runner.getFlowFilesForRelationship(PutElasticsearch5.REL_FAILURE);
    assertNotNull(failed.get(0));
}
/**
 * Index and type are taken from flow file attributes via {@code ${i}} and {@code ${type}}.
 * <p>
 * First run: both attributes are present and the flow file is expected on success.
 * Second run: the {@code i} attribute is omitted; the mock client's {@code prepareIndex} answer
 * throws {@link NoNodeAvailableException} for an empty index name, and the flow file is expected
 * on the retry relationship.
 */
@Test
public void testPutElasticsearch5OnTriggerWithIndexFromAttribute() throws IOException {
    final PutElasticsearch5TestProcessor processor = new PutElasticsearch5TestProcessor(false);
    runner = TestRunners.newTestRunner(processor);

    // Transport client settings
    runner.setProperty(AbstractElasticsearch5TransportClientProcessor.CLUSTER_NAME, "elasticsearch");
    runner.setProperty(AbstractElasticsearch5TransportClientProcessor.HOSTS, "127.0.0.1:9300");
    runner.setProperty(AbstractElasticsearch5TransportClientProcessor.PING_TIMEOUT, "5s");
    runner.setProperty(AbstractElasticsearch5TransportClientProcessor.SAMPLER_INTERVAL, "5s");

    // Index and type come from flow file attributes
    runner.setProperty(PutElasticsearch5.INDEX, "${i}");
    runner.setProperty(PutElasticsearch5.TYPE, "${type}");
    runner.setProperty(PutElasticsearch5.BATCH_SIZE, "1");
    runner.setProperty(PutElasticsearch5.ID_ATTRIBUTE, "doc_id");

    final HashMap<String, String> completeAttributes = new HashMap<String, String>();
    completeAttributes.put("doc_id", "28039652144");
    completeAttributes.put("i", "doc");
    completeAttributes.put("type", "status");
    runner.enqueue(docExample, completeAttributes);
    runner.run(1, true, true);

    runner.assertAllFlowFilesTransferred(PutElasticsearch5.REL_SUCCESS, 1);
    final List<MockFlowFile> succeeded = runner.getFlowFilesForRelationship(PutElasticsearch5.REL_SUCCESS);
    assertNotNull(succeeded.get(0));
    runner.clearTransferState();

    // Now leave out the "i" attribute; the flow file is expected on retry, not on success
    final HashMap<String, String> attributesWithoutIndex = new HashMap<String, String>();
    attributesWithoutIndex.put("doc_id", "28039652144");
    attributesWithoutIndex.put("type", "status");
    runner.enqueue(docExample, attributesWithoutIndex);
    runner.run(1, true, true);

    runner.assertAllFlowFilesTransferred(PutElasticsearch5.REL_RETRY, 1);
    final List<MockFlowFile> retried = runner.getFlowFilesForRelationship(PutElasticsearch5.REL_RETRY);
    assertNotNull(retried.get(0));
}
/**
 * Sets the index operation to the unsupported value {@code index_fail}. The test asserts that
 * the runner is still valid with that value and that the flow file ends up on the failure
 * relationship when the processor runs.
 */
@Test
public void testPutElasticSearchOnTriggerWithInvalidIndexOp() throws IOException {
    final PutElasticsearch5TestProcessor processor = new PutElasticsearch5TestProcessor(false); // no failures
    runner = TestRunners.newTestRunner(processor);

    // Transport client settings
    runner.setProperty(AbstractElasticsearch5TransportClientProcessor.CLUSTER_NAME, "elasticsearch");
    runner.setProperty(AbstractElasticsearch5TransportClientProcessor.HOSTS, "127.0.0.1:9300");
    runner.setProperty(AbstractElasticsearch5TransportClientProcessor.PING_TIMEOUT, "5s");
    runner.setProperty(AbstractElasticsearch5TransportClientProcessor.SAMPLER_INTERVAL, "5s");

    // Processor settings; validity is checked after each step
    runner.setProperty(PutElasticsearch5.INDEX, "doc");
    runner.assertNotValid();
    runner.setProperty(PutElasticsearch5.TYPE, "status");
    runner.setProperty(PutElasticsearch5.BATCH_SIZE, "1");
    runner.assertNotValid();
    runner.setProperty(PutElasticsearch5.ID_ATTRIBUTE, "doc_id");
    runner.assertValid();

    // The bogus operation passes validation and is only rejected when the processor runs
    runner.setProperty(PutElasticsearch5.INDEX_OP, "index_fail");
    runner.assertValid();

    final HashMap<String, String> attributes = new HashMap<String, String>();
    attributes.put("doc_id", "28039652140");
    runner.enqueue(docExample, attributes);
    runner.run(1, true, true);

    runner.assertAllFlowFilesTransferred(PutElasticsearch5.REL_FAILURE, 1);
    final List<MockFlowFile> failed = runner.getFlowFilesForRelationship(PutElasticsearch5.REL_FAILURE);
    assertNotNull(failed.get(0));
}
/**
 * Test double for {@link PutElasticsearch5}: overrides the transport client factory so the
 * processor talks to Mockito mocks instead of a real Elasticsearch cluster.
 */
private static class PutElasticsearch5TestProcessor extends PutElasticsearch5 {

    /** Value the mocked bulk response returns from {@code hasFailures()}. */
    boolean responseHasFailures;

    /** When non-null, thrown by the spied bulk request builder's {@code execute()}. */
    Exception exceptionToThrow;

    /**
     * @param responseHasFailures whether the mocked bulk response should report failures
     */
    public PutElasticsearch5TestProcessor(boolean responseHasFailures) {
        this.responseHasFailures = responseHasFailures;
    }

    /**
     * @param exceptionToThrow exception the bulk request should raise on {@code execute()},
     *                         or {@code null} to return a mocked bulk response instead
     */
    public void setExceptionToThrow(Exception exceptionToThrow) {
        this.exceptionToThrow = exceptionToThrow;
    }

    /**
     * Builds a mocked {@link Client} in place of a real transport client.
     * <p>
     * {@code prepareBulk()} returns a spied {@link BulkRequestBuilder} whose {@code execute()}
     * either throws the configured exception or returns a {@link MockBulkRequestBuilderExecutor}.
     * {@code prepareIndex(..)} throws {@link NoNodeAvailableException} when the index or the
     * document type argument is empty and otherwise returns a fresh {@link IndexRequestBuilder}.
     */
    @Override
    protected Client getTransportClient(Settings.Builder settingsBuilder, String xPackPath,
                                        String username, String password,
                                        List<InetSocketAddress> esHosts, ComponentLog log)
            throws MalformedURLException {
        final Client client = mock(Client.class);

        final BulkRequestBuilder bulkBuilder = spy(new BulkRequestBuilder(client, BulkAction.INSTANCE));
        if (exceptionToThrow == null) {
            final MockBulkRequestBuilderExecutor executor =
                    new MockBulkRequestBuilderExecutor(responseHasFailures, esHosts.get(0));
            doReturn(executor).when(bulkBuilder).execute();
        } else {
            doThrow(exceptionToThrow).when(bulkBuilder).execute();
        }
        when(client.prepareBulk()).thenReturn(bulkBuilder);

        final Answer<IndexRequestBuilder> prepareIndexAnswer = new Answer<IndexRequestBuilder>() {
            @Override
            public IndexRequestBuilder answer(InvocationOnMock invocationOnMock) throws Throwable {
                final Object[] callArgs = invocationOnMock.getArguments();

                final String indexName = (String) callArgs[0];
                if (indexName.isEmpty()) {
                    throw new NoNodeAvailableException("Needs index");
                }

                final String docType = (String) callArgs[1];
                if (docType.isEmpty()) {
                    throw new NoNodeAvailableException("Needs doc type");
                }

                return new IndexRequestBuilder(client, IndexAction.INSTANCE);
            }
        };
        when(client.prepareIndex(anyString(), anyString(), anyString())).thenAnswer(prepareIndexAnswer);

        return client;
    }

    /**
     * Future-like stand-in for the result of {@code BulkRequestBuilder.execute()}; its
     * {@link #get()} produces a mocked {@link BulkResponse} on every call.
     */
    private static class MockBulkRequestBuilderExecutor
            extends AdapterActionFuture<BulkResponse, ActionListener<BulkResponse>>
            implements ListenableActionFuture<BulkResponse> {

        /** Value the mocked response returns from {@code hasFailures()}. */
        boolean responseHasFailures;

        /** Host whose string form is reported as the response's remote address. */
        InetSocketAddress address;

        public MockBulkRequestBuilderExecutor(boolean responseHasFailures, InetSocketAddress address) {
            this.responseHasFailures = responseHasFailures;
            this.address = address;
        }

        /** Not needed by these tests; always returns {@code null}. */
        @Override
        protected BulkResponse convert(ActionListener<BulkResponse> bulkResponseActionListener) {
            return null;
        }

        /** Intentionally a no-op: listeners are never notified by this mock. */
        @Override
        public void addListener(ActionListener<BulkResponse> actionListener) {
        }

        /**
         * Builds the mocked bulk response: a single item with id 1 that always reports itself
         * as failed, the configured {@code hasFailures()} value, and a remote address derived
         * from the configured host.
         */
        @Override
        public BulkResponse get() throws InterruptedException, ExecutionException {
            final BulkItemResponse onlyItem = mock(BulkItemResponse.class);
            when(onlyItem.getItemId()).thenReturn(1);
            when(onlyItem.isFailed()).thenReturn(true);

            final TransportAddress remote = mock(TransportAddress.class);
            when(remote.getAddress()).thenReturn(address.toString());

            final BulkResponse bulkResponse = mock(BulkResponse.class);
            when(bulkResponse.hasFailures()).thenReturn(responseHasFailures);
            when(bulkResponse.getItems()).thenReturn(new BulkItemResponse[]{onlyItem});
            when(bulkResponse.remoteAddress()).thenReturn(remote);
            return bulkResponse;
        }
    }
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Integration test section below
//
// The tests below are meant to run against real ES instances, and are thus marked @Ignore during normal
// test execution. However, if you wish to execute them as part of a test phase, comment out the @Ignore
// line for each desired test.
/////////////////////////////////////////////////////////////////////////////////////////////////////////////
/**
 * Integration test: basic ES functionality against a local or test ES cluster, using the real
 * {@link PutElasticsearch5} processor. Ignored unless the {@code @Ignore} line is commented out.
 */
@Test
@Ignore("Comment this out if you want to run against local or test ES")
public void testPutElasticSearchBasic() {
    // The anonymous class must be created in this method so getEnclosingMethod() resolves to it
    final String testName = new Object() {
    }.getClass().getEnclosingMethod().getName();
    System.out.println("Starting test " + testName);

    final TestRunner liveRunner = TestRunners.newTestRunner(new PutElasticsearch5());

    //Local Cluster - Mac pulled from brew
    liveRunner.setProperty(AbstractElasticsearch5TransportClientProcessor.CLUSTER_NAME, "elasticsearch_brew");
    liveRunner.setProperty(AbstractElasticsearch5TransportClientProcessor.HOSTS, "127.0.0.1:9300");
    liveRunner.setProperty(AbstractElasticsearch5TransportClientProcessor.PING_TIMEOUT, "5s");
    liveRunner.setProperty(AbstractElasticsearch5TransportClientProcessor.SAMPLER_INTERVAL, "5s");

    liveRunner.setProperty(PutElasticsearch5.INDEX, "doc");
    liveRunner.setProperty(PutElasticsearch5.BATCH_SIZE, "1");
    liveRunner.setProperty(PutElasticsearch5.TYPE, "status");
    liveRunner.setProperty(PutElasticsearch5.ID_ATTRIBUTE, "doc_id");
    liveRunner.assertValid();

    final HashMap<String, String> attributes = new HashMap<String, String>();
    attributes.put("doc_id", "28039652140");
    liveRunner.enqueue(docExample, attributes);
    // Second flow file is enqueued from the same stream and without attributes
    liveRunner.enqueue(docExample);
    liveRunner.run(1, true, true);

    liveRunner.assertAllFlowFilesTransferred(PutElasticsearch5.REL_SUCCESS, 1);
}
/**
 * Integration test: enqueues 100 copies of the sample document with consecutive {@code doc_id}
 * values starting at 28039652140 and expects all of them on the success relationship after one
 * run with a batch size of 100. Ignored unless the {@code @Ignore} line is commented out.
 */
@Test
@Ignore("Comment this out if you want to run against local or test ES")
public void testPutElasticSearchBatch() throws IOException {
    // The anonymous class must be created in this method so getEnclosingMethod() resolves to it
    final String testName = new Object() {
    }.getClass().getEnclosingMethod().getName();
    System.out.println("Starting test " + testName);

    final TestRunner liveRunner = TestRunners.newTestRunner(new PutElasticsearch5());

    //Local Cluster - Mac pulled from brew
    liveRunner.setProperty(AbstractElasticsearch5TransportClientProcessor.CLUSTER_NAME, "elasticsearch_brew");
    liveRunner.setProperty(AbstractElasticsearch5TransportClientProcessor.HOSTS, "127.0.0.1:9300");
    liveRunner.setProperty(AbstractElasticsearch5TransportClientProcessor.PING_TIMEOUT, "5s");
    liveRunner.setProperty(AbstractElasticsearch5TransportClientProcessor.SAMPLER_INTERVAL, "5s");

    liveRunner.setProperty(PutElasticsearch5.INDEX, "doc");
    liveRunner.setProperty(PutElasticsearch5.BATCH_SIZE, "100");
    liveRunner.setProperty(PutElasticsearch5.TYPE, "status");
    liveRunner.setProperty(PutElasticsearch5.ID_ATTRIBUTE, "doc_id");
    liveRunner.assertValid();

    // Read the document once; every flow file gets its own byte copy of the same text
    final String document = convertStreamToString(docExample);
    final long firstId = 28039652140L;
    for (int offset = 0; offset < 100; offset++) {
        final HashMap<String, String> attributes = new HashMap<String, String>();
        attributes.put("doc_id", Long.toString(firstId + offset));
        liveRunner.enqueue(document.getBytes(), attributes);
    }
    liveRunner.run();

    liveRunner.assertAllFlowFilesTransferred(PutElasticsearch5.REL_SUCCESS, 100);
}
/**
 * Reads an input stream to its end and returns the content as one string.
 * <p>
 * No charset is passed to the {@link java.util.Scanner}, so the bytes are decoded with the
 * platform default charset. The stream is not closed by this method.
 *
 * @param is the input stream to read
 * @return the whole content of the stream, or an empty string if the stream yields no content
 */
static String convertStreamToString(java.io.InputStream is) {
    final java.util.Scanner scanner = new java.util.Scanner(is);
    // "\\A" matches only at the beginning of the input, so the entire stream is a single token
    scanner.useDelimiter("\\A");
    if (!scanner.hasNext()) {
        return "";
    }
    return scanner.next();
}
}