/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.processors.standard;

import static org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_CONTINUE;
import static org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_TERMINATE;
import static org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult.REJECT_AND_CONTINUE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Time;
import java.sql.Timestamp;
import java.sql.Types;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

import javax.xml.bind.DatatypeConverter;

import org.apache.commons.lang3.RandomUtils;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.processor.FlowFileFilter;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.pattern.RollbackOnFailure;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.Mockito;

public class TestPutSQL {
    // DDL for the PERSONS table: integer primary key supplied by the caller, plus name and code columns.
    private static final String createPersons = "CREATE TABLE PERSONS (id integer primary key, name varchar(100), code integer)";
    // DDL for the PERSONS_AI table: id is an identity column starting at 1, and a CHECK constraint limits code to at most 100.
    private static final String createPersonsAutoId = "CREATE TABLE PERSONS_AI (id INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1), name VARCHAR(100), code INTEGER check(code <= 100))";

    // Class-level temporary folder; setupClass() places the "db" directory under its root.
    @ClassRule
    public static TemporaryFolder folder = new TemporaryFolder();

    /**
     * Setting up connection pooling is an expensive operation,
     * so it is done only once and the same MockDBCPService is reused by every test.
     */
    static protected DBCPService service;

    /**
     * One-time fixture for the whole class: sets the {@code derby.stream.error.file} system property,
     * creates the shared {@link MockDBCPService} on a "db" directory under the temporary folder,
     * and creates the PERSONS and PERSONS_AI tables.
     *
     * @throws ProcessException if a connection cannot be obtained from the service
     * @throws SQLException if either CREATE TABLE statement fails
     */
    @BeforeClass
    public static void setupClass() throws ProcessException, SQLException {
        System.setProperty("derby.stream.error.file", "target/derby.log");

        final String dbLocation = new File(folder.getRoot(), "db").getAbsolutePath();
        service = new MockDBCPService(dbLocation);

        try (final Connection connection = service.getConnection();
             final Statement ddl = connection.createStatement()) {
            ddl.executeUpdate(createPersons);
            ddl.executeUpdate(createPersonsAutoId);
        }
    }

    /**
     * Executes an INSERT followed by an UPDATE, each supplied as literal SQL in the FlowFile
     * content, and verifies the single PERSONS row after each run.
     */
    @Test
    public void testDirectStatements() throws InitializationException, ProcessException, SQLException, IOException {
        final TestRunner runner = initTestRunner();

        recreateTable("PERSONS", createPersons);

        runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (1, 'Mark', 84)".getBytes());
        runner.run();

        runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);

        // The ResultSet is declared as a resource so it is closed explicitly, before its Statement.
        try (final Connection conn = service.getConnection();
             final Statement stmt = conn.createStatement();
             final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS")) {
            assertTrue(rs.next());
            assertEquals(1, rs.getInt(1));
            assertEquals("Mark", rs.getString(2));
            assertEquals(84, rs.getInt(3));
            assertFalse(rs.next());
        }

        runner.enqueue("UPDATE PERSONS SET NAME='George' WHERE ID=1".getBytes());
        runner.run();

