[#1407] fix(rust): drop events and release memory when errors happened (#1509)
### What changes were proposed in this pull request?
Drop spill events and release their memory when errors occur.
### Why are the changes needed?
Drop events to release memory, ensuring the service remains stable.
Sub tasks for #1407
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Online tests
diff --git a/rust/experimental/server/src/error.rs b/rust/experimental/server/src/error.rs
index bb0c1a1..7a70408 100644
--- a/rust/experimental/server/src/error.rs
+++ b/rust/experimental/server/src/error.rs
@@ -60,6 +60,9 @@
#[error("Data should be read from hdfs in client side instead of from server side")]
NOT_READ_HDFS_DATA_FROM_SERVER,
+
+ #[error("Spill event has been retried exceed the max limit for app: {0}")]
+ SPILL_EVENT_EXCEED_RETRY_MAX_LIMIT(String),
}
impl From<AcquireError> for WorkerError {
diff --git a/rust/experimental/server/src/metric.rs b/rust/experimental/server/src/metric.rs
index 9347f38..03e53bc 100644
--- a/rust/experimental/server/src/metric.rs
+++ b/rust/experimental/server/src/metric.rs
@@ -248,8 +248,20 @@
pub static GAUGE_GRPC_REQUEST_QUEUE_SIZE: Lazy<IntGauge> =
Lazy::new(|| IntGauge::new("grpc_request_queue_size", "grpc request queue size").unwrap());
+pub static TOTAL_SPILL_EVENTS_DROPPED: Lazy<IntCounter> = Lazy::new(|| {
+ IntCounter::new(
+ "total_spill_events_dropped",
+ "total spill events dropped number",
+ )
+ .expect("")
+});
+
fn register_custom_metrics() {
REGISTRY
+ .register(Box::new(TOTAL_SPILL_EVENTS_DROPPED.clone()))
+ .expect("");
+
+ REGISTRY
.register(Box::new(GAUGE_TOPN_APP_RESIDENT_DATA_SIZE.clone()))
.expect("");
diff --git a/rust/experimental/server/src/store/hybrid.rs b/rust/experimental/server/src/store/hybrid.rs
index 7417607..76bb3e8 100644
--- a/rust/experimental/server/src/store/hybrid.rs
+++ b/rust/experimental/server/src/store/hybrid.rs
@@ -27,7 +27,7 @@
GAUGE_IN_SPILL_DATA_SIZE, GAUGE_MEMORY_SPILL_OPERATION, GAUGE_MEMORY_SPILL_TO_HDFS,
GAUGE_MEMORY_SPILL_TO_LOCALFILE, TOTAL_MEMORY_SPILL_OPERATION,
TOTAL_MEMORY_SPILL_OPERATION_FAILED, TOTAL_MEMORY_SPILL_TO_HDFS,
- TOTAL_MEMORY_SPILL_TO_LOCALFILE,
+ TOTAL_MEMORY_SPILL_TO_LOCALFILE, TOTAL_SPILL_EVENTS_DROPPED,
};
use crate::readable_size::ReadableSize;
#[cfg(feature = "hdfs")]
@@ -41,7 +41,7 @@
use anyhow::{anyhow, Result};
use async_trait::async_trait;
-use log::{debug, error, info};
+use log::{debug, error, info, warn};
use prometheus::core::{Atomic, AtomicU64};
use std::any::Any;
@@ -177,10 +177,13 @@
spill_message: SpillMessage,
) -> Result<String, WorkerError> {
let mut ctx: WritingViewContext = spill_message.ctx;
- let in_flight_blocks_id: i64 = spill_message.id;
let retry_cnt = spill_message.retry_cnt;
- let uid = ctx.uid.clone();
+ if retry_cnt > 3 {
+ let app_id = ctx.uid.app_id;
+ return Err(WorkerError::SPILL_EVENT_EXCEED_RETRY_MAX_LIMIT(app_id));
+ }
+
let blocks = &ctx.data_blocks;
let mut spill_size = 0i64;
for block in blocks {
@@ -254,11 +257,6 @@
}
}
- self.hot_store
- .release_in_flight_blocks_in_underlying_staging_buffer(uid, in_flight_blocks_id)
- .await?;
- self.hot_store.free_used(spill_size).await?;
-
match candidate_store.name().await {
StorageType::LOCALFILE => {
GAUGE_MEMORY_SPILL_TO_LOCALFILE.dec();
@@ -317,6 +315,17 @@
Ok(())
}
+
+ async fn release_data_in_memory(&self, data_size: u64, message: &SpillMessage) -> Result<()> {
+ let uid = &message.ctx.uid;
+ let in_flight_id = message.id;
+ self.hot_store
+ .release_in_flight_blocks_in_underlying_staging_buffer(uid.clone(), in_flight_id)
+ .await?;
+ self.hot_store.free_used(data_size as i64).await?;
+ self.hot_store.desc_to_in_flight_buffer_size(data_size);
+ Ok(())
+ }
}
#[async_trait]
@@ -360,7 +369,7 @@
TOTAL_MEMORY_SPILL_OPERATION.inc();
GAUGE_MEMORY_SPILL_OPERATION.inc();
- let store_cloned = store.clone();
+ let store_ref = store.clone();
store
.runtime_manager
.write_runtime
@@ -371,14 +380,23 @@
}
GAUGE_IN_SPILL_DATA_SIZE.add(size as i64);
- match store_cloned
+ match store_ref
.memory_spill_to_persistent_store(message.clone())
.instrument_await("memory_spill_to_persistent_store.")
.await
{
Ok(msg) => {
- store_cloned.hot_store.desc_to_in_flight_buffer_size(size);
- debug!("{}", msg)
+ debug!("{}", msg);
+ if let Err(err) = store_ref.release_data_in_memory(size, &message).await {
+ error!("Errors on releasing memory data, that should not happen. err: {:#?}", err);
+ }
+ }
+ Err(WorkerError::SPILL_EVENT_EXCEED_RETRY_MAX_LIMIT(_)) | Err(WorkerError::PARTIAL_DATA_LOST(_)) => {
+ warn!("Dropping the spill event for app: {:?}. Attention: this will make data lost!", message.ctx.uid.app_id);
+ if let Err(err) = store_ref.release_data_in_memory(size, &message).await {
+ error!("Errors on releasing memory data when dropping the spill event, that should not happen. err: {:#?}", err);
+ }
+ TOTAL_SPILL_EVENTS_DROPPED.inc();
}
Err(error) => {
TOTAL_MEMORY_SPILL_OPERATION_FAILED.inc();
@@ -389,10 +407,10 @@
message.retry_cnt = message.retry_cnt + 1;
// re-push to the queue to execute
- let _ = store_cloned.memory_spill_send.send(message).await;
+ let _ = store_ref.memory_spill_send.send(message).await;
}
}
- store_cloned.memory_spill_event_num.dec_by(1);
+ store_ref.memory_spill_event_num.dec_by(1);
GAUGE_IN_SPILL_DATA_SIZE.sub(size as i64);
GAUGE_MEMORY_SPILL_OPERATION.dec();
drop(concurrency_guarder);