blob: 659de9a71cb61e82027c3db4ff301d9e0e6da7c5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.elasticsearch;
import com.fasterxml.jackson.databind.ObjectMapper;
import okhttp3.Call;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Protocol;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.ResponseBody;
import okio.Buffer;
import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.MockRecordParser;
import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.After;
import org.junit.Ignore;
import org.junit.Test;
import java.io.IOException;
import java.math.BigDecimal;
import java.net.ConnectException;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestPutElasticsearchHttpRecord {
// Calendar and clock components from which the shared test values below are derived.
private static final int DATE_YEAR = 2018;
private static final int DATE_MONTH = 12;
private static final int DATE_DAY = 20;
private static final int TIME_HOUR = 12;
private static final int TIME_MINUTE = 55;
// Year-month-day text passed to java.sql.Date.valueOf when the test records are generated.
private static final String ISO_DATE = String.format("%d-%d-%d", DATE_YEAR, DATE_MONTH, DATE_DAY);
// Day/month/year text the record checks compare against the "date" field of each document.
private static final String EXPECTED_DATE = String.format("%d/%d/%d", DATE_DAY, DATE_MONTH, DATE_YEAR);
// Source values for the "ts" and "time" fields; also formatted by the tests to build expected strings.
private static final LocalDateTime LOCAL_DATE_TIME = LocalDateTime.of(DATE_YEAR, DATE_MONTH, DATE_DAY, TIME_HOUR, TIME_MINUTE);
private static final LocalTime LOCAL_TIME = LocalTime.of(TIME_HOUR, TIME_MINUTE);
// Runner assigned by individual tests and cleared again in teardown().
private TestRunner runner;
/**
 * Clears the runner reference after every test so that no runner is carried over to the next one.
 */
@After
public void teardown() {
    this.runner = null;
}
/**
 * Indexes four generated records with explicit date, time and timestamp formats and a document
 * type, checks each document found in the outgoing request body, and then verifies routing,
 * FlowFile attributes and the single SEND provenance event.
 */
@Test
public void testPutElasticSearchOnTriggerIndex() throws IOException {
    // The expected time and timestamp text is identical for every record, so format it once.
    final String expectedTime = LOCAL_TIME.format(DateTimeFormatter.ofPattern("h:m a"));
    final String expectedTimestamp = LOCAL_DATE_TIME.format(DateTimeFormatter.ofPattern("d/M/yyyy h:m a"));

    final PutElasticsearchHttpRecordTestProcessor testProcessor = new PutElasticsearchHttpRecordTestProcessor(false); // no failures
    testProcessor.setRecordChecks(doc -> {
        assertEquals(1, doc.get("id"));
        assertEquals("reç1", doc.get("name"));
        assertEquals(101, doc.get("code"));
        assertEquals(EXPECTED_DATE, doc.get("date"));
        assertEquals(expectedTime, doc.get("time"));
        assertEquals(expectedTimestamp, doc.get("ts"));
    }, doc -> {
        assertEquals(2, doc.get("id"));
        assertEquals("reç2", doc.get("name"));
        assertEquals(102, doc.get("code"));
        assertEquals(EXPECTED_DATE, doc.get("date"));
        assertEquals(expectedTime, doc.get("time"));
        assertEquals(expectedTimestamp, doc.get("ts"));
    }, doc -> {
        assertEquals(3, doc.get("id"));
        assertEquals("reç3", doc.get("name"));
        assertEquals(103, doc.get("code"));
        assertEquals(EXPECTED_DATE, doc.get("date"));
        assertEquals(expectedTime, doc.get("time"));
        assertEquals(expectedTimestamp, doc.get("ts"));
    }, doc -> {
        assertEquals(4, doc.get("id"));
        assertEquals("reç4", doc.get("name"));
        assertEquals(104, doc.get("code"));
        assertEquals(EXPECTED_DATE, doc.get("date"));
        assertEquals(expectedTime, doc.get("time"));
        assertEquals(expectedTimestamp, doc.get("ts"));
    });

    runner = TestRunners.newTestRunner(testProcessor);
    generateTestData();
    runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
    runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
    runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
    runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
    runner.setProperty(PutElasticsearchHttpRecord.DATE_FORMAT, "d/M/yyyy");
    runner.setProperty(PutElasticsearchHttpRecord.TIME_FORMAT, "h:m a");
    runner.setProperty(PutElasticsearchHttpRecord.TIMESTAMP_FORMAT, "d/M/yyyy h:m a");

    final Map<String, String> attributes = Collections.singletonMap("doc_id", "28039652140");
    runner.enqueue(new byte[0], attributes);
    runner.run(1, true, true);

    // The incoming FlowFile must be routed to success with its attribute intact and a record count.
    runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
    final MockFlowFile successFlowFile = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0);
    assertNotNull(successFlowFile);
    successFlowFile.assertAttributeEquals("doc_id", "28039652140");
    successFlowFile.assertAttributeEquals("record.count", "4");

    // Exactly one provenance event, of type SEND, is expected.
    final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
    assertNotNull(provenanceEvents);
    assertEquals(1, provenanceEvents.size());
    assertEquals(ProvenanceEventType.SEND, provenanceEvents.get(0).getEventType());
}
/**
 * Runs the processor with the index operation set to "create" and no ID record path, and
 * expects the FlowFile on success with a record count of 4 and one SEND provenance event.
 */
