blob: b1dce43e92d2343391ea5c4f556d84f2109ea5fb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import static org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_CONTINUE;
import static org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_TERMINATE;
import static org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult.REJECT_AND_CONTINUE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Time;
import java.sql.Timestamp;
import java.sql.Types;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import javax.xml.bind.DatatypeConverter;
import org.apache.commons.lang3.RandomUtils;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.processor.FlowFileFilter;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.pattern.RollbackOnFailure;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.Mockito;
public class TestPutSQL {
private static final String createPersons = "CREATE TABLE PERSONS (id integer primary key, name varchar(100), code integer)";
private static final String createPersonsAutoId = "CREATE TABLE PERSONS_AI (id INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1), name VARCHAR(100), code INTEGER check(code <= 100))";
@ClassRule
public static TemporaryFolder folder = new TemporaryFolder();
/**
* Setting up Connection pooling is expensive operation.
* So let's do this only once and reuse MockDBCPService in each test.
*/
static protected DBCPService service;
@BeforeClass
public static void setupClass() throws ProcessException, SQLException {
System.setProperty("derby.stream.error.file", "target/derby.log");
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
service = new MockDBCPService(dbDir.getAbsolutePath());
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate(createPersons);
stmt.executeUpdate(createPersonsAutoId);
}
}
}
@Test
public void testDirectStatements() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = initTestRunner();
recreateTable("PERSONS", createPersons);
runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (1, 'Mark', 84)".getBytes());
runner.run();
runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
assertTrue(rs.next());
assertEquals(1, rs.getInt(1));
assertEquals("Mark", rs.getString(2));
assertEquals(84, rs.getInt(3));
assertFalse(rs.next());
}
}
runner.enqueue("UPDATE PERSONS SET NAME='George' WHERE ID=1".getBytes());
runner.run();
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
assertTrue(rs.next());
assertEquals(1, rs.getInt(1));
assertEquals("George", rs.getString(2));
assertEquals(84, rs.getInt(3));
assertFalse(rs.next());
}
}
}
@Test
public void testCommitOnCleanup() throws InitializationException, ProcessException, SQLException {
final TestRunner runner = initTestRunner();
runner.setProperty(PutSQL.AUTO_COMMIT, "false");
recreateTable("PERSONS", createPersons);
runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (1, 'Mark', 84)".getBytes());
runner.run();
runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
assertTrue(rs.next());
assertEquals(1, rs.getInt(1));
assertEquals("Mark", rs.getString(2));
assertEquals(84, rs.getInt(3));
assertFalse(rs.next());
}
}
}
@Test
public void testInsertWithGeneratedKeys() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = initTestRunner();
runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "true");
recreateTable("PERSONS_AI",createPersonsAutoId);
runner.enqueue("INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Mark', 84)".getBytes());
runner.run();
runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);
final MockFlowFile mff = runner.getFlowFilesForRelationship(PutSQL.REL_SUCCESS).get(0);
mff.assertAttributeEquals("sql.generated.key", "1");
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS_AI");
assertTrue(rs.next());
assertEquals(1, rs.getInt(1));
assertEquals("Mark", rs.getString(2));
assertEquals(84, rs.getInt(3));
assertFalse(rs.next());
}
}
}
@Test
public void testProvenanceEventsWithBatchMode() throws InitializationException, ProcessException, SQLException {
final TestRunner runner = initTestRunner();
runner.setProperty(PutSQL.BATCH_SIZE, "10");
runner.setProperty(PutSQL.SUPPORT_TRANSACTIONS, "false");
runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false");
testProvenanceEvents(runner);
}
@Test
public void testProvenanceEventsWithFragmentedTransaction() throws InitializationException, ProcessException, SQLException {
final TestRunner runner = initTestRunner();
runner.setProperty(PutSQL.BATCH_SIZE, "10");
runner.setProperty(PutSQL.SUPPORT_TRANSACTIONS, "true");
runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false");
testProvenanceEvents(runner);
}
@Test
public void testProvenanceEventsWithObtainGeneratedKeys() throws InitializationException, ProcessException, SQLException {
final TestRunner runner = initTestRunner();
runner.setProperty(PutSQL.BATCH_SIZE, "10");
runner.setProperty(PutSQL.SUPPORT_TRANSACTIONS, "false");
runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "true");
testProvenanceEvents(runner);
}
private void testProvenanceEvents(final TestRunner runner) throws ProcessException, SQLException {
recreateTable("PERSONS", createPersons);
runner.enqueue("DELETE FROM PERSONS WHERE ID = 1");
runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (1, 'Mark', 84)");
runner.run();
runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 2);
List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
assertEquals(2, provenanceEvents.size());
for (ProvenanceEventRecord event: provenanceEvents) {
assertEquals(ProvenanceEventType.SEND, event.getEventType());
}
}
@Test
public void testKeepFlowFileOrderingWithBatchMode() throws InitializationException, ProcessException, SQLException {
final TestRunner runner = initTestRunner();
runner.setProperty(PutSQL.BATCH_SIZE, "10");
runner.setProperty(PutSQL.SUPPORT_TRANSACTIONS, "false");
runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false");
testKeepFlowFileOrdering(runner);
}
@Test
public void testKeepFlowFileOrderingWithFragmentedTransaction() throws InitializationException, ProcessException, SQLException {
final TestRunner runner = initTestRunner();
runner.setProperty(PutSQL.BATCH_SIZE, "10");
runner.setProperty(PutSQL.SUPPORT_TRANSACTIONS, "true");
runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false");
testKeepFlowFileOrdering(runner);
}
@Test
public void testKeepFlowFileOrderingWithObtainGeneratedKeys() throws InitializationException, ProcessException, SQLException {
final TestRunner runner = initTestRunner();
runner.setProperty(PutSQL.BATCH_SIZE, "10");
runner.setProperty(PutSQL.SUPPORT_TRANSACTIONS, "false");
runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "true");
testKeepFlowFileOrdering(runner);
}
private void testKeepFlowFileOrdering(final TestRunner runner) throws ProcessException, SQLException {
recreateTable("PERSONS", createPersons);
final String delete = "DELETE FROM PERSONS WHERE ID = ?";
final String insert = "INSERT INTO PERSONS (ID) VALUES (?)";
final String[] statements = {delete, insert, insert, delete, delete, insert};
final Function<Integer, Map<String, String>> createSqlAttributes = (id) -> {
final Map<String, String> attributes = new HashMap<>();
attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.1.value", String.valueOf(id));
return attributes;
};
final int flowFileCount = statements.length;
for (int i = 0; i < flowFileCount; i++) {
runner.enqueue(statements[i], createSqlAttributes.apply(i));
}
runner.run();
runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, flowFileCount);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutSQL.REL_SUCCESS);
for (int i = 0; i < flowFileCount; i++) {
MockFlowFile flowFile = flowFiles.get(i);
assertEquals(statements[i], flowFile.getContent());
assertEquals(String.valueOf(i), flowFile.getAttribute("sql.args.1.value"));
}
List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
assertEquals(flowFileCount, provenanceEvents.size());
for (int i = 0; i < flowFileCount; i++) {
ProvenanceEventRecord event = provenanceEvents.get(i);
assertEquals(String.valueOf(i), event.getAttribute("sql.args.1.value"));
}
}
@Test
public void testFailInMiddleWithBadStatementAndSupportTransaction() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = initTestRunner();
testFailInMiddleWithBadStatement(runner);
runner.run();
runner.assertTransferCount(PutSQL.REL_FAILURE, 4);
runner.assertTransferCount(PutSQL.REL_SUCCESS, 0);
}
@Test
public void testFailInMiddleWithBadStatementAndNotSupportTransaction() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = initTestRunner();
runner.setProperty(PutSQL.SUPPORT_TRANSACTIONS, "false");
testFailInMiddleWithBadStatement(runner);
runner.run();
runner.assertTransferCount(PutSQL.REL_FAILURE, 1);
runner.assertTransferCount(PutSQL.REL_SUCCESS, 3);
}
private void testFailInMiddleWithBadStatement(final TestRunner runner) throws InitializationException {
runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false");
runner.enqueue("INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Mark', 84)".getBytes());
runner.enqueue("INSERT INTO PERSONS_AI".getBytes()); // intentionally wrong syntax
runner.enqueue("INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Tom', 3)".getBytes());
runner.enqueue("INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Harry', 44)".getBytes());
}
@Test
public void testFailInMiddleWithBadStatementRollbackOnFailure() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = initTestRunner();
runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false");
runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true");
runner.enqueue("INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Mark', 84)".getBytes());
runner.enqueue("INSERT INTO PERSONS_AI".getBytes()); // intentionally wrong syntax
runner.enqueue("INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Tom', 3)".getBytes());
runner.enqueue("INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Harry', 44)".getBytes());
try {
runner.run();
fail("ProcessException should be thrown");
} catch (AssertionError e) {
assertTrue(e.getCause() instanceof ProcessException);
runner.assertTransferCount(PutSQL.REL_FAILURE, 0);
runner.assertTransferCount(PutSQL.REL_SUCCESS, 0);
}
}
@Test
public void testFailInMiddleWithBadParameterTypeAndNotSupportTransaction() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = initTestRunner();
runner.setProperty(PutSQL.SUPPORT_TRANSACTIONS, "false");
testFailInMiddleWithBadParameterType(runner);
runner.run();
runner.assertTransferCount(PutSQL.REL_FAILURE, 1);
runner.assertTransferCount(PutSQL.REL_SUCCESS, 3);
}
@Test
public void testFailInMiddleWithBadParameterTypeAndSupportTransaction() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = initTestRunner();
testFailInMiddleWithBadParameterType(runner);
runner.run();
runner.assertTransferCount(PutSQL.REL_FAILURE, 4);
runner.assertTransferCount(PutSQL.REL_SUCCESS, 0);
}
private void testFailInMiddleWithBadParameterType(final TestRunner runner) throws InitializationException, ProcessException, SQLException, IOException {
runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false");
final Map<String, String> goodAttributes = new HashMap<>();
goodAttributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
goodAttributes.put("sql.args.1.value", "84");
final Map<String, String> badAttributes = new HashMap<>();
badAttributes.put("sql.args.1.type", String.valueOf(Types.VARCHAR));
badAttributes.put("sql.args.1.value", "hello");
final byte[] data = "INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Mark', ?)".getBytes();
runner.enqueue(data, goodAttributes);
runner.enqueue(data, badAttributes);
runner.enqueue(data, goodAttributes);
runner.enqueue(data, goodAttributes);
}
@Test
public void testFailInMiddleWithBadParameterTypeRollbackOnFailure() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = initTestRunner();
runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false");
runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true");
final Map<String, String> goodAttributes = new HashMap<>();
goodAttributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
goodAttributes.put("sql.args.1.value", "84");
final Map<String, String> badAttributes = new HashMap<>();
badAttributes.put("sql.args.1.type", String.valueOf(Types.VARCHAR));
badAttributes.put("sql.args.1.value", "hello");
final byte[] data = "INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Mark', ?)".getBytes();
runner.enqueue(data, goodAttributes);
runner.enqueue(data, badAttributes);
runner.enqueue(data, goodAttributes);
runner.enqueue(data, goodAttributes);
try {
runner.run();
fail("ProcessException should be thrown");
} catch (AssertionError e) {
assertTrue(e.getCause() instanceof ProcessException);
runner.assertTransferCount(PutSQL.REL_FAILURE, 0);
runner.assertTransferCount(PutSQL.REL_SUCCESS, 0);
}
}
@Test
public void testFailInMiddleWithBadParameterValueAndSupportTransaction() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = initTestRunner();
testFailInMiddleWithBadParameterValue(runner);
runner.run();
runner.assertTransferCount(PutSQL.REL_SUCCESS, 0);
runner.assertTransferCount(PutSQL.REL_FAILURE, 0);
runner.assertTransferCount(PutSQL.REL_RETRY, 4);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS_AI");
assertFalse(rs.next());
}
}
}
@Test
public void testFailInMiddleWithBadParameterValueAndNotSupportTransaction() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = initTestRunner();
runner.setProperty(PutSQL.SUPPORT_TRANSACTIONS, "false");
testFailInMiddleWithBadParameterValue(runner);
runner.run();
runner.assertTransferCount(PutSQL.REL_SUCCESS, 1);
runner.assertTransferCount(PutSQL.REL_FAILURE, 1);
runner.assertTransferCount(PutSQL.REL_RETRY, 2);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS_AI");
assertTrue(rs.next());
assertEquals(1, rs.getInt(1));
assertEquals("Mark", rs.getString(2));
assertEquals(84, rs.getInt(3));
assertFalse(rs.next());
}
}
}
private void testFailInMiddleWithBadParameterValue(final TestRunner runner) throws InitializationException, ProcessException, SQLException, IOException {
runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false");
recreateTable("PERSONS_AI",createPersonsAutoId);
final Map<String, String> goodAttributes = new HashMap<>();
goodAttributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
goodAttributes.put("sql.args.1.value", "84");
final Map<String, String> badAttributes = new HashMap<>();
badAttributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
badAttributes.put("sql.args.1.value", "9999");
final byte[] data = "INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Mark', ?)".getBytes();
runner.enqueue(data, goodAttributes);
runner.enqueue(data, badAttributes);
runner.enqueue(data, goodAttributes);
runner.enqueue(data, goodAttributes);
}
@Test
public void testFailInMiddleWithBadParameterValueRollbackOnFailure() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = initTestRunner();
runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false");
runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true");
recreateTable("PERSONS_AI",createPersonsAutoId);
final Map<String, String> goodAttributes = new HashMap<>();
goodAttributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
goodAttributes.put("sql.args.1.value", "84");
final Map<String, String> badAttributes = new HashMap<>();
badAttributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
badAttributes.put("sql.args.1.value", "9999");
final byte[] data = "INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Mark', ?)".getBytes();
runner.enqueue(data, goodAttributes);
runner.enqueue(data, badAttributes);
runner.enqueue(data, goodAttributes);
runner.enqueue(data, goodAttributes);
try {
runner.run();
fail("ProcessException should be thrown");
} catch (AssertionError e) {
assertTrue(e.getCause() instanceof ProcessException);
runner.assertTransferCount(PutSQL.REL_FAILURE, 0);
runner.assertTransferCount(PutSQL.REL_SUCCESS, 0);
}
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS_AI");
assertFalse(rs.next());
}
}
}
@Test
public void testUsingSqlDataTypesWithNegativeValues() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = initTestRunner();
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate("CREATE TABLE PERSONS2 (id integer primary key, name varchar(100), code bigint)");
}
}
final Map<String, String> attributes = new HashMap<>();
attributes.put("sql.args.1.type", "-5");
attributes.put("sql.args.1.value", "84");
runner.enqueue("INSERT INTO PERSONS2 (ID, NAME, CODE) VALUES (1, 'Mark', ?)".getBytes(), attributes);
runner.run();
runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS2");
assertTrue(rs.next());
assertEquals(1, rs.getInt(1));
assertEquals("Mark", rs.getString(2));
assertEquals(84, rs.getInt(3));
assertFalse(rs.next());
}
}
}
// Not specifying a format for the date fields here to continue to test backwards compatibility
@Test
public void testUsingTimestampValuesEpochAndString() throws InitializationException, ProcessException, SQLException, IOException, ParseException {
final TestRunner runner = initTestRunner();
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate("CREATE TABLE TIMESTAMPTEST1 (id integer primary key, ts1 timestamp, ts2 timestamp)");
}
}
final String arg2TS = "2001-01-01 00:01:01.001";
final String art3TS = "2002-02-02 12:02:02.002";
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
java.util.Date parsedDate = dateFormat.parse(arg2TS);
final Map<String, String> attributes = new HashMap<>();
attributes.put("sql.args.1.type", String.valueOf(Types.TIMESTAMP));
attributes.put("sql.args.1.value", Long.toString(parsedDate.getTime()));
attributes.put("sql.args.2.type", String.valueOf(Types.TIMESTAMP));
attributes.put("sql.args.2.value", art3TS);
runner.enqueue("INSERT INTO TIMESTAMPTEST1 (ID, ts1, ts2) VALUES (1, ?, ?)".getBytes(), attributes);
runner.run();
runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
final ResultSet rs = stmt.executeQuery("SELECT * FROM TIMESTAMPTEST1");
assertTrue(rs.next());
assertEquals(1, rs.getInt(1));
assertEquals(arg2TS, rs.getString(2));
assertEquals(art3TS, rs.getString(3));
assertFalse(rs.next());
}
}
}
@Test
public void testUsingTimestampValuesWithFormatAttribute() throws InitializationException, ProcessException, SQLException, IOException, ParseException {
final TestRunner runner = initTestRunner();
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate("CREATE TABLE TIMESTAMPTEST2 (id integer primary key, ts1 timestamp, ts2 timestamp)");
}
}
final String dateStr1 = "2002-02-02T12:02:02";
final String dateStrTimestamp1 = "2002-02-02 12:02:02";
final long dateInt1 = Timestamp.valueOf(dateStrTimestamp1).getTime();
final String dateStr2 = "2002-02-02T12:02:02.123456789";
final String dateStrTimestamp2 = "2002-02-02 12:02:02.123456789";
final long dateInt2 = Timestamp.valueOf(dateStrTimestamp2).getTime();
final long nanoInt2 = 123456789L;
final Map<String, String> attributes = new HashMap<>();
attributes.put("sql.args.1.type", String.valueOf(Types.TIMESTAMP));
attributes.put("sql.args.1.value", dateStr1);
attributes.put("sql.args.1.format", "ISO_LOCAL_DATE_TIME");
attributes.put("sql.args.2.type", String.valueOf(Types.TIMESTAMP));
attributes.put("sql.args.2.value", dateStr2);
attributes.put("sql.args.2.format", "yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSS");
runner.enqueue("INSERT INTO TIMESTAMPTEST2 (ID, ts1, ts2) VALUES (1, ?, ?)".getBytes(), attributes);
runner.run();
runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
final ResultSet rs = stmt.executeQuery("SELECT * FROM TIMESTAMPTEST2");
assertTrue(rs.next());
assertEquals(1, rs.getInt(1));
assertEquals(dateInt1, rs.getTimestamp(2).getTime());
assertEquals(dateInt2, rs.getTimestamp(3).getTime());
assertEquals(nanoInt2, rs.getTimestamp(3).getNanos());
assertFalse(rs.next());
}
}
}
@Test
public void testUsingDateTimeValuesWithFormatAttribute() throws InitializationException, ProcessException, SQLException, IOException, ParseException {
final TestRunner runner = initTestRunner();
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate("CREATE TABLE TIMESTAMPTEST3 (id integer primary key, ts1 TIME, ts2 DATE)");
}
}
final String dateStr = "2002-03-04";
final String timeStr = "02:03:04";
final String timeFormatString = "HH:mm:ss";
final String dateFormatString ="yyyy-MM-dd";
final DateTimeFormatter timeFormatter= DateTimeFormatter.ISO_LOCAL_TIME;
LocalTime parsedTime = LocalTime.parse(timeStr, timeFormatter);
Time expectedTime = Time.valueOf(parsedTime);
final DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_LOCAL_DATE;
LocalDate parsedDate = LocalDate.parse(dateStr, dateFormatter);
Date expectedDate = new Date(Date.from(parsedDate.atStartOfDay().atZone(ZoneId.systemDefault()).toInstant()).getTime());
final long expectedTimeInLong = expectedTime.getTime();
final long expectedDateInLong = expectedDate.getTime();
// test with ISO LOCAL format attribute
Map<String, String> attributes = new HashMap<>();
attributes.put("sql.args.1.type", String.valueOf(Types.TIME));
attributes.put("sql.args.1.value", timeStr);
attributes.put("sql.args.1.format", "ISO_LOCAL_TIME");
attributes.put("sql.args.2.type", String.valueOf(Types.DATE));
attributes.put("sql.args.2.value", dateStr);
attributes.put("sql.args.2.format", "ISO_LOCAL_DATE");
runner.enqueue("INSERT INTO TIMESTAMPTEST3 (ID, ts1, ts2) VALUES (1, ?, ?)".getBytes(), attributes);
// Since Derby database which is used for unit test does not have timezone in DATE and TIME type,
// and PutSQL converts date string into long representation using local timezone,
// we need to use local timezone.
SimpleDateFormat timeFormat = new SimpleDateFormat(timeFormatString);
java.util.Date parsedLocalTime = timeFormat.parse(timeStr);
SimpleDateFormat dateFormat = new SimpleDateFormat(dateFormatString);
java.util.Date parsedLocalDate = dateFormat.parse(dateStr);
// test Long pattern without format attribute
attributes = new HashMap<>();
attributes.put("sql.args.1.type", String.valueOf(Types.TIME));
attributes.put("sql.args.1.value", Long.toString(parsedLocalTime.getTime()));
attributes.put("sql.args.2.type", String.valueOf(Types.DATE));
attributes.put("sql.args.2.value", Long.toString(parsedLocalDate.getTime()));
runner.enqueue("INSERT INTO TIMESTAMPTEST3 (ID, ts1, ts2) VALUES (2, ?, ?)".getBytes(), attributes);
// test with format attribute
attributes = new HashMap<>();
attributes.put("sql.args.1.type", String.valueOf(Types.TIME));
attributes.put("sql.args.1.value", "020304000");
attributes.put("sql.args.1.format", "HHmmssSSS");
attributes.put("sql.args.2.type", String.valueOf(Types.DATE));
attributes.put("sql.args.2.value", "20020304");
attributes.put("sql.args.2.format", "yyyyMMdd");
runner.enqueue("INSERT INTO TIMESTAMPTEST3 (ID, ts1, ts2) VALUES (3, ?, ?)".getBytes(), attributes);
runner.run();
runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 3);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
final ResultSet rs = stmt.executeQuery("SELECT * FROM TIMESTAMPTEST3 ORDER BY ID");
assertTrue(rs.next());
assertEquals(1, rs.getInt(1));
assertEquals(expectedTimeInLong, rs.getTime(2).getTime());
assertEquals(expectedDateInLong, rs.getDate(3).getTime());
assertTrue(rs.next());
assertEquals(2, rs.getInt(1));
assertEquals(parsedLocalTime.getTime(), rs.getTime(2).getTime());
assertEquals(parsedLocalDate.getTime(), rs.getDate(3).getTime());
assertTrue(rs.next());
assertEquals(3, rs.getInt(1));
assertEquals(expectedTimeInLong, rs.getTime(2).getTime());
assertEquals(expectedDateInLong, rs.getDate(3).getTime());
assertFalse(rs.next());
}
}
}
@Test
public void testBitType() throws SQLException, InitializationException {
final TestRunner runner = initTestRunner();
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate("CREATE TABLE BITTESTS (id integer primary key, bt1 BOOLEAN)");
}
}
final byte[] insertStatement = "INSERT INTO BITTESTS (ID, bt1) VALUES (?, ?)".getBytes();
Map<String, String> attributes = new HashMap<>();
attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.1.value", "1");
attributes.put("sql.args.2.type", String.valueOf(Types.BIT));
attributes.put("sql.args.2.value", "1");
runner.enqueue(insertStatement, attributes);
attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.1.value", "2");
attributes.put("sql.args.2.type", String.valueOf(Types.BIT));
attributes.put("sql.args.2.value", "0");
runner.enqueue(insertStatement, attributes);
attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.1.value", "3");
attributes.put("sql.args.2.type", String.valueOf(Types.BIT));
attributes.put("sql.args.2.value", "-5");
runner.enqueue(insertStatement, attributes);
attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.1.value", "4");
attributes.put("sql.args.2.type", String.valueOf(Types.BIT));
attributes.put("sql.args.2.value", "t");
runner.enqueue(insertStatement, attributes);
attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.1.value", "5");
attributes.put("sql.args.2.type", String.valueOf(Types.BIT));
attributes.put("sql.args.2.value", "f");
runner.enqueue(insertStatement, attributes);
attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.1.value", "6");
attributes.put("sql.args.2.type", String.valueOf(Types.BIT));
attributes.put("sql.args.2.value", "T");
runner.enqueue(insertStatement, attributes);
attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.1.value", "7");
attributes.put("sql.args.2.type", String.valueOf(Types.BIT));
attributes.put("sql.args.2.value", "true");
runner.enqueue(insertStatement, attributes);
attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.1.value", "8");
attributes.put("sql.args.2.type", String.valueOf(Types.BIT));
attributes.put("sql.args.2.value", "false");
runner.enqueue(insertStatement, attributes);
runner.run();
runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 8);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
final ResultSet rs = stmt.executeQuery("SELECT * FROM BITTESTS");
//First test (true)
assertTrue(rs.next());
assertEquals(1, rs.getInt(1));
assertTrue(rs.getBoolean(2));
//Second test (false)
assertTrue(rs.next());
assertEquals(2, rs.getInt(1));
assertFalse(rs.getBoolean(2));
//Third test (false)
assertTrue(rs.next());
assertEquals(3, rs.getInt(1));
assertFalse(rs.getBoolean(2));
//Fourth test (true)
assertTrue(rs.next());
assertEquals(4, rs.getInt(1));
assertTrue(rs.getBoolean(2));
//Fifth test (false)
assertTrue(rs.next());
assertEquals(5, rs.getInt(1));
assertFalse(rs.getBoolean(2));
//Sixth test (true)
assertTrue(rs.next());
assertEquals(6, rs.getInt(1));
assertTrue(rs.getBoolean(2));
//Seventh test (true)
assertTrue(rs.next());
assertEquals(7, rs.getInt(1));
assertTrue(rs.getBoolean(2));
//Eighth test (false)
assertTrue(rs.next());
assertEquals(8, rs.getInt(1));
assertFalse(rs.getBoolean(2));
assertFalse(rs.next());
}
}
}
@Test
public void testUsingTimeValuesEpochAndString() throws InitializationException, ProcessException, SQLException, IOException, ParseException {
final TestRunner runner = initTestRunner();
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate("CREATE TABLE TIMETESTS (id integer primary key, ts1 time, ts2 time)");
}
}
final String arg2TS = "00:01:02";
final String art3TS = "02:03:04";
final String timeFormatString = "HH:mm:ss";
SimpleDateFormat dateFormat = new SimpleDateFormat(timeFormatString);
java.util.Date parsedDate = dateFormat.parse(arg2TS);
final Map<String, String> attributes = new HashMap<>();
attributes.put("sql.args.1.type", String.valueOf(Types.TIME));
attributes.put("sql.args.1.value", Long.toString(parsedDate.getTime()));
attributes.put("sql.args.2.type", String.valueOf(Types.TIME));
attributes.put("sql.args.2.value", art3TS);
attributes.put("sql.args.2.format", timeFormatString);
runner.enqueue("INSERT INTO TIMETESTS (ID, ts1, ts2) VALUES (1, ?, ?)".getBytes(), attributes);
runner.run();
runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
final ResultSet rs = stmt.executeQuery("SELECT * FROM TIMETESTS");
assertTrue(rs.next());
assertEquals(1, rs.getInt(1));
assertEquals(arg2TS, dateFormat.format(rs.getTime(2)));
assertEquals(art3TS, rs.getString(3));
assertFalse(rs.next());
}
}
}
@Test
public void testUsingDateValuesEpochAndString() throws InitializationException, ProcessException, SQLException, IOException, ParseException {
final TestRunner runner = initTestRunner();
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate("CREATE TABLE DATETESTS (id integer primary key, ts1 date, ts2 date)");
}
}
final String arg2TS = "2001-01-01";
final String art3TS = "2002-02-02";
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
java.util.Date parsedDate = dateFormat.parse(arg2TS);
final Map<String, String> attributes = new HashMap<>();
attributes.put("sql.args.1.type", String.valueOf(Types.DATE));
attributes.put("sql.args.1.value", Long.toString(parsedDate.getTime()));
attributes.put("sql.args.2.type", String.valueOf(Types.DATE));
attributes.put("sql.args.2.value", art3TS);
runner.enqueue("INSERT INTO DATETESTS (ID, ts1, ts2) VALUES (1, ?, ?)".getBytes(), attributes);
runner.run();
runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
final ResultSet rs = stmt.executeQuery("SELECT * FROM DATETESTS");
assertTrue(rs.next());
assertEquals(1, rs.getInt(1));
assertEquals(arg2TS, rs.getString(2));
assertEquals(art3TS, rs.getString(3));
assertFalse(rs.next());
}
}
}
@Test
public void testBinaryColumnTypes() throws InitializationException, ProcessException, SQLException, IOException, ParseException {
final TestRunner runner = initTestRunner();
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate("CREATE TABLE BINARYTESTS (id integer primary key, bn1 CHAR(8) FOR BIT DATA, bn2 VARCHAR(100) FOR BIT DATA, " +
"bn3 LONG VARCHAR FOR BIT DATA)");
}
}
final byte[] insertStatement = "INSERT INTO BINARYTESTS (ID, bn1, bn2, bn3) VALUES (?, ?, ?, ?)".getBytes();
final String arg2BIN = fixedSizeByteArrayAsASCIIString(8);
final String art3VARBIN = fixedSizeByteArrayAsASCIIString(50);
final String art4LongBin = fixedSizeByteArrayAsASCIIString(32700); //max size supported by Derby
//ASCII (default) binary formatn
Map<String, String> attributes = new HashMap<>();
attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.1.value", "1");
attributes.put("sql.args.2.type", String.valueOf(Types.BINARY));
attributes.put("sql.args.2.value", arg2BIN);
attributes.put("sql.args.3.type", String.valueOf(Types.VARBINARY));
attributes.put("sql.args.3.value", art3VARBIN);
attributes.put("sql.args.4.type", String.valueOf(Types.LONGVARBINARY));
attributes.put("sql.args.4.value", art4LongBin);
runner.enqueue(insertStatement, attributes);
//ASCII with specified format
attributes = new HashMap<>();
attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.1.value", "2");
attributes.put("sql.args.2.type", String.valueOf(Types.BINARY));
attributes.put("sql.args.2.value", arg2BIN);
attributes.put("sql.args.2.format", "ascii");
attributes.put("sql.args.3.type", String.valueOf(Types.VARBINARY));
attributes.put("sql.args.3.value", art3VARBIN);
attributes.put("sql.args.3.format", "ascii");
attributes.put("sql.args.4.type", String.valueOf(Types.LONGVARBINARY));
attributes.put("sql.args.4.value", art4LongBin);
attributes.put("sql.args.4.format", "ascii");
runner.enqueue(insertStatement, attributes);
//Hex
final String arg2HexBIN = fixedSizeByteArrayAsHexString(8);
final String art3HexVARBIN = fixedSizeByteArrayAsHexString(50);
final String art4HexLongBin = fixedSizeByteArrayAsHexString(32700);
attributes = new HashMap<>();
attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.1.value", "3");
attributes.put("sql.args.2.type", String.valueOf(Types.BINARY));
attributes.put("sql.args.2.value", arg2HexBIN);
attributes.put("sql.args.2.format", "hex");
attributes.put("sql.args.3.type", String.valueOf(Types.VARBINARY));
attributes.put("sql.args.3.value", art3HexVARBIN);
attributes.put("sql.args.3.format", "hex");
attributes.put("sql.args.4.type", String.valueOf(Types.LONGVARBINARY));
attributes.put("sql.args.4.value", art4HexLongBin);
attributes.put("sql.args.4.format", "hex");
runner.enqueue(insertStatement, attributes);
//Base64
final String arg2Base64BIN = fixedSizeByteArrayAsBase64String(8);
final String art3Base64VARBIN = fixedSizeByteArrayAsBase64String(50);
final String art4Base64LongBin = fixedSizeByteArrayAsBase64String(32700);
attributes = new HashMap<>();
attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.1.value", "4");
attributes.put("sql.args.2.type", String.valueOf(Types.BINARY));
attributes.put("sql.args.2.value", arg2Base64BIN);
attributes.put("sql.args.2.format", "base64");
attributes.put("sql.args.3.type", String.valueOf(Types.VARBINARY));
attributes.put("sql.args.3.value", art3Base64VARBIN);
attributes.put("sql.args.3.format", "base64");
attributes.put("sql.args.4.type", String.valueOf(Types.LONGVARBINARY));
attributes.put("sql.args.4.value", art4Base64LongBin);
attributes.put("sql.args.4.format", "base64");
runner.enqueue(insertStatement, attributes);
runner.run();
runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 4);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
final ResultSet rs = stmt.executeQuery("SELECT * FROM BINARYTESTS");
//First Batch
assertTrue(rs.next());
assertEquals(1, rs.getInt(1));
assertTrue(Arrays.equals(arg2BIN.getBytes("ASCII"), rs.getBytes(2)));
assertTrue(Arrays.equals(art3VARBIN.getBytes("ASCII"), rs.getBytes(3)));
assertTrue(Arrays.equals(art4LongBin.getBytes("ASCII"), rs.getBytes(4)));
//Second batch
assertTrue(rs.next());
assertEquals(2, rs.getInt(1));
assertTrue(Arrays.equals(arg2BIN.getBytes("ASCII"), rs.getBytes(2)));
assertTrue(Arrays.equals(art3VARBIN.getBytes("ASCII"), rs.getBytes(3)));
assertTrue(Arrays.equals(art4LongBin.getBytes("ASCII"), rs.getBytes(4)));
//Third Batch (Hex)
assertTrue(rs.next());
assertEquals(3, rs.getInt(1));
assertTrue(Arrays.equals(DatatypeConverter.parseHexBinary(arg2HexBIN), rs.getBytes(2)));
assertTrue(Arrays.equals(DatatypeConverter.parseHexBinary(art3HexVARBIN), rs.getBytes(3)));
assertTrue(Arrays.equals(DatatypeConverter.parseHexBinary(art4HexLongBin), rs.getBytes(4)));
//Fourth Batch (Base64)
assertTrue(rs.next());
assertEquals(4, rs.getInt(1));
assertTrue(Arrays.equals(DatatypeConverter.parseBase64Binary(arg2Base64BIN), rs.getBytes(2)));
assertTrue(Arrays.equals(DatatypeConverter.parseBase64Binary(art3Base64VARBIN), rs.getBytes(3)));
assertTrue(Arrays.equals(DatatypeConverter.parseBase64Binary(art4Base64LongBin), rs.getBytes(4)));
assertFalse(rs.next());
}
}
}
@Test
public void testStatementsWithPreparedParameters() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = initTestRunner();
recreateTable("PERSONS", createPersons);
final Map<String, String> attributes = new HashMap<>();
attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.1.value", "1");
attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR));
attributes.put("sql.args.2.value", "Mark");
attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.3.value", "84");
runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)".getBytes(), attributes);
runner.run();
runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
assertTrue(rs.next());
assertEquals(1, rs.getInt(1));
assertEquals("Mark", rs.getString(2));
assertEquals(84, rs.getInt(3));
assertFalse(rs.next());
}
}
runner.clearTransferState();
attributes.clear();
attributes.put("sql.args.1.type", String.valueOf(Types.VARCHAR));
attributes.put("sql.args.1.value", "George");
attributes.put("sql.args.2.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.2.value", "1");
runner.enqueue("UPDATE PERSONS SET NAME=? WHERE ID=?".getBytes(), attributes);
runner.run();
runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
assertTrue(rs.next());
assertEquals(1, rs.getInt(1));
assertEquals("George", rs.getString(2));
assertEquals(84, rs.getInt(3));
assertFalse(rs.next());
}
}
}
@Test
public void testMultipleStatementsWithinFlowFile() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = initTestRunner();
recreateTable("PERSONS", createPersons);
final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " +
"UPDATE PERSONS SET NAME='George' WHERE ID=?; ";
final Map<String, String> attributes = new HashMap<>();
attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.1.value", "1");
attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR));
attributes.put("sql.args.2.value", "Mark");
attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.3.value", "84");
attributes.put("sql.args.4.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.4.value", "1");
runner.enqueue(sql.getBytes(), attributes);
runner.run();
// should fail because of the semicolon
runner.assertAllFlowFilesTransferred(PutSQL.REL_FAILURE, 1);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
assertFalse(rs.next());
}
}
}
@Test
public void testMultipleStatementsWithinFlowFileRollbackOnFailure() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = initTestRunner();
runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true");
recreateTable("PERSONS", createPersons);
final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " +
"UPDATE PERSONS SET NAME='George' WHERE ID=?; ";
final Map<String, String> attributes = new HashMap<>();
attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.1.value", "1");
attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR));
attributes.put("sql.args.2.value", "Mark");
attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.3.value", "84");
attributes.put("sql.args.4.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.4.value", "1");
runner.enqueue(sql.getBytes(), attributes);
try {
runner.run();
fail("ProcessException should be thrown");
} catch (AssertionError e) {
assertTrue(e.getCause() instanceof ProcessException);
}
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
assertFalse(rs.next());
}
}
}
@Test
public void testWithNullParameter() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = initTestRunner();
final Map<String, String> attributes = new HashMap<>();
attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.1.value", "1");
attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR));
attributes.put("sql.args.2.value", "Mark");
attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER));
runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)".getBytes(), attributes);
runner.run();
runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
assertTrue(rs.next());
assertEquals(1, rs.getInt(1));
assertEquals("Mark", rs.getString(2));
assertEquals(0, rs.getInt(3));
assertFalse(rs.next());
}
}
}
@Test
public void testInvalidStatement() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = initTestRunner();
recreateTable("PERSONS", createPersons);
final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " +
"UPDATE SOME_RANDOM_TABLE NAME='George' WHERE ID=?; ";
final Map<String, String> attributes = new HashMap<>();
attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.1.value", "1");
attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR));
attributes.put("sql.args.2.value", "Mark");
attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.3.value", "84");
attributes.put("sql.args.4.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.4.value", "1");
runner.enqueue(sql.getBytes(), attributes);
runner.run();
// should fail because of the semicolon
runner.assertAllFlowFilesTransferred(PutSQL.REL_FAILURE, 1);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
assertFalse(rs.next());
}
}
}
@Test
public void testInvalidStatementRollbackOnFailure() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = initTestRunner();
runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true");
recreateTable("PERSONS", createPersons);
final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " +
"UPDATE SOME_RANDOM_TABLE NAME='George' WHERE ID=?; ";
final Map<String, String> attributes = new HashMap<>();
attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.1.value", "1");
attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR));
attributes.put("sql.args.2.value", "Mark");
attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.3.value", "84");
attributes.put("sql.args.4.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.4.value", "1");
runner.enqueue(sql.getBytes(), attributes);
try {
runner.run();
fail("ProcessException should be thrown");
} catch (AssertionError e) {
assertTrue(e.getCause() instanceof ProcessException);
}
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
assertFalse(rs.next());
}
}
}
@Test
public void testRetryableFailure() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
final DBCPService service = new SQLExceptionService(null);
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " +
"UPDATE PERSONS SET NAME='George' WHERE ID=?; ";
final Map<String, String> attributes = new HashMap<>();
attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.1.value", "1");
attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR));
attributes.put("sql.args.2.value", "Mark");
attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.3.value", "84");
attributes.put("sql.args.4.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.4.value", "1");
runner.enqueue(sql.getBytes(), attributes);
runner.run();
// should fail because of the semicolon
runner.assertAllFlowFilesTransferred(PutSQL.REL_RETRY, 1);
}
@Test
public void testRetryableFailureRollbackOnFailure() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
final DBCPService service = new SQLExceptionService(null);
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true");
final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " +
"UPDATE PERSONS SET NAME='George' WHERE ID=?; ";
final Map<String, String> attributes = new HashMap<>();
attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.1.value", "1");
attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR));
attributes.put("sql.args.2.value", "Mark");
attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.3.value", "84");
attributes.put("sql.args.4.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.4.value", "1");
runner.enqueue(sql.getBytes(), attributes);
try {
runner.run();
fail("ProcessException should be thrown");
} catch (AssertionError e) {
assertTrue(e.getCause() instanceof ProcessException);
// Should not be routed to retry.
runner.assertAllFlowFilesTransferred(PutSQL.REL_RETRY, 0);
}
}
@Test
public void testMultipleFlowFilesSuccessfulInTransaction() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = initTestRunner();
runner.setProperty(PutSQL.BATCH_SIZE, "1");
recreateTable("PERSONS", createPersons);
final Map<String, String> attributes = new HashMap<>();
attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.1.value", "1");
attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR));
attributes.put("sql.args.2.value", "Mark");
attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.3.value", "84");
attributes.put("fragment.identifier", "1");
attributes.put("fragment.count", "2");
attributes.put("fragment.index", "0");
runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)".getBytes(), attributes);
runner.run();
// No FlowFiles should be transferred because there were not enough flowfiles with the same fragment identifier
runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 0);
attributes.clear();
attributes.put("fragment.identifier", "1");
attributes.put("fragment.count", "2");
attributes.put("fragment.index", "1");
runner.clearTransferState();
runner.enqueue("UPDATE PERSONS SET NAME='Leonard' WHERE ID=1".getBytes(), attributes);
runner.run();
// Both FlowFiles with fragment identifier 1 should be successful
runner.assertTransferCount(PutSQL.REL_SUCCESS, 2);
runner.assertTransferCount(PutSQL.REL_FAILURE, 0);
runner.assertTransferCount(PutSQL.REL_RETRY, 0);
for (final MockFlowFile mff : runner.getFlowFilesForRelationship(PutSQL.REL_SUCCESS)) {
mff.assertAttributeEquals("fragment.identifier", "1");
}
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
assertTrue(rs.next());
assertEquals(1, rs.getInt(1));
assertEquals("Leonard", rs.getString(2));
assertEquals(84, rs.getInt(3));
assertFalse(rs.next());
}
}
}
@Test
public void testMultipleFlowFilesSuccessfulInTransactionRollBackOnFailure() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = initTestRunner();
runner.setProperty(PutSQL.BATCH_SIZE, "1");
runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true");
recreateTable("PERSONS", createPersons);
final Map<String, String> attributes = new HashMap<>();
attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.1.value", "1");
attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR));
attributes.put("sql.args.2.value", "Mark");
attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.3.value", "84");
attributes.put("fragment.identifier", "1");
attributes.put("fragment.count", "2");
attributes.put("fragment.index", "0");
runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)".getBytes(), attributes);
// ProcessException should not be thrown in this case, because the input FlowFiles are simply differed.
runner.run();
// No FlowFiles should be transferred because there were not enough flowfiles with the same fragment identifier
runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 0);
}
@Test
public void testTransactionTimeout() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = initTestRunner();
runner.setProperty(PutSQL.TRANSACTION_TIMEOUT, "5 secs");
final Map<String, String> attributes = new HashMap<>();
attributes.put("fragment.identifier", "1");
attributes.put("fragment.count", "2");
attributes.put("fragment.index", "0");
final MockFlowFile mff = new MockFlowFile(0L) {
@Override
public Long getLastQueueDate() {
return System.currentTimeMillis() - 10000L; // return 10 seconds ago
}
@Override
public Map<String, String> getAttributes() {
return attributes;
}
@Override
public String getAttribute(final String attrName) {
return attributes.get(attrName);
}
};
runner.enqueue(mff);
runner.run();
// No FlowFiles should be transferred because there were not enough flowfiles with the same fragment identifier
runner.assertAllFlowFilesTransferred(PutSQL.REL_FAILURE, 1);
}
@Test
public void testTransactionTimeoutRollbackOnFailure() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = initTestRunner();
runner.setProperty(PutSQL.TRANSACTION_TIMEOUT, "5 secs");
runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true");
final Map<String, String> attributes = new HashMap<>();
attributes.put("fragment.identifier", "1");
attributes.put("fragment.count", "2");
attributes.put("fragment.index", "0");
final MockFlowFile mff = new MockFlowFile(0L) {
@Override
public Long getLastQueueDate() {
return System.currentTimeMillis() - 10000L; // return 10 seconds ago
}
@Override
public Map<String, String> getAttributes() {
return attributes;
}
@Override
public String getAttribute(final String attrName) {
return attributes.get(attrName);
}
};
runner.enqueue(mff);
try {
runner.run();
fail("ProcessException should be thrown");
} catch (AssertionError e) {
assertTrue(e.getCause() instanceof ProcessException);
}
runner.assertAllFlowFilesTransferred(PutSQL.REL_FAILURE, 0);
}
@Test
public void testNullFragmentCountRollbackOnFailure() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = initTestRunner();
runner.setProperty(PutSQL.TRANSACTION_TIMEOUT, "5 secs");
runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true");
final Map<String, String> attribute1 = new HashMap<>();
attribute1.put("fragment.identifier", "1");
attribute1.put("fragment.count", "2");
attribute1.put("fragment.index", "0");
final Map<String, String> attribute2 = new HashMap<>();
attribute2.put("fragment.identifier", "1");
// attribute2.put("fragment.count", null);
attribute2.put("fragment.index", "1");
runner.enqueue(new byte[]{}, attribute1);
runner.enqueue(new byte[]{}, attribute2);
try {
runner.run();
fail("ProcessException should be thrown");
} catch (AssertionError e) {
assertTrue(e.getCause() instanceof ProcessException);
}
runner.assertAllFlowFilesTransferred(PutSQL.REL_FAILURE, 0);
}
@Test
public void testStatementsFromProperty() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = initTestRunner();
runner.setProperty(PutSQL.SQL_STATEMENT, "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (${row.id}, 'Mark', 84)");
recreateTable("PERSONS", createPersons);
runner.enqueue("This statement should be ignored".getBytes(), new HashMap<String,String>() {{
put("row.id", "1");
}});
runner.run();
runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
assertTrue(rs.next());
assertEquals(1, rs.getInt(1));
assertEquals("Mark", rs.getString(2));
assertEquals(84, rs.getInt(3));
assertFalse(rs.next());
}
}
runner.setProperty(PutSQL.SQL_STATEMENT, "UPDATE PERSONS SET NAME='George' WHERE ID=${row.id}");
runner.enqueue("This statement should be ignored".getBytes(), new HashMap<String,String>() {{
put("row.id", "1");
}});
runner.run();
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
assertTrue(rs.next());
assertEquals(1, rs.getInt(1));
assertEquals("George", rs.getString(2));
assertEquals(84, rs.getInt(3));
assertFalse(rs.next());
}
}
}
private Map<String, String> createFragmentedTransactionAttributes(String id, int count, int index) {
final Map<String, String> attributes = new HashMap<>();
attributes.put("fragment.identifier", id);
attributes.put("fragment.count", String.valueOf(count));
attributes.put("fragment.index", String.valueOf(index));
return attributes;
}
@Test
public void testTransactionalFlowFileFilter() {
final MockFlowFile ff0 = new MockFlowFile(0);
final MockFlowFile ff1 = new MockFlowFile(1);
final MockFlowFile ff2 = new MockFlowFile(2);
final MockFlowFile ff3 = new MockFlowFile(3);
final MockFlowFile ff4 = new MockFlowFile(4);
ff0.putAttributes(createFragmentedTransactionAttributes("tx-1", 3, 0));
ff1.putAttributes(Collections.singletonMap("accept", "false"));
ff2.putAttributes(createFragmentedTransactionAttributes("tx-1", 3, 1));
ff3.putAttributes(Collections.singletonMap("accept", "true"));
ff4.putAttributes(createFragmentedTransactionAttributes("tx-1", 3, 2));
// TEST 1: Fragmented TX with null service filter
// Even if the controller service does not have filtering rule, tx filter should work.
FlowFileFilter txFilter = new PutSQL.TransactionalFlowFileFilter(null);
// Should perform a fragmented tx.
assertEquals(ACCEPT_AND_CONTINUE, txFilter.filter(ff0));
assertEquals(REJECT_AND_CONTINUE, txFilter.filter(ff1));
assertEquals(ACCEPT_AND_CONTINUE, txFilter.filter(ff2));
assertEquals(REJECT_AND_CONTINUE, txFilter.filter(ff3));
assertEquals(ACCEPT_AND_TERMINATE, txFilter.filter(ff4));
// TEST 2: Non-Fragmented TX with null service filter
txFilter = new PutSQL.TransactionalFlowFileFilter(null);
// Should perform a non-fragmented tx.
assertEquals(ACCEPT_AND_CONTINUE, txFilter.filter(ff1));
assertEquals(REJECT_AND_CONTINUE, txFilter.filter(ff0));
assertEquals(REJECT_AND_CONTINUE, txFilter.filter(ff2));
assertEquals(ACCEPT_AND_CONTINUE, txFilter.filter(ff3));
assertEquals(REJECT_AND_CONTINUE, txFilter.filter(ff4));
final FlowFileFilter nonTxFilter = flowFile -> "true".equals(flowFile.getAttribute("accept"))
? ACCEPT_AND_CONTINUE
: REJECT_AND_CONTINUE;
// TEST 3: Fragmented TX with a service filter
// Even if the controller service does not have filtering rule, tx filter should work.
txFilter = new PutSQL.TransactionalFlowFileFilter(nonTxFilter);
// Should perform a fragmented tx. The nonTxFilter doesn't affect in this case.
assertEquals(ACCEPT_AND_CONTINUE, txFilter.filter(ff0));
assertEquals(REJECT_AND_CONTINUE, txFilter.filter(ff1));
assertEquals(ACCEPT_AND_CONTINUE, txFilter.filter(ff2));
assertEquals(REJECT_AND_CONTINUE, txFilter.filter(ff3));
assertEquals(ACCEPT_AND_TERMINATE, txFilter.filter(ff4));
// TEST 4: Non-Fragmented TX with a service filter
txFilter = new PutSQL.TransactionalFlowFileFilter(nonTxFilter);
// Should perform a non-fragmented tx and use the nonTxFilter.
assertEquals(REJECT_AND_CONTINUE, txFilter.filter(ff1));
assertEquals(REJECT_AND_CONTINUE, txFilter.filter(ff0));
assertEquals(REJECT_AND_CONTINUE, txFilter.filter(ff2));
assertEquals(ACCEPT_AND_CONTINUE, txFilter.filter(ff3));
assertEquals(REJECT_AND_CONTINUE, txFilter.filter(ff4));
}
/**
* Simple implementation only for testing purposes
*/
private static class MockDBCPService extends AbstractControllerService implements DBCPService {
private final String dbLocation;
public MockDBCPService(final String dbLocation) {
this.dbLocation = dbLocation;
}
@Override
public String getIdentifier() {
return "dbcp";
}
@Override
public Connection getConnection() throws ProcessException {
try {
Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
final Connection conn = DriverManager.getConnection("jdbc:derby:" + dbLocation + ";create=true");
return conn;
} catch (final Exception e) {
e.printStackTrace();
throw new ProcessException("getConnection failed: " + e);
}
}
}
/**
* Simple implementation only for testing purposes
*/
private static class SQLExceptionService extends AbstractControllerService implements DBCPService {
private final DBCPService service;
private int allowedBeforeFailure = 0;
private int successful = 0;
public SQLExceptionService(final DBCPService service) {
this.service = service;
}
@Override
public String getIdentifier() {
return "dbcp";
}
@Override
public Connection getConnection() throws ProcessException {
try {
if (++successful > allowedBeforeFailure) {
final Connection conn = Mockito.mock(Connection.class);
Mockito.when(conn.prepareStatement(Mockito.any(String.class))).thenThrow(new SQLException("Unit Test Generated SQLException"));
return conn;
} else {
return service.getConnection();
}
} catch (final Exception e) {
e.printStackTrace();
throw new ProcessException("getConnection failed: " + e);
}
}
}
private void recreateTable(String tableName, String createSQL) throws ProcessException, SQLException {
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate("drop table " + tableName);
stmt.executeUpdate(createSQL);
}
}
}
private String fixedSizeByteArrayAsASCIIString(int length){
byte[] bBinary = RandomUtils.nextBytes(length);
ByteBuffer bytes = ByteBuffer.wrap(bBinary);
StringBuffer sbBytes = new StringBuffer();
for (int i = bytes.position(); i < bytes.limit(); i++)
sbBytes.append((char)bytes.get(i));
return sbBytes.toString();
}
private String fixedSizeByteArrayAsHexString(int length){
byte[] bBinary = RandomUtils.nextBytes(length);
return DatatypeConverter.printHexBinary(bBinary);
}
private String fixedSizeByteArrayAsBase64String(int length){
byte[] bBinary = RandomUtils.nextBytes(length);
return DatatypeConverter.printBase64Binary(bBinary);
}
private TestRunner initTestRunner() throws InitializationException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
return runner;
}
}