/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.kudu;
import org.apache.kudu.Schema;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;
import org.apache.kudu.client.KuduSession;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.Operation;
import org.apache.kudu.client.OperationResponse;
import org.apache.kudu.client.RowError;
import org.apache.kudu.client.SessionConfiguration;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyDescriptor.Builder;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPath;
import org.apache.nifi.record.path.RecordPathResult;
import org.apache.nifi.record.path.validation.RecordPathValidator;
import org.apache.nifi.security.krb.KerberosAction;
import org.apache.nifi.security.krb.KerberosUser;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSet;
import javax.security.auth.login.LoginException;
import java.io.InputStream;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
import static org.apache.nifi.expression.ExpressionLanguageScope.VARIABLE_REGISTRY;
@SystemResourceConsideration(resource = SystemResource.MEMORY)
@EventDriven
@SupportsBatching
@RequiresInstanceClassLoading // Because of calls to UserGroupInformation.setConfiguration
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"put", "database", "NoSQL", "kudu", "HDFS", "record"})
@CapabilityDescription("Reads records from an incoming FlowFile using the provided Record Reader, and writes those records " +
"to the specified Kudu's table. The schema for the Kudu table is inferred from the schema of the Record Reader." +
" If any error occurs while reading records from the input, or writing records to Kudu, the FlowFile will be routed to failure")
@WritesAttribute(attribute = "record.count", description = "Number of records written to Kudu")
public class PutKudu extends AbstractKuduProcessor {
static final AllowableValue FAILURE_STRATEGY_ROUTE = new AllowableValue("route-to-failure", "Route to Failure",
"The FlowFile containing the Records that failed to insert will be routed to the 'failure' relationship");
static final AllowableValue FAILURE_STRATEGY_ROLLBACK = new AllowableValue("rollback", "Rollback Session",
"If any Record cannot be inserted, all FlowFiles in the session will be rolled back to their input queue. This means that if data cannot be pushed, " +
"it will block any subsequent data from being pushed to Kudu as well until the issue is resolved. However, this may be advantageous if strict ordering is required.");
protected static final PropertyDescriptor TABLE_NAME = new Builder()
.name("Table Name")
.description("The name of the Kudu Table to put data into")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor RECORD_READER = new Builder()
.name("record-reader")
.displayName("Record Reader")
.description("The service for reading records from incoming flow files.")
.identifiesControllerService(RecordReaderFactory.class)
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final PropertyDescriptor FAILURE_STRATEGY = new Builder()
.name("Failure Strategy")
.displayName("Failure Strategy")
.description("If one or more Records in a batch cannot be transferred to Kudu, specifies how to handle the failure")
.required(true)
.allowableValues(FAILURE_STRATEGY_ROUTE, FAILURE_STRATEGY_ROLLBACK)
.defaultValue(FAILURE_STRATEGY_ROUTE.getValue())
.build();
protected static final PropertyDescriptor SKIP_HEAD_LINE = new Builder()
.name("Skip head line")
.description("Deprecated. Used to ignore header lines, but this should be handled by a RecordReader " +
"(e.g. \"Treat First Line as Header\" property of CSVReader)")
.allowableValues("true", "false")
.defaultValue("false")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
protected static final PropertyDescriptor LOWERCASE_FIELD_NAMES = new Builder()
.name("Lowercase Field Names")
.description("Convert column names to lowercase when finding index of Kudu table columns")
.defaultValue("false")
.required(true)
.expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();
protected static final PropertyDescriptor HANDLE_SCHEMA_DRIFT = new Builder()
.name("Handle Schema Drift")
.description("If set to true, when fields with names that are not in the target Kudu table " +
"are encountered, the Kudu table will be altered to include new columns for those fields.")
.defaultValue("false")
.required(true)
.expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();
static final PropertyDescriptor DATA_RECORD_PATH = new Builder()
.name("Data RecordPath")
.displayName("Data RecordPath")
.description("If specified, this property denotes a RecordPath that will be evaluated against each incoming Record and the Record that results from evaluating the RecordPath will be sent to" +
" Kudu instead of sending the entire incoming Record. If not specified, the entire incoming Record will be published to Kudu.")
.required(false)
.addValidator(new RecordPathValidator())
.expressionLanguageSupported(NONE)
.build();
static final PropertyDescriptor OPERATION_RECORD_PATH = new Builder()
.name("Operation RecordPath")
.displayName("Operation RecordPath")
.description("If specified, this property denotes a RecordPath that will be evaluated against each incoming Record in order to determine the Kudu Operation Type. When evaluated, the " +
"RecordPath must evaluate to one of hte valid Kudu Operation Types, or the incoming FlowFile will be routed to failure. If this property is specified, the <Kudu Operation Type> property" +
" will be ignored.")
.required(false)
.addValidator(new RecordPathValidator())
.expressionLanguageSupported(NONE)
.build();
protected static final Validator OperationTypeValidator = new Validator() {
@Override
public ValidationResult validate(String subject, String value, ValidationContext context) {
if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) {
return new ValidationResult.Builder().subject(subject).input(value)
.explanation("Expression Language Present").valid(true).build();
}
boolean valid;
try {
OperationType.valueOf(value.toUpperCase());
valid = true;
} catch (IllegalArgumentException ex) {
valid = false;
}
final String explanation = valid ? null :
"Value must be one of: " +
Arrays.stream(OperationType.values()).map(Enum::toString).collect(Collectors.joining(", "));
return new ValidationResult.Builder().subject(subject).input(value).valid(valid)
.explanation(explanation).build();
}
};
protected static final PropertyDescriptor INSERT_OPERATION = new Builder()
.name("Insert Operation")
.displayName("Kudu Operation Type")
.description("Specify operationType for this processor.\n" +
"Valid values are: " +
Arrays.stream(OperationType.values()).map(Enum::toString).collect(Collectors.joining(", ")) +
". This Property will be ignored if the <Operation RecordPath> property is set.")
.defaultValue(OperationType.INSERT.toString())
.expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
.addValidator(OperationTypeValidator)
.build();
protected static final PropertyDescriptor FLUSH_MODE = new Builder()
.name("Flush Mode")
.description("Set the new flush mode for a kudu session.\n" +
"AUTO_FLUSH_SYNC: the call returns when the operation is persisted, else it throws an exception.\n" +
"AUTO_FLUSH_BACKGROUND: the call returns when the operation has been added to the buffer. This call should normally perform only fast in-memory" +
" operations but it may have to wait when the buffer is full and there's another buffer being flushed.\n" +
"MANUAL_FLUSH: the call returns when the operation has been added to the buffer, else it throws a KuduException if the buffer is full.")
.allowableValues(SessionConfiguration.FlushMode.values())
.defaultValue(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND.toString())
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(true)
.build();
protected static final PropertyDescriptor FLOWFILE_BATCH_SIZE = new Builder()
.name("FlowFiles per Batch")
.description("The maximum number of FlowFiles to process in a single execution, between 1 - 100000. " +
"Depending on your memory size, and data size per row set an appropriate batch size " +
"for the number of FlowFiles to process per client connection setup." +
"Gradually increase this number, only if your FlowFiles typically contain a few records.")
.defaultValue("1")
.required(true)
.addValidator(StandardValidators.createLongValidator(1, 100000, true))
.expressionLanguageSupported(VARIABLE_REGISTRY)
.build();
protected static final PropertyDescriptor BATCH_SIZE = new Builder()
.name("Batch Size")
.displayName("Max Records per Batch")
.description("The maximum number of Records to process in a single Kudu-client batch, between 1 - 100000. " +
"Depending on your memory size, and data size per row set an appropriate batch size. " +
"Gradually increase this number to find out the best one for best performances.")
.defaultValue("100")
.required(true)
.addValidator(StandardValidators.createLongValidator(1, 100000, true))
.expressionLanguageSupported(VARIABLE_REGISTRY)
.build();
protected static final PropertyDescriptor IGNORE_NULL = new Builder()
.name("Ignore NULL")
.description("Ignore NULL on Kudu Put Operation, Update only non-Null columns if set true")
.defaultValue("false")
.required(true)
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
.build();
protected static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("A FlowFile is routed to this relationship after it has been successfully stored in Kudu")
.build();
protected static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("A FlowFile is routed to this relationship if it cannot be sent to Kudu")
.build();
public static final String RECORD_COUNT_ATTR = "record.count";
// Properties set in onScheduled.
private volatile int batchSize = 100;
private volatile int flowFileBatchSize = 1;
private volatile SessionConfiguration.FlushMode flushMode;
private volatile Function<Record, OperationType> recordPathOperationType;
private volatile RecordPath dataRecordPath;
private volatile String failureStrategy;
private volatile boolean supportsInsertIgnoreOp;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(KUDU_MASTERS);
properties.add(TABLE_NAME);
properties.add(FAILURE_STRATEGY);
properties.add(KERBEROS_CREDENTIALS_SERVICE);
properties.add(KERBEROS_PRINCIPAL);
properties.add(KERBEROS_PASSWORD);
properties.add(SKIP_HEAD_LINE);
properties.add(LOWERCASE_FIELD_NAMES);
properties.add(HANDLE_SCHEMA_DRIFT);
properties.add(RECORD_READER);
properties.add(DATA_RECORD_PATH);
properties.add(OPERATION_RECORD_PATH);
properties.add(INSERT_OPERATION);
properties.add(FLUSH_MODE);
properties.add(FLOWFILE_BATCH_SIZE);
properties.add(BATCH_SIZE);
properties.add(IGNORE_NULL);
properties.add(KUDU_OPERATION_TIMEOUT_MS);
properties.add(KUDU_KEEP_ALIVE_PERIOD_TIMEOUT_MS);
properties.add(WORKER_COUNT);
properties.add(KUDU_SASL_PROTOCOL_NAME);
return properties;
}
@Override
public Set<Relationship> getRelationships() {
final Set<Relationship> rels = new HashSet<>();
rels.add(REL_SUCCESS);
rels.add(REL_FAILURE);
return rels;
}
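/**
 * Caches property values and compiles any configured RecordPaths once per schedule, then
 * initializes the Kerberos user and/or Kudu client and checks whether the cluster supports
 * native ignore operations.
 */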
@OnScheduled
public void onScheduled(final ProcessContext context) throws LoginException {
batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
flowFileBatchSize = context.getProperty(FLOWFILE_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
flushMode = SessionConfiguration.FlushMode.valueOf(context.getProperty(FLUSH_MODE).getValue().toUpperCase());
createKerberosUserAndOrKuduClient(context);
supportsInsertIgnoreOp = supportsIgnoreOperations();
final String operationRecordPathValue = context.getProperty(OPERATION_RECORD_PATH).getValue();
if (operationRecordPathValue == null) {
recordPathOperationType = null;
} else {
final RecordPath recordPath = RecordPath.compile(operationRecordPathValue);
recordPathOperationType = new RecordPathOperationType(recordPath);
}
final String dataRecordPathValue = context.getProperty(DATA_RECORD_PATH).getValue();
dataRecordPath = dataRecordPathValue == null ? null : RecordPath.compile(dataRecordPathValue);
failureStrategy = context.getProperty(FAILURE_STRATEGY).getValue();
}
private boolean isRollbackOnFailure() {
return FAILURE_STRATEGY_ROLLBACK.getValue().equalsIgnoreCase(failureStrategy);
}
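/**
 * Pulls up to 'FlowFiles per Batch' FlowFiles from the queue and processes them with the
 * shared Kudu client. If a Kerberos user is configured, the work is wrapped in a
 * privileged action so that Kudu RPCs run in the authenticated context.
 */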
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final List<FlowFile> flowFiles = session.get(flowFileBatchSize);
if (flowFiles.isEmpty()) {
return;
}
final KerberosUser user = getKerberosUser();
if (user == null) {
executeOnKuduClient(kuduClient -> processFlowFiles(context, session, flowFiles, kuduClient));
return;
}
final PrivilegedExceptionAction<Void> privilegedAction = () -> {
executeOnKuduClient(kuduClient -> processFlowFiles(context, session, flowFiles, kuduClient));
return null;
};
final KerberosAction<Void> action = new KerberosAction<>(user, privilegedAction, getLogger());
action.execute();
}
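/**
 * Writes the given FlowFiles through a single KuduSession, flushing any buffered operations
 * before deciding the outcome. Depending on the configured failure strategy, errors either
 * roll back the entire ProcessSession or route the affected FlowFiles to failure.
 */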
private void processFlowFiles(final ProcessContext context, final ProcessSession session, final List<FlowFile> flowFiles, final KuduClient kuduClient) {
final Map<FlowFile, Integer> processedRecords = new HashMap<>();
final Map<FlowFile, Object> flowFileFailures = new HashMap<>();
final Map<Operation, FlowFile> operationFlowFileMap = new HashMap<>();
final List<RowError> pendingRowErrors = new ArrayList<>();
final KuduSession kuduSession = createKuduSession(kuduClient);
try {
processRecords(flowFiles,
processedRecords,
flowFileFailures,
operationFlowFileMap,
pendingRowErrors,
session,
context,
kuduClient,
kuduSession);
} finally {
try {
flushKuduSession(kuduSession, true, pendingRowErrors);
} catch (final KuduException | RuntimeException e) {
getLogger().error("KuduSession.close() Failed", e);
}
}
if (isRollbackOnFailure() && (!pendingRowErrors.isEmpty() || !flowFileFailures.isEmpty())) {
logFailures(pendingRowErrors, operationFlowFileMap);
session.rollback();
context.yield();
} else {
transferFlowFiles(flowFiles, processedRecords, flowFileFailures, operationFlowFileMap, pendingRowErrors, session);
}
}
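/**
 * Reads each FlowFile with the configured Record Reader and applies one Kudu Operation per
 * data Record. Handles schema drift using the first Record, resolves the operation type
 * (static property or RecordPath), and flushes the session when the manual-flush buffer
 * fills or the legacy insert-ignore mode is toggled.
 */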
private void processRecords(final List<FlowFile> flowFiles,
final Map<FlowFile, Integer> processedRecords,
final Map<FlowFile, Object> flowFileFailures,
final Map<Operation, FlowFile> operationFlowFileMap,
final List<RowError> pendingRowErrors,
final ProcessSession session,
final ProcessContext context,
final KuduClient kuduClient,
final KuduSession kuduSession) {
final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
int bufferedRecords = 0;
OperationType prevOperationType = OperationType.INSERT;
for (FlowFile flowFile : flowFiles) {
try (final InputStream in = session.read(flowFile);
final RecordReader recordReader = recordReaderFactory.createRecordReader(flowFile, in, getLogger())) {
final String tableName = getEvaluatedProperty(TABLE_NAME, context, flowFile);
final boolean ignoreNull = Boolean.parseBoolean(getEvaluatedProperty(IGNORE_NULL, context, flowFile));
final boolean lowercaseFields = Boolean.parseBoolean(getEvaluatedProperty(LOWERCASE_FIELD_NAMES, context, flowFile));
final boolean handleSchemaDrift = Boolean.parseBoolean(getEvaluatedProperty(HANDLE_SCHEMA_DRIFT, context, flowFile));
final Function<Record, OperationType> operationTypeFunction;
if (recordPathOperationType == null) {
final OperationType staticOperationType = OperationType.valueOf(getEvaluatedProperty(INSERT_OPERATION, context, flowFile).toUpperCase());
operationTypeFunction = record -> staticOperationType;
} else {
operationTypeFunction = recordPathOperationType;
}
final RecordSet recordSet = recordReader.createRecordSet();
KuduTable kuduTable = kuduClient.openTable(tableName);
// Get the first record so that we can evaluate the Kudu table for schema drift.
Record record = recordSet.next();
// If handleSchemaDrift is true, check for any missing columns and alter the Kudu table to add them.
if (handleSchemaDrift) {
final boolean driftDetected = handleSchemaDrift(kuduTable, kuduClient, flowFile, record, lowercaseFields);
if (driftDetected) {
// Re-open the table to get the new schema.
kuduTable = kuduClient.openTable(tableName);
}
}
recordReaderLoop: while (record != null) {
final OperationType operationType = operationTypeFunction.apply(record);
final List<Record> dataRecords;
if (dataRecordPath == null) {
dataRecords = Collections.singletonList(record);
} else {
final RecordPathResult result = dataRecordPath.evaluate(record);
final List<FieldValue> fieldValues = result.getSelectedFields().collect(Collectors.toList());
if (fieldValues.isEmpty()) {
throw new ProcessException("RecordPath " + dataRecordPath.getPath() + " evaluated against Record yielded no results.");
}
for (final FieldValue fieldValue : fieldValues) {
final RecordFieldType fieldType = fieldValue.getField().getDataType().getFieldType();
if (fieldType != RecordFieldType.RECORD) {
throw new ProcessException("RecordPath " + dataRecordPath.getPath() + " evaluated against Record expected to return one or more Records but encountered field of type" +
" " + fieldType);
}
}
dataRecords = new ArrayList<>(fieldValues.size());
for (final FieldValue fieldValue : fieldValues) {
dataRecords.add((Record) fieldValue.getValue());
}
}
for (final Record dataRecord : dataRecords) {
// If supportsInsertIgnoreOp is false, in the case of INSERT_IGNORE the Kudu session
// is modified to ignore row errors.
// Because the session is shared across FlowFiles, for batching efficiency, we
// need to flush when changing to and from the INSERT_IGNORE operation type.
// This workaround should be removed once the lowest supported version of Kudu
// supports ignore operations natively.
if (!supportsInsertIgnoreOp && prevOperationType != operationType
&& (prevOperationType == OperationType.INSERT_IGNORE || operationType == OperationType.INSERT_IGNORE)) {
flushKuduSession(kuduSession, false, pendingRowErrors);
kuduSession.setIgnoreAllDuplicateRows(operationType == OperationType.INSERT_IGNORE);
}
prevOperationType = operationType;
final List<String> fieldNames = dataRecord.getSchema().getFieldNames();
Operation operation = createKuduOperation(operationType, dataRecord, fieldNames, ignoreNull, lowercaseFields, kuduTable);
// We keep track of the mapping between each Operation and its originating FlowFile,
// so that we know which FlowFiles should be marked as failures after a buffered flush.
operationFlowFileMap.put(operation, flowFile);
// Flush the mutation buffer of the KuduSession to avoid the "MANUAL_FLUSH is enabled
// but the buffer is too big" error. This can happen when the flush mode is
// MANUAL_FLUSH and a FlowFile has more than one record.
if (bufferedRecords == batchSize && flushMode == SessionConfiguration.FlushMode.MANUAL_FLUSH) {
bufferedRecords = 0;
flushKuduSession(kuduSession, false, pendingRowErrors);
}
// OperationResponse is returned only when flush mode is set to AUTO_FLUSH_SYNC
OperationResponse response = kuduSession.apply(operation);
if (response != null && response.hasRowError()) {
// Stop processing the records on the first error.
// Note that Kudu does not support rolling back of previous operations.
flowFileFailures.put(flowFile, response.getRowError());
break recordReaderLoop;
}
bufferedRecords++;
processedRecords.merge(flowFile, 1, Integer::sum);
}
record = recordSet.next();
}
} catch (Exception ex) {
getLogger().error("Failed to push {} to Kudu", new Object[] {flowFile}, ex);
flowFileFailures.put(flowFile, ex);
}
}
}
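/**
 * Compares the Record's fields against the Kudu table schema and adds a nullable column for
 * each field that is missing. Returns true if the table was altered, so the caller can
 * re-open it to pick up the new schema.
 */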
private boolean handleSchemaDrift(final KuduTable kuduTable, final KuduClient kuduClient, final FlowFile flowFile, final Record record, final boolean lowercaseFields) {
if (record == null) {
getLogger().debug("No Record to evaluate schema drift against for {}", flowFile);
return false;
}
final String tableName = kuduTable.getName();
final Schema schema = kuduTable.getSchema();
final List<RecordField> recordFields;
if (dataRecordPath == null) {
recordFields = record.getSchema().getFields();
} else {
final RecordPathResult recordPathResult = dataRecordPath.evaluate(record);
final List<FieldValue> fieldValues = recordPathResult.getSelectedFields().collect(Collectors.toList());
recordFields = new ArrayList<>();
for (final FieldValue fieldValue : fieldValues) {
final RecordField recordField = fieldValue.getField();
if (recordField.getDataType().getFieldType() == RecordFieldType.RECORD) {
final Object value = fieldValue.getValue();
if (value instanceof Record) {
recordFields.addAll(((Record) value).getSchema().getFields());
}
} else {
recordFields.add(recordField);
}
}
}
final List<RecordField> missing = recordFields.stream()
.filter(field -> !schema.hasColumn(lowercaseFields ? field.getFieldName().toLowerCase() : field.getFieldName()))
.collect(Collectors.toList());
if (missing.isEmpty()) {
getLogger().debug("No schema drift detected for {}", flowFile);
return false;
}
getLogger().info("Adding {} columns to table '{}' to handle schema drift", missing.size(), tableName);
// Add each column one at a time to avoid failing if some of the missing columns
// were created by a concurrent thread or application attempting to handle schema drift.
for (final RecordField field : missing) {
try {
final String columnName = lowercaseFields ? field.getFieldName().toLowerCase() : field.getFieldName();
kuduClient.alterTable(tableName, getAddNullableColumnStatement(columnName, field.getDataType()));
} catch (final KuduException e) {
// Ignore the exception if the column already exists due to concurrent
// threads or applications attempting to handle schema drift.
if (e.getStatus().isAlreadyPresent()) {
getLogger().info("Column already exists in table '{}' while handling schema drift", tableName);
} else {
throw new ProcessException(e);
}
}
}
return true;
}
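/**
 * Routes each FlowFile to success or failure based on the row errors and exceptions
 * collected during processing, sets the record.count attribute, and updates the
 * "Records Inserted" counter (excluding error rows).
 */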
private void transferFlowFiles(final List<FlowFile> flowFiles,
final Map<FlowFile, Integer> processedRecords,
final Map<FlowFile, Object> flowFileFailures,
final Map<Operation, FlowFile> operationFlowFileMap,
final List<RowError> pendingRowErrors,
final ProcessSession session) {
// Find RowErrors for each FlowFile
final Map<FlowFile, List<RowError>> flowFileRowErrors = pendingRowErrors.stream()
.filter(e -> operationFlowFileMap.get(e.getOperation()) != null)
.collect(
Collectors.groupingBy(e -> operationFlowFileMap.get(e.getOperation()))
);
long totalCount = 0L;
for (FlowFile flowFile : flowFiles) {
final int count = processedRecords.getOrDefault(flowFile, 0);
totalCount += count;
final List<RowError> rowErrors = flowFileRowErrors.get(flowFile);
if (rowErrors != null) {
rowErrors.forEach(rowError -> getLogger().error("Failed to write due to {}", rowError.toString()));
flowFile = session.putAttribute(flowFile, RECORD_COUNT_ATTR, Integer.toString(count - rowErrors.size()));
totalCount -= rowErrors.size(); // Don't include error rows in the counter.
session.transfer(flowFile, REL_FAILURE);
} else {
flowFile = session.putAttribute(flowFile, RECORD_COUNT_ATTR, String.valueOf(count));
if (flowFileFailures.containsKey(flowFile)) {
getLogger().error("Failed to write due to {}", flowFileFailures.get(flowFile));
session.transfer(flowFile, REL_FAILURE);
} else {
session.transfer(flowFile, REL_SUCCESS);
session.getProvenanceReporter().send(flowFile, "Successfully added FlowFile to Kudu");
}
}
}
session.adjustCounter("Records Inserted", totalCount, false);
}
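/**
 * Logs pending row errors grouped by the FlowFile that produced them; invoked before a
 * session rollback so the cause of the failure remains visible.
 */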
private void logFailures(final List<RowError> pendingRowErrors, final Map<Operation, FlowFile> operationFlowFileMap) {
final Map<FlowFile, List<RowError>> flowFileRowErrors = pendingRowErrors.stream().collect(
Collectors.groupingBy(e -> operationFlowFileMap.get(e.getOperation())));
for (final Map.Entry<FlowFile, List<RowError>> entry : flowFileRowErrors.entrySet()) {
final FlowFile flowFile = entry.getKey();
final List<RowError> errors = entry.getValue();
getLogger().error("Could not write {} to Kudu due to: {}", flowFile, errors);
}
}
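/**
 * Evaluates the given property against the FlowFile's attributes and returns the resulting
 * String value, failing fast when a required property evaluates to null.
 */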
private String getEvaluatedProperty(PropertyDescriptor property, ProcessContext context, FlowFile flowFile) {
final PropertyValue evaluatedProperty = context.getProperty(property).evaluateAttributeExpressions(flowFile);
// getProperty() never returns a null PropertyValue, so the required-property check must
// inspect the evaluated String value rather than the PropertyValue wrapper.
final String evaluatedValue = evaluatedProperty.getValue();
if (property.isRequired() && evaluatedValue == null) {
throw new ProcessException(String.format("Property `%s` is required but evaluated to null", property.getDisplayName()));
}
return evaluatedValue;
}
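/**
 * Creates a KuduSession whose mutation buffer is sized to the configured batch size and
 * which uses the configured flush mode.
 */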
protected KuduSession createKuduSession(final KuduClient client) {
final KuduSession kuduSession = client.newSession();
kuduSession.setMutationBufferSpace(batchSize);
kuduSession.setFlushMode(flushMode);
return kuduSession;
}
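/**
 * Creates the Kudu Operation matching the requested OperationType and populates its row
 * from the Record. INSERT_IGNORE falls back to a plain insert (combined with session-level
 * duplicate-row ignoring) when the target cluster predates native ignore operations.
 */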
protected Operation createKuduOperation(OperationType operationType, Record record,
List<String> fieldNames, boolean ignoreNull,
boolean lowercaseFields, KuduTable kuduTable) {
Operation operation;
switch (operationType) {
case INSERT:
operation = kuduTable.newInsert();
break;
case INSERT_IGNORE:
// If the target Kudu cluster does not support native ignore operations, use a plain
// insert; the legacy session-based insert-ignore behavior applies instead.
if (!supportsInsertIgnoreOp) {
operation = kuduTable.newInsert();
} else {
operation = kuduTable.newInsertIgnore();
}
break;
case UPSERT:
operation = kuduTable.newUpsert();
break;
case UPDATE:
operation = kuduTable.newUpdate();
break;
case UPDATE_IGNORE:
operation = kuduTable.newUpdateIgnore();
break;
case DELETE:
operation = kuduTable.newDelete();
break;
case DELETE_IGNORE:
operation = kuduTable.newDeleteIgnore();
break;
default:
throw new IllegalArgumentException(String.format("OperationType: %s not supported by Kudu", operationType));
}
buildPartialRow(kuduTable.getSchema(), operation.getRow(), record, fieldNames, ignoreNull, lowercaseFields);
return operation;
}
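/**
 * Resolves the Kudu OperationType for a Record by evaluating the configured RecordPath;
 * the path must yield exactly one distinct value that matches an OperationType name.
 */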
private static class RecordPathOperationType implements Function<Record, OperationType> {
private final RecordPath recordPath;
public RecordPathOperationType(final RecordPath recordPath) {
this.recordPath = recordPath;
}
@Override
public OperationType apply(final Record record) {
final RecordPathResult recordPathResult = recordPath.evaluate(record);
final List<FieldValue> resultList = recordPathResult.getSelectedFields().distinct().collect(Collectors.toList());
if (resultList.isEmpty()) {
throw new ProcessException("Evaluated RecordPath " + recordPath.getPath() + " against Record but got no results");
}
if (resultList.size() > 1) {
throw new ProcessException("Evaluated RecordPath " + recordPath.getPath() + " against Record and received multiple distinct results (" + resultList + ")");
}
final String resultValue = String.valueOf(resultList.get(0).getValue());
try {
return OperationType.valueOf(resultValue.toUpperCase());
} catch (final IllegalArgumentException iae) {
throw new ProcessException("Evaluated RecordPath " + recordPath.getPath() + " against Record to determine Kudu Operation Type but found invalid value: " + resultValue);
}
}
}
}