blob: c5b83aff73cbe25efafd3e7c84c647682da34c81 [file] [log] [blame]
/**
* @file SiteToSiteProvenanceReportingTask.cpp
* SiteToSiteProvenanceReportingTask class implementation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <vector>
#include <queue>
#include <map>
#include <set>
#include <string>
#include <memory>
#include <sstream>
#include <functional>
#include <iostream>
#include <utility>
#include "core/Repository.h"
#include "core/reporting/SiteToSiteProvenanceReportingTask.h"
#include "../include/io/StreamFactory.h"
#include "io/ClientSocket.h"
#include "utils/TimeUtil.h"
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
#include "provenance/Provenance.h"
#include "FlowController.h"
#include "rapidjson/document.h"
#include "rapidjson/writer.h"
#include "rapidjson/stringbuffer.h"
#include "rapidjson/prettywriter.h"
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace core {
namespace reporting {
// Application name written into the "application" member of every record emitted by getJsonReport().
const char *SiteToSiteProvenanceReportingTask::ProvenanceAppStr = "MiNiFi Flow";
/**
 * Initializes the reporting task.
 *
 * Adds nothing of its own: the call is forwarded unchanged to
 * RemoteProcessorGroupPort::initialize().
 */
void SiteToSiteProvenanceReportingTask::initialize() {
  RemoteProcessorGroupPort::initialize();
}
void setJsonStr(const std::string& key, const std::string& value, rapidjson::Value& parent, rapidjson::Document::AllocatorType& alloc) { // NOLINT
rapidjson::Value keyVal;
rapidjson::Value valueVal;
const char* c_key = key.c_str();
const char* c_val = value.c_str();
keyVal.SetString(c_key, key.length(), alloc);
valueVal.SetString(c_val, value.length(), alloc);
parent.AddMember(keyVal, valueVal, alloc);
}
/**
 * Wraps a std::string in a rapidjson string value.
 *
 * @param value text to store
 * @param alloc allocator handed to rapidjson for the string value
 * @return a JSON string value holding `value` (explicit length, so embedded NULs are kept)
 */
rapidjson::Value getStringValue(const std::string& value, rapidjson::Document::AllocatorType& alloc) {
  return rapidjson::Value(value.c_str(), static_cast<rapidjson::SizeType>(value.length()), alloc);
}
/**
 * Wraps a utils::SmallString<N> in a rapidjson string value.
 *
 * @tparam N    size parameter of the SmallString
 * @param value text to store
 * @param alloc allocator handed to rapidjson for the string value
 * @return a JSON string value built from `value.c_str()` and `value.length()`
 */
template<size_t N>
rapidjson::Value getStringValue(const utils::SmallString<N>& value, rapidjson::Document::AllocatorType& alloc) {
  return rapidjson::Value(value.c_str(), static_cast<rapidjson::SizeType>(value.length()), alloc);
}
void appendJsonStr(const std::string& value, rapidjson::Value& parent, rapidjson::Document::AllocatorType& alloc) {
rapidjson::Value valueVal;
valueVal.SetString(value.c_str(), value.length(), alloc);
parent.PushBack(valueVal, alloc);
}
/**
 * Appends a utils::SmallString<N> as a JSON string element to the JSON array `parent`.
 *
 * @tparam N     size parameter of the SmallString
 * @param value  text to append
 * @param parent JSON value that receives the new element
 * @param alloc  allocator handed to rapidjson for the string value and the insertion
 */
template<size_t N>
void appendJsonStr(const utils::SmallString<N>& value, rapidjson::Value& parent, rapidjson::Document::AllocatorType& alloc) {
  rapidjson::Value element(value.c_str(), static_cast<rapidjson::SizeType>(value.length()), alloc);
  parent.PushBack(element, alloc);
}
/**
 * Renders a batch of provenance event records as a pretty-printed JSON array.
 *
 * Every element of `records` that is a provenance::ProvenanceEventRecord becomes one
 * JSON object carrying the event's timing, size, identifiers, attributes, parent/child
 * UUIDs and the fixed application name (ProvenanceAppStr).
 *
 * @param context not used by this function
 * @param session not used by this function
 * @param records components to serialise; read only
 * @param report  overwritten with the resulting JSON text
 */
