[Chore](be) Stop spill hash join repartition on cancel (#63456)

Problem Summary: Partitioned hash join spill recovery could continue
normal repartition progress after cancellation because some loops
stopped on `state->is_cancelled()` but then fell through to completion
handling. This could mark partially recovered or repartitioned spill
data as complete. This PR returns the cancellation status before
advancing partition state, clears recovered build data during close, and
replaces a debug-only child EOS assertion with a runtime error.
diff --git a/be/src/exec/operator/partitioned_hash_join_probe_operator.cpp b/be/src/exec/operator/partitioned_hash_join_probe_operator.cpp
index 56e80ce..4810aca 100644
--- a/be/src/exec/operator/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/exec/operator/partitioned_hash_join_probe_operator.cpp
@@ -36,6 +36,7 @@
 #include "exec/spill/spill_repartitioner.h"
 #include "runtime/fragment_mgr.h"
 #include "runtime/runtime_profile.h"
+#include "runtime/runtime_state.h"
 
 namespace doris {
 
@@ -215,6 +216,7 @@
         }
         _current_probe_reader.reset();
     }
+    _recovered_build_block.reset();
 
     // Clean up any remaining spill partition queue entries
     for (auto& entry : _spill_partition_queue) {
@@ -347,7 +349,7 @@
         RETURN_IF_ERROR(_current_build_reader->open());
     }
     bool eos = false;
-    while (!eos) {
+    while (!eos && !state->is_cancelled()) {
         Block block;
         RETURN_IF_ERROR(_current_build_reader->read(&block, &eos));
         COUNTER_UPDATE(_recovery_build_rows, block.rows());
@@ -371,6 +373,7 @@
             return Status::OK(); // yield — buffer full, more data may remain
         }
     }
+    RETURN_IF_CANCELLED(state);
     // Build file fully consumed.
     RETURN_IF_ERROR(_current_build_reader->close());
     _current_build_reader.reset();
@@ -407,6 +410,7 @@
             return Status::OK(); // yield — enough data read
         }
     }
+    RETURN_IF_CANCELLED(state);
     // Probe file fully consumed.
     RETURN_IF_ERROR(_current_probe_reader->close());
     _current_probe_reader.reset();
@@ -414,6 +418,7 @@
     return Status::OK();
 }
 
+// NOLINTNEXTLINE(readability-function-size,readability-function-cognitive-complexity): existing spill repartition state machine handles build/probe phases together.
 Status PartitionedHashJoinProbeLocalState::repartition_current_partition(
         RuntimeState* state, JoinSpillPartitionInfo& partition) {
     auto& p = _parent->cast<PartitionedHashJoinProbeOperatorX>();
@@ -472,6 +477,7 @@
         }
     }
     RETURN_IF_ERROR(_repartitioner.finalize());
+    RETURN_IF_CANCELLED(state);
     _recovered_build_block.reset();
     _current_build_reader.reset(); // clear any leftover reader state
     partition.build_file.reset();
@@ -495,9 +501,9 @@
         while (!done && !state->is_cancelled()) {
             RETURN_IF_ERROR(_repartitioner.repartition(state, partition.probe_file, &done));
         }
-        partition.probe_file.reset();
-
         RETURN_IF_ERROR(_repartitioner.finalize());
+        RETURN_IF_CANCELLED(state);
+        partition.probe_file.reset();
         _current_probe_reader.reset();
     }
 
@@ -696,7 +702,11 @@
     // per-partition build and probe spill streams. After this point every partition
     // (including the original "level-0" ones) is accessed uniformly via the queue.
     if (!local_state._spill_queue_initialized) {
-        DCHECK(local_state._child_eos) << "pull() with is_spilled=true called before child EOS";
+        if (UNLIKELY(!local_state._child_eos)) {
+            return Status::InternalError(
+                    "query:{}, node:{}, pull() with is_spilled=true called before child EOS",
+                    print_id(state->query_id()), node_id());
+        }
         // There maybe some blocks still in partitioned block or probe blocks. Flush them to disk.
         RETURN_IF_ERROR(local_state.spill_probe_blocks(state, true));
         // Close all probe writers so that SpillFile metadata (part_count, etc.)
@@ -729,6 +739,7 @@
     return _pull_from_spill_queue(local_state, state, output_block, eos);
 }
 
+// NOLINTNEXTLINE(readability-function-size,readability-function-cognitive-complexity): existing spill queue pull handles setup, recovery, and probing phases.
 Status PartitionedHashJoinProbeOperatorX::_pull_from_spill_queue(
         PartitionedHashJoinProbeLocalState& local_state, RuntimeState* state, Block* output_block,
         bool* eos) const {
diff --git a/be/test/exec/operator/partitioned_hash_join_probe_operator_test.cpp b/be/test/exec/operator/partitioned_hash_join_probe_operator_test.cpp
index e50ccb1..7e69aed 100644
--- a/be/test/exec/operator/partitioned_hash_join_probe_operator_test.cpp
+++ b/be/test/exec/operator/partitioned_hash_join_probe_operator_test.cpp
@@ -1139,6 +1139,45 @@
     ASSERT_TRUE(local_state->_recovered_build_block == nullptr);
 }
 
+TEST_F(PartitionedHashJoinProbeOperatorTest, RecoverBuildBlocksFromDiskCancelledBeforeEmptyEos) {
+    auto [probe_operator, sink_operator] = _helper.create_operators();
+
+    std::shared_ptr<MockPartitionedHashJoinSharedState> shared_state;
+    auto local_state = _helper.create_probe_local_state(_helper.runtime_state.get(),
+                                                        probe_operator.get(), shared_state);
+
+    SpillFileSPtr spill_file;
+    auto relative_path = fmt::format(
+            "{}/hash_build-{}-{}", print_id(_helper.runtime_state->query_id()),
+            probe_operator->node_id(), ExecEnv::GetInstance()->spill_file_mgr()->next_id());
+    ASSERT_TRUE(ExecEnv::GetInstance()
+                        ->spill_file_mgr()
+                        ->create_spill_file(relative_path, spill_file)
+                        .ok());
+
+    {
+        SpillFileWriterSPtr writer;
+        ASSERT_TRUE(spill_file
+                            ->create_writer(_helper.runtime_state.get(),
+                                            local_state->operator_profile(), writer)
+                            .ok());
+        ASSERT_TRUE(writer->close().ok());
+    }
+
+    _helper.runtime_state->cancel(Status::Cancelled("test cancel"));
+
+    JoinSpillPartitionInfo partition_info(spill_file, nullptr, 0);
+    auto status = local_state->recover_build_blocks_from_partition(_helper.runtime_state.get(),
+                                                                   partition_info);
+    ASSERT_TRUE(status.is<ErrorCode::CANCELLED>()) << status.to_string();
+    ASSERT_NE(partition_info.build_file, nullptr);
+    ASSERT_TRUE(local_state->_recovered_build_block == nullptr);
+
+    ASSERT_TRUE(local_state->close(_helper.runtime_state.get()).ok());
+    ExecEnv::GetInstance()->spill_file_mgr()->delete_spill_file(partition_info.build_file);
+    partition_info.build_file.reset();
+}
+
 TEST_F(PartitionedHashJoinProbeOperatorTest, RecoverBuildBlocksFromDiskLargeData) {
     // Similar setup as above...
     auto [probe_operator, sink_operator] = _helper.create_operators();