blob: da458908b5c7ccee8fd10196f7b4ba3324248b02 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.record.MockRecordParser;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.nifi.util.file.FileUtils;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.sql.Connection;
import java.sql.Date;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.SQLNonTransientConnectionException;
import java.sql.Statement;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import static org.mockito.Mockito.spy;
/**
 * Tests for {@link PutDatabaseRecord} that run against an embedded Derby database
 * stored under {@link #DB_LOCATION}.
 *
 * <p>The database directory is removed before the first test and again after the last
 * test; each test gets a fresh {@link TestRunner} wired to a spied
 * {@link DBCPServiceSimpleImpl}.</p>
 */
public class PutDatabaseRecordTest {

    // DDL for the PERSONS table in the default schema.
    private static final String createPersons = "CREATE TABLE PERSONS (id integer primary key, name varchar(100)," +
            " code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND code < 1000), dt date)";

    // DDL for the PERSONS table in SCHEMA1 (same columns as the default-schema table).
    private static final String createPersonsSchema1 = "CREATE TABLE SCHEMA1.PERSONS (id integer primary key, name varchar(100)," +
            " code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND code < 1000), dt date)";

    // DDL for the PERSONS table in SCHEMA2; note the primary key column is named id2 here.
    private static final String createPersonsSchema2 = "CREATE TABLE SCHEMA2.PERSONS (id2 integer primary key, name varchar(100)," +
            " code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND code < 1000), dt date)";

    // Directory (relative to the working directory) that holds the Derby test database.
    private final static String DB_LOCATION = "target/db_pdr";

    TestRunner runner;
    PutDatabaseRecord processor;
    DBCPServiceSimpleImpl dbcp;

    /**
     * Redirects the Derby error log into {@code target/} and removes any database left
     * over from a previous run.
     *
     * @throws IOException declared for compatibility; deletion failures are ignored
     */
    @BeforeClass
    public static void setupBeforeClass() throws IOException {
        System.setProperty("derby.stream.error.file", "target/derby.log");
        // remove previous test database, if any
        deleteDatabaseDirectory();
    }

