| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.nifi.processors.kudu; |
| |
| import org.apache.kudu.ColumnSchema; |
| import org.apache.kudu.ColumnTypeAttributes; |
| import org.apache.kudu.Schema; |
| import org.apache.kudu.Type; |
| import org.apache.kudu.client.Insert; |
| import org.apache.kudu.client.KuduException; |
| import org.apache.kudu.client.KuduSession; |
| import org.apache.kudu.client.OperationResponse; |
| import org.apache.kudu.client.PartialRow; |
| import org.apache.kudu.client.RowError; |
| import org.apache.kudu.client.RowErrorsAndOverflowStatus; |
| import org.apache.kudu.client.SessionConfiguration.FlushMode; |
| import org.apache.nifi.controller.AbstractControllerService; |
| import org.apache.nifi.flowfile.FlowFile; |
| import org.apache.nifi.flowfile.attributes.CoreAttributes; |
| import org.apache.nifi.kerberos.KerberosCredentialsService; |
| import org.apache.nifi.logging.ComponentLog; |
| import org.apache.nifi.provenance.ProvenanceEventRecord; |
| import org.apache.nifi.provenance.ProvenanceEventType; |
| import org.apache.nifi.reporting.InitializationException; |
| import org.apache.nifi.schema.access.SchemaNotFoundException; |
| import org.apache.nifi.serialization.MalformedRecordException; |
| import org.apache.nifi.serialization.RecordReader; |
| import org.apache.nifi.serialization.RecordReaderFactory; |
| import org.apache.nifi.serialization.SimpleRecordSchema; |
| import org.apache.nifi.serialization.record.MapRecord; |
| import org.apache.nifi.serialization.record.MockRecordParser; |
| import org.apache.nifi.serialization.record.RecordField; |
| import org.apache.nifi.serialization.record.RecordFieldType; |
| import org.apache.nifi.serialization.record.RecordSchema; |
| import org.apache.nifi.util.MockFlowFile; |
| import org.apache.nifi.util.TestRunner; |
| import org.apache.nifi.util.TestRunners; |
| import org.apache.nifi.util.Tuple; |
| import org.junit.After; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.mockito.Mockito; |
| import org.mockito.stubbing.OngoingStubbing; |
| |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.math.BigDecimal; |
| import java.sql.Timestamp; |
| import java.text.ParseException; |
| import java.text.SimpleDateFormat; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.stream.Collectors; |
| import java.util.stream.IntStream; |
| |
| import static org.apache.nifi.processors.kudu.TestPutKudu.ResultCode.EXCEPTION; |
| import static org.apache.nifi.processors.kudu.TestPutKudu.ResultCode.FAIL; |
| import static org.apache.nifi.processors.kudu.TestPutKudu.ResultCode.OK; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertTrue; |
| import static org.mockito.ArgumentMatchers.any; |
| import static org.mockito.Mockito.mock; |
| import static org.mockito.Mockito.when; |
| |
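| /** |
| * Unit tests for the PutKudu processor, using MockPutKudu and mocked Kudu client objects |
| * so that no live Kudu cluster is required. |
| */ |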
| public class TestPutKudu { |
| |
| public static final String DEFAULT_TABLE_NAME = "Nifi-Kudu-Table"; |
| public static final String DEFAULT_MASTERS = "testLocalHost:7051"; |
| public static final String SKIP_HEAD_LINE = "false"; |
| public static final String TABLE_SCHEMA = "id,stringVal,num32Val,doubleVal,decimalVal,dateVal"; |
| |
| private static final String DATE_FIELD = "created"; |
| private static final String ISO_8601_YEAR_MONTH_DAY = "2000-01-01"; |
| private static final String ISO_8601_YEAR_MONTH_DAY_PATTERN = "yyyy-MM-dd"; |
| |
| private TestRunner testRunner; |
| |
| private MockPutKudu processor; |
| |
| private MockRecordParser readerFactory; |
| |
| private final java.sql.Date today = new java.sql.Date(System.currentTimeMillis()); |
| |
| @Before |
| public void setUp() { |
| processor = new MockPutKudu(); |
| testRunner = TestRunners.newTestRunner(processor); |
| setUpTestRunner(testRunner); |
| } |
| |
| private void setUpTestRunner(TestRunner testRunner) { |
| testRunner.setProperty(PutKudu.TABLE_NAME, DEFAULT_TABLE_NAME); |
| testRunner.setProperty(PutKudu.KUDU_MASTERS, DEFAULT_MASTERS); |
| testRunner.setProperty(PutKudu.SKIP_HEAD_LINE, SKIP_HEAD_LINE); |
| testRunner.setProperty(PutKudu.IGNORE_NULL, "true"); |
| testRunner.setProperty(PutKudu.LOWERCASE_FIELD_NAMES, "true"); |
| testRunner.setProperty(PutKudu.RECORD_READER, "mock-reader-factory"); |
| testRunner.setProperty(PutKudu.INSERT_OPERATION, OperationType.INSERT.toString()); |
| } |
| |
| @After |
| public void close() { |
| testRunner = null; |
| } |
| |
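| // Registers a MockRecordParser named "mock-reader-factory" whose fields match TABLE_SCHEMA |
| // and which produces numOfRecord rows of generated test data. |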
| private void createRecordReader(int numOfRecord) throws InitializationException { |
| |
| readerFactory = new MockRecordParser(); |
| readerFactory.addSchemaField("id", RecordFieldType.INT); |
| readerFactory.addSchemaField("stringVal", RecordFieldType.STRING); |
| readerFactory.addSchemaField("num32Val", RecordFieldType.INT); |
| readerFactory.addSchemaField("doubleVal", RecordFieldType.DOUBLE); |
| readerFactory.addSchemaField(new RecordField("decimalVal", RecordFieldType.DECIMAL.getDecimalDataType(6, 3))); |
| readerFactory.addSchemaField("dateVal", RecordFieldType.DATE); |
| for (int i = 0; i < numOfRecord; i++) { |
| readerFactory.addRecord(i, "val_" + i, 1000 + i, 100.88 + i, |
| new BigDecimal("111.111").add(BigDecimal.valueOf(i)), today); |
| } |
| |
| testRunner.addControllerService("mock-reader-factory", readerFactory); |
| testRunner.enableControllerService(readerFactory); |
| } |
| |
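| // Kerberos principal and password are only valid when set together, and both are mutually |
| // exclusive with the Kerberos Credentials Service. |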
| @Test |
| public void testCustomValidate() throws InitializationException { |
| createRecordReader(1); |
| |
| testRunner.setProperty(PutKudu.KERBEROS_PRINCIPAL, "principal"); |
| testRunner.assertNotValid(); |
| |
| testRunner.removeProperty(PutKudu.KERBEROS_PRINCIPAL); |
| testRunner.setProperty(PutKudu.KERBEROS_PASSWORD, "password"); |
| testRunner.assertNotValid(); |
| |
| testRunner.setProperty(PutKudu.KERBEROS_PRINCIPAL, "principal"); |
| testRunner.setProperty(PutKudu.KERBEROS_PASSWORD, "password"); |
| testRunner.assertValid(); |
| |
| final KerberosCredentialsService kerberosCredentialsService = new MockKerberosCredentialsService("unit-test-principal", "unit-test-keytab"); |
| testRunner.addControllerService("kerb", kerberosCredentialsService); |
| testRunner.enableControllerService(kerberosCredentialsService); |
| testRunner.setProperty(PutKudu.KERBEROS_CREDENTIALS_SERVICE, "kerb"); |
| testRunner.assertNotValid(); |
| |
| testRunner.removeProperty(PutKudu.KERBEROS_PRINCIPAL); |
| testRunner.removeProperty(PutKudu.KERBEROS_PASSWORD); |
| testRunner.assertValid(); |
| } |
| |
| @Test |
| public void testWriteKuduWithDefaults() throws InitializationException { |
| createRecordReader(100); |
| |
| final String filename = "testWriteKudu-" + System.currentTimeMillis(); |
| |
| final Map<String, String> flowFileAttributes = new HashMap<>(); |
| flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename); |
| |
| testRunner.enqueue("trigger", flowFileAttributes); |
| testRunner.run(); |
| testRunner.assertAllFlowFilesTransferred(PutKudu.REL_SUCCESS, 1); |
| |
| // verify the successful flow file has the expected content & attributes |
| final MockFlowFile mockFlowFile = testRunner.getFlowFilesForRelationship(PutKudu.REL_SUCCESS).get(0); |
| mockFlowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), filename); |
| mockFlowFile.assertAttributeEquals(PutKudu.RECORD_COUNT_ATTR, "100"); |
| mockFlowFile.assertContentEquals("trigger"); |
| |
| // verify we generated a provenance event |
| final List<ProvenanceEventRecord> provEvents = testRunner.getProvenanceEvents(); |
| Assert.assertEquals(1, provEvents.size()); |
| |
| // verify it was a SEND event |
| final ProvenanceEventRecord provEvent = provEvents.get(0); |
| Assert.assertEquals(ProvenanceEventType.SEND, provEvent.getEventType()); |
| } |
| |
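| // With a Kerberos Credentials Service configured, the processor logs in when it runs and |
| // logs out when it is stopped. |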
| @Test |
| public void testKerberosEnabled() throws InitializationException { |
| createRecordReader(1); |
| |
| final KerberosCredentialsService kerberosCredentialsService = new MockKerberosCredentialsService("unit-test-principal", "unit-test-keytab"); |
| testRunner.addControllerService("kerb", kerberosCredentialsService); |
| testRunner.enableControllerService(kerberosCredentialsService); |
| |
| testRunner.setProperty(PutKudu.KERBEROS_CREDENTIALS_SERVICE, "kerb"); |
| |
| testRunner.run(1, false); |
| |
| final MockPutKudu proc = (MockPutKudu) testRunner.getProcessor(); |
| assertTrue(proc.loggedIn()); |
| assertFalse(proc.loggedOut()); |
| |
| testRunner.run(1, true, false); |
| assertTrue(proc.loggedOut()); |
| } |
| |
| @Test |
| public void testInsecureClient() throws InitializationException { |
| createRecordReader(1); |
| |
| testRunner.run(1, false); |
| |
| final MockPutKudu proc = (MockPutKudu) testRunner.getProcessor(); |
| assertFalse(proc.loggedIn()); |
| assertFalse(proc.loggedOut()); |
| |
| testRunner.run(1, true, false); |
| assertFalse(proc.loggedOut()); |
| } |
| |
| |
| @Test |
| public void testInvalidReaderShouldRouteToFailure() throws InitializationException, SchemaNotFoundException, MalformedRecordException, IOException { |
| createRecordReader(0); |
| |
| // Simulate throwing an IOException when the factory creates a reader, which is what |
| // would happen when invalid Avro is passed to the Avro reader factory |
| final RecordReaderFactory readerFactory = mock(RecordReaderFactory.class); |
| when(readerFactory.getIdentifier()).thenReturn("mock-reader-factory"); |
| when(readerFactory.createRecordReader(any(FlowFile.class), any(InputStream.class), any(ComponentLog.class))).thenThrow(new IOException("NOT AVRO")); |
| |
| testRunner.addControllerService("mock-reader-factory", readerFactory); |
| testRunner.enableControllerService(readerFactory); |
| testRunner.setProperty(PutKudu.RECORD_READER, "mock-reader-factory"); |
| |
| final String filename = "testInvalidAvroShouldRouteToFailure-" + System.currentTimeMillis(); |
| |
| final Map<String, String> flowFileAttributes = new HashMap<>(); |
| flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename); |
| |
| testRunner.enqueue("trigger", flowFileAttributes); |
| testRunner.run(); |
| testRunner.assertAllFlowFilesTransferred(PutKudu.REL_FAILURE, 1); |
| } |
| |
| @Test |
| public void testValidSchemaShouldBeSuccessful() throws InitializationException { |
| createRecordReader(10); |
| final String filename = "testValidSchemaShouldBeSuccessful-" + System.currentTimeMillis(); |
| |
| // provide the table schema via the my.schema attribute |
| final Map<String, String> flowFileAttributes = new HashMap<>(); |
| flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename); |
| flowFileAttributes.put("my.schema", TABLE_SCHEMA); |
| |
| testRunner.enqueue("trigger", flowFileAttributes); |
| testRunner.run(); |
| testRunner.assertAllFlowFilesTransferred(PutKudu.REL_SUCCESS, 1); |
| } |
| |
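| // The Kudu table schema is empty, so every record field is missing from the table; with |
| // HANDLE_SCHEMA_DRIFT enabled the FlowFile is still expected to route to success. |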
| @Test |
| public void testAddingMissingFieldsWhenHandleSchemaDriftIsAllowed() throws InitializationException { |
| processor.setTableSchema(new Schema(Collections.emptyList())); |
| createRecordReader(5); |
| final String filename = "testAddingMissingFieldsWhenHandleSchemaDriftIsAllowed-" + System.currentTimeMillis(); |
| |
| final Map<String, String> flowFileAttributes = new HashMap<>(); |
| flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename); |
| |
| testRunner.setProperty(PutKudu.HANDLE_SCHEMA_DRIFT, "true"); |
| testRunner.enqueue("trigger", flowFileAttributes); |
| |
| testRunner.run(); |
| |
| testRunner.assertAllFlowFilesTransferred(PutKudu.REL_SUCCESS, 1); |
| } |
| |
| @Test |
| public void testMalformedRecordExceptionFromReaderShouldRouteToFailure() throws InitializationException, IOException, MalformedRecordException, SchemaNotFoundException { |
| createRecordReader(10); |
| |
| final RecordReader recordReader = mock(RecordReader.class); |
| when(recordReader.nextRecord()).thenThrow(new MalformedRecordException("ERROR")); |
| |
| final RecordReaderFactory readerFactory = mock(RecordReaderFactory.class); |
| when(readerFactory.getIdentifier()).thenReturn("mock-reader-factory"); |
| when(readerFactory.createRecordReader(any(FlowFile.class), any(InputStream.class), any(ComponentLog.class))).thenReturn(recordReader); |
| |
| testRunner.addControllerService("mock-reader-factory", readerFactory); |
| testRunner.enableControllerService(readerFactory); |
| testRunner.setProperty(PutKudu.RECORD_READER, "mock-reader-factory"); |
| |
| final String filename = "testMalformedRecordExceptionShouldRouteToFailure-" + System.currentTimeMillis(); |
| |
| final Map<String, String> flowFileAttributes = new HashMap<>(); |
| flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename); |
| |
| testRunner.enqueue("trigger", flowFileAttributes); |
| testRunner.run(); |
| testRunner.assertAllFlowFilesTransferred(PutKudu.REL_FAILURE, 1); |
| } |
| |
| @Test |
| public void testReadAsStringAndWriteAsInt() throws InitializationException { |
| createRecordReader(0); |
| // add num32Val (and the double/decimal values) as Strings so they are coerced to the schema types on write |
| readerFactory.addRecord(1, "name0", "0", "89.89", "111.111", today); |
| |
| final String filename = "testReadAsStringAndWriteAsInt-" + System.currentTimeMillis(); |
| |
| final Map<String, String> flowFileAttributes = new HashMap<>(); |
| flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename); |
| |
| testRunner.enqueue("trigger", flowFileAttributes); |
| testRunner.run(); |
| testRunner.assertAllFlowFilesTransferred(PutKudu.REL_SUCCESS, 1); |
| } |
| |
| @Test |
| public void testMissingColumnInReader() throws InitializationException { |
| createRecordReader(0); |
| readerFactory.addRecord("name0", "0", "89.89"); //missing id |
| |
| final String filename = "testMissingColumnInReader-" + System.currentTimeMillis(); |
| |
| final Map<String, String> flowFileAttributes = new HashMap<>(); |
| flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename); |
| |
| testRunner.enqueue("trigger", flowFileAttributes); |
| testRunner.run(); |
| testRunner.assertAllFlowFilesTransferred(PutKudu.REL_FAILURE, 1); |
| } |
| |
| |
| @Test |
| public void testInsertManyFlowFiles() throws Exception { |
| createRecordReader(50); |
| final String content1 = "{ \"field1\" : \"value1\", \"field2\" : \"value11\" }"; |
| final String content2 = "{ \"field1\" : \"value1\", \"field2\" : \"value11\" }"; |
| final String content3 = "{ \"field1\" : \"value3\", \"field2\" : \"value33\" }"; |
| |
| testRunner.enqueue(content1.getBytes()); |
| testRunner.enqueue(content2.getBytes()); |
| testRunner.enqueue(content3.getBytes()); |
| |
| testRunner.run(3); |
| |
| testRunner.assertAllFlowFilesTransferred(PutKudu.REL_SUCCESS, 3); |
| List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(PutKudu.REL_SUCCESS); |
| |
| flowFiles.get(0).assertContentEquals(content1.getBytes()); |
| flowFiles.get(0).assertAttributeEquals(PutKudu.RECORD_COUNT_ATTR, "50"); |
| |
| flowFiles.get(1).assertContentEquals(content2.getBytes()); |
| flowFiles.get(1).assertAttributeEquals(PutKudu.RECORD_COUNT_ATTR, "50"); |
| |
| flowFiles.get(2).assertContentEquals(content3.getBytes()); |
| flowFiles.get(2).assertAttributeEquals(PutKudu.RECORD_COUNT_ATTR, "50"); |
| } |
| |
| @Test |
| public void testUpsertFlowFiles() throws Exception { |
| createRecordReader(50); |
| testRunner.setProperty(PutKudu.INSERT_OPERATION, OperationType.UPSERT.toString()); |
| testRunner.enqueue("string".getBytes()); |
| |
| testRunner.run(); |
| |
| testRunner.assertAllFlowFilesTransferred(PutKudu.REL_SUCCESS, 1); |
| MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(PutKudu.REL_SUCCESS).get(0); |
| |
| flowFile.assertContentEquals("string".getBytes()); |
| flowFile.assertAttributeEquals(PutKudu.RECORD_COUNT_ATTR, "50"); |
| } |
| |
| @Test |
| public void testDeleteFlowFiles() throws Exception { |
| createRecordReader(50); |
| testRunner.setProperty(PutKudu.INSERT_OPERATION, "${kudu.record.delete}"); |
| |
| final Map<String, String> attributes = new HashMap<>(); |
| attributes.put("kudu.record.delete", "DELETE"); |
| |
| testRunner.enqueue("string".getBytes(), attributes); |
| testRunner.run(); |
| |
| testRunner.assertAllFlowFilesTransferred(PutKudu.REL_SUCCESS, 1); |
| MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(PutKudu.REL_SUCCESS).get(0); |
| |
| flowFile.assertContentEquals("string".getBytes()); |
| flowFile.assertAttributeEquals(PutKudu.RECORD_COUNT_ATTR, "50"); |
| } |
| |
| @Test |
| public void testUpdateFlowFiles() throws Exception { |
| createRecordReader(50); |
| testRunner.setProperty(PutKudu.INSERT_OPERATION, "${kudu.record.update}"); |
| |
| final Map<String, String> attributes = new HashMap<>(); |
| attributes.put("kudu.record.update", "UPDATE"); |
| |
| testRunner.enqueue("string".getBytes(), attributes); |
| testRunner.run(); |
| |
| testRunner.assertAllFlowFilesTransferred(PutKudu.REL_SUCCESS, 1); |
| MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(PutKudu.REL_SUCCESS).get(0); |
| |
| flowFile.assertContentEquals("string".getBytes()); |
| flowFile.assertAttributeEquals(PutKudu.RECORD_COUNT_ATTR, "50"); |
| } |
| |
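| // The tests below exercise buildPartialRow() directly: nullability, primary key handling, |
| // field name case sensitivity, VARCHAR truncation and DATE handling. |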
| @Test |
| public void testBuildRow() { |
| buildPartialRow((long) 1, "foo", (short) 10, "id", "id", "SFO", null, false); |
| } |
| |
| @Test |
| public void testBuildPartialRowNullable() { |
| buildPartialRow((long) 1, null, (short) 10, "id", "id", null, null, false); |
| } |
| |
| @Test(expected = IllegalArgumentException.class) |
| public void testBuildPartialRowNullPrimaryKey() { |
| buildPartialRow(null, "foo", (short) 10, "id", "id", "SFO", null, false); |
| } |
| |
| @Test(expected = IllegalArgumentException.class) |
| public void testBuildPartialRowNotNullable() { |
| buildPartialRow((long) 1, "foo", null, "id", "id", "SFO", null, false); |
| } |
| |
| @Test |
| public void testBuildPartialRowLowercaseFields() { |
| PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "id", "ID", "SFO", null, true); |
| row.getLong("id"); |
| } |
| |
| @Test(expected = IllegalArgumentException.class) |
| public void testBuildPartialRowLowercaseFieldsFalse() { |
| PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "id", "ID", "SFO", null, false); |
| row.getLong("id"); |
| } |
| |
| @Test |
| public void testBuildPartialRowLowercaseFieldsKuduUpper() { |
| PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "ID", "ID", "SFO", null, false); |
| row.getLong("ID"); |
| } |
| |
| @Test(expected = IllegalArgumentException.class) |
| public void testBuildPartialRowLowercaseFieldsKuduUpperFail() { |
| PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "ID", "ID", "SFO", null, true); |
| row.getLong("ID"); |
| } |
| |
| @Test |
| public void testBuildPartialRowVarCharTooLong() { |
| PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "id", "ID", "San Francisco", null, true); |
| Assert.assertEquals("Kudu client should truncate VARCHAR value to expected length", "San", row.getVarchar("airport_code")); |
| } |
| |
| @Test |
| public void testBuildPartialRowWithDate() { |
| PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "id", "ID", "San Francisco", today, true); |
| // Compare string representations of the dates, because java.sql.Date does not override |
| // java.util.Date.equals and therefore compares milliseconds instead of dates, |
| // even though java.sql.Date is supposed to ignore the time of day |
| Assert.assertEquals(String.format("Expecting the date to be %s, but got %s", today, row.getDate("sql_date").toString()), |
| today.toString(), row.getDate("sql_date").toString()); |
| } |
| |
| @Test |
| public void testBuildPartialRowWithDateDefaultTimeZone() throws ParseException { |
| final SimpleDateFormat dateFormat = new SimpleDateFormat(ISO_8601_YEAR_MONTH_DAY_PATTERN); |
| final java.util.Date dateFieldValue = dateFormat.parse(ISO_8601_YEAR_MONTH_DAY); |
| |
| assertPartialRowDateFieldEquals(dateFieldValue); |
| } |
| |
| @Test |
| public void testBuildPartialRowWithDateString() { |
| assertPartialRowDateFieldEquals(ISO_8601_YEAR_MONTH_DAY); |
| } |
| |
| private void assertPartialRowDateFieldEquals(final Object dateFieldValue) { |
| final PartialRow row = buildPartialRowDateField(dateFieldValue); |
| final java.sql.Date rowDate = row.getDate(DATE_FIELD); |
| assertEquals("Partial Row Date Field not matched", ISO_8601_YEAR_MONTH_DAY, rowDate.toString()); |
| } |
| |
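| // Builds a PartialRow with a single nullable DATE column from the given field value, which |
| // may be a java.util.Date or an ISO-8601 date String. |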
| private PartialRow buildPartialRowDateField(final Object dateFieldValue) { |
| final Schema kuduSchema = new Schema(Collections.singletonList( |
| new ColumnSchema.ColumnSchemaBuilder(DATE_FIELD, Type.DATE).nullable(true).build() |
| )); |
| |
| final RecordSchema schema = new SimpleRecordSchema(Collections.singletonList( |
| new RecordField(DATE_FIELD, RecordFieldType.DATE.getDataType()) |
| )); |
| |
| final Map<String, Object> values = new HashMap<>(); |
| values.put(DATE_FIELD, dateFieldValue); |
| final MapRecord record = new MapRecord(schema, values); |
| |
| final PartialRow row = kuduSchema.newPartialRow(); |
| processor.buildPartialRow(kuduSchema, row, record, schema.getFieldNames(), true, true); |
| return row; |
| } |
| |
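| // Builds a PartialRow from a MapRecord against a Kudu schema containing an INT64 key, |
| // STRING, INT16, UNIXTIME_MICROS, DECIMAL(9,0), VARCHAR(3) and DATE columns, optionally |
| // lowercasing the record field names, and returns the populated row for assertions. |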
| private PartialRow buildPartialRow(Long id, String name, Short age, String kuduIdName, String recordIdName, String airport_code, java.sql.Date sql_date, Boolean lowercaseFields) { |
| final Schema kuduSchema = new Schema(Arrays.asList( |
| new ColumnSchema.ColumnSchemaBuilder(kuduIdName, Type.INT64).key(true).build(), |
| new ColumnSchema.ColumnSchemaBuilder("name", Type.STRING).nullable(true).build(), |
| new ColumnSchema.ColumnSchemaBuilder("age", Type.INT16).nullable(false).build(), |
| new ColumnSchema.ColumnSchemaBuilder("updated_at", Type.UNIXTIME_MICROS).nullable(false).build(), |
| new ColumnSchema.ColumnSchemaBuilder("score", Type.DECIMAL).nullable(true).typeAttributes( |
| new ColumnTypeAttributes.ColumnTypeAttributesBuilder().precision(9).scale(0).build() |
| ).build(), |
| new ColumnSchema.ColumnSchemaBuilder("airport_code", Type.VARCHAR).nullable(true).typeAttributes( |
| new ColumnTypeAttributes.ColumnTypeAttributesBuilder().length(3).build() |
| ).build(), |
| new ColumnSchema.ColumnSchemaBuilder("sql_date", Type.DATE).nullable(true).build() |
| )); |
| |
| |
| final RecordSchema schema = new SimpleRecordSchema(Arrays.asList( |
| new RecordField(recordIdName, RecordFieldType.BIGINT.getDataType()), |
| new RecordField("name", RecordFieldType.STRING.getDataType()), |
| new RecordField("age", RecordFieldType.SHORT.getDataType()), |
| new RecordField("updated_at", RecordFieldType.TIMESTAMP.getDataType()), |
| new RecordField("score", RecordFieldType.LONG.getDataType()), |
| new RecordField("airport_code", RecordFieldType.STRING.getDataType()), |
| new RecordField("sql_date", RecordFieldType.DATE.getDataType()) |
| )); |
| |
| Map<String, Object> values = new HashMap<>(); |
| PartialRow row = kuduSchema.newPartialRow(); |
| values.put(recordIdName, id); |
| values.put("name", name); |
| values.put("age", age); |
| values.put("updated_at", new Timestamp(System.currentTimeMillis())); |
| values.put("score", 10000L); |
| values.put("airport_code", airport_code); |
| values.put("sql_date", sql_date); |
| processor.buildPartialRow( |
| kuduSchema, |
| row, |
| new MapRecord(schema, values), |
| schema.getFieldNames(), |
| true, |
| lowercaseFields |
| ); |
| return row; |
| } |
| |
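| // Creates a mocked Insert and OperationResponse pair; when success is false the response |
| // reports a row error whose operation is the mocked Insert. |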
| private Tuple<Insert, OperationResponse> insert(boolean success) { |
| Insert insert = mock(Insert.class); |
| OperationResponse response = mock(OperationResponse.class, Mockito.RETURNS_DEEP_STUBS); |
| when(response.hasRowError()).thenReturn(!success); |
| if (!success) { |
| when(response.getRowError().getOperation()).thenReturn(insert); |
| } |
| return new Tuple<>(insert, response); |
| } |
| |
| enum ResultCode { |
| OK, |
| FAIL, |
| EXCEPTION |
| } |
| |
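| // Queues one mocked Insert per ResultCode on the processor, stubbing session.apply() as needed |
| // for exceptions and synchronous failures, and returns the corresponding OperationResponses. |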
| private LinkedList<OperationResponse> queueInsert(MockPutKudu putKudu, KuduSession session, boolean sync, ResultCode... results) throws Exception { |
| LinkedList<OperationResponse> responses = new LinkedList<>(); |
| for (ResultCode result : results) { |
| boolean ok = result == OK; |
| Tuple<Insert, OperationResponse> tuple = insert(ok); |
| putKudu.queue(tuple.getKey()); |
| |
| if (result == EXCEPTION) { |
| when(session.apply(tuple.getKey())).thenThrow(mock(KuduException.class)); |
| // Stop processing the rest of the records on the first exception |
| break; |
| } else { |
| responses.add(tuple.getValue()); |
| if (sync) { |
| when(session.apply(tuple.getKey())).thenReturn(ok ? null : tuple.getValue()); |
| |
| // In AUTO_FLUSH_SYNC mode, PutKudu immediately knows when an operation has failed. |
| // In that case, it does not process the rest of the records in the FlowFile. |
| if (result == FAIL) break; |
| } |
| } |
| } |
| return responses; |
| } |
| |
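| // Chains thenReturn() stubs so that successive calls to the mocked method return the |
| // given slices in order. |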
| private static <T> void stubSlices(OngoingStubbing<T> stubbing, List<T> slices) { |
| for (T slice : slices) { |
| stubbing = stubbing.thenReturn(slice); |
| } |
| } |
| |
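| // Runs four FlowFiles of three records each against a mocked KuduSession, mixing OK, FAIL |
| // and EXCEPTION results, and verifies how the FlowFiles are routed and the record counts |
| // reported on them for the given flush mode and batch size. |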
| private void testKuduPartialFailure(FlushMode flushMode, int batchSize) throws Exception { |
| final int numFlowFiles = 4; |
| final int numRecordsPerFlowFile = 3; |
| final ResultCode[][] flowFileResults = new ResultCode[][]{ |
| new ResultCode[]{OK, OK, FAIL}, |
| |
| // The last operation will not be submitted to Kudu if flush mode is AUTO_FLUSH_SYNC |
| new ResultCode[]{OK, FAIL, OK}, |
| |
| // Everything's okay |
| new ResultCode[]{OK, OK, OK}, |
| |
| // The last operation will not be submitted due to an exception from apply() call |
| new ResultCode[]{OK, EXCEPTION, OK}, |
| }; |
| |
| KuduSession session = mock(KuduSession.class); |
| when(session.getFlushMode()).thenReturn(flushMode); |
| MockPutKudu putKudu = new MockPutKudu(session); |
| |
| List<List<OperationResponse>> flowFileResponses = new ArrayList<>(); |
| boolean sync = flushMode == FlushMode.AUTO_FLUSH_SYNC; |
| for (ResultCode[] results : flowFileResults) { |
| flowFileResponses.add(queueInsert(putKudu, session, sync, results)); |
| } |
| |
| switch (flushMode) { |
| case AUTO_FLUSH_SYNC: |
| // flush() or close() returns an empty list |
| when(session.close()).thenReturn(Collections.emptyList()); |
| break; |
| case AUTO_FLUSH_BACKGROUND: |
| // close() will be called for each batch of FlowFiles, however we do not check |
| // its return value. Instead, we check the pending errors of the session, |
| // as buffered operations may have already been flushed. |
| when(session.close()).thenReturn(Collections.emptyList()); |
| |
| List<RowErrorsAndOverflowStatus> pendingErrorResponses = new ArrayList<>(); |
| while (!flowFileResponses.isEmpty()) { |
| int sliceSize = Math.min(batchSize, flowFileResponses.size()); |
| List<List<OperationResponse>> slice = flowFileResponses.subList(0, sliceSize); |
| |
| RowErrorsAndOverflowStatus pendingErrorResponse = mock(RowErrorsAndOverflowStatus.class); |
| RowError[] rowErrors = slice.stream() |
| .flatMap(List::stream) |
| .filter(OperationResponse::hasRowError) |
| .map(OperationResponse::getRowError) |
| .toArray(RowError[]::new); |
| when(pendingErrorResponse.getRowErrors()).thenReturn(rowErrors); |
| pendingErrorResponses.add(pendingErrorResponse); |
| |
| flowFileResponses = flowFileResponses.subList(sliceSize, flowFileResponses.size()); |
| } |
| |
| stubSlices(when(session.getPendingErrors()), pendingErrorResponses); |
| break; |
| case MANUAL_FLUSH: |
| // close() will be called at the end of a batch, but flush() will also be called |
| // whenever the mutation buffer of KuduSession becomes full. In PutKudu, we set |
| // the size of the mutation buffer to match batchSize, so flush() is called only |
| // when a FlowFile has more than one record. |
| List<List<OperationResponse>> flushes = new ArrayList<>(); |
| List<List<OperationResponse>> closes = new ArrayList<>(); |
| |
| while (!flowFileResponses.isEmpty()) { |
| int sliceSize = Math.min(batchSize, flowFileResponses.size()); |
| List<List<OperationResponse>> slice = flowFileResponses.subList(0, sliceSize); |
| flowFileResponses = flowFileResponses.subList(sliceSize, flowFileResponses.size()); |
| |
| List<OperationResponse> batch = new ArrayList<>(); |
| for (OperationResponse response : slice.stream().flatMap(List::stream).collect(Collectors.toList())) { |
| if (batch.size() == batchSize) { |
| flushes.add(batch); |
| batch = new ArrayList<>(); |
| } |
| batch.add(response); |
| } |
| if (flowFileResponses.isEmpty() && batch.size() == batchSize) { |
| // Handle the case where PutKudu ends the batch with flush() instead of |
| // close(), due to an exception from the subsequent apply() call. |
| flushes.add(batch); |
| } else if (batch.size() > 0) { |
| closes.add(batch); |
| } |
| |
| if (!flushes.isEmpty()) stubSlices(when(session.flush()), flushes); |
| if (!closes.isEmpty()) stubSlices(when(session.close()), closes); |
| } |
| break; |
| } |
| |
| testRunner = TestRunners.newTestRunner(putKudu); |
| createRecordReader(numRecordsPerFlowFile); |
| setUpTestRunner(testRunner); |
| testRunner.setProperty(PutKudu.FLUSH_MODE, flushMode.name()); |
| testRunner.setProperty(PutKudu.BATCH_SIZE, String.valueOf(batchSize)); |
| testRunner.setProperty(PutKudu.FLOWFILE_BATCH_SIZE, String.valueOf(batchSize)); |
| |
| IntStream.range(0, numFlowFiles).forEach(i -> testRunner.enqueue("")); |
| testRunner.run(numFlowFiles); |
| |
| testRunner.assertTransferCount(PutKudu.REL_FAILURE, 3); |
| |
| List<MockFlowFile> failedFlowFiles = testRunner.getFlowFilesForRelationship(PutKudu.REL_FAILURE); |
| failedFlowFiles.get(0).assertAttributeEquals(PutKudu.RECORD_COUNT_ATTR, "2"); |
| failedFlowFiles.get(1).assertAttributeEquals(PutKudu.RECORD_COUNT_ATTR, sync ? "1" : "2"); |
| failedFlowFiles.get(2).assertAttributeEquals(PutKudu.RECORD_COUNT_ATTR, "1"); |
| |
| testRunner.assertTransferCount(PutKudu.REL_SUCCESS, 1); |
| testRunner.getFlowFilesForRelationship(PutKudu.REL_SUCCESS).get(0).assertAttributeEquals(PutKudu.RECORD_COUNT_ATTR, "3"); |
| } |
| |
| private void testKuduPartialFailure(FlushMode flushMode) throws Exception { |
| // Test against different batch sizes (up until the point where every record can be buffered at once) |
| for (int i = 1; i <= 11; i++) { |
| testKuduPartialFailure(flushMode, i); |
| } |
| } |
| |
| @Test |
| public void testKuduPartialFailuresOnAutoFlushSync() throws Exception { |
| testKuduPartialFailure(FlushMode.AUTO_FLUSH_SYNC); |
| } |
| |
| @Test |
| public void testKuduPartialFailuresOnAutoFlushBackground() throws Exception { |
| testKuduPartialFailure(FlushMode.AUTO_FLUSH_BACKGROUND); |
| } |
| |
| @Test |
| public void testKuduPartialFailuresOnManualFlush() throws Exception { |
| testKuduPartialFailure(FlushMode.MANUAL_FLUSH); |
| } |
| |
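| // Minimal KerberosCredentialsService stub that returns the principal and keytab it was |
| // constructed with. |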
| public static class MockKerberosCredentialsService extends AbstractControllerService implements KerberosCredentialsService { |
| private final String keytab; |
| private final String principal; |
| |
| public MockKerberosCredentialsService(final String principal, final String keytab) { |
| this.principal = principal; |
| this.keytab = keytab; |
| } |
| |
| @Override |
| public String getKeytab() { |
| return keytab; |
| } |
| |
| @Override |
| public String getPrincipal() { |
| return principal; |
| } |
| } |
| } |