/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.processors.standard

import org.apache.commons.dbcp2.DelegatingConnection
import org.apache.nifi.processor.exception.ProcessException
import org.apache.nifi.processor.util.pattern.RollbackOnFailure
import org.apache.nifi.reporting.InitializationException
import org.apache.nifi.serialization.SimpleRecordSchema
import org.apache.nifi.serialization.record.MapRecord
import org.apache.nifi.serialization.record.MockRecordFailureType
import org.apache.nifi.serialization.record.MockRecordParser
import org.apache.nifi.serialization.record.RecordField
import org.apache.nifi.serialization.record.RecordFieldType
import org.apache.nifi.serialization.record.RecordSchema
import org.apache.nifi.util.MockFlowFile
import org.apache.nifi.util.TestRunner
import org.apache.nifi.util.TestRunners
import org.apache.nifi.util.file.FileUtils
import org.junit.AfterClass
import org.junit.Before
import org.junit.BeforeClass
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.JUnit4

import java.sql.Blob
import java.sql.Clob
import java.sql.Connection
import java.sql.Date
import java.sql.DriverManager
import java.sql.PreparedStatement
import java.sql.ResultSet
import java.sql.SQLDataException
import java.sql.SQLException
import java.sql.SQLNonTransientConnectionException
import java.sql.Statement
import java.time.LocalDate
import java.time.ZoneOffset
import java.util.function.Supplier

import static org.junit.Assert.assertEquals
import static org.junit.Assert.assertFalse
import static org.junit.Assert.assertNotNull
import static org.junit.Assert.assertNull
import static org.junit.Assert.assertTrue
import static org.junit.Assert.fail
import static org.mockito.ArgumentMatchers.anyMap
import static org.mockito.Mockito.doAnswer
import static org.mockito.Mockito.spy
import static org.mockito.Mockito.times
import static org.mockito.Mockito.verify

/**
 * Unit tests for the PutDatabaseRecord processor
 */
@RunWith(JUnit4.class)
class TestPutDatabaseRecord {

    private static final String createPersons = "CREATE TABLE PERSONS (id integer primary key, name varchar(100)," +
            " code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND code < 1000), dt date)"
    private static final String createPersonsSchema1 = "CREATE TABLE SCHEMA1.PERSONS (id integer primary key, name varchar(100)," +
            " code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND code < 1000), dt date)"
    private static final String createPersonsSchema2 = "CREATE TABLE SCHEMA2.PERSONS (id2 integer primary key, name varchar(100)," +
            " code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND code < 1000), dt date)"
    private final static String DB_LOCATION = "target/db_pdr"

    TestRunner runner
    PutDatabaseRecord processor
    DBCPServiceSimpleImpl dbcp

    @BeforeClass
    static void setupBeforeClass() throws IOException {
        System.setProperty("derby.stream.error.file", "target/derby.log")

        // remove previous test database, if any
        final File dbLocation = new File(DB_LOCATION)
        try {
            FileUtils.deleteFile(dbLocation, true)
        } catch (IOException ignore) {
            // Do nothing, may not have existed
        }
    }