@Test
public void testPutElasticSearchOnTriggerCreate() throws IOException {
    final PutElasticsearchHttpRecordTestProcessor testProcessor = new PutElasticsearchHttpRecordTestProcessor(false); // no failures
    runner = TestRunners.newTestRunner(testProcessor);
    generateTestData();
    runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
    runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
    runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
    runner.setProperty(PutElasticsearchHttpRecord.INDEX_OP, "create");

    final Map<String, String> attributes = Collections.singletonMap("doc_id", "28039652140");
    runner.enqueue(new byte[0], attributes);
    runner.run(1, true, true);

    runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
    final MockFlowFile successFlowFile = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0);
    assertNotNull(successFlowFile);
    successFlowFile.assertAttributeEquals("doc_id", "28039652140");
    successFlowFile.assertAttributeEquals("record.count", "4");

    final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
    assertNotNull(provenanceEvents);
    assertEquals(1, provenanceEvents.size());
    assertEquals(ProvenanceEventType.SEND, provenanceEvents.get(0).getEventType());
}
/**
 * Same scenario as the plain index test, but with the document type property removed: four
 * generated records are sent with explicit date, time and timestamp formats, each document in
 * the outgoing request body is checked, and routing, attributes and provenance are verified.
 */
@Test
public void testPutElasticSearchOnTriggerIndex_withoutType() throws IOException {
    PutElasticsearchHttpRecordTestProcessor processor = new PutElasticsearchHttpRecordTestProcessor(false);
    DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern("h:m a");
    DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("d/M/yyyy h:m a");
    processor.setRecordChecks(record -> {
        assertEquals(1, record.get("id"));
        assertEquals("reç1", record.get("name"));
        assertEquals(101, record.get("code"));
        // Use the shared constant (previously a hard-coded "20/12/2018") so all checks stay in
        // sync with the DATE_* values the test data is generated from.
        assertEquals(EXPECTED_DATE, record.get("date"));
        assertEquals(LOCAL_TIME.format(timeFormatter), record.get("time"));
        assertEquals(LOCAL_DATE_TIME.format(dateTimeFormatter), record.get("ts"));
    }, record -> {
        assertEquals(2, record.get("id"));
        assertEquals("reç2", record.get("name"));
        assertEquals(102, record.get("code"));
        assertEquals(EXPECTED_DATE, record.get("date"));
        assertEquals(LOCAL_TIME.format(timeFormatter), record.get("time"));
        assertEquals(LOCAL_DATE_TIME.format(dateTimeFormatter), record.get("ts"));
    }, record -> {
        assertEquals(3, record.get("id"));
        assertEquals("reç3", record.get("name"));
        assertEquals(103, record.get("code"));
        assertEquals(EXPECTED_DATE, record.get("date"));
        assertEquals(LOCAL_TIME.format(timeFormatter), record.get("time"));
        assertEquals(LOCAL_DATE_TIME.format(dateTimeFormatter), record.get("ts"));
    }, record -> {
        assertEquals(4, record.get("id"));
        assertEquals("reç4", record.get("name"));
        assertEquals(104, record.get("code"));
        assertEquals(EXPECTED_DATE, record.get("date"));
        assertEquals(LOCAL_TIME.format(timeFormatter), record.get("time"));
        assertEquals(LOCAL_DATE_TIME.format(dateTimeFormatter), record.get("ts"));
    });
    runner = TestRunners.newTestRunner(processor); // no failures
    generateTestData();
    runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
    runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
    // The point of this test: no document type is configured.
    runner.removeProperty(PutElasticsearchHttpRecord.TYPE);
    runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
    runner.setProperty(PutElasticsearchHttpRecord.DATE_FORMAT, "d/M/yyyy");
    runner.setProperty(PutElasticsearchHttpRecord.TIME_FORMAT, "h:m a");
    runner.setProperty(PutElasticsearchHttpRecord.TIMESTAMP_FORMAT, "d/M/yyyy h:m a");
    runner.enqueue(new byte[0], Collections.singletonMap("doc_id", "28039652140"));
    runner.run(1, true, true);
    runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
    final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0);
    assertNotNull(out);
    out.assertAttributeEquals("doc_id", "28039652140");
    out.assertAttributeEquals("record.count", "4");
    List<ProvenanceEventRecord> provEvents = runner.getProvenanceEvents();
    assertNotNull(provEvents);
    assertEquals(1, provEvents.size());
    assertEquals(ProvenanceEventType.SEND, provEvents.get(0).getEventType());
}
/**
 * Runs the processor with the index operation "Update", an ID record path and a document type,
 * and expects the FlowFile on success with its "doc_id" attribute preserved.
 */
@Test
public void testPutElasticSearchOnTriggerUpdate() throws IOException {
    final PutElasticsearchHttpRecordTestProcessor testProcessor = new PutElasticsearchHttpRecordTestProcessor(false); // no failures
    runner = TestRunners.newTestRunner(testProcessor);
    generateTestData();
    runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
    runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
    runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
    runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
    runner.setProperty(PutElasticsearchHttpRecord.INDEX_OP, "Update");

    final Map<String, String> attributes = Collections.singletonMap("doc_id", "28039652140");
    runner.enqueue(new byte[0], attributes);
    runner.run(1, true, true);

    runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
    final MockFlowFile successFlowFile = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0);
    assertNotNull(successFlowFile);
    successFlowFile.assertAttributeEquals("doc_id", "28039652140");
}
/**
 * Runs the "Update" index operation with the document type property removed and expects the
 * FlowFile on success with its "doc_id" attribute preserved.
 */
