NIFI-8376: Gracefully handle SQL exceptions in ResultSetRecordSet

This closes #4951

Signed-off-by: Mike Thomsen <mthomsen@apache.org>
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
index 78a68b3..7bf48f0 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
@@ -30,6 +30,7 @@
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
 import java.sql.Types;
 import java.util.ArrayList;
 import java.util.Date;
@@ -203,17 +204,17 @@
                 // the base type. However, if the base type is, itself, an array, we will simply return a base type of
                 // String because otherwise, we need the ResultSet for the array itself, and many JDBC Drivers do not
                 // support calling Array.getResultSet() and will throw an Exception if that is not supported.
-                if (rs.isAfterLast()) {
+                try {
+                    final Array array = rs.getArray(columnIndex);
+
+                    if (array == null) {
+                        return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType());
+                    }
+                    final DataType baseType = getArrayBaseType(array);
+                    return RecordFieldType.ARRAY.getArrayDataType(baseType);
+                } catch (SQLFeatureNotSupportedException sfnse) {
                     return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType());
                 }
-
-                final Array array = rs.getArray(columnIndex);
-                if (array == null) {
-                    return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType());
-                }
-
-                final DataType baseType = getArrayBaseType(array);
-                return RecordFieldType.ARRAY.getArrayDataType(baseType);
             case Types.BINARY:
             case Types.LONGVARBINARY:
             case Types.VARBINARY:
@@ -318,9 +319,6 @@
         if (arrayValue instanceof short[]) {
             return RecordFieldType.SHORT.getDataType();
         }
-        if (arrayValue instanceof byte[]) {
-            return RecordFieldType.BYTE.getDataType();
-        }
         if (arrayValue instanceof float[]) {
             return RecordFieldType.FLOAT.getDataType();
         }
diff --git a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/ResultSetRecordSetTest.java b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/ResultSetRecordSetTest.java
index 0b768fd..3cefc4e 100644
--- a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/ResultSetRecordSetTest.java
+++ b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/ResultSetRecordSetTest.java
@@ -21,6 +21,7 @@
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.mockito.ArgumentMatchers;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.junit.MockitoJUnitRunner;
@@ -30,6 +31,7 @@
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
 import java.sql.Types;
 import java.time.LocalDate;
 import java.time.ZoneOffset;
@@ -39,6 +41,7 @@
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
 import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
@@ -241,6 +244,45 @@
         assertEquals(bigDecimal5Value, record.getValue(COLUMN_NAME_BIG_DECIMAL_5));
     }
 
+    @Test
+    public void testCreateSchemaArrayThrowsException() throws SQLException {
+        // given
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("column", RecordFieldType.DECIMAL.getDecimalDataType(30, 10)));
+        final RecordSchema recordSchema = new SimpleRecordSchema(fields);
+        final ResultSet resultSet = givenResultSetForArrayThrowsException(true);
+
+        // when
+        assertThrows(SQLException.class, () -> new ResultSetRecordSet(resultSet, recordSchema));
+    }
+
+    @Test
+    public void testCreateSchemaArrayThrowsNotSupportedException() throws SQLException {
+        // given
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("column", RecordFieldType.DECIMAL.getDecimalDataType(30, 10)));
+        final RecordSchema recordSchema = new SimpleRecordSchema(fields);
+        final ResultSet resultSet = givenResultSetForArrayThrowsException(false);
+
+        // when
+        final ResultSetRecordSet testSubject = new ResultSetRecordSet(resultSet, recordSchema);
+        final RecordSchema resultSchema = testSubject.getSchema();
+
+        // then
+        assertEquals(RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType()), resultSchema.getField(0).getDataType());
+    }
+
+    private ResultSet givenResultSetForArrayThrowsException(boolean featureSupported) throws SQLException {
+        final ResultSet resultSet = Mockito.mock(ResultSet.class);
+        final ResultSetMetaData resultSetMetaData = Mockito.mock(ResultSetMetaData.class);
+        when(resultSet.getMetaData()).thenReturn(resultSetMetaData);
+        when(resultSet.getArray(ArgumentMatchers.anyInt())).thenThrow(featureSupported ? new SQLException("test exception") : new SQLFeatureNotSupportedException("not supported"));
+        when(resultSetMetaData.getColumnCount()).thenReturn(1);
+        when(resultSetMetaData.getColumnLabel(1)).thenReturn("column");
+        when(resultSetMetaData.getColumnType(1)).thenReturn(Types.ARRAY);
+        return resultSet;
+    }
+
     private ResultSet givenResultSetForOther() throws SQLException {
         final ResultSet resultSet = Mockito.mock(ResultSet.class);
         final ResultSetMetaData resultSetMetaData = Mockito.mock(ResultSetMetaData.class);