| /** | 
 |  * | 
 |  * Licensed to the Apache Software Foundation (ASF) under one or more | 
 |  * contributor license agreements.  See the NOTICE file distributed with | 
 |  * this work for additional information regarding copyright ownership. | 
 |  * The ASF licenses this file to You under the Apache License, Version 2.0 | 
 |  * (the "License"); you may not use this file except in compliance with | 
 |  * the License.  You may obtain a copy of the License at | 
 |  * | 
 |  *     http://www.apache.org/licenses/LICENSE-2.0 | 
 |  * | 
 |  * Unless required by applicable law or agreed to in writing, software | 
 |  * distributed under the License is distributed on an "AS IS" BASIS, | 
 |  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
 |  * See the License for the specific language governing permissions and | 
 |  * limitations under the License. | 
 |  */ | 
 |  | 
 | #include <array> | 
 | #include <memory> | 
 | #include <string> | 
 | #include <vector> | 
 |  | 
 | #include <catch.hpp> | 
 | #include "core/Processor.h" | 
 | #include "core/ProcessSession.h" | 
 | #include "core/Resource.h" | 
 | #include "../TestBase.h" | 
 | #include "../Catch.h" | 
 | #include "../DummyProcessor.h" | 
 | #include "StreamPipe.h" | 
 |  | 
 | #pragma once | 
 |  | 
 | namespace ContentRepositoryDependentTests { | 
 |  | 
 | struct ReadUntilItCan { | 
 |   std::string value_; | 
 |  | 
 |   int64_t operator()(const std::shared_ptr<minifi::io::InputStream> &stream) { | 
 |     value_.clear(); | 
 |     std::array<std::byte, 1024> buffer{}; | 
 |     size_t bytes_read = 0; | 
 |     while (true) { | 
 |       size_t read_result = stream->read(buffer); | 
 |       if (minifi::io::isError(read_result)) | 
 |         return -1; | 
 |       if (read_result == 0) | 
 |         return bytes_read; | 
 |       bytes_read += read_result; | 
 |       const auto char_view = gsl::make_span(buffer).subspan(0, read_result).as_span<const char>(); | 
 |       value_.append(std::begin(char_view), std::end(char_view)); | 
 |     } | 
 |   } | 
 | }; | 
 |  | 
 | class Fixture { | 
 |  public: | 
 |   const core::Relationship Success{"success", "everything is fine"}; | 
 |   const core::Relationship Failure{"failure", "something has gone awry"}; | 
 |  | 
 |   explicit Fixture(std::shared_ptr<core::ContentRepository> content_repo) { | 
 |     test_plan_ = test_controller_.createPlan(nullptr, std::nullopt, content_repo); | 
 |     dummy_processor_ = test_plan_->addProcessor("DummyProcessor", "dummyProcessor"); | 
 |     context_ = [this] { | 
 |       test_plan_->runNextProcessor(); | 
 |       return test_plan_->getCurrentContext(); | 
 |     }(); | 
 |     process_session_ = std::make_unique<core::ProcessSession>(context_); | 
 |   } | 
 |  | 
 |   core::ProcessSession &processSession() { return *process_session_; } | 
 |  | 
 |   void transferAndCommit(const std::shared_ptr<core::FlowFile>& flow_file) { | 
 |     process_session_->transfer(flow_file, Success); | 
 |     process_session_->commit(); | 
 |   } | 
 |  | 
 |   void writeToFlowFile(const std::shared_ptr<core::FlowFile>& flow_file, const std::string content) { | 
 |     process_session_->writeBuffer(flow_file, content); | 
 |   } | 
 |  | 
 |   void appendToFlowFile(const std::shared_ptr<core::FlowFile>& flow_file, const std::string content_to_append) { | 
 |     process_session_->add(flow_file); | 
 |     process_session_->appendBuffer(flow_file, content_to_append); | 
 |   } | 
 |  | 
 |  private: | 
 |   TestController test_controller_; | 
 |  | 
 |   std::shared_ptr<TestPlan> test_plan_; | 
 |   std::shared_ptr<core::Processor> dummy_processor_; | 
 |   std::shared_ptr<core::ProcessContext> context_; | 
 |   std::unique_ptr<core::ProcessSession> process_session_; | 
 | }; | 
 |  | 
 | void testReadOnSmallerClonedFlowFiles(std::shared_ptr<core::ContentRepository> content_repo) { | 
 |   Fixture fixture = Fixture(content_repo); | 
 |   core::ProcessSession& process_session = fixture.processSession(); | 
 |   const auto original_ff = process_session.create(); | 
 |   fixture.writeToFlowFile(original_ff, "foobar"); | 
 |   fixture.transferAndCommit(original_ff); | 
 |   REQUIRE(original_ff); | 
 |   auto clone_first_half = process_session.clone(original_ff, 0, 3); | 
 |   auto clone_second_half = process_session.clone(original_ff, 3, 3); | 
 |   REQUIRE(clone_first_half != nullptr); | 
 |   REQUIRE(clone_second_half != nullptr); | 
 |   ReadUntilItCan read_until_it_can_callback; | 
 |   const auto read_result_original = process_session.readBuffer(original_ff); | 
 |   process_session.read(original_ff, std::ref(read_until_it_can_callback)); | 
 |   CHECK(original_ff->getSize() == 6); | 
 |   CHECK(to_string(read_result_original) == "foobar"); | 
 |   CHECK(read_until_it_can_callback.value_ == "foobar"); | 
 |   const auto read_result_first_half = process_session.readBuffer(clone_first_half); | 
 |   process_session.read(clone_first_half, std::ref(read_until_it_can_callback)); | 
 |   CHECK(clone_first_half->getSize() == 3); | 
 |   CHECK(to_string(read_result_first_half) == "foo"); | 
 |   CHECK(read_until_it_can_callback.value_ == "foo"); | 
 |   const auto read_result_second_half = process_session.readBuffer(clone_second_half); | 
 |   process_session.read(clone_second_half, std::ref(read_until_it_can_callback)); | 
 |   CHECK(clone_second_half->getSize() == 3); | 
 |   CHECK(to_string(read_result_second_half) == "bar"); | 
 |   CHECK(read_until_it_can_callback.value_ == "bar"); | 
 | } | 
 |  | 
 | void testAppendToUnmanagedFlowFile(std::shared_ptr<core::ContentRepository> content_repo) { | 
 |   Fixture fixture = Fixture(content_repo); | 
 |   core::ProcessSession& process_session = fixture.processSession(); | 
 |   const auto flow_file = process_session.create(); | 
 |   REQUIRE(flow_file); | 
 |  | 
 |   fixture.writeToFlowFile(flow_file, "my"); | 
 |   fixture.transferAndCommit(flow_file); | 
 |   fixture.appendToFlowFile(flow_file, "foobar"); | 
 |   fixture.transferAndCommit(flow_file); | 
 |  | 
 |   CHECK(flow_file->getSize() == 8); | 
 |   ReadUntilItCan read_until_it_can_callback; | 
 |   const auto read_result = process_session.readBuffer(flow_file); | 
 |   process_session.read(flow_file, std::ref(read_until_it_can_callback)); | 
 |   CHECK(to_string(read_result) == "myfoobar"); | 
 |   CHECK(read_until_it_can_callback.value_ == "myfoobar"); | 
 | } | 
 |  | 
 | void testAppendToManagedFlowFile(std::shared_ptr<core::ContentRepository> content_repo) { | 
 |   Fixture fixture = Fixture(content_repo); | 
 |   core::ProcessSession& process_session = fixture.processSession(); | 
 |   const auto flow_file = process_session.create(); | 
 |   REQUIRE(flow_file); | 
 |  | 
 |   fixture.writeToFlowFile(flow_file, "my"); | 
 |   fixture.appendToFlowFile(flow_file, "foobar"); | 
 |   fixture.transferAndCommit(flow_file); | 
 |  | 
 |   CHECK(flow_file->getSize() == 8); | 
 |   const auto read_result = process_session.readBuffer(flow_file); | 
 |   ReadUntilItCan read_until_it_can_callback; | 
 |   CHECK(to_string(read_result) == "myfoobar"); | 
 |   process_session.read(flow_file, std::ref(read_until_it_can_callback)); | 
 |   CHECK(read_until_it_can_callback.value_ == "myfoobar"); | 
 | } | 
 |  | 
 | void testReadFromZeroLengthFlowFile(std::shared_ptr<core::ContentRepository> content_repo) { | 
 |   Fixture fixture = Fixture(content_repo); | 
 |   core::ProcessSession& process_session = fixture.processSession(); | 
 |   const auto flow_file = process_session.create(); | 
 |   REQUIRE(flow_file); | 
 |   fixture.transferAndCommit(flow_file); | 
 |  | 
 |   CHECK(flow_file->getSize() == 0); | 
 |   REQUIRE_NOTHROW(process_session.readBuffer(flow_file)); | 
 |   REQUIRE_NOTHROW(process_session.read(flow_file, ReadUntilItCan{})); | 
 | } | 
 | }  // namespace ContentRepositoryDependentTests |