blob: 09f29a682a86d01994ecc057d28273980c74b82a [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#[cfg(test)]
mod tests {
use anyhow::Result;
use uniffle_worker::config::{
Config, HybridStoreConfig, LocalfileStoreConfig, MemoryStoreConfig, MetricsConfig,
StorageType,
};
use uniffle_worker::proto::uniffle::shuffle_server_client::ShuffleServerClient;
use uniffle_worker::{start_uniffle_worker, write_read_for_one_time};
use std::time::Duration;
use tonic::transport::Channel;
use uniffle_worker::metric::GAUGE_MEMORY_ALLOCATED;
/// Assembles the worker [`Config`] used by the tests in this module.
///
/// * `grpc_port` - stored as the worker's `grpc_port`.
/// * `capacity` - forwarded to `MemoryStoreConfig::new` (the caller in this file passes `"1G"`).
/// * `local_data_path` - becomes the single entry of the localfile store's `data_paths`.
///
/// The store type is `MEMORY_LOCALFILE`; no HDFS store, coordinator quorum, tags, log
/// settings or HTTP monitor port are configured, and the metrics section has no push gateway.
fn create_mocked_config(grpc_port: i32, capacity: String, local_data_path: String) -> Config {
    let memory_store = MemoryStoreConfig::new(capacity);

    // Only the data path and the minimum-disk health check are set explicitly;
    // the watermark and concurrency knobs are left unset.
    let localfile_store = LocalfileStoreConfig {
        data_paths: vec![local_data_path],
        healthy_check_min_disks: Some(0),
        disk_high_watermark: None,
        disk_low_watermark: None,
        disk_max_concurrency: None,
    };

    // NOTE(review): 0.9 / 0.5 are presumably the hybrid store's high/low memory
    // watermarks -- confirm against `HybridStoreConfig::new`.
    let hybrid_store = HybridStoreConfig::new(0.9, 0.5, None);

    let metrics = MetricsConfig {
        push_gateway_endpoint: None,
        push_interval_sec: None,
    };

    Config {
        memory_store: Some(memory_store),
        localfile_store: Some(localfile_store),
        hybrid_store: Some(hybrid_store),
        hdfs_store: None,
        store_type: Some(StorageType::MEMORY_LOCALFILE),
        runtime_config: Default::default(),
        metrics: Some(metrics),
        grpc_port: Some(grpc_port),
        coordinator_quorum: Vec::new(),
        tags: None,
        log: None,
        app_heartbeat_timeout_min: None,
        huge_partition_marked_threshold: None,
        huge_partition_memory_max_used_percent: None,
        http_monitor_service_port: None,
    }
}
/// Placeholder for reading shuffle data back from a remote shuffle server.
///
/// The body is intentionally empty: every parameter is ignored (hence the `_` prefixes)
/// and nothing is returned. No test in this module calls it yet.
// Kept as a stub for future tests; without this attribute the unused function
// triggers a `dead_code` warning, which fails builds that deny warnings.
#[allow(dead_code)]
async fn get_data_from_remote(
    _client: &ShuffleServerClient<Channel>,
    _app_id: &str,
    _shuffle_id: i32,
    _partitions: Vec<i32>,
) {
}
/// Starts an in-process uniffle worker for a test.
///
/// * `path` - local directory handed to the localfile store via `create_mocked_config`.
/// * `port` - gRPC port the worker is configured with.
///
/// The worker is configured with a memory capacity string of `"1G"`. After the start call
/// returns, this function waits one second before handing control back to the caller.
async fn start_embedded_worker(path: String, port: i32) {
    let worker_config = create_mocked_config(port, String::from("1G"), path);

    // The value returned by the start call is deliberately discarded.
    let _ = start_uniffle_worker(worker_config).await;

    // NOTE(review): the fixed pause presumably gives the server time to begin
    // accepting connections -- confirm against `start_uniffle_worker`.
    let startup_pause = Duration::from_secs(1);
    tokio::time::sleep(startup_pause).await;
}
/// End-to-end check: start an embedded worker backed by a temporary directory, connect a
/// gRPC client to it, run one write/read round trip, and verify that no memory is left
/// allocated afterwards.
///
/// Returns an error if the temp directory cannot be created, the client cannot connect,
/// or the write/read round trip fails.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn write_read_test_with_embedded_worker() -> Result<()> {
    // `temp_dir` stays bound until the end of the test so the directory outlives the
    // worker's use of it (a `TempDir` removes its directory when dropped).
    let temp_dir = tempdir::TempDir::new("test_write_read")?;
    let temp_path = temp_dir
        .path()
        .to_str()
        .expect("temp dir path should be valid UTF-8")
        .to_string();
    println!("created the temp file path: {}", temp_path);

    let port = 21101;
    start_embedded_worker(temp_path, port).await;

    let client = ShuffleServerClient::connect(format!("http://0.0.0.0:{}", port)).await?;

    // Baseline: nothing has been written yet, so no memory should be allocated.
    assert_eq!(0, GAUGE_MEMORY_ALLOCATED.get());

    write_read_for_one_time(client).await?;

    // after one batch write/read process, the allocated memory size should be 0
    assert_eq!(0, GAUGE_MEMORY_ALLOCATED.get());

    Ok(())
}
}