| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| use assert_cmd::prelude::CommandCargoExt; |
| use async_trait::async_trait; |
| use configs::ConfigProvider; |
| use derive_more::Display; |
| use futures::executor::block_on; |
| use iggy::prelude::UserStatus::Active; |
| use iggy::prelude::*; |
| use iggy_common::TransportProtocol; |
| use rand::Rng; |
| use server::configs::server::ServerConfig; |
| use std::collections::HashMap; |
| use std::fs; |
| use std::fs::{File, OpenOptions}; |
| use std::io::Write; |
| use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr}; |
| use std::path::{Path, PathBuf}; |
| use std::process::{Child, Command, Stdio}; |
| use std::sync::Arc; |
| use std::sync::atomic::{AtomicBool, Ordering}; |
| use std::thread::{self, JoinHandle, available_parallelism, panicking, sleep}; |
| use std::time::Duration; |
| use uuid::Uuid; |
| |
/// Environment variable carrying the server's data directory.
pub const SYSTEM_PATH_ENV_VAR: &str = "IGGY_SYSTEM_PATH";
/// Environment variable that, when present, lets the server inherit the test's stdout/stderr.
pub const TEST_VERBOSITY_ENV_VAR: &str = "IGGY_TEST_VERBOSE";
/// Environment variable set to `"true"` when an IPv6 server is requested.
pub const IPV6_ENV_VAR: &str = "IGGY_TCP_IPV6";
/// Environment variable carrying the root user's name.
pub const IGGY_ROOT_USERNAME_VAR: &str = "IGGY_ROOT_USERNAME";
/// Environment variable carrying the root user's password.
pub const IGGY_ROOT_PASSWORD_VAR: &str = "IGGY_ROOT_PASSWORD";

/// Password used by the `create_user` / `login_user` helpers in this file.
const USER_PASSWORD: &str = "secret";
/// Prefix of randomly generated data directory names.
const LOCAL_DATA_PREFIX: &str = "local_data_";

