[#1407] fix(rust): drop events and release memory when errors happened (#1509)

### What changes were proposed in this pull request?

drop events and release memory when errors happened

### Why are the changes needed?

Drop events to release memory to ensure service stable
Sub tasks for #1407 

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Online tests
diff --git a/rust/experimental/server/src/error.rs b/rust/experimental/server/src/error.rs
index bb0c1a1..7a70408 100644
--- a/rust/experimental/server/src/error.rs
+++ b/rust/experimental/server/src/error.rs
@@ -60,6 +60,9 @@
 
     #[error("Data should be read from hdfs in client side instead of from server side")]
     NOT_READ_HDFS_DATA_FROM_SERVER,
+
+    #[error("Spill event has been retried exceed the max limit for app: {0}")]
+    SPILL_EVENT_EXCEED_RETRY_MAX_LIMIT(String),
 }
 
 impl From<AcquireError> for WorkerError {
diff --git a/rust/experimental/server/src/metric.rs b/rust/experimental/server/src/metric.rs
index 9347f38..03e53bc 100644
--- a/rust/experimental/server/src/metric.rs
+++ b/rust/experimental/server/src/metric.rs
@@ -248,8 +248,20 @@
 pub static GAUGE_GRPC_REQUEST_QUEUE_SIZE: Lazy<IntGauge> =
     Lazy::new(|| IntGauge::new("grpc_request_queue_size", "grpc request queue size").unwrap());
 
+pub static TOTAL_SPILL_EVENTS_DROPPED: Lazy<IntCounter> = Lazy::new(|| {
+    IntCounter::new(
+        "total_spill_events_dropped",
+        "total spill events dropped number",
+    )
+    .expect("")
+});
+
 fn register_custom_metrics() {
     REGISTRY
+        .register(Box::new(TOTAL_SPILL_EVENTS_DROPPED.clone()))
+        .expect("");
+
+    REGISTRY
         .register(Box::new(GAUGE_TOPN_APP_RESIDENT_DATA_SIZE.clone()))
         .expect("");
 
diff --git a/rust/experimental/server/src/store/hybrid.rs b/rust/experimental/server/src/store/hybrid.rs
index 7417607..76bb3e8 100644
--- a/rust/experimental/server/src/store/hybrid.rs
+++ b/rust/experimental/server/src/store/hybrid.rs
@@ -27,7 +27,7 @@
     GAUGE_IN_SPILL_DATA_SIZE, GAUGE_MEMORY_SPILL_OPERATION, GAUGE_MEMORY_SPILL_TO_HDFS,
     GAUGE_MEMORY_SPILL_TO_LOCALFILE, TOTAL_MEMORY_SPILL_OPERATION,
     TOTAL_MEMORY_SPILL_OPERATION_FAILED, TOTAL_MEMORY_SPILL_TO_HDFS,
-    TOTAL_MEMORY_SPILL_TO_LOCALFILE,
+    TOTAL_MEMORY_SPILL_TO_LOCALFILE, TOTAL_SPILL_EVENTS_DROPPED,
 };
 use crate::readable_size::ReadableSize;
 #[cfg(feature = "hdfs")]
@@ -41,7 +41,7 @@
 use anyhow::{anyhow, Result};
 
 use async_trait::async_trait;
-use log::{debug, error, info};
+use log::{debug, error, info, warn};
 use prometheus::core::{Atomic, AtomicU64};
 use std::any::Any;
 
@@ -177,10 +177,13 @@
         spill_message: SpillMessage,
     ) -> Result<String, WorkerError> {
         let mut ctx: WritingViewContext = spill_message.ctx;
-        let in_flight_blocks_id: i64 = spill_message.id;
         let retry_cnt = spill_message.retry_cnt;
 
-        let uid = ctx.uid.clone();
+        if retry_cnt > 3 {
+            let app_id = ctx.uid.app_id;
+            return Err(WorkerError::SPILL_EVENT_EXCEED_RETRY_MAX_LIMIT(app_id));
+        }
+
         let blocks = &ctx.data_blocks;
         let mut spill_size = 0i64;
         for block in blocks {
@@ -254,11 +257,6 @@
             }
         }
 
-        self.hot_store
-            .release_in_flight_blocks_in_underlying_staging_buffer(uid, in_flight_blocks_id)
-            .await?;
-        self.hot_store.free_used(spill_size).await?;
-
         match candidate_store.name().await {
             StorageType::LOCALFILE => {
                 GAUGE_MEMORY_SPILL_TO_LOCALFILE.dec();
@@ -317,6 +315,17 @@
 
         Ok(())
     }
+
+    async fn release_data_in_memory(&self, data_size: u64, message: &SpillMessage) -> Result<()> {
+        let uid = &message.ctx.uid;
+        let in_flight_id = message.id;
+        self.hot_store
+            .release_in_flight_blocks_in_underlying_staging_buffer(uid.clone(), in_flight_id)
+            .await?;
+        self.hot_store.free_used(data_size as i64).await?;
+        self.hot_store.desc_to_in_flight_buffer_size(data_size);
+        Ok(())
+    }
 }
 
 #[async_trait]
@@ -360,7 +369,7 @@
 
                 TOTAL_MEMORY_SPILL_OPERATION.inc();
                 GAUGE_MEMORY_SPILL_OPERATION.inc();
-                let store_cloned = store.clone();
+                let store_ref = store.clone();
                 store
                     .runtime_manager
                     .write_runtime
@@ -371,14 +380,23 @@
                         }
 
                         GAUGE_IN_SPILL_DATA_SIZE.add(size as i64);
-                        match store_cloned
+                        match store_ref
                             .memory_spill_to_persistent_store(message.clone())
                             .instrument_await("memory_spill_to_persistent_store.")
                             .await
                         {
                             Ok(msg) => {
-                                store_cloned.hot_store.desc_to_in_flight_buffer_size(size);
-                                debug!("{}", msg)
+                                debug!("{}", msg);
+                                if let Err(err) = store_ref.release_data_in_memory(size, &message).await {
+                                    error!("Errors on releasing memory data, that should not happen. err: {:#?}", err);
+                                }
+                            }
+                            Err(WorkerError::SPILL_EVENT_EXCEED_RETRY_MAX_LIMIT(_)) | Err(WorkerError::PARTIAL_DATA_LOST(_)) => {
+                                warn!("Dropping the spill event for app: {:?}. Attention: this will make data lost!", message.ctx.uid.app_id);
+                                if let Err(err) = store_ref.release_data_in_memory(size, &message).await {
+                                    error!("Errors on releasing memory data when dropping the spill event, that should not happen. err: {:#?}", err);
+                                }
+                                TOTAL_SPILL_EVENTS_DROPPED.inc();
                             }
                             Err(error) => {
                                 TOTAL_MEMORY_SPILL_OPERATION_FAILED.inc();
@@ -389,10 +407,10 @@
 
                                 message.retry_cnt = message.retry_cnt + 1;
                                 // re-push to the queue to execute
-                                let _ = store_cloned.memory_spill_send.send(message).await;
+                                let _ = store_ref.memory_spill_send.send(message).await;
                             }
                         }
-                        store_cloned.memory_spill_event_num.dec_by(1);
+                        store_ref.memory_spill_event_num.dec_by(1);
                         GAUGE_IN_SPILL_DATA_SIZE.sub(size as i64);
                         GAUGE_MEMORY_SPILL_OPERATION.dec();
                         drop(concurrency_guarder);