[Improvement](profile) Provide more info for schedule time (#38290)
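
Record three wall-clock timestamps on the BE for the two fragment-dispatch rpcs
(received_time when the rpc arrives, execution_time when a worker thread picks
it up, execution_done_time when the work finishes) and return them in
PExecPlanFragmentResult. The FE coordinator combines them with its own send and
completion times and reports four per-BE intervals in the query profile under
"Schedule Time Of BE", for both the phase1 (prepare) and phase2 (execution
start) rpcs: RPC Latency From FE To BE, RPC Work Queue Time, RPC Work Time, and
RPC Latency From BE To FE.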

diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 9611e1a..025a4c7 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -288,6 +288,9 @@
                                           const PExecPlanFragmentRequest* request,
                                           PExecPlanFragmentResult* response,
                                           google::protobuf::Closure* done) {
+    timeval tv {};
+    gettimeofday(&tv, nullptr);
+    response->set_received_time(tv.tv_sec * 1000LL + tv.tv_usec / 1000);
     bool ret = _light_work_pool.try_offer([this, controller, request, response, done]() {
         _exec_plan_fragment_in_pthread(controller, request, response, done);
     });
@@ -301,6 +304,9 @@
                                                       const PExecPlanFragmentRequest* request,
                                                       PExecPlanFragmentResult* response,
                                                       google::protobuf::Closure* done) {
+    timeval tv1 {};
+    gettimeofday(&tv1, nullptr);
+    response->set_execution_time(tv1.tv_sec * 1000LL + tv1.tv_usec / 1000);
     brpc::ClosureGuard closure_guard(done);
     auto st = Status::OK();
     bool compact = request->has_compact() ? request->compact() : false;
@@ -318,12 +324,18 @@
         LOG(WARNING) << "exec plan fragment failed, errmsg=" << st;
     }
     st.to_protobuf(response->mutable_status());
+    timeval tv2 {};
+    gettimeofday(&tv2, nullptr);
+    response->set_execution_done_time(tv2.tv_sec * 1000LL + tv2.tv_usec / 1000);
 }
 
 void PInternalService::exec_plan_fragment_prepare(google::protobuf::RpcController* controller,
                                                   const PExecPlanFragmentRequest* request,
                                                   PExecPlanFragmentResult* response,
                                                   google::protobuf::Closure* done) {
+    timeval tv {};
+    gettimeofday(&tv, nullptr);
+    response->set_received_time(tv.tv_sec * 1000LL + tv.tv_usec / 1000);
     bool ret = _light_work_pool.try_offer([this, controller, request, response, done]() {
         _exec_plan_fragment_in_pthread(controller, request, response, done);
     });
@@ -337,10 +349,19 @@
                                                 const PExecPlanFragmentStartRequest* request,
                                                 PExecPlanFragmentResult* result,
                                                 google::protobuf::Closure* done) {
+    timeval tv {};
+    gettimeofday(&tv, nullptr);
+    result->set_received_time(tv.tv_sec * 1000LL + tv.tv_usec / 1000);
     bool ret = _light_work_pool.try_offer([this, request, result, done]() {
+        timeval tv1 {};
+        gettimeofday(&tv1, nullptr);
+        result->set_execution_time(tv1.tv_sec * 1000LL + tv1.tv_usec / 1000);
         brpc::ClosureGuard closure_guard(done);
         auto st = _exec_env->fragment_mgr()->start_query_execution(request);
         st.to_protobuf(result->mutable_status());
+        timeval tv2 {};
+        gettimeofday(&tv2, nullptr);
+        result->set_execution_done_time(tv2.tv_sec * 1000LL + tv2.tv_usec / 1000);
     });
     if (!ret) {
         offer_failed(result, done, _light_work_pool);
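
Note: the same gettimeofday-to-milliseconds pattern is repeated at every
timestamp site above. A small helper along the lines of the sketch below (a
hypothetical refactor, not part of this patch) would reduce each site to one
line:

    // Hypothetical helper, not in the patch: current wall-clock time in ms,
    // using the same tv.tv_sec * 1000LL + tv.tv_usec / 1000 arithmetic.
    #include <sys/time.h>
    #include <cstdint>

    static int64_t now_ms() {
        timeval tv {};
        gettimeofday(&tv, nullptr);
        return tv.tv_sec * 1000LL + tv.tv_usec / 1000;
    }

    // Call sites then become e.g. response->set_received_time(now_ms());
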
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
index 2e68617..629a952 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
@@ -20,13 +20,18 @@
 import org.apache.doris.common.Config;
 import org.apache.doris.common.util.RuntimeProfile;
 import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TUnit;
 import org.apache.doris.transaction.TransactionType;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
+import com.google.gson.Gson;
 
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -50,6 +55,7 @@
     public static final String IS_NEREIDS = "Is Nereids";
     public static final String TOTAL_INSTANCES_NUM = "Total Instances Num";
     public static final String INSTANCES_NUM_PER_BE = "Instances Num Per BE";
+    public static final String SCHEDULE_TIME_PER_BE = "Schedule Time Of BE";
     public static final String PARALLEL_FRAGMENT_EXEC_INSTANCE = "Parallel Fragment Exec Instance Num";
     public static final String TRACE_ID = "Trace ID";
     public static final String WORKLOAD_GROUP = "Workload Group";
@@ -102,6 +108,10 @@
     public static final String HMS_ADD_PARTITION_CNT = "HMS Add Partition Count";
     public static final String HMS_UPDATE_PARTITION_TIME = "HMS Update Partition Time";
     public static final String HMS_UPDATE_PARTITION_CNT = "HMS Update Partition Count";
+    public static final String LATENCY_FROM_FE_TO_BE = "RPC Latency From FE To BE";
+    public static final String RPC_QUEUE_TIME = "RPC Work Queue Time";
+    public static final String RPC_WORK_TIME = "RPC Work Time";
+    public static final String LATENCY_FROM_BE_TO_FE = "RPC Latency From BE To FE";
 
     // These info will display on FE's web ui table, every one will be displayed as
     // a column, so that should not
@@ -145,6 +155,7 @@
             SEND_FRAGMENT_PHASE2_TIME,
             FRAGMENT_COMPRESSED_SIZE,
             FRAGMENT_RPC_COUNT,
+            SCHEDULE_TIME_PER_BE,
             WAIT_FETCH_RESULT_TIME,
             FETCH_RESULT_TIME,
             WRITE_RESULT_TIME,
@@ -256,6 +267,11 @@
     private long filesystemDeleteDirCnt = 0;
     private TransactionType transactionType = TransactionType.UNKNOWN;
 
+    // BE -> (RPC latency from FE to BE, time queued before a worker runs the rpc, time spent doing the work,
+    // RPC latency from BE to FE)
+    private Map<TNetworkAddress, List<Long>> rpcPhase1Latency;
+    private Map<TNetworkAddress, List<Long>> rpcPhase2Latency;
+
     public SummaryProfile() {
         summaryProfile = new RuntimeProfile(SUMMARY_PROFILE_NAME);
         executionSummaryProfile = new RuntimeProfile(EXECUTION_SUMMARY_PROFILE_NAME);
@@ -344,6 +360,7 @@
                 getPrettyTime(createScanRangeFinishTime, getSplitsFinishTime, TUnit.TIME_MS));
         executionSummaryProfile.addInfoString(SCHEDULE_TIME,
                 getPrettyTime(queryScheduleFinishTime, queryPlanFinishTime, TUnit.TIME_MS));
+        executionSummaryProfile.addInfoString(SCHEDULE_TIME_PER_BE, getRpcLatency());
         executionSummaryProfile.addInfoString(ASSIGN_FRAGMENT_TIME,
                 getPrettyTime(assignFragmentTime, queryPlanFinishTime, TUnit.TIME_MS));
         executionSummaryProfile.addInfoString(FRAGMENT_SERIALIZE_TIME,
@@ -550,6 +567,14 @@
         return queryBeginTime;
     }
 
+    public void setRpcPhase1Latency(Map<TNetworkAddress, List<Long>> rpcPhase1Latency) {
+        this.rpcPhase1Latency = rpcPhase1Latency;
+    }
+
+    public void setRpcPhase2Latency(Map<TNetworkAddress, List<Long>> rpcPhase2Latency) {
+        this.rpcPhase2Latency = rpcPhase2Latency;
+    }
+
     public static class SummaryBuilder {
         private Map<String, String> map = Maps.newHashMap();
 
@@ -759,4 +784,43 @@
     public void incDeleteFileCnt() {
         this.filesystemDeleteFileCnt += 1;
     }
+
+    private String getRpcLatency() {
+        Map<String, Map<String, Map<String, String>>> jsonObject = new HashMap<>();
+        if (rpcPhase1Latency != null) {
+            Map<String, Map<String, String>> latencyForPhase1 = new HashMap<>();
+            for (TNetworkAddress key : rpcPhase1Latency.keySet()) {
+                Preconditions.checkState(rpcPhase1Latency.get(key).size() == 4, "rpc latency should have 4 elements");
+                Map<String, String> latency = new HashMap<>();
+                latency.put(LATENCY_FROM_FE_TO_BE, RuntimeProfile.printCounter(rpcPhase1Latency.get(key).get(0),
+                        TUnit.TIME_MS));
+                latency.put(RPC_QUEUE_TIME, RuntimeProfile.printCounter(rpcPhase1Latency.get(key).get(1),
+                        TUnit.TIME_MS));
+                latency.put(RPC_WORK_TIME, RuntimeProfile.printCounter(rpcPhase1Latency.get(key).get(2),
+                        TUnit.TIME_MS));
+                latency.put(LATENCY_FROM_BE_TO_FE, RuntimeProfile.printCounter(rpcPhase1Latency.get(key).get(3),
+                        TUnit.TIME_MS));
+                latencyForPhase1.put(key.getHostname() + ": " + key.getPort(), latency);
+            }
+            jsonObject.put("phase1", latencyForPhase1);
+        }
+        if (rpcPhase2Latency != null) {
+            Map<String, Map<String, String>> latencyForPhase2 = new HashMap<>();
+            for (TNetworkAddress key : rpcPhase2Latency.keySet()) {
+                Preconditions.checkState(rpcPhase2Latency.get(key).size() == 4, "rpc latency should have 4 elements");
+                Map<String, String> latency = new HashMap<>();
+                latency.put(LATENCY_FROM_FE_TO_BE, RuntimeProfile.printCounter(rpcPhase2Latency.get(key).get(0),
+                        TUnit.TIME_MS));
+                latency.put(RPC_QUEUE_TIME, RuntimeProfile.printCounter(rpcPhase2Latency.get(key).get(1),
+                        TUnit.TIME_MS));
+                latency.put(RPC_WORK_TIME, RuntimeProfile.printCounter(rpcPhase2Latency.get(key).get(2),
+                        TUnit.TIME_MS));
+                latency.put(LATENCY_FROM_BE_TO_FE, RuntimeProfile.printCounter(rpcPhase2Latency.get(key).get(3),
+                        TUnit.TIME_MS));
+                latencyForPhase2.put(key.getHostname() + ": " + key.getPort(), latency);
+            }
+            jsonObject.put("phase2", latencyForPhase2);
+        }
+        return new Gson().toJson(jsonObject);
+    }
 }
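
For reference, getRpcLatency() renders a JSON string along these lines
(hypothetical values for a single BE). The time strings are produced by
RuntimeProfile.printCounter, the phase2 object appears only for two-phase
execution, and Gson emits everything on one line; it is pretty-printed here
for readability:

    {
      "phase1": {
        "192.168.1.10: 8060": {
          "RPC Latency From FE To BE": "2ms",
          "RPC Work Queue Time": "0ms",
          "RPC Work Time": "15ms",
          "RPC Latency From BE To FE": "1ms"
        }
      },
      "phase2": {
        "192.168.1.10: 8060": {
          "RPC Latency From FE To BE": "1ms",
          "RPC Work Queue Time": "0ms",
          "RPC Work Time": "3ms",
          "RPC Latency From BE To FE": "1ms"
        }
      }
    }
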
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index a52bffe..fdfa468 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -140,6 +140,7 @@
 import org.apache.thrift.TSerializer;
 import org.apache.thrift.protocol.TCompactProtocol;
 import org.jetbrains.annotations.NotNull;
+import org.joda.time.DateTime;
 
 import java.security.SecureRandom;
 import java.time.LocalDateTime;
@@ -893,37 +894,44 @@
             updateProfileIfPresent(profile -> profile.setFragmentSerializeTime());
 
             // 4.2 send fragments rpc
-            List<Triple<PipelineExecContexts, BackendServiceProxy, Future<InternalService.PExecPlanFragmentResult>>>
-                    futures = Lists.newArrayList();
+            List<Pair<Long, Triple<PipelineExecContexts, BackendServiceProxy,
+                    Future<InternalService.PExecPlanFragmentResult>>>> futures = Lists.newArrayList();
             BackendServiceProxy proxy = BackendServiceProxy.getInstance();
             for (PipelineExecContexts ctxs : beToPipelineExecCtxs.values()) {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug(ctxs.debugInfo());
                 }
-                futures.add(ImmutableTriple.of(ctxs, proxy, ctxs.execRemoteFragmentsAsync(proxy)));
+                futures.add(Pair.of(DateTime.now().getMillis(),
+                        ImmutableTriple.of(ctxs, proxy, ctxs.execRemoteFragmentsAsync(proxy))));
             }
-            waitPipelineRpc(futures, this.timeoutDeadline - System.currentTimeMillis(), "send fragments");
+            Map<TNetworkAddress, List<Long>> rpcPhase1Latency =
+                    waitPipelineRpc(futures, this.timeoutDeadline - System.currentTimeMillis(), "send fragments");
 
             updateProfileIfPresent(profile -> profile.updateFragmentRpcCount(futures.size()));
             updateProfileIfPresent(profile -> profile.setFragmentSendPhase1Time());
+            updateProfileIfPresent(profile -> profile.setRpcPhase1Latency(rpcPhase1Latency));
 
             if (twoPhaseExecution) {
                 // 5. send and wait execution start rpc
                 futures.clear();
                 for (PipelineExecContexts ctxs : beToPipelineExecCtxs.values()) {
-                    futures.add(ImmutableTriple.of(ctxs, proxy, ctxs.execPlanFragmentStartAsync(proxy)));
+                    futures.add(Pair.of(DateTime.now().getMillis(),
+                            ImmutableTriple.of(ctxs, proxy, ctxs.execPlanFragmentStartAsync(proxy))));
                 }
-                waitPipelineRpc(futures, this.timeoutDeadline - System.currentTimeMillis(), "send execution start");
+                Map<TNetworkAddress, List<Long>> rpcPhase2Latency =
+                        waitPipelineRpc(futures, this.timeoutDeadline - System.currentTimeMillis(),
+                                "send execution start");
                 updateProfileIfPresent(profile -> profile.updateFragmentRpcCount(futures.size()));
                 updateProfileIfPresent(profile -> profile.setFragmentSendPhase2Time());
+                updateProfileIfPresent(profile -> profile.setRpcPhase2Latency(rpcPhase2Latency));
             }
         } finally {
             unlock();
         }
     }
 
-    private void waitPipelineRpc(List<Triple<PipelineExecContexts, BackendServiceProxy,
-            Future<PExecPlanFragmentResult>>> futures, long leftTimeMs,
+    private Map<TNetworkAddress, List<Long>> waitPipelineRpc(List<Pair<Long, Triple<PipelineExecContexts,
+            BackendServiceProxy, Future<InternalService.PExecPlanFragmentResult>>>> futures, long leftTimeMs,
             String operation) throws RpcException, UserException {
         if (leftTimeMs <= 0) {
             long currentTimeMillis = System.currentTimeMillis();
@@ -944,14 +952,26 @@
             throw new UserException(msg);
         }
 
+        // BE -> (RPC latency from FE to BE, time queued before a worker runs the rpc, time spent doing the work,
+        // RPC latency from BE to FE)
+        Map<TNetworkAddress, List<Long>> beToPrepareLatency = new HashMap<>();
         long timeoutMs = Math.min(leftTimeMs, Config.remote_fragment_exec_timeout_ms);
-        for (Triple<PipelineExecContexts, BackendServiceProxy, Future<PExecPlanFragmentResult>> triple : futures) {
+        for (Pair<Long, Triple<PipelineExecContexts, BackendServiceProxy,
+                Future<InternalService.PExecPlanFragmentResult>>> pair : futures) {
+            Triple<PipelineExecContexts, BackendServiceProxy,
+                    Future<InternalService.PExecPlanFragmentResult>> triple = pair.second;
             TStatusCode code;
             String errMsg = null;
             Exception exception = null;
 
             try {
                 PExecPlanFragmentResult result = triple.getRight().get(timeoutMs, TimeUnit.MILLISECONDS);
+                long rpcDone = DateTime.now().getMillis();
+                beToPrepareLatency.put(triple.getLeft().brpcAddr,
+                        Lists.newArrayList(result.getReceivedTime() - pair.first,
+                        result.getExecutionTime() - result.getReceivedTime(),
+                        result.getExecutionDoneTime() - result.getExecutionTime(),
+                        rpcDone - result.getExecutionDoneTime()));
                 code = TStatusCode.findByValue(result.getStatus().getStatusCode());
                 if (code == null) {
                     code = TStatusCode.INTERNAL_ERROR;
@@ -1003,6 +1023,7 @@
                 }
             }
         }
+        return beToPrepareLatency;
     }
 
     public List<String> getExportFiles() {
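
How the four reported intervals fall out of the five timestamps: pair.first is
the FE clock reading when the rpc was issued, the three result fields are BE
clock readings, and rpcDone is the FE clock reading when the future completed.
A minimal sketch of the arithmetic, with hypothetical names (the first and last
deltas subtract readings taken on different hosts, so they are only as accurate
as the clock synchronization between FE and BE):

    import java.util.Arrays;
    import java.util.List;

    final class RpcLatencyMath {
        // sendMs and rpcDoneMs come from the FE clock; the middle three come
        // from the BE clock (received/execution/execution_done timestamps).
        static List<Long> latencies(long sendMs, long receivedMs, long executionMs,
                                    long executionDoneMs, long rpcDoneMs) {
            return Arrays.asList(
                    receivedMs - sendMs,           // RPC Latency From FE To BE
                    executionMs - receivedMs,      // RPC Work Queue Time
                    executionDoneMs - executionMs, // RPC Work Time
                    rpcDoneMs - executionDoneMs);  // RPC Latency From BE To FE
        }
    }
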
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index 8857b65..5c12055 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -246,6 +246,12 @@
 
 message PExecPlanFragmentResult {
     required PStatus status = 1;
+    // Wall-clock ms when BE received the rpc
+    optional int64 received_time = 2;
+    // Wall-clock ms when a worker thread started executing the request
+    optional int64 execution_time = 3;
+    // Wall-clock ms when the worker thread finished the request
+    optional int64 execution_done_time = 4;
 };
 
 message PCancelPlanFragmentRequest {