HIVE-21146 Enforce TransactionBatch size=1 for blob stores (#797)
* HIVE-21146 Enforce TransactionBatch size=1 for blob stores
Change-Id: Ia5f94c34a044c2990e95204de03b661d162874c7
* Apply _ prefix to tmp verification file
* Rely on /tmp instead of the table's data location for the temporary verification file
diff --git a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
index f4e71f9..27dc6f2 100644
--- a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
+++ b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
@@ -32,8 +32,11 @@
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StreamCapabilities;
+import org.apache.hadoop.hive.common.BlobStorageUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
@@ -520,6 +523,28 @@
LOG.error(errMsg);
throw new ConnectionError(errMsg);
}
+
+ // batch size is only used for managed transactions, not for unmanaged single transactions
+ if (transactionBatchSize > 1) {
+ try (FileSystem fs = tableObject.getDataLocation().getFileSystem(conf)) {
+ if (BlobStorageUtils.isBlobStorageFileSystem(conf, fs)) {
+ // not all FileSystem implementations expose StreamCapabilities, but FSDataOutputStream does, so probe hflush support via a real output stream
+ Path path = new Path("/tmp", "_tmp_stream_verify_" + UUID.randomUUID().toString());
+ try(FSDataOutputStream out = fs.create(path, false)){
+ if (!out.hasCapability(StreamCapabilities.HFLUSH)) {
+ throw new ConnectionError(
+ "The backing filesystem only supports transaction batch sizes of 1, but " + transactionBatchSize
+ + " was requested.");
+ }
+ fs.deleteOnExit(path);
+ } catch (IOException e){
+ throw new ConnectionError("Could not create path for database", e);
+ }
+ }
+ } catch (IOException e) {
+ throw new ConnectionError("Could not retrieve FileSystem of table", e);
+ }
+ }
}
private void beginNextTransaction() throws StreamingException {
diff --git a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
index 055672f..58b3ae2 100644
--- a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
+++ b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
@@ -114,7 +114,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
public class TestStreaming {
private static final Logger LOG = LoggerFactory.getLogger(TestStreaming.class);
@@ -1314,6 +1313,35 @@
connection.close();
}
+ @Test
+ public void testTransactionBatchSizeValidation() throws Exception {
+ final String schemes = conf.get(HiveConf.ConfVars.HIVE_BLOBSTORE_SUPPORTED_SCHEMES.varname);
+ // the output stream of this FS doesn't support hflush, so connect() below is expected to fail
+ conf.setVar(HiveConf.ConfVars.HIVE_BLOBSTORE_SUPPORTED_SCHEMES, "raw");
+
+ StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
+ .withFieldDelimiter(',')
+ .build();
+
+ try {
+ HiveStreamingConnection.newBuilder()
+ .withDatabase(dbName)
+ .withTable(tblName)
+ .withAgentInfo("UT_" + Thread.currentThread().getName())
+ .withRecordWriter(writer)
+ .withTransactionBatchSize(2)
+ .withHiveConf(conf)
+ .connect();
+
+ Assert.fail();
+ } catch (ConnectionError e) {
+ Assert.assertTrue("Expected connection error due to batch sizes",
+ e.getMessage().contains("only supports transaction batch"));
+ } finally {
+ conf.setVar(HiveConf.ConfVars.HIVE_BLOBSTORE_SUPPORTED_SCHEMES, schemes);
+ }
+ }
+
/**
* check that transactions that have not heartbeated and timedout get properly aborted
*