/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.hive;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hive.streaming.ConnectionError;
import org.apache.hive.streaming.HiveRecordWriter;
import org.apache.hive.streaming.HiveStreamingConnection;
import org.apache.hive.streaming.InvalidTable;
import org.apache.hive.streaming.RecordsEOFException;
import org.apache.hive.streaming.SerializationError;
import org.apache.hive.streaming.StreamingConnection;
import org.apache.hive.streaming.StreamingException;
import org.apache.hive.streaming.StreamingIOFailure;
import org.apache.hive.streaming.TransactionError;
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.resource.ResourceCardinality;
import org.apache.nifi.components.resource.ResourceType;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.hadoop.SecurityUtil;
import org.apache.nifi.kerberos.KerberosCredentialsService;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.pattern.DiscontinuedException;
import org.apache.nifi.processor.util.pattern.RollbackOnFailure;
import org.apache.nifi.processors.hadoop.exception.RecordReaderFactoryException;
import org.apache.nifi.security.krb.KerberosKeytabUser;
import org.apache.nifi.security.krb.KerberosLoginException;
import org.apache.nifi.security.krb.KerberosPasswordUser;
import org.apache.nifi.security.krb.KerberosUser;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.util.StringUtils;
import org.apache.nifi.util.hive.AuthenticationFailedException;
import org.apache.nifi.util.hive.HiveConfigurator;
import org.apache.nifi.util.hive.HiveOptions;
import org.apache.nifi.util.hive.ValidationResources;
import java.io.IOException;
import java.io.InputStream;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import static org.apache.nifi.processors.hive.AbstractHive3QLProcessor.ATTR_OUTPUT_TABLES;
@Tags({"hive", "streaming", "put", "database", "store"})
@CapabilityDescription("This processor uses Hive Streaming to send flow file records to an Apache Hive 3.0+ table. If 'Static Partition Values' is not set, then "
+ "the partition values are expected to be the 'last' fields of each record, so if the table is partitioned on column A for example, then the last field in "
+ "each record should be field A. If 'Static Partition Values' is set, those values will be used as the partition values, and any record fields corresponding to "
+ "partition columns will be ignored.")
@WritesAttributes({
@WritesAttribute(attribute = "hivestreaming.record.count", description = "This attribute is written on the flow files routed to the 'success' "
+ "and 'failure' relationships, and contains the number of records from the incoming flow file. All records in a flow file are committed as a single transaction."),
@WritesAttribute(attribute = "query.output.tables", description = "This attribute is written on the flow files routed to the 'success' "
+ "and 'failure' relationships, and contains the target table name in 'databaseName.tableName' format.")
})
@RequiresInstanceClassLoading
public class PutHive3Streaming extends AbstractProcessor {
// Attributes
public static final String HIVE_STREAMING_RECORD_COUNT_ATTR = "hivestreaming.record.count";
private static final String CLIENT_CACHE_DISABLED_PROPERTY = "hcatalog.hive.client.cache.disabled";
// Properties
static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
.name("record-reader")
.displayName("Record Reader")
.description("The service for reading records from incoming flow files.")
.identifiesControllerService(RecordReaderFactory.class)
.required(true)
.build();
static final PropertyDescriptor METASTORE_URI = new PropertyDescriptor.Builder()
.name("hive3-stream-metastore-uri")
.displayName("Hive Metastore URI")
.description("The URI location(s) for the Hive metastore. This is a comma-separated list of Hive metastore URIs; note that this is not the location of the Hive Server. "
+ "The default port for the Hive metastore is 9043. If this field is not set, then the 'hive.metastore.uris' property from any provided configuration resources "
+ "will be used, and if none are provided, then the default value from a default hive-site.xml will be used (usually thrift://localhost:9083).")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.URI_LIST_VALIDATOR)
.build();
static final PropertyDescriptor HIVE_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder()
.name("hive3-config-resources")
.displayName("Hive Configuration Resources")
.description("A file or comma separated list of files which contains the Hive configuration (hive-site.xml, e.g.). Without this, Hadoop "
+ "will search the classpath for a 'hive-site.xml' file or will revert to a default configuration. Note that to enable authentication "
+ "with Kerberos e.g., the appropriate properties must be set in the configuration files. Also note that if Max Concurrent Tasks is set "
+ "to a number greater than one, the 'hcatalog.hive.client.cache.disabled' property will be forced to 'true' to avoid concurrency issues. "
+ "Please see the Hive documentation for more details.")
.required(false)
.identifiesExternalResource(ResourceCardinality.MULTIPLE, ResourceType.FILE)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
static final PropertyDescriptor DB_NAME = new PropertyDescriptor.Builder()
.name("hive3-stream-database-name")
.displayName("Database Name")
.description("The name of the database in which to put the data.")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
.name("hive3-stream-table-name")
.displayName("Table Name")
.description("The name of the database table in which to put the data.")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final PropertyDescriptor STATIC_PARTITION_VALUES = new PropertyDescriptor.Builder()
.name("hive3-stream-part-vals")
.displayName("Static Partition Values")
.description("Specifies a comma-separated list of the values for the partition columns of the target table. If the incoming records all have the same values "
+ "for the partition columns, those values can be entered here, resulting in a performance gain. If specified, this property will often contain "
+ "Expression Language, for example if PartitionRecord is upstream and two partitions 'name' and 'age' are used, then this property can be set to "
+ "${name},${age}. If this property is set, the values will be used as the partition values, and any record fields corresponding to "
+ "partition columns will be ignored. If this property is not set, then the partition values are expected to be the last fields of each record.")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final PropertyDescriptor RECORDS_PER_TXN = new PropertyDescriptor.Builder()
.name("hive3-stream-records-per-transaction")
.displayName("Records per Transaction")
.description("Number of records to process before committing the transaction. If set to zero (0), all records will be written in a single transaction.")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.defaultValue("0")
.build();
static final PropertyDescriptor TXNS_PER_BATCH = new PropertyDescriptor.Builder()
.name("hive3-stream-transactions-per-batch")
.displayName("Transactions per Batch")
.description("A hint to Hive Streaming indicating how many transactions the processor task will need. The product of Records per Transaction (if not zero) "
+ "and Transactions per Batch should be larger than the largest expected number of records in the flow file(s), this will ensure any failed "
+ "transaction batches cause a full rollback.")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.defaultValue("1")
.build();
static final PropertyDescriptor CALL_TIMEOUT = new PropertyDescriptor.Builder()
.name("hive3-stream-call-timeout")
.displayName("Call Timeout")
.description("The number of seconds allowed for a Hive Streaming operation to complete. A value of 0 indicates the processor should wait indefinitely on operations. "
+ "Note that although this property supports Expression Language, it will not be evaluated against incoming FlowFile attributes.")
.defaultValue("0")
.required(true)
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
static final PropertyDescriptor DISABLE_STREAMING_OPTIMIZATIONS = new PropertyDescriptor.Builder()
.name("hive3-stream-disable-optimizations")
.displayName("Disable Streaming Optimizations")
.description("Whether to disable streaming optimizations. Disabling streaming optimizations will have significant impact to performance and memory consumption.")
.required(true)
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.allowableValues("true", "false")
.defaultValue("false")
.build();
static final PropertyDescriptor ROLLBACK_ON_FAILURE = RollbackOnFailure.createRollbackOnFailureProperty(
"NOTE: When an error occurred after a Hive streaming transaction which is derived from the same input FlowFile is already committed," +
" (i.e. a FlowFile contains more records than 'Records per Transaction' and a failure occurred at the 2nd transaction or later)" +
" then the succeeded records will be transferred to 'success' relationship while the original input FlowFile stays in incoming queue." +
" Duplicated records can be created for the succeeded ones when the same FlowFile is processed again.");
static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
.name("kerberos-credentials-service")
.displayName("Kerberos Credentials Service")
.description("Specifies the Kerberos Credentials Controller Service that should be used for authenticating with Kerberos")
.identifiesControllerService(KerberosCredentialsService.class)
.required(false)
.build();
static final PropertyDescriptor KERBEROS_PRINCIPAL = new PropertyDescriptor.Builder()
.name("kerberos-principal")
.displayName("Kerberos Principal")
.description("The principal to use when specifying the principal and password directly in the processor for authenticating via Kerberos.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING))
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
static final PropertyDescriptor KERBEROS_PASSWORD = new PropertyDescriptor.Builder()
.name("kerberos-password")
.displayName("Kerberos Password")
.description("The password to use when specifying the principal and password directly in the processor for authenticating via Kerberos.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.sensitive(true)
.build();
// Relationships
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("A FlowFile containing Avro records routed to this relationship after the record has been successfully transmitted to Hive.")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("A FlowFile containing Avro records routed to this relationship if the record could not be transmitted to Hive.")
.build();
public static final Relationship REL_RETRY = new Relationship.Builder()
.name("retry")
.description("The incoming FlowFile is routed to this relationship if its records cannot be transmitted to Hive. Note that "
+ "some records may have been processed successfully, they will be routed (as Avro flow files) to the success relationship. "
+ "The combination of the retry, success, and failure relationships indicate how many records succeeded and/or failed. This "
+ "can be used to provide a retry capability since full rollback is not possible.")
.build();
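// Members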
private List<PropertyDescriptor> propertyDescriptors;
private Set<Relationship> relationships;
protected volatile HiveConfigurator hiveConfigurator = new HiveConfigurator();
protected volatile UserGroupInformation ugi;
protected final AtomicReference<KerberosUser> kerberosUserReference = new AtomicReference<>();
protected volatile HiveConf hiveConfig;
protected volatile int callTimeout;
protected ExecutorService callTimeoutPool;
protected volatile boolean rollbackOnFailure;
// Holder of cached Configuration information so validation does not reload the same config over and over
private final AtomicReference<ValidationResources> validationResourceHolder = new AtomicReference<>();
@Override
protected void init(ProcessorInitializationContext context) {
List<PropertyDescriptor> props = new ArrayList<>();
props.add(RECORD_READER);
props.add(METASTORE_URI);
props.add(HIVE_CONFIGURATION_RESOURCES);
props.add(DB_NAME);
props.add(TABLE_NAME);
props.add(STATIC_PARTITION_VALUES);
props.add(RECORDS_PER_TXN);
props.add(TXNS_PER_BATCH);
props.add(CALL_TIMEOUT);
props.add(DISABLE_STREAMING_OPTIMIZATIONS);
props.add(ROLLBACK_ON_FAILURE);
props.add(KERBEROS_CREDENTIALS_SERVICE);
props.add(KERBEROS_PRINCIPAL);
props.add(KERBEROS_PASSWORD);
propertyDescriptors = Collections.unmodifiableList(props);
Set<Relationship> _relationships = new HashSet<>();
_relationships.add(REL_SUCCESS);
_relationships.add(REL_FAILURE);
_relationships.add(REL_RETRY);
relationships = Collections.unmodifiableSet(_relationships);
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return propertyDescriptors;
}
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
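/**
* Validates the Kerberos-related configuration: if Hive configuration resources are provided, they are validated against the
* resolved principal, keytab, and password, and configuring both a Kerberos Credentials Service and an explicit
* principal or password at the same time is reported as invalid.
*/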
@Override
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
boolean confFileProvided = validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).isSet();
final List<ValidationResult> problems = new ArrayList<>();
final KerberosCredentialsService credentialsService = validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
final String explicitPrincipal = validationContext.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
final String explicitPassword = validationContext.getProperty(KERBEROS_PASSWORD).getValue();
final String resolvedPrincipal = credentialsService != null ? credentialsService.getPrincipal() : explicitPrincipal;
final String resolvedKeytab = credentialsService != null ? credentialsService.getKeytab() : null;
if (confFileProvided) {
final String configFiles = validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue();
problems.addAll(hiveConfigurator.validate(configFiles, resolvedPrincipal, resolvedKeytab, explicitPassword, validationResourceHolder, getLogger()));
}
if (credentialsService != null && (explicitPrincipal != null || explicitPassword != null)) {
problems.add(new ValidationResult.Builder()
.subject(KERBEROS_CREDENTIALS_SERVICE.getDisplayName())
.valid(false)
.explanation("kerberos principal/password and kerberos credential service cannot be configured at the same time")
.build());
}
return problems;
}
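/**
* Called when the processor is scheduled. Loads the Hive configuration (forcing 'hcatalog.hive.client.cache.disabled' to true
* when more than one concurrent task is configured), applies any dynamic properties, performs a Kerberos or simple login as
* appropriate, and creates the single-threaded 'put-hive3-streaming' pool used for call timeouts.
*/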
@OnScheduled
public void setup(final ProcessContext context) throws IOException {
ComponentLog log = getLogger();
rollbackOnFailure = context.getProperty(ROLLBACK_ON_FAILURE).asBoolean();
final String configFiles = context.getProperty(HIVE_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue();
hiveConfig = hiveConfigurator.getConfigurationFromFiles(configFiles);
// If more than one concurrent task, force 'hcatalog.hive.client.cache.disabled' to true
if (context.getMaxConcurrentTasks() > 1) {
hiveConfig.setBoolean(CLIENT_CACHE_DISABLED_PROPERTY, true);
}
// add any dynamic properties to the Hive configuration
for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
final PropertyDescriptor descriptor = entry.getKey();
if (descriptor.isDynamic()) {
hiveConfig.set(descriptor.getName(), entry.getValue());
}
}
hiveConfigurator.preload(hiveConfig);
if (SecurityUtil.isSecurityEnabled(hiveConfig)) {
final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
final String explicitPrincipal = context.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
final String explicitPassword = context.getProperty(KERBEROS_PASSWORD).getValue();
final String resolvedPrincipal = credentialsService != null ? credentialsService.getPrincipal() : explicitPrincipal;
final String resolvedKeytab = credentialsService != null ? credentialsService.getKeytab() : null;
if (resolvedKeytab != null) {
kerberosUserReference.set(new KerberosKeytabUser(resolvedPrincipal, resolvedKeytab));
log.info("Hive Security Enabled, logging in as principal {} with keytab {}", new Object[]{resolvedPrincipal, resolvedKeytab});
} else if (explicitPassword != null) {
kerberosUserReference.set(new KerberosPasswordUser(resolvedPrincipal, explicitPassword));
log.info("Hive Security Enabled, logging in as principal {} with password", new Object[]{resolvedPrincipal});
} else {
throw new ProcessException("Unable to authenticate with Kerberos, no keytab or password was provided");
}
try {
ugi = hiveConfigurator.authenticate(hiveConfig, kerberosUserReference.get());
} catch (AuthenticationFailedException ae) {
log.error(ae.getMessage(), ae);
throw new ProcessException(ae);
}
log.info("Successfully logged in as principal " + resolvedPrincipal);
} else {
ugi = SecurityUtil.loginSimple(hiveConfig);
kerberosUserReference.set(null);
}
callTimeout = context.getProperty(CALL_TIMEOUT).evaluateAttributeExpressions().asInteger() * 1000; // milliseconds
String timeoutName = "put-hive3-streaming-%d";
this.callTimeoutPool = Executors.newFixedThreadPool(1,
new ThreadFactoryBuilder().setNameFormat(timeoutName).build());
}
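/**
* Processes a single incoming FlowFile. All work runs inside {@code ugi.doAs(...)} so that Hive metastore calls are made as the
* authenticated user. Records are read with the configured Record Reader and written to Hive Streaming in transactions of
* 'Records per Transaction' records (0 means a single transaction for the whole FlowFile). Depending on the exception type and
* the Rollback On Failure setting, the FlowFile is routed to 'success', 'failure', or 'retry', or left in the incoming queue.
*/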
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
getUgi().doAs((PrivilegedAction<Void>) () -> {
FlowFile flowFile = session.get();
if (flowFile == null) {
return null;
}
final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final String dbName = context.getProperty(DB_NAME).evaluateAttributeExpressions(flowFile).getValue();
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
final ComponentLog log = getLogger();
String metastoreURIs = null;
if (context.getProperty(METASTORE_URI).isSet()) {
metastoreURIs = context.getProperty(METASTORE_URI).evaluateAttributeExpressions(flowFile).getValue();
if (StringUtils.isEmpty(metastoreURIs)) {
// Shouldn't be empty at this point; log an error, penalize the flow file, and return
log.error("The '" + METASTORE_URI.getDisplayName() + "' property evaluated to null or empty, penalizing flow file, routing to failure");
session.transfer(session.penalize(flowFile), REL_FAILURE);
return null;
}
}
final String staticPartitionValuesString = context.getProperty(STATIC_PARTITION_VALUES).evaluateAttributeExpressions(flowFile).getValue();
final boolean disableStreamingOptimizations = context.getProperty(DISABLE_STREAMING_OPTIMIZATIONS).asBoolean();
// Override the Hive Metastore URIs in the config if set by the user
if (metastoreURIs != null) {
hiveConfig.set(MetastoreConf.ConfVars.THRIFT_URIS.getHiveName(), metastoreURIs);
}
final int recordsPerTransaction = context.getProperty(RECORDS_PER_TXN).evaluateAttributeExpressions(flowFile).asInteger();
final int transactionsPerBatch = context.getProperty(TXNS_PER_BATCH).evaluateAttributeExpressions(flowFile).asInteger();
HiveOptions o = new HiveOptions(metastoreURIs, dbName, tableName)
.withHiveConf(hiveConfig)
.withCallTimeout(callTimeout)
.withStreamingOptimizations(!disableStreamingOptimizations)
.withTransactionBatchSize(transactionsPerBatch);
if (!StringUtils.isEmpty(staticPartitionValuesString)) {
List<String> staticPartitionValues = Arrays.stream(staticPartitionValuesString.split(",")).filter(Objects::nonNull).map(String::trim).collect(Collectors.toList());
o = o.withStaticPartitionValues(staticPartitionValues);
}
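// If security is enabled, pass the resolved Kerberos principal (and keytab, when a credentials service is configured) to Hive Streaming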
if (SecurityUtil.isSecurityEnabled(hiveConfig)) {
final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
final String explicitPrincipal = context.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
final String resolvedPrincipal;
if (credentialsService != null) {
resolvedPrincipal = credentialsService.getPrincipal();
o = o.withKerberosKeytab(credentialsService.getKeytab());
} else {
resolvedPrincipal = explicitPrincipal;
}
o = o.withKerberosPrincipal(resolvedPrincipal);
}
final HiveOptions options = o;
// Store the original class loader, then explicitly set it to this class's classloader (for use by the Hive Metastore)
ClassLoader originalClassloader = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
StreamingConnection hiveStreamingConnection = null;
try {
final RecordReader reader;
try (final InputStream in = session.read(flowFile)) {
// if we fail to create the RecordReader then we want to route to failure, so we need to
// handle this separately from the other IOExceptions which normally route to retry
try {
reader = recordReaderFactory.createRecordReader(flowFile, in, getLogger());
} catch (Exception e) {
throw new RecordReaderFactoryException("Unable to create RecordReader", e);
}
hiveStreamingConnection = makeStreamingConnection(options, reader, recordsPerTransaction);
// Write records to Hive streaming, then commit and close
boolean exitLoop = false;
while (!exitLoop) {
hiveStreamingConnection.beginTransaction();
// The HiveRecordWriter keeps track of records per transaction and will complete writing for the transaction
// once the limit has been reached. It is then reset for the next iteration of the loop.
try {
hiveStreamingConnection.write(in);
} catch (RecordsEOFException reofe) {
exitLoop = true;
}
hiveStreamingConnection.commitTransaction();
}
in.close();
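// Record the number of records written and the target table as FlowFile attributes, then report a provenance SEND event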
Map<String, String> updateAttributes = new HashMap<>();
updateAttributes.put(HIVE_STREAMING_RECORD_COUNT_ATTR, Long.toString(hiveStreamingConnection.getConnectionStats().getRecordsWritten()));
updateAttributes.put(ATTR_OUTPUT_TABLES, options.getQualifiedTableName());
flowFile = session.putAllAttributes(flowFile, updateAttributes);
session.getProvenanceReporter().send(flowFile, hiveStreamingConnection.getMetastoreUri());
} catch (TransactionError te) {
if (rollbackOnFailure) {
throw new ProcessException(te.getLocalizedMessage(), te);
} else {
throw new ShouldRetryException(te.getLocalizedMessage(), te);
}
} catch (RecordReaderFactoryException rrfe) {
if (rollbackOnFailure) {
throw new ProcessException(rrfe);
} else {
log.error(
"Failed to create {} for {} - routing to failure",
new Object[]{RecordReader.class.getSimpleName(), flowFile},
rrfe
);
session.transfer(flowFile, REL_FAILURE);
return null;
}
}
session.transfer(flowFile, REL_SUCCESS);
} catch (InvalidTable | SerializationError | StreamingIOFailure | IOException e) {
if (rollbackOnFailure) {
if (hiveStreamingConnection != null) {
abortConnection(hiveStreamingConnection);
}
throw new ProcessException(e.getLocalizedMessage(), e);
} else {
Map<String, String> updateAttributes = new HashMap<>();
final String recordCountAttribute = (hiveStreamingConnection != null) ? Long.toString(hiveStreamingConnection.getConnectionStats().getRecordsWritten()) : "0";
updateAttributes.put(HIVE_STREAMING_RECORD_COUNT_ATTR, recordCountAttribute);
updateAttributes.put(ATTR_OUTPUT_TABLES, options.getQualifiedTableName());
flowFile = session.putAllAttributes(flowFile, updateAttributes);
log.error(
"Exception while processing {} - routing to failure",
new Object[]{flowFile},
e
);
session.transfer(flowFile, REL_FAILURE);
}
} catch (DiscontinuedException e) {
// The input FlowFile processing is discontinued. Keep it in the input queue.
getLogger().warn("Discontinued processing for {} due to {}", new Object[]{flowFile, e}, e);
session.transfer(flowFile, Relationship.SELF);
} catch (ConnectionError ce) {
// If we can't connect to the metastore, yield the processor
context.yield();
throw new ProcessException("A connection to metastore cannot be established", ce);
} catch (ShouldRetryException e) {
// This exception is already a result of adjusting an error, so simply transfer the FlowFile to retry. Still need to abort the txn
getLogger().error(e.getLocalizedMessage(), e);
if (hiveStreamingConnection != null) {
abortConnection(hiveStreamingConnection);
}
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_RETRY);
} catch (StreamingException se) {
// Handle all other exceptions. These are often record-based exceptions (since Hive will throw a subclass of the exception caught above)
Throwable cause = se.getCause();
if (cause == null) cause = se;
// This is a failure on the incoming data, rollback on failure if specified; otherwise route to failure after penalizing (and abort txn in any case)
if (rollbackOnFailure) {
if (hiveStreamingConnection != null) {
abortConnection(hiveStreamingConnection);
}
throw new ProcessException(cause.getLocalizedMessage(), cause);
} else {
flowFile = session.penalize(flowFile);
Map<String, String> updateAttributes = new HashMap<>();
final String recordCountAttribute = (hiveStreamingConnection != null) ? Long.toString(hiveStreamingConnection.getConnectionStats().getRecordsWritten()) : "0";
updateAttributes.put(HIVE_STREAMING_RECORD_COUNT_ATTR, recordCountAttribute);
updateAttributes.put(ATTR_OUTPUT_TABLES, options.getQualifiedTableName());
flowFile = session.putAllAttributes(flowFile, updateAttributes);
log.error(
"Exception while trying to stream {} to hive - routing to failure",
new Object[]{flowFile},
se
);
session.transfer(flowFile, REL_FAILURE);
}
} catch (Throwable t) {
if (hiveStreamingConnection != null) {
abortConnection(hiveStreamingConnection);
}
throw (t instanceof ProcessException) ? (ProcessException) t : new ProcessException(t);
} finally {
closeConnection(hiveStreamingConnection);
// Restore original class loader, might not be necessary but is good practice since the processor task changed it
Thread.currentThread().setContextClassLoader(originalClassloader);
}
return null;
});
}
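/**
* Builds and opens a {@link HiveStreamingConnection} for the given options, using a {@link HiveRecordWriter} that pulls
* records from the supplied NiFi {@link RecordReader}.
*/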
StreamingConnection makeStreamingConnection(HiveOptions options, RecordReader reader, int recordsPerTransaction) throws StreamingException {
return HiveStreamingConnection.newBuilder()
.withDatabase(options.getDatabaseName())
.withTable(options.getTableName())
.withStaticPartitionValues(options.getStaticPartitionValues())
.withHiveConf(options.getHiveConf())
.withRecordWriter(new HiveRecordWriter(reader, getLogger(), recordsPerTransaction))
.withTransactionBatchSize(options.getTransactionBatchSize())
.withAgentInfo("NiFi " + this.getClass().getSimpleName() + " [" + this.getIdentifier()
+ "] thread " + Thread.currentThread().getId() + "[" + Thread.currentThread().getName() + "]")
.connect();
}
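/**
* Called when the processor is stopped. Clears the cached validation resources, shuts down the call-timeout thread pool,
* and resets the UGI and KerberosUser references.
*/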
@OnStopped
public void cleanup() {
validationResourceHolder.set(null); // trigger re-validation of resources
ComponentLog log = getLogger();
if (callTimeoutPool != null) {
callTimeoutPool.shutdown();
try {
while (!callTimeoutPool.isTerminated()) {
callTimeoutPool.awaitTermination(callTimeout, TimeUnit.MILLISECONDS);
}
} catch (Throwable t) {
log.warn("shutdown interrupted on " + callTimeoutPool, t);
}
callTimeoutPool = null;
}
ugi = null;
kerberosUserReference.set(null);
}
private void abortAndCloseConnection(StreamingConnection connection) {
try {
abortConnection(connection);
closeConnection(connection);
} catch (Exception ie) {
getLogger().warn("unable to close hive connections. ", ie);
}
}
/**
* Abort current Txn on the connection
*/
private void abortConnection(StreamingConnection connection) {
if (connection != null) {
try {
connection.abortTransaction();
} catch (Exception e) {
getLogger().error("Failed to abort Hive Streaming transaction " + connection + " due to exception ", e);
}
}
}
/**
* Close the streaming connection
*/
private void closeConnection(StreamingConnection connection) {
if (connection != null) {
try {
connection.close();
} catch (Exception e) {
getLogger().error("Failed to close Hive Streaming connection " + connection + " due to exception ", e);
}
}
}
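/**
* Internal signal that the current FlowFile should be penalized and routed to the 'retry' relationship
* (after aborting the open Hive Streaming transaction).
*/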
private static class ShouldRetryException extends RuntimeException {
private ShouldRetryException(String message, Throwable cause) {
super(message, cause);
}
}
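/**
* Returns the UserGroupInformation for this processor, first checking the Kerberos TGT and re-logging in via the
* associated KerberosUser if one is set.
*/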
UserGroupInformation getUgi() {
getLogger().trace("getting UGI instance");
if (kerberosUserReference.get() != null) {
// if there's a KerberosUser associated with this UGI, check the TGT and relogin if it is close to expiring
KerberosUser kerberosUser = kerberosUserReference.get();
getLogger().debug("kerberosUser is " + kerberosUser);
try {
getLogger().debug("checking TGT on kerberosUser [{}]", new Object[]{kerberosUser});
kerberosUser.checkTGTAndRelogin();
} catch (final KerberosLoginException e) {
throw new ProcessException("Unable to relogin with kerberos credentials for " + kerberosUser.getPrincipal(), e);
}
} else {
getLogger().debug("kerberosUser was null, will not refresh TGT with KerberosUser");
}
return ugi;
}
}