feat(connectors): implement e2e tests for http config provider (#2409)
diff --git a/.gitignore b/.gitignore
index 123b58b..312bc4e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -22,10 +22,10 @@
foreign/cpp/build
foreign/java/**/build
foreign/java/**/out
-foreign/node/**/node_modules
foreign/node/**/dist
foreign/node/npm-debug.log*
foreign/node/.npm
+**/node_modules
/**/*.DotSettings.user
*.exe
*.exe~
diff --git a/Cargo.lock b/Cargo.lock
index 02e9ff0..99b6c7a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5118,8 +5118,10 @@
"rcgen",
"reqwest",
"rmcp",
+ "serde_json",
"serial_test",
"server",
+ "strum_macros 0.27.2",
"tempfile",
"test-case",
"testcontainers-modules",
diff --git a/core/integration/Cargo.toml b/core/integration/Cargo.toml
index dd83cbd..7891cf0 100644
--- a/core/integration/Cargo.toml
+++ b/core/integration/Cargo.toml
@@ -55,8 +55,10 @@
"transport-streamable-http-client",
"transport-streamable-http-client-reqwest",
] }
+serde_json = { workspace = true }
serial_test = { workspace = true }
server = { workspace = true }
+strum_macros = { workspace = true }
tempfile = { workspace = true }
test-case = { workspace = true }
testcontainers-modules = { version = "0.13.0", features = ["postgres"] }
diff --git a/core/integration/src/test_connectors_runtime.rs b/core/integration/src/test_connectors_runtime.rs
index bb9c664..995feb6 100644
--- a/core/integration/src/test_connectors_runtime.rs
+++ b/core/integration/src/test_connectors_runtime.rs
@@ -1,4 +1,5 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
@@ -18,17 +19,18 @@
use assert_cmd::prelude::CommandCargoExt;
use rand::Rng;
-use std::fs::{self, OpenOptions};
+use std::fs::{self, File, OpenOptions};
use std::io::Write;
use std::net::{Ipv4Addr, SocketAddr};
use std::path::PathBuf;
-use std::process::{Child, Command};
+use std::process::{Child, Command, Stdio};
use std::time::Duration;
use std::{collections::HashMap, net::TcpListener};
use tokio::time::sleep;
use uuid::Uuid;
pub const STATE_PATH_ENV_VAR: &str = "IGGY_CONNECTORS_STATE_PATH";
+pub const TEST_VERBOSITY_ENV_VAR: &str = "IGGY_TEST_VERBOSE";
pub const CONSUMER_NAME: &str = "connectors";
const LOCAL_STATE_PREFIX: &str = "local_state_";
@@ -116,6 +118,19 @@
Command::cargo_bin("iggy-connectors").unwrap()
};
command.envs(self.envs.clone());
+
+ if std::env::var(TEST_VERBOSITY_ENV_VAR).is_ok()
+ || self.envs.contains_key(TEST_VERBOSITY_ENV_VAR)
+ {
+ command.stdout(Stdio::inherit());
+ command.stderr(Stdio::inherit());
+ } else {
+ command.stdout(self.get_stdout_file());
+ self.stdout_file_path = Some(fs::canonicalize(self.get_stdout_file_path()).unwrap());
+ command.stderr(self.get_stderr_file());
+ self.stderr_file_path = Some(fs::canonicalize(self.get_stderr_file_path()).unwrap());
+ }
+
let child = command
.spawn()
.expect("Failed to start Connectors Runtime process");
@@ -174,7 +189,23 @@
format!("{}{}", LOCAL_STATE_PREFIX, Uuid::now_v7().to_u128_le())
}
- fn get_http_api_address(&self) -> String {
+ fn get_stdout_file_path(&self) -> PathBuf { // "<local_state_path>_stdout.txt": target for the child's stdout when output is not inherited.
+ format!("{}_stdout.txt", self.local_state_path).into()
+ }
+
+ fn get_stderr_file_path(&self) -> PathBuf { // "<local_state_path>_stderr.txt": target for the child's stderr when output is not inherited.
+ format!("{}_stderr.txt", self.local_state_path).into()
+ }
+
+ fn get_stdout_file(&self) -> File { // Creates (truncating if present) the stdout capture file; panics if creation fails.
+ File::create(self.get_stdout_file_path()).unwrap()
+ }
+
+ fn get_stderr_file(&self) -> File { // Creates (truncating if present) the stderr capture file; panics if creation fails.
+ File::create(self.get_stderr_file_path()).unwrap()
+ }
+
+ pub fn get_http_api_address(&self) -> String {
format!(
"http://{}:{}",
self.server_address.ip(),
diff --git a/core/integration/tests/connectors/http_config_provider/config_direct.toml b/core/integration/tests/connectors/http_config_provider/config_direct.toml
new file mode 100644
index 0000000..48f22ed
--- /dev/null
+++ b/core/integration/tests/connectors/http_config_provider/config_direct.toml
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[connectors]
+config_type = "http"
+base_url = "http://localhost:8080"
diff --git a/core/integration/tests/connectors/http_config_provider/config_wrapped.toml b/core/integration/tests/connectors/http_config_provider/config_wrapped.toml
new file mode 100644
index 0000000..843377b
--- /dev/null
+++ b/core/integration/tests/connectors/http_config_provider/config_wrapped.toml
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[connectors]
+config_type = "http"
+base_url = "http://localhost:8080"
+
+[connectors.response]
+data_path = "data"
+error_path = "error"
diff --git a/core/integration/tests/connectors/http_config_provider/direct_responses.rs b/core/integration/tests/connectors/http_config_provider/direct_responses.rs
new file mode 100644
index 0000000..b77c676
--- /dev/null
+++ b/core/integration/tests/connectors/http_config_provider/direct_responses.rs
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use crate::connectors::http_config_provider::WireMockMode;
+use reqwest::StatusCode;
+
+#[tokio::test]
+async fn test_source_configs_list_returns_all_versions() {
+ let wiremock_runtime =
+ crate::connectors::http_config_provider::setup(WireMockMode::Direct).await;
+ let http_api_address = wiremock_runtime
+ .connectors_runtime
+ .connectors_api_address()
+ .expect("connector runtime should be available");
+ let client = reqwest::Client::new();
+
+ let response = client
+ .get(format!("{http_api_address}/sources/random/configs"))
+ .send()
+ .await
+ .expect("GET /sources/random/configs request should succeed");
+
+ assert_eq!(response.status(), StatusCode::OK);
+
+ let body: serde_json::Value = response.json().await.unwrap();
+ assert!(body.is_array(), "Should return JSON array");
+ let configs = body.as_array().unwrap();
+ assert_eq!(configs.len(), 2, "Should have 2 config versions");
+
+ // Validate the full schema of the first listed entry (version 0)
+ let v0 = &configs[0];
+ assert_eq!(v0["key"].as_str().unwrap(), "random");
+ assert!(v0["enabled"].as_bool().unwrap());
+ assert_eq!(v0["version"].as_u64().unwrap(), 0);
+ assert_eq!(v0["name"].as_str().unwrap(), "Random source");
+ assert!(
+ v0["path"]
+ .as_str()
+ .unwrap()
+ .contains("libiggy_connector_random_source")
+ );
+
+ // Validate streams array (sources have StreamProducerConfig)
+ let streams = v0["streams"].as_array().unwrap();
+ assert_eq!(streams.len(), 1, "Should have 1 stream config");
+
+ let stream = &streams[0];
+ assert_eq!(stream["stream"].as_str().unwrap(), "test_stream");
+ assert_eq!(stream["topic"].as_str().unwrap(), "test_topic"); // Sources expose a single `topic` string, not an array
+ assert_eq!(stream["schema"].as_str().unwrap(), "json");
+ assert!(stream["batch_length"].as_u64().is_some());
+ assert!(stream["linger_time"].as_str().is_some()); // Source-specific field
+
+ // Validate plugin configuration
+ assert_eq!(v0["plugin_config_format"].as_str().unwrap(), "json");
+ assert!(v0["plugin_config"].is_object());
+
+ // Validate the second entry is version 1 of the same connector (only version and key are checked)
+ let v1 = &configs[1];
+ assert_eq!(v1["version"].as_u64().unwrap(), 1);
+ assert_eq!(v1["key"].as_str().unwrap(), "random");
+}
+
+#[tokio::test]
+async fn test_source_config_by_version_returns_specific_version() {
+ let wiremock_runtime =
+ crate::connectors::http_config_provider::setup(WireMockMode::Direct).await;
+ let http_api_address = wiremock_runtime
+ .connectors_runtime
+ .connectors_api_address()
+ .expect("connector runtime should be available");
+ let client = reqwest::Client::new();
+
+ let response = client
+ .get(format!("{http_api_address}/sources/random/configs/1"))
+ .send()
+ .await
+ .expect("GET /sources/random/configs/1 request should succeed");
+
+ assert_eq!(response.status(), StatusCode::OK);
+
+ let body: serde_json::Value = response.json().await.unwrap();
+ assert!(body.is_object(), "Should return JSON object");
+
+ // Validate the returned object is the requested version (1)
+ assert_eq!(body["version"].as_u64().unwrap(), 1);
+ assert_eq!(body["key"].as_str().unwrap(), "random");
+ assert!(body["enabled"].as_bool().unwrap());
+ assert_eq!(body["name"].as_str().unwrap(), "Random source");
+
+ // Validate streams structure
+ let streams = body["streams"].as_array().unwrap();
+ assert_eq!(streams.len(), 1);
+ assert_eq!(streams[0]["topic"].as_str().unwrap(), "test_topic"); // Sources expose `topic` as a single string
+}
+
+#[tokio::test]
+async fn test_source_active_config_returns_current_version() {
+ let wiremock_runtime =
+ crate::connectors::http_config_provider::setup(WireMockMode::Direct).await;
+ let http_api_address = wiremock_runtime
+ .connectors_runtime
+ .connectors_api_address()
+ .expect("connector runtime should be available");
+ let client = reqwest::Client::new();
+
+ let response = client
+ .get(format!("{http_api_address}/sources/random/configs/active"))
+ .send()
+ .await
+ .expect("GET /sources/random/configs/active request should succeed");
+
+ assert_eq!(response.status(), StatusCode::OK);
+
+ let body: serde_json::Value = response.json().await.unwrap();
+ assert!(body.is_object(), "Should return JSON object");
+
+ // Validate it's the active version (version 1)
+ assert_eq!(body["version"].as_u64().unwrap(), 1);
+ assert_eq!(body["key"].as_str().unwrap(), "random");
+ assert!(body["enabled"].as_bool().unwrap());
+ assert_eq!(body["name"].as_str().unwrap(), "Random source");
+
+ // Validate the remaining top-level fields (streams and plugin configuration)
+ let streams = body["streams"].as_array().unwrap();
+ assert_eq!(streams.len(), 1);
+ assert_eq!(body["plugin_config_format"].as_str().unwrap(), "json");
+ assert!(body["plugin_config"].is_object());
+}
+
+#[tokio::test]
+async fn test_sink_configs_list_returns_all_versions() {
+ let wiremock_runtime =
+ crate::connectors::http_config_provider::setup(WireMockMode::Direct).await;
+ let http_api_address = wiremock_runtime
+ .connectors_runtime
+ .connectors_api_address()
+ .expect("connector runtime should be available");
+ let client = reqwest::Client::new();
+
+ let response = client
+ .get(format!("{http_api_address}/sinks/stdout/configs"))
+ .send()
+ .await
+ .expect("GET /sinks/stdout/configs request should succeed");
+
+ assert_eq!(response.status(), StatusCode::OK);
+
+ let body: serde_json::Value = response.json().await.unwrap();
+ assert!(body.is_array(), "Should return JSON array");
+ let configs = body.as_array().unwrap();
+ assert_eq!(configs.len(), 1, "Should have 1 config version");
+
+ // Validate the full schema of the only listed entry (version 0)
+ let v0 = &configs[0];
+ assert_eq!(v0["key"].as_str().unwrap(), "stdout");
+ assert!(v0["enabled"].as_bool().unwrap());
+ assert_eq!(v0["version"].as_u64().unwrap(), 0);
+ assert_eq!(v0["name"].as_str().unwrap(), "Stdout sink");
+ assert!(
+ v0["path"]
+ .as_str()
+ .unwrap()
+ .contains("libiggy_connector_stdout_sink")
+ );
+
+ // Validate streams array (sinks have StreamConsumerConfig)
+ let streams = v0["streams"].as_array().unwrap();
+ assert_eq!(streams.len(), 1, "Should have 1 stream config");
+
+ let stream = &streams[0];
+ assert_eq!(stream["stream"].as_str().unwrap(), "test_stream");
+ // Sinks expose `topics` as an array, unlike sources which have a single `topic` string
+ let topics = stream["topics"].as_array().unwrap();
+ assert_eq!(topics.len(), 1);
+ assert_eq!(topics[0].as_str().unwrap(), "test_topic");
+ assert_eq!(stream["schema"].as_str().unwrap(), "json");
+ assert!(stream["batch_length"].as_u64().is_some());
+ assert!(stream["poll_interval"].as_str().is_some()); // Sink-specific
+ assert!(stream["consumer_group"].as_str().is_some()); // Sink-specific
+
+ // Validate plugin configuration: the format is null for this sink, the config itself is an object
+ assert!(v0["plugin_config_format"].is_null());
+ assert!(v0["plugin_config"].is_object());
+}
+
+#[tokio::test]
+async fn test_sink_config_by_version_returns_specific_version() {
+ let wiremock_runtime =
+ crate::connectors::http_config_provider::setup(WireMockMode::Direct).await;
+ let http_api_address = wiremock_runtime
+ .connectors_runtime
+ .connectors_api_address()
+ .expect("connector runtime should be available");
+ let client = reqwest::Client::new();
+
+ let response = client
+ .get(format!("{http_api_address}/sinks/stdout/configs/0"))
+ .send()
+ .await
+ .expect("GET /sinks/stdout/configs/0 request should succeed");
+
+ assert_eq!(response.status(), StatusCode::OK);
+
+ let body: serde_json::Value = response.json().await.unwrap();
+ assert!(body.is_object(), "Should return JSON object");
+
+ // Validate the returned object is the requested version (0)
+ assert_eq!(body["version"].as_u64().unwrap(), 0);
+ assert_eq!(body["key"].as_str().unwrap(), "stdout");
+ assert!(body["enabled"].as_bool().unwrap());
+ assert_eq!(body["name"].as_str().unwrap(), "Stdout sink");
+
+ // Validate streams structure (sinks expose `topics` as an array)
+ let streams = body["streams"].as_array().unwrap();
+ assert_eq!(streams.len(), 1);
+ let topics = streams[0]["topics"].as_array().unwrap();
+ assert_eq!(topics[0].as_str().unwrap(), "test_topic");
+}
+
+#[tokio::test]
+async fn test_sink_active_config_returns_current_version() {
+ let wiremock_runtime =
+ crate::connectors::http_config_provider::setup(WireMockMode::Direct).await;
+ let http_api_address = wiremock_runtime
+ .connectors_runtime
+ .connectors_api_address()
+ .expect("connector runtime should be available");
+ let client = reqwest::Client::new();
+
+ let response = client
+ .get(format!("{http_api_address}/sinks/stdout/configs/active"))
+ .send()
+ .await
+ .expect("GET /sinks/stdout/configs/active request should succeed");
+
+ assert_eq!(response.status(), StatusCode::OK);
+
+ let body: serde_json::Value = response.json().await.unwrap();
+ assert!(body.is_object(), "Should return JSON object");
+
+ // Validate it's the active version (version 0, only one available)
+ assert_eq!(body["version"].as_u64().unwrap(), 0);
+ assert_eq!(body["key"].as_str().unwrap(), "stdout");
+ assert!(body["enabled"].as_bool().unwrap());
+ assert_eq!(body["name"].as_str().unwrap(), "Stdout sink");
+
+ // Validate the remaining top-level fields (streams and plugin configuration)
+ let streams = body["streams"].as_array().unwrap();
+ assert_eq!(streams.len(), 1);
+ assert!(body["plugin_config"].is_object());
+}
diff --git a/core/integration/tests/connectors/http_config_provider/mod.rs b/core/integration/tests/connectors/http_config_provider/mod.rs
new file mode 100644
index 0000000..60d04e3
--- /dev/null
+++ b/core/integration/tests/connectors/http_config_provider/mod.rs
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+mod direct_responses;
+mod wrapped_responses;
+
+use crate::connectors::{ConnectorsRuntime, IggySetup, setup_runtime};
+use std::collections::HashMap;
+use strum_macros::Display;
+use testcontainers_modules::testcontainers::core::WaitFor::Healthcheck;
+use testcontainers_modules::testcontainers::core::wait::HealthWaitStrategy;
+use testcontainers_modules::testcontainers::core::{IntoContainerPort, Mount};
+use testcontainers_modules::testcontainers::runners::AsyncRunner;
+use testcontainers_modules::testcontainers::{ContainerAsync, GenericImage, ImageExt};
+
+async fn setup(mode: WireMockMode) -> WiremockConnectorsRuntime { // Starts WireMock for `mode`, then initialises the connectors runtime with the matching TOML config.
+ let iggy_setup = IggySetup::default();
+ let mut runtime = setup_runtime();
+
+ let container = create_wiremock_container(mode).await; // Started first so its mapped address can be passed to the runtime below.
+ let base_url = get_base_url(&container).await;
+
+ let mut envs = HashMap::new();
+ envs.insert("IGGY_CONNECTORS_CONNECTORS_BASE_URL".to_owned(), base_url); // NOTE(review): presumably overrides `[connectors] base_url` from the TOML config — confirm.
+
+ let config_path = match mode {
+ WireMockMode::Direct => "http_config_provider/config_direct.toml",
+ WireMockMode::Wrapped => "http_config_provider/config_wrapped.toml",
+ };
+
+ runtime.init(config_path, Some(envs), iggy_setup).await;
+ WiremockConnectorsRuntime {
+ connectors_runtime: runtime,
+ wiremock_container: container, // Ownership of the container moves to the caller together with the runtime.
+ }
+}
+
+async fn get_base_url(container: &ContainerAsync<GenericImage>) -> String { // Builds "http://<host>:<port>" for reaching the container from the test process.
+ let host = container
+ .get_host()
+ .await
+ .expect("WireMock container should have a host");
+ let host_port = container
+ .get_host_port_ipv4(8080) // 8080 is the container port exposed in create_wiremock_container.
+ .await
+ .expect("WireMock container should have a port");
+ format!("http://{host}:{host_port}")
+}
+
+async fn create_wiremock_container(mode: WireMockMode) -> ContainerAsync<GenericImage> { // Starts a WireMock container with the mode's mappings and the shared `__files` directory bind-mounted.
+ let current_dir = std::env::current_dir().unwrap(); // Fixture paths below are resolved against the process working directory.
+ GenericImage::new("wiremock/wiremock", "latest") // NOTE(review): unpinned `latest` tag — consider pinning a version for reproducible runs.
+ .with_exposed_port(8080.tcp())
+ .with_wait_for(Healthcheck(HealthWaitStrategy::default()))
+ .with_mount(Mount::bind_mount(
+ current_dir
+ .join(format!(
+ "tests/connectors/http_config_provider/wiremock/mappings/{mode}"
+ ))
+ .to_string_lossy()
+ .to_string(),
+ "/home/wiremock/mappings",
+ ))
+ .with_mount(Mount::bind_mount(
+ current_dir
+ .join("tests/connectors/http_config_provider/wiremock/__files")
+ .to_string_lossy()
+ .to_string(),
+ "/home/wiremock/__files",
+ ))
+ .start()
+ .await
+ .expect("WireMock container should be started")
+}
+
+#[derive(Display, Clone, Copy)]
+#[strum(serialize_all = "lowercase")] // Display renders "direct" / "wrapped"; create_wiremock_container uses it as the mappings subdirectory name.
+enum WireMockMode {
+ Direct,
+ Wrapped,
+}
+
+struct WiremockConnectorsRuntime { // Bundle returned by `setup`: the connectors runtime plus the WireMock container it was configured against.
+ connectors_runtime: ConnectorsRuntime,
+ #[allow(dead_code)] // Never read; presumably held only so the container is not dropped while a test still uses the runtime — confirm against ContainerAsync's Drop behaviour.
+ wiremock_container: ContainerAsync<GenericImage>,
+}
diff --git a/core/integration/tests/connectors/http_config_provider/wiremock/__files/direct/config-versions.json b/core/integration/tests/connectors/http_config_provider/wiremock/__files/direct/config-versions.json
new file mode 100644
index 0000000..40acc4c
--- /dev/null
+++ b/core/integration/tests/connectors/http_config_provider/wiremock/__files/direct/config-versions.json
@@ -0,0 +1,14 @@
+{
+ "sinks": {
+ "stdout": {
+ "version": 0,
+ "created_at": "2025-11-25T10:30:00Z"
+ }
+ },
+ "sources": {
+ "random": {
+ "version": 1,
+ "created_at": "2025-11-25T11:45:00Z"
+ }
+ }
+}
diff --git a/core/integration/tests/connectors/http_config_provider/wiremock/__files/direct/connectors-config.json b/core/integration/tests/connectors/http_config_provider/wiremock/__files/direct/connectors-config.json
new file mode 100644
index 0000000..d2e3e13
--- /dev/null
+++ b/core/integration/tests/connectors/http_config_provider/wiremock/__files/direct/connectors-config.json
@@ -0,0 +1,76 @@
+{
+ "sinks": {
+ "stdout": {
+ "key": "stdout",
+ "enabled": true,
+ "version": 0,
+ "name": "Stdout sink",
+ "path": "../../target/debug/libiggy_connector_stdout_sink",
+ "transforms": {
+ "add_fields": {
+ "enabled": true,
+ "fields": [
+ {
+ "key": "message",
+ "value": {
+ "static": "hello"
+ }
+ }
+ ]
+ }
+ },
+ "streams": [
+ {
+ "stream": "test_stream",
+ "topics": ["test_topic"],
+ "schema": "json",
+ "batch_length": 100,
+ "poll_interval": "5ms",
+ "consumer_group": "stdout_sink_connector"
+ }
+ ],
+ "plugin_config_format": null,
+ "plugin_config": {
+ "print_payload": false
+ }
+ }
+ },
+ "sources": {
+ "random": {
+ "key": "random",
+ "enabled": true,
+ "version": 1,
+ "name": "Random source",
+ "path": "../../target/debug/libiggy_connector_random_source",
+ "transforms": {
+ "add_fields": {
+ "enabled": true,
+ "fields": [
+ {
+ "key": "test_field",
+ "value": {
+ "static": "hello!"
+ }
+ }
+ ]
+ }
+ },
+ "streams": [
+ {
+ "stream": "test_stream",
+ "topic": "test_topic",
+ "schema": "json",
+ "batch_length": 1000,
+ "linger_time": "5ms"
+ }
+ ],
+ "plugin_config_format": "json",
+ "plugin_config": {
+ "interval": "1000ms",
+ "max_count": 1000000,
+ "messages_range": [1, 5],
+ "payload_size": 200
+ }
+ }
+ }
+}
diff --git a/core/integration/tests/connectors/http_config_provider/wiremock/__files/direct/random-source-configs.json b/core/integration/tests/connectors/http_config_provider/wiremock/__files/direct/random-source-configs.json
new file mode 100644
index 0000000..4569feb
--- /dev/null
+++ b/core/integration/tests/connectors/http_config_provider/wiremock/__files/direct/random-source-configs.json
@@ -0,0 +1,74 @@
+[
+ {
+ "key": "random",
+ "enabled": true,
+ "version": 0,
+ "name": "Random source",
+ "path": "../../target/debug/libiggy_connector_random_source",
+ "transforms": {
+ "add_fields": {
+ "enabled": true,
+ "fields": [
+ {
+ "key": "test_field",
+ "value": {
+ "static": "hello!"
+ }
+ }
+ ]
+ }
+ },
+ "streams": [
+ {
+ "stream": "test_stream",
+ "topic": "test_topic",
+ "schema": "json",
+ "batch_length": 1000,
+ "linger_time": "5ms"
+ }
+ ],
+ "plugin_config_format": "json",
+ "plugin_config": {
+ "interval": "100ms",
+ "max_count": 1000,
+ "messages_range": [10, 50],
+ "payload_size": 200
+ }
+ },
+ {
+ "key": "random",
+ "enabled": true,
+ "version": 1,
+ "name": "Random source",
+ "path": "../../target/debug/libiggy_connector_random_source",
+ "transforms": {
+ "add_fields": {
+ "enabled": true,
+ "fields": [
+ {
+ "key": "test_field",
+ "value": {
+ "static": "hello!"
+ }
+ }
+ ]
+ }
+ },
+ "streams": [
+ {
+ "stream": "test_stream",
+ "topic": "test_topic",
+ "schema": "json",
+ "batch_length": 1000,
+ "linger_time": "5ms"
+ }
+ ],
+ "plugin_config_format": "json",
+ "plugin_config": {
+ "interval": "1000ms",
+ "max_count": 1000000,
+ "messages_range": [1, 5],
+ "payload_size": 200
+ }
+ }
+]
diff --git a/core/integration/tests/connectors/http_config_provider/wiremock/__files/direct/random-source-v0.json b/core/integration/tests/connectors/http_config_provider/wiremock/__files/direct/random-source-v0.json
new file mode 100644
index 0000000..f248282
--- /dev/null
+++ b/core/integration/tests/connectors/http_config_provider/wiremock/__files/direct/random-source-v0.json
@@ -0,0 +1,36 @@
+{
+ "key": "random",
+ "enabled": true,
+ "version": 0,
+ "name": "Random source",
+ "path": "../../target/debug/libiggy_connector_random_source",
+ "transforms": {
+ "add_fields": {
+ "enabled": true,
+ "fields": [
+ {
+ "key": "test_field",
+ "value": {
+ "static": "hello!"
+ }
+ }
+ ]
+ }
+ },
+ "streams": [
+ {
+ "stream": "test_stream",
+ "topic": "test_topic",
+ "schema": "json",
+ "batch_length": 1000,
+ "linger_time": "5ms"
+ }
+ ],
+ "plugin_config_format": "json",
+ "plugin_config": {
+ "interval": "100ms",
+ "max_count": 1000,
+ "messages_range": [10, 50],
+ "payload_size": 200
+ }
+}
diff --git a/core/integration/tests/connectors/http_config_provider/wiremock/__files/direct/random-source-v1.json b/core/integration/tests/connectors/http_config_provider/wiremock/__files/direct/random-source-v1.json
new file mode 100644
index 0000000..d329468
--- /dev/null
+++ b/core/integration/tests/connectors/http_config_provider/wiremock/__files/direct/random-source-v1.json
@@ -0,0 +1,36 @@
+{
+ "key": "random",
+ "enabled": true,
+ "version": 1,
+ "name": "Random source",
+ "path": "../../target/debug/libiggy_connector_random_source",
+ "transforms": {
+ "add_fields": {
+ "enabled": true,
+ "fields": [
+ {
+ "key": "test_field",
+ "value": {
+ "static": "hello!"
+ }
+ }
+ ]
+ }
+ },
+ "streams": [
+ {
+ "stream": "test_stream",
+ "topic": "test_topic",
+ "schema": "json",
+ "batch_length": 1000,
+ "linger_time": "5ms"
+ }
+ ],
+ "plugin_config_format": "json",
+ "plugin_config": {
+ "interval": "1000ms",
+ "max_count": 1000000,
+ "messages_range": [1, 5],
+ "payload_size": 200
+ }
+}
diff --git a/core/integration/tests/connectors/http_config_provider/wiremock/__files/direct/stdout-sink-configs.json b/core/integration/tests/connectors/http_config_provider/wiremock/__files/direct/stdout-sink-configs.json
new file mode 100644
index 0000000..f61ff7a
--- /dev/null
+++ b/core/integration/tests/connectors/http_config_provider/wiremock/__files/direct/stdout-sink-configs.json
@@ -0,0 +1,36 @@
+[
+ {
+ "key": "stdout",
+ "enabled": true,
+ "version": 0,
+ "name": "Stdout sink",
+ "path": "../../target/debug/libiggy_connector_stdout_sink",
+ "transforms": {
+ "add_fields": {
+ "enabled": true,
+ "fields": [
+ {
+ "key": "message",
+ "value": {
+ "static": "hello"
+ }
+ }
+ ]
+ }
+ },
+ "streams": [
+ {
+ "stream": "test_stream",
+ "topics": ["test_topic"],
+ "schema": "json",
+ "batch_length": 100,
+ "poll_interval": "5ms",
+ "consumer_group": "stdout_sink_connector"
+ }
+ ],
+ "plugin_config_format": null,
+ "plugin_config": {
+ "print_payload": false
+ }
+ }
+]
diff --git a/core/integration/tests/connectors/http_config_provider/wiremock/__files/direct/stdout-sink-v0.json b/core/integration/tests/connectors/http_config_provider/wiremock/__files/direct/stdout-sink-v0.json
new file mode 100644
index 0000000..a3264ca
--- /dev/null
+++ b/core/integration/tests/connectors/http_config_provider/wiremock/__files/direct/stdout-sink-v0.json
@@ -0,0 +1,34 @@
+{
+ "key": "stdout",
+ "enabled": true,
+ "version": 0,
+ "name": "Stdout sink",
+ "path": "../../target/debug/libiggy_connector_stdout_sink",
+ "transforms": {
+ "add_fields": {
+ "enabled": true,
+ "fields": [
+ {
+ "key": "message",
+ "value": {
+ "static": "hello"
+ }
+ }
+ ]
+ }
+ },
+ "streams": [
+ {
+ "stream": "test_stream",
+ "topics": ["test_topic"],
+ "schema": "json",
+ "batch_length": 100,
+ "poll_interval": "5ms",
+ "consumer_group": "stdout_sink_connector"
+ }
+ ],
+ "plugin_config_format": null,
+ "plugin_config": {
+ "print_payload": false
+ }
+}
diff --git a/core/integration/tests/connectors/http_config_provider/wiremock/__files/wrapped/config-versions.json b/core/integration/tests/connectors/http_config_provider/wiremock/__files/wrapped/config-versions.json
new file mode 100644
index 0000000..eeeea72
--- /dev/null
+++ b/core/integration/tests/connectors/http_config_provider/wiremock/__files/wrapped/config-versions.json
@@ -0,0 +1,17 @@
+{
+ "data": {
+ "sinks": {
+ "stdout": {
+ "version": 0,
+ "created_at": "2025-11-25T10:30:00Z"
+ }
+ },
+ "sources": {
+ "random": {
+ "version": 1,
+ "created_at": "2025-11-25T11:45:00Z"
+ }
+ }
+ },
+ "error": null
+}
diff --git a/core/integration/tests/connectors/http_config_provider/wiremock/__files/wrapped/connectors-config.json b/core/integration/tests/connectors/http_config_provider/wiremock/__files/wrapped/connectors-config.json
new file mode 100644
index 0000000..689ffdb
--- /dev/null
+++ b/core/integration/tests/connectors/http_config_provider/wiremock/__files/wrapped/connectors-config.json
@@ -0,0 +1,79 @@
+{
+ "data": {
+ "sinks": {
+ "stdout": {
+ "key": "stdout",
+ "enabled": true,
+ "version": 0,
+ "name": "Stdout sink",
+ "path": "../../target/debug/libiggy_connector_stdout_sink",
+ "transforms": {
+ "add_fields": {
+ "enabled": true,
+ "fields": [
+ {
+ "key": "message",
+ "value": {
+ "static": "hello"
+ }
+ }
+ ]
+ }
+ },
+ "streams": [
+ {
+ "stream": "test_stream",
+ "topics": ["test_topic"],
+ "schema": "json",
+ "batch_length": 100,
+ "poll_interval": "5ms",
+ "consumer_group": "stdout_sink_connector"
+ }
+ ],
+ "plugin_config_format": null,
+ "plugin_config": {
+ "print_payload": false
+ }
+ }
+ },
+ "sources": {
+ "random": {
+ "key": "random",
+ "enabled": true,
+ "version": 1,
+ "name": "Random source",
+ "path": "../../target/debug/libiggy_connector_random_source",
+ "transforms": {
+ "add_fields": {
+ "enabled": true,
+ "fields": [
+ {
+ "key": "test_field",
+ "value": {
+ "static": "hello!"
+ }
+ }
+ ]
+ }
+ },
+ "streams": [
+ {
+ "stream": "test_stream",
+ "topic": "test_topic",
+ "schema": "json",
+ "batch_length": 1000,
+ "linger_time": "5ms"
+ }
+ ],
+ "plugin_config_format": "json",
+ "plugin_config": {
+ "interval": "1000ms",
+ "max_count": 1000000,
+ "messages_range": [1, 5],
+ "payload_size": 200
+ }
+ }
+ }
+ },
+ "error": null
+}
diff --git a/core/integration/tests/connectors/http_config_provider/wiremock/__files/wrapped/random-source-configs.json b/core/integration/tests/connectors/http_config_provider/wiremock/__files/wrapped/random-source-configs.json
new file mode 100644
index 0000000..741e1df
--- /dev/null
+++ b/core/integration/tests/connectors/http_config_provider/wiremock/__files/wrapped/random-source-configs.json
@@ -0,0 +1,77 @@
+{
+ "data": [
+ {
+ "key": "random",
+ "enabled": true,
+ "version": 0,
+ "name": "Random source",
+ "path": "../../target/debug/libiggy_connector_random_source",
+ "transforms": {
+ "add_fields": {
+ "enabled": true,
+ "fields": [
+ {
+ "key": "test_field",
+ "value": {
+ "static": "hello!"
+ }
+ }
+ ]
+ }
+ },
+ "streams": [
+ {
+ "stream": "test_stream",
+ "topic": "test_topic",
+ "schema": "json",
+ "batch_length": 1000,
+ "linger_time": "5ms"
+ }
+ ],
+ "plugin_config_format": "json",
+ "plugin_config": {
+ "interval": "100ms",
+ "max_count": 1000,
+ "messages_range": [10, 50],
+ "payload_size": 200
+ }
+ },
+ {
+ "key": "random",
+ "enabled": true,
+ "version": 1,
+ "name": "Random source",
+ "path": "../../target/debug/libiggy_connector_random_source",
+ "transforms": {
+ "add_fields": {
+ "enabled": true,
+ "fields": [
+ {
+ "key": "test_field",
+ "value": {
+ "static": "hello!"
+ }
+ }
+ ]
+ }
+ },
+ "streams": [
+ {
+ "stream": "test_stream",
+ "topic": "test_topic",
+ "schema": "json",
+ "batch_length": 1000,
+ "linger_time": "5ms"
+ }
+ ],
+ "plugin_config_format": "json",
+ "plugin_config": {
+ "interval": "1000ms",
+ "max_count": 1000000,
+ "messages_range": [1, 5],
+ "payload_size": 200
+ }
+ }
+ ],
+ "error": null
+}
diff --git a/core/integration/tests/connectors/http_config_provider/wiremock/__files/wrapped/random-source-v0.json b/core/integration/tests/connectors/http_config_provider/wiremock/__files/wrapped/random-source-v0.json
new file mode 100644
index 0000000..f05080b
--- /dev/null
+++ b/core/integration/tests/connectors/http_config_provider/wiremock/__files/wrapped/random-source-v0.json
@@ -0,0 +1,39 @@
+{
+ "data": {
+ "key": "random",
+ "enabled": true,
+ "version": 0,
+ "name": "Random source",
+ "path": "../../target/debug/libiggy_connector_random_source",
+ "transforms": {
+ "add_fields": {
+ "enabled": true,
+ "fields": [
+ {
+ "key": "test_field",
+ "value": {
+ "static": "hello!"
+ }
+ }
+ ]
+ }
+ },
+ "streams": [
+ {
+ "stream": "test_stream",
+ "topic": "test_topic",
+ "schema": "json",
+ "batch_length": 1000,
+ "linger_time": "5ms"
+ }
+ ],
+ "plugin_config_format": "json",
+ "plugin_config": {
+ "interval": "100ms",
+ "max_count": 1000,
+ "messages_range": [10, 50],
+ "payload_size": 200
+ }
+ },
+ "error": null
+}
diff --git a/core/integration/tests/connectors/http_config_provider/wiremock/__files/wrapped/random-source-v1.json b/core/integration/tests/connectors/http_config_provider/wiremock/__files/wrapped/random-source-v1.json
new file mode 100644
index 0000000..b9e3ffa
--- /dev/null
+++ b/core/integration/tests/connectors/http_config_provider/wiremock/__files/wrapped/random-source-v1.json
@@ -0,0 +1,39 @@
+{
+ "data": {
+ "key": "random",
+ "enabled": true,
+ "version": 1,
+ "name": "Random source",
+ "path": "../../target/debug/libiggy_connector_random_source",
+ "transforms": {
+ "add_fields": {
+ "enabled": true,
+ "fields": [
+ {
+ "key": "test_field",
+ "value": {
+ "static": "hello!"
+ }
+ }
+ ]
+ }
+ },
+ "streams": [
+ {
+ "stream": "test_stream",
+ "topic": "test_topic",
+ "schema": "json",
+ "batch_length": 1000,
+ "linger_time": "5ms"
+ }
+ ],
+ "plugin_config_format": "json",
+ "plugin_config": {
+ "interval": "1000ms",
+ "max_count": 1000000,
+ "messages_range": [1, 5],
+ "payload_size": 200
+ }
+ },
+ "error": null
+}
diff --git a/core/integration/tests/connectors/http_config_provider/wiremock/__files/wrapped/stdout-sink-configs.json b/core/integration/tests/connectors/http_config_provider/wiremock/__files/wrapped/stdout-sink-configs.json
new file mode 100644
index 0000000..909ebd0
--- /dev/null
+++ b/core/integration/tests/connectors/http_config_provider/wiremock/__files/wrapped/stdout-sink-configs.json
@@ -0,0 +1,39 @@
+{
+ "data": [
+ {
+ "key": "stdout",
+ "enabled": true,
+ "version": 0,
+ "name": "Stdout sink",
+ "path": "../../target/debug/libiggy_connector_stdout_sink",
+ "transforms": {
+ "add_fields": {
+ "enabled": true,
+ "fields": [
+ {
+ "key": "message",
+ "value": {
+ "static": "hello"
+ }
+ }
+ ]
+ }
+ },
+ "streams": [
+ {
+ "stream": "test_stream",
+ "topics": ["test_topic"],
+ "schema": "json",
+ "batch_length": 100,
+ "poll_interval": "5ms",
+ "consumer_group": "stdout_sink_connector"
+ }
+ ],
+ "plugin_config_format": null,
+ "plugin_config": {
+ "print_payload": false
+ }
+ }
+ ],
+ "error": null
+}
diff --git a/core/integration/tests/connectors/http_config_provider/wiremock/__files/wrapped/stdout-sink-v0.json b/core/integration/tests/connectors/http_config_provider/wiremock/__files/wrapped/stdout-sink-v0.json
new file mode 100644
index 0000000..c806484
--- /dev/null
+++ b/core/integration/tests/connectors/http_config_provider/wiremock/__files/wrapped/stdout-sink-v0.json
@@ -0,0 +1,37 @@
+{
+ "data": {
+ "key": "stdout",
+ "enabled": true,
+ "version": 0,
+ "name": "Stdout sink",
+ "path": "../../target/debug/libiggy_connector_stdout_sink",
+ "transforms": {
+ "add_fields": {
+ "enabled": true,
+ "fields": [
+ {
+ "key": "message",
+ "value": {
+ "static": "hello"
+ }
+ }
+ ]
+ }
+ },
+ "streams": [
+ {
+ "stream": "test_stream",
+ "topics": ["test_topic"],
+ "schema": "json",
+ "batch_length": 100,
+ "poll_interval": "5ms",
+ "consumer_group": "stdout_sink_connector"
+ }
+ ],
+ "plugin_config_format": null,
+ "plugin_config": {
+ "print_payload": false
+ }
+ },
+ "error": null
+}
diff --git a/core/integration/tests/connectors/http_config_provider/wiremock/mappings/direct/create-sink.json b/core/integration/tests/connectors/http_config_provider/wiremock/mappings/direct/create-sink.json
new file mode 100644
index 0000000..93daa87
--- /dev/null
+++ b/core/integration/tests/connectors/http_config_provider/wiremock/mappings/direct/create-sink.json
@@ -0,0 +1,18 @@
+{
+ "request": {
+ "method": "POST",
+ "urlPattern": "/sinks/([^/]+)/configs",
+ "headers": {
+ "Content-Type": {
+ "contains": "application/json"
+ }
+ }
+ },
+ "response": {
+ "status": 200,
+ "headers": {
+ "Content-Type": "application/json"
+ },
+ "bodyFileName": "direct/stdout-sink-v0.json"
+ }
+}
diff --git a/core/integration/tests/connectors/http_config_provider/wiremock/mappings/direct/create-source.json b/core/integration/tests/connectors/http_config_provider/wiremock/mappings/direct/create-source.json
new file mode 100644
index 0000000..032b367
--- /dev/null
+++ b/core/integration/tests/connectors/http_config_provider/wiremock/mappings/direct/create-source.json
@@ -0,0 +1,18 @@
+{
+ "request": {
+ "method": "POST",
+ "urlPattern": "/sources/([^/]+)/configs",
+ "headers": {
+ "Content-Type": {
+ "contains": "application/json"
+ }
+ }
+ },
+ "response": {
+ "status": 200,
+ "headers": {
+ "Content-Type": "application/json"
+ },
+ "bodyFileName": "direct/random-source-v1.json"
+ }
+}
diff --git a/core/integration/tests/connectors/http_config_provider/wiremock/mappings/direct/delete-sink-config.json b/core/integration/tests/connectors/http_config_provider/wiremock/mappings/direct/delete-sink-config.json
new file mode 100644
index 0000000..997d629
--- /dev/null
+++ b/core/integration/tests/connectors/http_config_provider/wiremock/mappings/direct/delete-sink-config.json
@@ -0,0 +1,12 @@
+{
+ "request": {
+ "method": "DELETE",
+ "urlPattern": "/sinks/([^/]+)/configs.*"
+ },
+ "response": {
+ "status": 200,
+ "headers": {
+ "Content-Type": "application/json"
+ }
+ }
+}
diff --git a/core/integration/tests/connectors/http_config_provider/wiremock/mappings/direct/delete-source-config.json b/core/integration/tests/connectors/http_config_provider/wiremock/mappings/direct/delete-source-config.json
new file mode 100644
index 0000000..b82392a
--- /dev/null
+++ b/core/integration/tests/connectors/http_config_provider/wiremock/mappings/direct/delete-source-config.json
@@ -0,0 +1,12 @@
+{
+ "request": {
+ "method": "DELETE",
+ "urlPattern": "/sources/([^/]+)/configs.*"
+ },
+ "response": {
+ "status": 200,
+ "headers": {
+ "Content-Type": "application/json"
+ }
+ }
+}
diff --git a/core/integration/tests/connectors/http_config_provider/wiremock/mappings/direct/get-active-configs.json b/core/integration/tests/connectors/http_config_provider/wiremock/mappings/direct/get-active-configs.json
new file mode 100644
index 0000000..67ded64
--- /dev/null
+++ b/core/integration/tests/connectors/http_config_provider/wiremock/mappings/direct/get-active-configs.json
@@ -0,0 +1,13 @@
+{
+ "request": {
+ "method": "GET",
+ "url": "/configs/active"
+ },
+ "response": {
+ "status": 200,
+ "headers": {
+ "Content-Type": "application/json"
+ },
+ "bodyFileName": "direct/connectors-config.json"
+ }
+}
diff --git a/core/integration/tests/connectors/http_config_provider/wiremock/mappings/direct/get-active-sink-config.json b/core/integration/tests/connectors/http_config_provider/wiremock/mappings/direct/get-active-sink-config.json
new file mode 100644
index 0000000..f1fcac5
--- /dev/null
+++ b/core/integration/tests/connectors/http_config_provider/wiremock/mappings/direct/get-active-sink-config.json
@@ -0,0 +1,13 @@
+{
+ "request": {
+ "method": "GET",
+ "urlPattern": "/sinks/([^/]+)/configs/active"
+ },
+ "response": {
+ "status": 200,
+ "headers": {
+ "Content-Type": "application/json"
+ },
+ "bodyFileName": "direct/stdout-sink-v0.json"
+ }
+}
diff --git a/core/integration/tests/connectors/http_config_provider/wiremock/mappings/direct/get-active-source-config.json b/core/integration/tests/connectors/http_config_provider/wiremock/mappings/direct/get-active-source-config.json
new file mode 100644
index 0000000..df1d954
--- /dev/null
+++ b/core/integration/tests/connectors/http_config_provider/wiremock/mappings/direct/get-active-source-config.json
@@ -0,0 +1,13 @@
+{
+ "request": {
+ "method": "GET",
+ "urlPattern": "/sources/([^/]+)/configs/active"
+ },
+ "response": {
+ "status": 200,
+ "headers": {
+ "Content-Type": "application/json"
+ },
+ "bodyFileName": "direct/random-source-v1.json"
+ }
+}
diff --git a/core/integration/tests/connectors/http_config_provider/wiremock/mappings/direct/get-active-versions.json b/core/integration/tests/connectors/http_config_provider/wiremock/mappings/direct/get-active-versions.json
new file mode 100644
index 0000000..0e17279
--- /dev/null
+++ b/core/integration/tests/connectors/http_config_provider/wiremock/mappings/direct/get-active-versions.json
@@ -0,0 +1,13 @@
+{
+ "request": {
+ "method": "GET",
+ "url": "/configs/active/versions"
+ },
+ "response": {
+ "status": 200,
+ "headers": {
+ "Content-Type": "application/json"
+ },
+ "bodyFileName": "direct/config-versions.json"
+ }
+}
diff --git a/core/integration/tests/connectors/http_config_provider/wiremock/mappings/direct/get-sink-config-by-version.json b/core/integration/tests/connectors/http_config_provider/wiremock/mappings/direct/get-sink-config-by-version.json
new file mode 100644
index 0000000..5c8cf52
--- /dev/null
+++ b/core/integration/tests/connectors/http_config_provider/wiremock/mappings/direct/get-sink-config-by-version.json
@@ -0,0 +1,13 @@
+{
+ "request": {
+ "method": "GET",
+ "urlPattern": "/sinks/([^/]+)/configs/([0-9]+)"
+ },
+ "response": {
+ "status": 200,
+ "headers": {
+ "Content-Type": "application/json"
+ },
+ "bodyFileName": "direct/stdout-sink-v0.json"
+ }
+}
diff --git a/core/integration/tests/connectors/http_config_provider/wiremock/mappings/direct/get-sink-configs.json b/core/integration/tests/connectors/http_config_provider/wiremock/mappings/direct/get-sink-configs.json
new file mode 100644
index 0000000..c81ecf6
--- /dev/null
+++ b/core/integration/tests/connectors/http_config_provider/wiremock/mappings/direct/get-sink-configs.json
@@ -0,0 +1,13 @@
+{
+ "request": {
+ "method": "GET",
+ "urlPattern": "/sinks/([^/]+)/configs"
+ },
+ "response": {
+ "status": 200,
+ "headers": {
+ "Content-Type": "application/json"
+ },
+ "bodyFileName": "direct/stdout-sink-configs.json"
+ }
+}
diff --git a/core/integration/tests/connectors/http_config_provider/wiremock/mappings/direct/get-source-config-v0.json b/core/integration/tests/connectors/http_config_provider/wiremock/mappings/direct/get-source-config-v0.json
new file mode 100644
index 0000000..6ca0308
--- /dev/null
+++ b/core/integration/tests/connectors/http_config_provider/wiremock/mappings/direct/get-source-config-v0.json
@@ -0,0 +1,14 @@
+{
+ "priority": 1,
+ "request": {
+ "method": "GET",
+ "url": "/sources/random/configs/0"
+ },
+ "response": {
+ "status": 200,
+ "headers": {
+ "Content-Type": "application/json"
+ },
+ "bodyFileName": "direct/random-source-v0.json"
+ }
+}
diff --git a/core/integration/tests/connectors/http_config_provider/wiremock/mappings/direct/get-source-config-v1.json b/core/integration/tests/connectors/http_config_provider/wiremock/mappings/direct/get-source-config-v1.json
new file mode 100644
index 0000000..eb48bd5
--- /dev/null
+++ b/core/integration/tests/connectors/http_config_provider/wiremock/mappings/direct/get-source-config-v1.json
@@ -0,0 +1,14 @@
+{
+ "priority": 1,
+ "request": {
+ "method": "GET",
+ "url": "/sources/random/configs/1"
+ },
+ "response": {
+ "status": 200,
+ "headers": {
+ "Content-Type": "application/json"
+ },
+ "bodyFileName": "direct/random-source-v1.json"
+ }
+}
diff --git a/core/integration/tests/connectors/http_config_provider/wiremock/mappings/direct/get-source-configs.json b/core/integration/tests/connectors/http_config_provider/wiremock/mappings/direct/get-source-configs.json
new file mode 100644
index 0000000..0b2265c
--- /dev/null
+++ b/core/integration/tests/connectors/http_config_provider/wiremock/mappings/direct/get-source-configs.json
@@ -0,0 +1,13 @@
+{
+ "request": {
+ "method": "GET",
+ "urlPattern": "/sources/([^/]+)/configs"
+ },
+ "response": {
+ "status": 200,
+ "headers": {
+ "Content-Type": "application/json"
+ },
+ "bodyFileName": "direct/random-source-configs.json"
+ }
+}
diff --git a/core/integration/tests/connectors/http_config_provider/wiremock/mappings/direct/set-active-sink.json b/core/integration/tests/connectors/http_config_provider/wiremock/mappings/direct/set-active-sink.json
new file mode 100644
index 0000000..e8a0de5
--- /dev/null
+++ b/core/integration/tests/connectors/http_config_provider/wiremock/mappings/direct/set-active-sink.json
@@ -0,0 +1,22 @@
+{
+ "request": {
+ "method": "PUT",
+ "urlPattern": "/sinks/([^/]+)/configs/active",
+ "headers": {
+ "Content-Type": {
+ "contains": "application/json"
+ }
+ },
+ "bodyPatterns": [
+ {
+ "matchesJsonPath": "$.version"
+ }
+ ]
+ },
+ "response": {
+ "status": 200,
+ "headers": {
+ "Content-Type": "application/json"
+ }
+ }
+}
diff --git a/core/integration/tests/connectors/http_config_provider/wiremock/mappings/direct/set-active-source.json b/core/integration/tests/connectors/http_config_provider/wiremock/mappings/direct/set-active-source.json
new file mode 100644
index 0000000..6442d52
--- /dev/null
+++ b/core/integration/tests/connectors/http_config_provider/wiremock/mappings/direct/set-active-source.json
@@ -0,0 +1,22 @@
+{
+ "request": {
+ "method": "PUT",
+ "urlPattern": "/sources/([^/]+)/configs/active",
+ "headers": {
+ "Content-Type": {
+ "contains": "application/json"
+ }
+ },
+ "bodyPatterns": [
+ {
+ "matchesJsonPath": "$.version"
+ }
+ ]
+ },
+ "response": {
+ "status": 200,
+ "headers": {
+ "Content-Type": "application/json"
+ }
+ }
+}
diff --git a/core/integration/tests/connectors/http_config_provider/wiremock/mappings/wrapped/create-sink.json b/core/integration/tests/connectors/http_config_provider/wiremock/mappings/wrapped/create-sink.json
new file mode 100644
index 0000000..49c4600
--- /dev/null
+++ b/core/integration/tests/connectors/http_config_provider/wiremock/mappings/wrapped/create-sink.json
@@ -0,0 +1,18 @@
+{
+ "request": {
+ "method": "POST",
+ "urlPattern": "/sinks/([^/]+)/configs",
+ "headers": {
+ "Content-Type": {
+ "contains": "application/json"
+ }
+ }
+ },
+ "response": {
+ "status": 200,
+ "headers": {
+ "Content-Type": "application/json"
+ },
+ "bodyFileName": "wrapped/stdout-sink-v0.json"
+ }
+}
diff --git a/core/integration/tests/connectors/http_config_provider/wiremock/mappings/wrapped/create-source.json b/core/integration/tests/connectors/http_config_provider/wiremock/mappings/wrapped/create-source.json
new file mode 100644
index 0000000..c7f1d50
--- /dev/null
+++ b/core/integration/tests/connectors/http_config_provider/wiremock/mappings/wrapped/create-source.json
@@ -0,0 +1,18 @@
+{
+ "request": {
+ "method": "POST",
+ "urlPattern": "/sources/([^/]+)/configs",
+ "headers": {
+ "Content-Type": {
+ "contains": "application/json"
+ }
+ }
+ },
+ "response": {
+ "status": 200,
+ "headers": {
+ "Content-Type": "application/json"
+ },
+ "bodyFileName": "wrapped/random-source-v1.json"
+ }
+}
diff --git a/core/integration/tests/connectors/http_config_provider/wiremock/mappings/wrapped/delete-sink-config.json b/core/integration/tests/connectors/http_config_provider/wiremock/mappings/wrapped/delete-sink-config.json
new file mode 100644
index 0000000..997d629
--- /dev/null
+++ b/core/integration/tests/connectors/http_config_provider/wiremock/mappings/wrapped/delete-sink-config.json
@@ -0,0 +1,12 @@
+{
+ "request": {
+ "method": "DELETE",
+ "urlPattern": "/sinks/([^/]+)/configs.*"
+ },
+ "response": {
+ "status": 200,
+ "headers": {
+ "Content-Type": "application/json"
+ }
+ }
+}
diff --git a/core/integration/tests/connectors/http_config_provider/wiremock/mappings/wrapped/delete-source-config.json b/core/integration/tests/connectors/http_config_provider/wiremock/mappings/wrapped/delete-source-config.json
new file mode 100644
index 0000000..b82392a
--- /dev/null
+++ b/core/integration/tests/connectors/http_config_provider/wiremock/mappings/wrapped/delete-source-config.json
@@ -0,0 +1,12 @@
+{
+ "request": {
+ "method": "DELETE",
+ "urlPattern": "/sources/([^/]+)/configs.*"
+ },
+ "response": {
+ "status": 200,
+ "headers": {
+ "Content-Type": "application/json"
+ }
+ }
+}
diff --git a/core/integration/tests/connectors/http_config_provider/wiremock/mappings/wrapped/get-active-configs.json b/core/integration/tests/connectors/http_config_provider/wiremock/mappings/wrapped/get-active-configs.json
new file mode 100644
index 0000000..3dba50a
--- /dev/null
+++ b/core/integration/tests/connectors/http_config_provider/wiremock/mappings/wrapped/get-active-configs.json
@@ -0,0 +1,13 @@
+{
+ "request": {
+ "method": "GET",
+ "url": "/configs/active"
+ },
+ "response": {
+ "status": 200,
+ "headers": {
+ "Content-Type": "application/json"
+ },
+ "bodyFileName": "wrapped/connectors-config.json"
+ }
+}
diff --git a/core/integration/tests/connectors/http_config_provider/wiremock/mappings/wrapped/get-active-sink-config.json b/core/integration/tests/connectors/http_config_provider/wiremock/mappings/wrapped/get-active-sink-config.json
new file mode 100644
index 0000000..ae58306
--- /dev/null
+++ b/core/integration/tests/connectors/http_config_provider/wiremock/mappings/wrapped/get-active-sink-config.json
@@ -0,0 +1,13 @@
+{
+ "request": {
+ "method": "GET",
+ "urlPattern": "/sinks/([^/]+)/configs/active"
+ },
+ "response": {
+ "status": 200,
+ "headers": {
+ "Content-Type": "application/json"
+ },
+ "bodyFileName": "wrapped/stdout-sink-v0.json"
+ }
+}
diff --git a/core/integration/tests/connectors/http_config_provider/wiremock/mappings/wrapped/get-active-source-config.json b/core/integration/tests/connectors/http_config_provider/wiremock/mappings/wrapped/get-active-source-config.json
new file mode 100644
index 0000000..82c039a
--- /dev/null
+++ b/core/integration/tests/connectors/http_config_provider/wiremock/mappings/wrapped/get-active-source-config.json
@@ -0,0 +1,13 @@
+{
+ "request": {
+ "method": "GET",
+ "urlPattern": "/sources/([^/]+)/configs/active"
+ },
+ "response": {
+ "status": 200,
+ "headers": {
+ "Content-Type": "application/json"
+ },
+ "bodyFileName": "wrapped/random-source-v1.json"
+ }
+}
diff --git a/core/integration/tests/connectors/http_config_provider/wiremock/mappings/wrapped/get-active-versions.json b/core/integration/tests/connectors/http_config_provider/wiremock/mappings/wrapped/get-active-versions.json
new file mode 100644
index 0000000..59a2d2d
--- /dev/null
+++ b/core/integration/tests/connectors/http_config_provider/wiremock/mappings/wrapped/get-active-versions.json
@@ -0,0 +1,13 @@
+{
+ "request": {
+ "method": "GET",
+ "url": "/configs/active/versions"
+ },
+ "response": {
+ "status": 200,
+ "headers": {
+ "Content-Type": "application/json"
+ },
+ "bodyFileName": "wrapped/config-versions.json"
+ }
+}
diff --git a/core/integration/tests/connectors/http_config_provider/wiremock/mappings/wrapped/get-sink-config-by-version.json b/core/integration/tests/connectors/http_config_provider/wiremock/mappings/wrapped/get-sink-config-by-version.json
new file mode 100644
index 0000000..f8a0c73
--- /dev/null
+++ b/core/integration/tests/connectors/http_config_provider/wiremock/mappings/wrapped/get-sink-config-by-version.json
@@ -0,0 +1,13 @@
+{
+ "request": {
+ "method": "GET",
+ "urlPattern": "/sinks/([^/]+)/configs/([0-9]+)"
+ },
+ "response": {
+ "status": 200,
+ "headers": {
+ "Content-Type": "application/json"
+ },
+ "bodyFileName": "wrapped/stdout-sink-v0.json"
+ }
+}
diff --git a/core/integration/tests/connectors/http_config_provider/wiremock/mappings/wrapped/get-sink-configs.json b/core/integration/tests/connectors/http_config_provider/wiremock/mappings/wrapped/get-sink-configs.json
new file mode 100644
index 0000000..618cea6
--- /dev/null
+++ b/core/integration/tests/connectors/http_config_provider/wiremock/mappings/wrapped/get-sink-configs.json
@@ -0,0 +1,13 @@
+{
+ "request": {
+ "method": "GET",
+ "urlPattern": "/sinks/([^/]+)/configs"
+ },
+ "response": {
+ "status": 200,
+ "headers": {
+ "Content-Type": "application/json"
+ },
+ "bodyFileName": "wrapped/stdout-sink-configs.json"
+ }
+}
diff --git a/core/integration/tests/connectors/http_config_provider/wiremock/mappings/wrapped/get-source-config-v0.json b/core/integration/tests/connectors/http_config_provider/wiremock/mappings/wrapped/get-source-config-v0.json
new file mode 100644
index 0000000..25b19e4
--- /dev/null
+++ b/core/integration/tests/connectors/http_config_provider/wiremock/mappings/wrapped/get-source-config-v0.json
@@ -0,0 +1,14 @@
+{
+ "priority": 1,
+ "request": {
+ "method": "GET",
+ "url": "/sources/random/configs/0"
+ },
+ "response": {
+ "status": 200,
+ "headers": {
+ "Content-Type": "application/json"
+ },
+ "bodyFileName": "wrapped/random-source-v0.json"
+ }
+}
diff --git a/core/integration/tests/connectors/http_config_provider/wiremock/mappings/wrapped/get-source-config-v1.json b/core/integration/tests/connectors/http_config_provider/wiremock/mappings/wrapped/get-source-config-v1.json
new file mode 100644
index 0000000..ae16057
--- /dev/null
+++ b/core/integration/tests/connectors/http_config_provider/wiremock/mappings/wrapped/get-source-config-v1.json
@@ -0,0 +1,14 @@
+{
+ "priority": 1,
+ "request": {
+ "method": "GET",
+ "url": "/sources/random/configs/1"
+ },
+ "response": {
+ "status": 200,
+ "headers": {
+ "Content-Type": "application/json"
+ },
+ "bodyFileName": "wrapped/random-source-v1.json"
+ }
+}
diff --git a/core/integration/tests/connectors/http_config_provider/wiremock/mappings/wrapped/get-source-configs.json b/core/integration/tests/connectors/http_config_provider/wiremock/mappings/wrapped/get-source-configs.json
new file mode 100644
index 0000000..e2dc791
--- /dev/null
+++ b/core/integration/tests/connectors/http_config_provider/wiremock/mappings/wrapped/get-source-configs.json
@@ -0,0 +1,13 @@
+{
+ "request": {
+ "method": "GET",
+ "urlPattern": "/sources/([^/]+)/configs"
+ },
+ "response": {
+ "status": 200,
+ "headers": {
+ "Content-Type": "application/json"
+ },
+ "bodyFileName": "wrapped/random-source-configs.json"
+ }
+}
diff --git a/core/integration/tests/connectors/http_config_provider/wiremock/mappings/wrapped/set-active-sink.json b/core/integration/tests/connectors/http_config_provider/wiremock/mappings/wrapped/set-active-sink.json
new file mode 100644
index 0000000..e8a0de5
--- /dev/null
+++ b/core/integration/tests/connectors/http_config_provider/wiremock/mappings/wrapped/set-active-sink.json
@@ -0,0 +1,22 @@
+{
+ "request": {
+ "method": "PUT",
+ "urlPattern": "/sinks/([^/]+)/configs/active",
+ "headers": {
+ "Content-Type": {
+ "contains": "application/json"
+ }
+ },
+ "bodyPatterns": [
+ {
+ "matchesJsonPath": "$.version"
+ }
+ ]
+ },
+ "response": {
+ "status": 200,
+ "headers": {
+ "Content-Type": "application/json"
+ }
+ }
+}
diff --git a/core/integration/tests/connectors/http_config_provider/wiremock/mappings/wrapped/set-active-source.json b/core/integration/tests/connectors/http_config_provider/wiremock/mappings/wrapped/set-active-source.json
new file mode 100644
index 0000000..6442d52
--- /dev/null
+++ b/core/integration/tests/connectors/http_config_provider/wiremock/mappings/wrapped/set-active-source.json
@@ -0,0 +1,22 @@
+{
+ "request": {
+ "method": "PUT",
+ "urlPattern": "/sources/([^/]+)/configs/active",
+ "headers": {
+ "Content-Type": {
+ "contains": "application/json"
+ }
+ },
+ "bodyPatterns": [
+ {
+ "matchesJsonPath": "$.version"
+ }
+ ]
+ },
+ "response": {
+ "status": 200,
+ "headers": {
+ "Content-Type": "application/json"
+ }
+ }
+}
diff --git a/core/integration/tests/connectors/http_config_provider/wrapped_responses.rs b/core/integration/tests/connectors/http_config_provider/wrapped_responses.rs
new file mode 100644
index 0000000..4ec3a93
--- /dev/null
+++ b/core/integration/tests/connectors/http_config_provider/wrapped_responses.rs
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use crate::connectors::http_config_provider::WireMockMode;
+use reqwest::StatusCode;
+
+#[tokio::test] // Wrapped mode: the source config list is a bare array holding versions 0 and 1.
+async fn test_source_configs_list_returns_all_versions() {
+ let wiremock_runtime =
+ crate::connectors::http_config_provider::setup(WireMockMode::Wrapped).await;
+ let http_api_address = wiremock_runtime
+ .connectors_runtime
+ .connectors_api_address()
+ .expect("connector runtime should be available");
+ let client = reqwest::Client::new();
+
+ let response = client
+ .get(format!("{http_api_address}/sources/random/configs"))
+ .send()
+ .await
+ .expect("GET /sources/random/configs request should succeed");
+
+ assert_eq!(response.status(), StatusCode::OK);
+
+ // Note: the wrapped fixtures envelope payloads as {"data": ..., "error": null}, but the
+ // connector runtime API is expected to answer in direct format, i.e. a bare array here.
+ let body: serde_json::Value = response.json().await.unwrap();
+ assert!(body.is_array(), "Should return JSON array");
+ let configs = body.as_array().unwrap();
+ assert_eq!(configs.len(), 2, "Should have 2 config versions");
+
+ // First entry is version 0: check its identity fields and that `path` names the plugin library
+ let v0 = &configs[0];
+ assert_eq!(v0["key"].as_str().unwrap(), "random");
+ assert!(v0["enabled"].as_bool().unwrap());
+ assert_eq!(v0["version"].as_u64().unwrap(), 0);
+ assert_eq!(v0["name"].as_str().unwrap(), "Random source");
+ assert!(
+ v0["path"]
+ .as_str()
+ .unwrap()
+ .contains("libiggy_connector_random_source")
+ );
+
+ // Validate streams array (sources have StreamProducerConfig)
+ let streams = v0["streams"].as_array().unwrap();
+ assert_eq!(streams.len(), 1, "Should have 1 stream config");
+
+ let stream = &streams[0];
+ assert_eq!(stream["stream"].as_str().unwrap(), "test_stream");
+ assert_eq!(stream["topic"].as_str().unwrap(), "test_topic"); // single `topic` string; sink streams use a `topics` array
+ assert_eq!(stream["schema"].as_str().unwrap(), "json");
+ assert!(stream["batch_length"].as_u64().is_some());
+ assert!(stream["linger_time"].as_str().is_some()); // source-only field; only presence as a string is checked
+
+ // Plugin configuration: format must be "json" and the config itself a JSON object
+ assert_eq!(v0["plugin_config_format"].as_str().unwrap(), "json");
+ assert!(v0["plugin_config"].is_object());
+
+ // Second entry must be version 1 of the same connector (only `version` and `key` are checked)
+ let v1 = &configs[1];
+ assert_eq!(v1["version"].as_u64().unwrap(), 1);
+ assert_eq!(v1["key"].as_str().unwrap(), "random");
+}
+
+#[tokio::test] // Wrapped mode: GET of source config version 1 returns that version.
+async fn test_source_config_by_version_returns_specific_version() {
+ let wiremock_runtime =
+ crate::connectors::http_config_provider::setup(WireMockMode::Wrapped).await;
+ let http_api_address = wiremock_runtime
+ .connectors_runtime
+ .connectors_api_address()
+ .expect("connector runtime should be available");
+ let client = reqwest::Client::new();
+
+ let response = client
+ .get(format!("{http_api_address}/sources/random/configs/1"))
+ .send()
+ .await
+ .expect("GET /sources/random/configs/1 request should succeed");
+
+ assert_eq!(response.status(), StatusCode::OK);
+
+ let body: serde_json::Value = response.json().await.unwrap();
+ assert!(body.is_object(), "Should return JSON object");
+
+ // The object must be the requested version 1, with fields at the top level (no envelope)
+ assert_eq!(body["version"].as_u64().unwrap(), 1);
+ assert_eq!(body["key"].as_str().unwrap(), "random");
+ assert!(body["enabled"].as_bool().unwrap());
+ assert_eq!(body["name"].as_str().unwrap(), "Random source");
+
+ // Streams: exactly one entry, addressed by a single `topic` string
+ let streams = body["streams"].as_array().unwrap();
+ assert_eq!(streams.len(), 1);
+ assert_eq!(streams[0]["topic"].as_str().unwrap(), "test_topic"); // string for sources, not an array
+}
+
+#[tokio::test] // Wrapped mode: the active source config is version 1.
+async fn test_source_active_config_returns_current_version() {
+ let wiremock_runtime =
+ crate::connectors::http_config_provider::setup(WireMockMode::Wrapped).await;
+ let http_api_address = wiremock_runtime
+ .connectors_runtime
+ .connectors_api_address()
+ .expect("connector runtime should be available");
+ let client = reqwest::Client::new();
+
+ let response = client
+ .get(format!("{http_api_address}/sources/random/configs/active"))
+ .send()
+ .await
+ .expect("GET /sources/random/configs/active request should succeed");
+
+ assert_eq!(response.status(), StatusCode::OK);
+
+ let body: serde_json::Value = response.json().await.unwrap();
+ assert!(body.is_object(), "Should return JSON object");
+
+ // Validate it's the active version (version 1)
+ assert_eq!(body["version"].as_u64().unwrap(), 1);
+ assert_eq!(body["key"].as_str().unwrap(), "random");
+ assert!(body["enabled"].as_bool().unwrap());
+ assert_eq!(body["name"].as_str().unwrap(), "Random source");
+
+ // Spot-check the rest of the payload: one stream, "json" plugin format, object-valued plugin_config
+ let streams = body["streams"].as_array().unwrap();
+ assert_eq!(streams.len(), 1);
+ assert_eq!(body["plugin_config_format"].as_str().unwrap(), "json");
+ assert!(body["plugin_config"].is_object());
+}
+
+#[tokio::test] // Wrapped mode: the sink config list is a bare array holding only version 0.
+async fn test_sink_configs_list_returns_all_versions() {
+ let wiremock_runtime =
+ crate::connectors::http_config_provider::setup(WireMockMode::Wrapped).await;
+ let http_api_address = wiremock_runtime
+ .connectors_runtime
+ .connectors_api_address()
+ .expect("connector runtime should be available");
+ let client = reqwest::Client::new();
+
+ let response = client
+ .get(format!("{http_api_address}/sinks/stdout/configs"))
+ .send()
+ .await
+ .expect("GET /sinks/stdout/configs request should succeed");
+
+ assert_eq!(response.status(), StatusCode::OK);
+
+ let body: serde_json::Value = response.json().await.unwrap();
+ assert!(body.is_array(), "Should return JSON array");
+ let configs = body.as_array().unwrap();
+ assert_eq!(configs.len(), 1, "Should have 1 config version");
+
+ // The only entry is version 0: check its identity fields and that `path` names the plugin library
+ let v0 = &configs[0];
+ assert_eq!(v0["key"].as_str().unwrap(), "stdout");
+ assert!(v0["enabled"].as_bool().unwrap());
+ assert_eq!(v0["version"].as_u64().unwrap(), 0);
+ assert_eq!(v0["name"].as_str().unwrap(), "Stdout sink");
+ assert!(
+ v0["path"]
+ .as_str()
+ .unwrap()
+ .contains("libiggy_connector_stdout_sink")
+ );
+
+ // Validate streams array (sinks have StreamConsumerConfig)
+ let streams = v0["streams"].as_array().unwrap();
+ assert_eq!(streams.len(), 1, "Should have 1 stream config");
+
+ let stream = &streams[0];
+ assert_eq!(stream["stream"].as_str().unwrap(), "test_stream");
+ // Sink streams list their topics in a `topics` array, unlike the single `topic` string of sources
+ let topics = stream["topics"].as_array().unwrap();
+ assert_eq!(topics.len(), 1);
+ assert_eq!(topics[0].as_str().unwrap(), "test_topic");
+ assert_eq!(stream["schema"].as_str().unwrap(), "json");
+ assert!(stream["batch_length"].as_u64().is_some());
+ assert!(stream["poll_interval"].as_str().is_some()); // sink-only field; only presence as a string is checked
+ assert!(stream["consumer_group"].as_str().is_some()); // sink-only field; only presence as a string is checked
+
+ // Plugin configuration: the format is expected to be null here, while plugin_config is still an object
+ assert!(v0["plugin_config_format"].is_null());
+ assert!(v0["plugin_config"].is_object());
+}
+
+#[tokio::test] // Wrapped mode: GET of sink config version 0 returns that version.
+async fn test_sink_config_by_version_returns_specific_version() {
+ let wiremock_runtime =
+ crate::connectors::http_config_provider::setup(WireMockMode::Wrapped).await;
+ let http_api_address = wiremock_runtime
+ .connectors_runtime
+ .connectors_api_address()
+ .expect("connector runtime should be available");
+ let client = reqwest::Client::new();
+
+ let response = client
+ .get(format!("{http_api_address}/sinks/stdout/configs/0"))
+ .send()
+ .await
+ .expect("GET /sinks/stdout/configs/0 request should succeed");
+
+ assert_eq!(response.status(), StatusCode::OK);
+
+ let body: serde_json::Value = response.json().await.unwrap();
+ assert!(body.is_object(), "Should return JSON object");
+
+ // The object must be the requested version 0, with fields at the top level (no envelope)
+ assert_eq!(body["version"].as_u64().unwrap(), 0);
+ assert_eq!(body["key"].as_str().unwrap(), "stdout");
+ assert!(body["enabled"].as_bool().unwrap());
+ assert_eq!(body["name"].as_str().unwrap(), "Stdout sink");
+
+ // Streams: exactly one entry whose `topics` array starts with "test_topic"
+ let streams = body["streams"].as_array().unwrap();
+ assert_eq!(streams.len(), 1);
+ let topics = streams[0]["topics"].as_array().unwrap();
+ assert_eq!(topics[0].as_str().unwrap(), "test_topic");
+}
+
+#[tokio::test] // Wrapped mode: the active sink config is version 0.
+async fn test_sink_active_config_returns_current_version() {
+ let wiremock_runtime =
+ crate::connectors::http_config_provider::setup(WireMockMode::Wrapped).await;
+ let http_api_address = wiremock_runtime
+ .connectors_runtime
+ .connectors_api_address()
+ .expect("connector runtime should be available");
+ let client = reqwest::Client::new();
+
+ let response = client
+ .get(format!("{http_api_address}/sinks/stdout/configs/active"))
+ .send()
+ .await
+ .expect("GET /sinks/stdout/configs/active request should succeed");
+
+ assert_eq!(response.status(), StatusCode::OK);
+
+ let body: serde_json::Value = response.json().await.unwrap();
+ assert!(body.is_object(), "Should return JSON object");
+
+ // Validate it's the active version (version 0, only one available)
+ assert_eq!(body["version"].as_u64().unwrap(), 0);
+ assert_eq!(body["key"].as_str().unwrap(), "stdout");
+ assert!(body["enabled"].as_bool().unwrap());
+ assert_eq!(body["name"].as_str().unwrap(), "Stdout sink");
+
+ // Spot-check the rest of the payload: one stream and an object-valued plugin_config
+ let streams = body["streams"].as_array().unwrap();
+ assert_eq!(streams.len(), 1);
+ assert!(body["plugin_config"].is_object());
+}
diff --git a/core/integration/tests/connectors/mod.rs b/core/integration/tests/connectors/mod.rs
index d90b290..146de4d 100644
--- a/core/integration/tests/connectors/mod.rs
+++ b/core/integration/tests/connectors/mod.rs
@@ -1,4 +1,5 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
@@ -26,6 +27,7 @@
};
use std::collections::HashMap;
+mod http_config_provider;
mod postgres;
mod random;
@@ -177,4 +179,10 @@
.expect("Failed to login as root user");
IggyClient::create(client, None, None)
}
+
+    pub fn connectors_api_address(&self) -> Option<String> {
+        // Yields `None` when this setup holds no connectors runtime.
+        let runtime = self.connectors_runtime.as_ref()?;
+        Some(runtime.get_http_api_address())
+    }
}
diff --git a/scripts/check-backwards-compat.sh b/scripts/check-backwards-compat.sh
index dec360c..10a0f56 100755
--- a/scripts/check-backwards-compat.sh
+++ b/scripts/check-backwards-compat.sh
@@ -1,11 +1,22 @@
#!/usr/bin/env bash
+#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
-# with the License.
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
set -euo pipefail
diff --git a/scripts/ci/shellcheck.sh b/scripts/ci/shellcheck.sh
index e4b1e0f..2f76995 100755
--- a/scripts/ci/shellcheck.sh
+++ b/scripts/ci/shellcheck.sh
@@ -1,4 +1,5 @@
#!/usr/bin/env bash
+#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -15,6 +16,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+#
set -euo pipefail
@@ -60,6 +62,7 @@
"./.git/*"
"./foreign/node/node_modules/*"
"./foreign/python/.venv/*"
+ "./web/node_modules/*"
"./.venv/*"
"./venv/*"
"./build/*"