blob: 3b0fc3ee929ec733e228953479f895a91ee2c0c0 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#[cfg(test)]
mod test {
use dashmap::DashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use tokio::time::Instant;
struct Buffer {
idx: i64,
}
#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
#[ignore]
async fn lock_test() -> anyhow::Result<()> {
// 100 requests(4000 insert in single batch)
let concurrency = 1000;
let loop_size = 4000;
let buffer_map: Arc<DashMap<String, Arc<Mutex<Buffer>>>> = Arc::new(DashMap::new());
let mut handlers = vec![];
let timer = Instant::now();
for _ in 0..concurrency {
let buffer = buffer_map.clone();
handlers.push(tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(10)).await;
for k in 0..loop_size {
let buf = buffer
.entry(k.to_string())
.or_insert_with(|| Arc::new(Mutex::new(Buffer { idx: 0 })))
.clone();
let mut buf = buf.lock().await;
buf.idx += 1;
}
}));
}
for handler in handlers {
let _ = handler.await;
}
println!(
"concurrent tokio lock time cost: {} ms",
timer.elapsed().as_millis() - 10
);
let buffer_map: Arc<DashMap<String, Arc<Mutex<Buffer>>>> = Arc::new(DashMap::new());
let timer = Instant::now();
for _ in 0..concurrency {
let buffer = buffer_map.clone();
for k in 0..loop_size {
let buf = buffer
.entry(k.to_string())
.or_insert_with(|| Arc::new(Mutex::new(Buffer { idx: 0 })))
.clone();
let mut buf = buf.lock().await;
buf.idx += 1;
}
}
println!(
"sequence tokio lock time cost: {} ms",
timer.elapsed().as_millis() - 10
);
let buffer_map: Arc<DashMap<String, Arc<std::sync::Mutex<Buffer>>>> =
Arc::new(DashMap::new());
let timer = Instant::now();
for _ in 0..concurrency {
let buffer = buffer_map.clone();
for k in 0..loop_size {
let buf = buffer
.entry(k.to_string())
.or_insert_with(|| Arc::new(std::sync::Mutex::new(Buffer { idx: 0 })))
.clone();
let mut buf = buf.lock().unwrap();
buf.idx += 1;
}
}
println!(
"sequence std lock time cost: {} ms",
timer.elapsed().as_millis() - 10
);
let buffer_map: Arc<DashMap<String, Arc<std::sync::Mutex<Buffer>>>> =
Arc::new(DashMap::new());
let mut handlers = vec![];
let timer = Instant::now();
for _ in 0..concurrency {
let buffer = buffer_map.clone();
handlers.push(tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(10)).await;
for k in 0..loop_size {
let buf = buffer
.entry(k.to_string())
.or_insert_with(|| Arc::new(std::sync::Mutex::new(Buffer { idx: 0 })))
.clone();
let mut buf = buf.lock().unwrap();
buf.idx += 1;
}
}));
}
for handler in handlers {
let _ = handler.await;
}
println!(
"concurrent std lock time cost: {} ms",
timer.elapsed().as_millis() - 10
);
Ok(())
}
}