avoid empty std::exception, improve logging
Signed-off-by: Arpad Boda <aboda@apache.org>
This closes #943
diff --git a/extensions/http-curl/client/HTTPStream.cpp b/extensions/http-curl/client/HTTPStream.cpp
index f97b004..99a9296 100644
--- a/extensions/http-curl/client/HTTPStream.cpp
+++ b/extensions/http-curl/client/HTTPStream.cpp
@@ -50,7 +50,7 @@
void HttpStream::seek(uint64_t offset) {
// seek is an unnecessary part of this implementatino
- throw std::exception();
+ throw std::logic_error{"HttpStream::seek is unimplemented"};
}
// data stream overrides
diff --git a/extensions/http-curl/sitetosite/PeersEntity.h b/extensions/http-curl/sitetosite/PeersEntity.h
index b5c651b..7d60bf9 100644
--- a/extensions/http-curl/sitetosite/PeersEntity.h
+++ b/extensions/http-curl/sitetosite/PeersEntity.h
@@ -77,8 +77,9 @@
} else if (utils::StringUtils::equalsIgnoreCase(secureStr, "false")) {
secure = false;
} else {
- logger->log_error("could not parse secure string %s", secureStr);
- throw std::exception();
+ const auto err = utils::StringUtils::join_pack("could not parse secure string ", secureStr);
+ logger->log_error("%s", err);
+ throw std::logic_error{err};
}
} else {
logger->log_warn("Invalid value type for secure, assuming false (rapidjson type id %i)",
diff --git a/extensions/pcap/CapturePacket.cpp b/extensions/pcap/CapturePacket.cpp
index bdb0dd3..b20d881 100644
--- a/extensions/pcap/CapturePacket.cpp
+++ b/extensions/pcap/CapturePacket.cpp
@@ -102,7 +102,7 @@
CapturePacketMechanism *new_capture = new CapturePacketMechanism(base_path, generate_new_pcap(base_path), max_size);
new_capture->writer_ = new pcpp::PcapFileWriterDevice(new_capture->getFile().c_str());
if (!new_capture->writer_->open())
- throw std::exception();
+ throw std::runtime_error{utils::StringUtils::join_pack("Failed to open PcapFileWriterDevice with file ", new_capture->getFile())};
return new_capture;
}
@@ -207,7 +207,7 @@
if (IsNullOrEmpty(devList)) {
logger_->log_error("Could not open any devices");
- throw std::exception();
+ throw std::runtime_error{"Pcap: could not open any devices"};
}
}
diff --git a/extensions/standard-processors/processors/GetTCP.cpp b/extensions/standard-processors/processors/GetTCP.cpp
index b806b8e..a8cf579 100644
--- a/extensions/standard-processors/processors/GetTCP.cpp
+++ b/extensions/standard-processors/processors/GetTCP.cpp
@@ -16,29 +16,22 @@
* limitations under the License.
*/
#include "GetTCP.h"
-#include <sys/types.h>
-#include <sys/stat.h>
-#include <time.h>
-#include <stdio.h>
-#include <limits.h>
#ifndef WIN32
#include <dirent.h>
-#include <unistd.h>
-#include <regex.h>
#endif
-#include <vector>
-#include <queue>
-#include <map>
-#include <memory>
-#include <utility>
-#include <set>
-#include <sstream>
-#include <string>
-#include <iostream>
#include <cinttypes>
+#include <future>
+#include <memory>
+#include <mutex>
+#include <thread>
+#include <utility>
+#include <vector>
+#include <set>
+#include <string>
#include "io/ClientSocket.h"
+#include "utils/gsl.h"
#include "utils/StringUtils.h"
#include "utils/TimeUtil.h"
#include "core/ProcessContext.h"
@@ -260,7 +253,7 @@
}
auto portStr = hostAndPort.at(1);
- auto endpoint = realizedHost + ":" + portStr;
+ auto endpoint = utils::StringUtils::join_pack(realizedHost, ":", portStr);
auto endPointFuture = live_clients_.find(endpoint);
// does not exist
@@ -285,7 +278,7 @@
} else {
logger_->log_error("Could not create socket for %s", endpoint);
}
- std::future<int> *future = new std::future<int>();
+ auto* future = new std::future<int>();
std::unique_ptr<utils::AfterExecute<int>> after_execute = std::unique_ptr<utils::AfterExecute<int>>(new SocketAfterExecute(running_, endpoint, &live_clients_, &mutex_));
utils::Worker<int> functor(f_ex, "workers", std::move(after_execute));
if (client_thread_pool_.execute(std::move(functor), *future)) {
@@ -294,7 +287,7 @@
} else {
if (!endPointFuture->second->valid()) {
delete endPointFuture->second;
- std::future<int> *future = new std::future<int>();
+ auto* future = new std::future<int>();
std::unique_ptr<utils::AfterExecute<int>> after_execute = std::unique_ptr<utils::AfterExecute<int>>(new SocketAfterExecute(running_, endpoint, &live_clients_, &mutex_));
utils::Worker<int> functor(f_ex, "workers", std::move(after_execute));
if (client_thread_pool_.execute(std::move(functor), *future)) {
diff --git a/extensions/standard-processors/processors/GetTCP.h b/extensions/standard-processors/processors/GetTCP.h
index e582d44..b7e60eb 100644
--- a/extensions/standard-processors/processors/GetTCP.h
+++ b/extensions/standard-processors/processors/GetTCP.h
@@ -21,6 +21,7 @@
#include <map>
#include <memory>
#include <string>
+#include <utility>
#include <vector>
#include <atomic>
@@ -43,19 +44,19 @@
class SocketAfterExecute : public utils::AfterExecute<int> {
public:
- explicit SocketAfterExecute(std::atomic<bool> &running, const std::string &endpoint, std::map<std::string, std::future<int>*> *list, std::mutex *mutex)
+ explicit SocketAfterExecute(std::atomic<bool> &running, std::string endpoint, std::map<std::string, std::future<int>*> *list, std::mutex *mutex)
: running_(running.load()),
- endpoint_(endpoint),
+ endpoint_(std::move(endpoint)),
mutex_(mutex),
list_(list) {
}
- explicit SocketAfterExecute(SocketAfterExecute && other) {
- }
+ SocketAfterExecute(const SocketAfterExecute&&) = delete;
+ SocketAfterExecute& operator=(const SocketAfterExecute&) = delete;
- ~SocketAfterExecute() = default;
+ ~SocketAfterExecute() override = default;
- virtual bool isFinished(const int &result) {
+ bool isFinished(const int &result) override {
if (result == -1 || result == 0 || !running_) {
std::lock_guard<std::mutex> lock(*mutex_);
list_->erase(endpoint_);
@@ -64,14 +65,14 @@
return false;
}
}
- virtual bool isCancelled(const int &result) {
+ bool isCancelled(const int &result) override {
if (!running_)
return true;
else
return false;
}
- virtual std::chrono::milliseconds wait_time() {
+ std::chrono::milliseconds wait_time() override {
// wait 500ms
return std::chrono::milliseconds(500);
}
@@ -90,9 +91,9 @@
size_(size) {
}
- virtual ~DataHandlerCallback() = default;
+ ~DataHandlerCallback() override = default;
- virtual int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
+ int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
return stream->write(message_, size_);
}
@@ -104,7 +105,7 @@
class DataHandler {
public:
DataHandler(std::shared_ptr<core::ProcessSessionFactory> sessionFactory) // NOLINT
- : sessionFactory_(sessionFactory) {
+ : sessionFactory_(std::move(sessionFactory)) {
}
static const char *SOURCE_ENDPOINT_ATTRIBUTE;
@@ -118,23 +119,17 @@
public:
GetTCPMetrics()
: state::response::ResponseNode("GetTCPMetrics") {
- iterations_ = 0;
- accepted_files_ = 0;
- input_bytes_ = 0;
}
- GetTCPMetrics(std::string name, utils::Identifier &uuid)
+ GetTCPMetrics(const std::string& name, utils::Identifier &uuid)
: state::response::ResponseNode(name, uuid) {
- iterations_ = 0;
- accepted_files_ = 0;
- input_bytes_ = 0;
}
- virtual ~GetTCPMetrics() = default;
- virtual std::string getName() const {
+ ~GetTCPMetrics() override = default;
+ std::string getName() const override {
return core::Connectable::getName();
}
- virtual std::vector<state::response::SerializedResponseNode> serialize() {
+ std::vector<state::response::SerializedResponseNode> serialize() override {
std::vector<state::response::SerializedResponseNode> resp;
state::response::SerializedResponseNode iter;
@@ -161,9 +156,9 @@
protected:
friend class GetTCP;
- std::atomic<size_t> iterations_;
- std::atomic<size_t> accepted_files_;
- std::atomic<size_t> input_bytes_;
+ std::atomic<size_t> iterations_{0};
+ std::atomic<size_t> accepted_files_{0};
+ std::atomic<size_t> input_bytes_{0};
};
// GetTCP Class
@@ -173,7 +168,7 @@
/*!
* Create a new processor
*/
- explicit GetTCP(std::string name, utils::Identifier uuid = utils::Identifier())
+ explicit GetTCP(const std::string& name, utils::Identifier uuid = utils::Identifier())
: Processor(name, uuid),
running_(false),
stay_connected_(true),
@@ -187,7 +182,7 @@
metrics_ = std::make_shared<GetTCPMetrics>();
}
// Destructor
- virtual ~GetTCP() = default;
+ ~GetTCP() override = default;
// Processor Name
static constexpr char const* ProcessorName = "GetTCP";
@@ -212,29 +207,29 @@
* @param sessionFactory process session factory that is used when creating
* ProcessSession objects.
*/
- virtual void onSchedule(const std::shared_ptr<core::ProcessContext> &processContext, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory);
+ void onSchedule(const std::shared_ptr<core::ProcessContext> &processContext, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
- void onSchedule(core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory) {
- throw std::exception();
+ void onSchedule(core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory) override {
+ throw std::logic_error{"GetTCP::onSchedule(ProcessContext*, ProcessSessionFactory*) is unimplemented"};
}
/**
* Execution trigger for the GetTCP Processor
* @param context processor context
* @param session processor session reference.
*/
- virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session);
+ void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
- virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
- throw std::exception();
+ void onTrigger(core::ProcessContext *context, core::ProcessSession *session) override {
+ throw std::logic_error{"GetTCP::onTrigger(ProcessContext*, ProcessSession*) is unimplemented"};
}
// Initialize, over write by NiFi GetTCP
- virtual void initialize(void);
+ void initialize() override;
- int16_t getMetricNodes(std::vector<std::shared_ptr<state::response::ResponseNode>> &metric_vector);
+ int16_t getMetricNodes(std::vector<std::shared_ptr<state::response::ResponseNode>> &metric_vector) override;
protected:
- virtual void notifyStop();
+ void notifyStop() override;
private:
std::function<int()> f_ex;
diff --git a/libminifi/include/io/ClientSocket.h b/libminifi/include/io/ClientSocket.h
index 82703b2..2aedabf 100644
--- a/libminifi/include/io/ClientSocket.h
+++ b/libminifi/include/io/ClientSocket.h
@@ -36,6 +36,7 @@
#include <string>
#include <memory>
#include <vector>
+#include <stdexcept>
#include "io/BaseStream.h"
#include "core/Core.h"
#include "core/logging/Logger.h"
@@ -240,7 +241,7 @@
static WSADATA s_wsaData;
const int iWinSockInitResult = WSAStartup(MAKEWORD(2, 2), &s_wsaData);
if (0 != iWinSockInitResult) {
- throw std::exception("Cannot start client");
+ throw std::runtime_error("Cannot start client");
}
}
~SocketInitializer() noexcept {
diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp
index 86c8384..6294a2f 100644
--- a/libminifi/src/core/Processor.cpp
+++ b/libminifi/src/core/Processor.cpp
@@ -247,11 +247,12 @@
onTrigger(context, session.get());
session->commit();
} catch (std::exception &exception) {
- logger_->log_warn("Caught Exception %s during Processor::onTrigger of processor: %s (%s)", exception.what(), getUUIDStr(), getName());
+ logger_->log_warn("Caught \"%s\" (%s) during Processor::onTrigger of processor: %s (%s)",
+ exception.what(), typeid(exception).name(), getUUIDStr(), getName());
session->rollback();
throw;
} catch (...) {
- logger_->log_warn("Caught Exception during Processor::onTrigger of processor: %s (%s)", getUUIDStr(), getName());
+ logger_->log_warn("Caught unknown exception during Processor::onTrigger of processor: %s (%s)", getUUIDStr(), getName());
session->rollback();
throw;
}
@@ -265,11 +266,12 @@
onTrigger(context, session);
session->commit();
} catch (std::exception &exception) {
- logger_->log_warn("Caught Exception %s during Processor::onTrigger of processor: %s (%s)", exception.what(), getUUIDStr(), getName());
+ logger_->log_warn("Caught \"%s\" (%s) during Processor::onTrigger of processor: %s (%s)",
+ exception.what(), typeid(exception).name(), getUUIDStr(), getName());
session->rollback();
throw;
} catch (...) {
- logger_->log_warn("Caught Exception during Processor::onTrigger of processor: %s (%s)", getUUIDStr(), getName());
+ logger_->log_warn("Caught unknown exception during Processor::onTrigger of processor: %s (%s)", getUUIDStr(), getName());
session->rollback();
throw;
}
diff --git a/libminifi/test/TestBase.cpp b/libminifi/test/TestBase.cpp
index 541cbe3..52bdae1 100644
--- a/libminifi/test/TestBase.cpp
+++ b/libminifi/test/TestBase.cpp
@@ -140,7 +140,7 @@
auto ptr = core::ClassLoader::getDefaultClassLoader().instantiate(processor_name, uuid);
if (nullptr == ptr) {
- throw std::exception();
+ throw std::runtime_error{fmt::format("Failed to instantiate processor name: {0} uuid: {1}", processor_name, uuid.to_string().c_str())};
}
std::shared_ptr<core::Processor> processor = std::static_pointer_cast<core::Processor>(ptr);