    /**
     * Shuts down the Derby database and removes its directory.
     *
     * @throws Exception if the shutdown request fails with anything other than the
     *                   {@link SQLNonTransientConnectionException} that is ignored below
     */
    @AfterClass
    public static void cleanUpAfterClass() throws Exception {
        try {
            DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";shutdown=true");
        } catch (SQLNonTransientConnectionException ignore) {
            // Do nothing, this is what happens at Derby shutdown
        }
        // remove the test database created by this class, if any
        deleteDatabaseDirectory();
    }

    /**
     * Creates the default processor, DBCP service and test runner used by each test.
     *
     * @throws Exception if the controller service cannot be registered or enabled
     */
    @Before
    public void setUp() throws Exception {
        initRunner(new PutDatabaseRecord());
    }

    /**
     * Runs an INSERT where the record schema contains a field ("extra") that is not a
     * column of PERSONS, with the unmatched-field behavior set to fail, using a processor
     * whose generated INSERT statement is hard-coded. The single enqueued FlowFile is
     * expected to be routed to failure.
     */
    @Test
    public void testInsertNonRequiredColumnsUnmatchedField() throws InitializationException, ProcessException, SQLException, IOException {
        // Need to override the @Before method with a new processor that behaves badly
        initRunner(new PutDatabaseRecordUnmatchedField());

        recreateTable(createPersons);

        final MockRecordParser parser = new MockRecordParser();
        runner.addControllerService("parser", parser);
        runner.enableControllerService(parser);

        parser.addSchemaField("id", RecordFieldType.INT);
        parser.addSchemaField("name", RecordFieldType.STRING);
        parser.addSchemaField("extra", RecordFieldType.STRING);
        parser.addSchemaField("dt", RecordFieldType.DATE);

        // Record dates are built from midnight UTC of the given calendar day.
        LocalDate testDate1 = LocalDate.of(2021, 1, 26);
        Date nifiDate1 = new Date(testDate1.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli()); // in UTC
        LocalDate testDate2 = LocalDate.of(2021, 7, 26);
        Date nifiDate2 = new Date(testDate2.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli()); // in UTC

        parser.addRecord(1, "rec1", "test", nifiDate1);
        parser.addRecord(2, "rec2", "test", nifiDate2);
        parser.addRecord(3, "rec3", "test", null);
        parser.addRecord(4, "rec4", "test", null);
        parser.addRecord(5, null, null, null);

        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser");
        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE);
        runner.setProperty(PutDatabaseRecord.TABLE_NAME, "PERSONS");
        runner.setProperty(PutDatabaseRecord.UNMATCHED_FIELD_BEHAVIOR, PutDatabaseRecord.FAIL_UNMATCHED_FIELD);

        runner.enqueue(new byte[0]);
        runner.run();

        runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 0);
        runner.assertTransferCount(PutDatabaseRecord.REL_RETRY, 0);
        runner.assertTransferCount(PutDatabaseRecord.REL_FAILURE, 1);
    }

    /**
     * Installs the given processor and builds a fresh runner around it with a spied DBCP
     * service registered and enabled under the identifier "dbcp".
     *
     * @param newProcessor the processor instance the runner should drive
     * @throws InitializationException if the controller service cannot be added
     */
    private void initRunner(final PutDatabaseRecord newProcessor) throws InitializationException {
        processor = newProcessor;
        //Mock the DBCP Controller Service so we can control the Results
        dbcp = spy(new DBCPServiceSimpleImpl(DB_LOCATION));
        final Map<String, String> dbcpProperties = new HashMap<>();
        runner = TestRunners.newTestRunner(processor);
        runner.addControllerService("dbcp", dbcp, dbcpProperties);
        runner.enableControllerService(dbcp);
        runner.setProperty(PutDatabaseRecord.DBCP_SERVICE, "dbcp");
    }

    /**
     * Drops the PERSONS table if it exists and then creates it with the given DDL.
     *
     * <p>Only a failure of the DROP is ignored (the table may not exist yet). The CREATE
     * always runs afterwards and any error from it is propagated, so a test never
     * proceeds silently without its table.</p>
     *
     * @param createSQL the CREATE TABLE statement to execute
     * @throws ProcessException if a connection cannot be obtained from the DBCP service
     * @throws SQLException     if the statement cannot be created or the CREATE fails
     */
    private void recreateTable(String createSQL) throws ProcessException, SQLException {
        try (final Connection conn = dbcp.getConnection();
             final Statement stmt = conn.createStatement()) {
            try {
                stmt.execute("drop table PERSONS");
            } catch (SQLException ignore) {
                // Do nothing, may not have existed
            }
            stmt.execute(createSQL);
        }
    }

    /**
     * Deletes the test database directory, ignoring any I/O failure because the
     * directory may simply not exist.
     */
    private static void deleteDatabaseDirectory() {
        final File dbLocation = new File(DB_LOCATION);
        try {
            // NOTE(review): second argument is presumably "recursive" - confirm against FileUtils
            FileUtils.deleteFile(dbLocation, true);
        } catch (IOException ignore) {
            // Do nothing, may not have existed
        }
    }

    /**
     * Processor variant whose INSERT generation ignores the record and table schemas and
     * always returns a fixed four-parameter statement against PERSONS.
     */
    static class PutDatabaseRecordUnmatchedField extends PutDatabaseRecord {
        @Override
        SqlAndIncludedColumns generateInsert(RecordSchema recordSchema, String tableName, TableSchema tableSchema, DMLSettings settings) throws IllegalArgumentException, SQLException {
            // Hard-coded statement: four positional parameters mapped to field indexes 0..3.
            return new SqlAndIncludedColumns("INSERT INTO PERSONS VALUES (?,?,?,?)", Arrays.asList(0,1,2,3));
        }
    }
}