blob: 71b0e6f55a2982df022df64f55855a8e9e4c091f [file] [log] [blame]
/**
* @file QueryDatabaseTable.cpp
* PutSQL class declaration
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "QueryDatabaseTable.h"
#include <vector>
#include <queue>
#include <map>
#include <set>
#include <sstream>
#include <cstdio>
#include <string>
#include <iostream>
#include <memory>
#include <codecvt>
#include <algorithm>
#include <regex>
#include <soci/soci.h>
#include "io/BufferStream.h"
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
#include "Exception.h"
#include "utils/OsUtils.h"
#include "data/DatabaseConnectors.h"
#include "data/JSONSQLWriter.h"
#include "data/SQLRowsetProcessor.h"
#include "data/WriteCallback.h"
#include "data/MaxCollector.h"
#include "data/Utils.h"
#include "utils/file/FileUtils.h"
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace processors {
const std::string QueryDatabaseTable::ProcessorName("QueryDatabaseTable");
const core::Property QueryDatabaseTable::s_tableName(
core::PropertyBuilder::createProperty("Table Name")->isRequired(true)->withDescription("The name of the database table to be queried.")->supportsExpressionLanguage(true)->build());
const core::Property QueryDatabaseTable::s_columnNames(
core::PropertyBuilder::createProperty("Columns to Return")->isRequired(false)->withDescription(
"A comma-separated list of column names to be used in the query. If your database requires special treatment of the names (quoting, e.g.), each name should include such treatment. "
"If no column names are supplied, all columns in the specified table will be returned. "
"NOTE: It is important to use consistent column names for a given table for incremental fetch to work properly.")->supportsExpressionLanguage(true)->build());
const core::Property QueryDatabaseTable::s_maxValueColumnNames(
core::PropertyBuilder::createProperty("Maximum-value Columns")->isRequired(false)->withDescription(
"A comma-separated list of column names. The processor will keep track of the maximum value for each column that has been returned since the processor started running. "
"Using multiple columns implies an order to the column list, and each column's values are expected to increase more slowly than the previous columns' values. "
"Thus, using multiple columns implies a hierarchical structure of columns, which is usually used for partitioning tables. "
"This processor can be used to retrieve only those rows that have been added/updated since the last retrieval. "
"Note that some ODBC types such as bit/boolean are not conducive to maintaining maximum value, so columns of these types should not be listed in this property, and will result in error(s) during processing. "
"If no columns are provided, all rows from the table will be considered, which could have a performance impact. "
"NOTE: It is important to use consistent max-value column names for a given table for incremental fetch to work properly. "
"NOTE: Because of a limitation of database access library 'soci', which doesn't support milliseconds in it's 'dt_date', "
"there is a possibility that flowfiles might have duplicated records, if a max-value column with 'dt_date' type has value with milliseconds.")->
supportsExpressionLanguage(true)->build());
const core::Property QueryDatabaseTable::s_whereClause(
core::PropertyBuilder::createProperty("db-fetch-where-clause")->isRequired(false)->withDescription(
"A custom clause to be added in the WHERE condition when building SQL queries.")->supportsExpressionLanguage(true)->build());
const core::Property QueryDatabaseTable::s_sqlQuery(
core::PropertyBuilder::createProperty("db-fetch-sql-query")->isRequired(false)->withDescription(
"A custom SQL query used to retrieve data. Instead of building a SQL query from other properties, this query will be wrapped as a sub-query. "
"Query must have no ORDER BY statement.")->supportsExpressionLanguage(true)->build());
const core::Property QueryDatabaseTable::s_maxRowsPerFlowFile(
core::PropertyBuilder::createProperty("qdbt-max-rows")->isRequired(true)->withDefaultValue<int>(0)->withDescription(
"The maximum number of result rows that will be included in a single FlowFile. This will allow you to break up very large result sets into multiple FlowFiles. "
"If the value specified is zero, then all rows are returned in a single FlowFile.")->supportsExpressionLanguage(true)->build());
const core::Property QueryDatabaseTable::s_stateDirectory(
core::PropertyBuilder::createProperty("State Directory")->isRequired(false)->withDefaultValue("QDTState")->withDescription("DEPRECATED. Only use it for state migration from the state file, supplying the legacy state directory.")->build());
const std::string QueryDatabaseTable::s_initialMaxValueDynamicPropertyPrefix("initial.maxvalue.");
const core::Relationship QueryDatabaseTable::s_success("success", "Successfully created FlowFile from SQL query result set.");
static const std::string ResultTableName = "tablename";
static const std::string ResultRowCount = "querydbtable.row.count";
static const std::string TABLENAME_KEY = "tablename";
static const std::string MAXVALUE_KEY_PREFIX = "maxvalue.";
// State
class LegacyState {
public:
LegacyState(const std::string& tableName, const std::string& stateDir, const std::string& uuid, std::shared_ptr<logging::Logger> logger)
:tableName_(tableName), logger_(logger) {
filePath_ = utils::file::FileUtils::concat_path(
utils::file::FileUtils::concat_path(
utils::file::FileUtils::concat_path(stateDir, "uuid"), uuid), "State.txt");
if (!getStateFromFile())
return;
ok_ = true;
}
explicit operator bool() const {
return ok_;
}
const std::unordered_map<std::string, std::string>& getStateMap() const {
return mapState_;
}
bool moveStateFileToMigrated() {
if (!ok_) {
return false;
}
return rename(filePath_.c_str(), (filePath_ + "-migrated").c_str()) == 0;
}
private:
static const std::string separator_;
bool getStateFromFile() {
std::string state;
std::ifstream file(filePath_);
if (!file) {
return false;
}
std::stringstream ss;
ss << file.rdbuf();
state = ss.str();
file.close();
std::vector<std::string> listColumnNameValue;
size_t pos = state.find(separator_, 0);
if (pos == std::string::npos) {
logger_->log_error("Invalid data in '%s' file.", filePath_.c_str());
mapState_.clear();
return false;
}
auto tableName = state.substr(0, pos);
if (tableName != tableName_) {
logger_->log_warn("tableName is changed - now: '%s', in State.txt: '%s'.", tableName_.c_str(), tableName.c_str());
mapState_.clear();
return false;
}
pos += separator_.size();
while (true) {
auto newPos = state.find(separator_, pos);
if (newPos == std::string::npos)
break;
const std::string& columnNameValue = state.substr(pos, newPos - pos);
listColumnNameValue.emplace_back(columnNameValue);
pos = newPos + separator_.size();
}
for (const auto& columnNameValue : listColumnNameValue) {
const auto posEQ = columnNameValue.find('=');
if (posEQ == std::string::npos) {
logger_->log_error("Invalid data in '%s' file.", filePath_.c_str());
mapState_.clear();
return false;
}
const auto& name = columnNameValue.substr(0, posEQ);
const auto& value = columnNameValue.substr(posEQ + 1);
mapState_.insert({ name, value });
}
return true;
}
private:
std::unordered_map<std::string, std::string> mapState_;
std::shared_ptr<logging::Logger> logger_;
std::string filePath_;
std::string tableName_;
bool ok_{};
};
const std::string LegacyState::separator_ = "@!qdt!@";
// QueryDatabaseTable
QueryDatabaseTable::QueryDatabaseTable(const std::string& name, utils::Identifier uuid)
: SQLProcessor(name, uuid) {
}
QueryDatabaseTable::~QueryDatabaseTable() = default;
void QueryDatabaseTable::initialize() {
//! Set the supported properties
setSupportedProperties({ dbControllerService(), outputFormat(), s_tableName, s_columnNames, s_maxValueColumnNames, s_whereClause, s_sqlQuery, s_maxRowsPerFlowFile, s_stateDirectory});
//! Set the supported relationships
setSupportedRelationships({ s_success });
}
void QueryDatabaseTable::processOnSchedule(core::ProcessContext &context) {
initOutputFormat(context);
context.getProperty(s_tableName.getName(), tableName_);
context.getProperty(s_columnNames.getName(), columnNames_);
context.getProperty(s_maxValueColumnNames.getName(), maxValueColumnNames_);
listMaxValueColumnName_ = utils::inputStringToList(maxValueColumnNames_);
context.getProperty(s_whereClause.getName(), whereClause_);
context.getProperty(s_sqlQuery.getName(), sqlQuery_);
context.getProperty(s_maxRowsPerFlowFile.getName(), maxRowsPerFlowFile_);
mapState_.clear();
state_manager_ = context.getStateManager();
if (state_manager_ == nullptr) {
throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager");
}
std::unordered_map<std::string, std::string> state_map;
if (state_manager_->get(state_map)) {
if (state_map[TABLENAME_KEY] != tableName_) {
state_manager_->clear();
} else {
for (auto&& elem : state_map) {
if (elem.first.find(MAXVALUE_KEY_PREFIX) == 0) {
mapState_.emplace(elem.first.substr(MAXVALUE_KEY_PREFIX.length()), std::move(elem.second));
}
}
}
} else {
// Try to migrate legacy state file
std::string stateDir;
context.getProperty(s_stateDirectory.getName(), stateDir);
if (!stateDir.empty()) {
LegacyState legacyState(tableName_, stateDir, getUUIDStr(), logger_);
if (legacyState) {
mapState_ = legacyState.getStateMap();
if (saveState() && state_manager_->persist()) {
logger_->log_info("State migration successful");
legacyState.moveStateFileToMigrated();
} else {
logger_->log_warn("Failed to persists migrated state");
}
} else {
logger_->log_warn("Could not migrate state from specified State Directory %s", stateDir);
}
}
}
// If 'listMaxValueColumnName_' doesn't match columns in mapState_, then clear mapState_.
if (listMaxValueColumnName_.size() != mapState_.size()) {
mapState_.clear();
} else {
for (const auto& columName : listMaxValueColumnName_) {
if (0 == mapState_.count(columName)) {
mapState_.clear();
break;
}
}
}
// Add in 'mapState_' new columns which are in 'listMaxValueColumnName_'.
for (const auto& maxValueColumnName: listMaxValueColumnName_) {
if (0 == mapState_.count(maxValueColumnName)) {
mapState_.insert({maxValueColumnName, std::string()});
}
}
const auto dynamic_prop_keys = context.getDynamicPropertyKeys();
logger_->log_info("Received %zu dynamic properties", dynamic_prop_keys.size());
// If the stored state for a max value column is empty, populate it with the corresponding initial max value, if it exists.
for (const auto& key : dynamic_prop_keys) {
if (std::string::npos == key.rfind(s_initialMaxValueDynamicPropertyPrefix, 0)) {
throw minifi::Exception(PROCESSOR_EXCEPTION, "QueryDatabaseTable: Unsupported dynamic property \"" + key + "\"");
}
const auto columnName = utils::toLower(key.substr(s_initialMaxValueDynamicPropertyPrefix.length()));
auto it = mapState_.find(columnName);
if (it == mapState_.end()) {
logger_->log_warn("Initial maximum value specified for column \"%s\", which is not specified as a Maximum-value Column. Ignoring.", columnName);
continue;
}
if (!it->second.empty()) {
continue;
}
std::string value;
if (context.getDynamicProperty(key, value) && !value.empty()) {
it->second = value;
logger_->log_info("Setting initial maximum value of %s to %s", columnName, value);
}
}
}
void QueryDatabaseTable::processOnTrigger(core::ProcessSession& session) {
const auto& selectQuery = getSelectQuery();
logger_->log_info("QueryDatabaseTable: selectQuery: '%s'", selectQuery.c_str());
auto statement = connection_->prepareStatement(selectQuery);
auto rowset = statement->execute();
int count = 0;
size_t rowCount = 0;
sql::MaxCollector maxCollector(selectQuery, maxValueColumnNames_, mapState_);
sql::JSONSQLWriter sqlWriter(isJSONPretty());
sql::SQLRowsetProcessor sqlRowsetProcessor(rowset, {&sqlWriter, &maxCollector});
// Process rowset.
do {
rowCount = sqlRowsetProcessor.process(maxRowsPerFlowFile_ == 0 ? std::numeric_limits<size_t>::max() : maxRowsPerFlowFile_);
count++;
if (rowCount == 0)
break;
const auto output = sqlWriter.toString();
if (!output.empty()) {
WriteCallback writer(output);
auto newflow = session.create();
newflow->addAttribute(ResultRowCount, std::to_string(rowCount));
newflow->addAttribute(ResultTableName, tableName_);
session.write(newflow, &writer);
session.transfer(newflow, s_success);
}
} while (rowCount > 0);
const auto mapState = mapState_;
if (maxCollector.updateMapState()) {
try {
session.commit();
} catch (std::exception& e) {
mapState_ = mapState;
throw;
}
saveState();
}
}
std::string QueryDatabaseTable::getSelectQuery() {
std::string ret;
if (sqlQuery_.empty()) {
std::string columns;
if (columnNames_.empty()) {
columns = "*";
} else {
columns = columnNames_;
}
ret = "select " + columns + " from " + tableName_;
} else {
ret = sqlQuery_;
}
std::string whereClauses;
if (!mapState_.empty() && !listMaxValueColumnName_.empty()) {
for (auto index = 0U; index < listMaxValueColumnName_.size(); index++) {
const auto& columnName = listMaxValueColumnName_[index];
const auto itState = mapState_.find(columnName);
const auto& maxValue = itState->second;
if (maxValue.empty()) {
continue;
}
// Logic to differentiate ">" vs ">=" based on index is copied from:
// https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java
// (under comment "Add a condition for the WHERE clause"). And implementation explanation: https://issues.apache.org/jira/browse/NIFI-2712.
if (index == 0) {
whereClauses += columnName + " > ";
} else {
whereClauses += " and " + columnName + " >= ";
}
whereClauses += maxValue;
}
}
if (!whereClause_.empty()) {
whereClauses += " and " + whereClause_;
}
if (!whereClauses.empty()) {
ret += " where " + whereClauses;
}
return ret;
}
bool QueryDatabaseTable::saveState() {
std::unordered_map<std::string, std::string> state_map;
state_map.emplace(TABLENAME_KEY, tableName_);
for (const auto& elem : mapState_) {
state_map.emplace(MAXVALUE_KEY_PREFIX + elem.first, elem.second);
}
return state_manager_->set(state_map);
}
} /* namespace processors */
} /* namespace minifi */
} /* namespace nifi */
} /* namespace apache */
} /* namespace org */