    @AfterClass
    static void cleanUpAfterClass() throws Exception {
        try {
            DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";shutdown=true")
        } catch (SQLNonTransientConnectionException ignore) {
            // Do nothing, this is what happens at Derby shutdown
        }
        // remove previous test database, if any
        final File dbLocation = new File(DB_LOCATION)
        try {
            FileUtils.deleteFile(dbLocation, true)
        } catch (IOException ignore) {
            // Do nothing, may not have existed
        }
    }

    @Before
    void setUp() throws Exception {
        processor = new PutDatabaseRecord()
        //Mock the DBCP Controller Service so we can control the Results
        dbcp = spy(new DBCPServiceSimpleImpl(DB_LOCATION))

        final Map<String, String> dbcpProperties = new HashMap<>()

        runner = TestRunners.newTestRunner(processor)
        runner.addControllerService("dbcp", dbcp, dbcpProperties)
        runner.enableControllerService(dbcp)
        runner.setProperty(PutDatabaseRecord.DBCP_SERVICE, "dbcp")
    }

    @Test
    void testGeneratePreparedStatements() throws Exception {

        final List<RecordField> fields = [new RecordField('id', RecordFieldType.INT.dataType),
                                          new RecordField('name', RecordFieldType.STRING.dataType),
                                          new RecordField('code', RecordFieldType.INT.dataType),
                                          new RecordField('non_existing', RecordFieldType.BOOLEAN.dataType)]

        def schema = [
                getFields    : {fields},
                getFieldCount: {fields.size()},
                getField     : {int index -> fields[index]},
                getDataTypes : {fields.collect {it.dataType}},
                getFieldNames: {fields.collect {it.fieldName}},
                getDataType  : {fieldName -> fields.find {it.fieldName == fieldName}.dataType}
        ] as RecordSchema

        def tableSchema = [
                [
                        new PutDatabaseRecord.ColumnDescription('id', 4, true, 2, false),
                        new PutDatabaseRecord.ColumnDescription('name', 12, true, 255, true),
                        new PutDatabaseRecord.ColumnDescription('code', 4, true, 10, true)
                ],
                false,
                ['id'] as Set<String>,
                ''
        ] as PutDatabaseRecord.TableSchema

        runner.setProperty(PutDatabaseRecord.TRANSLATE_FIELD_NAMES, 'false')
        runner.setProperty(PutDatabaseRecord.UNMATCHED_FIELD_BEHAVIOR, PutDatabaseRecord.IGNORE_UNMATCHED_FIELD)
        runner.setProperty(PutDatabaseRecord.UNMATCHED_COLUMN_BEHAVIOR, PutDatabaseRecord.IGNORE_UNMATCHED_COLUMN)
        runner.setProperty(PutDatabaseRecord.QUOTE_IDENTIFIERS, 'false')
        runner.setProperty(PutDatabaseRecord.QUOTE_TABLE_IDENTIFIER, 'false')
        def settings = new PutDatabaseRecord.DMLSettings(runner.getProcessContext())

        processor.with {

            assertEquals('INSERT INTO PERSONS (id, name, code) VALUES (?,?,?)',
                    generateInsert(schema, 'PERSONS', tableSchema, settings).sql)

            assertEquals('UPDATE PERSONS SET name = ?, code = ? WHERE id = ?',
                    generateUpdate(schema, 'PERSONS', null, tableSchema, settings).sql)

            assertEquals('DELETE FROM PERSONS WHERE (id = ?) AND (name = ? OR (name is null AND ? is null)) AND (code = ? OR (code is null AND ? is null))',
                    generateDelete(schema, 'PERSONS', tableSchema, settings).sql)
        }
    }

    @Test
    void testGeneratePreparedStatementsFailUnmatchedField() throws Exception {

        final List<RecordField> fields = [new RecordField('id', RecordFieldType.INT.dataType),
                      new RecordField('name', RecordFieldType.STRING.dataType),
                      new RecordField('code', RecordFieldType.INT.dataType),
                      new RecordField('non_existing', RecordFieldType.BOOLEAN.dataType)]

        def schema = [
                getFields    : {fields},
                getFieldCount: {fields.size()},
                getField     : {int index -> fields[index]},
                getDataTypes : {fields.collect {it.dataType}},
                getFieldNames: {fields.collect {it.fieldName}},
                getDataType  : {fieldName -> fields.find {it.fieldName == fieldName}.dataType}
        ] as RecordSchema

        def tableSchema = [
                [
                        new PutDatabaseRecord.ColumnDescription('id', 4, true, 2, false),
                        new PutDatabaseRecord.ColumnDescription('name', 12, true, 255, true),
                        new PutDatabaseRecord.ColumnDescription('code', 4, true, 10, true)
                ],
                false,
                ['id'] as Set<String>,
                ''

        ] as PutDatabaseRecord.TableSchema

        runner.setProperty(PutDatabaseRecord.TRANSLATE_FIELD_NAMES, 'false')
        runner.setProperty(PutDatabaseRecord.UNMATCHED_FIELD_BEHAVIOR, PutDatabaseRecord.FAIL_UNMATCHED_FIELD)
        runner.setProperty(PutDatabaseRecord.UNMATCHED_COLUMN_BEHAVIOR, PutDatabaseRecord.IGNORE_UNMATCHED_COLUMN)
        runner.setProperty(PutDatabaseRecord.QUOTE_IDENTIFIERS, 'false')
        runner.setProperty(PutDatabaseRecord.QUOTE_TABLE_IDENTIFIER, 'false')
        def settings = new PutDatabaseRecord.DMLSettings(runner.getProcessContext())

        processor.with {

            try {
                generateInsert(schema, 'PERSONS', tableSchema, settings)
                fail('generateInsert should fail with unmatched fields')
            } catch (SQLDataException e) {
                assertEquals("Cannot map field 'non_existing' to any column in the database\nColumns: id,name,code", e.getMessage())
            }

            try {
                generateUpdate(schema, 'PERSONS', null, tableSchema, settings)
                fail('generateUpdate should fail with unmatched fields')
            } catch (SQLDataException e) {
                assertEquals("Cannot map field 'non_existing' to any column in the database\nColumns: id,name,code", e.getMessage())
            }

            try {
                generateDelete(schema, 'PERSONS', tableSchema, settings)
                fail('generateDelete should fail with unmatched fields')
            } catch (SQLDataException e) {
                assertEquals("Cannot map field 'non_existing' to any column in the database\nColumns: id,name,code", e.getMessage())
            }
        }
    }

    @Test
    void testInsert() throws InitializationException, ProcessException, SQLException, IOException {
        recreateTable(createPersons)
        final MockRecordParser parser = new MockRecordParser()
        runner.addControllerService("parser", parser)
        runner.enableControllerService(parser)

        parser.addSchemaField("id", RecordFieldType.INT)
        parser.addSchemaField("name", RecordFieldType.STRING)
        parser.addSchemaField("code", RecordFieldType.INT)
        parser.addSchemaField("dt", RecordFieldType.DATE)

        LocalDate testDate1 = LocalDate.of(2021, 1, 26)
        Date jdbcDate1 = Date.valueOf(testDate1) // in local TZ
        LocalDate testDate2 = LocalDate.of(2021, 7, 26)
        Date jdbcDate2 = Date.valueOf(testDate2) // in local TZ

        parser.addRecord(1, 'rec1', 101, jdbcDate1)
        parser.addRecord(2, 'rec2', 102, jdbcDate2)
        parser.addRecord(3, 'rec3', 103, null)
        parser.addRecord(4, 'rec4', 104, null)
        parser.addRecord(5, null, 105, null)

        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE)
        runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')

        runner.enqueue(new byte[0])
        runner.run()

        runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1)
        final Connection conn = dbcp.getConnection()
        final Statement stmt = conn.createStatement()
        final ResultSet rs = stmt.executeQuery('SELECT * FROM PERSONS')
        assertTrue(rs.next())
        assertEquals(1, rs.getInt(1))
        assertEquals('rec1', rs.getString(2))
        assertEquals(101, rs.getInt(3))
        assertEquals(jdbcDate1.toString(), rs.getDate(4).toString())
        assertTrue(rs.next())
        assertEquals(2, rs.getInt(1))
        assertEquals('rec2', rs.getString(2))
        assertEquals(102, rs.getInt(3))
        assertEquals(jdbcDate2.toString(), rs.getDate(4).toString())
        assertTrue(rs.next())
        assertEquals(3, rs.getInt(1))
        assertEquals('rec3', rs.getString(2))
        assertEquals(103, rs.getInt(3))
        assertNull(rs.getDate(4))
        assertTrue(rs.next())
        assertEquals(4, rs.getInt(1))
        assertEquals('rec4', rs.getString(2))
        assertEquals(104, rs.getInt(3))
        assertNull(rs.getDate(4))
        assertTrue(rs.next())
        assertEquals(5, rs.getInt(1))
        assertNull(rs.getString(2))
        assertEquals(105, rs.getInt(3))
        assertNull(rs.getDate(4))
        assertFalse(rs.next())

        stmt.close()
        conn.close()
    }

    @Test
    void testInsertNonRequiredColumns() throws InitializationException, ProcessException, SQLException, IOException {
        recreateTable(createPersons)
        final MockRecordParser parser = new MockRecordParser()
        runner.addControllerService("parser", parser)
        runner.enableControllerService(parser)

        parser.addSchemaField("id", RecordFieldType.INT)
        parser.addSchemaField("name", RecordFieldType.STRING)
        parser.addSchemaField("dt", RecordFieldType.DATE)

        LocalDate testDate1 = LocalDate.of(2021, 1, 26)
        Date jdbcDate1 = Date.valueOf(testDate1) // in local TZ
        LocalDate testDate2 = LocalDate.of(2021, 7, 26)
        Date jdbcDate2 = Date.valueOf(testDate2) // in local TZ

        parser.addRecord(1, 'rec1', jdbcDate1)
        parser.addRecord(2, 'rec2', jdbcDate2)
        parser.addRecord(3, 'rec3', null)
        parser.addRecord(4, 'rec4', null)
        parser.addRecord(5, null, null)

        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE)
        runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')

        runner.enqueue(new byte[0])
        runner.run()

        runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1)
        final Connection conn = dbcp.getConnection()
        final Statement stmt = conn.createStatement()
        final ResultSet rs = stmt.executeQuery('SELECT * FROM PERSONS')
        assertTrue(rs.next())
        assertEquals(1, rs.getInt(1))
        assertEquals('rec1', rs.getString(2))
        // Zero value because of the constraint
        assertEquals(0, rs.getInt(3))
        assertEquals(jdbcDate1.toString(), rs.getDate(4).toString())
        assertTrue(rs.next())
        assertEquals(2, rs.getInt(1))
        assertEquals('rec2', rs.getString(2))
        assertEquals(0, rs.getInt(3))
        assertEquals(jdbcDate2.toString(), rs.getDate(4).toString())
        assertTrue(rs.next())
        assertEquals(3, rs.getInt(1))
        assertEquals('rec3', rs.getString(2))
        assertEquals(0, rs.getInt(3))
        assertNull(rs.getDate(4))
        assertTrue(rs.next())
        assertEquals(4, rs.getInt(1))
        assertEquals('rec4', rs.getString(2))
        assertEquals(0, rs.getInt(3))
        assertNull(rs.getDate(4))
        assertTrue(rs.next())
        assertEquals(5, rs.getInt(1))
        assertNull(rs.getString(2))
        assertEquals(0, rs.getInt(3))
        assertNull(rs.getDate(4))
        assertFalse(rs.next())

        stmt.close()
        conn.close()
    }

    @Test
    void testInsertBatchUpdateException() throws InitializationException, ProcessException, SQLException, IOException {
        recreateTable(createPersons)
        final MockRecordParser parser = new MockRecordParser()
        runner.addControllerService("parser", parser)
        runner.enableControllerService(parser)

        parser.addSchemaField("id", RecordFieldType.INT)
        parser.addSchemaField("name", RecordFieldType.STRING)
        parser.addSchemaField("code", RecordFieldType.INT)

        parser.addRecord(1, 'rec1', 101)
        parser.addRecord(2, 'rec2', 102)
        parser.addRecord(3, 'rec3', 1000)   // This record violates the constraint on the 'code' column so should result in FlowFile being routed to failure
        parser.addRecord(4, 'rec4', 104)

        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE)
        runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')

        runner.enqueue(new byte[0])
        runner.run()

        runner.assertAllFlowFilesTransferred(PutDatabaseRecord.REL_FAILURE, 1)
        final Connection conn = dbcp.getConnection()
        final Statement stmt = conn.createStatement()
        final ResultSet rs = stmt.executeQuery('SELECT * FROM PERSONS')
        // Transaction should be rolled back and table should remain empty.
        assertFalse(rs.next())

        stmt.close()
        conn.close()
    }

    @Test
    void testInsertBatchUpdateExceptionRollbackOnFailure() throws InitializationException, ProcessException, SQLException, IOException {
        recreateTable(createPersons)
        final MockRecordParser parser = new MockRecordParser()
        runner.addControllerService("parser", parser)
        runner.enableControllerService(parser)

        parser.addSchemaField("id", RecordFieldType.INT)
        parser.addSchemaField("name", RecordFieldType.STRING)
        parser.addSchemaField("code", RecordFieldType.INT)

        parser.addRecord(1, 'rec1', 101)
        parser.addRecord(2, 'rec2', 102)
        parser.addRecord(3, 'rec3', 1000)
        parser.addRecord(4, 'rec4', 104)

        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE)
        runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')
        runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, 'true')

        runner.enqueue(new byte[0])
        runner.run()

        final Connection conn = dbcp.getConnection()
        final Statement stmt = conn.createStatement()
        final ResultSet rs = stmt.executeQuery('SELECT * FROM PERSONS')
        // Transaction should be rolled back and table should remain empty.
        assertFalse(rs.next())

        stmt.close()
        conn.close()
    }

    @Test
    void testInsertNoTableSpecified() throws InitializationException, ProcessException, SQLException, IOException {
        recreateTable(createPersons)
        final MockRecordParser parser = new MockRecordParser()
        runner.addControllerService("parser", parser)
        runner.enableControllerService(parser)

        parser.addSchemaField("id", RecordFieldType.INT)
        parser.addSchemaField("name", RecordFieldType.STRING)
        parser.addSchemaField("code", RecordFieldType.INT)

        parser.addRecord(1, 'rec1', 101)

        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE)
        runner.setProperty(PutDatabaseRecord.TABLE_NAME, '${not.a.real.attr}')

        runner.enqueue(new byte[0])
        runner.run()

        runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 0)
        runner.assertTransferCount(PutDatabaseRecord.REL_FAILURE, 1)
    }

    @Test
    void testInsertNoTableExists() throws InitializationException, ProcessException, SQLException, IOException {
        recreateTable(createPersons)
        final MockRecordParser parser = new MockRecordParser()
        runner.addControllerService("parser", parser)
        runner.enableControllerService(parser)

        parser.addSchemaField("id", RecordFieldType.INT)
        parser.addSchemaField("name", RecordFieldType.STRING)
        parser.addSchemaField("code", RecordFieldType.INT)

        parser.addRecord(1, 'rec1', 101)

        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE)
        runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS2')

        runner.enqueue(new byte[0])
        runner.run()

        runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 0)
        runner.assertTransferCount(PutDatabaseRecord.REL_FAILURE, 1)
        MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutDatabaseRecord.REL_FAILURE).get(0);
        final String errorMessage = flowFile.getAttribute("putdatabaserecord.error")
        assertTrue(errorMessage.contains("PERSONS2"))
    }

    @Test
    void testInsertViaSqlStatementType() throws InitializationException, ProcessException, SQLException, IOException {
        recreateTable(createPersons)
        final MockRecordParser parser = new MockRecordParser()
        runner.addControllerService("parser", parser)
        runner.enableControllerService(parser)

        parser.addSchemaField("sql", RecordFieldType.STRING)

        parser.addRecord('''INSERT INTO PERSONS (id, name, code) VALUES (1, 'rec1',101)''')
        parser.addRecord('''INSERT INTO PERSONS (id, name, code) VALUES (2, 'rec2',102)''')

        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.USE_ATTR_TYPE)
        runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')
        runner.setProperty(PutDatabaseRecord.FIELD_CONTAINING_SQL, 'sql')

        def attrs = [:]
        attrs[PutDatabaseRecord.STATEMENT_TYPE_ATTRIBUTE] = 'sql'
        runner.enqueue(new byte[0], attrs)
        runner.run()

        runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1)
        final Connection conn = dbcp.getConnection()
        final Statement stmt = conn.createStatement()
        final ResultSet rs = stmt.executeQuery('SELECT * FROM PERSONS')
        assertTrue(rs.next())
        assertEquals(1, rs.getInt(1))
        assertEquals('rec1', rs.getString(2))
        assertEquals(101, rs.getInt(3))
        assertTrue(rs.next())
        assertEquals(2, rs.getInt(1))
        assertEquals('rec2', rs.getString(2))
        assertEquals(102, rs.getInt(3))
        assertFalse(rs.next())

        stmt.close()
        conn.close()
    }

    @Test
    void testMultipleInsertsViaSqlStatementType() throws InitializationException, ProcessException, SQLException, IOException {
        recreateTable(createPersons)
        final MockRecordParser parser = new MockRecordParser()
        runner.addControllerService("parser", parser)
        runner.enableControllerService(parser)

        parser.addSchemaField("sql", RecordFieldType.STRING)

        parser.addRecord('''INSERT INTO PERSONS (id, name, code) VALUES (1, 'rec1',101);INSERT INTO PERSONS (id, name, code) VALUES (2, 'rec2',102)''')

        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.USE_ATTR_TYPE)
        runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')
        runner.setProperty(PutDatabaseRecord.FIELD_CONTAINING_SQL, 'sql')
        runner.setProperty(PutDatabaseRecord.ALLOW_MULTIPLE_STATEMENTS, 'true')

        def attrs = [:]
        attrs[PutDatabaseRecord.STATEMENT_TYPE_ATTRIBUTE] = 'sql'
        runner.enqueue(new byte[0], attrs)
        runner.run()

        runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1)
        final Connection conn = dbcp.getConnection()
        final Statement stmt = conn.createStatement()
        final ResultSet rs = stmt.executeQuery('SELECT * FROM PERSONS')
        assertTrue(rs.next())
        assertEquals(1, rs.getInt(1))
        assertEquals('rec1', rs.getString(2))
        assertEquals(101, rs.getInt(3))
        assertTrue(rs.next())
        assertEquals(2, rs.getInt(1))
        assertEquals('rec2', rs.getString(2))
        assertEquals(102, rs.getInt(3))
        assertFalse(rs.next())

        stmt.close()
        conn.close()
    }

    @Test
    void testMultipleInsertsViaSqlStatementTypeBadSQL() throws InitializationException, ProcessException, SQLException, IOException {
        recreateTable(createPersons)
        final MockRecordParser parser = new MockRecordParser()
        runner.addControllerService("parser", parser)
        runner.enableControllerService(parser)

        parser.addSchemaField("sql", RecordFieldType.STRING)

        parser.addRecord('''INSERT INTO PERSONS (id, name, code) VALUES (1, 'rec1',101);
                        INSERT INTO PERSONS (id, name, code) VALUES (2, 'rec2',102);
                        INSERT INTO PERSONS2 (id, name, code) VALUES (2, 'rec2',102);''')

        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.USE_ATTR_TYPE)
        runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')
        runner.setProperty(PutDatabaseRecord.FIELD_CONTAINING_SQL, 'sql')
        runner.setProperty(PutDatabaseRecord.ALLOW_MULTIPLE_STATEMENTS, 'true')

        def attrs = [:]
        attrs[PutDatabaseRecord.STATEMENT_TYPE_ATTRIBUTE] = 'sql'
        runner.enqueue(new byte[0], attrs)
        runner.run()

        runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 0)
        runner.assertTransferCount(PutDatabaseRecord.REL_FAILURE, 1)
        final Connection conn = dbcp.getConnection()
        final Statement stmt = conn.createStatement()
        final ResultSet rs = stmt.executeQuery('SELECT * FROM PERSONS')
        // The first two legitimate statements should have been rolled back
        assertFalse(rs.next())

        stmt.close()
        conn.close()
    }

    @Test
    void testInvalidData() throws InitializationException, ProcessException, SQLException, IOException {
        recreateTable(createPersons)
        final MockRecordParser parser = new MockRecordParser()
        runner.addControllerService("parser", parser)
        runner.enableControllerService(parser)

        parser.addSchemaField("id", RecordFieldType.INT)
        parser.addSchemaField("name", RecordFieldType.STRING)
        parser.addSchemaField("code", RecordFieldType.INT)

        parser.addRecord(1, 'rec1', 101)
        parser.addRecord(2, 'rec2', 102)
        parser.addRecord(3, 'rec3', 104)

        parser.failAfter(1)

        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE)
        runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')

        runner.enqueue(new byte[0])
        runner.run()

        runner.assertAllFlowFilesTransferred(PutDatabaseRecord.REL_FAILURE, 1)
        final Connection conn = dbcp.getConnection()
        final Statement stmt = conn.createStatement()
        final ResultSet rs = stmt.executeQuery('SELECT * FROM PERSONS')
        // Transaction should be rolled back and table should remain empty.
        assertFalse(rs.next())

        stmt.close()
        conn.close()
    }

    @Test
    void testIOExceptionOnReadData() throws InitializationException, ProcessException, SQLException, IOException {
        recreateTable(createPersons)
        final MockRecordParser parser = new MockRecordParser()
        runner.addControllerService("parser", parser)
        runner.enableControllerService(parser)

        parser.addSchemaField("id", RecordFieldType.INT)
        parser.addSchemaField("name", RecordFieldType.STRING)
        parser.addSchemaField("code", RecordFieldType.INT)

        parser.addRecord(1, 'rec1', 101)
        parser.addRecord(2, 'rec2', 102)
        parser.addRecord(3, 'rec3', 104)

        parser.failAfter(1, MockRecordFailureType.IO_EXCEPTION)

        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE)
        runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')

        runner.enqueue(new byte[0])
        runner.run()

        runner.assertAllFlowFilesTransferred(PutDatabaseRecord.REL_FAILURE, 1)
        final Connection conn = dbcp.getConnection()
        final Statement stmt = conn.createStatement()
        final ResultSet rs = stmt.executeQuery('SELECT * FROM PERSONS')
        // Transaction should be rolled back and table should remain empty.
        assertFalse(rs.next())

        stmt.close()
        conn.close()
    }

    @Test
    void testSqlStatementTypeNoValue() throws InitializationException, ProcessException, SQLException, IOException {
        recreateTable(createPersons)
        final MockRecordParser parser = new MockRecordParser()
        runner.addControllerService("parser", parser)
        runner.enableControllerService(parser)

        parser.addSchemaField("sql", RecordFieldType.STRING)

        parser.addRecord('')

        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.USE_ATTR_TYPE)
        runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')
        runner.setProperty(PutDatabaseRecord.FIELD_CONTAINING_SQL, 'sql')

        def attrs = [:]
        attrs[PutDatabaseRecord.STATEMENT_TYPE_ATTRIBUTE] = 'sql'
        runner.enqueue(new byte[0], attrs)
        runner.run()

        runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 0)
        runner.assertTransferCount(PutDatabaseRecord.REL_FAILURE, 1)
    }

    @Test
    void testSqlStatementTypeNoValueRollbackOnFailure() throws InitializationException, ProcessException, SQLException, IOException {
        recreateTable(createPersons)
        final MockRecordParser parser = new MockRecordParser()
        runner.addControllerService("parser", parser)
        runner.enableControllerService(parser)

        parser.addSchemaField("sql", RecordFieldType.STRING)

        parser.addRecord('')

        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.USE_ATTR_TYPE)
        runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')
        runner.setProperty(PutDatabaseRecord.FIELD_CONTAINING_SQL, 'sql')
        runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, 'true')

        def attrs = [:]
        attrs[PutDatabaseRecord.STATEMENT_TYPE_ATTRIBUTE] = 'sql'
        runner.enqueue(new byte[0], attrs)

        runner.run()

        runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 0)
        runner.assertTransferCount(PutDatabaseRecord.REL_FAILURE, 0)
    }

    @Test
    void testUpdate() throws InitializationException, ProcessException, SQLException, IOException {
        recreateTable(createPersons)
        final MockRecordParser parser = new MockRecordParser()
        runner.addControllerService("parser", parser)
        runner.enableControllerService(parser)

        parser.addSchemaField("id", RecordFieldType.INT)
        parser.addSchemaField("name", RecordFieldType.STRING)
        parser.addSchemaField("code", RecordFieldType.INT)

        parser.addRecord(1, 'rec1', 201)
        parser.addRecord(2, 'rec2', 202)

        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.UPDATE_TYPE)
        runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')

        // Set some existing records with different values for name and code
        final Connection conn = dbcp.getConnection()
        Statement stmt = conn.createStatement()
        stmt.execute('''INSERT INTO PERSONS VALUES (1,'x1',101, null)''')
        stmt.execute('''INSERT INTO PERSONS VALUES (2,'x2',102, null)''')
        stmt.close()

        runner.enqueue(new byte[0])
        runner.run()

        runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1)
        stmt = conn.createStatement()
        final ResultSet rs = stmt.executeQuery('SELECT * FROM PERSONS')
        assertTrue(rs.next())
        assertEquals(1, rs.getInt(1))
        assertEquals('rec1', rs.getString(2))
        assertEquals(201, rs.getInt(3))
        assertTrue(rs.next())
        assertEquals(2, rs.getInt(1))
        assertEquals('rec2', rs.getString(2))
        assertEquals(202, rs.getInt(3))
        assertFalse(rs.next())

        stmt.close()
        conn.close()
    }

    @Test
    void testUpdateMultipleSchemas() throws InitializationException, ProcessException, SQLException, IOException {
        // Manually create and drop the tables and schemas
        def conn = dbcp.connection
        def stmt = conn.createStatement()
        stmt.execute('create schema SCHEMA1')
        stmt.execute('create schema SCHEMA2')
        stmt.execute(createPersonsSchema1)
        stmt.execute(createPersonsSchema2)

        final MockRecordParser parser = new MockRecordParser()
        runner.addControllerService("parser", parser)
        runner.enableControllerService(parser)

        parser.addSchemaField("id", RecordFieldType.INT)
        parser.addSchemaField("name", RecordFieldType.STRING)
        parser.addSchemaField("code", RecordFieldType.INT)

        parser.addRecord(1, 'rec1', 201)
        parser.addRecord(2, 'rec2', 202)

        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.UPDATE_TYPE)
        runner.setProperty(PutDatabaseRecord.SCHEMA_NAME, "SCHEMA1")
        runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')

        // Set some existing records with different values for name and code
        Exception e
        ResultSet rs
        try {
            stmt.execute('''INSERT INTO SCHEMA1.PERSONS VALUES (1,'x1',101,null)''')
            stmt.execute('''INSERT INTO SCHEMA2.PERSONS VALUES (2,'x2',102,null)''')

            runner.enqueue(new byte[0])
            runner.run()

            runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1)
            rs = stmt.executeQuery('SELECT * FROM SCHEMA1.PERSONS')
            assertTrue(rs.next())
            assertEquals(1, rs.getInt(1))
            assertEquals('rec1', rs.getString(2))
            assertEquals(201, rs.getInt(3))
            assertFalse(rs.next())
            rs = stmt.executeQuery('SELECT * FROM SCHEMA2.PERSONS')
            assertTrue(rs.next())
            assertEquals(2, rs.getInt(1))
            // Values should not have been updated
            assertEquals('x2', rs.getString(2))
            assertEquals(102, rs.getInt(3))
            assertFalse(rs.next())
        } catch(ex) {
            e = ex
        }

        // Drop the schemas here so as not to interfere with other tests
        stmt.execute("drop table SCHEMA1.PERSONS")
        stmt.execute("drop table SCHEMA2.PERSONS")
        stmt.execute("drop schema SCHEMA1 RESTRICT")
        stmt.execute("drop schema SCHEMA2 RESTRICT")
        stmt.close()

        // Don't proceed if there was a problem with the asserts
        if(e) throw e
        rs = conn.metaData.schemas
        List<String> schemas = new ArrayList<>()
        while(rs.next()) {
            schemas += rs.getString(1)
        }
        assertFalse(schemas.contains('SCHEMA1'))
        assertFalse(schemas.contains('SCHEMA2'))
        conn.close()
    }

    @Test
    void testUpdateAfterInsert() throws InitializationException, ProcessException, SQLException, IOException {
        recreateTable(createPersons)
        final MockRecordParser parser = new MockRecordParser()
        runner.addControllerService("parser", parser)
        runner.enableControllerService(parser)

        parser.addSchemaField("id", RecordFieldType.INT)
        parser.addSchemaField("name", RecordFieldType.STRING)
        parser.addSchemaField("code", RecordFieldType.INT)

        parser.addRecord(1, 'rec1', 101)
        parser.addRecord(2, 'rec2', 102)

        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE)
        runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')

        runner.enqueue(new byte[0])
        runner.run()

        runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1)
        final Connection conn = dbcp.getConnection()
        Statement stmt = conn.createStatement()
        ResultSet rs = stmt.executeQuery('SELECT * FROM PERSONS')
        assertTrue(rs.next())
        assertEquals(1, rs.getInt(1))
        assertEquals('rec1', rs.getString(2))
        assertEquals(101, rs.getInt(3))
        assertTrue(rs.next())
        assertEquals(2, rs.getInt(1))
        assertEquals('rec2', rs.getString(2))
        assertEquals(102, rs.getInt(3))
        assertFalse(rs.next())
        stmt.close()
        runner.clearTransferState()

        parser.addRecord(1, 'rec1', 201)
        parser.addRecord(2, 'rec2', 202)
        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.UPDATE_TYPE)
        runner.enqueue(new byte[0])
        runner.run(1,true,false)

        runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1)
        stmt = conn.createStatement()
        rs = stmt.executeQuery('SELECT * FROM PERSONS')
        assertTrue(rs.next())
        assertEquals(1, rs.getInt(1))
        assertEquals('rec1', rs.getString(2))
        assertEquals(201, rs.getInt(3))
        assertTrue(rs.next())
        assertEquals(2, rs.getInt(1))
        assertEquals('rec2', rs.getString(2))
        assertEquals(202, rs.getInt(3))
        assertFalse(rs.next())
        stmt.close()
        conn.close()
    }

    @Test
    void testUpdateNoPrimaryKeys() throws InitializationException, ProcessException, SQLException, IOException {
        recreateTable('CREATE TABLE PERSONS (id integer, name varchar(100), code integer)')
        final MockRecordParser parser = new MockRecordParser()
        runner.addControllerService("parser", parser)
        runner.enableControllerService(parser)

        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.UPDATE_TYPE)
        runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')

        parser.addRecord(1, 'rec1', 201)
        runner.enqueue(new byte[0])
        runner.run()

        runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 0)
        runner.assertTransferCount(PutDatabaseRecord.REL_FAILURE, 1)
        MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutDatabaseRecord.REL_FAILURE).get(0)
        assertEquals('Table \'PERSONS\' not found or does not have a Primary Key and no Update Keys were specified', flowFile.getAttribute(PutDatabaseRecord.PUT_DATABASE_RECORD_ERROR))
    }

    @Test
    void testUpdateSpecifyUpdateKeys() throws InitializationException, ProcessException, SQLException, IOException {
        recreateTable('CREATE TABLE PERSONS (id integer, name varchar(100), code integer)')
        final MockRecordParser parser = new MockRecordParser()
        runner.addControllerService("parser", parser)
        runner.enableControllerService(parser)

        parser.addSchemaField("id", RecordFieldType.INT)
        parser.addSchemaField("name", RecordFieldType.STRING)
        parser.addSchemaField("code", RecordFieldType.INT)

        parser.addRecord(1, 'rec1', 201)
        parser.addRecord(2, 'rec2', 202)

        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.UPDATE_TYPE)
        runner.setProperty(PutDatabaseRecord.UPDATE_KEYS, 'id')
        runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')

        // Set some existing records with different values for name and code
        final Connection conn = dbcp.getConnection()
        Statement stmt = conn.createStatement()
        stmt.execute('''INSERT INTO PERSONS VALUES (1,'x1',101)''')
        stmt.execute('''INSERT INTO PERSONS VALUES (2,'x2',102)''')
        stmt.close()

        runner.enqueue(new byte[0])
        runner.run()

        runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1)
        stmt = conn.createStatement()
        final ResultSet rs = stmt.executeQuery('SELECT * FROM PERSONS')
        assertTrue(rs.next())
        assertEquals(1, rs.getInt(1))
        assertEquals('rec1', rs.getString(2))
        assertEquals(201, rs.getInt(3))
        assertTrue(rs.next())
        assertEquals(2, rs.getInt(1))
        assertEquals('rec2', rs.getString(2))
        assertEquals(202, rs.getInt(3))
        assertFalse(rs.next())

        stmt.close()
        conn.close()
    }

    @Test
    void testDelete() throws InitializationException, ProcessException, SQLException, IOException {
        recreateTable(createPersons)
        Connection conn = dbcp.getConnection()
        Statement stmt = conn.createStatement()
        stmt.execute("INSERT INTO PERSONS VALUES (1,'rec1', 101, null)")
        stmt.execute("INSERT INTO PERSONS VALUES (2,'rec2', 102, null)")
        stmt.execute("INSERT INTO PERSONS VALUES (3,'rec3', 103, null)")
        stmt.close()

        final MockRecordParser parser = new MockRecordParser()
        runner.addControllerService("parser", parser)
        runner.enableControllerService(parser)

        parser.addSchemaField("id", RecordFieldType.INT)
        parser.addSchemaField("name", RecordFieldType.STRING)
        parser.addSchemaField("code", RecordFieldType.INT)

        parser.addRecord(2, 'rec2', 102)

        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.DELETE_TYPE)
        runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')

        runner.enqueue(new byte[0])
        runner.run()

        runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1)
        stmt = conn.createStatement()
        final ResultSet rs = stmt.executeQuery('SELECT * FROM PERSONS')
        assertTrue(rs.next())
        assertEquals(1, rs.getInt(1))
        assertEquals('rec1', rs.getString(2))
        assertEquals(101, rs.getInt(3))
        assertTrue(rs.next())
        assertEquals(3, rs.getInt(1))
        assertEquals('rec3', rs.getString(2))
        assertEquals(103, rs.getInt(3))
        assertFalse(rs.next())

        stmt.close()
        conn.close()
    }

    @Test
    void testDeleteWithNulls() throws InitializationException, ProcessException, SQLException, IOException {
        recreateTable(createPersons)
        Connection conn = dbcp.getConnection()
        Statement stmt = conn.createStatement()
        stmt.execute("INSERT INTO PERSONS VALUES (1,'rec1', 101, null)")
        stmt.execute("INSERT INTO PERSONS VALUES (2,'rec2', null, null)")
        stmt.execute("INSERT INTO PERSONS VALUES (3,'rec3', 103, null)")
        stmt.close()

        final MockRecordParser parser = new MockRecordParser()
        runner.addControllerService("parser", parser)
        runner.enableControllerService(parser)

        parser.addSchemaField("id", RecordFieldType.INT)
        parser.addSchemaField("name", RecordFieldType.STRING)
        parser.addSchemaField("code", RecordFieldType.INT)

        parser.addRecord(2, 'rec2', null)

        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.DELETE_TYPE)
        runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')

        runner.enqueue(new byte[0])
        runner.run()

        runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1)
        stmt = conn.createStatement()
        final ResultSet rs = stmt.executeQuery('SELECT * FROM PERSONS')
        assertTrue(rs.next())
        assertEquals(1, rs.getInt(1))
        assertEquals('rec1', rs.getString(2))
        assertEquals(101, rs.getInt(3))
        assertTrue(rs.next())
        assertEquals(3, rs.getInt(1))
        assertEquals('rec3', rs.getString(2))
        assertEquals(103, rs.getInt(3))
        assertFalse(rs.next())

        stmt.close()
        conn.close()
    }

    @Test
    void testRecordPathOptions() {
        recreateTable('CREATE TABLE PERSONS (id integer, name varchar(100), code integer)')
        final MockRecordParser parser = new MockRecordParser()
        runner.addControllerService("parser", parser)
        runner.enableControllerService(parser)

        final List<RecordField> dataFields = new ArrayList<>();
        dataFields.add(new RecordField("id", RecordFieldType.INT.getDataType()))
        dataFields.add(new RecordField("name", RecordFieldType.STRING.getDataType()))
        dataFields.add(new RecordField("code", RecordFieldType.INT.getDataType()))

        final RecordSchema dataSchema = new SimpleRecordSchema(dataFields)
        parser.addSchemaField("operation", RecordFieldType.STRING)
        parser.addSchemaField(new RecordField("data", RecordFieldType.RECORD.getRecordDataType(dataSchema)))

        // CREATE, CREATE, CREATE, DELETE, UPDATE
        parser.addRecord("INSERT", new MapRecord(dataSchema, ["id": 1, "name": "John Doe", "code": 55] as Map))
        parser.addRecord("INSERT", new MapRecord(dataSchema, ["id": 2, "name": "Jane Doe", "code": 44] as Map))
        parser.addRecord("INSERT", new MapRecord(dataSchema, ["id": 3, "name": "Jim Doe", "code": 2] as Map))
        parser.addRecord("DELETE", new MapRecord(dataSchema, ["id": 2, "name": "Jane Doe", "code": 44] as Map))
        parser.addRecord("UPDATE", new MapRecord(dataSchema, ["id": 1, "name": "John Doe", "code": 201] as Map))

        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.USE_RECORD_PATH)
        runner.setProperty(PutDatabaseRecord.DATA_RECORD_PATH, "/data")
        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE_RECORD_PATH, "/operation")
        runner.setProperty(PutDatabaseRecord.UPDATE_KEYS, 'id')
        runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')

        runner.enqueue(new byte[0])
        runner.run()

        runner.assertAllFlowFilesTransferred(PutDatabaseRecord.REL_SUCCESS, 1)

        Connection conn = dbcp.getConnection()
        final Statement stmt = conn.createStatement()
        final ResultSet rs = stmt.executeQuery('SELECT * FROM PERSONS')
        assertTrue(rs.next())
        assertEquals(1, rs.getInt(1))
        assertEquals('John Doe', rs.getString(2))
        assertEquals(201, rs.getInt(3))
        assertTrue(rs.next())
        assertEquals(3, rs.getInt(1))
        assertEquals('Jim Doe', rs.getString(2))
        assertEquals(2, rs.getInt(3))
        assertFalse(rs.next())

        stmt.close()
        conn.close()
    }

    @Test
    void testInsertWithMaxBatchSize() throws InitializationException, ProcessException, SQLException, IOException {
        recreateTable(createPersons)
        final MockRecordParser parser = new MockRecordParser()
        runner.addControllerService("parser", parser)
        runner.enableControllerService(parser)

        parser.addSchemaField("id", RecordFieldType.INT)
        parser.addSchemaField("name", RecordFieldType.STRING)
        parser.addSchemaField("code", RecordFieldType.INT)

        (1..11).each {
            parser.addRecord(it, "rec$it".toString(), 100 + it)
        }

        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE)
        runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')
        runner.setProperty(PutDatabaseRecord.MAX_BATCH_SIZE, "5")

        Supplier<PreparedStatement> spyStmt = createPreparedStatementSpy()

        runner.enqueue(new byte[0])
        runner.run()

        runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1)

        assertEquals(11, getTableSize())

        assertNotNull(spyStmt.get())
        verify(spyStmt.get(), times(3)).executeBatch()
    }

    @Test
    void testInsertWithDefaultMaxBatchSize() throws InitializationException, ProcessException, SQLException, IOException {
        recreateTable(createPersons)
        final MockRecordParser parser = new MockRecordParser()
        runner.addControllerService("parser", parser)
        runner.enableControllerService(parser)

        parser.addSchemaField("id", RecordFieldType.INT)
        parser.addSchemaField("name", RecordFieldType.STRING)
        parser.addSchemaField("code", RecordFieldType.INT)

        (1..11).each {
            parser.addRecord(it, "rec$it".toString(), 100 + it)
        }

        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE)
        runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')

        Supplier<PreparedStatement> spyStmt = createPreparedStatementSpy()

        runner.enqueue(new byte[0])
        runner.run()

        runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1)

        assertEquals(11, getTableSize())

        assertNotNull(spyStmt.get())
        verify(spyStmt.get(), times(1)).executeBatch()
    }

    private Supplier<PreparedStatement> createPreparedStatementSpy() {
        PreparedStatement spyStmt
        doAnswer({ inv ->
            new DelegatingConnection((Connection)inv.callRealMethod()) {
                @Override
                PreparedStatement prepareStatement(String sql) throws SQLException {
                    spyStmt = spy(getDelegate().prepareStatement(sql))
                }
            }
        }).when(dbcp).getConnection(anyMap())
        return { spyStmt }
    }

    private int getTableSize() {
        final Connection connection = dbcp.getConnection()
        try {
            final Statement stmt = connection.createStatement()
            try {
                final ResultSet rs = stmt.executeQuery('SELECT count(*) FROM PERSONS')
                assertTrue(rs.next())
                rs.getInt(1)
            } finally {
                stmt.close()
            }
        } finally {
            connection.close()
        }
    }

    private void recreateTable(String createSQL) throws ProcessException, SQLException {
        final Connection conn = dbcp.getConnection()
        final Statement stmt = conn.createStatement()
        try {
            stmt.execute("drop table PERSONS")
        } catch (SQLException ignore) {
            // Do nothing, may not have existed
        }
        stmt.execute(createSQL)
        stmt.close()
        conn.close()
    }

    @Test
    void testGenerateTableName() throws Exception {

        final List<RecordField> fields = [new RecordField('id', RecordFieldType.INT.dataType),
                                          new RecordField('name', RecordFieldType.STRING.dataType),
                                          new RecordField('code', RecordFieldType.INT.dataType),
                                          new RecordField('non_existing', RecordFieldType.BOOLEAN.dataType)]

        def schema = [
                getFields    : {fields},
                getFieldCount: {fields.size()},
                getField     : {int index -> fields[index]},
                getDataTypes : {fields.collect {it.dataType}},
                getFieldNames: {fields.collect {it.fieldName}},
                getDataType  : {fieldName -> fields.find {it.fieldName == fieldName}.dataType}
        ] as RecordSchema

        def tableSchema = [
                [
                        new PutDatabaseRecord.ColumnDescription('id', 4, true, 2, false),
                        new PutDatabaseRecord.ColumnDescription('name', 12, true, 255, true),
                        new PutDatabaseRecord.ColumnDescription('code', 4, true, 10, true)
                ],
                false,
                ['id'] as Set<String>,
                '"'

        ] as PutDatabaseRecord.TableSchema

        runner.setProperty(PutDatabaseRecord.TRANSLATE_FIELD_NAMES, 'false')
        runner.setProperty(PutDatabaseRecord.UNMATCHED_FIELD_BEHAVIOR, PutDatabaseRecord.IGNORE_UNMATCHED_FIELD)
        runner.setProperty(PutDatabaseRecord.UNMATCHED_COLUMN_BEHAVIOR, PutDatabaseRecord.IGNORE_UNMATCHED_COLUMN)
        runner.setProperty(PutDatabaseRecord.QUOTE_IDENTIFIERS, 'true')
        runner.setProperty(PutDatabaseRecord.QUOTE_TABLE_IDENTIFIER, 'true')
        def settings = new PutDatabaseRecord.DMLSettings(runner.getProcessContext())

        processor.with {

            assertEquals('"test_catalog"."test_schema"."test_table"',
                    generateTableName(settings,"test_catalog","test_schema","test_table",tableSchema))

        }
    }

    @Test
    void testInsertMismatchedCompatibleDataTypes() throws InitializationException, ProcessException, SQLException, IOException {
        recreateTable(createPersons)
        final MockRecordParser parser = new MockRecordParser()
        runner.addControllerService("parser", parser)
        runner.enableControllerService(parser)

        parser.addSchemaField("id", RecordFieldType.INT)
        parser.addSchemaField("name", RecordFieldType.STRING)
        parser.addSchemaField("code", RecordFieldType.INT)
        parser.addSchemaField("dt", RecordFieldType.BIGINT)

        LocalDate testDate1 = LocalDate.of(2021, 1, 26)
        Date jdbcDate1 = Date.valueOf(testDate1) // in local TZ
        BigInteger nifiDate1 = jdbcDate1.getTime() // in local TZ

        LocalDate testDate2 = LocalDate.of(2021, 7, 26)
        Date jdbcDate2 = Date.valueOf(testDate2) // in local TZ
        BigInteger nifiDate2 = jdbcDate2.getTime() // in local TZ

        parser.addRecord(1, 'rec1', 101, nifiDate1)
        parser.addRecord(2, 'rec2', 102, nifiDate2)
        parser.addRecord(3, 'rec3', 103, null)
        parser.addRecord(4, 'rec4', 104, null)
        parser.addRecord(5, null, 105, null)

        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE)
        runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')

        runner.enqueue(new byte[0])
        runner.run()

        runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1)
        final Connection conn = dbcp.getConnection()
        final Statement stmt = conn.createStatement()
        final ResultSet rs = stmt.executeQuery('SELECT * FROM PERSONS')
        assertTrue(rs.next())
        assertEquals(1, rs.getInt(1))
        assertEquals('rec1', rs.getString(2))
        assertEquals(101, rs.getInt(3))
        assertEquals(jdbcDate1.toString(), rs.getDate(4).toString())
        assertTrue(rs.next())
        assertEquals(2, rs.getInt(1))
        assertEquals('rec2', rs.getString(2))
        assertEquals(102, rs.getInt(3))
        assertEquals(jdbcDate2.toString(), rs.getDate(4).toString())
        assertTrue(rs.next())
        assertEquals(3, rs.getInt(1))
        assertEquals('rec3', rs.getString(2))
        assertEquals(103, rs.getInt(3))
        assertNull(rs.getDate(4))
        assertTrue(rs.next())
        assertEquals(4, rs.getInt(1))
        assertEquals('rec4', rs.getString(2))
        assertEquals(104, rs.getInt(3))
        assertNull(rs.getDate(4))
        assertTrue(rs.next())
        assertEquals(5, rs.getInt(1))
        assertNull(rs.getString(2))
        assertEquals(105, rs.getInt(3))
        assertNull(rs.getDate(4))
        assertFalse(rs.next())

        stmt.close()
        conn.close()
    }


    @Test
    void testInsertMismatchedNotCompatibleDataTypes() throws InitializationException, ProcessException, SQLException, IOException {
        recreateTable(createPersons)
        final MockRecordParser parser = new MockRecordParser()
        runner.addControllerService("parser", parser)
        runner.enableControllerService(parser)

        parser.addSchemaField("id", RecordFieldType.STRING)
        parser.addSchemaField("name", RecordFieldType.STRING)
        parser.addSchemaField("code", RecordFieldType.INT)
        parser.addSchemaField("dt", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.FLOAT.getDataType()).getFieldType());

        LocalDate testDate1 = LocalDate.of(2021, 1, 26)
        BigInteger nifiDate1 = testDate1.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli() // in UTC
        Date jdbcDate1 = Date.valueOf(testDate1) // in local TZ
        LocalDate testDate2 = LocalDate.of(2021, 7, 26)
        BigInteger nifiDate2 = testDate2.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli() // in UTC
        Date jdbcDate2 = Date.valueOf(testDate2) // in local TZ

        parser.addRecord('1', 'rec1', 101, [1.0,2.0])
        parser.addRecord('2', 'rec2', 102, [3.0,4.0])
        parser.addRecord('3', 'rec3', 103, null)
        parser.addRecord('4', 'rec4', 104, null)
        parser.addRecord('5', null, 105, null)

        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE)
        runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')

        runner.enqueue(new byte[0])
        runner.run()

        // A SQLFeatureNotSupportedException exception is expected from Derby when you try to put the data as an ARRAY
        runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 0)
        runner.assertTransferCount(PutDatabaseRecord.REL_FAILURE, 1)
    }

    @Test
    void testLongVarchar() throws InitializationException, ProcessException, SQLException, IOException {
        // Manually create and drop the tables and schemas
        def conn = dbcp.connection
        def stmt = conn.createStatement()
        try {
            stmt.execute('DROP TABLE TEMP')
        } catch(ex) {
            // Do nothing, table may not exist
        }
        stmt.execute('CREATE TABLE TEMP (id integer primary key, name long varchar)')

        final MockRecordParser parser = new MockRecordParser()
        runner.addControllerService("parser", parser)
        runner.enableControllerService(parser)

        parser.addSchemaField("id", RecordFieldType.INT)
        parser.addSchemaField("name", RecordFieldType.STRING)

        parser.addRecord(1, 'rec1')
        parser.addRecord(2, 'rec2')

        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE)
        runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'TEMP')

        runner.enqueue(new byte[0])
        runner.run()

        runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1)
        ResultSet rs = stmt.executeQuery('SELECT * FROM TEMP')
        assertTrue(rs.next())
        assertEquals(1, rs.getInt(1))
        assertEquals('rec1', rs.getString(2))
        assertTrue(rs.next())
        assertEquals(2, rs.getInt(1))
        assertEquals('rec2', rs.getString(2))
        assertFalse(rs.next())

        stmt.close()
        conn.close()
    }

    @Test
    void testInsertWithDifferentColumnOrdering() throws InitializationException, ProcessException, SQLException, IOException {
        // Manually create and drop the tables and schemas
        def conn = dbcp.connection
        def stmt = conn.createStatement()
        try {
            stmt.execute('DROP TABLE TEMP')
        } catch(ex) {
            // Do nothing, table may not exist
        }
        stmt.execute('CREATE TABLE TEMP (id integer primary key, code integer, name long varchar)')

        final MockRecordParser parser = new MockRecordParser()
        runner.addControllerService("parser", parser)
        runner.enableControllerService(parser)

        parser.addSchemaField("name", RecordFieldType.STRING)
        parser.addSchemaField("id", RecordFieldType.INT)
        parser.addSchemaField("code", RecordFieldType.INT)

        // change order of columns
        parser.addRecord('rec1', 1, 101)
        parser.addRecord('rec2', 2, 102)

        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE)
        runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'TEMP')

        runner.enqueue(new byte[0])
        runner.run()

        runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1)
        ResultSet rs = stmt.executeQuery('SELECT * FROM TEMP')
        assertTrue(rs.next())
        assertEquals(1, rs.getInt(1))
        assertEquals(101, rs.getInt(2))
        assertEquals('rec1', rs.getString(3))
        assertTrue(rs.next())
        assertEquals(2, rs.getInt(1))
        assertEquals(102, rs.getInt(2))
        assertEquals('rec2', rs.getString(3))
        assertFalse(rs.next())

        stmt.close()
        conn.close()
    }

    @Test
    void testInsertWithBlobClob() throws Exception {
        String createTableWithBlob = "CREATE TABLE PERSONS (id integer primary key, name clob," +
                "content blob, code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND code < 1000))"

        recreateTable(createTableWithBlob)
        final MockRecordParser parser = new MockRecordParser()
        runner.addControllerService("parser", parser)
        runner.enableControllerService(parser)

        byte[] bytes = "BLOB".getBytes()
        Byte[] blobRecordValue = new Byte[bytes.length]
        (0 .. (bytes.length-1)).each { i -> blobRecordValue[i] = bytes[i].longValue() }

        parser.addSchemaField("id", RecordFieldType.INT)
        parser.addSchemaField("name", RecordFieldType.STRING)
        parser.addSchemaField("code", RecordFieldType.INT)
        parser.addSchemaField("content", RecordFieldType.ARRAY)

        parser.addRecord(1, 'rec1', 101, blobRecordValue)

        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE)
        runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')

        runner.enqueue(new byte[0])
        runner.run()

        runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1)
        final Connection conn = dbcp.getConnection()
        final Statement stmt = conn.createStatement()
        final ResultSet rs = stmt.executeQuery('SELECT * FROM PERSONS')
        assertTrue(rs.next())
        assertEquals(1, rs.getInt(1))
        Clob clob = rs.getClob(2)
        assertNotNull(clob)
        char[] clobText = new char[5]
        int numBytes = clob.characterStream.read(clobText)
        assertEquals(4, numBytes)
        // Ignore last character, it's meant to ensure that only 4 bytes were read even though the buffer is 5 bytes
        assertEquals('rec1', new String(clobText).substring(0,4))
        Blob blob = rs.getBlob(3)
        assertEquals("BLOB", new String(blob.getBytes(1, blob.length() as int)))
        assertEquals(101, rs.getInt(4))

        stmt.close()
        conn.close()
    }

    @Test
    void testInsertWithBlobClobObjectArraySource() throws Exception {
        String createTableWithBlob = "CREATE TABLE PERSONS (id integer primary key, name clob," +
                "content blob, code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND code < 1000))"

        recreateTable(createTableWithBlob)
        final MockRecordParser parser = new MockRecordParser()
        runner.addControllerService("parser", parser)
        runner.enableControllerService(parser)

        byte[] bytes = "BLOB".getBytes()
        Object[] blobRecordValue = new Object[bytes.length]
        (0 .. (bytes.length-1)).each { i -> blobRecordValue[i] = bytes[i] }

        parser.addSchemaField("id", RecordFieldType.INT)
        parser.addSchemaField("name", RecordFieldType.STRING)
        parser.addSchemaField("code", RecordFieldType.INT)
        parser.addSchemaField("content", RecordFieldType.ARRAY)

        parser.addRecord(1, 'rec1', 101, blobRecordValue)

        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE)
        runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')

        runner.enqueue(new byte[0])
        runner.run()

        runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1)
        final Connection conn = dbcp.getConnection()
        final Statement stmt = conn.createStatement()
        final ResultSet rs = stmt.executeQuery('SELECT * FROM PERSONS')
        assertTrue(rs.next())
        assertEquals(1, rs.getInt(1))
        Clob clob = rs.getClob(2)
        assertNotNull(clob)
        char[] clobText = new char[5]
        int numBytes = clob.characterStream.read(clobText)
        assertEquals(4, numBytes)
        // Ignore last character, it's meant to ensure that only 4 bytes were read even though the buffer is 5 bytes
        assertEquals('rec1', new String(clobText).substring(0,4))
        Blob blob = rs.getBlob(3)
        assertEquals("BLOB", new String(blob.getBytes(1, blob.length() as int)))
        assertEquals(101, rs.getInt(4))

        stmt.close()
        conn.close()
    }

    @Test
    void testInsertWithBlobStringSource() throws Exception {
        String createTableWithBlob = "CREATE TABLE PERSONS (id integer primary key, name clob," +
                "content blob, code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND code < 1000))"

        recreateTable(createTableWithBlob)
        final MockRecordParser parser = new MockRecordParser()
        runner.addControllerService("parser", parser)
        runner.enableControllerService(parser)

        parser.addSchemaField("id", RecordFieldType.INT)
        parser.addSchemaField("name", RecordFieldType.STRING)
        parser.addSchemaField("code", RecordFieldType.INT)
        parser.addSchemaField("content", RecordFieldType.STRING)

        parser.addRecord(1, 'rec1', 101, 'BLOB')

        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE)
        runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')

        runner.enqueue(new byte[0])
        runner.run()

        runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1)
        final Connection conn = dbcp.getConnection()
        final Statement stmt = conn.createStatement()
        final ResultSet rs = stmt.executeQuery('SELECT * FROM PERSONS')
        assertTrue(rs.next())
        assertEquals(1, rs.getInt(1))
        Clob clob = rs.getClob(2)
        assertNotNull(clob)
        char[] clobText = new char[5]
        int numBytes = clob.characterStream.read(clobText)
        assertEquals(4, numBytes)
        // Ignore last character, it's meant to ensure that only 4 bytes were read even though the buffer is 5 bytes
        assertEquals('rec1', new String(clobText).substring(0,4))
        Blob blob = rs.getBlob(3)
        assertEquals("BLOB", new String(blob.getBytes(1, blob.length() as int)))
        assertEquals(101, rs.getInt(4))

        stmt.close()
        conn.close()
    }

    @Test
    void testInsertWithBlobIntegerArraySource() throws Exception {
        String createTableWithBlob = "CREATE TABLE PERSONS (id integer primary key, name clob," +
                "content blob, code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND code < 1000))"

        recreateTable(createTableWithBlob)
        final MockRecordParser parser = new MockRecordParser()
        runner.addControllerService("parser", parser)
        runner.enableControllerService(parser)

        parser.addSchemaField("id", RecordFieldType.INT)
        parser.addSchemaField("name", RecordFieldType.STRING)
        parser.addSchemaField("code", RecordFieldType.INT)
        parser.addSchemaField("content", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.INT.getDataType()).getFieldType())

        parser.addRecord(1, 'rec1', 101, [1,2,3] as Integer[])

        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE)
        runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')

        runner.enqueue(new byte[0])
        runner.run()

        runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 0)
        runner.assertTransferCount(PutDatabaseRecord.REL_RETRY, 0)
        runner.assertTransferCount(PutDatabaseRecord.REL_FAILURE, 1)
    }
}