@Test
public void testPutElasticSearchOnTriggerUpdate_withoutType() throws IOException {
    final PutElasticsearchHttpRecordTestProcessor testProcessor = new PutElasticsearchHttpRecordTestProcessor(false); // no failures
    runner = TestRunners.newTestRunner(testProcessor);
    generateTestData();
    runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
    runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
    // No document type for this variant.
    runner.removeProperty(PutElasticsearchHttpRecord.TYPE);
    runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
    runner.setProperty(PutElasticsearchHttpRecord.INDEX_OP, "Update");

    final Map<String, String> attributes = Collections.singletonMap("doc_id", "28039652140");
    runner.enqueue(new byte[0], attributes);
    runner.run(1, true, true);

    runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
    final MockFlowFile successFlowFile = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0);
    assertNotNull(successFlowFile);
    successFlowFile.assertAttributeEquals("doc_id", "28039652140");
}
/**
 * Runs the processor with the index operation "DELETE", an ID record path and a document type,
 * and expects the FlowFile on success with its "doc_id" attribute preserved.
 */
@Test
public void testPutElasticSearchOnTriggerDelete() throws IOException {
    final PutElasticsearchHttpRecordTestProcessor testProcessor = new PutElasticsearchHttpRecordTestProcessor(false); // no failures
    runner = TestRunners.newTestRunner(testProcessor);
    generateTestData();
    runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
    runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
    runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
    runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
    runner.setProperty(PutElasticsearchHttpRecord.INDEX_OP, "DELETE");

    final Map<String, String> attributes = Collections.singletonMap("doc_id", "28039652140");
    runner.enqueue(new byte[0], attributes);
    runner.run(1, true, true);

    runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
    final MockFlowFile successFlowFile = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0);
    assertNotNull(successFlowFile);
    successFlowFile.assertAttributeEquals("doc_id", "28039652140");
}
/**
 * Runs the "DELETE" index operation with the document type property removed and expects the
 * FlowFile on success with its "doc_id" attribute preserved.
 */
@Test
public void testPutElasticSearchOnTriggerDelete_withoutType() throws IOException {
    final PutElasticsearchHttpRecordTestProcessor testProcessor = new PutElasticsearchHttpRecordTestProcessor(false); // no failures
    runner = TestRunners.newTestRunner(testProcessor);
    generateTestData();
    runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
    runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
    // No document type for this variant.
    runner.removeProperty(PutElasticsearchHttpRecord.TYPE);
    runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
    runner.setProperty(PutElasticsearchHttpRecord.INDEX_OP, "DELETE");

    final Map<String, String> attributes = Collections.singletonMap("doc_id", "28039652140");
    runner.enqueue(new byte[0], attributes);
    runner.run(1, true, true);

    runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
    final MockFlowFile successFlowFile = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0);
    assertNotNull(successFlowFile);
    successFlowFile.assertAttributeEquals("doc_id", "28039652140");
}
/**
 * Configures the Elasticsearch URL and the connect timeout through expression language
 * references, supplies the referenced values as runner variables, and expects the FlowFile on
 * success with its "doc_id" attribute preserved.
 */
@Test
public void testPutElasticSearchOnTriggerEL() throws IOException {
    final PutElasticsearchHttpRecordTestProcessor testProcessor = new PutElasticsearchHttpRecordTestProcessor(false); // no failures
    runner = TestRunners.newTestRunner(testProcessor);
    generateTestData();
    runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "${es.url}");
    runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
    runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
    runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
    runner.setProperty(AbstractElasticsearchHttpProcessor.CONNECT_TIMEOUT, "${connect.timeout}");
    // Validity is asserted before the variables backing the expressions are defined.
    runner.assertValid();
    runner.setVariable("es.url", "http://127.0.0.1:9200");
    runner.setVariable("connect.timeout", "5s");

    final Map<String, String> attributes = Collections.singletonMap("doc_id", "28039652140");
    runner.enqueue(new byte[0], attributes);
    runner.run(1, true, true);

    runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
    final MockFlowFile successFlowFile = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0);
    assertNotNull(successFlowFile);
    successFlowFile.assertAttributeEquals("doc_id", "28039652140");
}
/**
 * Sets the index operation to an expression that references an attribute the FlowFile does not
 * carry, and expects the FlowFile on failure with its "doc_id" attribute preserved.
 */
@Test
public void testPutElasticSearchOnTriggerBadIndexOp() throws IOException {
    final PutElasticsearchHttpRecordTestProcessor testProcessor = new PutElasticsearchHttpRecordTestProcessor(false); // no failures
    runner = TestRunners.newTestRunner(testProcessor);
    generateTestData();
    runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
    runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
    runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
    runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
    // "no.attr" is not among the attributes enqueued below.
    runner.setProperty(PutElasticsearchHttpRecord.INDEX_OP, "${no.attr}");

    final Map<String, String> attributes = Collections.singletonMap("doc_id", "28039652140");
    runner.enqueue(new byte[0], attributes);
    runner.run(1, true, true);

    runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_FAILURE, 1);
    final MockFlowFile failedFlowFile = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_FAILURE).get(0);
    assertNotNull(failedFlowFile);
    failedFlowFile.assertAttributeEquals("doc_id", "28039652140");
}
/**
 * Steps through a series of property settings and asserts after each step whether the
 * processor configuration is considered valid. No FlowFile is processed.
 */
