HIVE-21146 Enforce TransactionBatch size=1 for blob stores (#797)

* HIVE-21146 Enforce TransactionBatch size=1 for blob stores

Change-Id: Ia5f94c34a044c2990e95204de03b661d162874c7

* Apply _ prefix to tmp verification file

* Rely on /tmp instead
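
As a minimal usage sketch of the new behavior (the database/table names, agent
info, and HiveConf here are hypothetical; the builder calls mirror the test
added below): on a table backed by a blob store, only a transaction batch size
of 1 is accepted, and larger sizes now fail at connect() with a ConnectionError.

    StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
        .withFieldDelimiter(',')
        .build();

    // assumes conf is an initialized HiveConf and the table's data lives on a
    // blob store (a scheme listed in hive.blobstore.supported.schemes)
    HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
        .withDatabase("default")           // hypothetical database
        .withTable("alerts")               // hypothetical table
        .withAgentInfo("example-agent")
        .withRecordWriter(writer)
        .withTransactionBatchSize(1)       // sizes > 1 throw ConnectionError on blob stores
        .withHiveConf(conf)
        .connect();
    connection.close();
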
diff --git a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
index f4e71f9..27dc6f2 100644
--- a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
+++ b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
@@ -32,8 +32,11 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StreamCapabilities;
+import org.apache.hadoop.hive.common.BlobStorageUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
@@ -520,6 +523,28 @@
       LOG.error(errMsg);
       throw new ConnectionError(errMsg);
     }
+
+    // batch size is only used for managed transactions, not for unmanaged single transactions
+    if (transactionBatchSize > 1) {
+      try (FileSystem fs = tableObject.getDataLocation().getFileSystem(conf)) {
+        if (BlobStorageUtils.isBlobStorageFileSystem(conf, fs)) {
+          // not every FileSystem implements StreamCapabilities, but FSDataOutputStream does, so probe a temporary stream for hflush support
+          Path path = new Path("/tmp", "_tmp_stream_verify_" + UUID.randomUUID().toString());
+          try (FSDataOutputStream out = fs.create(path, false)) {
+            fs.deleteOnExit(path); // register cleanup first so the probe file is removed even if the check fails
+            if (!out.hasCapability(StreamCapabilities.HFLUSH)) {
+              throw new ConnectionError(
+                  "The backing filesystem only supports transaction batch sizes of 1, but " + transactionBatchSize
+                      + " was requested.");
+            }
+          } catch (IOException e) {
+            throw new ConnectionError("Could not create temporary file to verify stream capabilities", e);
+          }
+        }
+      } catch (IOException e) {
+        throw new ConnectionError("Could not retrieve FileSystem of table", e);
+      }
+    }
   }
 
   private void beginNextTransaction() throws StreamingException {
diff --git a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
index 055672f..58b3ae2 100644
--- a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
+++ b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
@@ -114,7 +114,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 public class TestStreaming {
   private static final Logger LOG = LoggerFactory.getLogger(TestStreaming.class);
 
@@ -1314,6 +1313,35 @@
     connection.close();
   }
 
+  @Test
+  public void testTransactionBatchSizeValidation() throws Exception {
+    final String schemes = conf.get(HiveConf.ConfVars.HIVE_BLOBSTORE_SUPPORTED_SCHEMES.varname);
+    // treat the test's "raw" filesystem as a blob store; its output stream does not support hflush, so connect() below must fail
+    conf.setVar(HiveConf.ConfVars.HIVE_BLOBSTORE_SUPPORTED_SCHEMES, "raw");
+
+    StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
+        .withFieldDelimiter(',')
+        .build();
+
+    try {
+      HiveStreamingConnection.newBuilder()
+          .withDatabase(dbName)
+          .withTable(tblName)
+          .withAgentInfo("UT_" + Thread.currentThread().getName())
+          .withRecordWriter(writer)
+          .withTransactionBatchSize(2)
+          .withHiveConf(conf)
+          .connect();
+
+      Assert.fail("Expected ConnectionError: transaction batch size > 1 must be rejected on a blob store");
+    } catch (ConnectionError e) {
+      Assert.assertTrue("Expected connection error due to batch size",
+          e.getMessage().contains("only supports transaction batch"));
+    } finally {
+      conf.setVar(HiveConf.ConfVars.HIVE_BLOBSTORE_SUPPORTED_SCHEMES, schemes);
+    }
+  }
+
   /**
    * check that transactions that have not heartbeated and timedout get properly aborted
    *