blob: fef132777ea0c3fe9ec3aeba5ffc2cd32ad1eae7 [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <algorithm>
#include <memory>
#include <string>
#include "io/BaseStream.h"
#include "serialization/FlowFileV3Serializer.h"
#include "serialization/PayloadSerializer.h"
#include "core/FlowFile.h"
#include "../TestBase.h"
#include "utils/gsl.h"
std::shared_ptr<minifi::FlowFileRecord> createEmptyFlowFile() {
auto flowFile = std::make_shared<minifi::FlowFileRecord>();
flowFile->removeAttribute(core::SpecialFlowAttribute::FILENAME);
return flowFile;
}
TEST_CASE("Payload Serializer", "[testPayload]") {
std::string content = "flowFileContent";
auto contentStream = std::make_shared<minifi::io::BufferStream>();
contentStream->write(const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(content.data())), gsl::narrow<int>(content.length()));
auto result = std::make_shared<minifi::io::BufferStream>();
auto flowFile = createEmptyFlowFile();
flowFile->setSize(content.size());
flowFile->addAttribute("first", "one");
flowFile->addAttribute("second", "two");
minifi::PayloadSerializer serializer([&] (const std::shared_ptr<core::FlowFile>&, minifi::InputStreamCallback* cb) {
return gsl::narrow<int>(cb->process(contentStream));
});
serializer.serialize(flowFile, result);
std::string serialized{reinterpret_cast<const char*>(result->getBuffer()), result->size()};
REQUIRE(serialized == content);
}
TEST_CASE("FFv3 Serializer", "[testFFv3]") {
std::string content = "flowFileContent";
auto contentStream = std::make_shared<minifi::io::BufferStream>();
contentStream->write(const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(content.data())), gsl::narrow<int>(content.length()));
auto result = std::make_shared<minifi::io::BufferStream>();
auto flowFile = createEmptyFlowFile();
flowFile->setSize(content.size());
flowFile->addAttribute("first", "one");
flowFile->addAttribute("second", "two");
minifi::FlowFileV3Serializer serializer([&] (const std::shared_ptr<core::FlowFile>&, minifi::InputStreamCallback* cb) {
return gsl::narrow<int>(cb->process(contentStream));
});
serializer.serialize(flowFile, result);
std::string serialized{reinterpret_cast<const char*>(result->getBuffer()), result->size()};
std::string expected = "NiFiFF3";
expected += std::string("\x00\x02", 2); // number of attributes
expected += std::string("\x00\x05", 2) + "first"; // first key
expected += std::string("\x00\x03", 2) + "one"; // first value
expected += std::string("\x00\x06", 2) + "second"; // second key
expected += std::string("\x00\x03", 2) + "two"; // second value
expected += std::string("\x00\x00\x00\x00\x00\x00\x00\x0f", 8) + content; // payload of the flowFile
REQUIRE(serialized == expected);
}