MINIFICPP-1255 - Set absolute timeout and also make sure that no timeout is initialized to zero.
Signed-off-by: Arpad Boda <aboda@apache.org>
This closes #810
diff --git a/extensions/http-curl/client/HTTPClient.cpp b/extensions/http-curl/client/HTTPClient.cpp
index 837f3b7..518da4a 100644
--- a/extensions/http-curl/client/HTTPClient.cpp
+++ b/extensions/http-curl/client/HTTPClient.cpp
@@ -241,10 +241,13 @@
bool HTTPClient::submit() {
if (IsNullOrEmpty(url_))
return false;
+
+ int absoluteTimeout = std::max(0, 3 * static_cast<int>(read_timeout_ms_.count()));
+
curl_easy_setopt(http_session_, CURLOPT_NOSIGNAL, 1);
- if (connect_timeout_ms_.count() > 0) {
- curl_easy_setopt(http_session_, CURLOPT_CONNECTTIMEOUT_MS, connect_timeout_ms_.count());
- }
+ // setting it to 0 will result in the default 300 second timeout
+ curl_easy_setopt(http_session_, CURLOPT_CONNECTTIMEOUT_MS, std::max(0, static_cast<int>(connect_timeout_ms_.count())));
+ curl_easy_setopt(http_session_, CURLOPT_TIMEOUT_MS, absoluteTimeout);
if (read_timeout_ms_.count() > 0) {
progress_.reset();
@@ -252,6 +255,7 @@
curl_easy_setopt(http_session_, CURLOPT_XFERINFOFUNCTION, onProgress);
curl_easy_setopt(http_session_, CURLOPT_XFERINFODATA, (void*)this);
}else{
+ // the user explicitly set it to 0
curl_easy_setopt(http_session_, CURLOPT_NOPROGRESS, 1);
}
if (headers_ != nullptr) {
@@ -286,6 +290,9 @@
}
curl_easy_getinfo(http_session_, CURLINFO_RESPONSE_CODE, &http_code);
curl_easy_getinfo(http_session_, CURLINFO_CONTENT_TYPE, &content_type_str_);
+ if (res == CURLE_OPERATION_TIMEDOUT) {
+ logger_->log_error("HTTP operation timed out, with absolute timeout %dms\n", absoluteTimeout);
+ }
if (res != CURLE_OK) {
logger_->log_error("curl_easy_perform() failed %s on %s, error code %d\n", curl_easy_strerror(res), url_, res);
return false;
diff --git a/extensions/http-curl/client/HTTPClient.h b/extensions/http-curl/client/HTTPClient.h
index 6a4b4a2..8cc38ab 100644
--- a/extensions/http-curl/client/HTTPClient.h
+++ b/extensions/http-curl/client/HTTPClient.h
@@ -261,9 +261,9 @@
std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service_;
std::string url_;
- std::chrono::milliseconds connect_timeout_ms_{0};
+ std::chrono::milliseconds connect_timeout_ms_{30000};
// read timeout.
- std::chrono::milliseconds read_timeout_ms_{0};
+ std::chrono::milliseconds read_timeout_ms_{30000};
char *content_type_str_{nullptr};
std::string content_type_;
struct curl_slist *headers_{nullptr};
diff --git a/extensions/http-curl/tests/AbsoluteTimeoutTest.cpp b/extensions/http-curl/tests/AbsoluteTimeoutTest.cpp
new file mode 100644
index 0000000..590529b
--- /dev/null
+++ b/extensions/http-curl/tests/AbsoluteTimeoutTest.cpp
@@ -0,0 +1,50 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "TestBase.h"
+#include "tests/TestServer.h"
+#include "HTTPHandlers.h"
+
+int main() {
+ TestController controller;
+
+ std::string port = "12324";
+ std::string rootURI = "/";
+ TimeoutingHTTPHandler handler({
+ std::chrono::milliseconds(500),
+ std::chrono::milliseconds(500),
+ std::chrono::milliseconds(500),
+ std::chrono::milliseconds(500),
+ std::chrono::milliseconds(500),
+ std::chrono::milliseconds(500),
+ std::chrono::milliseconds(500)
+ });
+
+ auto server = start_webserver(port, rootURI, &handler);
+
+ auto plan = controller.createPlan();
+
+ auto processor = plan->addProcessor("InvokeHTTP", "InvokeHTTP");
+ processor->setProperty("Read Timeout", "1 s");
+ processor->setProperty("Remote URL", "http://localhost:" + port);
+ processor->setAutoTerminatedRelationships({{"failure", "d"}});
+
+ plan->runNextProcessor();
+
+ assert(LogTestController::getInstance().contains("HTTP operation timed out, with absolute timeout 3000ms"));
+}
diff --git a/extensions/http-curl/tests/CMakeLists.txt b/extensions/http-curl/tests/CMakeLists.txt
index e9f5d3d..8ba76a2 100644
--- a/extensions/http-curl/tests/CMakeLists.txt
+++ b/extensions/http-curl/tests/CMakeLists.txt
@@ -94,3 +94,4 @@
add_test(NAME ControllerServiceIntegrationTests COMMAND ControllerServiceIntegrationTests "${TEST_RESOURCES}/TestControllerServices.yml" "${TEST_RESOURCES}/")
add_test(NAME ThreadPoolAdjust COMMAND ThreadPoolAdjust "${TEST_RESOURCES}/ThreadPoolAdjust.yml" "${TEST_RESOURCES}/")
add_test(NAME VerifyInvokeHTTPTest COMMAND VerifyInvokeHTTPTest "${TEST_RESOURCES}/TestInvokeHTTPPost.yml")
+add_test(NAME AbsoluteTimeoutTest COMMAND AbsoluteTimeoutTest)
diff --git a/extensions/http-curl/tests/HTTPHandlers.h b/extensions/http-curl/tests/HTTPHandlers.h
index eda7a0e..83e70ee 100644
--- a/extensions/http-curl/tests/HTTPHandlers.h
+++ b/extensions/http-curl/tests/HTTPHandlers.h
@@ -489,30 +489,39 @@
class TimeoutingHTTPHandler : public ServerAwareHandler {
public:
- TimeoutingHTTPHandler(std::chrono::milliseconds wait_ms)
- : wait_(wait_ms) {
+ TimeoutingHTTPHandler(std::vector<std::chrono::milliseconds> wait_times)
+ : wait_times_(wait_times) {
}
bool handlePost(CivetServer *, struct mg_connection *conn) {
- std::this_thread::sleep_for(wait_);
- mg_printf(conn, "HTTP/1.1 201 OK\r\nContent-Type: text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n");
+ respond(conn);
return true;
}
bool handleGet(CivetServer *, struct mg_connection *conn) {
- std::this_thread::sleep_for(wait_);
- mg_printf(conn, "HTTP/1.1 201 OK\r\nContent-Type: text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n");
+ respond(conn);
return true;
}
bool handleDelete(CivetServer *, struct mg_connection *conn) {
- std::this_thread::sleep_for(wait_);
- mg_printf(conn, "HTTP/1.1 201 OK\r\nContent-Type: text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n");
+ respond(conn);
return true;
}
bool handlePut(CivetServer *, struct mg_connection *conn) {
- std::this_thread::sleep_for(wait_);
- mg_printf(conn, "HTTP/1.1 201 OK\r\nContent-Type: text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n");
+ respond(conn);
return true;
}
-protected:
- std::chrono::milliseconds wait_;
+ private:
+ void respond(struct mg_connection *conn) {
+ if (wait_times_.size() > 0 && wait_times_[0].count() > 0) {
+ std::this_thread::sleep_for(wait_times_[0]);
+ }
+ int chunk_count = std::max(static_cast<int>(wait_times_.size()) - 1, 0);
+ mg_printf(conn, "HTTP/1.1 201 OK\r\nContent-Type: text/plain\r\nContent-Length: %d\r\nConnection: close\r\n\r\n", chunk_count);
+ for (int chunkIdx = 0; chunkIdx < chunk_count; ++chunkIdx) {
+ mg_printf(conn, "a");
+ if (wait_times_[chunkIdx + 1].count() > 0) {
+ std::this_thread::sleep_for(wait_times_[chunkIdx + 1]);
+ }
+ }
+ }
+ std::vector<std::chrono::milliseconds> wait_times_;
};
#endif /* LIBMINIFI_TEST_CURL_TESTS_SITETOSITEHTTP_HTTPHANDLERS_H_ */
diff --git a/extensions/http-curl/tests/TimeoutHTTPSiteToSiteTests.cpp b/extensions/http-curl/tests/TimeoutHTTPSiteToSiteTests.cpp
index 91a9a77..1e9c59e 100644
--- a/extensions/http-curl/tests/TimeoutHTTPSiteToSiteTests.cpp
+++ b/extensions/http-curl/tests/TimeoutHTTPSiteToSiteTests.cpp
@@ -96,7 +96,7 @@
if(handler)return handler;
return def;
}
- void set(std::chrono::milliseconds timeout) {
+ void set(std::vector<std::chrono::milliseconds> timeout) {
handler = new TimeoutingHTTPHandler(timeout);
}
};
@@ -184,31 +184,31 @@
{
timeout_test_profile profile;
- profile.base_.set(timeout);
+ profile.base_.set({timeout});
run_timeout_variance(test_file_location, isSecure, url, profile);
}
{
timeout_test_profile profile;
- profile.flow_.set(timeout);
+ profile.flow_.set({timeout});
run_timeout_variance(test_file_location, isSecure, url, profile);
}
{
timeout_test_profile profile;
- profile.transaction_.set(timeout);
+ profile.transaction_.set({timeout});
run_timeout_variance(test_file_location, isSecure, url, profile);
}
{
timeout_test_profile profile;
- profile.delete_.set(timeout);
+ profile.delete_.set({timeout});
run_timeout_variance(test_file_location, isSecure, url, profile);
}
{
timeout_test_profile profile;
- profile.peer_.set(timeout);
+ profile.peer_.set({timeout});
run_timeout_variance(test_file_location, isSecure, url, profile);
}
diff --git a/extensions/http-curl/tests/VerifyInvokeHTTPTest.cpp b/extensions/http-curl/tests/VerifyInvokeHTTPTest.cpp
index 7d31165..fae79b5 100644
--- a/extensions/http-curl/tests/VerifyInvokeHTTPTest.cpp
+++ b/extensions/http-curl/tests/VerifyInvokeHTTPTest.cpp
@@ -192,7 +192,7 @@
}
{
- TimeoutingHTTPHandler handler(std::chrono::milliseconds(4000));
+ TimeoutingHTTPHandler handler({std::chrono::milliseconds(4000)});
VerifyRWTimeoutInvokeHTTP harness;
run(harness, url, test_file_location, key_dir, &handler);
}
diff --git a/libminifi/include/RemoteProcessorGroupPort.h b/libminifi/include/RemoteProcessorGroupPort.h
index 5fb6742..651b002 100644
--- a/libminifi/include/RemoteProcessorGroupPort.h
+++ b/libminifi/include/RemoteProcessorGroupPort.h
@@ -214,7 +214,7 @@
utils::Identifier protocol_uuid_;
- std::chrono::milliseconds idle_timeout_{};
+ std::chrono::milliseconds idle_timeout_{15000};
// rest API end point info
std::vector<struct RPG> nifi_instances_;
diff --git a/libminifi/include/sitetosite/SiteToSite.h b/libminifi/include/sitetosite/SiteToSite.h
index 896faac..feb1ed4 100644
--- a/libminifi/include/sitetosite/SiteToSite.h
+++ b/libminifi/include/sitetosite/SiteToSite.h
@@ -395,7 +395,7 @@
std::string local_network_interface_;
- std::chrono::milliseconds idle_timeout_{};
+ std::chrono::milliseconds idle_timeout_{15000};
// secore comms
diff --git a/libminifi/include/sitetosite/SiteToSiteClient.h b/libminifi/include/sitetosite/SiteToSiteClient.h
index 6e9b333..fdfe58d 100644
--- a/libminifi/include/sitetosite/SiteToSiteClient.h
+++ b/libminifi/include/sitetosite/SiteToSiteClient.h
@@ -254,7 +254,7 @@
utils::Identifier port_id_;
// idleTimeout
- std::chrono::milliseconds idle_timeout_{};
+ std::chrono::milliseconds idle_timeout_{15000};
// Peer Connection
std::unique_ptr<SiteToSitePeer> peer_;
diff --git a/libminifi/src/RemoteProcessorGroupPort.cpp b/libminifi/src/RemoteProcessorGroupPort.cpp
index 8d2c5ce..b945396 100644
--- a/libminifi/src/RemoteProcessorGroupPort.cpp
+++ b/libminifi/src/RemoteProcessorGroupPort.cpp
@@ -167,13 +167,15 @@
}
}
{
- uint64_t idleTimeoutVal;
+ uint64_t idleTimeoutVal = 15000;
std::string idleTimeoutStr;
if (!context->getProperty(idleTimeout.getName(), idleTimeoutStr)
|| !core::Property::getTimeMSFromString(idleTimeoutStr, idleTimeoutVal)) {
logger_->log_debug("%s attribute is invalid, so default value of %s will be used", idleTimeout.getName(),
idleTimeout.getValue());
- assert(core::Property::getTimeMSFromString(idleTimeout.getValue(), idleTimeoutVal));
+ if (!core::Property::getTimeMSFromString(idleTimeout.getValue(), idleTimeoutVal)) {
+ assert(false); // Couldn't parse our default value
+ }
}
idle_timeout_ = std::chrono::milliseconds(idleTimeoutVal);
}