@Test
public void testPutElasticSearchInvalidConfig() throws IOException {
    final PutElasticsearchHttpRecordTestProcessor testProcessor = new PutElasticsearchHttpRecordTestProcessor(false); // no failures
    runner = TestRunners.newTestRunner(testProcessor);
    generateTestData();

    // URL alone is expected to be insufficient.
    runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
    runner.assertNotValid();

    // Adding index and type is expected to make the configuration valid.
    runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
    runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
    runner.assertValid();

    // Index operation values, each followed by the expected validity (no ID record path is set).
    runner.setProperty(PutElasticsearchHttpRecord.INDEX_OP, "");
    runner.assertNotValid();

    runner.setProperty(PutElasticsearchHttpRecord.INDEX_OP, "index");
    runner.assertValid();

    runner.setProperty(PutElasticsearchHttpRecord.INDEX_OP, "create");
    runner.assertValid();

    runner.setProperty(PutElasticsearchHttpRecord.INDEX_OP, "upsert");
    runner.assertNotValid();
}
/**
 * Has the mock client answer first with HTTP status 100 and then with status 500, and expects
 * the FlowFile on the failure relationship in the first run and on retry in the second.
 */
@Test
public void testPutElasticSearchOnTriggerWithFailures() throws IOException {
    final PutElasticsearchHttpRecordTestProcessor failingProcessor = new PutElasticsearchHttpRecordTestProcessor(true); // simulate failures
    failingProcessor.setStatus(100, "Should fail");
    runner = TestRunners.newTestRunner(failingProcessor);
    generateTestData();
    runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
    runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
    runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
    runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");

    final Map<String, String> attributes = Collections.singletonMap("doc_id", "28039652140");

    // First pass: status 100 is expected to route to failure.
    runner.enqueue(new byte[0], attributes);
    runner.run(1, true, true);
    runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_FAILURE, 1);

    // Second pass: status 500 is expected to route to retry.
    runner.clearTransferState();
    failingProcessor.setStatus(500, "Should retry");
    runner.enqueue(new byte[0], attributes);
    runner.run(1, true, true);
    runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_RETRY, 1);
}
/**
 * Uses status code -1, which makes the mock HTTP call throw a ConnectException, and expects
 * the FlowFile on the failure relationship.
 */
@Test
public void testPutElasticSearchOnTriggerWithConnectException() throws IOException {
    final PutElasticsearchHttpRecordTestProcessor failingProcessor = new PutElasticsearchHttpRecordTestProcessor(true); // simulate failures
    failingProcessor.setStatus(-1, "Connection Exception");
    runner = TestRunners.newTestRunner(failingProcessor);
    generateTestData();
    runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
    runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
    runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
    runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");

    final Map<String, String> attributes = Collections.singletonMap("doc_id", "28039652140");
    runner.enqueue(new byte[0], attributes);
    runner.run(1, true, true);

    runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_FAILURE, 1);
}
/**
 * Points the ID record path at a field that the generated records do not contain and expects
 * one FlowFile on failure and none on success.
 */
@Test
public void testPutElasticsearchOnTriggerWithNoIdPath() throws Exception {
    final PutElasticsearchHttpRecordTestProcessor testProcessor = new PutElasticsearchHttpRecordTestProcessor(false);
    runner = TestRunners.newTestRunner(testProcessor);
    generateTestData();
    runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
    runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
    runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
    // "/none" does not match any field of the generated schema.
    runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/none");

    final byte[] emptyContent = new byte[0];
    runner.enqueue(emptyContent);
    runner.run(1, true, true);

    runner.assertTransferCount(PutElasticsearchHttpRecord.REL_FAILURE, 1);
    runner.assertTransferCount(PutElasticsearchHttpRecord.REL_SUCCESS, 0);
}
/**
 * Leaves the ID record path unset while the mock response reports one failed item, and expects
 * a single FlowFile on failure whose "failure.count" attribute is "1".
 */
@Test
public void testPutElasticsearchOnTriggerWithNoIdField() throws IOException {
    final PutElasticsearchHttpRecordTestProcessor failingProcessor = new PutElasticsearchHttpRecordTestProcessor(true); // simulate failures
    runner = TestRunners.newTestRunner(failingProcessor);
    generateTestData();
    runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
    runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
    runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");

    final byte[] emptyContent = new byte[0];
    runner.enqueue(emptyContent);
    runner.run(1, true, true);

    runner.assertTransferCount(PutElasticsearchHttpRecord.REL_SUCCESS, 0);
    runner.assertTransferCount(PutElasticsearchHttpRecord.REL_FAILURE, 1);
    final MockFlowFile failedFlowFile = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_FAILURE).get(0);
    failedFlowFile.assertAttributeEquals("failure.count", "1");
}
/**
 * Resolves the index and document type from FlowFile attributes via expression language.
 * The first FlowFile carries both referenced attributes and is expected on success; the second
 * lacks the "i" attribute the index expression refers to and is expected on failure.
 */
