// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use crate::config::Config;
use crate::error::WorkerError;
use crate::metric::{
GAUGE_APP_NUMBER, GAUGE_TOPN_APP_RESIDENT_DATA_SIZE, TOTAL_APP_NUMBER,
TOTAL_HUGE_PARTITION_REQUIRE_BUFFER_FAILED, TOTAL_READ_DATA, TOTAL_READ_DATA_FROM_LOCALFILE,
TOTAL_READ_DATA_FROM_MEMORY, TOTAL_RECEIVED_DATA, TOTAL_REQUIRE_BUFFER_FAILED,
};
use crate::proto::uniffle::RemoteStorage;
use crate::readable_size::ReadableSize;
use crate::runtime::manager::RuntimeManager;
use crate::store::hybrid::HybridStore;
use crate::store::memory::MemorySnapshot;
use crate::store::{
    PartitionedDataBlock, RequireBufferResponse, ResponseData, ResponseDataIndex, Store,
    StoreProvider,
};
use crate::util::current_timestamp_sec;
use anyhow::{anyhow, Result};
use bytes::Bytes;
use croaring::treemap::JvmSerializer;
use croaring::Treemap;
use dashmap::DashMap;
use log::{debug, error, info};
use std::collections::hash_map::DefaultHasher;
use std::collections::{HashMap, HashSet};
use std::hash::{Hash, Hasher};
use std::str::FromStr;
use std::sync::atomic::Ordering::SeqCst;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::RwLock;
use std::sync::{Arc, OnceLock};
use std::time::Duration;
pub static SHUFFLE_SERVER_ID: OnceLock<String> = OnceLock::new();
#[derive(Debug, Clone)]
pub enum DataDistribution {
NORMAL,
#[allow(non_camel_case_types)]
LOCAL_ORDER,
}
pub const MAX_CONCURRENCY_PER_PARTITION_TO_WRITE: i32 = 20;
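/// Per-app options supplied when a shuffle is registered.
///
/// A minimal construction sketch (the values are illustrative, not defaults
/// mandated by the protocol):
///
/// ```ignore
/// let options = AppConfigOptions::new(
///     DataDistribution::LOCAL_ORDER,
///     MAX_CONCURRENCY_PER_PARTITION_TO_WRITE,
///     None, // no remote storage attached
/// );
/// ```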
#[derive(Debug, Clone)]
pub struct AppConfigOptions {
pub data_distribution: DataDistribution,
pub max_concurrency_per_partition_to_write: i32,
pub remote_storage_config_option: Option<RemoteStorageConfig>,
}
impl AppConfigOptions {
pub fn new(
data_distribution: DataDistribution,
max_concurrency_per_partition_to_write: i32,
remote_storage_config_option: Option<RemoteStorageConfig>,
) -> Self {
Self {
data_distribution,
max_concurrency_per_partition_to_write,
remote_storage_config_option,
}
}
}
impl Default for AppConfigOptions {
fn default() -> Self {
AppConfigOptions {
data_distribution: DataDistribution::LOCAL_ORDER,
            max_concurrency_per_partition_to_write: MAX_CONCURRENCY_PER_PARTITION_TO_WRITE,
remote_storage_config_option: None,
}
}
}
// =============================================================
#[derive(Clone, Debug)]
pub struct RemoteStorageConfig {
pub root: String,
pub configs: HashMap<String, String>,
}
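// Converts the protobuf `RemoteStorage` registration payload: the storage
// root is kept as-is and the repeated key/value entries are flattened into
// the `configs` map.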
impl From<RemoteStorage> for RemoteStorageConfig {
fn from(remote_conf: RemoteStorage) -> Self {
let root = remote_conf.path;
let mut confs = HashMap::new();
for kv in remote_conf.remote_storage_conf {
confs.insert(kv.key, kv.value);
}
Self {
root,
configs: confs,
}
}
}
// =============================================================
pub struct App {
app_id: String,
// key: shuffleId, value: partitionIds
partitions: DashMap<i32, HashSet<i32>>,
app_config_options: AppConfigOptions,
latest_heartbeat_time: AtomicU64,
store: Arc<HybridStore>,
// key: (shuffle_id, partition_id)
bitmap_of_blocks: DashMap<(i32, i32), PartitionedMeta>,
huge_partition_marked_threshold: Option<u64>,
huge_partition_memory_max_available_size: Option<u64>,
total_received_data_size: AtomicU64,
total_resident_data_size: AtomicU64,
}
#[derive(Clone)]
struct PartitionedMeta {
inner: Arc<RwLock<PartitionedMetaInner>>,
}
struct PartitionedMetaInner {
blocks_bitmap: Treemap,
total_size: u64,
}
impl PartitionedMeta {
fn new() -> Self {
PartitionedMeta {
inner: Arc::new(RwLock::new(PartitionedMetaInner {
blocks_bitmap: Treemap::default(),
total_size: 0,
})),
}
}
fn get_data_size(&self) -> Result<u64> {
let meta = self.inner.read().unwrap();
Ok(meta.total_size)
}
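    // `data_size` is a summed block length and is expected to be non-negative;
    // a negative value would wrap around when cast to u64 below.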
fn incr_data_size(&mut self, data_size: i32) -> Result<()> {
let mut meta = self.inner.write().unwrap();
meta.total_size += data_size as u64;
Ok(())
}
fn get_block_ids_bytes(&self) -> Result<Bytes> {
let meta = self.inner.read().unwrap();
let serialized_data = meta.blocks_bitmap.serialize()?;
Ok(Bytes::from(serialized_data))
}
fn report_block_ids(&mut self, ids: Vec<i64>) -> Result<()> {
let mut meta = self.inner.write().unwrap();
for id in ids {
meta.blocks_bitmap.add(id as u64);
}
Ok(())
}
}
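// A minimal round-trip sketch for the block-id bitmap above: reported ids
// land in the treemap and are serialized in the JVM-compatible RoaringBitmap
// format (`JvmSerializer`), so Java clients can deserialize the bytes
// directly.
//
//     let mut meta = PartitionedMeta::new();
//     meta.report_block_ids(vec![123, 124])?;
//     let bytes = meta.get_block_ids_bytes()?;
//     assert!(Treemap::deserialize(&bytes)?.contains(123));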
impl App {
fn from(
app_id: String,
config_options: AppConfigOptions,
store: Arc<HybridStore>,
huge_partition_marked_threshold: Option<u64>,
huge_partition_memory_max_available_size: Option<u64>,
runtime_manager: RuntimeManager,
) -> Self {
        // todo: the registration error should be propagated to the caller instead of only being logged.
let copy_app_id = app_id.to_string();
let app_options = config_options.clone();
let cloned_store = store.clone();
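        // registration is async but App::from is called from sync code, so run
        // it on the runtime and block until it completes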
let register_result = futures::executor::block_on(async move {
runtime_manager
.default_runtime
.spawn(async move {
cloned_store
.register_app(RegisterAppContext {
app_id: copy_app_id,
app_config_options: app_options,
})
.await
})
.await
});
if register_result.is_err() {
error!(
"Errors on registering app to store: {:#?}",
register_result.err()
);
}
App {
app_id,
partitions: DashMap::new(),
app_config_options: config_options,
latest_heartbeat_time: AtomicU64::new(current_timestamp_sec()),
store,
bitmap_of_blocks: DashMap::new(),
huge_partition_marked_threshold,
huge_partition_memory_max_available_size,
total_received_data_size: Default::default(),
total_resident_data_size: Default::default(),
}
}
fn get_latest_heartbeat_time(&self) -> u64 {
self.latest_heartbeat_time.load(Ordering::SeqCst)
}
pub fn heartbeat(&self) -> Result<()> {
let timestamp = current_timestamp_sec();
self.latest_heartbeat_time.swap(timestamp, Ordering::SeqCst);
Ok(())
}
pub fn register_shuffle(&self, shuffle_id: i32) -> Result<()> {
self.partitions
.entry(shuffle_id)
            .or_insert_with(HashSet::new);
Ok(())
}
pub async fn insert(&self, ctx: WritingViewContext) -> Result<i32, WorkerError> {
let len: i32 = ctx.data_blocks.iter().map(|block| block.length).sum();
self.get_underlying_partition_bitmap(ctx.uid.clone())
.incr_data_size(len)?;
TOTAL_RECEIVED_DATA.inc_by(len as u64);
self.total_received_data_size.fetch_add(len as u64, SeqCst);
self.total_resident_data_size.fetch_add(len as u64, SeqCst);
let ctx = match self.is_huge_partition(&ctx.uid).await {
Ok(true) => WritingViewContext::new(ctx.uid.clone(), ctx.data_blocks, true),
_ => ctx,
};
self.store.insert(ctx).await?;
Ok(len)
}
pub async fn select(&self, ctx: ReadingViewContext) -> Result<ResponseData, WorkerError> {
let response = self.store.get(ctx).await;
response.map(|data| {
match &data {
ResponseData::Local(local_data) => {
let length = local_data.data.len() as u64;
TOTAL_READ_DATA_FROM_LOCALFILE.inc_by(length);
TOTAL_READ_DATA.inc_by(length);
}
ResponseData::Mem(mem_data) => {
let length = mem_data.data.len() as u64;
TOTAL_READ_DATA_FROM_MEMORY.inc_by(length);
TOTAL_READ_DATA.inc_by(length);
}
};
data
})
}
pub async fn list_index(
&self,
ctx: ReadingIndexViewContext,
) -> Result<ResponseDataIndex, WorkerError> {
self.store.get_index(ctx).await
}
    async fn is_huge_partition(&self, uid: &PartitionedUId) -> Result<bool> {
        let huge_partition_threshold_option = &self.huge_partition_marked_threshold;
        let huge_partition_memory_option = &self.huge_partition_memory_max_available_size;
        if huge_partition_threshold_option.is_none() || huge_partition_memory_option.is_none() {
            return Ok(false);
        }
        let huge_partition_threshold = huge_partition_threshold_option.unwrap();
        let meta = self.get_underlying_partition_bitmap(uid.clone());
        let data_size = meta.get_data_size()?;
        Ok(data_size > huge_partition_threshold)
    }
async fn is_backpressure_for_huge_partition(&self, uid: &PartitionedUId) -> Result<bool> {
if !self.is_huge_partition(uid).await? {
return Ok(false);
}
        // safe to unwrap: is_huge_partition() already verified both options are set
        let huge_partition_memory = self.huge_partition_memory_max_available_size.unwrap();
        if self
            .store
            .get_hot_store_memory_partitioned_buffer_size(uid)
            .await?
            > huge_partition_memory
        {
            info!(
                "[{:?}] is a huge partition, so its write speed is being limited",
                uid
            );
TOTAL_HUGE_PARTITION_REQUIRE_BUFFER_FAILED.inc();
Ok(true)
} else {
Ok(false)
}
}
pub async fn free_allocated_memory_size(&self, size: i64) -> Result<bool> {
self.store.free_hot_store_allocated_memory_size(size).await
}
pub async fn require_buffer(
&self,
ctx: RequireBufferContext,
) -> Result<RequireBufferResponse, WorkerError> {
if self.is_backpressure_for_huge_partition(&ctx.uid).await? {
TOTAL_REQUIRE_BUFFER_FAILED.inc();
return Err(WorkerError::MEMORY_USAGE_LIMITED_BY_HUGE_PARTITION);
}
self.store.require_buffer(ctx).await.map_err(|err| {
TOTAL_REQUIRE_BUFFER_FAILED.inc();
err
})
}
pub async fn release_buffer(&self, ticket_id: i64) -> Result<i64, WorkerError> {
self.store
.release_buffer(ReleaseBufferContext::from(ticket_id))
.await
}
fn get_underlying_partition_bitmap(&self, uid: PartitionedUId) -> PartitionedMeta {
let shuffle_id = uid.shuffle_id;
let partition_id = uid.partition_id;
let partitioned_meta = self
.bitmap_of_blocks
.entry((shuffle_id, partition_id))
            .or_insert_with(PartitionedMeta::new);
partitioned_meta.clone()
}
pub fn get_block_ids(&self, ctx: GetBlocksContext) -> Result<Bytes> {
debug!("get blocks: {:?}", ctx.clone());
let partitioned_meta = self.get_underlying_partition_bitmap(ctx.uid);
partitioned_meta.get_block_ids_bytes()
}
pub async fn report_block_ids(&self, ctx: ReportBlocksContext) -> Result<()> {
debug!("Report blocks: {:?}", ctx.clone());
let mut partitioned_meta = self.get_underlying_partition_bitmap(ctx.uid);
partitioned_meta.report_block_ids(ctx.blocks)?;
Ok(())
}
pub async fn purge(&self, app_id: String, shuffle_id: Option<i32>) -> Result<()> {
let removed_size = self
.store
.purge(PurgeDataContext::new(app_id, shuffle_id))
.await?;
self.total_resident_data_size
.fetch_sub(removed_size as u64, SeqCst);
Ok(())
}
pub fn total_received_data_size(&self) -> u64 {
self.total_received_data_size.load(SeqCst)
}
pub fn total_resident_data_size(&self) -> u64 {
self.total_resident_data_size.load(SeqCst)
}
}
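// A write/read sketch over a registered `App` (error handling elided; `uid`
// and `blocks` are hypothetical values for an already-registered shuffle):
//
//     let written = app.insert(WritingViewContext::from(uid.clone(), blocks)).await?;
//     let response = app
//         .select(ReadingViewContext {
//             uid,
//             reading_options: ReadingOptions::MEMORY_LAST_BLOCK_ID_AND_MAX_SIZE(-1, 1024 * 1024),
//             serialized_expected_task_ids_bitmap: None,
//         })
//         .await?;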
#[derive(Debug, Clone)]
pub struct PurgeDataContext {
pub(crate) app_id: String,
pub(crate) shuffle_id: Option<i32>,
}
impl PurgeDataContext {
pub fn new(app_id: String, shuffle_id: Option<i32>) -> PurgeDataContext {
PurgeDataContext { app_id, shuffle_id }
}
}
impl From<&str> for PurgeDataContext {
fn from(app_id_ref: &str) -> Self {
PurgeDataContext {
app_id: app_id_ref.to_string(),
shuffle_id: None,
}
}
}
#[derive(Debug, Clone)]
pub struct ReportBlocksContext {
pub(crate) uid: PartitionedUId,
pub(crate) blocks: Vec<i64>,
}
#[derive(Debug, Clone)]
pub struct GetBlocksContext {
pub(crate) uid: PartitionedUId,
}
#[derive(Debug, Clone)]
pub struct WritingViewContext {
pub uid: PartitionedUId,
pub data_blocks: Vec<PartitionedDataBlock>,
pub owned_by_huge_partition: bool,
}
impl WritingViewContext {
pub fn from(uid: PartitionedUId, data_blocks: Vec<PartitionedDataBlock>) -> Self {
WritingViewContext {
uid,
data_blocks,
owned_by_huge_partition: false,
}
}
pub fn new(
uid: PartitionedUId,
data_blocks: Vec<PartitionedDataBlock>,
owned_by_huge_partition: bool,
) -> Self {
WritingViewContext {
uid,
data_blocks,
owned_by_huge_partition,
}
}
}
#[derive(Debug, Clone)]
pub struct ReadingViewContext {
pub uid: PartitionedUId,
pub reading_options: ReadingOptions,
pub serialized_expected_task_ids_bitmap: Option<Treemap>,
}
pub struct ReadingIndexViewContext {
pub partition_id: PartitionedUId,
}
#[derive(Debug, Clone)]
pub struct RequireBufferContext {
pub uid: PartitionedUId,
pub size: i64,
}
#[derive(Debug, Clone)]
pub struct RegisterAppContext {
pub app_id: String,
pub app_config_options: AppConfigOptions,
}
#[derive(Debug, Clone)]
pub struct ReleaseBufferContext {
pub(crate) ticket_id: i64,
}
impl From<i64> for ReleaseBufferContext {
fn from(value: i64) -> Self {
Self { ticket_id: value }
}
}
impl RequireBufferContext {
pub fn new(uid: PartitionedUId, size: i64) -> Self {
Self { uid, size }
}
}
#[derive(Debug, Clone)]
pub enum ReadingOptions {
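    // (last_block_id, max_size): read memory-resident data after the given
    // block id, up to max_size bytes (the tests pass -1 to read from the start)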
#[allow(non_camel_case_types)]
MEMORY_LAST_BLOCK_ID_AND_MAX_SIZE(i64, i64),
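    // (offset, length): read a byte range from the localfile store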
#[allow(non_camel_case_types)]
FILE_OFFSET_AND_LEN(i64, i64),
}
// ==========================================================
#[derive(Debug, Clone)]
#[allow(non_camel_case_types)]
pub enum PurgeEvent {
// app_id
HEART_BEAT_TIMEOUT(String),
// app_id + shuffle_id
APP_PARTIAL_SHUFFLES_PURGE(String, i32),
// app_id
APP_PURGE(String),
}
pub type AppManagerRef = Arc<AppManager>;
pub struct AppManager {
// key: app_id
apps: DashMap<String, Arc<App>>,
receiver: async_channel::Receiver<PurgeEvent>,
sender: async_channel::Sender<PurgeEvent>,
store: Arc<HybridStore>,
app_heartbeat_timeout_min: u32,
config: Config,
runtime_manager: RuntimeManager,
}
impl AppManager {
fn new(runtime_manager: RuntimeManager, config: Config) -> Self {
let (sender, receiver) = async_channel::unbounded();
let app_heartbeat_timeout_min = config.app_heartbeat_timeout_min.unwrap_or(10);
let store = Arc::new(StoreProvider::get(runtime_manager.clone(), config.clone()));
store.clone().start();
        AppManager {
            apps: DashMap::new(),
            receiver,
            sender,
            store,
            app_heartbeat_timeout_min,
            config,
            runtime_manager,
        }
}
}
impl AppManager {
pub fn get_ref(runtime_manager: RuntimeManager, config: Config) -> AppManagerRef {
let app_ref = Arc::new(AppManager::new(runtime_manager.clone(), config));
let app_manager_ref_cloned = app_ref.clone();
runtime_manager.default_runtime.spawn(async move {
info!("Starting app heartbeat checker...");
loop {
// task1: find out heartbeat timeout apps
tokio::time::sleep(Duration::from_secs(120)).await;
                let current_timestamp = current_timestamp_sec();
                for item in app_manager_ref_cloned.apps.iter() {
                    let (key, app) = item.pair();
                    let last_time = app.get_latest_heartbeat_time();
                    // saturating_sub guards against the clock moving backwards
                    if current_timestamp.saturating_sub(last_time)
                        > (app_manager_ref_cloned.app_heartbeat_timeout_min * 60) as u64
{
if app_manager_ref_cloned
.sender
.send(PurgeEvent::HEART_BEAT_TIMEOUT(key.clone()))
.await
.is_err()
{
error!(
"Errors on sending purge event when app: {} heartbeat timeout",
key
);
}
}
}
}
});
// calculate topN app shuffle data size
let app_manager_ref = app_ref.clone();
runtime_manager.default_runtime.spawn(async move {
info!("Starting calculating topN app shuffle data size...");
loop {
tokio::time::sleep(Duration::from_secs(10)).await;
                let view = app_manager_ref.apps.clone().into_read_only();
                let mut apps: Vec<_> = view.values().collect();
                // sort by resident data size, descending; negating a u64 key
                // would underflow, so order via `Reverse` instead
                apps.sort_by_key(|x| std::cmp::Reverse(x.total_resident_data_size()));
                let top_n = 10;
                for app in apps.iter().take(top_n) {
                    GAUGE_TOPN_APP_RESIDENT_DATA_SIZE
                        .with_label_values(&[&app.app_id])
                        .set(app.total_resident_data_size() as i64);
                }
}
});
let app_manager_cloned = app_ref.clone();
runtime_manager.default_runtime.spawn(async move {
info!("Starting purge event handler...");
            while let Ok(event) = app_manager_cloned.receiver.recv().await {
                let _ = match event {
                    PurgeEvent::HEART_BEAT_TIMEOUT(app_id) => {
                        info!(
                            "The app:[{}]'s data will be purged due to heartbeat timeout",
                            &app_id
                        );
                        GAUGE_APP_NUMBER.dec();
                        app_manager_cloned.purge_app_data(app_id, None).await
                    }
                    PurgeEvent::APP_PURGE(app_id) => {
                        info!(
                            "The app:[{}] has finished, so its data will be purged.",
                            &app_id
                        );
                        GAUGE_APP_NUMBER.dec();
                        app_manager_cloned.purge_app_data(app_id, None).await
                    }
                    PurgeEvent::APP_PARTIAL_SHUFFLES_PURGE(app_id, shuffle_id) => {
                        info!(
                            "The app:[{}] with shuffle_id:[{}] will be purged due to the unregister RPC",
                            &app_id, shuffle_id
                        );
                        // a partial purge keeps the app alive, so the app gauge is left untouched
                        app_manager_cloned
                            .purge_app_data(app_id, Some(shuffle_id))
                            .await
                    }
                }
                .map_err(|err| error!("Errors on purging data. error: {:?}", err));
            }
});
app_ref
}
pub async fn store_is_healthy(&self) -> Result<bool> {
self.store.is_healthy().await
}
pub async fn store_memory_snapshot(&self) -> Result<MemorySnapshot> {
self.store.get_hot_store_memory_snapshot().await
}
pub fn store_memory_spill_event_num(&self) -> Result<u64> {
self.store.memory_spill_event_num()
}
async fn purge_app_data(&self, app_id: String, shuffle_id_option: Option<i32>) -> Result<()> {
        let app = self.get_app(&app_id).ok_or(anyhow!(format!(
            "App:{} does not exist when purging data; this should not happen",
            &app_id
        )))?;
app.purge(app_id.clone(), shuffle_id_option).await?;
if shuffle_id_option.is_none() {
self.apps.remove(&app_id);
}
Ok(())
}
pub fn get_app(&self, app_id: &str) -> Option<Arc<App>> {
self.apps.get(app_id).map(|v| v.value().clone())
}
pub fn register(
&self,
app_id: String,
shuffle_id: i32,
app_config_options: AppConfigOptions,
) -> Result<()> {
        info!(
            "Accepted registry. app_id: {}, shuffle_id: {}",
            app_id, shuffle_id
        );
let app_ref = self.apps.entry(app_id.clone()).or_insert_with(|| {
TOTAL_APP_NUMBER.inc();
GAUGE_APP_NUMBER.inc();
            let capacity =
                ReadableSize::from_str(&self.config.memory_store.as_ref().unwrap().capacity)
                    .unwrap()
                    .as_bytes();
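            // the huge-partition memory watermark is a configured fraction of
            // the memory store capacity (the whole capacity when unconfigured)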
let huge_partition_max_available_size = Some(
(self
.config
.huge_partition_memory_max_used_percent
.unwrap_or(1.0)
* capacity as f64) as u64,
);
            let threshold = self
                .config
                .huge_partition_marked_threshold
                .as_ref()
                .map(|v| ReadableSize::from_str(v).unwrap().as_bytes());
Arc::new(App::from(
app_id,
app_config_options,
self.store.clone(),
threshold,
huge_partition_max_available_size,
self.runtime_manager.clone(),
))
});
app_ref.register_shuffle(shuffle_id)
}
pub async fn unregister_shuffle(&self, app_id: String, shuffle_id: i32) -> Result<()> {
self.sender
.send(PurgeEvent::APP_PARTIAL_SHUFFLES_PURGE(app_id, shuffle_id))
.await?;
Ok(())
}
pub async fn unregister_app(&self, app_id: String) -> Result<()> {
self.sender.send(PurgeEvent::APP_PURGE(app_id)).await?;
Ok(())
}
}
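// A minimal end-to-end sketch over the manager (mirrors the tests below;
// `config` is a hypothetical worker Config):
//
//     let manager: AppManagerRef = AppManager::get_ref(Default::default(), config);
//     manager.register("app-1".to_string(), 0, Default::default())?;
//     let app = manager.get_app("app-1").expect("registered above");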
#[derive(Ord, PartialOrd, Eq, PartialEq, Default, Debug, Hash, Clone)]
pub struct PartitionedUId {
pub app_id: String,
pub shuffle_id: i32,
pub partition_id: i32,
}
impl PartitionedUId {
pub fn from(app_id: String, shuffle_id: i32, partition_id: i32) -> PartitionedUId {
PartitionedUId {
app_id,
shuffle_id,
partition_id,
}
}
    pub fn get_hash(uid: &PartitionedUId) -> u64 {
        let mut hasher = DefaultHasher::new();
        uid.hash(&mut hasher);
        hasher.finish()
    }
}
#[cfg(test)]
mod test {
use crate::app::{
AppManager, GetBlocksContext, PartitionedUId, ReadingOptions, ReadingViewContext,
ReportBlocksContext, WritingViewContext,
};
use crate::config::{Config, HybridStoreConfig, LocalfileStoreConfig, MemoryStoreConfig};
use crate::runtime::manager::RuntimeManager;
use crate::store::{PartitionedDataBlock, ResponseData};
use croaring::treemap::JvmSerializer;
use croaring::Treemap;
use dashmap::DashMap;
#[test]
fn test_uid_hash() {
let uid = PartitionedUId::from("a".to_string(), 1, 1);
let hash_value = PartitionedUId::get_hash(&uid);
println!("{}", hash_value);
}
fn mock_config() -> Config {
let temp_dir = tempdir::TempDir::new("test_local_store").unwrap();
let temp_path = temp_dir.path().to_str().unwrap().to_string();
println!("init local file path: {}", temp_path);
let mut config = Config::default();
config.memory_store = Some(MemoryStoreConfig::new((1024 * 1024).to_string()));
config.localfile_store = Some(LocalfileStoreConfig::new(vec![temp_path]));
config.hybrid_store = Some(HybridStoreConfig::default());
config
}
#[test]
fn app_put_get_purge_test() {
let app_id = "app_put_get_purge_test-----id";
let runtime_manager: RuntimeManager = Default::default();
        let app_manager_ref = AppManager::get_ref(runtime_manager.clone(), mock_config());
        app_manager_ref
            .register(app_id.to_string(), 1, Default::default())
            .unwrap();
        if let Some(app) = app_manager_ref.get_app(app_id) {
let writing_ctx = WritingViewContext::from(
PartitionedUId {
                    app_id: app_id.to_string(),
shuffle_id: 1,
partition_id: 0,
},
vec![
PartitionedDataBlock {
block_id: 0,
length: 10,
uncompress_length: 20,
crc: 10,
data: Default::default(),
task_attempt_id: 0,
},
PartitionedDataBlock {
block_id: 1,
length: 20,
uncompress_length: 30,
crc: 0,
data: Default::default(),
task_attempt_id: 0,
},
],
);
// case1: put
let f = app.insert(writing_ctx);
if runtime_manager.wait(f).is_err() {
panic!()
}
            // read back the same partition that the data was written to
            let reading_ctx = ReadingViewContext {
                uid: PartitionedUId::from(app_id.to_string(), 1, 0),
                reading_options: ReadingOptions::MEMORY_LAST_BLOCK_ID_AND_MAX_SIZE(-1, 1000000),
                serialized_expected_task_ids_bitmap: Default::default(),
            };
// case2: get
let f = app.select(reading_ctx);
let result = runtime_manager.wait(f);
if result.is_err() {
panic!()
}
match result.unwrap() {
ResponseData::Mem(data) => {
assert_eq!(2, data.shuffle_data_block_segments.len());
}
_ => todo!(),
}
// check the data size
assert_eq!(30, app.total_received_data_size());
assert_eq!(30, app.total_resident_data_size());
// case3: purge
            runtime_manager
                .wait(app_manager_ref.purge_app_data(app_id.to_string(), None))
                .expect("failed to purge app data");
            // a full purge (shuffle_id: None) also removes the app from the manager
            assert!(app_manager_ref.get_app(app_id).is_none());
// check the data size again after the data has been removed
assert_eq!(30, app.total_received_data_size());
assert_eq!(0, app.total_resident_data_size());
}
}
#[test]
fn app_manager_test() {
        let app_manager_ref = AppManager::get_ref(Default::default(), mock_config());
app_manager_ref
.register("app_id".into(), 1, Default::default())
.unwrap();
        if let Some(app) = app_manager_ref.get_app("app_id") {
assert_eq!("app_id", app.app_id);
}
}
#[test]
fn test_get_or_put_block_ids() {
let app_id = "test_get_or_put_block_ids-----id".to_string();
let runtime_manager: RuntimeManager = Default::default();
        let app_manager_ref = AppManager::get_ref(runtime_manager.clone(), mock_config());
app_manager_ref
            .register(app_id.clone(), 1, Default::default())
.unwrap();
let app = app_manager_ref.get_app(app_id.as_ref()).unwrap();
runtime_manager
.wait(app.report_block_ids(ReportBlocksContext {
uid: PartitionedUId {
app_id: app_id.clone(),
shuffle_id: 1,
partition_id: 0,
},
blocks: vec![123, 124],
}))
.expect("TODO: panic message");
let data = app
.get_block_ids(GetBlocksContext {
uid: PartitionedUId {
app_id,
shuffle_id: 1,
partition_id: 0,
},
})
.expect("TODO: panic message");
let deserialized = Treemap::deserialize(&data).unwrap();
assert_eq!(deserialized, Treemap::from_iter(vec![123, 124]));
}
#[test]
fn test_dashmap_values() {
let dashmap = DashMap::new();
dashmap.insert(1, 3);
dashmap.insert(2, 2);
dashmap.insert(3, 8);
let cloned = dashmap.clone().into_read_only();
let mut vals: Vec<_> = cloned.values().collect();
        // mirror the production sort: descending via Reverse (negation would
        // underflow for unsigned keys)
        vals.sort_by_key(|x| std::cmp::Reverse(**x));
assert_eq!(vec![&8, &3, &2], vals);
let apps = vec![0, 1, 2, 3];
println!("{:#?}", &apps[0..2]);
}
}