blob: b172a859b85ec6f156f90c1ce744771dcc5da5a5 [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <string>
#include <unordered_map>
#include <tuple>
#include <vector>
#include <sstream>
#include "SQLRowSubscriber.h"
#include "SQLColumnIdentifier.h"
namespace org::apache::nifi::minifi::sql {
class MaxCollector: public SQLRowSubscriber {
void beginProcessBatch() override {}
void endProcessBatch() override {}
void beginProcessRow() override {}
void endProcessRow() override {}
void finishProcessing() override {
updateMapState();
}
void processColumnNames(const std::vector<std::string>& names) override {
for (const auto& expected : state_) {
if (std::find(names.begin(), names.end(), expected.first) == names.end()) {
throw minifi::Exception(PROCESSOR_EXCEPTION,
"Column '" + expected.first.str() + "' is not found in the columns of '" + query_ + "' result.");
}
}
}
void processColumn(const std::string& name, const std::string& value) override {
updateMaxValue(name, '\'' + value + '\'');
}
void processColumn(const std::string& name, double value) override {
updateMaxValue(name, value);
}
void processColumn(const std::string& name, int value) override {
updateMaxValue(name, value);
}
void processColumn(const std::string& name, long long value) override { // NOLINT(runtime/int)
updateMaxValue(name, value);
}
void processColumn(const std::string& name, unsigned long long value) override { // NOLINT(runtime/int)
updateMaxValue(name, value);
}
void processColumn(const std::string& /*name*/, const char* /*value*/) override {}
template <typename T>
class MaxValue {
public:
void updateMaxValue(const SQLColumnIdentifier& column, const T& value) {
const auto it = column_maxima.find(column);
if (it == column_maxima.end()) {
column_maxima.emplace(column, value);
} else {
if (value > it->second) {
it->second = value;
}
}
}
protected:
void updateStateImpl(std::unordered_map<SQLColumnIdentifier, std::string>& state) const {
for (auto& curr_column_max : state) {
const auto it = column_maxima.find(curr_column_max.first);
if (it != column_maxima.end()) {
std::stringstream ss;
ss << it->second;
curr_column_max.second = ss.str();
}
}
}
private:
std::unordered_map<SQLColumnIdentifier, T> column_maxima;
};
template <typename ...Ts>
struct MaxValues : public MaxValue<Ts>... {
void updateState(std::unordered_map<SQLColumnIdentifier, std::string>& state) const {
(void)(std::initializer_list<int>{(MaxValue<Ts>::updateStateImpl(state), 0)...});
}
};
public:
MaxCollector(std::string query, std::unordered_map<SQLColumnIdentifier, std::string>& state)
:query_(std::move(query)), state_(state) {
}
template <typename T>
void updateMaxValue(const std::string& column_name, const T& value) {
SQLColumnIdentifier column_id(column_name);
if (state_.count(column_id)) {
max_values_.MaxValue<T>::updateMaxValue(column_id, value);
}
}
void updateMapState() {
max_values_.updateState(state_);
}
private:
const std::string query_;
std::unordered_map<SQLColumnIdentifier, std::string>& state_;
MaxValues<std::string, double, int, long long, unsigned long long> max_values_; // NOLINT(runtime/int)
};
} // namespace org::apache::nifi::minifi::sql