/// Delay, in milliseconds, between polls while waiting for the server's runtime config.
const SLEEP_INTERVAL_MS: u64 = 20;
/// Upper bound, in seconds, on how long to wait for the server's runtime config.
const MAX_PORT_WAIT_DURATION_S: u64 = 60;
| |
/// IP address family the test server is asked to bind to.
///
/// Derives `Debug`, `Clone`, `Copy` and `Eq` in addition to `PartialEq` so the
/// value can be logged, compared with `assert_eq!` and reused after being
/// passed by value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IpAddrKind {
    /// Use the IPv4 loopback address.
    V4,
    /// Use the IPv6 loopback address.
    V6,
}
| |
| #[async_trait] |
| pub trait ClientFactory: Sync + Send { |
| async fn create_client(&self) -> ClientWrapper; |
| fn transport(&self) -> TransportProtocol; |
| fn server_addr(&self) -> String; |
| } |
| #[derive(Display, Debug)] |
| enum ServerProtocolAddr { |
| #[display("RAW_TCP:{_0}")] |
| RawTcp(SocketAddr), |
| |
| #[display("HTTP_TCP:{_0}")] |
| HttpTcp(SocketAddr), |
| |
| #[display("QUIC_UDP:{_0}")] |
| QuicUdp(SocketAddr), |
| |
| #[display("WEBSOCKET:{_0}")] |
| WebSocket(SocketAddr), |
| } |
| |
| pub struct TestServer { |
| local_data_path: String, |
| envs: HashMap<String, String>, |
| child_handle: Option<Child>, |
| server_addrs: Vec<ServerProtocolAddr>, |
| stdout_file_path: Option<PathBuf>, |
| stderr_file_path: Option<PathBuf>, |
| cleanup: bool, |
| server_executable_path: Option<String>, |
| watchdog_handle: Option<JoinHandle<()>>, |
| watchdog_stop: Arc<AtomicBool>, |
| } |
| |
| impl std::fmt::Debug for TestServer { |
| fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { |
| f.debug_struct("TestServer") |
| .field("local_data_path", &self.local_data_path) |
| .field("server_addrs", &self.server_addrs) |
| .field("cleanup", &self.cleanup) |
| .finish_non_exhaustive() |
| } |
| } |
| |
| impl TestServer { |
| pub fn new( |
| extra_envs: Option<HashMap<String, String>>, |
| cleanup: bool, |
| server_executable_path: Option<String>, |
| ip_kind: IpAddrKind, |
| ) -> Self { |
| let mut envs = HashMap::new(); |
| if let Some(extra) = extra_envs { |
| for (key, value) in extra { |
| envs.insert(key, value); |
| } |
| } |
| |
| // Randomly select 4 CPU cores to reduce interference between parallel tests |
| let cpu_allocation = match available_parallelism() { |
| Ok(parallelism) => { |
| let available_cpus = parallelism.get(); |
| if available_cpus >= 4 { |
| let mut rng = rand::rng(); |
| let max_start = available_cpus - 4; |
| let start = rng.random_range(0..=max_start); |
| let end = start + 4; |
| format!("{}..{}", start, end) |
| } else { |
| "all".to_string() |
| } |
| } |
| Err(_) => "0..4".to_string(), |
| }; |
| |
| envs.insert( |
| "IGGY_SYSTEM_SHARDING_CPU_ALLOCATION".to_string(), |
| cpu_allocation, |
| ); |
| |
| if ip_kind == IpAddrKind::V6 { |
| envs.insert(IPV6_ENV_VAR.to_string(), "true".to_string()); |
| } |
| |
| if !envs.contains_key(IGGY_ROOT_USERNAME_VAR) { |
| envs.insert( |
| IGGY_ROOT_USERNAME_VAR.to_string(), |
| DEFAULT_ROOT_USERNAME.to_string(), |
| ); |
| } |
| |
| if !envs.contains_key(IGGY_ROOT_PASSWORD_VAR) { |
| envs.insert( |
| IGGY_ROOT_PASSWORD_VAR.to_string(), |
| DEFAULT_ROOT_PASSWORD.to_string(), |
| ); |
| } |
| |
| // If IGGY_SYSTEM_PATH is not set, use a random path starting with "local_data_" |
| let local_data_path = if let Some(system_path) = envs.get(SYSTEM_PATH_ENV_VAR) { |
| system_path.to_string() |
| } else { |
| TestServer::get_random_path() |
| }; |
| |
| Self::create( |
| local_data_path, |
| envs, |
| cleanup, |
| server_executable_path, |
| ip_kind, |
| ) |
| } |
| |
| pub fn create( |
| local_data_path: String, |
| envs: HashMap<String, String>, |
| cleanup: bool, |
| server_executable_path: Option<String>, |
| ip_kind: IpAddrKind, |
| ) -> Self { |
| let mut server_addrs = Vec::new(); |
| |
| if let Some(tcp_addr) = envs.get("IGGY_TCP_ADDRESS") { |
| server_addrs.push(ServerProtocolAddr::RawTcp(tcp_addr.parse().unwrap())); |
| } |
| |
| if let Some(http_addr) = envs.get("IGGY_HTTP_ADDRESS") { |
| server_addrs.push(ServerProtocolAddr::HttpTcp(http_addr.parse().unwrap())); |
| } |
| |
| if let Some(quic_addr) = envs.get("IGGY_QUIC_ADDRESS") { |
| server_addrs.push(ServerProtocolAddr::QuicUdp(quic_addr.parse().unwrap())); |
| } |
| |
| if server_addrs.is_empty() { |
| server_addrs = match ip_kind { |
| IpAddrKind::V6 => Self::get_server_ipv6_addrs_with_random_port(), |
| _ => Self::get_server_ipv4_addrs_with_random_port(), |
| } |
| } |
| |
| Self { |
| local_data_path, |
| envs, |
| child_handle: None, |
| server_addrs, |
| stdout_file_path: None, |
| stderr_file_path: None, |
| cleanup, |
| server_executable_path, |
| watchdog_handle: None, |
| watchdog_stop: Arc::new(AtomicBool::new(false)), |
| } |
| } |
| |
| pub fn start(&mut self) { |
| self.set_server_addrs_from_env(); |
| self.cleanup(); |
| |
| // Remove the config file if it exists from a previous run. |
| // Without this, starting the server on existing data will not work, because |
| // port detection mechanism will use port from previous runtime. |
| let config_path = format!("{}/runtime/current_config.toml", self.local_data_path); |
| if Path::new(&config_path).exists() { |
| fs::remove_file(&config_path).ok(); |
| } |
| |
| let files_path = self.local_data_path.clone(); |
| let mut command = if let Some(server_executable_path) = &self.server_executable_path { |
| Command::new(server_executable_path) |
| } else { |
| Command::cargo_bin("iggy-server").unwrap() |
| }; |
| command.env(SYSTEM_PATH_ENV_VAR, files_path); |
| command.envs(self.envs.clone()); |
| |
| // By default, server all logs are redirected to files, |
| // and dumped to stderr when test fails. With IGGY_TEST_VERBOSE=1 |
| // logs are dumped to stdout during test execution. |
| if std::env::var(TEST_VERBOSITY_ENV_VAR).is_ok() |
| || self.envs.contains_key(TEST_VERBOSITY_ENV_VAR) |
| { |
| command.stdout(Stdio::inherit()); |
| command.stderr(Stdio::inherit()); |
| } else { |
| command.stdout(self.get_stdout_file()); |
| self.stdout_file_path = Some(fs::canonicalize(self.get_stdout_file_path()).unwrap()); |
| command.stderr(self.get_stderr_file()); |
| self.stderr_file_path = Some(fs::canonicalize(self.get_stderr_file_path()).unwrap()); |
| } |
| |
| let child = command.spawn().unwrap(); |
| let pid = child.id(); |
| self.child_handle = Some(child); |
| self.wait_until_server_has_bound(); |
| |
| let watchdog_stop = self.watchdog_stop.clone(); |
| let stdout_path = self.stdout_file_path.clone(); |
| let stderr_path = self.stderr_file_path.clone(); |
| let watchdog_handle = thread::Builder::new() |
| .name("test-server-watchdog".to_string()) |
| .spawn(move || { |
| Self::watchdog_loop(pid, watchdog_stop, stdout_path, stderr_path); |
| }) |
| .expect("Failed to spawn watchdog thread"); |
| self.watchdog_handle = Some(watchdog_handle); |
| } |
| |
| /// Watchdog loop that monitors the server process. |
| /// Panics if the server exits while the watchdog is still running (i.e., not gracefully stopped). |
| fn watchdog_loop( |
| pid: u32, |
| stop_signal: Arc<AtomicBool>, |
| stdout_path: Option<PathBuf>, |
| stderr_path: Option<PathBuf>, |
| ) { |
| const CHECK_INTERVAL: Duration = Duration::from_millis(100); |
| |
| loop { |
| if stop_signal.load(Ordering::SeqCst) { |
| return; |
| } |
| |
| // Check if process is alive AND not a zombie via /proc/{pid}/stat |
| // A zombie process still exists in the process table (kill returns 0) |
| // but has state 'Z' in /proc/pid/stat |
| if !Self::is_process_alive(pid) { |
| let stdout_content = stdout_path |
| .as_ref() |
| .and_then(|p| fs::read_to_string(p).ok()) |
| .unwrap_or_else(|| "[No stdout log]".to_string()); |
| |
| let stderr_content = stderr_path |
| .as_ref() |
| .and_then(|p| fs::read_to_string(p).ok()) |
| .unwrap_or_else(|| "[No stderr log]".to_string()); |
| |
| eprintln!( |
| "\n\n=== SERVER CRASHED ===\n\ |
| The iggy-server process (PID {}) has died unexpectedly!\n\ |
| This usually indicates a bug in the server.\n\n\ |
| === STDOUT ===\n{}\n\n\ |
| === STDERR ===\n{}\n", |
| pid, stdout_content, stderr_content |
| ); |
| std::process::abort(); |
| } |
| |
| thread::sleep(CHECK_INTERVAL); |
| } |
| } |
| |
/// Reports whether process `pid` exists and is neither a zombie nor dead.
///
/// Reads `/proc/{pid}/stat`; a missing or malformed file counts as "not alive".
#[cfg(target_os = "linux")]
fn is_process_alive(pid: u32) -> bool {
    // /proc is used because a zombie still responds to kill(pid, 0),
    // yet it must be reported as dead here.
    let Ok(stat) = fs::read_to_string(format!("/proc/{}/stat", pid)) else {
        return false;
    };

    // Layout is "pid (comm) S ...": the state char follows the last ')'.
    let Some((_, after_comm)) = stat.rsplit_once(')') else {
        return false;
    };

    !matches!(after_comm.trim().chars().next(), Some('Z' | 'X'))
}
| |
/// Reports whether a process with the given `pid` exists (non-Linux Unix).
///
/// There is no /proc to inspect here, so signal 0 is used: it checks for
/// process existence without delivering anything. A zombie is therefore
/// still reported as alive until it is reaped by `wait()` in `stop()`.
#[cfg(all(unix, not(target_os = "linux")))]
fn is_process_alive(pid: u32) -> bool {
    // A pid that does not fit `pid_t` would wrap to a negative value, which
    // `kill` interprets as a process group; treat it as "no such process".
    let Ok(pid) = libc::pid_t::try_from(pid) else {
        return false;
    };
    // SAFETY: `kill` with signal 0 only performs an existence/permission
    // check; it does not touch any memory owned by this process.
    unsafe { libc::kill(pid, 0) == 0 }
}

/// Fallback for non-Unix targets, where no portable liveness probe by pid is
/// available in this file: the process is always assumed alive, so the
/// watchdog never fires. Crashes can still be detected via `is_running` and
/// `assert_running`.
#[cfg(not(unix))]
fn is_process_alive(_pid: u32) -> bool {
    true
}
| |
| pub fn stop(&mut self) { |
| self.watchdog_stop.store(true, Ordering::SeqCst); |
| if let Some(watchdog) = self.watchdog_handle.take() { |
| let _ = watchdog.join(); |
| } |
| |
| #[allow(unused_mut)] |
| if let Some(mut child_handle) = self.child_handle.take() { |
| #[cfg(unix)] |
| unsafe { |
| use libc::SIGTERM; |
| use libc::kill; |
| kill(child_handle.id() as libc::pid_t, SIGTERM); |
| } |
| |
| #[cfg(not(unix))] |
| child_handle.kill().unwrap(); |
| |
| if let Ok(output) = child_handle.wait_with_output() { |
| let stderr = String::from_utf8_lossy(&output.stderr); |
| let stdout = String::from_utf8_lossy(&output.stdout); |
| if let Some(stderr_file_path) = &self.stderr_file_path { |
| OpenOptions::new() |
| .append(true) |
| .create(true) |
| .open(stderr_file_path) |
| .unwrap() |
| .write_all(stderr.as_bytes()) |
| .unwrap(); |
| } |
| |
| if let Some(stdout_file_path) = &self.stdout_file_path { |
| OpenOptions::new() |
| .append(true) |
| .create(true) |
| .open(stdout_file_path) |
| .unwrap() |
| .write_all(stdout.as_bytes()) |
| .unwrap(); |
| } |
| } |
| } |
| self.cleanup(); |
| } |
| |
| pub fn is_started(&self) -> bool { |
| self.child_handle.is_some() |
| } |
| |
| /// Check if the server process is still running. |
| /// Returns false if the process has exited (crashed or terminated). |
| pub fn is_running(&mut self) -> bool { |
| if let Some(child) = self.child_handle.as_mut() { |
| match child.try_wait() { |
| Ok(Some(_)) => false, // Process has exited |
| Ok(None) => true, // Still running |
| Err(_) => false, // Error checking, assume dead |
| } |
| } else { |
| false |
| } |
| } |
| |
| /// Assert that the server is still running. If it has crashed, |
| /// panic with server logs included for debugging. |
| pub fn assert_running(&mut self) { |
| if let Some(Ok(Some(exit_status))) = self.child_handle.as_mut().map(|c| c.try_wait()) { |
| let (stdout_content, stderr_content) = self.collect_logs(); |
| panic!( |
| "Server process has crashed with exit status: {}\n\n\ |
| === STDOUT ===\n{}\n\n\ |
| === STDERR ===\n{}", |
| exit_status, stdout_content, stderr_content |
| ); |
| } |
| } |
| |
| /// Collect server stdout and stderr logs from files. |
| pub fn collect_logs(&self) -> (String, String) { |
| let stdout = self |
| .stdout_file_path |
| .as_ref() |
| .and_then(|path| fs::read_to_string(path).ok()) |
| .unwrap_or_else(|| "[No stdout log]".to_string()); |
| |
| let stderr = self |
| .stderr_file_path |
| .as_ref() |
| .and_then(|path| fs::read_to_string(path).ok()) |
| .unwrap_or_else(|| "[No stderr log]".to_string()); |
| |
| (stdout, stderr) |
| } |
| |
| pub fn pid(&self) -> u32 { |
| self.child_handle.as_ref().unwrap().id() |
| } |
| |
| fn cleanup(&self) { |
| if !self.cleanup { |
| return; |
| } |
| |
| if fs::metadata(&self.local_data_path).is_ok() { |
| fs::remove_dir_all(&self.local_data_path).unwrap(); |
| } |
| } |
| |
| fn get_server_ipv4_addrs_with_random_port() -> Vec<ServerProtocolAddr> { |
| let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 0); |
| vec![ |
| ServerProtocolAddr::QuicUdp(addr), |
| ServerProtocolAddr::RawTcp(addr), |
| ServerProtocolAddr::HttpTcp(addr), |
| ServerProtocolAddr::WebSocket(addr), |
| ] |
| } |
| |
| fn get_server_ipv6_addrs_with_random_port() -> Vec<ServerProtocolAddr> { |
| let addr = SocketAddr::new(Ipv6Addr::LOCALHOST.into(), 0); |
| vec![ |
| ServerProtocolAddr::QuicUdp(addr), |
| ServerProtocolAddr::RawTcp(addr), |
| ServerProtocolAddr::HttpTcp(addr), |
| ServerProtocolAddr::WebSocket(addr), |
| ] |
| } |
| |
| fn set_server_addrs_from_env(&mut self) { |
| for server_protocol_addr in &self.server_addrs { |
| let key = match server_protocol_addr { |
| ServerProtocolAddr::RawTcp(addr) => { |
| ("IGGY_TCP_ADDRESS".to_string(), addr.to_string()) |
| } |
| ServerProtocolAddr::HttpTcp(addr) => { |
| ("IGGY_HTTP_ADDRESS".to_string(), addr.to_string()) |
| } |
| ServerProtocolAddr::QuicUdp(addr) => { |
| ("IGGY_QUIC_ADDRESS".to_string(), addr.to_string()) |
| } |
| ServerProtocolAddr::WebSocket(addr) => { |
| ("IGGY_WEBSOCKET_ADDRESS".to_string(), addr.to_string()) |
| } |
| }; |
| |
| self.envs.entry(key.0).or_insert(key.1); |
| } |
| } |
| |
| fn wait_until_server_has_bound(&mut self) { |
| let config_path = format!("{}/runtime/current_config.toml", self.local_data_path); |
| let file_config_provider = ServerConfig::config_provider(&config_path); |
| |
| let max_attempts = (MAX_PORT_WAIT_DURATION_S * 1000) / SLEEP_INTERVAL_MS; |
| self.server_addrs.clear(); |
| |
| let config = block_on(async { |
| let mut loaded_config = None; |
| |
| for _ in 0..max_attempts { |
| if !Path::new(&config_path).exists() { |
| if let Some(exit_status) = |
| self.child_handle.as_mut().unwrap().try_wait().unwrap() |
| { |
| panic!("Server process has exited with status {exit_status}!"); |
| } |
| sleep(Duration::from_millis(SLEEP_INTERVAL_MS)); |
| continue; |
| } |
| let config: Result<ServerConfig, _> = file_config_provider.load_config().await; |
| match config { |
| Ok(config) => { |
| // Verify config contains fresh addresses, not stale defaults |
| // Default ports: TCP=8090, HTTP=3000, QUIC=8080, WebSocket=8092 |
| let tcp_port: u16 = config |
| .tcp |
| .address |
| .split(':') |
| .nth(1) |
| .and_then(|s| s.parse().ok()) |
| .unwrap_or(0); |
| let http_port: u16 = config |
| .http |
| .address |
| .split(':') |
| .nth(1) |
| .and_then(|s| s.parse().ok()) |
| .unwrap_or(0); |
| let quic_port: u16 = config |
| .quic |
| .address |
| .split(':') |
| .nth(1) |
| .and_then(|s| s.parse().ok()) |
| .unwrap_or(0); |
| let websocket_port: u16 = config |
| .websocket |
| .address |
| .split(':') |
| .nth(1) |
| .and_then(|s| s.parse().ok()) |
| .unwrap_or(0); |
| |
| if tcp_port == 8090 |
| || http_port == 3000 |
| || quic_port == 8080 |
| || websocket_port == 8092 |
| { |
| sleep(Duration::from_millis(SLEEP_INTERVAL_MS)); |
| continue; |
| } |
| |
| loaded_config = Some(config); |
| break; |
| } |
| Err(_) => sleep(Duration::from_millis(SLEEP_INTERVAL_MS)), |
| } |
| } |
| loaded_config |
| }); |
| |
| if let Some(config) = config { |
| // Only validate and add enabled protocols |
| if config.quic.enabled { |
| let quic_addr: SocketAddr = config.quic.address.parse().unwrap(); |
| if quic_addr.port() == 0 { |
| panic!("Quic address port is 0!"); |
| } |
| self.server_addrs |
| .push(ServerProtocolAddr::QuicUdp(quic_addr)); |
| } |
| |
| if config.tcp.enabled { |
| let tcp_addr: SocketAddr = config.tcp.address.parse().unwrap(); |
| if tcp_addr.port() == 0 { |
| panic!("Tcp address port is 0!"); |
| } |
| self.server_addrs.push(ServerProtocolAddr::RawTcp(tcp_addr)); |
| } |
| |
| if config.http.enabled { |
| let http_addr: SocketAddr = config.http.address.parse().unwrap(); |
| if http_addr.port() == 0 { |
| panic!("Http address port is 0!"); |
| } |
| self.server_addrs |
| .push(ServerProtocolAddr::HttpTcp(http_addr)); |
| } |
| |
| if config.websocket.enabled { |
| let websocket_addr: SocketAddr = config.websocket.address.parse().unwrap(); |
| if websocket_addr.port() == 0 { |
| panic!("WebSocket address port is 0!"); |
| } |
| self.server_addrs |
| .push(ServerProtocolAddr::WebSocket(websocket_addr)); |
| } |
| } else { |
| panic!( |
| "Failed to load config from file {config_path} in {MAX_PORT_WAIT_DURATION_S} s!" |
| ); |
| } |
| } |
| |
| fn get_stdout_file_path(&self) -> PathBuf { |
| format!("{}_stdout.txt", self.local_data_path).into() |
| } |
| |
| fn get_stderr_file_path(&self) -> PathBuf { |
| format!("{}_stderr.txt", self.local_data_path).into() |
| } |
| |
| fn get_stdout_file(&self) -> File { |
| File::create(self.get_stdout_file_path()).unwrap() |
| } |
| |
| fn get_stderr_file(&self) -> File { |
| File::create(self.get_stderr_file_path()).unwrap() |
| } |
| |
/// Reads the file at `path` for diagnostic output, never panicking.
///
/// Invalid UTF-8 is replaced with U+FFFD. When the file cannot be read, a
/// bracketed message naming the path and the error is returned instead.
/// This matters because the function is called from `Drop` while a test is
/// already unwinding, where a second panic would abort the process and hide
/// the original failure.
fn read_file_to_string(path: &str) -> String {
    match fs::read(path) {
        Ok(bytes) => String::from_utf8_lossy(&bytes).into_owned(),
        Err(error) => format!("[Failed to read {path}: {error}]"),
    }
}
| |
| pub fn get_local_data_path(&self) -> &str { |
| &self.local_data_path |
| } |
| |
| pub fn get_random_path() -> String { |
| format!("{}{}", LOCAL_DATA_PREFIX, Uuid::now_v7().to_u128_le()) |
| } |
| |
| pub fn get_http_api_addr(&self) -> Option<String> { |
| for server_protocol_addr in &self.server_addrs { |
| if let ServerProtocolAddr::HttpTcp(a) = server_protocol_addr { |
| return Some(a.to_string()); |
| } |
| } |
| None |
| } |
| |
| pub fn get_raw_tcp_addr(&self) -> Option<String> { |
| for server_protocol_addr in &self.server_addrs { |
| if let ServerProtocolAddr::RawTcp(a) = server_protocol_addr { |
| return Some(a.to_string()); |
| } |
| } |
| None |
| } |
| |
| pub fn get_quic_udp_addr(&self) -> Option<String> { |
| for server_protocol_addr in &self.server_addrs { |
| if let ServerProtocolAddr::QuicUdp(a) = server_protocol_addr { |
| return Some(a.to_string()); |
| } |
| } |
| None |
| } |
| |
| pub fn get_server_ip_addr(&self) -> Option<String> { |
| if let Some(server_address) = self |
| .get_raw_tcp_addr() |
| .or_else(|| self.get_http_api_addr()) |
| .or_else(|| self.get_quic_udp_addr()) |
| { |
| server_address |
| .split(':') |
| .map(|s| s.to_string()) |
| .collect::<Vec<_>>() |
| .first() |
| .cloned() |
| } else { |
| None |
| } |
| } |
| |
| pub fn get_websocket_addr(&self) -> Option<String> { |
| for server_protocol_addr in &self.server_addrs { |
| if let ServerProtocolAddr::WebSocket(a) = server_protocol_addr { |
| return Some(a.to_string()); |
| } |
| } |
| None |
| } |
| } |
| |
| impl Drop for TestServer { |
| fn drop(&mut self) { |
| self.stop(); |
| if panicking() { |
| if let Some(stdout_file_path) = &self.stdout_file_path { |
| eprintln!( |
| "Iggy server stdout:\n{}", |
| Self::read_file_to_string(stdout_file_path.to_str().unwrap()) |
| ); |
| } |
| |
| if let Some(stderr_file_path) = &self.stderr_file_path { |
| eprintln!( |
| "Iggy server stderr:\n{}", |
| Self::read_file_to_string(stderr_file_path.to_str().unwrap()) |
| ); |
| } |
| } |
| if let Some(stdout_file_path) = &self.stdout_file_path { |
| fs::remove_file(stdout_file_path).unwrap(); |
| } |
| if let Some(stderr_file_path) = &self.stderr_file_path { |
| fs::remove_file(stderr_file_path).unwrap(); |
| } |
| } |
| } |
| |
| impl Default for TestServer { |
| fn default() -> Self { |
| TestServer::new(None, true, None, IpAddrKind::V4) |
| } |
| } |
| |
| pub async fn create_user(client: &IggyClient, username: &str) { |
| client |
| .create_user( |
| username, |
| USER_PASSWORD, |
| Active, |
| Some(Permissions { |
| global: GlobalPermissions { |
| manage_servers: true, |
| read_servers: true, |
| manage_users: true, |
| read_users: true, |
| manage_streams: true, |
| read_streams: true, |
| manage_topics: true, |
| read_topics: true, |
| poll_messages: true, |
| send_messages: true, |
| }, |
| streams: None, |
| }), |
| ) |
| .await |
| .unwrap(); |
| } |
| |
| pub async fn delete_user(client: &IggyClient, username: &str) { |
| client |
| .delete_user(&Identifier::named(username).unwrap()) |
| .await |
| .unwrap(); |
| } |
| |
| pub async fn login_root(client: &IggyClient) -> IdentityInfo { |
| client |
| .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) |
| .await |
| .unwrap() |
| } |
| |
| pub async fn login_user(client: &IggyClient, username: &str) -> IdentityInfo { |
| client.login_user(username, USER_PASSWORD).await.unwrap() |
| } |
| |
| pub async fn assert_clean_system(system_client: &IggyClient) { |
| let streams = system_client.get_streams().await.unwrap(); |
| assert!(streams.is_empty()); |
| |
| let users = system_client.get_users().await.unwrap(); |
| assert_eq!(users.len(), 1); |
| } |