/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "PutSQL.h"
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace processors {
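
// Note: as the onTrigger implementation below shows, only connection URLs of the
// form "sqlite://<path>" are supported; any other scheme is rejected at runtime.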
core::Property PutSQL::ConnectionURL(  // NOLINT
    "Connection URL",
    "The database URL to connect to",
    "");
core::Property PutSQL::SQLStatement(  // NOLINT
    "SQL Statement",
    "The SQL statement to execute",
    "");
core::Property PutSQL::BatchSize(  // NOLINT
    "Batch Size",
    "The maximum number of flow files to process in one batch",
    "1");

core::Relationship PutSQL::Success(  // NOLINT
    "success",
    "After a successful put SQL operation, FlowFiles are sent here");
core::Relationship PutSQL::Retry(  // NOLINT
    "retry",
    "Failures which might work if retried");
core::Relationship PutSQL::Failure(  // NOLINT
    "failure",
    "Failures which will not work if retried");
void PutSQL::initialize() {
  std::set<core::Property> properties;
  properties.insert(ConnectionURL);
  properties.insert(BatchSize);
  properties.insert(SQLStatement);
  setSupportedProperties(std::move(properties));

  std::set<core::Relationship> relationships;
  relationships.insert(Success);
  relationships.insert(Retry);
  relationships.insert(Failure);
  setSupportedRelationships(std::move(relationships));
}
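
// Reads the processor configuration once at schedule time. The SQL Statement is
// optional here: if it is left empty, onTrigger falls back to reading the statement
// from the flow file content. An empty Batch Size resolves to 100.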
void PutSQL::onSchedule(core::ProcessContext *context,
                        core::ProcessSessionFactory *sessionFactory) {
  context->getProperty(ConnectionURL.getName(), db_url_);

  if (db_url_.empty()) {
    logger_->log_error("Invalid Connection URL");
  }

  std::string batch_size;
  context->getProperty(BatchSize.getName(), batch_size);

  if (batch_size.empty()) {
    batch_size_ = 100;
  } else {
    batch_size_ = std::stoull(batch_size);
  }

  context->getProperty(SQLStatement.getName(), sql_);
}
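
// Processes up to Batch Size flow files per trigger. For each flow file, the SQL
// statement (taken from the property with expression-language support, or from the
// flow file content when the property is empty) is prepared against a pooled SQLite
// connection, and positional parameters are bound from flow file attributes named
// "sql.args.<N>.value", where N starts at 1 and must be contiguous.
//
// Illustrative example (the statement and values are hypothetical; the attribute
// naming follows the binding loop below):
//   SQL Statement:     INSERT INTO users (name, age) VALUES (?, ?)
//   sql.args.1.value:  alice
//   sql.args.2.value:  42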
void PutSQL::onTrigger(const std::shared_ptr<core::ProcessContext> &context,
                       const std::shared_ptr<core::ProcessSession> &session) {
  std::shared_ptr<FlowFileRecord> flow_file = std::static_pointer_cast<FlowFileRecord>(session->get());

  if (!flow_file) {
    return;
  }

  uint64_t batch_processed = 1;

  try {
    // Use an existing connection, if one is available
    std::shared_ptr<minifi::sqlite::SQLiteConnection> db;

    if (conn_q_.try_dequeue(db)) {
      logger_->log_debug("Using available SQLite connection");
    }

    if (!db) {
      logger_->log_info("Creating new SQLite connection");
      if (db_url_.substr(0, 9) == "sqlite://") {
        db = std::make_shared<minifi::sqlite::SQLiteConnection>(db_url_.substr(9));
      } else {
        std::stringstream err_msg;
        err_msg << "Connection URL '" << db_url_ << "' is unsupported";
        logger_->log_error(err_msg.str().c_str());
        throw std::runtime_error("Connection Error");
      }
    }

    do {
      auto sql = std::make_shared<std::string>();

      if (sql_.empty()) {
        // SQL is not defined as a property, so get SQL from the file content
        SQLReadCallback cb(sql);
        session->read(flow_file, &cb);
      } else {
        // SQL is defined as a property, so get the property dynamically w/ EL support
        context->getProperty(SQLStatement.getName(), *sql, flow_file);
      }

      auto stmt = db->prepare(*sql);

      // Bind positional parameters from sql.args.<N>.value attributes, starting at
      // index 1 and stopping at the first missing attribute in the sequence
      for (uint64_t i = 1; i < UINT64_MAX; i++) {
        std::string val;
        std::stringstream val_key;
        val_key << "sql.args." << i << ".value";

        if (!flow_file->getAttribute(val_key.str(), val)) {
          break;
        }

        stmt.bind_text(i, val);
      }

      stmt.step();

      if (!stmt.is_ok()) {
        logger_->log_error("SQL statement execution failed: %s", db->errormsg());
        session->transfer(flow_file, Failure);
      } else {
        session->transfer(flow_file, Success);
      }

      flow_file = std::static_pointer_cast<FlowFileRecord>(session->get());

      if (!flow_file) {
        logger_->log_info("Processed %d in batch", batch_processed);
        break;
      }

      batch_processed++;
    } while (batch_processed < batch_size_);

    // Make the connection available for use again
    if (conn_q_.size_approx() < getMaxConcurrentTasks()) {
      logger_->log_debug("Releasing SQLite connection");
      conn_q_.enqueue(db);
    } else {
      logger_->log_info("Destroying SQLite connection because it is no longer needed");
    }
  } catch (std::exception &exception) {
    logger_->log_error("Caught Exception %s", exception.what());
    session->transfer(flow_file, Failure);
    this->yield();
  } catch (...) {
    logger_->log_error("Caught Exception");
    session->transfer(flow_file, Failure);
    this->yield();
  }
}
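
// Reads the entire flow file content into the target SQL string, throwing if the
// stream could not be read in full.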
int64_t PutSQL::SQLReadCallback::process(std::shared_ptr<io::BaseStream> stream) {
  sql_->resize(stream->getSize());
  auto num_read = static_cast<uint64_t>(stream->readData(reinterpret_cast<uint8_t *>(&(*sql_)[0]),
                                                         static_cast<int>(stream->getSize())));

  if (num_read != stream->getSize()) {
    throw std::runtime_error("SQLReadCallback failed to fully read flow file input stream");
  }

  return num_read;
}
} /* namespace processors */
} /* namespace minifi */
} /* namespace nifi */
} /* namespace apache */
} /* namespace org */