@Test
public void testPutElasticsearchOnTriggerWithIndexFromAttribute() throws IOException {
    runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecordTestProcessor(false));
    generateTestData();
    runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
    runner.setProperty(PutElasticsearchHttpRecord.INDEX, "${i}");
    runner.setProperty(PutElasticsearchHttpRecord.TYPE, "${type}");
    runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");

    // Plain maps instead of double-brace initialization, which would create an anonymous
    // HashMap subclass for every call site.
    final Map<String, String> completeAttributes = new HashMap<>();
    completeAttributes.put("doc_id", "28039652144");
    completeAttributes.put("i", "doc");
    completeAttributes.put("type", "status");
    runner.enqueue(new byte[0], completeAttributes);
    runner.run(1, true, true);
    runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
    final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0);
    assertNotNull(out);
    runner.clearTransferState();

    // Now omit the "i" attribute that the index expression refers to; this should fail
    final Map<String, String> attributesWithoutIndex = new HashMap<>();
    attributesWithoutIndex.put("doc_id", "28039652144");
    attributesWithoutIndex.put("type", "status");
    runner.enqueue(new byte[0], attributesWithoutIndex);
    runner.run(1, true, true);
    runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_FAILURE, 1);
    final MockFlowFile out2 = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_FAILURE).get(0);
    assertNotNull(out2);
}
/**
 * Checks that the index operation value "index_fail" passes configuration validation, and that
 * running the processor with it sends the FlowFile to the failure relationship.
 */
@Test
public void testPutElasticSearchOnTriggerWithInvalidIndexOp() throws IOException {
    final PutElasticsearchHttpRecordTestProcessor testProcessor = new PutElasticsearchHttpRecordTestProcessor(false); // no failures
    runner = TestRunners.newTestRunner(testProcessor);
    generateTestData();

    // URL alone is expected to be insufficient.
    runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
    runner.assertNotValid();

    runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
    runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
    runner.assertValid();

    runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
    runner.assertValid();

    runner.setProperty(PutElasticsearchHttpRecord.INDEX_OP, "index");
    runner.assertValid();

    runner.setProperty(PutElasticsearchHttpRecord.INDEX_OP, "create");
    runner.assertValid();

    // The unknown operation is accepted by validation; the failure is expected when it runs.
    runner.setProperty(PutElasticsearchHttpRecord.INDEX_OP, "index_fail");
    runner.assertValid();

    final Map<String, String> attributes = Collections.singletonMap("doc_id", "28039652140");
    runner.enqueue(new byte[0], attributes);
    runner.run(1, true, true);

    runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_FAILURE, 1);
    final MockFlowFile failedFlowFile = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_FAILURE).get(0);
    assertNotNull(failedFlowFile);
}
/**
 * Adds a dynamic property and has the mock client assert that the request URL carries it as a
 * query parameter; also verifies routing to success and the single SEND provenance event.
 */
@Test
public void testPutElasticSearchOnTriggerQueryParameter() throws IOException {
    final PutElasticsearchHttpRecordTestProcessor urlCheckingProcessor = new PutElasticsearchHttpRecordTestProcessor(false); // no failures
    urlCheckingProcessor.setExpectedUrl("http://127.0.0.1:9200/_bulk?pipeline=my-pipeline");
    runner = TestRunners.newTestRunner(urlCheckingProcessor);
    generateTestData();
    runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
    runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
    runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
    runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
    // Dynamic property that should end up on the URL as a query parameter.
    runner.setProperty("pipeline", "my-pipeline");

    final Map<String, String> attributes = Collections.singletonMap("doc_id", "28039652140");
    runner.enqueue(new byte[0], attributes);
    runner.run(1, true, true);

    runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
    final MockFlowFile successFlowFile = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0);
    assertNotNull(successFlowFile);
    successFlowFile.assertAttributeEquals("doc_id", "28039652140");

    final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
    assertNotNull(provenanceEvents);
    assertEquals(1, provenanceEvents.size());
    assertEquals(ProvenanceEventType.SEND, provenanceEvents.get(0).getEventType());
}
/**
 * Processes a single generated record with a record writer configured while the mock response
 * reports one failed item; expects nothing on success and one FlowFile on failure whose
 * "failure.count" attribute is "1".
 */
@Test
public void testPutElasticsearchOnTriggerFailureWithWriter() throws IOException {
    final PutElasticsearchHttpRecordTestProcessor failingProcessor = new PutElasticsearchHttpRecordTestProcessor(true); // simulate failures
    runner = TestRunners.newTestRunner(failingProcessor);
    generateTestData(1);
    generateWriter();
    runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
    runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
    runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");

    final byte[] emptyContent = new byte[0];
    runner.enqueue(emptyContent);
    runner.run(1, true, true);

    runner.assertTransferCount(PutElasticsearchHttpRecord.REL_SUCCESS, 0);
    runner.assertTransferCount(PutElasticsearchHttpRecord.REL_FAILURE, 1);
    final MockFlowFile failedFlowFile = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_FAILURE).get(0);
    failedFlowFile.assertAttributeEquals("failure.count", "1");
}
/**
 * Processes four generated records with a record writer configured while the mock response
 * reports two failed items. Expects one FlowFile on success and one on failure, each with a
 * record count of 2, a failure count of 2 on the failed one, and exactly one logged error
 * message.
 */
