/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hive.streaming;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.hive.common.BlobStorageUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hive.common.util.ShutdownHookManager;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
/**
* Streaming connection implementation for Hive. To create a streaming connection, use the builder API
* to create a record writer first, followed by the connection itself. Once the connection is created, clients can
* begin a transaction, keep writing using the connection, commit the transaction and close the connection when done.
* To bind to the correct metastore, the HiveConf object has to be created from hive-site.xml or HIVE_CONF_DIR.
* If the hive conf is created manually, the metastore URI has to be set correctly. If no hive conf object is
* specified, "thrift://localhost:9083" will be used as the default metastore URI.
* <br><br>
* NOTE: The streaming connection APIs and record writer APIs are not thread-safe. Streaming connection creation,
* begin/commit/abort transaction, write and close have to be called from the same thread. If close() or
* abortTransaction() has to be triggered from a separate thread, it has to be coordinated via external variables or
* a synchronization mechanism.
* <br><br>
* Example usage:
* <pre>{@code
* // create delimited record writer whose schema exactly matches table schema
* StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
* .withFieldDelimiter(',')
* .build();
* // create and open streaming connection (default.src table has to exist already)
* StreamingConnection connection = HiveStreamingConnection.newBuilder()
* .withDatabase("default")
* .withTable("src")
* .withAgentInfo("nifi-agent")
* .withRecordWriter(writer)
* .withHiveConf(hiveConf)
* .connect();
* // begin a transaction, write records and commit 1st transaction
* connection.beginTransaction();
* connection.write("key1,val1".getBytes());
* connection.write("key2,val2".getBytes());
* connection.commitTransaction();
* // begin another transaction, write more records and commit 2nd transaction
* connection.beginTransaction();
* connection.write("key3,val3".getBytes());
* connection.write("key4,val4".getBytes());
* connection.commitTransaction();
* // close the streaming connection
* connection.close();
* }
* </pre>
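* <br><br>
* A second sketch, for writing to a single static partition (the table name, partition
* columns and values below are placeholders, not part of this API):
* <pre>{@code
* // create and open a streaming connection bound to one static partition
* // (assumes default.alerts is partitioned by (continent, country))
* StreamingConnection partitionedConnection = HiveStreamingConnection.newBuilder()
* .withDatabase("default")
* .withTable("alerts")
* .withStaticPartitionValues(Arrays.asList("Asia", "India"))
* .withAgentInfo("example-agent")
* .withRecordWriter(writer)
* .withHiveConf(hiveConf)
* .connect();
* }
* </pre>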
*/
public class HiveStreamingConnection implements StreamingConnection {
private static final Logger LOG = LoggerFactory.getLogger(HiveStreamingConnection.class.getName());
private static final String DEFAULT_METASTORE_URI = "thrift://localhost:9083";
private static final int DEFAULT_TRANSACTION_BATCH_SIZE = 1;
private static final boolean DEFAULT_STREAMING_OPTIMIZATIONS_ENABLED = true;
public enum TxnState {
INACTIVE("I"), OPEN("O"), COMMITTED("C"), ABORTED("A"),
PREPARED_FOR_COMMIT("P");
private final String code;
TxnState(String code) {
this.code = code;
}
@Override
public String toString() {
return code;
}
}
// fields populated from builder
private String database;
private String table;
private List<String> staticPartitionValues;
private String agentInfo;
private int transactionBatchSize;
private RecordWriter recordWriter;
private StreamingTransaction currentTransactionBatch;
private HiveConf conf;
private boolean streamingOptimizations;
private AtomicBoolean isConnectionClosed = new AtomicBoolean(false);
// internal fields
private boolean isPartitionedTable;
private IMetaStoreClient msClient;
private IMetaStoreClient heartbeatMSClient;
private final String username;
private final boolean secureMode;
private Table tableObject = null;
private String metastoreUri;
private ConnectionStats connectionStats;
private final Long writeId;
private final Integer statementId;
private boolean manageTransactions;
private int countTransactions = 0;
private Set<String> partitions;
private Map<String, WriteDirInfo> writePaths;
private Runnable onShutdownRunner;
private HiveStreamingConnection(Builder builder) throws StreamingException {
this.database = builder.database.toLowerCase();
this.table = builder.table.toLowerCase();
this.staticPartitionValues = builder.staticPartitionValues;
this.conf = builder.hiveConf;
this.agentInfo = builder.agentInfo;
this.streamingOptimizations = builder.streamingOptimizations;
this.writeId = builder.writeId;
this.statementId = builder.statementId;
this.tableObject = builder.tableObject;
this.setPartitionedTable(builder.isPartitioned);
this.manageTransactions = builder.manageTransactions;
this.writePaths = new HashMap<>();
UserGroupInformation loggedInUser = null;
try {
loggedInUser = UserGroupInformation.getLoginUser();
} catch (IOException e) {
LOG.warn("Unable to get logged in user via UGI. err: {}", e.getMessage());
}
if (loggedInUser == null) {
this.username = System.getProperty("user.name");
this.secureMode = false;
} else {
this.username = loggedInUser.getShortUserName();
this.secureMode = loggedInUser.hasKerberosCredentials();
}
this.transactionBatchSize = builder.transactionBatchSize;
this.recordWriter = builder.recordWriter;
this.connectionStats = new ConnectionStats();
if (agentInfo == null) {
try {
agentInfo = username + ":" + InetAddress.getLocalHost().getHostName() + ":" + Thread.currentThread().getName();
} catch (UnknownHostException e) {
// ignore and use UUID instead
this.agentInfo = UUID.randomUUID().toString();
}
}
if (conf == null) {
conf = createHiveConf(this.getClass(), DEFAULT_METASTORE_URI);
}
overrideConfSettings(conf);
if (manageTransactions) {
this.metastoreUri = conf.get(MetastoreConf.ConfVars.THRIFT_URIS.getHiveName());
this.msClient = getMetaStoreClient(conf, metastoreUri, secureMode,
"streaming-connection");
// We use a separate metastore client for heartbeat calls to ensure heartbeat RPC calls are
// isolated from the other transaction related RPC calls.
this.heartbeatMSClient = getMetaStoreClient(conf, metastoreUri, secureMode,
"streaming-connection-heartbeat");
validateTable();
}
LOG.info("STREAMING CONNECTION INFO: {}", toConnectionInfoString());
}
public static Builder newBuilder() {
return new Builder();
}
public static class Builder {
private String database;
private String table;
private List<String> staticPartitionValues;
private String agentInfo;
private HiveConf hiveConf;
private int transactionBatchSize = DEFAULT_TRANSACTION_BATCH_SIZE;
private boolean streamingOptimizations = DEFAULT_STREAMING_OPTIMIZATIONS_ENABLED;
private RecordWriter recordWriter;
private long writeId = -1;
private int statementId = -1;
private boolean manageTransactions = true;
private Table tableObject;
private boolean isPartitioned;
/**
* Specify database to use for streaming connection.
*
* @param database - db name
* @return - builder
*/
public Builder withDatabase(final String database) {
this.database = database;
return this;
}
/**
* Specify table to use for streaming connection.
*
* @param table - table name
* @return - builder
*/
public Builder withTable(final String table) {
this.table = table;
return this;
}
/**
* Specify the static partition values to use for the streaming connection.
* Values must be given in the same order as the table's partition columns.
*
* @param staticPartitionValues - static partition values
* @return - builder
*/
public Builder withStaticPartitionValues(final List<String> staticPartitionValues) {
this.staticPartitionValues = staticPartitionValues == null ? null : new ArrayList<>(staticPartitionValues);
return this;
}
/**
* Specify agent info to use for streaming connection.
*
* @param agentInfo - agent info
* @return - builder
*/
public Builder withAgentInfo(final String agentInfo) {
this.agentInfo = agentInfo;
return this;
}
/**
* Specify the hive configuration object to use for the streaming connection.
* Generate this object by pointing to an existing hive-site.xml or HIVE_CONF_DIR.
* Make sure the metastore URI is set correctly, else thrift://localhost:9083 will be
* used as the default.
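* <p>
* A minimal sketch of building such a conf (the metastore host below is a placeholder):
* <pre>{@code
* HiveConf hiveConf = new HiveConf();
* hiveConf.set(MetastoreConf.ConfVars.THRIFT_URIS.getHiveName(), "thrift://metastore-host:9083");
* }
* </pre>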
*
* @param hiveConf - hive conf object
* @return - builder
*/
public Builder withHiveConf(final HiveConf hiveConf) {
this.hiveConf = hiveConf;
return this;
}
/**
* Transaction batch size to use (default value is 1). This is an expert level configuration.
* For every transaction batch a delta directory will be created, which impacts
* when compaction is triggered.
* NOTE: This is an evolving API and is subject to change/might not be honored in future releases.
*
* @param transactionBatchSize - transaction batch size
* @return - builder
*/
@InterfaceStability.Evolving
public Builder withTransactionBatchSize(final int transactionBatchSize) {
this.transactionBatchSize = transactionBatchSize;
return this;
}
/**
* Whether to enable streaming optimizations. This is an expert level configuration.
* Disabling streaming optimizations can have a significant impact on performance and memory consumption.
*
* @param enable - flag to enable or not
* @return - builder
*/
public Builder withStreamingOptimizations(final boolean enable) {
this.streamingOptimizations = enable;
return this;
}
/**
* Record writer to use for writing records to destination table.
*
* @param recordWriter - record writer
* @return - builder
*/
public Builder withRecordWriter(final RecordWriter recordWriter) {
this.recordWriter = recordWriter;
return this;
}
/**
* Specify this parameter if the current connection should
* join an ongoing transaction without having to query
* the metastore to create one.
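* <p>
* A minimal sketch (the write id, statement id and table object are assumed to be handed
* over by an external coordinator that already allocated the write id; they are
* placeholders here):
* <pre>{@code
* StreamingConnection conn = HiveStreamingConnection.newBuilder()
* .withDatabase("default")
* .withTable("src")
* .withTableObject(tableObject)
* .withWriteId(writeId)
* .withStatementId(statementId)
* .withRecordWriter(writer)
* .withHiveConf(hiveConf)
* .connect();
* }
* </pre>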
* @param writeId write id
* @return builder
*/
public Builder withWriteId(final long writeId) {
this.writeId = writeId;
manageTransactions = false;
return this;
}
/**
* Specify this parameter to set a statement id in the writer.
* This only makes sense when a writeId is
* provided as well.
* @param statementId statement id
* @return builder
*/
public Builder withStatementId(final int statementId) {
this.statementId = statementId;
return this;
}
/**
* Specify the table object directly, for cases where no connection
* to the metastore will be opened.
* @param table table object.
* @return builder
*/
public Builder withTableObject(Table table) {
this.tableObject = table;
this.isPartitioned = tableObject.getPartitionKeys() != null
&& !tableObject.getPartitionKeys().isEmpty();
return this;
}
/**
* Builds and returns a streaming connection to Hive.
*
* @return - hive streaming connection
* @throws StreamingException if the builder arguments are invalid or the connection cannot be established
*/
public HiveStreamingConnection connect() throws StreamingException {
if (database == null) {
throw new StreamingException("Database cannot be null for streaming connection");
}
if (table == null) {
if (tableObject == null) {
throw new StreamingException("Table and table object cannot be "
+ "null for streaming connection");
} else {
table = tableObject.getTableName();
}
}
if (tableObject != null && !tableObject.getTableName().equals(table)) {
throw new StreamingException("Table must match tableObject table name");
}
if (recordWriter == null) {
throw new StreamingException("Record writer cannot be null for streaming connection");
}
if ((writeId != -1 && tableObject == null) ||
(writeId == -1 && tableObject != null)){
throw new StreamingException("If writeId is set, tableObject "
+ "must be set as well and vice versa");
}
HiveStreamingConnection streamingConnection = new HiveStreamingConnection(this);
streamingConnection.onShutdownRunner = streamingConnection::close;
// assigning higher priority than FileSystem shutdown hook so that streaming connection gets closed first before
// filesystem close (to avoid ClosedChannelException)
ShutdownHookManager.addShutdownHook(streamingConnection.onShutdownRunner, FileSystem.SHUTDOWN_HOOK_PRIORITY + 1);
Thread.setDefaultUncaughtExceptionHandler((t, e) -> streamingConnection.close());
return streamingConnection;
}
}
private void setPartitionedTable(Boolean isPartitionedTable) {
this.isPartitionedTable = isPartitionedTable;
}
@Override
public String toString() {
return "{ metaStoreUri: " + metastoreUri + ", database: " + database + ", table: " + table + " }";
}
private String toConnectionInfoString() {
return "{ metastore-uri: " + metastoreUri + ", " +
"database: " + database + ", " +
"table: " + table + ", " +
"partitioned-table: " + isPartitionedTable() + ", " +
"dynamic-partitioning: " + isDynamicPartitioning() + ", " +
"username: " + username + ", " +
"secure-mode: " + secureMode + ", " +
"record-writer: " + recordWriter.getClass().getSimpleName() + ", " +
"agent-info: " + agentInfo + ", " +
"writeId: " + writeId + ", " +
"statementId: " + statementId + " }";
}
@VisibleForTesting
String toTransactionString() {
return currentTransactionBatch == null ? "" : currentTransactionBatch.toString();
}
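/**
* Creates the given partition if it does not exist yet and returns its metadata.
* If no metastore client is available (unmanaged, writeId-based connections), the partition
* is not created and the returned PartitionInfo reports it as not existing, so the caller
* can decide how to handle it.
*
* @param partitionValues - partition values, in the order of the table's partition columns
* @return - partition name, location and whether it already existed
* @throws StreamingException if the partition cannot be created
*/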
@Override
public PartitionInfo createPartitionIfNotExists(final List<String> partitionValues) throws StreamingException {
String partLocation = null;
String partName = null;
boolean exists = false;
try {
Map<String, String> partSpec = Warehouse.makeSpecFromValues(tableObject.getPartitionKeys(), partitionValues);
Path location = new Path(tableObject.getDataLocation(), Warehouse.makePartPath(partSpec));
location = new Path(Utilities.getQualifiedPath(conf, location));
partLocation = location.toString();
partName = Warehouse.makePartName(tableObject.getPartitionKeys(), partitionValues);
Partition partition =
org.apache.hadoop.hive.ql.metadata.Partition.createMetaPartitionObject(tableObject, partSpec, location);
if (getMSC() == null) {
// We assume it doesn't exist if we can't check it
// so the driver will decide
return new PartitionInfo(partName, partLocation, false);
}
getMSC().add_partition(partition);
if (LOG.isDebugEnabled()) {
LOG.debug("Created partition {} for table {}", partName,
tableObject.getFullyQualifiedName());
}
} catch (AlreadyExistsException e) {
exists = true;
} catch (HiveException | TException e) {
throw new StreamingException("Unable to creation partition for values: " + partitionValues + " connection: " +
toConnectionInfoString(), e);
}
return new PartitionInfo(partName, partLocation, exists);
}
/**
* Returns the file that would be used to store rows for the given
* parameters.
* @param partitionValues partition values
* @param bucketId bucket id
* @param minWriteId min write Id
* @param maxWriteId max write Id
* @param statementId statement Id
* @return the location of the file.
* @throws StreamingException when the path is not found
*/
@Override
public Path getDeltaFileLocation(List<String> partitionValues,
Integer bucketId, Long minWriteId, Long maxWriteId, Integer statementId)
throws StreamingException {
return recordWriter.getDeltaFileLocation(partitionValues,
bucketId, minWriteId, maxWriteId, statementId, tableObject);
}
IMetaStoreClient getMSC() {
connectionStats.incrementMetastoreCalls();
return msClient;
}
IMetaStoreClient getHeatbeatMSC() {
connectionStats.incrementMetastoreCalls();
return heartbeatMSClient;
}
private void validateTable() throws InvalidTable, ConnectionError {
try {
tableObject = new Table(getMSC().getTable(database, table));
} catch (Exception e) {
LOG.warn("Unable to validate the table for connection: " + toConnectionInfoString(), e);
throw new InvalidTable(database, table, e);
}
// 1 - check that the table is Acid
if (!AcidUtils.isFullAcidTable(tableObject)) {
LOG.error("HiveEndPoint " + this + " must use an acid table");
throw new InvalidTable(database, table, "is not an Acid table");
}
if (tableObject.getPartitionKeys() != null && !tableObject.getPartitionKeys().isEmpty()) {
setPartitionedTable(true);
} else {
setPartitionedTable(false);
}
// partition values are specified on non-partitioned table
if (!isPartitionedTable() && (staticPartitionValues != null && !staticPartitionValues.isEmpty())) {
// Invalid if table is not partitioned, but endPoint's partitionVals is not empty
String errMsg = this.toString() + " specifies partitions for un-partitioned table";
LOG.error(errMsg);
throw new ConnectionError(errMsg);
}
// batch size is only used for managed transactions, not for unmanaged single transactions
if (transactionBatchSize > 1) {
try (FileSystem fs = tableObject.getDataLocation().getFileSystem(conf)) {
if (BlobStorageUtils.isBlobStorageFileSystem(conf, fs)) {
// currently not all filesystems implement StreamCapabilities, while FSDataOutputStream does
Path path = new Path("/tmp", "_tmp_stream_verify_" + UUID.randomUUID().toString());
try(FSDataOutputStream out = fs.create(path, false)){
if (!out.hasCapability(StreamCapabilities.HFLUSH)) {
throw new ConnectionError(
"The backing filesystem only supports transaction batch sizes of 1, but " + transactionBatchSize
+ " was requested.");
}
fs.deleteOnExit(path);
} catch (IOException e){
throw new ConnectionError("Could not create path for database", e);
}
}
} catch (IOException e) {
throw new ConnectionError("Could not retrieve FileSystem of table", e);
}
}
}
private void beginNextTransaction() throws StreamingException {
if (currentTransactionBatch == null) {
currentTransactionBatch = createNewTransactionBatch();
LOG.info("Opened new transaction batch {}", currentTransactionBatch);
}
if (currentTransactionBatch.isClosed()) {
throw new StreamingException("Cannot begin next transaction on a closed streaming connection");
}
if (currentTransactionBatch.remainingTransactions() == 0) {
LOG.info("Transaction batch {} is done. Rolling over to next transaction batch.",
currentTransactionBatch);
closeCurrentTransactionBatch();
currentTransactionBatch = createNewTransactionBatch();
LOG.info("Rolled over to new transaction batch {}", currentTransactionBatch);
}
currentTransactionBatch.beginNextTransaction();
}
private StreamingTransaction createNewTransactionBatch() throws StreamingException {
countTransactions++;
if (manageTransactions) {
return new TransactionBatch(this);
} else {
if (countTransactions > 1) {
throw new StreamingException("If a writeId is passed for the "
+ "construction of HiveStreaming only one transaction batch"
+ " can be done");
}
return new UnManagedSingleTransaction(this);
}
}
private void checkClosedState() throws StreamingException {
if (isConnectionClosed.get()) {
throw new StreamingException("Streaming connection is closed already.");
}
}
private void checkState() throws StreamingException {
checkClosedState();
if (currentTransactionBatch == null) {
throw new StreamingException("Transaction batch is null. Missing beginTransaction?");
}
if (currentTransactionBatch.getCurrentTransactionState() != TxnState.OPEN) {
throw new StreamingException("Transaction state is not OPEN. Missing beginTransaction?");
}
}
private void closeCurrentTransactionBatch() throws StreamingException {
currentTransactionBatch.close();
writePaths.clear();
}
@Override
public void beginTransaction() throws StreamingException {
checkClosedState();
partitions = new HashSet<>();
beginNextTransaction();
}
@Override
public void commitTransaction() throws StreamingException {
commitTransaction(null);
}
@Override
public void commitTransaction(Set<String> partitions)
throws StreamingException {
commitTransaction(partitions, null, null);
}
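/**
* Commits the current transaction. Any of the given partitions that do not exist yet are
* created before the commit. If a key/value pair is supplied, it is passed down to the
* transaction commit so that it is recorded atomically with the commit.
*
* @param partitions - partition names touched by this transaction, in the standard
* key1=val1/key2=val2 format, may be null
* @param key - optional key to commit atomically with the transaction, may be null
* @param value - optional value for the key, may be null
* @throws StreamingException if partition creation or the commit fails
*/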
@Override
public void commitTransaction(Set<String> partitions, String key,
String value) throws StreamingException {
checkState();
Set<String> createdPartitions = new HashSet<>();
if (partitions != null) {
for (String partition: partitions) {
try {
PartitionInfo info = createPartitionIfNotExists(
Warehouse.getPartValuesFromPartName(partition));
if (!info.isExists()) {
createdPartitions.add(partition);
}
} catch (MetaException e) {
throw new StreamingException("Partition " + partition + " is invalid.", e);
}
}
connectionStats.incrementTotalPartitions(partitions.size());
}
currentTransactionBatch.commit(createdPartitions, key, value);
this.partitions.addAll(
currentTransactionBatch.getPartitions());
connectionStats.incrementCreatedPartitions(createdPartitions.size());
connectionStats.incrementCommittedTransactions();
}
@Override
public void abortTransaction() throws StreamingException {
checkState();
currentTransactionBatch.abort();
connectionStats.incrementAbortedTransactions();
}
@Override
public void write(final byte[] record) throws StreamingException {
checkState();
currentTransactionBatch.write(record);
}
@Override
public void write(final InputStream inputStream) throws StreamingException {
checkState();
currentTransactionBatch.write(inputStream);
}
/**
* Close the streaming connection, closing any open transaction batch and releasing metastore clients.
*/
@Override
public void close() {
if (isConnectionClosed.get()) {
return;
}
isConnectionClosed.set(true);
try {
if (currentTransactionBatch != null) {
closeCurrentTransactionBatch();
}
} catch (StreamingException e) {
LOG.warn("Unable to close current transaction batch: " + currentTransactionBatch, e);
} finally {
if (manageTransactions) {
getMSC().close();
getHeatbeatMSC().close();
}
//remove shutdown hook entry added while creating this connection via HiveStreamingConnection.Builder#connect()
if (!ShutdownHookManager.isShutdownInProgress()) {
ShutdownHookManager.removeShutdownHook(this.onShutdownRunner);
}
}
if (LOG.isInfoEnabled()) {
LOG.info("Closed streaming connection. Agent: {} Stats: {}", getAgentInfo(), getConnectionStats());
}
}
@Override
public ConnectionStats getConnectionStats() {
return connectionStats;
}
private static IMetaStoreClient getMetaStoreClient(HiveConf conf, String metastoreUri, boolean secureMode,
String owner)
throws ConnectionError {
if (metastoreUri != null) {
conf.set(MetastoreConf.ConfVars.THRIFT_URIS.getHiveName(), metastoreUri);
}
if (secureMode) {
conf.setBoolean(MetastoreConf.ConfVars.USE_THRIFT_SASL.getHiveName(), true);
}
try {
LOG.info("Creating metastore client for {}", owner);
return HiveMetaStoreUtils.getHiveMetastoreClient(conf);
} catch (MetaException | IOException e) {
throw new ConnectionError("Error connecting to Hive Metastore URI: "
+ metastoreUri + ". " + e.getMessage(), e);
}
}
private static class WriteDirInfo {
List<String> partitionVals;
Path writeDir;
WriteDirInfo(List<String> partitionVals, Path writeDir) {
this.partitionVals = partitionVals;
this.writeDir = writeDir;
}
List<String> getPartitionVals() {
return this.partitionVals;
}
Path getWriteDir() {
return this.writeDir;
}
}
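/**
* Records the delta directory used by the writer for the given partition (or for the table
* itself when the table is not partitioned). This is invoked once per bucket file, and the
* delta directory is expected to be the same for all bucket files of a partition. The
* collected paths are later used to generate write notification events.
*
* @param partitionValues - partition values written to, null for non-partitioned tables
* @param writeDir - delta directory the bucket file was written to
*/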
@Override
public void addWriteDirectoryInfo(List<String> partitionValues, Path writeDir) {
String key = (partitionValues == null) ? tableObject.getFullyQualifiedName()
: partitionValues.toString();
if (writePaths.containsKey(key)) {
// This method is invoked once per bucket file within a delta directory, so an entry for the
// same partition or table may already exist. The delta directory, however, must remain the
// same for all bucket files.
WriteDirInfo dirInfo = writePaths.get(key);
assert(dirInfo.getWriteDir().equals(writeDir));
} else {
writePaths.put(key, new WriteDirInfo(partitionValues, writeDir));
}
}
/**
* Add write notification events to the metastore if DML event logging is enabled.
* @throws StreamingException File operation errors or HMS errors.
*/
@Override
public void addWriteNotificationEvents() throws StreamingException {
if (!conf.getBoolVar(HiveConf.ConfVars.FIRE_EVENTS_FOR_DML)) {
LOG.debug("Write notification log is ignored as dml event logging is disabled.");
return;
}
try {
// Traverse the write paths for the current streaming connection and add one write notification
// event per table or partition.
// For a non-partitioned table, there will be only one entry in writePaths and the corresponding
// partitionVals is null.
Long currentTxnId = getCurrentTxnId();
Long currentWriteId = getCurrentWriteId();
for (WriteDirInfo writeInfo : writePaths.values()) {
LOG.debug("TxnId: " + currentTxnId + ", WriteId: " + currentWriteId
+ " - Logging write event for the files in path " + writeInfo.getWriteDir());
// List the new files added inside the write path (delta directory).
FileSystem fs = tableObject.getDataLocation().getFileSystem(conf);
List<Path> newFiles = new ArrayList<>();
Hive.listFilesInsideAcidDirectory(writeInfo.getWriteDir(), fs, newFiles);
// If no files were added by this streaming write, then there is no need to log a write notification event.
if (newFiles.isEmpty()) {
LOG.debug("TxnId: " + currentTxnId + ", WriteId: " + currentWriteId
+ " - Skipping empty path " + writeInfo.getWriteDir());
continue;
}
// Add write notification events into HMS table.
Hive.addWriteNotificationLog(conf, tableObject, writeInfo.getPartitionVals(),
currentTxnId, currentWriteId, newFiles);
}
} catch (IOException | TException | HiveException e) {
throw new StreamingException("Failed to log write notification events.", e);
}
}
@VisibleForTesting
TxnState getCurrentTransactionState() {
return currentTransactionBatch.getCurrentTransactionState();
}
@VisibleForTesting
int remainingTransactions() {
return currentTransactionBatch.remainingTransactions();
}
@VisibleForTesting
Long getCurrentTxnId() {
return currentTransactionBatch.getCurrentTxnId();
}
private HiveConf createHiveConf(Class<?> clazz, String metaStoreUri) {
HiveConf conf = new HiveConf(clazz);
if (metaStoreUri != null) {
conf.set(MetastoreConf.ConfVars.THRIFT_URIS.getHiveName(), metaStoreUri);
}
return conf;
}
private void overrideConfSettings(HiveConf conf) {
setHiveConf(conf, HiveConf.ConfVars.HIVE_TXN_MANAGER, DbTxnManager.class.getName());
setHiveConf(conf, HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true);
setHiveConf(conf, MetastoreConf.ConfVars.EXECUTE_SET_UGI.getHiveName());
setHiveConf(conf, HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
if (streamingOptimizations) {
setHiveConf(conf, HiveConf.ConfVars.HIVE_ORC_DELTA_STREAMING_OPTIMIZATIONS_ENABLED, true);
}
// since the same thread creates the metastore clients for both the streaming connection and the heartbeat thread,
// we explicitly disable the metastore client cache
setHiveConf(conf, HiveConf.ConfVars.METASTORE_CLIENT_CACHE_ENABLED, false);
}
private static void setHiveConf(HiveConf conf, HiveConf.ConfVars var, String value) {
if (LOG.isDebugEnabled()) {
LOG.debug("Overriding HiveConf setting : " + var + " = " + value);
}
conf.setVar(var, value);
}
private static void setHiveConf(HiveConf conf, HiveConf.ConfVars var, boolean value) {
if (LOG.isDebugEnabled()) {
LOG.debug("Overriding HiveConf setting : " + var + " = " + value);
}
conf.setBoolVar(var, value);
}
private static void setHiveConf(HiveConf conf, String var) {
if (LOG.isDebugEnabled()) {
LOG.debug("Overriding HiveConf setting : " + var + " = " + true);
}
conf.setBoolean(var, true);
}
public List<TxnToWriteId> getTxnToWriteIds() {
if (currentTransactionBatch != null) {
return currentTransactionBatch.getTxnToWriteIds();
}
return null;
}
@Override
public HiveConf getHiveConf() {
return conf;
}
@Override
public String getMetastoreUri() {
return metastoreUri;
}
@Override
public Table getTable() {
return tableObject;
}
@Override
public List<String> getStaticPartitionValues() {
return staticPartitionValues;
}
@Override
public String getAgentInfo() {
return agentInfo;
}
@Override
public boolean isPartitionedTable() {
return isPartitionedTable;
}
@Override
public boolean isDynamicPartitioning() {
return isPartitionedTable() && (staticPartitionValues == null || staticPartitionValues.isEmpty());
}
@Override
public Set<String> getPartitions() {
return partitions;
}
public String getUsername() {
return username;
}
public String getDatabase() {
return database;
}
public RecordWriter getRecordWriter() {
return recordWriter;
}
public int getTransactionBatchSize() {
return transactionBatchSize;
}
public HiveConf getConf() {
return conf;
}
public Long getWriteId() {
return writeId;
}
public Integer getStatementId() {
return statementId;
}
public Long getCurrentWriteId() {
return currentTransactionBatch.getCurrentWriteId();
}
}