blob: ecbf5b2d51f3c35b264b3bdeb8ff0cb2777ed4c6 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "QueryDatabaseTable.h"
#include <vector>
#include <string>
#include <algorithm>
#include "io/BufferStream.h"
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
#include "Exception.h"
#include "data/MaxCollector.h"
#include "utils/StringUtils.h"
namespace org::apache::nifi::minifi::processors {
// Dynamic properties named "<prefix><column>" supply an initial maximum value for that column.
const std::string QueryDatabaseTable::InitialMaxValueDynamicPropertyPrefix{"initial.maxvalue."};
// Attribute names set on every produced flow file.
const std::string QueryDatabaseTable::RESULT_TABLE_NAME{"tablename"};
const std::string QueryDatabaseTable::RESULT_ROW_COUNT{"querydbtable.row.count"};
// Keys used in the persisted processor state (see saveState / loadMaxValuesFromStoredState).
const std::string QueryDatabaseTable::TABLENAME_KEY{"tablename"};
const std::string QueryDatabaseTable::MAXVALUE_KEY_PREFIX{"maxvalue."};
// Constructs the processor, handing SQLProcessor a logger obtained for this processor's uuid.
QueryDatabaseTable::QueryDatabaseTable(std::string name, const utils::Identifier& uuid)
    : SQLProcessor(
          std::move(name),
          uuid,
          core::logging::LoggerFactory<QueryDatabaseTable>::getLogger(uuid)) {}
// Registers the processor's property list and relationship list with the framework.
void QueryDatabaseTable::initialize() {
  // properties() and relationships() provide the lists declared for this processor.
  setSupportedProperties(properties());
  setSupportedRelationships(relationships());
}
/**
 * Reads the processor configuration when it is scheduled.
 *
 * Reads Output Format, Max Rows Per Flow File, Table Name, Where Clause, the
 * returned column list and the maximum-value column list, then initializes the
 * tracked maximum values.
 *
 * Duplicate entries in either column list are ignored, so that no column
 * appears twice in the generated SELECT list or WHERE clause.
 *
 * @throws Exception (PROCESSOR_EXCEPTION) if the context provides no StateManager.
 */
void QueryDatabaseTable::processOnSchedule(core::ProcessContext& context) {
  context.getProperty(OutputFormat.getName(), output_format_);
  max_rows_ = [&] {
    uint64_t max_rows = 0;
    context.getProperty(MaxRowsPerFlowFile.getName(), max_rows);
    return gsl::narrow<size_t>(max_rows);
  }();

  state_manager_ = context.getStateManager();
  if (state_manager_ == nullptr) {
    throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager");
  }

  context.getProperty(TableName.getName(), table_name_);
  context.getProperty(WhereClause.getName(), extra_where_clause_);

  return_columns_.clear();
  queried_columns_.clear();
  for (auto&& raw_col : utils::StringUtils::splitAndTrimRemovingEmpty(context.getProperty(ColumnNames).value_or(""), ",")) {
    sql::SQLColumnIdentifier col_id(raw_col);
    if (return_columns_.contains(col_id)) {
      // listed more than once: the set already dedupes it, keep the SELECT list consistent
      continue;
    }
    if (!queried_columns_.empty()) {
      queried_columns_ += ", ";
    }
    queried_columns_ += raw_col;
    return_columns_.insert(std::move(col_id));
  }

  max_value_columns_.clear();
  for (auto&& raw_col : utils::StringUtils::splitAndTrimRemovingEmpty(context.getProperty(MaxValueColumnNames).value_or(""), ",")) {
    sql::SQLColumnIdentifier col_id(raw_col);
    if (std::find(max_value_columns_.begin(), max_value_columns_.end(), col_id) != max_value_columns_.end()) {
      // already registered; adding it again would duplicate it in the SELECT list and the WHERE clause
      continue;
    }
    if (!queried_columns_.empty() && !return_columns_.contains(col_id)) {
      // columns will be explicitly enumerated, we need to add the max value columns as it is not yet queried
      queried_columns_ += ", ";
      queried_columns_ += raw_col;
    }
    max_value_columns_.push_back(std::move(col_id));
  }

  initializeMaxValues(context);
}
/**
 * Runs the incremental SELECT query and emits its result rows as flow files.
 *
 * Rows are written by the JSON writer, and max_rows_ is passed to
 * SQLRowsetProcessor::process for each batch (presumably a per-flow-file row
 * limit; confirm against SQLRowsetProcessor). Each flow file gets these
 * attributes:
 *  - the row count;
 *  - the table name;
 *  - "maxvalue.<column>" for every maximum-value column.
 * Each flow file is then transferred to Success.
 *
 * If the observed maximum values changed, the in-memory values are updated
 * and persisted. A failed persist is logged rather than silently ignored.
 */
void QueryDatabaseTable::processOnTrigger(core::ProcessContext& /*context*/, core::ProcessSession& session) {
  const auto& selectQuery = buildSelectQuery();
  logger_->log_info("QueryDatabaseTable: selectQuery: '%s'", selectQuery.c_str());
  auto statement = connection_->prepareStatement(selectQuery);
  auto rowset = statement->execute();

  // Collect into a copy so max_values_ (and the persisted state) only change if new maxima were seen.
  std::unordered_map<sql::SQLColumnIdentifier, std::string> new_max_values = max_values_;
  sql::MaxCollector maxCollector{selectQuery, new_max_values};
  auto column_filter = [&] (const std::string& column_name) {
    return return_columns_.empty() || return_columns_.contains(sql::SQLColumnIdentifier(column_name));
  };
  sql::JSONSQLWriter json_writer{output_format_ == OutputType::JSONPretty, column_filter};
  FlowFileGenerator flow_file_creator{session, json_writer};
  sql::SQLRowsetProcessor sql_rowset_processor(std::move(rowset), {json_writer, maxCollector, flow_file_creator});

  while (size_t row_count = sql_rowset_processor.process(max_rows_)) {
    auto new_file = flow_file_creator.getLastFlowFile();
    gsl_Expects(new_file);
    new_file->addAttribute(RESULT_ROW_COUNT, std::to_string(row_count));
    new_file->addAttribute(RESULT_TABLE_NAME, table_name_);
  }

  // the updated max_values and the total number of flow_files is available from here
  for (auto& new_file : flow_file_creator.getFlowFiles()) {
    // finish setting attributes before handing the flow file to the session
    for (const auto& max_column : max_value_columns_) {
      new_file->addAttribute("maxvalue." + max_column.str(), new_max_values[max_column]);
    }
    session.transfer(new_file, Success);
  }

  if (new_max_values != max_values_) {
    max_values_ = new_max_values;
    if (!saveState()) {
      logger_->log_warn("Failed to persist the new maximum values for table \"%s\"", table_name_);
    }
  }
}
bool QueryDatabaseTable::loadMaxValuesFromStoredState(const std::unordered_map<std::string, std::string> &state) {
std::unordered_map<sql::SQLColumnIdentifier, std::string> new_max_values;
if (!state.contains(TABLENAME_KEY)) {
logger_->log_info("State does not specify the table name.");
return false;
}
if (state.at(TABLENAME_KEY) != table_name_) {
logger_->log_info("Querying new table \"%s\", resetting state.", table_name_);
return false;
}
for (auto& elem : state) {
if (utils::StringUtils::startsWith(elem.first, MAXVALUE_KEY_PREFIX)) {
sql::SQLColumnIdentifier column_name(elem.first.substr(MAXVALUE_KEY_PREFIX.length()));
// add only those columns that we care about
if (std::find(max_value_columns_.begin(), max_value_columns_.end(), column_name) != max_value_columns_.end()) {
new_max_values.emplace(column_name, elem.second);
} else {
logger_->log_info("State contains obsolete maximum-value column \"%s\", resetting state.", column_name.str());
return false;
}
}
}
for (auto& column : max_value_columns_) {
if (new_max_values.find(column) == new_max_values.end()) {
logger_->log_info("New maximum-value column \"%s\" specified, resetting state.", column.str());
return false;
}
}
max_values_ = new_max_values;
return true;
}
/**
 * Rebuilds max_values_ from the three sources below, in this order:
 *  1. stored state, which is cleared if it does not match the configuration;
 *  2. an empty entry for every configured maximum-value column;
 *  3. the "initial.maxvalue.<column>" dynamic properties.
 */
void QueryDatabaseTable::initializeMaxValues(core::ProcessContext &context) {
  max_values_.clear();

  std::unordered_map<std::string, std::string> stored_state;
  if (!state_manager_->get(stored_state)) {
    logger_->log_info("Found no stored state");
  } else if (!loadMaxValuesFromStoredState(stored_state)) {
    // stored state does not match the current configuration; discard it
    state_manager_->clear();
  }

  // ensure every maximum-value column has an entry (empty string means "not yet known")
  for (const auto& column_name : max_value_columns_) {
    max_values_.try_emplace(column_name);
  }

  loadMaxValuesFromDynamicProperties(context);
}
/**
 * Applies "initial.maxvalue.<column>" dynamic properties to max_values_.
 *
 * A value is applied only when:
 *  - the column is already a key of max_values_;
 *  - its current value is empty, so restored state is never overwritten;
 *  - the property value itself is non-empty.
 * Properties for unknown columns are logged and ignored.
 *
 * @throws minifi::Exception (PROCESSOR_EXCEPTION) for a dynamic property without the expected prefix.
 */
void QueryDatabaseTable::loadMaxValuesFromDynamicProperties(core::ProcessContext &context) {
  const auto property_keys = context.getDynamicPropertyKeys();
  logger_->log_info("Received %zu dynamic properties", property_keys.size());

  for (const auto& property_key : property_keys) {
    if (!utils::StringUtils::startsWith(property_key, InitialMaxValueDynamicPropertyPrefix)) {
      throw minifi::Exception(PROCESSOR_EXCEPTION, "QueryDatabaseTable: Unsupported dynamic property \"" + property_key + "\"");
    }

    sql::SQLColumnIdentifier column_name(property_key.substr(InitialMaxValueDynamicPropertyPrefix.length()));
    const auto entry = max_values_.find(column_name);
    if (entry == max_values_.end()) {
      logger_->log_warn("Initial maximum value specified for column \"%s\", which is not specified as a Maximum-value Column. Ignoring.", column_name.str());
      continue;
    }

    // only fill in columns that have no value yet (e.g. not restored from state)
    if (entry->second.empty()) {
      std::string initial_value;
      if (context.getDynamicProperty(property_key, initial_value) && !initial_value.empty()) {
        entry->second = initial_value;
        logger_->log_info("Setting initial maximum value of %s to %s", column_name.str(), initial_value);
      }
    }
  }
}
std::string QueryDatabaseTable::buildSelectQuery() {
std::string query = "select " + (queried_columns_.empty() ? "*" : queried_columns_) + " from " + table_name_;
std::vector<std::string> where_clauses;
for (size_t index = 0; index < max_value_columns_.size(); index++) {
const auto& column_name = max_value_columns_[index];
const auto& max_value = max_values_[column_name];
if (max_value.empty()) {
// max value has not been set for this column
continue;
}
// Logic to differentiate ">" vs ">=" based on index is copied from:
// https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java
// (under comment "Add a condition for the WHERE clause"). And implementation explanation: https://issues.apache.org/jira/browse/NIFI-2712.
where_clauses.push_back(utils::StringUtils::join_pack(column_name.str(), index == 0 ? " > " : " >= ", max_value));
}
if (!extra_where_clause_.empty()) {
where_clauses.push_back(extra_where_clause_);
}
if (!where_clauses.empty()) {
query += " where " + utils::StringUtils::join(" and ", where_clauses);
}
return query;
}
bool QueryDatabaseTable::saveState() {
std::unordered_map<std::string, std::string> state_map;
state_map.emplace(TABLENAME_KEY, table_name_);
for (const auto& item : max_values_) {
state_map.emplace(MAXVALUE_KEY_PREFIX + item.first.str(), item.second);
}
return state_manager_->set(state_map);
}
} // namespace org::apache::nifi::minifi::processors