@Test
public void testPutElasticsearchOnTriggerFailureWithWriterMultipleRecords() throws IOException {
    final PutElasticsearchHttpRecordTestProcessor failingProcessor = new PutElasticsearchHttpRecordTestProcessor(2); // simulate failures
    runner = TestRunners.newTestRunner(failingProcessor);
    generateTestData();
    generateWriter();
    runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
    runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
    runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");

    final byte[] emptyContent = new byte[0];
    runner.enqueue(emptyContent);
    runner.run(1, true, true);

    runner.assertTransferCount(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
    runner.assertTransferCount(PutElasticsearchHttpRecord.REL_FAILURE, 1);

    final MockFlowFile succeededFlowFile = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0);
    succeededFlowFile.assertAttributeEquals("record.count", "2");

    final MockFlowFile failedFlowFile = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_FAILURE).get(0);
    failedFlowFile.assertAttributeEquals("record.count", "2");
    failedFlowFile.assertAttributeEquals("failure.count", "2");

    // With error logging left at its default, a single error message is expected.
    final int loggedErrors = runner.getLogger().getErrorMessages().size();
    assertEquals(1, loggedErrors);
}
/**
 * Same scenario as the multiple-records failure test, but with the log-all-errors property set
 * to "true"; the routing and attribute expectations are unchanged and two logged error
 * messages are expected.
 */
@Test
public void testPutElasticsearchOnTriggerFailureWithWriterMultipleRecordsLogging() throws IOException {
    final PutElasticsearchHttpRecordTestProcessor failingProcessor = new PutElasticsearchHttpRecordTestProcessor(2); // simulate failures
    runner = TestRunners.newTestRunner(failingProcessor);
    generateTestData();
    generateWriter();
    runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
    runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
    runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
    runner.setProperty(PutElasticsearchHttpRecord.LOG_ALL_ERRORS, "true");

    final byte[] emptyContent = new byte[0];
    runner.enqueue(emptyContent);
    runner.run(1, true, true);

    runner.assertTransferCount(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
    runner.assertTransferCount(PutElasticsearchHttpRecord.REL_FAILURE, 1);

    final MockFlowFile succeededFlowFile = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0);
    succeededFlowFile.assertAttributeEquals("record.count", "2");

    final MockFlowFile failedFlowFile = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_FAILURE).get(0);
    failedFlowFile.assertAttributeEquals("record.count", "2");
    failedFlowFile.assertAttributeEquals("failure.count", "2");

    // One error message per simulated failed item is expected here.
    final int loggedErrors = runner.getLogger().getErrorMessages().size();
    assertEquals(2, loggedErrors);
}
/**
 * A Test class that extends the processor in order to inject/mock behavior.
 * <p>
 * The HTTP client is replaced by a Mockito mock, so no Elasticsearch instance is contacted.
 * For every request the mock optionally verifies the request URL and the documents contained
 * in the request body, and then answers with a canned bulk response built from the configured
 * number of failed items and HTTP status. A status code of {@code -1} makes the call throw a
 * {@link ConnectException} instead of returning a response.
 */
private static class PutElasticsearchHttpRecordTestProcessor extends PutElasticsearchHttpRecord {
    // Shared parser for the documents read back out of the request body; built once instead of per request.
    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Number of failed items placed in front of the final item of the canned bulk response.
    int numResponseFailures;
    OkHttpClient client;
    // HTTP status of the mock response; -1 means "throw a ConnectException instead".
    int statusCode = 200;
    String statusMessage = "OK";
    // When non-null, every request URL must equal this value.
    String expectedUrl = null;
    // When non-null, the i-th check is applied to the i-th document of the request body.
    Consumer<Map<?, ?>>[] recordChecks;

    /**
     * @param responseHasFailures whether the canned bulk response should contain one failed item
     */
    PutElasticsearchHttpRecordTestProcessor(boolean responseHasFailures) {
        this.numResponseFailures = responseHasFailures ? 1 : 0;
    }

    /**
     * @param numResponseFailures number of failed items the canned bulk response should contain
     */
    PutElasticsearchHttpRecordTestProcessor(int numResponseFailures) {
        this.numResponseFailures = numResponseFailures;
    }

    /**
     * Sets the HTTP status code and message of subsequent mock responses.
     *
     * @param code status code to answer with, or -1 to make the call throw a ConnectException
     * @param message status message to answer with
     */
    void setStatus(int code, String message) {
        statusCode = code;
        statusMessage = message;
    }

    /**
     * @param url URL every request is required to have, or null to skip the URL check
     */
    void setExpectedUrl(String url) {
        expectedUrl = url;
    }

    /**
     * Registers per-document checks; the i-th check receives the i-th document of the request
     * body, parsed into a map.
     *
     * @param checks checks in document order
     */
    @SafeVarargs
    final void setRecordChecks(Consumer<Map<?, ?>>... checks) {
        recordChecks = checks;
    }

