blob: f43101211a682930372310f3fb433a6286084c54 [file] [log] [blame]
/**
* @file UpdateAttribute.cpp
* UpdateAttribute class implementation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "processors/UpdateAttribute.h"
#include <memory>
#include <string>
#include <set>
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace processors {
core::Relationship UpdateAttribute::Success(
"success",
"All files are routed to success");
core::Relationship UpdateAttribute::Failure(
"failure",
"Failed files are transferred to failure");
void UpdateAttribute::initialize() {
std::set<core::Property> properties;
setSupportedProperties(properties);
std::set<core::Relationship> relationships;
relationships.insert(Success);
relationships.insert(Failure);
setSupportedRelationships(relationships);
}
void UpdateAttribute::onSchedule(core::ProcessContext *context,
core::ProcessSessionFactory *sessionFactory) {
attributes_.clear();
const auto &dynamic_prop_keys = context->getDynamicPropertyKeys();
logger_->log_info("UpdateAttribute registering %d keys", dynamic_prop_keys.size());
for (const auto &key : dynamic_prop_keys) {
attributes_.emplace_back(key);
logger_->log_info("UpdateAttribute registered attribute '%s'", key);
}
}
void UpdateAttribute::onTrigger(core::ProcessContext *context,
core::ProcessSession *session) {
auto flow_file = session->get();
// Do nothing if there are no incoming files
if (!flow_file) {
return;
}
try {
for (const auto &attribute : attributes_) {
std::string value;
context->getDynamicProperty(attribute, value, flow_file);
flow_file->setAttribute(attribute, value);
logger_->log_info("Set attribute '%s' of flow file '%s' with value '%s'",
attribute,
flow_file->getUUIDStr(), value);
}
session->transfer(flow_file, Success);
} catch (const std::exception &e) {
logger_->log_error("Caught exception while updating attributes: %s", e.what());
session->transfer(flow_file, Failure);
yield();
}
}
} /* namespace processors */
} /* namespace minifi */
} /* namespace nifi */
} /* namespace apache */
} /* namespace org */