blob: 96e49de04a939385544a45f8e87fde981c86944d [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use bytes::Bytes;
use crc32fast::Hasher;
use std::net::IpAddr;
use std::sync::Mutex;
use std::time::{SystemTime, UNIX_EPOCH};
const WORKER_IP: &str = "WORKER_IP";
pub fn get_local_ip() -> Result<IpAddr, std::io::Error> {
let ip = std::env::var(WORKER_IP);
if ip.is_ok() {
Ok(ip.unwrap().parse().unwrap())
} else {
let socket = std::net::UdpSocket::bind("0.0.0.0:0")?;
socket.connect("8.8.8.8:80")?;
let local_addr = socket.local_addr()?;
Ok(local_addr.ip())
}
}
pub fn gen_worker_uid(grpc_port: i32) -> String {
let ip = get_local_ip().unwrap().to_string();
format!("{}-{}", ip.clone(), grpc_port)
}
const LENGTH_PER_CRC: usize = 4 * 1024;
pub fn get_crc(bytes: &Bytes) -> i64 {
let mut crc32 = Hasher::new();
let offset = 0;
let length = bytes.len();
let crc_buffer = &bytes[offset..(offset + length)];
for i in (0..length).step_by(LENGTH_PER_CRC) {
let len = std::cmp::min(LENGTH_PER_CRC, length - i);
let crc_slice = &crc_buffer[i..(i + len)];
crc32.update(crc_slice);
}
crc32.finalize() as i64
}
pub fn current_timestamp_ms() -> u128 {
let current_time = SystemTime::now();
let timestamp = current_time.duration_since(UNIX_EPOCH).unwrap().as_millis();
timestamp
}
pub fn current_timestamp_sec() -> u64 {
let current_time = SystemTime::now();
let timestamp = current_time.duration_since(UNIX_EPOCH).unwrap().as_secs();
timestamp
}
pub struct ConcurrencyLimiter {
remaining: Mutex<u32>,
}
pub struct Ticket<'a> {
limiter: &'a ConcurrencyLimiter,
id: u32,
}
impl ConcurrencyLimiter {
fn new(size: u32) -> Self {
ConcurrencyLimiter {
remaining: Mutex::new(size),
}
}
pub fn try_acquire(&self) -> Option<Ticket<'_>> {
let mut remaining = self.remaining.lock().unwrap();
if *remaining <= 0 {
None
} else {
let id = *remaining;
*remaining -= 1;
Some(Ticket { limiter: self, id })
}
}
}
impl Drop for Ticket<'_> {
fn drop(&mut self) {
let mut lock = self.limiter.remaining.lock().unwrap();
*lock += 1;
}
}
#[cfg(test)]
mod test {
use crate::util::{current_timestamp_sec, get_crc, ConcurrencyLimiter};
use bytes::Bytes;
#[test]
fn ticket_test() {
let limiter = ConcurrencyLimiter::new(2);
{
let ticket = limiter.try_acquire().unwrap();
assert_eq!(2, ticket.id);
let ticket = limiter.try_acquire().unwrap();
assert_eq!(1, ticket.id);
assert_eq!(true, limiter.try_acquire().is_none());
}
let ticket = limiter.try_acquire();
assert_eq!(true, ticket.is_some());
}
#[test]
fn time_test() {
println!("{}", current_timestamp_sec());
}
#[test]
fn crc_test() {
let data = Bytes::from("hello world! hello china!");
let crc_value = get_crc(&data);
// This value is the same with java's implementation
assert_eq!(3871485936, crc_value);
}
#[test]
fn drop_test() {}
}