    /**
     * Installs the mock HTTP client. Each {@code newCall} verifies the request against the
     * configured expectations and returns a call whose {@code execute()} yields the canned
     * response, or throws a ConnectException when the status code is -1.
     */
    @Override
    protected void createElasticsearchClient(ProcessContext context) throws ProcessException {
        client = mock(OkHttpClient.class);
        when(client.newCall(any(Request.class))).thenAnswer(invocationOnMock -> {
            final Call call = mock(Call.class);
            if (statusCode == -1) {
                when(call.execute()).thenThrow(ConnectException.class);
                return call;
            }

            final Request realRequest = (Request) invocationOnMock.getArguments()[0];
            if (expectedUrl != null) {
                // assertEquals reports both URLs on a mismatch, unlike a bare assertTrue.
                assertEquals(expectedUrl, realRequest.url().toString());
            }
            if (recordChecks != null) {
                verifyRecords(realRequest);
            }

            final Response mockResponse = new Response.Builder()
                    .request(realRequest)
                    .protocol(Protocol.HTTP_1_1)
                    .code(statusCode)
                    .message(statusMessage)
                    .body(ResponseBody.create(buildBulkResponse(), MediaType.parse("application/json")))
                    .build();
            when(call.execute()).thenReturn(mockResponse);
            return call;
        });
    }

    /**
     * Applies the registered record checks to the documents of the request body. The body is
     * read as alternating lines: an action line followed by a document line.
     */
    private void verifyRecords(Request request) throws IOException {
        final Buffer sink = new Buffer();
        request.body().writeTo(sink);

        int recordIndex = 0;
        boolean documentLine = false;
        String line;
        while ((line = sink.readUtf8Line()) != null) {
            if (documentLine && recordIndex < recordChecks.length) {
                recordChecks[recordIndex++].accept(MAPPER.readValue(line, Map.class));
            }
            // Action and document lines alternate, starting with an action line.
            documentLine = !documentLine;
        }

        // Without this, a request carrying fewer documents than checks would pass silently.
        assertEquals("Number of record checks applied to the bulk request", recordChecks.length, recordIndex);
    }

    /**
     * Builds the JSON text of the canned bulk response: the configured number of failed items
     * followed by one item that carries the configured status code.
     */
    private String buildBulkResponse() {
        StringBuilder sb = new StringBuilder("{\"took\": 1, \"errors\": \"");
        sb.append(numResponseFailures > 0);
        sb.append("\", \"items\": [");
        for (int i = 0; i < numResponseFailures; i++) {
            // This case is for a status code of 200 for the bulk response itself, but with an error (of 400) inside
            sb.append("{\"index\":{\"_index\":\"doc\",\"_type\":\"status\",\"_id\":\"28039652140\",\"status\":\"400\",");
            sb.append("\"error\":{\"type\":\"mapper_parsing_exception\",\"reason\":\"failed to parse [gender]\",");
            sb.append("\"caused_by\":{\"type\":\"json_parse_exception\",\"reason\":\"Unexpected end-of-input in VALUE_STRING\\n at ");
            sb.append("[Source: org.elasticsearch.common.io.stream.InputStreamStreamInput@1a2e3ac4; line: 1, column: 39]\"}}}},");
        }
        sb.append("{\"index\":{\"_index\":\"doc\",\"_type\":\"status\",\"_id\":\"28039652140\",\"status\":");
        sb.append(statusCode);
        sb.append(",\"_source\":{\"text\": \"This is a test document\"}}}");
        sb.append("]}");
        return sb.toString();
    }

    @Override
    protected OkHttpClient getClient() {
        return client;
    }
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Integration test section below
//
// The tests below are meant to run against real ES instances, and are thus annotated with @Ignore so
// that they are skipped during normal test execution. If you wish to execute them as part of a test
// phase, comment out the @Ignore line of each desired test.
/////////////////////////////////////////////////////////////////////////////////////////////////////////////
/**
 * Integration test (ignored by default): sends one small JSON document to the Elasticsearch
 * instance at http://127.0.0.1:9200 through the real processor with a schema-inferring JSON
 * reader, and expects one FlowFile on success plus a single SEND provenance event.
 */
@Test
@Ignore("Comment this out if you want to run against local or test ES")
public void testPutElasticSearchBasic() throws InitializationException {
    // The anonymous class is only used to look up the name of this method.
    final String testName = new Object() {
    }.getClass().getEnclosingMethod().getName();
    System.out.println("Starting test " + testName);

    final TestRunner esRunner = TestRunners.newTestRunner(new PutElasticsearchHttpRecord());

    final RecordReaderFactory jsonReader = new JsonTreeReader();
    esRunner.addControllerService("reader", jsonReader);
    esRunner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, "infer-schema");
    esRunner.enableControllerService(jsonReader);

    esRunner.setProperty(PutElasticsearchHttpRecord.RECORD_READER, "reader");
    esRunner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
    esRunner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
    esRunner.setProperty(PutElasticsearchHttpRecord.TYPE, "_doc");
    esRunner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
    esRunner.assertValid();

    // Two FlowFiles are enqueued (a JSON document and an empty one) but only one iteration is run.
    esRunner.enqueue("{\"id\": 28039652140}");
    esRunner.enqueue(new byte[0]);
    esRunner.run(1, true, true);

