blob: 3bccf8422f2af1641f08963888826d63bec83c38 [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <limits>
#include <sstream>
#include <string>
#include <tuple>
#include <unordered_map>
#include "SQLRowSubscriber.h"
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace sql {
/**
 * Row subscriber that records, per column, the largest value seen while rows are processed.
 *
 * Only columns whose names are keys of the state map handed to the constructor are tracked.
 * Values are kept in one map per supported type (std::string, double, int, long long,
 * unsigned long long). updateMapState() writes the collected maxima back into the state
 * map as strings.
 */
class MaxCollector: public SQLRowSubscriber {
  void beginProcessRow() override {}

  /**
   * Checks that the number of column names matched by processColumnName() equals the
   * number of entries in the state map. After the first successful check this is a no-op.
   *
   * @throws minifi::Exception (PROCESSOR_EXCEPTION) when the counts differ.
   */
  void endProcessRow() override {
    if (columnsVerified_) {
      return;
    }

    if (countColumns_ != mapState_.size()) {
      throw minifi::Exception(PROCESSOR_EXCEPTION, "MaxCollector: Column(s) '" + maxValueColumnNames_ + "' are not found in the columns of '" + selectQuery_ + "' result.");
    }

    columnsVerified_ = true;
  }

  /**
   * Counts the reported column names that are keys of the state map.
   * Counting stops once endProcessRow() has verified the columns.
   */
  void processColumnName(const std::string& name) override {
    if (columnsVerified_) {
      return;
    }

    if (mapState_.count(name)) {
      countColumns_++;
    }
  }

  /**
   * Tracks a string value. The value is wrapped in single quotes before it is compared/stored
   * (presumably so the saved state can be used as a SQL string literal - confirm against the caller).
   */
  void processColumn(const std::string& name, const std::string& value) override {
    updateMaxValue(name, '\'' + value + '\'');
  }

  void processColumn(const std::string& name, double value) override {
    updateMaxValue(name, value);
  }

  void processColumn(const std::string& name, int value) override {
    updateMaxValue(name, value);
  }

  void processColumn(const std::string& name, long long value) override {
    updateMaxValue(name, value);
  }

  void processColumn(const std::string& name, unsigned long long value) override {
    updateMaxValue(name, value);
  }

  // Values delivered through this overload are deliberately not tracked.
  // NOTE(review): which values SQLRowSubscriber routes here is not visible in this file - confirm.
  void processColumn(const std::string& name, const char* value) override {}

  /**
   * Holds the current maximum of type T for each column name.
   */
  template <typename T>
  struct MaxValue {
    /**
     * Stores `value` for `name` if the column has no value yet or `value` is greater
     * (operator>) than the stored one.
     */
    void updateMaxValue(const std::string& name, const T& value) {
      const auto it = mapColumnNameValue_.find(name);
      if (it == mapColumnNameValue_.end()) {
        mapColumnNameValue_.insert({ name, value });
      } else if (value > it->second) {
        it->second = value;
      }
    }

    std::unordered_map<std::string, T> mapColumnNameValue_;
  };

  /**
   * Compile-time recursion over the tuple elements Index, Index - 1, ..., 0.
   * For every state entry that has a value in the Index-th MaxValue, the state string
   * is replaced with that value streamed to text.
   */
  template <typename Tuple, int Index>
  struct UpdateMapState {
    UpdateMapState(const Tuple& tpl, std::unordered_map<std::string, std::string>& mapState) {
      const auto& maxVal = std::get<Index>(tpl);

      for (auto& el : mapState) {
        const auto it = maxVal.mapColumnNameValue_.find(el.first);
        if (it != maxVal.mapColumnNameValue_.end()) {
          std::stringstream ss;
          // The default precision (6 significant digits) would truncate double maxima,
          // e.g. 1234567.89 -> "1.23457e+06". max_digits10 makes the text round-trip
          // to the same double; it has no effect on integer or string output.
          ss.precision(std::numeric_limits<double>::max_digits10);
          ss << it->second;
          el.second = ss.str();
        }
      }

      UpdateMapState<Tuple, Index - 1>(tpl, mapState);
    }
  };

  // Terminates the recursion above.
  template <typename Tuple>
  struct UpdateMapState<Tuple, -1> {
    UpdateMapState(const Tuple&, std::unordered_map<std::string, std::string>&) {}
  };

  /**
   * Tuple with one MaxValue<T> per supported value type; `size` is the number of types.
   */
  template <typename ...Ts>
  struct MaxValues : public std::tuple<MaxValue<Ts>...> {
    constexpr static size_t size = sizeof...(Ts);
  };

 public:
  /**
   * @param selectQuery          query text, used only in the error message of endProcessRow().
   * @param maxValueColumnNames  column names text, used only in the error message of endProcessRow().
   * @param mapState             column name -> value map; held by reference and modified by
   *                             updateMapState(), so it must outlive this object.
   */
  MaxCollector(const std::string& selectQuery, const std::string& maxValueColumnNames, std::unordered_map<std::string, std::string>& mapState)
    :selectQuery_(selectQuery), maxValueColumnNames_(maxValueColumnNames), mapState_(mapState) {
  }

  /**
   * Records `value` as a candidate maximum for `columnName` if that column is a key of the state map.
   * T must be one of the types listed in the declaration of maxValues_.
   */
  template <typename T>
  void updateMaxValue(const std::string& columnName, const T& value) {
    if (mapState_.count(columnName)) {
      std::get<MaxValue<T>>(maxValues_).updateMaxValue(columnName, value);
    }
  }

  /**
   * Writes the collected maxima into the state map.
   * @return true if any state entry changed as a result.
   */
  bool updateMapState() {
    // Snapshot taken to detect whether the write-back changed anything.
    const auto previousState = mapState_;
    UpdateMapState<decltype(maxValues_), decltype(maxValues_)::size - 1>(maxValues_, mapState_);
    return previousState != mapState_;
  }

 private:
  const std::string selectQuery_;
  const std::string maxValueColumnNames_;
  std::unordered_map<std::string, std::string>& mapState_;
  MaxValues<std::string, double, int, long long, unsigned long long> maxValues_;
  size_t countColumns_{};  // number of reported column names found in mapState_
  bool columnsVerified_{false};  // set once endProcessRow() has validated the column set
};
} /* namespace sql */
} /* namespace minifi */
} /* namespace nifi */
} /* namespace apache */
} /* namespace org */