MINIFICPP-1392 - Fix and test streams reading/writing zero-length data

Signed-off-by: Arpad Boda <aboda@apache.org>

This closes #927
diff --git a/extensions/http-curl/client/HTTPStream.cpp b/extensions/http-curl/client/HTTPStream.cpp
index be01261..f97b004 100644
--- a/extensions/http-curl/client/HTTPStream.cpp
+++ b/extensions/http-curl/client/HTTPStream.cpp
@@ -57,6 +57,9 @@
 
 int HttpStream::write(const uint8_t *value, int size) {
   gsl_Expects(size >= 0);
+  if (size == 0) {
+    return 0;
+  }
   if (!IsNullOrEmpty(value)) {
     if (!started_) {
       std::lock_guard<std::mutex> lock(mutex_);
@@ -77,6 +80,9 @@
 
 int HttpStream::read(uint8_t *buf, int buflen) {
   gsl_Expects(buflen >= 0);
+  if (buflen == 0) {
+    return 0;
+  }
   if (!IsNullOrEmpty(buf)) {
     if (!started_) {
       std::lock_guard<std::mutex> lock(mutex_);
diff --git a/extensions/rocksdb-repos/RocksDbStream.cpp b/extensions/rocksdb-repos/RocksDbStream.cpp
index 933bd29..18c2246 100644
--- a/extensions/rocksdb-repos/RocksDbStream.cpp
+++ b/extensions/rocksdb-repos/RocksDbStream.cpp
@@ -52,7 +52,13 @@
 
 int RocksDbStream::write(const uint8_t *value, int size) {
   gsl_Expects(size >= 0);
-  if (!IsNullOrEmpty(value) && write_enable_) {
+  if (!write_enable_) {
+    return -1;
+  }
+  if (size == 0) {
+    return 0;
+  }
+  if (!IsNullOrEmpty(value)) {
     auto opendb = db_->open();
     if (!opendb) {
       return -1;
@@ -79,7 +85,13 @@
 
 int RocksDbStream::read(uint8_t *buf, int buflen) {
   gsl_Expects(buflen >= 0);
-  if (!IsNullOrEmpty(buf) && exists_) {
+  if (!exists_) {
+    return -1;
+  }
+  if (buflen == 0) {
+    return 0;
+  }
+  if (!IsNullOrEmpty(buf)) {
     size_t amtToRead = gsl::narrow<size_t>(buflen);
     if (offset_ >= value_.size()) {
       return 0;
diff --git a/extensions/rocksdb-repos/RocksDbStream.h b/extensions/rocksdb-repos/RocksDbStream.h
index 3476913..8e47b51 100644
--- a/extensions/rocksdb-repos/RocksDbStream.h
+++ b/extensions/rocksdb-repos/RocksDbStream.h
@@ -62,6 +62,9 @@
     return size_;
   }
 
+  using BaseStream::write;
+  using BaseStream::read;
+
   /**
    * Reads data and places it into buf
    * @param buf buffer in which we extract data
diff --git a/libminifi/src/io/DescriptorStream.cpp b/libminifi/src/io/DescriptorStream.cpp
index 99bf97c..2784e70 100644
--- a/libminifi/src/io/DescriptorStream.cpp
+++ b/libminifi/src/io/DescriptorStream.cpp
@@ -45,6 +45,9 @@
 
 int DescriptorStream::write(const uint8_t *value, int size) {
   gsl_Expects(size >= 0);
+  if (size == 0) {
+    return 0;
+  }
   if (!IsNullOrEmpty(value)) {
     std::lock_guard<std::recursive_mutex> lock(file_lock_);
 #ifdef WIN32
@@ -63,6 +66,9 @@
 
 int DescriptorStream::read(uint8_t *buf, int buflen) {
   gsl_Expects(buflen >= 0);
+  if (buflen == 0) {
+    return 0;
+  }
   if (!IsNullOrEmpty(buf)) {
 #ifdef WIN32
     auto size_read = _read(fd_, buf, buflen);
diff --git a/libminifi/src/io/FileStream.cpp b/libminifi/src/io/FileStream.cpp
index b64ca98..7df6bb3 100644
--- a/libminifi/src/io/FileStream.cpp
+++ b/libminifi/src/io/FileStream.cpp
@@ -85,6 +85,9 @@
 
 int FileStream::write(const uint8_t *value, int size) {
   gsl_Expects(size >= 0);
+  if (size == 0) {
+    return 0;
+  }
   if (!IsNullOrEmpty(value)) {
     std::lock_guard<std::mutex> lock(file_lock_);
     if (file_stream_->write(reinterpret_cast<const char*>(value), size)) {
@@ -106,6 +109,9 @@
 
 int FileStream::read(uint8_t *buf, int buflen) {
   gsl_Expects(buflen >= 0);
+  if (buflen == 0) {
+    return 0;
+  }
   if (!IsNullOrEmpty(buf)) {
     std::lock_guard<std::mutex> lock(file_lock_);
     if (!file_stream_) {
diff --git a/libminifi/src/io/tls/SecureDescriptorStream.cpp b/libminifi/src/io/tls/SecureDescriptorStream.cpp
index 694959e..c5b0b3f 100644
--- a/libminifi/src/io/tls/SecureDescriptorStream.cpp
+++ b/libminifi/src/io/tls/SecureDescriptorStream.cpp
@@ -47,6 +47,9 @@
 
 int SecureDescriptorStream::write(const uint8_t *value, int size) {
   gsl_Expects(size >= 0);
+  if (size == 0) {
+    return 0;
+  }
   if (!IsNullOrEmpty(value)) {
     std::lock_guard<std::recursive_mutex> lock(file_lock_);
     int bytes = 0;
@@ -70,6 +73,9 @@
 
 int SecureDescriptorStream::read(uint8_t *buf, int buflen) {
   gsl_Expects(buflen >= 0);
+  if (buflen == 0) {
+    return 0;
+  }
   if (!IsNullOrEmpty(buf)) {
     int total_read = 0;
       int status = 0;
diff --git a/libminifi/test/flow-tests/SessionTests.cpp b/libminifi/test/flow-tests/SessionTests.cpp
new file mode 100644
index 0000000..4ad0ece
--- /dev/null
+++ b/libminifi/test/flow-tests/SessionTests.cpp
@@ -0,0 +1,77 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "../TestBase.h"
+#include "core/repository/VolatileFlowFileRepository.h"
+#include "../../extensions/rocksdb-repos/DatabaseContentRepository.h"
+#include "../../extensions/rocksdb-repos/FlowFileRepository.h"
+
+TEST_CASE("Import null data") {
+  TestController testController;
+  LogTestController::getInstance().setDebug<core::ContentRepository>();
+  LogTestController::getInstance().setTrace<core::repository::FileSystemRepository>();
+  LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
+  LogTestController::getInstance().setTrace<minifi::ResourceClaim>();
+  LogTestController::getInstance().setTrace<minifi::FlowFileRecord>();
+  LogTestController::getInstance().setTrace<core::repository::FlowFileRepository>();
+  LogTestController::getInstance().setTrace<core::repository::VolatileRepository<minifi::ResourceClaim::Path>>();
+  LogTestController::getInstance().setTrace<core::repository::DatabaseContentRepository>();
+
+  char format[] = "/var/tmp/test.XXXXXX";
+  auto dir = testController.createTempDirectory(format);
+
+  auto config = std::make_shared<minifi::Configure>();
+  config->set(minifi::Configure::nifi_dbcontent_repository_directory_default, utils::file::FileUtils::concat_path(dir, "content_repository"));
+  config->set(minifi::Configure::nifi_flowfile_repository_directory_default, utils::file::FileUtils::concat_path(dir, "flowfile_repository"));
+
+  auto prov_repo = std::make_shared<core::Repository>();
+  std::shared_ptr<core::Repository> ff_repository = std::make_shared<core::repository::FlowFileRepository>("flowFileRepository");
+  std::shared_ptr<core::ContentRepository> content_repo;
+  SECTION("VolatileContentRepository") {
+    testController.getLogger()->log_info("Using VolatileContentRepository");
+    content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+  }
+  SECTION("FileSystemContentRepository") {
+    testController.getLogger()->log_info("Using FileSystemRepository");
+    content_repo = std::make_shared<core::repository::FileSystemRepository>();
+  }
+  SECTION("DatabaseContentRepository") {
+    testController.getLogger()->log_info("Using DatabaseContentRepository");
+    content_repo = std::make_shared<core::repository::DatabaseContentRepository>();
+  }
+  ff_repository->initialize(config);
+  content_repo->initialize(config);
+
+  auto processor = std::make_shared<core::Processor>("dummy");
+  utils::Identifier uuid;
+  processor->getUUID(uuid);
+  auto output = std::make_shared<minifi::Connection>(ff_repository, content_repo, "output");
+  output->addRelationship({"out", ""});
+  output->setSourceUUID(uuid);
+  processor->addConnection(output);
+  auto node = std::make_shared<core::ProcessorNode>(processor);
+  auto context = std::make_shared<core::ProcessContext>(node, nullptr, prov_repo, ff_repository, content_repo);
+  core::ProcessSession session(context);
+
+  minifi::io::BufferStream input{};
+  auto flowFile = session.create();
+  session.transfer(flowFile, {"out", ""});
+  session.importFrom(input, flowFile);
+  session.commit();
+}
+
diff --git a/libminifi/test/rocksdb-tests/RocksDBStreamTests.cpp b/libminifi/test/rocksdb-tests/RocksDBStreamTests.cpp
new file mode 100644
index 0000000..88b3929
--- /dev/null
+++ b/libminifi/test/rocksdb-tests/RocksDBStreamTests.cpp
@@ -0,0 +1,76 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "../TestBase.h"
+#include "../../extensions/rocksdb-repos/RocksDbStream.h"
+#include "../../extensions/rocksdb-repos/DatabaseContentRepository.h"
+
+class RocksDBStreamTest : TestController {
+ public:
+  RocksDBStreamTest() {
+    char format[] = "/var/tmp/testdb.XXXXXX";
+    dbPath = createTempDirectory(format);
+    rocksdb::Options options;
+    options.create_if_missing = true;
+    options.use_direct_io_for_flush_and_compaction = true;
+    options.use_direct_reads = true;
+    options.merge_operator = std::make_shared<core::repository::StringAppender>();
+    options.error_if_exists = false;
+    options.max_successive_merges = 0;
+    db = utils::make_unique<minifi::internal::RocksDatabase>(options, dbPath);
+    REQUIRE(db->open());
+  }
+
+ protected:
+  std::string dbPath;
+  std::unique_ptr<minifi::internal::RocksDatabase> db;
+};
+
+TEST_CASE_METHOD(RocksDBStreamTest, "Verify simple operation") {
+  std::string content = "banana";
+  minifi::io::RocksDbStream outStream("one", gsl::make_not_null(db.get()), true);
+  outStream.write(content);
+  REQUIRE(outStream.write(content) > 0);
+  minifi::io::RocksDbStream inStream("one", gsl::make_not_null(db.get()));
+  std::string str;
+  inStream.read(str);
+  REQUIRE(str == content);
+}
+
+TEST_CASE_METHOD(RocksDBStreamTest, "Write zero bytes") {
+  minifi::io::RocksDbStream stream("one", gsl::make_not_null(db.get()), true);
+
+  REQUIRE(stream.write(nullptr, 0) == 0);
+
+  minifi::io::RocksDbStream readonlyStream("two", gsl::make_not_null(db.get()), false);
+
+  REQUIRE(readonlyStream.write(nullptr, 0) == -1);
+}
+
+TEST_CASE_METHOD(RocksDBStreamTest, "Read zero bytes") {
+  minifi::io::RocksDbStream one("one", gsl::make_not_null(db.get()), true);
+  REQUIRE(one.write("banana") > 0);
+
+  minifi::io::RocksDbStream stream("one", gsl::make_not_null(db.get()));
+
+  REQUIRE(stream.read(nullptr, 0) == 0);
+
+  minifi::io::RocksDbStream nonExistingStream("two", gsl::make_not_null(db.get()));
+
+  REQUIRE(nonExistingStream.read(nullptr, 0) == -1);
+}
diff --git a/libminifi/test/unit/FileStreamTests.cpp b/libminifi/test/unit/FileStreamTests.cpp
index 37a7955..66d2c15 100644
--- a/libminifi/test/unit/FileStreamTests.cpp
+++ b/libminifi/test/unit/FileStreamTests.cpp
@@ -246,3 +246,19 @@
 
   std::remove(ss.str().c_str());
 }
+
+TEST_CASE("Write zero bytes") {
+  TestController testController;
+  char format[] = "/tmp/gt.XXXXXX";
+  auto dir = testController.createTempDirectory(format);
+  minifi::io::FileStream stream(utils::file::concat_path(dir, "test.txt"), 0, true);
+  REQUIRE(stream.write(nullptr, 0) == 0);
+}
+
+TEST_CASE("Read zero bytes") {
+  TestController testController;
+  char format[] = "/tmp/gt.XXXXXX";
+  auto dir = testController.createTempDirectory(format);
+  minifi::io::FileStream stream(utils::file::concat_path(dir, "test.txt"), 0, true);
+  REQUIRE(stream.read(nullptr, 0) == 0);
+}