void SiteToSiteProvenanceReportingTask::getJsonReport(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session,
                                                      std::vector<std::shared_ptr<core::SerializableComponent>> &records, std::string &report) {
  rapidjson::Document array(rapidjson::kArrayType);
  rapidjson::Document::AllocatorType &alloc = array.GetAllocator();

  // Iterate by const reference: copying a shared_ptr per element costs an atomic
  // reference-count increment/decrement for no benefit.
  for (const auto &sercomp : records) {
    const std::shared_ptr<provenance::ProvenanceEventRecord> record = std::dynamic_pointer_cast<provenance::ProvenanceEventRecord>(sercomp);
    if (nullptr == record) {
      // NOTE(review): this stops the whole report at the first element that is not a
      // ProvenanceEventRecord, so later records are not reported. Confirm whether
      // skipping only the offending element (continue) is what is wanted.
      break;
    }

    rapidjson::Value recordJson(rapidjson::kObjectType);
    rapidjson::Value updatedAttributesJson(rapidjson::kObjectType);
    rapidjson::Value parentUuidJson(rapidjson::kArrayType);
    rapidjson::Value childUuidJson(rapidjson::kArrayType);

    recordJson.AddMember("timestampMillis", record->getEventTime(), alloc);
    recordJson.AddMember("durationMillis", record->getEventDuration(), alloc);
    recordJson.AddMember("lineageStart", record->getlineageStartDate(), alloc);
    recordJson.AddMember("entitySize", record->getFileSize(), alloc);
    recordJson.AddMember("entityOffset", record->getFileOffset(), alloc);

    recordJson.AddMember("entityType", "org.apache.nifi.flowfile.FlowFile", alloc);

    recordJson.AddMember("eventId", getStringValue(record->getEventId().to_string(), alloc), alloc);
    recordJson.AddMember("eventType", getStringValue(provenance::ProvenanceEventRecord::ProvenanceEventTypeStr[record->getEventType()], alloc), alloc);
    recordJson.AddMember("details", getStringValue(record->getDetails(), alloc), alloc);
    recordJson.AddMember("componentId", getStringValue(record->getComponentId(), alloc), alloc);
    recordJson.AddMember("componentType", getStringValue(record->getComponentType(), alloc), alloc);
    recordJson.AddMember("entityId", getStringValue(record->getFlowFileUuid().to_string(), alloc), alloc);
    recordJson.AddMember("transitUri", getStringValue(record->getTransitUri(), alloc), alloc);
    recordJson.AddMember("remoteIdentifier", getStringValue(record->getSourceSystemFlowFileIdentifier(), alloc), alloc);
    recordJson.AddMember("alternateIdentifier", getStringValue(record->getAlternateIdentifierUri(), alloc), alloc);

    // const& avoids copying a pair of strings for every attribute.
    for (const auto &attr : record->getAttributes()) {
      setJsonStr(attr.first, attr.second, updatedAttributesJson, alloc);
    }
    recordJson.AddMember("updatedAttributes", updatedAttributesJson, alloc);

    for (const auto &parentUUID : record->getParentUuids()) {
      appendJsonStr(parentUUID.to_string(), parentUuidJson, alloc);
    }
    recordJson.AddMember("parentIds", parentUuidJson, alloc);

    for (const auto &childUUID : record->getChildrenUuids()) {
      appendJsonStr(childUUID.to_string(), childUuidJson, alloc);
    }
    recordJson.AddMember("childIds", childUuidJson, alloc);

    // ProvenanceAppStr is a static string, so it is referenced rather than copied.
    // StringRef measures the length itself, which removes the dependency on
    // std::strlen (and on <cstring>, which this file does not include).
    rapidjson::Value applicationVal;
    applicationVal.SetString(rapidjson::StringRef(ProvenanceAppStr));
    recordJson.AddMember("application", applicationVal, alloc);

    array.PushBack(recordJson, alloc);
  }

  rapidjson::StringBuffer buffer;
  rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(buffer);
  array.Accept(writer);
  report = buffer.GetString();
}
/**
 * Scheduling hook. This task performs no work at schedule time.
 *
 * @param context        not used
 * @param sessionFactory not used
 */
void SiteToSiteProvenanceReportingTask::onSchedule(const std::shared_ptr<core::ProcessContext> &context,
                                                   const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
  // Intentionally empty.
}
/**
 * Reads up to batch_size_ provenance records from the provenance repository,
 * renders them as JSON via getJsonReport() and transmits the report through a
 * site-to-site protocol obtained from getNextProtocol().
 *
 * Records are deleted from the repository only after transmitPayload() reports
 * success. If it reports failure the context yields and the records are left in
 * place; if it throws, the function returns immediately, also leaving the records
 * in place.
 *
 * @param context process context; supplies the provenance repository and is yielded on failure
 * @param session process session forwarded to getJsonReport() and transmitPayload()
 */
void SiteToSiteProvenanceReportingTask::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
  logger_->log_debug("SiteToSiteProvenanceReportingTask -- onTrigger");
  std::vector<std::shared_ptr<core::SerializableComponent>> records;
  logging::LOG_DEBUG(logger_) << "batch size " << batch_size_ << " records";

  // In: maximum number of records to read. Compared against 0 after the call below.
  size_t deserialized = batch_size_;
  std::shared_ptr<core::Repository> repo = context->getProvenanceRepository();
  std::function<std::shared_ptr<core::SerializableComponent>()> constructor = []() {return std::make_shared<provenance::ProvenanceEventRecord>();};
  if (!repo->DeSerialize(records, deserialized, constructor) && deserialized == 0) {
    return;
  }
  logging::LOG_DEBUG(logger_) << "Captured " << deserialized << " records";

  std::string jsonStr;
  this->getJsonReport(context, session, records, jsonStr);
  if (jsonStr.empty()) {
    return;
  }

  auto protocol_ = getNextProtocol(true);
  if (!protocol_) {
    context->yield();
    return;
  }

  bool transmitted = false;
  try {
    std::map<std::string, std::string> attributes;
    if (protocol_->transmitPayload(context, session, jsonStr, attributes)) {
      transmitted = true;
    } else {
      context->yield();
    }
  } catch (...) {
    // if transfer bytes failed, return instead of purge the provenance records
    // (the protocol is not handed back through returnProtocol on this path)
    return;
  }

  if (transmitted) {
    // The report was delivered, so the records can be purged from the DB.
    // Previously this ran even when transmitPayload() returned false, which
    // discarded records that had never been sent.
    repo->Delete(records);
  }
  returnProtocol(std::move(protocol_));
}
} /* namespace reporting */
} /* namespace core */
} /* namespace minifi */
} /* namespace nifi */
} /* namespace apache */
} /* namespace org */