blob: c4194847dab3627ce4df1d6e406149154ce7a00f [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "SQLRowsetProcessor.h"
#include "Exception.h"
#include "utils/StringUtils.h"
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace sql {
SQLRowsetProcessor::SQLRowsetProcessor(std::unique_ptr<Rowset> rowset, std::vector<std::reference_wrapper<SQLRowSubscriber>> row_subscribers)
: rowset_(std::move(rowset)), row_subscribers_(std::move(row_subscribers)) {
rowset_->reset();
}
size_t SQLRowsetProcessor::process(size_t max) {
size_t count = 0;
for (const auto& subscriber : row_subscribers_) {
subscriber.get().beginProcessBatch();
}
while (!rowset_->is_done()) {
addRow(rowset_->getCurrent(), count);
rowset_->next();
count++;
if (max > 0 && count >= max) {
break;
}
}
for (const auto& subscriber : row_subscribers_) {
subscriber.get().endProcessBatch();
if (count == 0) {
subscriber.get().finishProcessing();
}
}
return count;
}
void SQLRowsetProcessor::addRow(const Row& row, size_t rowCount) {
for (const auto& subscriber : row_subscribers_) {
subscriber.get().beginProcessRow();
}
if (rowCount == 0) {
std::vector<std::string> column_names;
column_names.reserve(row.size());
for (std::size_t i = 0; i != row.size(); ++i) {
column_names.push_back(row.getColumnName(i));
}
for (const auto& subscriber : row_subscribers_) {
subscriber.get().processColumnNames(column_names);
}
}
for (std::size_t i = 0; i != row.size(); ++i) {
const auto& name = row.getColumnName(i);
if (row.isNull(i)) {
processColumn(name, "NULL");
} else {
switch (row.getDataType(i)) {
case DataType::STRING: {
processColumn(name, row.getString(i));
}
break;
case DataType::DOUBLE: {
processColumn(name, row.getDouble(i));
}
break;
case DataType::INTEGER: {
processColumn(name, row.getInteger(i));
}
break;
case DataType::LONG_LONG: {
processColumn(name, row.getLongLong(i));
}
break;
case DataType::UNSIGNED_LONG_LONG: {
processColumn(name, row.getUnsignedLongLong(i));
}
break;
case DataType::DATE: {
const std::tm when = row.getDate(i);
char value[128];
if (!std::strftime(value, sizeof(value), "%Y-%m-%d %H:%M:%S", &when))
throw minifi::Exception(PROCESSOR_EXCEPTION, "SQLRowsetProcessor: !strftime.");
processColumn(name, std::string(value));
}
break;
default: {
throw minifi::Exception(PROCESSOR_EXCEPTION, "SQLRowsetProcessor: Unsupported data type in column");
}
}
}
}
for (const auto& subscriber : row_subscribers_) {
subscriber.get().endProcessRow();
}
}
} /* namespace sql */
} /* namespace minifi */
} /* namespace nifi */
} /* namespace apache */
} /* namespace org */