MINIFICPP-1392 - Fix and test streams reading/writing zero-length data
Signed-off-by: Arpad Boda <aboda@apache.org>
This closes #927
diff --git a/extensions/http-curl/client/HTTPStream.cpp b/extensions/http-curl/client/HTTPStream.cpp
index be01261..f97b004 100644
--- a/extensions/http-curl/client/HTTPStream.cpp
+++ b/extensions/http-curl/client/HTTPStream.cpp
@@ -57,6 +57,9 @@
int HttpStream::write(const uint8_t *value, int size) {
gsl_Expects(size >= 0);
+ if (size == 0) {
+ return 0;
+ }
if (!IsNullOrEmpty(value)) {
if (!started_) {
std::lock_guard<std::mutex> lock(mutex_);
@@ -77,6 +80,9 @@
int HttpStream::read(uint8_t *buf, int buflen) {
gsl_Expects(buflen >= 0);
+ if (buflen == 0) {
+ return 0;
+ }
if (!IsNullOrEmpty(buf)) {
if (!started_) {
std::lock_guard<std::mutex> lock(mutex_);
diff --git a/extensions/rocksdb-repos/RocksDbStream.cpp b/extensions/rocksdb-repos/RocksDbStream.cpp
index 933bd29..18c2246 100644
--- a/extensions/rocksdb-repos/RocksDbStream.cpp
+++ b/extensions/rocksdb-repos/RocksDbStream.cpp
@@ -52,7 +52,13 @@
int RocksDbStream::write(const uint8_t *value, int size) {
gsl_Expects(size >= 0);
- if (!IsNullOrEmpty(value) && write_enable_) {
+ if (!write_enable_) {
+ return -1;
+ }
+ if (size == 0) {
+ return 0;
+ }
+ if (!IsNullOrEmpty(value)) {
auto opendb = db_->open();
if (!opendb) {
return -1;
@@ -79,7 +85,13 @@
int RocksDbStream::read(uint8_t *buf, int buflen) {
gsl_Expects(buflen >= 0);
- if (!IsNullOrEmpty(buf) && exists_) {
+ if (!exists_) {
+ return -1;
+ }
+ if (buflen == 0) {
+ return 0;
+ }
+ if (!IsNullOrEmpty(buf)) {
size_t amtToRead = gsl::narrow<size_t>(buflen);
if (offset_ >= value_.size()) {
return 0;
diff --git a/extensions/rocksdb-repos/RocksDbStream.h b/extensions/rocksdb-repos/RocksDbStream.h
index 3476913..8e47b51 100644
--- a/extensions/rocksdb-repos/RocksDbStream.h
+++ b/extensions/rocksdb-repos/RocksDbStream.h
@@ -62,6 +62,9 @@
return size_;
}
+ using BaseStream::write;
+ using BaseStream::read;
+
/**
* Reads data and places it into buf
* @param buf buffer in which we extract data
diff --git a/libminifi/src/io/DescriptorStream.cpp b/libminifi/src/io/DescriptorStream.cpp
index 99bf97c..2784e70 100644
--- a/libminifi/src/io/DescriptorStream.cpp
+++ b/libminifi/src/io/DescriptorStream.cpp
@@ -45,6 +45,9 @@
int DescriptorStream::write(const uint8_t *value, int size) {
gsl_Expects(size >= 0);
+ if (size == 0) {
+ return 0;
+ }
if (!IsNullOrEmpty(value)) {
std::lock_guard<std::recursive_mutex> lock(file_lock_);
#ifdef WIN32
@@ -63,6 +66,9 @@
int DescriptorStream::read(uint8_t *buf, int buflen) {
gsl_Expects(buflen >= 0);
+ if (buflen == 0) {
+ return 0;
+ }
if (!IsNullOrEmpty(buf)) {
#ifdef WIN32
auto size_read = _read(fd_, buf, buflen);
diff --git a/libminifi/src/io/FileStream.cpp b/libminifi/src/io/FileStream.cpp
index b64ca98..7df6bb3 100644
--- a/libminifi/src/io/FileStream.cpp
+++ b/libminifi/src/io/FileStream.cpp
@@ -85,6 +85,9 @@
int FileStream::write(const uint8_t *value, int size) {
gsl_Expects(size >= 0);
+ if (size == 0) {
+ return 0;
+ }
if (!IsNullOrEmpty(value)) {
std::lock_guard<std::mutex> lock(file_lock_);
if (file_stream_->write(reinterpret_cast<const char*>(value), size)) {
@@ -106,6 +109,9 @@
int FileStream::read(uint8_t *buf, int buflen) {
gsl_Expects(buflen >= 0);
+ if (buflen == 0) {
+ return 0;
+ }
if (!IsNullOrEmpty(buf)) {
std::lock_guard<std::mutex> lock(file_lock_);
if (!file_stream_) {
diff --git a/libminifi/src/io/tls/SecureDescriptorStream.cpp b/libminifi/src/io/tls/SecureDescriptorStream.cpp
index 694959e..c5b0b3f 100644
--- a/libminifi/src/io/tls/SecureDescriptorStream.cpp
+++ b/libminifi/src/io/tls/SecureDescriptorStream.cpp
@@ -47,6 +47,9 @@
int SecureDescriptorStream::write(const uint8_t *value, int size) {
gsl_Expects(size >= 0);
+ if (size == 0) {
+ return 0;
+ }
if (!IsNullOrEmpty(value)) {
std::lock_guard<std::recursive_mutex> lock(file_lock_);
int bytes = 0;
@@ -70,6 +73,9 @@
int SecureDescriptorStream::read(uint8_t *buf, int buflen) {
gsl_Expects(buflen >= 0);
+ if (buflen == 0) {
+ return 0;
+ }
if (!IsNullOrEmpty(buf)) {
int total_read = 0;
int status = 0;
diff --git a/libminifi/test/flow-tests/SessionTests.cpp b/libminifi/test/flow-tests/SessionTests.cpp
new file mode 100644
index 0000000..4ad0ece
--- /dev/null
+++ b/libminifi/test/flow-tests/SessionTests.cpp
@@ -0,0 +1,77 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "../TestBase.h"
+#include "core/repository/VolatileFlowFileRepository.h"
+#include "../../extensions/rocksdb-repos/DatabaseContentRepository.h"
+#include "../../extensions/rocksdb-repos/FlowFileRepository.h"
+
+TEST_CASE("Import null data") {
+ TestController testController;
+ LogTestController::getInstance().setDebug<core::ContentRepository>();
+ LogTestController::getInstance().setTrace<core::repository::FileSystemRepository>();
+ LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
+ LogTestController::getInstance().setTrace<minifi::ResourceClaim>();
+ LogTestController::getInstance().setTrace<minifi::FlowFileRecord>();
+ LogTestController::getInstance().setTrace<core::repository::FlowFileRepository>();
+ LogTestController::getInstance().setTrace<core::repository::VolatileRepository<minifi::ResourceClaim::Path>>();
+ LogTestController::getInstance().setTrace<core::repository::DatabaseContentRepository>();
+
+ char format[] = "/var/tmp/test.XXXXXX";
+ auto dir = testController.createTempDirectory(format);
+
+ auto config = std::make_shared<minifi::Configure>();
+ config->set(minifi::Configure::nifi_dbcontent_repository_directory_default, utils::file::FileUtils::concat_path(dir, "content_repository"));
+ config->set(minifi::Configure::nifi_flowfile_repository_directory_default, utils::file::FileUtils::concat_path(dir, "flowfile_repository"));
+
+ auto prov_repo = std::make_shared<core::Repository>();
+ std::shared_ptr<core::Repository> ff_repository = std::make_shared<core::repository::FlowFileRepository>("flowFileRepository");
+ std::shared_ptr<core::ContentRepository> content_repo;
+ SECTION("VolatileContentRepository") {
+ testController.getLogger()->log_info("Using VolatileContentRepository");
+ content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+ }
+ SECTION("FileSystemContentRepository") {
+ testController.getLogger()->log_info("Using FileSystemRepository");
+ content_repo = std::make_shared<core::repository::FileSystemRepository>();
+ }
+ SECTION("DatabaseContentRepository") {
+ testController.getLogger()->log_info("Using DatabaseContentRepository");
+ content_repo = std::make_shared<core::repository::DatabaseContentRepository>();
+ }
+ ff_repository->initialize(config);
+ content_repo->initialize(config);
+
+ auto processor = std::make_shared<core::Processor>("dummy");
+ utils::Identifier uuid;
+ processor->getUUID(uuid);
+ auto output = std::make_shared<minifi::Connection>(ff_repository, content_repo, "output");
+ output->addRelationship({"out", ""});
+ output->setSourceUUID(uuid);
+ processor->addConnection(output);
+ auto node = std::make_shared<core::ProcessorNode>(processor);
+ auto context = std::make_shared<core::ProcessContext>(node, nullptr, prov_repo, ff_repository, content_repo);
+ core::ProcessSession session(context);
+
+ minifi::io::BufferStream input{};
+ auto flowFile = session.create();
+ session.transfer(flowFile, {"out", ""});
+ session.importFrom(input, flowFile);
+ session.commit();
+}
+
diff --git a/libminifi/test/rocksdb-tests/RocksDBStreamTests.cpp b/libminifi/test/rocksdb-tests/RocksDBStreamTests.cpp
new file mode 100644
index 0000000..88b3929
--- /dev/null
+++ b/libminifi/test/rocksdb-tests/RocksDBStreamTests.cpp
@@ -0,0 +1,76 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "../TestBase.h"
+#include "../../extensions/rocksdb-repos/RocksDbStream.h"
+#include "../../extensions/rocksdb-repos/DatabaseContentRepository.h"
+
+class RocksDBStreamTest : TestController {
+ public:
+ RocksDBStreamTest() {
+ char format[] = "/var/tmp/testdb.XXXXXX";
+ dbPath = createTempDirectory(format);
+ rocksdb::Options options;
+ options.create_if_missing = true;
+ options.use_direct_io_for_flush_and_compaction = true;
+ options.use_direct_reads = true;
+ options.merge_operator = std::make_shared<core::repository::StringAppender>();
+ options.error_if_exists = false;
+ options.max_successive_merges = 0;
+ db = utils::make_unique<minifi::internal::RocksDatabase>(options, dbPath);
+ REQUIRE(db->open());
+ }
+
+ protected:
+ std::string dbPath;
+ std::unique_ptr<minifi::internal::RocksDatabase> db;
+};
+
+TEST_CASE_METHOD(RocksDBStreamTest, "Verify simple operation") {
+ std::string content = "banana";
+ minifi::io::RocksDbStream outStream("one", gsl::make_not_null(db.get()), true);
+ outStream.write(content);
+ REQUIRE(outStream.write(content) > 0);
+ minifi::io::RocksDbStream inStream("one", gsl::make_not_null(db.get()));
+ std::string str;
+ inStream.read(str);
+ REQUIRE(str == content);
+}
+
+TEST_CASE_METHOD(RocksDBStreamTest, "Write zero bytes") {
+ minifi::io::RocksDbStream stream("one", gsl::make_not_null(db.get()), true);
+
+ REQUIRE(stream.write(nullptr, 0) == 0);
+
+ minifi::io::RocksDbStream readonlyStream("two", gsl::make_not_null(db.get()), false);
+
+ REQUIRE(readonlyStream.write(nullptr, 0) == -1);
+}
+
+TEST_CASE_METHOD(RocksDBStreamTest, "Read zero bytes") {
+ minifi::io::RocksDbStream one("one", gsl::make_not_null(db.get()), true);
+ REQUIRE(one.write("banana") > 0);
+
+ minifi::io::RocksDbStream stream("one", gsl::make_not_null(db.get()));
+
+ REQUIRE(stream.read(nullptr, 0) == 0);
+
+ minifi::io::RocksDbStream nonExistingStream("two", gsl::make_not_null(db.get()));
+
+ REQUIRE(nonExistingStream.read(nullptr, 0) == -1);
+}
diff --git a/libminifi/test/unit/FileStreamTests.cpp b/libminifi/test/unit/FileStreamTests.cpp
index 37a7955..66d2c15 100644
--- a/libminifi/test/unit/FileStreamTests.cpp
+++ b/libminifi/test/unit/FileStreamTests.cpp
@@ -246,3 +246,19 @@
std::remove(ss.str().c_str());
}
+
+TEST_CASE("Write zero bytes") {
+ TestController testController;
+ char format[] = "/tmp/gt.XXXXXX";
+ auto dir = testController.createTempDirectory(format);
+ minifi::io::FileStream stream(utils::file::concat_path(dir, "test.txt"), 0, true);
+ REQUIRE(stream.write(nullptr, 0) == 0);
+}
+
+TEST_CASE("Read zero bytes") {
+ TestController testController;
+ char format[] = "/tmp/gt.XXXXXX";
+ auto dir = testController.createTempDirectory(format);
+ minifi::io::FileStream stream(utils::file::concat_path(dir, "test.txt"), 0, true);
+ REQUIRE(stream.read(nullptr, 0) == 0);
+}