    esRunner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
    final List<ProvenanceEventRecord> provenanceEvents = esRunner.getProvenanceEvents();
    assertNotNull(provenanceEvents);
    assertEquals(1, provenanceEvents.size());
    assertEquals(ProvenanceEventType.SEND, provenanceEvents.get(0).getEventType());
}
/**
 * Integration test (ignored by default): sends 100 newline-separated JSON documents with
 * consecutive ids in one FlowFile to the Elasticsearch instance at http://127.0.0.1:9200 and
 * expects that FlowFile on success with its content unchanged.
 */
@Test
@Ignore("Comment this out if you want to run against local or test ES")
public void testPutElasticSearchBatch() throws InitializationException {
    System.out.println("Starting test " + new Object() {
    }.getClass().getEnclosingMethod().getName());
    final TestRunner runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecord());
    final RecordReaderFactory reader = new JsonTreeReader();
    runner.addControllerService("reader", reader);
    runner.setProperty(reader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, "infer-schema");
    runner.enableControllerService(reader);
    runner.setProperty(PutElasticsearchHttpRecord.RECORD_READER, "reader");
    runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
    runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
    runner.setProperty(PutElasticsearchHttpRecord.TYPE, "_doc");
    runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
    runner.assertValid();
    final StringBuilder sb = new StringBuilder();
    for (int i = 0; i < 100; i++) {
        long newId = 28039652140L + i;
        final String newStrId = Long.toString(newId);
        sb.append("{\"id\": ").append(newStrId).append("}\n");
    }
    final String payload = sb.toString();
    // Enqueue the String directly rather than via getBytes(), which would encode it with the
    // platform default charset.
    runner.enqueue(payload);
    runner.run();
    runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
    final String content = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0).getContent();
    assertEquals(payload, content);
}
/**
 * Uses an expression for the Elasticsearch URL without ever defining the variable it refers
 * to, and expects an AssertionError to be raised somewhere in the test body.
 * <p>
 * NOTE(review): no record reader is configured on this runner, so the expected AssertionError
 * may already be raised by assertValid() rather than by run() - confirm which call is meant
 * to fail.
 */
@Test(expected = AssertionError.class)
public void testPutElasticSearchBadHostInEL() {
    final TestRunner elRunner = TestRunners.newTestRunner(new PutElasticsearchHttpRecord());

    // "es.url" is never set as a variable or attribute in this test.
    elRunner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "${es.url}");
    elRunner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
    elRunner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
    elRunner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
    elRunner.assertValid();

    final Map<String, String> attributes = Collections.singletonMap("doc_id", "1");
    elRunner.enqueue(new byte[0], attributes);
    elRunner.run();
}
/**
 * Sets up the mock record reader with the default of four records.
 *
 * @throws IOException if the reader service cannot be added to the runner
 */
private void generateTestData() throws IOException {
    final int defaultRecordCount = 4;
    generateTestData(defaultRecordCount);
}
/**
 * Registers a {@link MockRecordParser} under the id "parser", makes it the processor's record
 * reader, declares the seven-field test schema and adds {@code numRecords} records numbered
 * from 1.
 *
 * @param numRecords number of records to add to the parser
 * @throws IOException wrapping the InitializationException raised if the service cannot be added
 */
private void generateTestData(int numRecords) throws IOException {
    final MockRecordParser recordParser = new MockRecordParser();
    try {
        runner.addControllerService("parser", recordParser);
    } catch (InitializationException e) {
        throw new IOException(e);
    }
    runner.enableControllerService(recordParser);
    runner.setProperty(PutElasticsearchHttpRecord.RECORD_READER, "parser");

    recordParser.addSchemaField("id", RecordFieldType.INT);
    recordParser.addSchemaField("name", RecordFieldType.STRING);
    recordParser.addSchemaField("code", RecordFieldType.INT);
    recordParser.addSchemaField("date", RecordFieldType.DATE);
    recordParser.addSchemaField("time", RecordFieldType.TIME);
    recordParser.addSchemaField("ts", RecordFieldType.TIMESTAMP);
    recordParser.addSchemaField("amount", RecordFieldType.DECIMAL);

    // Values that are the same for every record.
    final Date dateValue = Date.valueOf(ISO_DATE);
    final Timestamp timestampValue = Timestamp.valueOf(LOCAL_DATE_TIME);
    final Time timeValue = Time.valueOf(LOCAL_TIME);
    // Ten times Double.MAX_VALUE, i.e. an amount beyond the range of a double.
    final BigDecimal amount = new BigDecimal(Double.MAX_VALUE).multiply(BigDecimal.TEN);

    int recordNumber = 1;
    while (recordNumber <= numRecords) {
        recordParser.addRecord(recordNumber, "reç" + recordNumber, 100 + recordNumber, dateValue, timeValue, timestampValue, amount);
        recordNumber++;
    }
}
/**
 * Registers a {@link MockRecordWriter} under the id "writer", enables it and makes it the
 * processor's record writer.
 *
 * @throws IOException wrapping the InitializationException raised if the service cannot be added
 */
private void generateWriter() throws IOException {
    final MockRecordWriter recordWriter = new MockRecordWriter();
    try {
        runner.addControllerService("writer", recordWriter);
    } catch (InitializationException e) {
        throw new IOException(e);
    }
    runner.enableControllerService(recordWriter);
    runner.setProperty(PutElasticsearchHttpRecord.RECORD_WRITER, "writer");
}
}