        // Only the name is expected to change; id and code keep the inserted values.
        try (final Connection conn = service.getConnection();
             final Statement stmt = conn.createStatement();
             final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS")) {
            assertTrue(rs.next());
            assertEquals(1, rs.getInt(1));
            assertEquals("George", rs.getString(2));
            assertEquals(84, rs.getInt(3));
            assertFalse(rs.next());
        }
    }

    /**
     * With the AUTO_COMMIT property set to "false", runs a single INSERT and verifies that the
     * row is present when PERSONS is queried after the run.
     */
    @Test
    public void testCommitOnCleanup() throws InitializationException, ProcessException, SQLException {
        final TestRunner runner = initTestRunner();
        runner.setProperty(PutSQL.AUTO_COMMIT, "false");

        recreateTable("PERSONS", createPersons);

        runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (1, 'Mark', 84)".getBytes());
        runner.run();

        runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);

        // The ResultSet is declared as a resource so it is closed explicitly, before its Statement.
        try (final Connection conn = service.getConnection();
             final Statement stmt = conn.createStatement();
             final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS")) {
            assertTrue(rs.next());
            assertEquals(1, rs.getInt(1));
            assertEquals("Mark", rs.getString(2));
            assertEquals(84, rs.getInt(3));
            assertFalse(rs.next());
        }
    }

    /**
     * With OBTAIN_GENERATED_KEYS set to "true", inserts one row into PERSONS_AI and verifies both
     * the {@code sql.generated.key} attribute on the outgoing FlowFile and the stored row.
     */
    @Test
    public void testInsertWithGeneratedKeys() throws InitializationException, ProcessException, SQLException, IOException {
        final TestRunner runner = initTestRunner();
        runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "true");

        recreateTable("PERSONS_AI",createPersonsAutoId);
        runner.enqueue("INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Mark', 84)".getBytes());
        runner.run();

        runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);
        final MockFlowFile mff = runner.getFlowFilesForRelationship(PutSQL.REL_SUCCESS).get(0);
        // The table was just recreated and its identity column starts at 1.
        mff.assertAttributeEquals("sql.generated.key", "1");

        // The ResultSet is declared as a resource so it is closed explicitly, before its Statement.
        try (final Connection conn = service.getConnection();
             final Statement stmt = conn.createStatement();
             final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS_AI")) {
            assertTrue(rs.next());
            assertEquals(1, rs.getInt(1));
            assertEquals("Mark", rs.getString(2));
            assertEquals(84, rs.getInt(3));
            assertFalse(rs.next());
        }
    }

    /**
     * Runs the shared provenance check with batch size 10, transaction support disabled
     * and generated-key retrieval disabled.
     */
    @Test
    public void testProvenanceEventsWithBatchMode() throws InitializationException, ProcessException, SQLException {
        final TestRunner testRunner = initTestRunner();
        testRunner.setProperty(PutSQL.BATCH_SIZE, "10");
        testRunner.setProperty(PutSQL.SUPPORT_TRANSACTIONS, "false");
        testRunner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false");

        testProvenanceEvents(testRunner);
    }

    /**
     * Runs the shared provenance check with batch size 10, transaction support enabled
     * and generated-key retrieval disabled.
     */
    @Test
    public void testProvenanceEventsWithFragmentedTransaction() throws InitializationException, ProcessException, SQLException {
        final TestRunner testRunner = initTestRunner();
        testRunner.setProperty(PutSQL.BATCH_SIZE, "10");
        testRunner.setProperty(PutSQL.SUPPORT_TRANSACTIONS, "true");
        testRunner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false");

        testProvenanceEvents(testRunner);
    }

    /**
     * Runs the shared provenance check with batch size 10, transaction support disabled
     * and generated-key retrieval enabled.
     */
    @Test
    public void testProvenanceEventsWithObtainGeneratedKeys() throws InitializationException, ProcessException, SQLException {
        final TestRunner testRunner = initTestRunner();
        testRunner.setProperty(PutSQL.BATCH_SIZE, "10");
        testRunner.setProperty(PutSQL.SUPPORT_TRANSACTIONS, "false");
        testRunner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "true");

        testProvenanceEvents(testRunner);
    }

    /**
     * Shared body of the provenance tests: recreates PERSONS, runs a DELETE and an INSERT in one
     * invocation and expects exactly two provenance events, each of type SEND.
     *
     * @param runner a runner already configured by the calling test
     */
    private void testProvenanceEvents(final TestRunner runner) throws ProcessException, SQLException {
        recreateTable("PERSONS", createPersons);

        runner.enqueue("DELETE FROM PERSONS WHERE ID = 1");
        runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (1, 'Mark', 84)");
        runner.run();

        runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 2);

        final List<ProvenanceEventRecord> recordedEvents = runner.getProvenanceEvents();
        assertEquals(2, recordedEvents.size());
        recordedEvents.forEach(recorded -> assertEquals(ProvenanceEventType.SEND, recorded.getEventType()));
    }

    /**
     * Runs the shared ordering check with batch size 10, transaction support disabled
     * and generated-key retrieval disabled.
     */
    @Test
    public void testKeepFlowFileOrderingWithBatchMode() throws InitializationException, ProcessException, SQLException {
        final TestRunner testRunner = initTestRunner();
        testRunner.setProperty(PutSQL.BATCH_SIZE, "10");
        testRunner.setProperty(PutSQL.SUPPORT_TRANSACTIONS, "false");
        testRunner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false");

        testKeepFlowFileOrdering(testRunner);
    }

    /**
     * Runs the shared ordering check with batch size 10, transaction support enabled
     * and generated-key retrieval disabled.
     */
    @Test
    public void testKeepFlowFileOrderingWithFragmentedTransaction() throws InitializationException, ProcessException, SQLException {
        final TestRunner testRunner = initTestRunner();
        testRunner.setProperty(PutSQL.BATCH_SIZE, "10");
        testRunner.setProperty(PutSQL.SUPPORT_TRANSACTIONS, "true");
        testRunner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false");

        testKeepFlowFileOrdering(testRunner);
    }

    /**
     * Runs the shared ordering check with batch size 10, transaction support disabled
     * and generated-key retrieval enabled.
     */
    @Test
    public void testKeepFlowFileOrderingWithObtainGeneratedKeys() throws InitializationException, ProcessException, SQLException {
        final TestRunner testRunner = initTestRunner();
        testRunner.setProperty(PutSQL.BATCH_SIZE, "10");
        testRunner.setProperty(PutSQL.SUPPORT_TRANSACTIONS, "false");
        testRunner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "true");

        testKeepFlowFileOrdering(testRunner);
    }

    /**
     * Shared body of the ordering tests: enqueues a mix of parameterized DELETE and INSERT
     * statements, each tagged with its enqueue position as the single SQL argument, and checks
     * that transferred FlowFiles and provenance events come back in that same order.
     *
     * @param runner a runner already configured by the calling test
     */
    private void testKeepFlowFileOrdering(final TestRunner runner) throws ProcessException, SQLException {
        recreateTable("PERSONS", createPersons);

        final String deleteSql = "DELETE FROM PERSONS WHERE ID = ?";
        final String insertSql = "INSERT INTO PERSONS (ID) VALUES (?)";
        final List<String> orderedSql = Arrays.asList(deleteSql, insertSql, insertSql, deleteSql, deleteSql, insertSql);
        final int expectedCount = orderedSql.size();

        // The position in the queue doubles as the value bound to the statement's only parameter.
        int position = 0;
        for (final String sql : orderedSql) {
            final Map<String, String> sqlArgs = new HashMap<>();
            sqlArgs.put("sql.args.1.type", String.valueOf(Types.INTEGER));
            sqlArgs.put("sql.args.1.value", String.valueOf(position));
            runner.enqueue(sql, sqlArgs);
            position++;
        }

        runner.run();

        runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, expectedCount);
        final List<MockFlowFile> transferred = runner.getFlowFilesForRelationship(PutSQL.REL_SUCCESS);
        for (int idx = 0; idx < expectedCount; idx++) {
            final MockFlowFile current = transferred.get(idx);
            assertEquals(orderedSql.get(idx), current.getContent());
            assertEquals(String.valueOf(idx), current.getAttribute("sql.args.1.value"));
        }

        final List<ProvenanceEventRecord> recordedEvents = runner.getProvenanceEvents();
        assertEquals(expectedCount, recordedEvents.size());
        for (int idx = 0; idx < expectedCount; idx++) {
            final ProvenanceEventRecord recorded = recordedEvents.get(idx);
            assertEquals(String.valueOf(idx), recorded.getAttribute("sql.args.1.value"));
        }
    }


    /**
     * With the runner's default configuration (SUPPORT_TRANSACTIONS is not overridden here),
     * a malformed statement in the middle of the batch is expected to send all four
     * FlowFiles to failure.
     */
    @Test
    public void testFailInMiddleWithBadStatementAndSupportTransaction() throws InitializationException, ProcessException, SQLException, IOException {
        final TestRunner testRunner = initTestRunner();
        testFailInMiddleWithBadStatement(testRunner);
        testRunner.run();

        testRunner.assertTransferCount(PutSQL.REL_FAILURE, 4);
        testRunner.assertTransferCount(PutSQL.REL_SUCCESS, 0);
    }

    /**
     * With SUPPORT_TRANSACTIONS set to "false", only the malformed statement is expected on
     * failure while the three valid statements go to success.
     */
    @Test
    public void testFailInMiddleWithBadStatementAndNotSupportTransaction() throws InitializationException, ProcessException, SQLException, IOException {
        final TestRunner testRunner = initTestRunner();
        testRunner.setProperty(PutSQL.SUPPORT_TRANSACTIONS, "false");
        testFailInMiddleWithBadStatement(testRunner);
        testRunner.run();

        testRunner.assertTransferCount(PutSQL.REL_FAILURE, 1);
        testRunner.assertTransferCount(PutSQL.REL_SUCCESS, 3);
    }

    /**
     * Disables generated-key retrieval and enqueues four statements against PERSONS_AI,
     * the second of which is syntactically invalid. Does not run the processor.
     *
     * @param runner the runner to configure and fill
     */
    private void testFailInMiddleWithBadStatement(final TestRunner runner) throws InitializationException {
        runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false");

        final String[] statements = {
            "INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Mark', 84)",
            "INSERT INTO PERSONS_AI", // intentionally wrong syntax
            "INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Tom', 3)",
            "INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Harry', 44)"
        };
        for (final String sql : statements) {
            runner.enqueue(sql.getBytes());
        }
    }



    /**
     * With Rollback On Failure enabled, a malformed statement in the middle of the batch must
     * make {@code runner.run()} fail with an AssertionError caused by a ProcessException, and
     * no FlowFile may be transferred to failure or success.
     */
    @Test
    public void testFailInMiddleWithBadStatementRollbackOnFailure() throws InitializationException, ProcessException, SQLException, IOException {
        final TestRunner runner = initTestRunner();
        runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false");
        runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true");
        runner.enqueue("INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Mark', 84)".getBytes());
        runner.enqueue("INSERT INTO PERSONS_AI".getBytes()); // intentionally wrong syntax
        runner.enqueue("INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Tom', 3)".getBytes());
        runner.enqueue("INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Harry', 44)".getBytes());

        // Capture the error instead of calling fail() inside the try block: fail() itself throws
        // an AssertionError, which the catch below would swallow and report as an unrelated,
        // message-less assertion failure.
        AssertionError thrown = null;
        try {
            runner.run();
        } catch (AssertionError e) {
            thrown = e;
        }

        if (thrown == null) {
            fail("ProcessException should be thrown");
        }
        assertTrue(thrown.getCause() instanceof ProcessException);
        runner.assertTransferCount(PutSQL.REL_FAILURE, 0);
        runner.assertTransferCount(PutSQL.REL_SUCCESS, 0);
    }

    /**
     * With SUPPORT_TRANSACTIONS set to "false", only the FlowFile carrying the mistyped
     * parameter is expected on failure while the other three go to success.
     */
    @Test
    public void testFailInMiddleWithBadParameterTypeAndNotSupportTransaction() throws InitializationException, ProcessException, SQLException, IOException {
        final TestRunner testRunner = initTestRunner();
        testRunner.setProperty(PutSQL.SUPPORT_TRANSACTIONS, "false");
        testFailInMiddleWithBadParameterType(testRunner);
        testRunner.run();

        testRunner.assertTransferCount(PutSQL.REL_FAILURE, 1);
        testRunner.assertTransferCount(PutSQL.REL_SUCCESS, 3);
    }

    /**
     * With the runner's default configuration (SUPPORT_TRANSACTIONS is not overridden here),
     * a mistyped parameter in the middle of the batch is expected to send all four
     * FlowFiles to failure.
     */
    @Test
    public void testFailInMiddleWithBadParameterTypeAndSupportTransaction() throws InitializationException, ProcessException, SQLException, IOException {
        final TestRunner testRunner = initTestRunner();
        testFailInMiddleWithBadParameterType(testRunner);
        testRunner.run();

        testRunner.assertTransferCount(PutSQL.REL_FAILURE, 4);
        testRunner.assertTransferCount(PutSQL.REL_SUCCESS, 0);
    }

    /**
     * Disables generated-key retrieval and enqueues the same parameterized INSERT four times;
     * the second FlowFile binds a VARCHAR value ("hello") to the integer CODE column, the
     * others bind the integer 84. Does not run the processor.
     *
     * @param runner the runner to configure and fill
     */
    private void testFailInMiddleWithBadParameterType(final TestRunner runner) throws InitializationException, ProcessException, SQLException, IOException {
        runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false");

        final byte[] insertSql = "INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Mark', ?)".getBytes();

        final Map<String, String> validArgs = new HashMap<>();
        validArgs.put("sql.args.1.type", String.valueOf(Types.INTEGER));
        validArgs.put("sql.args.1.value", "84");

        final Map<String, String> mistypedArgs = new HashMap<>();
        mistypedArgs.put("sql.args.1.type", String.valueOf(Types.VARCHAR));
        mistypedArgs.put("sql.args.1.value", "hello");

        runner.enqueue(insertSql, validArgs);
        runner.enqueue(insertSql, mistypedArgs);
        runner.enqueue(insertSql, validArgs);
        runner.enqueue(insertSql, validArgs);
    }

    /**
     * With Rollback On Failure enabled, a mistyped parameter in the middle of the batch must
     * make {@code runner.run()} fail with an AssertionError caused by a ProcessException, and
     * no FlowFile may be transferred to failure or success.
     */
    @Test
    public void testFailInMiddleWithBadParameterTypeRollbackOnFailure() throws InitializationException, ProcessException, SQLException, IOException {
        final TestRunner runner = initTestRunner();
        runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false");
        runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true");

        final Map<String, String> goodAttributes = new HashMap<>();
        goodAttributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
        goodAttributes.put("sql.args.1.value", "84");

        // A VARCHAR value that cannot be stored in the integer CODE column.
        final Map<String, String> badAttributes = new HashMap<>();
        badAttributes.put("sql.args.1.type", String.valueOf(Types.VARCHAR));
        badAttributes.put("sql.args.1.value", "hello");

        final byte[] data = "INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Mark', ?)".getBytes();
        runner.enqueue(data, goodAttributes);
        runner.enqueue(data, badAttributes);
        runner.enqueue(data, goodAttributes);
        runner.enqueue(data, goodAttributes);

        // Capture the error instead of calling fail() inside the try block: fail() itself throws
        // an AssertionError, which the catch below would swallow and report as an unrelated,
        // message-less assertion failure.
        AssertionError thrown = null;
        try {
            runner.run();
        } catch (AssertionError e) {
            thrown = e;
        }

        if (thrown == null) {
            fail("ProcessException should be thrown");
        }
        assertTrue(thrown.getCause() instanceof ProcessException);
        runner.assertTransferCount(PutSQL.REL_FAILURE, 0);
        runner.assertTransferCount(PutSQL.REL_SUCCESS, 0);
    }

    /**
     * With the runner's default configuration (SUPPORT_TRANSACTIONS is not overridden here),
     * a parameter value that violates the table's CHECK constraint is expected to route all
     * four FlowFiles to retry and leave PERSONS_AI empty.
     */
    @Test
    public void testFailInMiddleWithBadParameterValueAndSupportTransaction() throws InitializationException, ProcessException, SQLException, IOException {
        final TestRunner runner = initTestRunner();
        testFailInMiddleWithBadParameterValue(runner);
        runner.run();

        runner.assertTransferCount(PutSQL.REL_SUCCESS, 0);
        runner.assertTransferCount(PutSQL.REL_FAILURE, 0);
        runner.assertTransferCount(PutSQL.REL_RETRY, 4);

        // The ResultSet is declared as a resource so it is closed explicitly, before its Statement.
        try (final Connection conn = service.getConnection();
             final Statement stmt = conn.createStatement();
             final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS_AI")) {
            assertFalse(rs.next());
        }
    }

    /**
     * With SUPPORT_TRANSACTIONS set to "false", a parameter value that violates the table's
     * CHECK constraint is expected to yield one success, one failure and two retries, with
     * only the first row stored in PERSONS_AI.
     */
    @Test
    public void testFailInMiddleWithBadParameterValueAndNotSupportTransaction() throws InitializationException, ProcessException, SQLException, IOException {
        final TestRunner runner = initTestRunner();
        runner.setProperty(PutSQL.SUPPORT_TRANSACTIONS, "false");
        testFailInMiddleWithBadParameterValue(runner);
        runner.run();

        runner.assertTransferCount(PutSQL.REL_SUCCESS, 1);
        runner.assertTransferCount(PutSQL.REL_FAILURE, 1);
        runner.assertTransferCount(PutSQL.REL_RETRY, 2);

        // The ResultSet is declared as a resource so it is closed explicitly, before its Statement.
        try (final Connection conn = service.getConnection();
             final Statement stmt = conn.createStatement();
             final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS_AI")) {
            assertTrue(rs.next());
            assertEquals(1, rs.getInt(1));
            assertEquals("Mark", rs.getString(2));
            assertEquals(84, rs.getInt(3));
            assertFalse(rs.next());
        }
    }

    /**
     * Disables generated-key retrieval, recreates PERSONS_AI and enqueues the same parameterized
     * INSERT four times; the second FlowFile binds 9999, which exceeds the table's
     * {@code code <= 100} CHECK constraint, the others bind 84. Does not run the processor.
     *
     * @param runner the runner to configure and fill
     */
    private void testFailInMiddleWithBadParameterValue(final TestRunner runner) throws InitializationException, ProcessException, SQLException, IOException {
        runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false");
        recreateTable("PERSONS_AI",createPersonsAutoId);

        final byte[] insertSql = "INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Mark', ?)".getBytes();

        final Map<String, String> validArgs = new HashMap<>();
        validArgs.put("sql.args.1.type", String.valueOf(Types.INTEGER));
        validArgs.put("sql.args.1.value", "84");

        final Map<String, String> outOfRangeArgs = new HashMap<>();
        outOfRangeArgs.put("sql.args.1.type", String.valueOf(Types.INTEGER));
        outOfRangeArgs.put("sql.args.1.value", "9999");

        runner.enqueue(insertSql, validArgs);
        runner.enqueue(insertSql, outOfRangeArgs);
        runner.enqueue(insertSql, validArgs);
        runner.enqueue(insertSql, validArgs);
    }

    /**
     * With Rollback On Failure enabled, a parameter value that violates the table's CHECK
     * constraint must make {@code runner.run()} fail with an AssertionError caused by a
     * ProcessException, transfer nothing to failure or success, and leave PERSONS_AI empty.
     */
    @Test
    public void testFailInMiddleWithBadParameterValueRollbackOnFailure() throws InitializationException, ProcessException, SQLException, IOException {
        final TestRunner runner = initTestRunner();
        runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false");
        runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true");

        recreateTable("PERSONS_AI",createPersonsAutoId);

        final Map<String, String> goodAttributes = new HashMap<>();
        goodAttributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
        goodAttributes.put("sql.args.1.value", "84");

        // 9999 exceeds the code <= 100 CHECK constraint declared on PERSONS_AI.
        final Map<String, String> badAttributes = new HashMap<>();
        badAttributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
        badAttributes.put("sql.args.1.value", "9999");

        final byte[] data = "INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Mark', ?)".getBytes();
        runner.enqueue(data, goodAttributes);
        runner.enqueue(data, badAttributes);
        runner.enqueue(data, goodAttributes);
        runner.enqueue(data, goodAttributes);

        // Capture the error instead of calling fail() inside the try block: fail() itself throws
        // an AssertionError, which the catch below would swallow and report as an unrelated,
        // message-less assertion failure.
        AssertionError thrown = null;
        try {
            runner.run();
        } catch (AssertionError e) {
            thrown = e;
        }

        if (thrown == null) {
            fail("ProcessException should be thrown");
        }
        assertTrue(thrown.getCause() instanceof ProcessException);
        runner.assertTransferCount(PutSQL.REL_FAILURE, 0);
        runner.assertTransferCount(PutSQL.REL_SUCCESS, 0);

        // The ResultSet is declared as a resource so it is closed explicitly, before its Statement.
        try (final Connection conn = service.getConnection();
             final Statement stmt = conn.createStatement();
             final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS_AI")) {
            assertFalse(rs.next());
        }
    }


    /**
     * Verifies that a SQL type attribute whose JDBC type code is a negative number is accepted:
     * the parameter is declared with type "-5" and inserted into a bigint column.
     */
    @Test
    public void testUsingSqlDataTypesWithNegativeValues() throws InitializationException, ProcessException, SQLException, IOException {
        final TestRunner runner = initTestRunner();
        try (final Connection conn = service.getConnection();
             final Statement stmt = conn.createStatement()) {
            stmt.executeUpdate("CREATE TABLE PERSONS2 (id integer primary key, name varchar(100), code bigint)");
        }

        final Map<String, String> attributes = new HashMap<>();
        // -5 is the value of java.sql.Types.BIGINT; the literal is kept because the negative code is what is under test.
        attributes.put("sql.args.1.type", "-5");
        attributes.put("sql.args.1.value", "84");
        runner.enqueue("INSERT INTO PERSONS2 (ID, NAME, CODE) VALUES (1, 'Mark', ?)".getBytes(), attributes);
        runner.run();

        runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);

        // The ResultSet is declared as a resource so it is closed explicitly, before its Statement.
        try (final Connection conn = service.getConnection();
             final Statement stmt = conn.createStatement();
             final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS2")) {
            assertTrue(rs.next());
            assertEquals(1, rs.getInt(1));
            assertEquals("Mark", rs.getString(2));
            assertEquals(84, rs.getInt(3));
            assertFalse(rs.next());
        }
    }

    /**
     * Inserts two TIMESTAMP parameters, the first given as epoch milliseconds and the second as
     * a "yyyy-MM-dd HH:mm:ss.SSS" string, and checks both stored values.
     * No format attribute is specified for the date fields here, so that backwards
     * compatibility continues to be tested.
     */
    @Test
    public void testUsingTimestampValuesEpochAndString() throws InitializationException, ProcessException, SQLException, IOException, ParseException {
        final TestRunner runner = initTestRunner();
        try (final Connection conn = service.getConnection();
             final Statement stmt = conn.createStatement()) {
            stmt.executeUpdate("CREATE TABLE TIMESTAMPTEST1 (id integer primary key, ts1 timestamp, ts2 timestamp)");
        }

        final String arg2TS = "2001-01-01 00:01:01.001";
        final String arg3TS = "2002-02-02 12:02:02.002";
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        java.util.Date parsedDate = dateFormat.parse(arg2TS);

        final Map<String, String> attributes = new HashMap<>();
        attributes.put("sql.args.1.type", String.valueOf(Types.TIMESTAMP));
        // First timestamp is passed as epoch milliseconds.
        attributes.put("sql.args.1.value", Long.toString(parsedDate.getTime()));
        attributes.put("sql.args.2.type", String.valueOf(Types.TIMESTAMP));
        // Second timestamp is passed as a formatted string.
        attributes.put("sql.args.2.value", arg3TS);

        runner.enqueue("INSERT INTO TIMESTAMPTEST1 (ID, ts1, ts2) VALUES (1, ?, ?)".getBytes(), attributes);
        runner.run();

        runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);

        // The ResultSet is declared as a resource so it is closed explicitly, before its Statement.
        try (final Connection conn = service.getConnection();
             final Statement stmt = conn.createStatement();
             final ResultSet rs = stmt.executeQuery("SELECT * FROM TIMESTAMPTEST1")) {
            assertTrue(rs.next());
            assertEquals(1, rs.getInt(1));
            assertEquals(arg2TS, rs.getString(2));
            assertEquals(arg3TS, rs.getString(3));
            assertFalse(rs.next());
        }
    }

    /**
     * Inserts two TIMESTAMP parameters whose string values are accompanied by a
     * {@code sql.args.N.format} attribute: one using the name "ISO_LOCAL_DATE_TIME" and one
     * using an explicit pattern with nine fractional-second digits. Checks the stored
     * millisecond values and, for the second column, the nanosecond component.
     */
    @Test
    public void testUsingTimestampValuesWithFormatAttribute() throws InitializationException, ProcessException, SQLException, IOException, ParseException {
        final TestRunner runner = initTestRunner();
        try (final Connection conn = service.getConnection();
             final Statement stmt = conn.createStatement()) {
            stmt.executeUpdate("CREATE TABLE TIMESTAMPTEST2 (id integer primary key, ts1 timestamp, ts2 timestamp)");
        }

        final String dateStr1 = "2002-02-02T12:02:02";
        final String dateStrTimestamp1 = "2002-02-02 12:02:02";
        final long dateInt1 = Timestamp.valueOf(dateStrTimestamp1).getTime();

        final String dateStr2 = "2002-02-02T12:02:02.123456789";
        final String dateStrTimestamp2 = "2002-02-02 12:02:02.123456789";
        final long dateInt2 = Timestamp.valueOf(dateStrTimestamp2).getTime();
        final long nanoInt2 = 123456789L;

        final Map<String, String> attributes = new HashMap<>();
        attributes.put("sql.args.1.type", String.valueOf(Types.TIMESTAMP));
        attributes.put("sql.args.1.value", dateStr1);
        attributes.put("sql.args.1.format", "ISO_LOCAL_DATE_TIME");
        attributes.put("sql.args.2.type", String.valueOf(Types.TIMESTAMP));
        attributes.put("sql.args.2.value", dateStr2);
        attributes.put("sql.args.2.format", "yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSS");

        runner.enqueue("INSERT INTO TIMESTAMPTEST2 (ID, ts1, ts2) VALUES (1, ?, ?)".getBytes(), attributes);
        runner.run();

        runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);

        // The ResultSet is declared as a resource so it is closed explicitly, before its Statement.
        try (final Connection conn = service.getConnection();
             final Statement stmt = conn.createStatement();
             final ResultSet rs = stmt.executeQuery("SELECT * FROM TIMESTAMPTEST2")) {
            assertTrue(rs.next());
            assertEquals(1, rs.getInt(1));
            assertEquals(dateInt1, rs.getTimestamp(2).getTime());
            assertEquals(dateInt2, rs.getTimestamp(3).getTime());
            assertEquals(nanoInt2, rs.getTimestamp(3).getNanos());
            assertFalse(rs.next());
        }
    }

    /**
     * Inserts three rows into a table with TIME and DATE columns, supplying the parameter values
     * in three ways: strings with the named ISO_LOCAL_TIME / ISO_LOCAL_DATE formats, epoch
     * milliseconds without a format attribute, and strings with explicit custom patterns.
     * Each stored value is then compared against the expected millisecond value.
     */
    @Test
    public void testUsingDateTimeValuesWithFormatAttribute() throws InitializationException, ProcessException, SQLException, IOException, ParseException {
        final TestRunner runner = initTestRunner();
        try (final Connection conn = service.getConnection();
             final Statement stmt = conn.createStatement()) {
            stmt.executeUpdate("CREATE TABLE TIMESTAMPTEST3 (id integer primary key, ts1 TIME, ts2 DATE)");
        }

        final String dateStr = "2002-03-04";
        final String timeStr = "02:03:04";
        final String timeFormatString = "HH:mm:ss";
        final String dateFormatString ="yyyy-MM-dd";

        final DateTimeFormatter timeFormatter= DateTimeFormatter.ISO_LOCAL_TIME;
        LocalTime parsedTime = LocalTime.parse(timeStr, timeFormatter);
        Time expectedTime = Time.valueOf(parsedTime);

        final DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_LOCAL_DATE;
        LocalDate parsedDate = LocalDate.parse(dateStr, dateFormatter);
        // Midnight of the parsed date in the system default time zone.
        Date expectedDate = Date.from(parsedDate.atStartOfDay().atZone(ZoneId.systemDefault()).toInstant());

        final long expectedTimeInLong = expectedTime.getTime();
        final long expectedDateInLong = expectedDate.getTime();

        // test with ISO LOCAL format attribute
        Map<String, String> attributes = new HashMap<>();
        attributes.put("sql.args.1.type", String.valueOf(Types.TIME));
        attributes.put("sql.args.1.value", timeStr);
        attributes.put("sql.args.1.format", "ISO_LOCAL_TIME");
        attributes.put("sql.args.2.type", String.valueOf(Types.DATE));
        attributes.put("sql.args.2.value", dateStr);
        attributes.put("sql.args.2.format", "ISO_LOCAL_DATE");

        runner.enqueue("INSERT INTO TIMESTAMPTEST3 (ID, ts1, ts2) VALUES (1, ?, ?)".getBytes(), attributes);

        // Since Derby database which is used for unit test does not have timezone in DATE and TIME type,
        // and PutSQL converts date string into long representation using local timezone,
        // we need to use local timezone.
        SimpleDateFormat timeFormat = new SimpleDateFormat(timeFormatString);
        java.util.Date parsedLocalTime = timeFormat.parse(timeStr);

        SimpleDateFormat dateFormat = new SimpleDateFormat(dateFormatString);
        java.util.Date parsedLocalDate = dateFormat.parse(dateStr);

        // test Long pattern without format attribute
        attributes = new HashMap<>();
        attributes.put("sql.args.1.type", String.valueOf(Types.TIME));
        attributes.put("sql.args.1.value", Long.toString(parsedLocalTime.getTime()));
        attributes.put("sql.args.2.type", String.valueOf(Types.DATE));
        attributes.put("sql.args.2.value", Long.toString(parsedLocalDate.getTime()));

        runner.enqueue("INSERT INTO TIMESTAMPTEST3 (ID, ts1, ts2) VALUES (2, ?, ?)".getBytes(), attributes);

        // test with format attribute
        attributes = new HashMap<>();
        attributes.put("sql.args.1.type", String.valueOf(Types.TIME));
        attributes.put("sql.args.1.value", "020304000");
        attributes.put("sql.args.1.format", "HHmmssSSS");
        attributes.put("sql.args.2.type", String.valueOf(Types.DATE));
        attributes.put("sql.args.2.value", "20020304");
        attributes.put("sql.args.2.format", "yyyyMMdd");

        runner.enqueue("INSERT INTO TIMESTAMPTEST3 (ID, ts1, ts2) VALUES (3, ?, ?)".getBytes(), attributes);

        runner.run();

        runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 3);

        // Rows are read back ordered by ID so each one lines up with the FlowFile that produced it.
        // The ResultSet is declared as a resource so it is closed explicitly, before its Statement.
        try (final Connection conn = service.getConnection();
             final Statement stmt = conn.createStatement();
             final ResultSet rs = stmt.executeQuery("SELECT * FROM TIMESTAMPTEST3 ORDER BY ID")) {
            assertTrue(rs.next());
            assertEquals(1, rs.getInt(1));
            assertEquals(expectedTimeInLong, rs.getTime(2).getTime());
            assertEquals(expectedDateInLong, rs.getDate(3).getTime());

            assertTrue(rs.next());
            assertEquals(2, rs.getInt(1));
            assertEquals(parsedLocalTime.getTime(), rs.getTime(2).getTime());
            assertEquals(parsedLocalDate.getTime(), rs.getDate(3).getTime());

            assertTrue(rs.next());
            assertEquals(3, rs.getInt(1));
            assertEquals(expectedTimeInLong, rs.getTime(2).getTime());
            assertEquals(expectedDateInLong, rs.getDate(3).getTime());

            assertFalse(rs.next());
        }
    }

    /**
     * Verifies how PutSQL stores textual {@code Types.BIT} arguments in a BOOLEAN column:
     * "1", "t", "T" and "true" must read back as {@code true}, while "0", "-5", "f" and "false"
     * must read back as {@code false}.
     */
    @Test
    public void testBitType() throws SQLException, InitializationException {
        final TestRunner runner = initTestRunner();
        try (final Connection conn = service.getConnection();
             final Statement stmt = conn.createStatement()) {
            stmt.executeUpdate("CREATE TABLE BITTESTS (id integer primary key, bt1 BOOLEAN)");
        }

        final byte[] insertStatement = "INSERT INTO BITTESTS (ID, bt1) VALUES (?, ?)".getBytes();

        // The row with ID (i + 1) is inserted with bitValues[i] and must read back as expectedBooleans[i].
        final String[] bitValues = {"1", "0", "-5", "t", "f", "T", "true", "false"};
        final boolean[] expectedBooleans = {true, false, false, true, false, true, true, false};

        for (int i = 0; i < bitValues.length; i++) {
            final Map<String, String> attributes = new HashMap<>();
            attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
            attributes.put("sql.args.1.value", String.valueOf(i + 1));
            attributes.put("sql.args.2.type", String.valueOf(Types.BIT));
            attributes.put("sql.args.2.value", bitValues[i]);
            runner.enqueue(insertStatement, attributes);
        }

        runner.run();
        runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, bitValues.length);

        // ORDER BY ID makes the row order deterministic; without it SQL gives no ordering guarantee
        // and the positional assertions below would depend on the database's storage order.
        try (final Connection conn = service.getConnection();
             final Statement stmt = conn.createStatement();
             final ResultSet rs = stmt.executeQuery("SELECT * FROM BITTESTS ORDER BY ID")) {

            for (int i = 0; i < bitValues.length; i++) {
                assertTrue(rs.next());
                assertEquals(i + 1, rs.getInt(1));
                assertEquals("BIT argument '" + bitValues[i] + "'", expectedBooleans[i], rs.getBoolean(2));
            }

            assertFalse(rs.next());
        }
    }

    /**
     * Inserts two TIME arguments into one row: the first supplied as epoch milliseconds, the second as a
     * string accompanied by an explicit "HH:mm:ss" format attribute. Both columns must read back as the
     * original time-of-day text.
     */
    @Test
    public void testUsingTimeValuesEpochAndString() throws InitializationException, ProcessException, SQLException, IOException, ParseException {
        final TestRunner runner = initTestRunner();
        try (final Connection conn = service.getConnection();
             final Statement stmt = conn.createStatement()) {
            stmt.executeUpdate("CREATE TABLE TIMETESTS (id integer primary key, ts1 time, ts2 time)");
        }

        final String timePattern = "HH:mm:ss";
        final String epochSuppliedTime = "00:01:02";
        final String textSuppliedTime = "02:03:04";

        // The same formatter is used to produce the epoch value and, later, to render the stored TIME.
        final SimpleDateFormat timeFormat = new SimpleDateFormat(timePattern);
        final long epochMillis = timeFormat.parse(epochSuppliedTime).getTime();

        final String timeType = String.valueOf(Types.TIME);
        final Map<String, String> sqlArgs = new HashMap<>();
        // First argument: epoch milliseconds, no format attribute.
        sqlArgs.put("sql.args.1.type", timeType);
        sqlArgs.put("sql.args.1.value", Long.toString(epochMillis));
        // Second argument: formatted text plus the pattern describing it.
        sqlArgs.put("sql.args.2.type", timeType);
        sqlArgs.put("sql.args.2.value", textSuppliedTime);
        sqlArgs.put("sql.args.2.format", timePattern);

        runner.enqueue("INSERT INTO TIMETESTS (ID, ts1, ts2) VALUES (1, ?, ?)".getBytes(), sqlArgs);
        runner.run();

        runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);

        try (final Connection conn = service.getConnection();
             final Statement stmt = conn.createStatement()) {
            final ResultSet rows = stmt.executeQuery("SELECT * FROM TIMETESTS");

            assertTrue(rows.next());
            assertEquals(1, rows.getInt(1));
            assertEquals(epochSuppliedTime, timeFormat.format(rows.getTime(2)));
            assertEquals(textSuppliedTime, rows.getString(3));
            assertFalse(rows.next());
        }
    }

    /**
     * Inserts two DATE arguments into one row: the first supplied as epoch milliseconds, the second as a
     * plain "yyyy-MM-dd" string without a format attribute. Both columns must read back as the original
     * date text.
     */
    @Test
    public void testUsingDateValuesEpochAndString() throws InitializationException, ProcessException, SQLException, IOException, ParseException {
        final TestRunner runner = initTestRunner();
        try (final Connection conn = service.getConnection();
             final Statement stmt = conn.createStatement()) {
            stmt.executeUpdate("CREATE TABLE DATETESTS (id integer primary key, ts1 date, ts2 date)");
        }

        final String epochSuppliedDate = "2001-01-01";
        final String textSuppliedDate = "2002-02-02";
        final long epochMillis = new SimpleDateFormat("yyyy-MM-dd").parse(epochSuppliedDate).getTime();

        final String dateType = String.valueOf(Types.DATE);
        final Map<String, String> sqlArgs = new HashMap<>();
        // First argument: epoch milliseconds.
        sqlArgs.put("sql.args.1.type", dateType);
        sqlArgs.put("sql.args.1.value", Long.toString(epochMillis));
        // Second argument: date text, deliberately without a format attribute.
        sqlArgs.put("sql.args.2.type", dateType);
        sqlArgs.put("sql.args.2.value", textSuppliedDate);

        runner.enqueue("INSERT INTO DATETESTS (ID, ts1, ts2) VALUES (1, ?, ?)".getBytes(), sqlArgs);
        runner.run();

        runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);

        try (final Connection conn = service.getConnection();
             final Statement stmt = conn.createStatement()) {
            final ResultSet rows = stmt.executeQuery("SELECT * FROM DATETESTS");
            assertTrue(rows.next());
            assertEquals(1, rows.getInt(1));
            assertEquals(epochSuppliedDate, rows.getString(2));
            assertEquals(textSuppliedDate, rows.getString(3));
            assertFalse(rows.next());
        }
    }

    /**
     * Inserts binary payloads into BINARY, VARBINARY and LONGVARBINARY columns four times: once without a
     * format attribute and once each with the "ascii", "hex" and "base64" formats. Every stored row must
     * contain exactly the bytes that the corresponding encoded argument represents.
     */
    @Test
    public void testBinaryColumnTypes() throws InitializationException, ProcessException, SQLException, IOException, ParseException {
        final TestRunner runner = initTestRunner();
        try (final Connection conn = service.getConnection();
             final Statement stmt = conn.createStatement()) {
            stmt.executeUpdate("CREATE TABLE BINARYTESTS (id integer primary key, bn1 CHAR(8) FOR BIT DATA, bn2 VARCHAR(100) FOR BIT DATA, " +
                    "bn3 LONG VARCHAR FOR BIT DATA)");
        }

        final byte[] insertStatement = "INSERT INTO BINARYTESTS (ID, bn1, bn2, bn3) VALUES (?, ?, ?, ?)".getBytes();

        final String arg2BIN = fixedSizeByteArrayAsASCIIString(8);
        final String art3VARBIN = fixedSizeByteArrayAsASCIIString(50);
        final String art4LongBin = fixedSizeByteArrayAsASCIIString(32700); //max size supported by Derby

        // Row 1: ASCII is the default binary format, so no format attribute is supplied.
        runner.enqueue(insertStatement, binaryInsertAttributes("1", arg2BIN, art3VARBIN, art4LongBin, null));

        // Row 2: ASCII with the format stated explicitly.
        runner.enqueue(insertStatement, binaryInsertAttributes("2", arg2BIN, art3VARBIN, art4LongBin, "ascii"));

        // Row 3: hex-encoded arguments.
        final String arg2HexBIN = fixedSizeByteArrayAsHexString(8);
        final String art3HexVARBIN = fixedSizeByteArrayAsHexString(50);
        final String art4HexLongBin = fixedSizeByteArrayAsHexString(32700);
        runner.enqueue(insertStatement, binaryInsertAttributes("3", arg2HexBIN, art3HexVARBIN, art4HexLongBin, "hex"));

        // Row 4: Base64-encoded arguments.
        final String arg2Base64BIN = fixedSizeByteArrayAsBase64String(8);
        final String art3Base64VARBIN = fixedSizeByteArrayAsBase64String(50);
        final String art4Base64LongBin = fixedSizeByteArrayAsBase64String(32700);
        runner.enqueue(insertStatement, binaryInsertAttributes("4", arg2Base64BIN, art3Base64VARBIN, art4Base64LongBin, "base64"));

        runner.run();

        runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 4);

        // ORDER BY ID makes the row order deterministic; without it SQL gives no ordering guarantee
        // and the per-row assertions below would depend on the database's storage order.
        try (final Connection conn = service.getConnection();
             final Statement stmt = conn.createStatement();
             final ResultSet rs = stmt.executeQuery("SELECT * FROM BINARYTESTS ORDER BY ID")) {

            //First Batch (default format)
            assertTrue(rs.next());
            assertEquals(1, rs.getInt(1));
            assertTrue(Arrays.equals(arg2BIN.getBytes("ASCII"), rs.getBytes(2)));
            assertTrue(Arrays.equals(art3VARBIN.getBytes("ASCII"), rs.getBytes(3)));
            assertTrue(Arrays.equals(art4LongBin.getBytes("ASCII"), rs.getBytes(4)));

            //Second batch (explicit ascii)
            assertTrue(rs.next());
            assertEquals(2, rs.getInt(1));
            assertTrue(Arrays.equals(arg2BIN.getBytes("ASCII"), rs.getBytes(2)));
            assertTrue(Arrays.equals(art3VARBIN.getBytes("ASCII"), rs.getBytes(3)));
            assertTrue(Arrays.equals(art4LongBin.getBytes("ASCII"), rs.getBytes(4)));

            //Third Batch (Hex)
            assertTrue(rs.next());
            assertEquals(3, rs.getInt(1));
            assertTrue(Arrays.equals(DatatypeConverter.parseHexBinary(arg2HexBIN), rs.getBytes(2)));
            assertTrue(Arrays.equals(DatatypeConverter.parseHexBinary(art3HexVARBIN), rs.getBytes(3)));
            assertTrue(Arrays.equals(DatatypeConverter.parseHexBinary(art4HexLongBin), rs.getBytes(4)));

            //Fourth Batch (Base64)
            assertTrue(rs.next());
            assertEquals(4, rs.getInt(1));
            assertTrue(Arrays.equals(DatatypeConverter.parseBase64Binary(arg2Base64BIN), rs.getBytes(2)));
            assertTrue(Arrays.equals(DatatypeConverter.parseBase64Binary(art3Base64VARBIN), rs.getBytes(3)));
            assertTrue(Arrays.equals(DatatypeConverter.parseBase64Binary(art4Base64LongBin), rs.getBytes(4)));

            assertFalse(rs.next());
        }
    }

    /**
     * Builds the "sql.args.N.*" attributes for one BINARYTESTS insert.
     *
     * @param id            value of the INTEGER ID argument (argument 1)
     * @param binary        encoded value for the BINARY argument (argument 2)
     * @param varBinary     encoded value for the VARBINARY argument (argument 3)
     * @param longVarBinary encoded value for the LONGVARBINARY argument (argument 4)
     * @param format        value for each binary argument's ".format" attribute, or {@code null} to omit it
     * @return a new mutable attribute map
     */
    private static Map<String, String> binaryInsertAttributes(final String id, final String binary, final String varBinary,
                                                              final String longVarBinary, final String format) {
        final Map<String, String> attributes = new HashMap<>();
        attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
        attributes.put("sql.args.1.value", id);

        final int[] binaryTypes = {Types.BINARY, Types.VARBINARY, Types.LONGVARBINARY};
        final String[] binaryValues = {binary, varBinary, longVarBinary};
        for (int i = 0; i < binaryTypes.length; i++) {
            // Binary arguments occupy positions 2..4 of the INSERT statement.
            final String prefix = "sql.args." + (i + 2);
            attributes.put(prefix + ".type", String.valueOf(binaryTypes[i]));
            attributes.put(prefix + ".value", binaryValues[i]);
            if (format != null) {
                attributes.put(prefix + ".format", format);
            }
        }
        return attributes;
    }

    /**
     * Runs a parameterized INSERT followed by a parameterized UPDATE, each carried by its own flow file,
     * and checks the single PERSONS row after each step.
     */
    @Test
    public void testStatementsWithPreparedParameters() throws InitializationException, ProcessException, SQLException, IOException {
        final TestRunner runner = initTestRunner();

        recreateTable("PERSONS", createPersons);

        final String integerType = String.valueOf(Types.INTEGER);
        final String varcharType = String.valueOf(Types.VARCHAR);

        // Step 1: insert (1, 'Mark', 84) through three positional arguments.
        final Map<String, String> insertArgs = new HashMap<>();
        insertArgs.put("sql.args.1.type", integerType);
        insertArgs.put("sql.args.1.value", "1");
        insertArgs.put("sql.args.2.type", varcharType);
        insertArgs.put("sql.args.2.value", "Mark");
        insertArgs.put("sql.args.3.type", integerType);
        insertArgs.put("sql.args.3.value", "84");

        runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)".getBytes(), insertArgs);
        runner.run();

        runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);

        try (final Connection conn = service.getConnection();
             final Statement stmt = conn.createStatement()) {
            final ResultSet rows = stmt.executeQuery("SELECT * FROM PERSONS");
            assertTrue(rows.next());
            assertEquals(1, rows.getInt(1));
            assertEquals("Mark", rows.getString(2));
            assertEquals(84, rows.getInt(3));
            assertFalse(rows.next());
        }

        runner.clearTransferState();

        // Step 2: rename the row; the arguments now bind to NAME and ID, in that order.
        final Map<String, String> updateArgs = new HashMap<>();
        updateArgs.put("sql.args.1.type", varcharType);
        updateArgs.put("sql.args.1.value", "George");
        updateArgs.put("sql.args.2.type", integerType);
        updateArgs.put("sql.args.2.value", "1");

        runner.enqueue("UPDATE PERSONS SET NAME=? WHERE ID=?".getBytes(), updateArgs);
        runner.run();
        runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);

        try (final Connection conn = service.getConnection();
             final Statement stmt = conn.createStatement()) {
            final ResultSet rows = stmt.executeQuery("SELECT * FROM PERSONS");
            assertTrue(rows.next());
            assertEquals(1, rows.getInt(1));
            assertEquals("George", rows.getString(2));
            assertEquals(84, rows.getInt(3));
            assertFalse(rows.next());
        }
    }


    /**
     * A flow file whose content holds two semicolon-separated statements must be routed to failure,
     * and the PERSONS table must remain empty.
     */
    @Test
    public void testMultipleStatementsWithinFlowFile() throws InitializationException, ProcessException, SQLException, IOException {
        final TestRunner runner = initTestRunner();

        recreateTable("PERSONS", createPersons);

        final String integerType = String.valueOf(Types.INTEGER);
        final Map<String, String> sqlArgs = new HashMap<>();
        // Arguments 1-3 belong to the INSERT, argument 4 to the UPDATE.
        sqlArgs.put("sql.args.1.type", integerType);
        sqlArgs.put("sql.args.1.value", "1");
        sqlArgs.put("sql.args.2.type", String.valueOf(Types.VARCHAR));
        sqlArgs.put("sql.args.2.value", "Mark");
        sqlArgs.put("sql.args.3.type", integerType);
        sqlArgs.put("sql.args.3.value", "84");
        sqlArgs.put("sql.args.4.type", integerType);
        sqlArgs.put("sql.args.4.value", "1");

        final String twoStatements = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " +
                "UPDATE PERSONS SET NAME='George' WHERE ID=?; ";

        runner.enqueue(twoStatements.getBytes(), sqlArgs);
        runner.run();

        // should fail because of the semicolon
        runner.assertAllFlowFilesTransferred(PutSQL.REL_FAILURE, 1);

        try (final Connection conn = service.getConnection();
             final Statement stmt = conn.createStatement()) {
            final ResultSet rows = stmt.executeQuery("SELECT * FROM PERSONS");
            assertFalse(rows.next());
        }
    }

    /**
     * With Rollback On Failure enabled, a flow file holding two semicolon-separated statements must make
     * {@code runner.run()} fail with an AssertionError whose cause is a ProcessException, and the PERSONS
     * table must remain empty.
     */
    @Test
    public void testMultipleStatementsWithinFlowFileRollbackOnFailure() throws InitializationException, ProcessException, SQLException, IOException {
        final TestRunner runner = initTestRunner();
        runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true");

        recreateTable("PERSONS", createPersons);

        final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " +
                "UPDATE PERSONS SET NAME='George' WHERE ID=?; ";
        final Map<String, String> attributes = new HashMap<>();
        attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
        attributes.put("sql.args.1.value", "1");

        attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR));
        attributes.put("sql.args.2.value", "Mark");

        attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER));
        attributes.put("sql.args.3.value", "84");

        attributes.put("sql.args.4.type", String.valueOf(Types.INTEGER));
        attributes.put("sql.args.4.value", "1");

        runner.enqueue(sql.getBytes(), attributes);

        // Capture the error instead of calling fail() inside the try block: fail() itself throws an
        // AssertionError, which the catch clause would swallow and replace with a message-less failure.
        AssertionError runFailure = null;
        try {
            runner.run();
        } catch (AssertionError e) {
            runFailure = e;
        }
        if (runFailure == null) {
            fail("ProcessException should be thrown");
        }
        assertTrue("Expected the cause to be a ProcessException but was " + runFailure.getCause(),
                runFailure.getCause() instanceof ProcessException);

        try (final Connection conn = service.getConnection();
             final Statement stmt = conn.createStatement();
             final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS")) {
            assertFalse(rs.next());
        }
    }


    /**
     * Inserts a row whose third argument declares a type but no value. The flow file must succeed and
     * the CODE column must read back as 0 through {@code getInt}.
     */
    @Test
    public void testWithNullParameter() throws InitializationException, ProcessException, SQLException, IOException {
        final TestRunner runner = initTestRunner();

        // Reset PERSONS like the sibling tests do, so the single-row assertions below do not depend on
        // rows left behind by whichever test happened to run earlier.
        recreateTable("PERSONS", createPersons);

        final Map<String, String> attributes = new HashMap<>();
        attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
        attributes.put("sql.args.1.value", "1");

        attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR));
        attributes.put("sql.args.2.value", "Mark");

        // Only the type is declared for argument 3; the value attribute is intentionally absent.
        attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER));

        runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)".getBytes(), attributes);
        runner.run();

        runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);

        try (final Connection conn = service.getConnection();
             final Statement stmt = conn.createStatement();
             final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS")) {
            assertTrue(rs.next());
            assertEquals(1, rs.getInt(1));
            assertEquals("Mark", rs.getString(2));
            assertEquals(0, rs.getInt(3));
            assertFalse(rs.next());
        }
    }

    /**
     * A flow file containing an INSERT followed by a malformed UPDATE against SOME_RANDOM_TABLE must be
     * routed to failure, and the PERSONS table must remain empty.
     */
    @Test
    public void testInvalidStatement() throws InitializationException, ProcessException, SQLException, IOException {
        final TestRunner runner = initTestRunner();

        recreateTable("PERSONS", createPersons);

        final String integerType = String.valueOf(Types.INTEGER);
        final Map<String, String> sqlArgs = new HashMap<>();
        // Arguments 1-3 belong to the INSERT, argument 4 to the UPDATE.
        sqlArgs.put("sql.args.1.type", integerType);
        sqlArgs.put("sql.args.1.value", "1");
        sqlArgs.put("sql.args.2.type", String.valueOf(Types.VARCHAR));
        sqlArgs.put("sql.args.2.value", "Mark");
        sqlArgs.put("sql.args.3.type", integerType);
        sqlArgs.put("sql.args.3.value", "84");
        sqlArgs.put("sql.args.4.type", integerType);
        sqlArgs.put("sql.args.4.value", "1");

        final String invalidSql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " +
                "UPDATE SOME_RANDOM_TABLE NAME='George' WHERE ID=?; ";

        runner.enqueue(invalidSql.getBytes(), sqlArgs);
        runner.run();

        // should fail because of the semicolon
        runner.assertAllFlowFilesTransferred(PutSQL.REL_FAILURE, 1);

        try (final Connection conn = service.getConnection();
             final Statement stmt = conn.createStatement()) {
            final ResultSet rows = stmt.executeQuery("SELECT * FROM PERSONS");
            assertFalse(rows.next());
        }
    }

    /**
     * With Rollback On Failure enabled, a flow file containing an INSERT followed by a malformed UPDATE
     * must make {@code runner.run()} fail with an AssertionError whose cause is a ProcessException, and
     * the PERSONS table must remain empty.
     */
    @Test
    public void testInvalidStatementRollbackOnFailure() throws InitializationException, ProcessException, SQLException, IOException {
        final TestRunner runner = initTestRunner();
        runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true");

        recreateTable("PERSONS", createPersons);

        final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " +
                "UPDATE SOME_RANDOM_TABLE NAME='George' WHERE ID=?; ";
        final Map<String, String> attributes = new HashMap<>();
        attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
        attributes.put("sql.args.1.value", "1");

        attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR));
        attributes.put("sql.args.2.value", "Mark");

        attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER));
        attributes.put("sql.args.3.value", "84");

        attributes.put("sql.args.4.type", String.valueOf(Types.INTEGER));
        attributes.put("sql.args.4.value", "1");

        runner.enqueue(sql.getBytes(), attributes);

        // Capture the error instead of calling fail() inside the try block: fail() itself throws an
        // AssertionError, which the catch clause would swallow and replace with a message-less failure.
        AssertionError runFailure = null;
        try {
            runner.run();
        } catch (AssertionError e) {
            runFailure = e;
        }
        if (runFailure == null) {
            fail("ProcessException should be thrown");
        }
        assertTrue("Expected the cause to be a ProcessException but was " + runFailure.getCause(),
                runFailure.getCause() instanceof ProcessException);

        try (final Connection conn = service.getConnection();
             final Statement stmt = conn.createStatement();
             final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS")) {
            assertFalse(rs.next());
        }
    }


    /**
     * Runs PutSQL against an {@code SQLExceptionService} instead of the shared database service and
     * expects the flow file on the retry relationship.
     */
    @Test
    public void testRetryableFailure() throws InitializationException, ProcessException, SQLException, IOException {
        final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
        // NOTE(review): SQLExceptionService is defined elsewhere in this file; presumably its
        // connections fail with SQLException - confirm against that class.
        final DBCPService failingService = new SQLExceptionService(null);
        runner.addControllerService("dbcp", failingService);
        runner.enableControllerService(failingService);

        runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");

        final String integerType = String.valueOf(Types.INTEGER);
        final Map<String, String> sqlArgs = new HashMap<>();
        sqlArgs.put("sql.args.1.type", integerType);
        sqlArgs.put("sql.args.1.value", "1");
        sqlArgs.put("sql.args.2.type", String.valueOf(Types.VARCHAR));
        sqlArgs.put("sql.args.2.value", "Mark");
        sqlArgs.put("sql.args.3.type", integerType);
        sqlArgs.put("sql.args.3.value", "84");
        sqlArgs.put("sql.args.4.type", integerType);
        sqlArgs.put("sql.args.4.value", "1");

        final String twoStatements = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " +
                "UPDATE PERSONS SET NAME='George' WHERE ID=?; ";

        runner.enqueue(twoStatements.getBytes(), sqlArgs);
        runner.run();

        // The flow file must end up on the retry relationship rather than on failure.
        runner.assertAllFlowFilesTransferred(PutSQL.REL_RETRY, 1);
    }

    /**
     * Runs PutSQL with Rollback On Failure enabled against an {@code SQLExceptionService}.
     * {@code runner.run()} must fail with an AssertionError whose cause is a ProcessException, and no
     * flow file may be routed to retry.
     */
    @Test
    public void testRetryableFailureRollbackOnFailure() throws InitializationException, ProcessException, SQLException, IOException {
        final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
        final DBCPService service = new SQLExceptionService(null);
        runner.addControllerService("dbcp", service);
        runner.enableControllerService(service);

        runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
        runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true");

        final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " +
                "UPDATE PERSONS SET NAME='George' WHERE ID=?; ";
        final Map<String, String> attributes = new HashMap<>();
        attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
        attributes.put("sql.args.1.value", "1");

        attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR));
        attributes.put("sql.args.2.value", "Mark");

        attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER));
        attributes.put("sql.args.3.value", "84");

        attributes.put("sql.args.4.type", String.valueOf(Types.INTEGER));
        attributes.put("sql.args.4.value", "1");

        runner.enqueue(sql.getBytes(), attributes);

        // Capture the error instead of calling fail() inside the try block: fail() itself throws an
        // AssertionError, which the catch clause would swallow and replace with a message-less failure.
        AssertionError runFailure = null;
        try {
            runner.run();
        } catch (AssertionError e) {
            runFailure = e;
        }
        if (runFailure == null) {
            fail("ProcessException should be thrown");
        }
        assertTrue("Expected the cause to be a ProcessException but was " + runFailure.getCause(),
                runFailure.getCause() instanceof ProcessException);

        // Should not be routed to retry.
        runner.assertAllFlowFilesTransferred(PutSQL.REL_RETRY, 0);
    }

    /**
     * Sends the two fragments of one transaction (fragment.count = 2) in separate runs with a batch size
     * of 1. Nothing may be transferred after the first fragment alone; once the second arrives, both flow
     * files must succeed and the PERSONS row must reflect the INSERT followed by the UPDATE.
     */
    @Test
    public void testMultipleFlowFilesSuccessfulInTransaction() throws InitializationException, ProcessException, SQLException, IOException {
        final TestRunner runner = initTestRunner();
        runner.setProperty(PutSQL.BATCH_SIZE, "1");

        recreateTable("PERSONS", createPersons);

        final String integerType = String.valueOf(Types.INTEGER);

        // Fragment 0: the parameterized INSERT.
        final Map<String, String> firstFragment = new HashMap<>();
        firstFragment.put("sql.args.1.type", integerType);
        firstFragment.put("sql.args.1.value", "1");
        firstFragment.put("sql.args.2.type", String.valueOf(Types.VARCHAR));
        firstFragment.put("sql.args.2.value", "Mark");
        firstFragment.put("sql.args.3.type", integerType);
        firstFragment.put("sql.args.3.value", "84");
        firstFragment.put("fragment.identifier", "1");
        firstFragment.put("fragment.count", "2");
        firstFragment.put("fragment.index", "0");
        runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)".getBytes(), firstFragment);
        runner.run();

        // No FlowFiles should be transferred because there were not enough flowfiles with the same fragment identifier
        runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 0);

        // Fragment 1: the UPDATE, carrying only the fragment attributes.
        final Map<String, String> secondFragment = new HashMap<>();
        secondFragment.put("fragment.identifier", "1");
        secondFragment.put("fragment.count", "2");
        secondFragment.put("fragment.index", "1");

        runner.clearTransferState();
        runner.enqueue("UPDATE PERSONS SET NAME='Leonard' WHERE ID=1".getBytes(), secondFragment);
        runner.run();

        // Both FlowFiles with fragment identifier 1 should be successful
        runner.assertTransferCount(PutSQL.REL_SUCCESS, 2);
        runner.assertTransferCount(PutSQL.REL_FAILURE, 0);
        runner.assertTransferCount(PutSQL.REL_RETRY, 0);
        runner.getFlowFilesForRelationship(PutSQL.REL_SUCCESS)
                .forEach(flowFile -> flowFile.assertAttributeEquals("fragment.identifier", "1"));

        try (final Connection conn = service.getConnection();
             final Statement stmt = conn.createStatement()) {
            final ResultSet rows = stmt.executeQuery("SELECT * FROM PERSONS");
            assertTrue(rows.next());
            assertEquals(1, rows.getInt(1));
            assertEquals("Leonard", rows.getString(2));
            assertEquals(84, rows.getInt(3));
            assertFalse(rows.next());
        }
    }

    /**
     * With Rollback On Failure enabled, enqueues only the first of two fragments. The run must complete
     * without throwing and without transferring any flow file.
     */
    @Test
    public void testMultipleFlowFilesSuccessfulInTransactionRollBackOnFailure() throws InitializationException, ProcessException, SQLException, IOException {
        final TestRunner runner = initTestRunner();
        runner.setProperty(PutSQL.BATCH_SIZE, "1");
        runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true");

        recreateTable("PERSONS", createPersons);

        final String integerType = String.valueOf(Types.INTEGER);
        final Map<String, String> firstFragment = new HashMap<>();
        firstFragment.put("sql.args.1.type", integerType);
        firstFragment.put("sql.args.1.value", "1");
        firstFragment.put("sql.args.2.type", String.valueOf(Types.VARCHAR));
        firstFragment.put("sql.args.2.value", "Mark");
        firstFragment.put("sql.args.3.type", integerType);
        firstFragment.put("sql.args.3.value", "84");
        // Declares a two-fragment transaction, of which only index 0 is ever enqueued.
        firstFragment.put("fragment.identifier", "1");
        firstFragment.put("fragment.count", "2");
        firstFragment.put("fragment.index", "0");

        runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)".getBytes(), firstFragment);

        // ProcessException should not be thrown in this case, because the input FlowFiles are simply deferred.
        runner.run();

        // No FlowFiles should be transferred because there were not enough flowfiles with the same fragment identifier
        runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 0);
    }

    /**
     * Enqueues the first of two fragments with a last-queue date 10 seconds in the past while the
     * transaction timeout is 5 seconds. The flow file must be routed to failure.
     */
    @Test
    public void testTransactionTimeout() throws InitializationException, ProcessException, SQLException, IOException {
        final TestRunner runner = initTestRunner();
        runner.setProperty(PutSQL.TRANSACTION_TIMEOUT, "5 secs");

        final Map<String, String> fragmentAttributes = new HashMap<>();
        fragmentAttributes.put("fragment.identifier", "1");
        fragmentAttributes.put("fragment.count", "2");
        fragmentAttributes.put("fragment.index", "0");

        // Flow file stub that serves the attributes above and reports it was queued 10 seconds ago.
        final MockFlowFile staleFlowFile = new MockFlowFile(0L) {
            @Override
            public Map<String, String> getAttributes() {
                return fragmentAttributes;
            }

            @Override
            public String getAttribute(final String attrName) {
                return fragmentAttributes.get(attrName);
            }

            @Override
            public Long getLastQueueDate() {
                return System.currentTimeMillis() - 10000L; // return 10 seconds ago
            }
        };

        runner.enqueue(staleFlowFile);
        runner.run();

        // The lone fragment has waited longer than the transaction timeout, so it goes to failure.
        runner.assertAllFlowFilesTransferred(PutSQL.REL_FAILURE, 1);
    }

    /**
     * Same timed-out fragment as in the plain transaction-timeout test, but with Rollback On Failure
     * enabled: {@code runner.run()} must fail with an AssertionError whose cause is a ProcessException,
     * and no flow file may be routed to failure.
     */
    @Test
    public void testTransactionTimeoutRollbackOnFailure() throws InitializationException, ProcessException, SQLException, IOException {
        final TestRunner runner = initTestRunner();

        runner.setProperty(PutSQL.TRANSACTION_TIMEOUT, "5 secs");
        runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true");
        final Map<String, String> attributes = new HashMap<>();
        attributes.put("fragment.identifier", "1");
        attributes.put("fragment.count", "2");
        attributes.put("fragment.index", "0");

        // Flow file stub that serves the attributes above and reports it was queued 10 seconds ago,
        // i.e. longer ago than the 5 second transaction timeout.
        final MockFlowFile mff = new MockFlowFile(0L) {
            @Override
            public Long getLastQueueDate() {
                return System.currentTimeMillis() - 10000L; // return 10 seconds ago
            }

            @Override
            public Map<String, String> getAttributes() {
                return attributes;
            }

            @Override
            public String getAttribute(final String attrName) {
                return attributes.get(attrName);
            }
        };

        runner.enqueue(mff);

        // Capture the error instead of calling fail() inside the try block: fail() itself throws an
        // AssertionError, which the catch clause would swallow and replace with a message-less failure.
        AssertionError runFailure = null;
        try {
            runner.run();
        } catch (AssertionError e) {
            runFailure = e;
        }
        if (runFailure == null) {
            fail("ProcessException should be thrown");
        }
        assertTrue("Expected the cause to be a ProcessException but was " + runFailure.getCause(),
                runFailure.getCause() instanceof ProcessException);

        runner.assertAllFlowFilesTransferred(PutSQL.REL_FAILURE, 0);
    }

    /**
     * With Rollback On Failure enabled, enqueues two flow files sharing a fragment identifier where the
     * second one has no fragment.count attribute. {@code runner.run()} must fail with an AssertionError
     * whose cause is a ProcessException, and no flow file may be routed to failure.
     */
    @Test
    public void testNullFragmentCountRollbackOnFailure() throws InitializationException, ProcessException, SQLException, IOException {
        final TestRunner runner = initTestRunner();

        runner.setProperty(PutSQL.TRANSACTION_TIMEOUT, "5 secs");
        runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true");
        final Map<String, String> attribute1 = new HashMap<>();
        attribute1.put("fragment.identifier", "1");
        attribute1.put("fragment.count", "2");
        attribute1.put("fragment.index", "0");

        // The second fragment deliberately omits "fragment.count"; that absence is what this test exercises.
        final Map<String, String> attribute2 = new HashMap<>();
        attribute2.put("fragment.identifier", "1");
        attribute2.put("fragment.index", "1");

        runner.enqueue(new byte[]{}, attribute1);
        runner.enqueue(new byte[]{}, attribute2);

        // Capture the error instead of calling fail() inside the try block: fail() itself throws an
        // AssertionError, which the catch clause would swallow and replace with a message-less failure.
        AssertionError runFailure = null;
        try {
            runner.run();
        } catch (AssertionError e) {
            runFailure = e;
        }
        if (runFailure == null) {
            fail("ProcessException should be thrown");
        }
        assertTrue("Expected the cause to be a ProcessException but was " + runFailure.getCause(),
                runFailure.getCause() instanceof ProcessException);

        runner.assertAllFlowFilesTransferred(PutSQL.REL_FAILURE, 0);
    }

    /**
     * Verifies that when the SQL Statement property is set, its value (with Expression Language
     * evaluated against the FlowFile attributes) is executed and the FlowFile content is ignored.
     * An INSERT is run first, followed by an UPDATE of the same row.
     */
    @Test
    public void testStatementsFromProperty() throws InitializationException, ProcessException, SQLException, IOException {
        final TestRunner runner = initTestRunner();
        runner.setProperty(PutSQL.SQL_STATEMENT, "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (${row.id}, 'Mark', 84)");

        recreateTable("PERSONS", createPersons);

        // The FlowFile content is not valid SQL; only the property value is expected to be executed.
        runner.enqueue("This statement should be ignored".getBytes(), Collections.singletonMap("row.id", "1"));
        runner.run();

        runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);

        try (final Connection conn = service.getConnection();
             final Statement stmt = conn.createStatement();
             final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS")) {
            assertTrue(rs.next());
            assertEquals(1, rs.getInt(1));
            assertEquals("Mark", rs.getString(2));
            assertEquals(84, rs.getInt(3));
            assertFalse(rs.next());
        }

        runner.setProperty(PutSQL.SQL_STATEMENT, "UPDATE PERSONS SET NAME='George' WHERE ID=${row.id}");
        runner.enqueue("This statement should be ignored".getBytes(), Collections.singletonMap("row.id", "1"));
        runner.run();

        // The single row must now carry the updated name; no additional rows may exist.
        try (final Connection conn = service.getConnection();
             final Statement stmt = conn.createStatement();
             final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS")) {
            assertTrue(rs.next());
            assertEquals(1, rs.getInt(1));
            assertEquals("George", rs.getString(2));
            assertEquals(84, rs.getInt(3));
            assertFalse(rs.next());
        }
    }

    /**
     * Builds the three fragment attributes that mark a FlowFile as part of a fragmented transaction.
     *
     * @param id    value stored under {@code fragment.identifier}
     * @param count value stored, as a decimal string, under {@code fragment.count}
     * @param index value stored, as a decimal string, under {@code fragment.index}
     * @return a new mutable map holding exactly those three entries
     */
    private Map<String, String> createFragmentedTransactionAttributes(String id, int count, int index) {
        final String fragmentCount = Integer.toString(count);
        final String fragmentIndex = Integer.toString(index);

        final Map<String, String> fragmentAttributes = new HashMap<>();
        fragmentAttributes.put("fragment.identifier", id);
        fragmentAttributes.put("fragment.count", fragmentCount);
        fragmentAttributes.put("fragment.index", fragmentIndex);
        return fragmentAttributes;
    }

    /**
     * Exercises {@code PutSQL.TransactionalFlowFileFilter} against a queue in which the three fragments
     * of transaction "tx-1" are interleaved with two standalone FlowFiles, both without and with a
     * delegate filter, and for both orders of arrival (fragment first / standalone first).
     */
    @Test
    public void testTransactionalFlowFileFilter() {
        final MockFlowFile fragment0 = new MockFlowFile(0);
        fragment0.putAttributes(createFragmentedTransactionAttributes("tx-1", 3, 0));

        final MockFlowFile plainRejected = new MockFlowFile(1);
        plainRejected.putAttributes(Collections.singletonMap("accept", "false"));

        final MockFlowFile fragment1 = new MockFlowFile(2);
        fragment1.putAttributes(createFragmentedTransactionAttributes("tx-1", 3, 1));

        final MockFlowFile plainAccepted = new MockFlowFile(3);
        plainAccepted.putAttributes(Collections.singletonMap("accept", "true"));

        final MockFlowFile fragment2 = new MockFlowFile(4);
        fragment2.putAttributes(createFragmentedTransactionAttributes("tx-1", 3, 2));

        // Scenario 1: a fragment arrives first and no delegate filter is supplied.
        // Only the fragments are accepted, and the last one terminates the selection.
        final FlowFileFilter fragmentedWithoutDelegate = new PutSQL.TransactionalFlowFileFilter(null);
        assertEquals(ACCEPT_AND_CONTINUE, fragmentedWithoutDelegate.filter(fragment0));
        assertEquals(REJECT_AND_CONTINUE, fragmentedWithoutDelegate.filter(plainRejected));
        assertEquals(ACCEPT_AND_CONTINUE, fragmentedWithoutDelegate.filter(fragment1));
        assertEquals(REJECT_AND_CONTINUE, fragmentedWithoutDelegate.filter(plainAccepted));
        assertEquals(ACCEPT_AND_TERMINATE, fragmentedWithoutDelegate.filter(fragment2));

        // Scenario 2: a standalone FlowFile arrives first and no delegate filter is supplied.
        // Every standalone FlowFile is accepted and every fragment is rejected.
        final FlowFileFilter plainWithoutDelegate = new PutSQL.TransactionalFlowFileFilter(null);
        assertEquals(ACCEPT_AND_CONTINUE, plainWithoutDelegate.filter(plainRejected));
        assertEquals(REJECT_AND_CONTINUE, plainWithoutDelegate.filter(fragment0));
        assertEquals(REJECT_AND_CONTINUE, plainWithoutDelegate.filter(fragment1));
        assertEquals(ACCEPT_AND_CONTINUE, plainWithoutDelegate.filter(plainAccepted));
        assertEquals(REJECT_AND_CONTINUE, plainWithoutDelegate.filter(fragment2));

        // Delegate that only lets through FlowFiles whose "accept" attribute equals "true".
        final FlowFileFilter acceptAttributeFilter = candidate -> {
            if ("true".equals(candidate.getAttribute("accept"))) {
                return ACCEPT_AND_CONTINUE;
            }
            return REJECT_AND_CONTINUE;
        };

        // Scenario 3: a fragment arrives first and a delegate filter is supplied.
        // The expected results equal scenario 1, i.e. the delegate has no influence here.
        final FlowFileFilter fragmentedWithDelegate = new PutSQL.TransactionalFlowFileFilter(acceptAttributeFilter);
        assertEquals(ACCEPT_AND_CONTINUE, fragmentedWithDelegate.filter(fragment0));
        assertEquals(REJECT_AND_CONTINUE, fragmentedWithDelegate.filter(plainRejected));
        assertEquals(ACCEPT_AND_CONTINUE, fragmentedWithDelegate.filter(fragment1));
        assertEquals(REJECT_AND_CONTINUE, fragmentedWithDelegate.filter(plainAccepted));
        assertEquals(ACCEPT_AND_TERMINATE, fragmentedWithDelegate.filter(fragment2));

        // Scenario 4: a standalone FlowFile arrives first and a delegate filter is supplied.
        // Fragments are rejected, and standalone FlowFiles get whatever the delegate decides.
        final FlowFileFilter plainWithDelegate = new PutSQL.TransactionalFlowFileFilter(acceptAttributeFilter);
        assertEquals(REJECT_AND_CONTINUE, plainWithDelegate.filter(plainRejected));
        assertEquals(REJECT_AND_CONTINUE, plainWithDelegate.filter(fragment0));
        assertEquals(REJECT_AND_CONTINUE, plainWithDelegate.filter(fragment1));
        assertEquals(ACCEPT_AND_CONTINUE, plainWithDelegate.filter(plainAccepted));
        assertEquals(REJECT_AND_CONTINUE, plainWithDelegate.filter(fragment2));
    }

    /**
     * Simple implementation only for testing purposes.
     * <p>
     * Every call to {@link #getConnection()} opens a new embedded Derby connection to the database
     * directory given at construction time, creating the database if it does not exist yet.
     */
    private static class MockDBCPService extends AbstractControllerService implements DBCPService {
        private final String dbLocation;

        /**
         * @param dbLocation path appended to the {@code jdbc:derby:} URL prefix
         */
        public MockDBCPService(final String dbLocation) {
            this.dbLocation = dbLocation;
        }

        @Override
        public String getIdentifier() {
            return "dbcp";
        }

        /**
         * @return a newly opened Derby connection; the caller is responsible for closing it
         * @throws ProcessException if the driver cannot be loaded or the connection cannot be opened;
         *                          the original exception is attached as the cause
         */
        @Override
        public Connection getConnection() throws ProcessException {
            try {
                Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
                return DriverManager.getConnection("jdbc:derby:" + dbLocation + ";create=true");
            } catch (final Exception e) {
                // Chain the cause so the full stack trace travels with the exception.
                throw new ProcessException("getConnection failed: " + e, e);
            }
        }
    }

    /**
     * Simple implementation only for testing purposes.
     * <p>
     * Wraps another {@link DBCPService}. The first {@code allowedBeforeFailure} calls to
     * {@link #getConnection()} are delegated to the wrapped service; every later call returns a
     * Mockito mock {@link Connection} whose {@code prepareStatement(String)} throws a
     * {@link SQLException}.
     */
    private static class SQLExceptionService extends AbstractControllerService implements DBCPService {
        private final DBCPService service;
        // Number of getConnection() calls that are passed through to the wrapped service.
        // Left non-final; presumably adjusted by tests in the enclosing class - verify before changing.
        private int allowedBeforeFailure = 0;
        // Number of getConnection() calls made so far.
        private int successful = 0;

        /**
         * @param service the service that supplies real connections while calls are still allowed
         */
        public SQLExceptionService(final DBCPService service) {
            this.service = service;
        }

        @Override
        public String getIdentifier() {
            return "dbcp";
        }

        /**
         * @return a connection from the wrapped service while the call count does not exceed
         *         {@code allowedBeforeFailure}, otherwise a mock whose {@code prepareStatement} fails
         * @throws ProcessException if obtaining or preparing the connection fails; the original
         *                          exception is attached as the cause
         */
        @Override
        public Connection getConnection() throws ProcessException {
            try {
                if (++successful > allowedBeforeFailure) {
                    final Connection conn = Mockito.mock(Connection.class);
                    Mockito.when(conn.prepareStatement(Mockito.any(String.class))).thenThrow(new SQLException("Unit Test Generated SQLException"));
                    return conn;
                }
                return service.getConnection();
            } catch (final Exception e) {
                // Chain the cause so the full stack trace travels with the exception.
                throw new ProcessException("getConnection failed: " + e, e);
            }
        }
    }

    /**
     * Drops the named table and then creates it again using the supplied DDL.
     *
     * @param tableName name of the table to drop
     * @param createSQL statement executed after the drop to create the table
     * @throws SQLException if either statement fails, e.g. because the table to drop does not exist
     */
    private void recreateTable(String tableName, String createSQL) throws ProcessException, SQLException {
        final String dropSql = "drop table " + tableName;
        try (final Connection connection = service.getConnection();
             final Statement statement = connection.createStatement()) {
            statement.executeUpdate(dropSql);
            statement.executeUpdate(createSQL);
        }
    }

    /**
     * Generates {@code length} bytes via {@code RandomUtils.nextBytes} and returns them as a string
     * in which every byte has been converted to one character by a direct {@code (char)} cast.
     *
     * @param length number of bytes, and therefore characters, to produce
     * @return a string of exactly {@code length} characters
     */
    private String fixedSizeByteArrayAsASCIIString(int length){
        final byte[] randomBytes = RandomUtils.nextBytes(length);
        final StringBuilder text = new StringBuilder();
        for (final byte value : randomBytes) {
            text.append((char) value);
        }
        return text.toString();
    }

    /**
     * Generates {@code length} bytes via {@code RandomUtils.nextBytes} and returns them
     * encoded with {@code DatatypeConverter.printHexBinary}.
     *
     * @param length number of bytes to generate before encoding
     * @return the hexadecimal representation of the generated bytes
     */
    private String fixedSizeByteArrayAsHexString(int length){
        return DatatypeConverter.printHexBinary(RandomUtils.nextBytes(length));
    }

    /**
     * Generates {@code length} bytes via {@code RandomUtils.nextBytes} and returns them
     * encoded with {@code DatatypeConverter.printBase64Binary}.
     *
     * @param length number of bytes to generate before encoding
     * @return the Base64 representation of the generated bytes
     */
    private String fixedSizeByteArrayAsBase64String(int length){
        return DatatypeConverter.printBase64Binary(RandomUtils.nextBytes(length));
    }

    /**
     * Creates a {@link TestRunner} for {@link PutSQL}, registers and enables the shared
     * {@code service} under the identifier "dbcp", and points the processor's connection pool
     * property at it.
     *
     * @return the configured runner
     * @throws InitializationException if the controller service cannot be added
     */
    private TestRunner initTestRunner() throws InitializationException {
        final TestRunner testRunner = TestRunners.newTestRunner(PutSQL.class);
        final String serviceId = "dbcp";

        testRunner.addControllerService(serviceId, service);
        testRunner.enableControllerService(service);
        testRunner.setProperty(PutSQL.CONNECTION_POOL, serviceId);

        return testRunner;
    }

}
