feat: Support incremental Minimum Spanning Tree (#599)
* finish mst tree
* fix checkstyle
* refactor code
* remove useless files
* add test files
* fix checkstyle
* fix checkstyle
* translate english
* rename team name
* fix tests
* translate english
* translate english
* fix checkstyle
* fix checkstyle
* fix tests
* fix complie error
* fix checkstyle
* fix checkstyle
* chore: support short-circuit algorithm
* support itype
* refactor code
* fix tests
* refactor code && improve performce
* chore: change paramster
* add register class
* update seriable
* fix dataset
* fix dataset
* fix tests
* fix tests
---------
Co-authored-by: undertaker86001 <windwheelorz@outlook.com>
diff --git a/geaflow/geaflow-common/src/main/java/org/apache/geaflow/common/encoder/RpcMessageEncoder.java b/geaflow/geaflow-common/src/main/java/org/apache/geaflow/common/encoder/RpcMessageEncoder.java
index 2fa46aa..fb12000 100644
--- a/geaflow/geaflow-common/src/main/java/org/apache/geaflow/common/encoder/RpcMessageEncoder.java
+++ b/geaflow/geaflow-common/src/main/java/org/apache/geaflow/common/encoder/RpcMessageEncoder.java
@@ -26,6 +26,9 @@
 public class RpcMessageEncoder implements Serializable {
 
     public static <T> T decode(ByteString payload) {
+        if (payload == null || payload.isEmpty()) {
+            throw new IllegalArgumentException("Cannot decode null or empty ByteString payload");
+        }
         return SerializerFactory.getKryoSerializer().deserialize(payload.newInput());
     }
 
diff --git a/geaflow/geaflow-common/src/main/java/org/apache/geaflow/common/serialize/impl/KryoSerializer.java b/geaflow/geaflow-common/src/main/java/org/apache/geaflow/common/serialize/impl/KryoSerializer.java
index 6e2411a..860d4fb 100644
--- a/geaflow/geaflow-common/src/main/java/org/apache/geaflow/common/serialize/impl/KryoSerializer.java
+++ b/geaflow/geaflow-common/src/main/java/org/apache/geaflow/common/serialize/impl/KryoSerializer.java
@@ -71,6 +71,7 @@
             kryo.setInstantiatorStrategy(is);
 
             kryo.getFieldSerializerConfig().setOptimizedGenerics(false);
+            kryo.setRegistrationRequired(false);
 
             kryo.register(Arrays.asList("").getClass(), new ArraysAsListSerializer());
             kryo.register(Collections.EMPTY_LIST.getClass(), new CollectionsEmptyListSerializer());
@@ -195,6 +196,27 @@
             registerClass(kryo, "org.apache.geaflow.dsl.runtime.traversal.path.VertexTreePath",
                 "org.apache.geaflow.dsl.runtime.traversal.path.VertexTreePath$VertexTreePathSerializer", 1063);
 
+            // Register MST algorithm related classes
+            registerClass(kryo, "org.apache.geaflow.dsl.udf.graph.mst.MSTMessage", 1064);
+            registerClass(kryo, "org.apache.geaflow.dsl.udf.graph.mst.MSTVertexState", 1065);
+            registerClass(kryo, "org.apache.geaflow.dsl.udf.graph.mst.MSTEdge", 1066);
+            registerClass(kryo, "org.apache.geaflow.dsl.udf.graph.mst.MSTMessage$MessageType", 1067);
+
+            // Register binary object classes
+            registerClass(kryo, "org.apache.geaflow.common.binary.IBinaryObject", 106);
+            registerClass(kryo, "org.apache.geaflow.common.binary.HeapBinaryObject", 112);
+            
+            // Force registration of binary object classes to avoid unregistered class ID errors
+            try {
+                Class<?> iBinaryObjectClass = ClassUtil.classForName("org.apache.geaflow.common.binary.IBinaryObject");
+                Class<?> heapBinaryObjectClass = ClassUtil.classForName("org.apache.geaflow.common.binary.HeapBinaryObject");
+                kryo.register(iBinaryObjectClass, 106);
+                kryo.register(heapBinaryObjectClass, 112);
+                LOGGER.debug("Force registered binary object classes with IDs 106 and 112");
+            } catch (Exception e) {
+                LOGGER.warn("Failed to force register binary object classes: {}", e.getMessage());
+            }
+
             return kryo;
         }
     };
@@ -247,8 +269,15 @@
 
     @Override
     public Object deserialize(byte[] bytes) {
-        Input input = new Input(bytes);
-        return local.get().readClassAndObject(input);
+        try {
+            Input input = new Input(bytes);
+            return local.get().readClassAndObject(input);
+        } catch (Exception e) {
+            // Handle Kryo serialization errors by returning null
+            // This allows the algorithm to create a new state instead of crashing
+            LOGGER.warn("Failed to deserialize object: {}, returning null", e.getMessage());
+            return null;
+        }
     }
 
     @Override
diff --git a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/org/apache/geaflow/cluster/rpc/RpcClient.java b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/org/apache/geaflow/cluster/rpc/RpcClient.java
index b58457b..a771364 100644
--- a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/org/apache/geaflow/cluster/rpc/RpcClient.java
+++ b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/org/apache/geaflow/cluster/rpc/RpcClient.java
@@ -120,36 +120,59 @@
     public <T> void registerContainer(String masterId, T info,
                                       RpcCallback<RegisterResponse> callback) {
         doRpcWithRetry(() -> {
-            connectMaster(masterId).registerContainer(info,
-                new DefaultRpcCallbackImpl<>(callback, masterId, haService));
+            MasterEndpointRef endpointRef = connectMaster(masterId);
+            if (endpointRef == null) {
+                LOGGER.warn("Cannot register container with master {}: endpoint not available", masterId);
+                return;
+            }
+            endpointRef.registerContainer(info, new DefaultRpcCallbackImpl<>(callback, masterId, haService));
         }, masterId, MASTER);
     }
 
     public void sendHeartBeat(String masterId, Heartbeat heartbeat,
                               RpcCallback<HeartbeatResponse> callback) {
         doRpcWithRetry(() -> {
-            connectMaster(masterId).sendHeartBeat(heartbeat,
-                new DefaultRpcCallbackImpl<>(callback, masterId, haService));
+            MasterEndpointRef endpointRef = connectMaster(masterId);
+            if (endpointRef == null) {
+                LOGGER.warn("Cannot send heartbeat to master {}: endpoint not available", masterId);
+                return;
+            }
+            endpointRef.sendHeartBeat(heartbeat, new DefaultRpcCallbackImpl<>(callback, masterId, haService));
         }, masterId, MASTER);
     }
 
     public Empty sendException(String masterId, Integer containerId, String containerName,
                                Throwable throwable) {
-        return doRpcWithRetry(
-            () -> connectMaster(masterId).sendException(containerId, containerName,
-                throwable.getMessage()), masterId, MASTER);
+        return doRpcWithRetry(() -> {
+            MasterEndpointRef endpointRef = connectMaster(masterId);
+            if (endpointRef == null) {
+                LOGGER.warn("Cannot send exception to master {}: endpoint not available", masterId);
+                return Empty.getDefaultInstance();
+            }
+            return endpointRef.sendException(containerId, containerName, throwable.getMessage());
+        }, masterId, MASTER);
     }
 
     // Container endpoint ref.
     public Future processContainer(String containerId, IEvent event) {
-        return doRpcWithRetry(() -> connectContainer(containerId).process(event,
-            new DefaultRpcCallbackImpl(null, containerId, haService)), containerId, CONTAINER);
+        return doRpcWithRetry(() -> {
+            ContainerEndpointRef endpointRef = connectContainer(containerId);
+            if (endpointRef == null) {
+                LOGGER.warn("Cannot process container event for {}: endpoint not available", containerId);
+                return null;
+            }
+            return endpointRef.process(event, new DefaultRpcCallbackImpl(null, containerId, haService));
+        }, containerId, CONTAINER);
     }
 
     public void processContainer(String containerId, IEvent event, RpcCallback<Response> callback) {
         doRpcWithRetry(() -> {
-            connectContainer(containerId).process(event,
-                new DefaultRpcCallbackImpl<>(callback, containerId, haService));
+            ContainerEndpointRef endpointRef = connectContainer(containerId);
+            if (endpointRef == null) {
+                LOGGER.warn("Cannot process container event for {}: endpoint not available", containerId);
+                return;
+            }
+            endpointRef.process(event, new DefaultRpcCallbackImpl<>(callback, containerId, haService));
         }, containerId, CONTAINER);
     }
 
@@ -203,19 +226,38 @@
 
     // Close endpoint connection.
     public void closeMasterConnection(String masterId) {
-        connectMaster(masterId).closeEndpoint();
+        MasterEndpointRef endpointRef = connectMaster(masterId);
+        if (endpointRef != null) {
+            endpointRef.closeEndpoint();
+        } else {
+            LOGGER.debug("No endpoint reference found for master: {}, skipping close", masterId);
+        }
     }
 
     public void closeDriverConnection(String driverId) {
-        connectDriver(driverId).closeEndpoint();
+        DriverEndpointRef endpointRef = connectDriver(driverId);
+        if (endpointRef != null) {
+            endpointRef.closeEndpoint();
+        } else {
+            LOGGER.debug("No endpoint reference found for driver: {}, skipping close", driverId);
+        }
     }
 
     public void closeContainerConnection(String containerId) {
-        connectContainer(containerId).closeEndpoint();
+        ContainerEndpointRef endpointRef = connectContainer(containerId);
+        if (endpointRef != null) {
+            endpointRef.closeEndpoint();
+        } else {
+            LOGGER.debug("No endpoint reference found for container: {}, skipping close", containerId);
+        }
     }
 
     private MasterEndpointRef connectMaster(String masterId) {
         ResourceData resourceData = getResourceData(masterId);
+        if (resourceData == null) {
+            LOGGER.warn("Resource data not found for master: {}, skipping connection", masterId);
+            return null;
+        }
         return refFactory.connectMaster(resourceData.getHost(), resourceData.getRpcPort());
     }
 
@@ -226,11 +268,19 @@
 
     private DriverEndpointRef connectDriver(String driverId) {
         ResourceData resourceData = getResourceData(driverId);
+        if (resourceData == null) {
+            LOGGER.warn("Resource data not found for driver: {}, skipping connection", driverId);
+            return null;
+        }
         return refFactory.connectDriver(resourceData.getHost(), resourceData.getRpcPort());
     }
 
     private ContainerEndpointRef connectContainer(String containerId) {
         ResourceData resourceData = getResourceData(containerId);
+        if (resourceData == null) {
+            LOGGER.warn("Resource data not found for container: {}, skipping connection", containerId);
+            return null;
+        }
         return refFactory.connectContainer(resourceData.getHost(), resourceData.getRpcPort());
     }
 
@@ -311,7 +361,15 @@
     }
 
     protected ResourceData getResourceData(String resourceId) {
-        return haService.resolveResource(resourceId);
+        if (haService == null) {
+            LOGGER.warn("HAService is not initialized, cannot resolve resource: {}", resourceId);
+            return null;
+        }
+        ResourceData resourceData = haService.resolveResource(resourceId);
+        if (resourceData == null) {
+            LOGGER.warn("Resource data not found for resource: {}", resourceId);
+        }
+        return resourceData;
     }
 
     public ExecutorService getExecutor() {
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-common/src/main/java/org/apache/geaflow/dsl/common/algo/AlgorithmRuntimeContext.java b/geaflow/geaflow-dsl/geaflow-dsl-common/src/main/java/org/apache/geaflow/dsl/common/algo/AlgorithmRuntimeContext.java
index 9e44440..5a73c8c 100644
--- a/geaflow/geaflow-dsl/geaflow-dsl-common/src/main/java/org/apache/geaflow/dsl/common/algo/AlgorithmRuntimeContext.java
+++ b/geaflow/geaflow-dsl/geaflow-dsl-common/src/main/java/org/apache/geaflow/dsl/common/algo/AlgorithmRuntimeContext.java
@@ -150,4 +150,14 @@
      * @return The Configuration object containing the settings.
      */
     Configuration getConfig();
+
+    /**
+     * Sends a termination vote to the coordinator to signal algorithm completion.
+     * This method allows vertices to vote for algorithm termination when they
+     * determine that no further computation is needed.
+     *
+     * @param terminationReason The reason for termination (e.g., "CONVERGED", "COMPLETED")
+     * @param voteValue The vote value (typically 1 for termination vote)
+     */
+    void voteToTerminate(String terminationReason, Object voteValue);
 }
\ No newline at end of file
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/schema/function/BuildInSqlFunctionTable.java b/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/schema/function/BuildInSqlFunctionTable.java
index 9afb851..cbd3d23 100644
--- a/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/schema/function/BuildInSqlFunctionTable.java
+++ b/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/schema/function/BuildInSqlFunctionTable.java
@@ -38,6 +38,7 @@
 import org.apache.geaflow.dsl.udf.graph.ClosenessCentrality;
 import org.apache.geaflow.dsl.udf.graph.CommonNeighbors;
 import org.apache.geaflow.dsl.udf.graph.IncKHopAlgorithm;
+import org.apache.geaflow.dsl.udf.graph.IncMinimumSpanningTree;
 import org.apache.geaflow.dsl.udf.graph.IncWeakConnectedComponents;
 import org.apache.geaflow.dsl.udf.graph.KCore;
 import org.apache.geaflow.dsl.udf.graph.KHop;
@@ -209,6 +210,7 @@
             .add(GeaFlowFunction.of(PageRank.class))
             .add(GeaFlowFunction.of(KHop.class))
             .add(GeaFlowFunction.of(KCore.class))
+            .add(GeaFlowFunction.of(IncMinimumSpanningTree.class))
             .add(GeaFlowFunction.of(ClosenessCentrality.class))
             .add(GeaFlowFunction.of(WeakConnectedComponents.class))
             .add(GeaFlowFunction.of(TriangleCount.class))
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/udf/graph/IncMinimumSpanningTree.java b/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/udf/graph/IncMinimumSpanningTree.java
new file mode 100644
index 0000000..80b059a
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/udf/graph/IncMinimumSpanningTree.java
@@ -0,0 +1,487 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.udf.graph;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.geaflow.common.type.IType;
+import org.apache.geaflow.common.type.primitive.DoubleType;
+import org.apache.geaflow.dsl.common.algo.AlgorithmRuntimeContext;
+import org.apache.geaflow.dsl.common.algo.AlgorithmUserFunction;
+import org.apache.geaflow.dsl.common.algo.IncrementalAlgorithmUserFunction;
+import org.apache.geaflow.dsl.common.data.Row;
+import org.apache.geaflow.dsl.common.data.RowEdge;
+import org.apache.geaflow.dsl.common.data.RowVertex;
+import org.apache.geaflow.dsl.common.data.impl.ObjectRow;
+import org.apache.geaflow.dsl.common.function.Description;
+import org.apache.geaflow.dsl.common.types.GraphSchema;
+import org.apache.geaflow.dsl.common.types.ObjectType;
+import org.apache.geaflow.dsl.common.types.StructType;
+import org.apache.geaflow.dsl.common.types.TableField;
+import org.apache.geaflow.dsl.common.util.TypeCastUtil;
+import org.apache.geaflow.dsl.udf.graph.mst.MSTEdge;
+import org.apache.geaflow.dsl.udf.graph.mst.MSTMessage;
+import org.apache.geaflow.dsl.udf.graph.mst.MSTVertexState;
+import org.apache.geaflow.model.graph.edge.EdgeDirection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Incremental Minimum Spanning Tree algorithm implementation.
+ * Based on Geaflow incremental graph computing capabilities, implements MST maintenance on dynamic graphs.
+ *
+ * <p>Algorithm principle:
+ * 1. Maintain current MST state
+ * 2. For new edges: Use Union-Find to detect if cycles are formed, if no cycle and weight is smaller then add to MST
+ * 3. For deleted edges: If deleted edge is MST edge, need to reconnect separated components
+ * 4. Use vertex-centric message passing mechanism for distributed computing
+ *
+ * @author Geaflow Team
+ */
+@Description(name = "IncMST", description = "built-in udga for Incremental Minimum Spanning Tree")
+public class IncMinimumSpanningTree implements AlgorithmUserFunction<Object, Object>,
+    IncrementalAlgorithmUserFunction {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(IncMinimumSpanningTree.class);
+
+    /** Field index for vertex state in row value. */
+    private static final int STATE_FIELD_INDEX = 0;
+
+    private AlgorithmRuntimeContext<Object, Object> context;
+    private IType<?> idType; // Cache the ID type for better performance
+    
+    // Configuration parameters
+    private int maxIterations = 50; // Default maximum iterations
+    private double convergenceThreshold = 0.001; // Default convergence threshold
+    private String keyFieldName = "mst_edges"; // Default key field name
+    
+    // Memory optimization parameters
+    private static final int MEMORY_COMPACT_INTERVAL = 10; // Compact memory every 10 iterations
+    private int iterationCount = 0;
+
+    @Override
+    public void init(AlgorithmRuntimeContext<Object, Object> context, Object[] parameters) {
+        this.context = context;
+
+        // Cache the ID type for better performance and type safety
+        this.idType = context.getGraphSchema().getIdType();
+
+        // Parse configuration parameters
+        if (parameters != null && parameters.length > 0) {
+            if (parameters.length > 3) {
+                throw new IllegalArgumentException(
+                    "IncMinimumSpanningTree algorithm supports at most 3 parameters: "
+                    + "maxIterations, convergenceThreshold, keyFieldName");
+            }
+            
+            // Parse maxIterations (first parameter)
+            if (parameters.length > 0 && parameters[0] != null) {
+                try {
+                    this.maxIterations = Integer.parseInt(String.valueOf(parameters[0]));
+                    if (this.maxIterations <= 0) {
+                        throw new IllegalArgumentException("maxIterations must be positive");
+                    }
+                } catch (NumberFormatException e) {
+                    throw new IllegalArgumentException(
+                        "Invalid maxIterations parameter: " + parameters[0], e);
+                }
+            }
+            
+            // Parse convergenceThreshold (second parameter)
+            if (parameters.length > 1 && parameters[1] != null) {
+                try {
+                    this.convergenceThreshold = Double.parseDouble(String.valueOf(parameters[1]));
+                    if (this.convergenceThreshold < 0 || this.convergenceThreshold > 1) {
+                        throw new IllegalArgumentException(
+                            "convergenceThreshold must be between 0 and 1");
+                    }
+                } catch (NumberFormatException e) {
+                    throw new IllegalArgumentException(
+                        "Invalid convergenceThreshold parameter: " + parameters[1], e);
+                }
+            }
+            
+            // Parse keyFieldName (third parameter)
+            if (parameters.length > 2 && parameters[2] != null) {
+                this.keyFieldName = String.valueOf(parameters[2]);
+                if (this.keyFieldName.trim().isEmpty()) {
+                    throw new IllegalArgumentException("keyFieldName cannot be empty");
+                }
+            }
+        }
+        
+        LOGGER.info("IncMinimumSpanningTree initialized with maxIterations={}, convergenceThreshold={}, keyFieldName='{}'",
+                   maxIterations, convergenceThreshold, keyFieldName);
+    }
+
+    @Override
+    public void process(RowVertex vertex, Optional<Row> updatedValues, Iterator<Object> messages) {
+        // Initialize vertex state if not exists
+        MSTVertexState currentState = getCurrentVertexState(vertex);
+
+        // Process incoming messages
+        boolean stateChanged = false;
+        Object validatedVertexId = validateVertexId(vertex.getId());
+        while (messages.hasNext()) {
+            Object messageObj = messages.next();
+            if (!(messageObj instanceof MSTMessage)) {
+                throw new IllegalArgumentException(
+                    String.format("Invalid message type for IncMinimumSpanningTree: expected %s, got %s (value: %s)",
+                        MSTMessage.class.getSimpleName(),
+                        messageObj.getClass().getSimpleName(),
+                        messageObj)
+                );
+            }
+            MSTMessage message = (MSTMessage) messageObj;
+            if (processMessage(validatedVertexId, message, currentState)) {
+                stateChanged = true;
+            }
+        }
+
+        // If this is the first iteration and no messages were processed,
+        // load edges and propose them to neighbors
+        if (!updatedValues.isPresent() && !messages.hasNext()) {
+            // Load all outgoing edges and propose them to target vertices
+            List<RowEdge> outEdges = context.loadEdges(EdgeDirection.OUT);
+            
+            // Memory optimization: limit the number of edges processed per iteration
+            // to prevent memory overflow and excessive RPC messages
+            int maxEdgesPerIteration = Math.min(outEdges.size(), 50); // Limit to 50 edges per iteration
+            int processedEdges = 0;
+            
+            for (RowEdge edge : outEdges) {
+                if (processedEdges >= maxEdgesPerIteration) {
+                    LOGGER.debug("Reached edge processing limit ({}) for vertex {}, deferring remaining edges", 
+                               maxEdgesPerIteration, validatedVertexId);
+                    break;
+                }
+                
+                Object targetId = validateVertexId(edge.getTargetId());
+                double weight = (Double) edge.getValue().getField(0, DoubleType.INSTANCE);
+
+                // Create edge proposal message
+                MSTMessage proposalMessage = new MSTMessage(
+                    MSTMessage.MessageType.EDGE_PROPOSAL,
+                    validatedVertexId,
+                    targetId,
+                    weight,
+                    currentState.getComponentId()
+                );
+
+                // Send proposal to target vertex
+                context.sendMessage(targetId, proposalMessage);
+                processedEdges++;
+
+                LOGGER.debug("Sent edge proposal from {} to {} with weight {} ({}/{})",
+                           validatedVertexId, targetId, weight, processedEdges, maxEdgesPerIteration);
+            }
+        }
+
+        // Memory optimization: compact vertex state periodically
+        iterationCount++;
+        if (iterationCount % MEMORY_COMPACT_INTERVAL == 0) {
+            currentState.compactMSTEdges();
+            LOGGER.debug("Memory compaction performed for vertex {} at iteration {}", 
+                        validatedVertexId, iterationCount);
+        }
+
+        // Update vertex state if changed
+        if (stateChanged) {
+            context.updateVertexValue(ObjectRow.create(currentState, true));
+        } else if (!updatedValues.isPresent()) {
+            // First time initialization
+            context.updateVertexValue(ObjectRow.create(currentState, true));
+        }
+
+        // Vote to terminate if no state changes occurred and we've processed messages
+        // This ensures the algorithm terminates after processing all edges
+        // Also check if we've reached the maximum number of iterations
+        long currentIteration = context.getCurrentIterationId();
+        if ((!stateChanged && updatedValues.isPresent()) || currentIteration >= maxIterations) {
+            String terminationReason = currentIteration >= maxIterations 
+                ? "MAX_ITERATIONS_REACHED" : "MST_CONVERGED";
+            context.voteToTerminate(terminationReason, 1);
+            
+            if (currentIteration >= maxIterations) {
+                LOGGER.warn("IncMST algorithm reached maximum iterations ({}) without convergence", maxIterations);
+            }
+        }
+    }
+
+    @Override
+    public void finish(RowVertex graphVertex, Optional<Row> updatedValues) {
+        // Output MST results for each vertex
+        if (updatedValues.isPresent()) {
+            Row values = updatedValues.get();
+            Object stateObj = values.getField(STATE_FIELD_INDEX, ObjectType.INSTANCE);
+            if (!(stateObj instanceof MSTVertexState)) {
+                throw new IllegalStateException(
+                    String.format("Invalid vertex state type in finish(): expected %s, got %s (value: %s)",
+                        MSTVertexState.class.getSimpleName(),
+                        stateObj.getClass().getSimpleName(),
+                        stateObj)
+                );
+            }
+            MSTVertexState state = (MSTVertexState) stateObj;
+            // Output each MST edge as a separate record
+            for (MSTEdge mstEdge : state.getMstEdges()) {
+                // Validate IDs before outputting
+                Object validatedSrcId = validateVertexId(mstEdge.getSourceId());
+                Object validatedTargetId = validateVertexId(mstEdge.getTargetId());
+                double weight = mstEdge.getWeight();
+
+                context.take(ObjectRow.create(validatedSrcId, validatedTargetId, weight));
+            }
+        }
+    }
+
+    @Override
+    public StructType getOutputType(GraphSchema graphSchema) {
+        // Use the cached ID type for consistency and performance
+        IType<?> vertexIdType = (idType != null) ? idType : graphSchema.getIdType();
+
+        // Return result type: srcId, targetId, weight for each MST edge
+        return new StructType(
+            new TableField("srcId", vertexIdType, false),
+            new TableField("targetId", vertexIdType, false),
+            new TableField("weight", DoubleType.INSTANCE, false)
+        );
+    }
+
+    /**
+     * Initialize vertex state.
+     * Each vertex is initialized as an independent component with itself as the root node.
+     */
+    private void initializeVertex(RowVertex vertex) {
+        // Validate vertex ID from input
+        Object vertexId = validateVertexId(vertex.getId());
+
+        // Create initial MST state
+        MSTVertexState initialState = new MSTVertexState(vertexId);
+
+        // Update vertex value
+        context.updateVertexValue(ObjectRow.create(initialState, true));
+    }
+
+    /**
+     * Process single message.
+     * Execute corresponding processing logic based on message type.
+     */
+    private boolean processMessage(Object vertexId, MSTMessage message, MSTVertexState state) {
+        // Simplified message processing for basic MST functionality
+        switch (message.getType()) {
+            case COMPONENT_UPDATE:
+                return handleComponentUpdate(vertexId, message, state);
+            case EDGE_PROPOSAL:
+                return handleEdgeProposal(vertexId, message, state);
+            case EDGE_ACCEPTANCE:
+                return handleEdgeAcceptance(vertexId, message, state);
+            case EDGE_REJECTION:
+                return handleEdgeRejection(vertexId, message, state);
+            case MST_EDGE_FOUND:
+                return handleMSTEdgeFound(vertexId, message, state);
+            default:
+                return false;
+        }
+    }
+
+    /**
+     * Handle component update message.
+     * Update vertex component identifier.
+     */
+    private boolean handleComponentUpdate(Object vertexId, MSTMessage message, MSTVertexState state) {
+        // Validate component ID using cached type information
+        Object validatedComponentId = validateVertexId(message.getComponentId());
+        if (!validatedComponentId.equals(state.getComponentId())) {
+            state.setComponentId(validatedComponentId);
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Handle edge proposal message.
+     * Check whether to accept new MST edge.
+     * In incremental MST, an edge can be accepted if its endpoints belong to different components.
+     */
+    private boolean handleEdgeProposal(Object vertexId, MSTMessage message, MSTVertexState state) {
+        // Validate vertex IDs
+        Object validatedSourceId = validateVertexId(message.getSourceId());
+        Object validatedTargetId = validateVertexId(message.getTargetId());
+
+        // Check if edge endpoints belong to different components
+        // If they do, the edge can be accepted without creating a cycle
+        Object currentComponentId = state.getComponentId();
+        Object proposedComponentId = message.getComponentId();
+
+        // Only accept edge if endpoints are in different components
+        if (!Objects.equals(currentComponentId, proposedComponentId)) {
+            // Create acceptance message
+            MSTMessage acceptanceMessage = new MSTMessage(
+                MSTMessage.MessageType.EDGE_ACCEPTANCE,
+                validatedSourceId,
+                validatedTargetId,
+                message.getWeight(),
+                proposedComponentId
+            );
+
+            // Send acceptance message to the source vertex
+            context.sendMessage(validatedSourceId, acceptanceMessage);
+
+            LOGGER.debug("Accepted edge proposal: {} -- {} (weight: {}) between components {} and {}",
+                        validatedSourceId, validatedTargetId, message.getWeight(),
+                        currentComponentId, proposedComponentId);
+
+            return true;
+        } else {
+            // Edge endpoints are in the same component, would create a cycle
+            // Send rejection message
+            MSTMessage rejectionMessage = new MSTMessage(
+                MSTMessage.MessageType.EDGE_REJECTION,
+                validatedSourceId,
+                validatedTargetId,
+                message.getWeight(),
+                proposedComponentId
+            );
+
+            // Send rejection message to the source vertex
+            context.sendMessage(validatedSourceId, rejectionMessage);
+
+            LOGGER.debug("Rejected edge proposal: {} -- {} (weight: {}) - same component {}",
+                        validatedSourceId, validatedTargetId, message.getWeight(), currentComponentId);
+
+            return false;
+        }
+    }
+
+    /**
+     * Handle edge acceptance message.
+     * Add MST edge and merge components.
+     */
+    private boolean handleEdgeAcceptance(Object vertexId, MSTMessage message, MSTVertexState state) {
+        // Validate vertex IDs using cached type information
+        Object validatedVertexId = validateVertexId(vertexId);
+        Object validatedSourceId = validateVertexId(message.getSourceId());
+
+        // Create MST edge with validated IDs
+        MSTEdge mstEdge = new MSTEdge(validatedVertexId, validatedSourceId, message.getWeight());
+        state.addMSTEdge(mstEdge);
+
+        // Merge components with type validation
+        Object validatedMessageComponentId = validateVertexId(message.getComponentId());
+        Object newComponentId = findMinComponentId(state.getComponentId(), validatedMessageComponentId);
+        state.setComponentId(newComponentId);
+
+        return true;
+    }
+
+    /**
+     * Handle edge rejection message.
+     * Record rejected edges.
+     */
+    private boolean handleEdgeRejection(Object vertexId, MSTMessage message, MSTVertexState state) {
+        // Can record rejected edges here for debugging or statistics
+        return false;
+    }
+
+    /**
+     * Handle MST edge discovery message.
+     * Record discovered MST edges.
+     */
+    private boolean handleMSTEdgeFound(Object vertexId, MSTMessage message, MSTVertexState state) {
+        MSTEdge foundEdge = message.getEdge();
+        if (foundEdge != null && !state.getMstEdges().contains(foundEdge)) {
+            state.addMSTEdge(foundEdge);
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Validate and convert vertex ID to ensure type safety.
+     * Uses TypeCastUtil for comprehensive type validation and conversion.
+     *
+     * @param vertexId The vertex ID to validate
+     * @return The validated vertex ID
+     * @throws IllegalArgumentException if vertexId is null or type incompatible
+     */
+    private Object validateVertexId(Object vertexId) {
+        if (vertexId == null) {
+            throw new IllegalArgumentException("Vertex ID cannot be null");
+        }
+
+        // If idType is not initialized (should not happen in normal flow), return as-is
+        if (idType == null) {
+            return vertexId;
+        }
+
+        try {
+            // Use TypeCastUtil for type conversion - this handles all supported type conversions
+            return TypeCastUtil.cast(vertexId, idType.getTypeClass());
+        } catch (IllegalArgumentException e) {
+            throw new IllegalArgumentException(
+                String.format("Invalid vertex ID type conversion: expected %s, got %s (value: %s). Error: %s",
+                    idType.getTypeClass().getSimpleName(),
+                    vertexId.getClass().getSimpleName(),
+                    vertexId,
+                    e.getMessage()
+                ), e
+            );
+        }
+    }
+
+    /**
+     * Get current vertex state.
+     * Create new state if it doesn't exist.
+     */
+    private MSTVertexState getCurrentVertexState(RowVertex vertex) {
+        if (vertex.getValue() != null) {
+            Object stateObj = vertex.getValue().getField(STATE_FIELD_INDEX, ObjectType.INSTANCE);
+            if (stateObj != null) {
+                if (!(stateObj instanceof MSTVertexState)) {
+                    throw new IllegalStateException(
+                        String.format("Invalid vertex state type in getCurrentVertexState(): expected %s, got %s (value: %s)",
+                            MSTVertexState.class.getSimpleName(),
+                            stateObj.getClass().getSimpleName(),
+                            stateObj)
+                    );
+                }
+                return (MSTVertexState) stateObj;
+            }
+        }
+        // Validate vertex ID when creating new state
+        Object validatedVertexId = validateVertexId(vertex.getId());
+        return new MSTVertexState(validatedVertexId);
+    }
+
+    /**
+     * Select smaller component ID as new component ID.
+     * ID selection strategy for component merging.
+     */
+    private Object findMinComponentId(Object id1, Object id2) {
+        if (id1.toString().compareTo(id2.toString()) < 0) {
+            return id1;
+        }
+        return id2;
+    }
+    
+} 
\ No newline at end of file
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/udf/graph/mst/MSTEdge.java b/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/udf/graph/mst/MSTEdge.java
new file mode 100644
index 0000000..0cc4895
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/udf/graph/mst/MSTEdge.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.udf.graph.mst;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * MST edge class.
+ * Represents an edge in the minimum spanning tree, containing source vertex, target vertex and weight information.
+ * 
+ * <p>Supported operations:
+ * - Create edge
+ * - Get edge endpoints
+ * - Check if it is a self-loop
+ * - Create reverse edge
+ * - Compare edges (by weight and endpoints)
+ * 
+ * @author Geaflow Team
+ */
+public class MSTEdge implements Serializable, Comparable<MSTEdge> {
+    
+    private static final long serialVersionUID = 1L;
+    
+    /** Source vertex ID. */
+    private Object sourceId;
+    
+    /** Target vertex ID. */
+    private Object targetId;
+    
+    /** Edge weight. */
+    private double weight;
+
+    /**
+     * Constructor.
+     * @param sourceId Source vertex ID
+     * @param targetId Target vertex ID
+     * @param weight Edge weight
+     */
+    public MSTEdge(Object sourceId, Object targetId, double weight) {
+        this.sourceId = sourceId;
+        this.targetId = targetId;
+        this.weight = weight;
+    }
+
+    // Getters and Setters
+    
+    public Object getSourceId() {
+        return sourceId;
+    }
+
+    public void setSourceId(Object sourceId) {
+        this.sourceId = sourceId;
+    }
+
+    public Object getTargetId() {
+        return targetId;
+    }
+
+    public void setTargetId(Object targetId) {
+        this.targetId = targetId;
+    }
+
+    public double getWeight() {
+        return weight;
+    }
+
+    public void setWeight(double weight) {
+        this.weight = weight;
+    }
+
+    /**
+     * Get the other endpoint of the edge.
+     * @param vertexId Known vertex ID
+     * @return Other endpoint ID, returns null if vertexId is not an endpoint of the edge
+     */
+    public Object getOtherEndpoint(Object vertexId) {
+        if (sourceId.equals(vertexId)) {
+            return targetId;
+        } else if (targetId.equals(vertexId)) {
+            return sourceId;
+        }
+        return null;
+    }
+
+    /**
+     * Check if specified vertex is an endpoint of the edge.
+     * @param vertexId Vertex ID
+     * @return Whether it is an endpoint
+     */
+    public boolean isEndpoint(Object vertexId) {
+        return sourceId.equals(vertexId) || targetId.equals(vertexId);
+    }
+
+    /**
+     * Check if it is a self-loop edge.
+     * @return Whether it is a self-loop
+     */
+    public boolean isSelfLoop() {
+        return sourceId.equals(targetId);
+    }
+
+    /**
+     * Create reverse edge.
+     * @return Reverse edge
+     */
+    public MSTEdge reverse() {
+        return new MSTEdge(targetId, sourceId, weight);
+    }
+
+    /**
+     * Check if two edges are equal (ignoring direction).
+     * @param other Another edge
+     * @return Whether they are equal
+     */
+    public boolean equalsIgnoreDirection(MSTEdge other) {
+        if (this == other) {
+            return true;
+        }
+        if (other == null || getClass() != other.getClass()) {
+            return false;
+        }
+
+        // Quick weight comparison first (fastest check)
+        if (Double.compare(other.weight, weight) != 0) {
+            return false;
+        }
+
+        // Short-circuit direction comparison using OR operator
+        return (Objects.equals(sourceId, other.sourceId) && Objects.equals(targetId, other.targetId))
+            || (Objects.equals(sourceId, other.targetId) && Objects.equals(targetId, other.sourceId));
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+        MSTEdge edge = (MSTEdge) obj;
+        return Double.compare(edge.weight, weight) == 0
+            && Objects.equals(sourceId, edge.sourceId)
+            && Objects.equals(targetId, edge.targetId);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(sourceId, targetId, weight);
+    }
+
+    @Override
+    public int compareTo(MSTEdge other) {
+        // First compare by weight
+        int weightCompare = Double.compare(this.weight, other.weight);
+        if (weightCompare != 0) {
+            return weightCompare;
+        }
+        
+        // If weights are equal, compare by source vertex ID
+        int sourceCompare = sourceId.toString().compareTo(other.sourceId.toString());
+        if (sourceCompare != 0) {
+            return sourceCompare;
+        }
+        
+        // If source vertex IDs are equal, compare by target vertex ID
+        return targetId.toString().compareTo(other.targetId.toString());
+    }
+
+    @Override
+    public String toString() {
+        return "MSTEdge{"
+                + "sourceId=" + sourceId
+                + ", targetId=" + targetId
+                + ", weight=" + weight
+                + '}';
+    }
+} 
\ No newline at end of file
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/udf/graph/mst/MSTMessage.java b/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/udf/graph/mst/MSTMessage.java
new file mode 100644
index 0000000..5162e28
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/udf/graph/mst/MSTMessage.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.udf.graph.mst;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * MST message class.
+ * Used for message passing between vertices, supporting different types of MST operations.
+ * 
+ * <p>Message types:
+ * - COMPONENT_UPDATE: Component update message
+ * - EDGE_PROPOSAL: Edge proposal message
+ * - EDGE_ACCEPTANCE: Edge acceptance message
+ * - EDGE_REJECTION: Edge rejection message
+ * - MST_EDGE_FOUND: MST edge discovery message
+ * 
+ * @author Geaflow Team
+ */
+public class MSTMessage implements Serializable {
+    
+    private static final long serialVersionUID = 1L;
+    
+    /** Message type enumeration. */
+    public enum MessageType {
+        /** Component update message. */
+        COMPONENT_UPDATE,
+        /** Edge proposal message. */
+        EDGE_PROPOSAL,
+        /** Edge acceptance message. */
+        EDGE_ACCEPTANCE,
+        /** Edge rejection message. */
+        EDGE_REJECTION,
+        /** MST edge discovery message. */
+        MST_EDGE_FOUND
+    }
+
+    /** Message type. */
+    private MessageType type;
+    
+    /** Source vertex ID. */
+    private Object sourceId;
+    
+    /** Target vertex ID. */
+    private Object targetId;
+    
+    /** Edge weight. */
+    private double weight;
+    
+    /** Component ID. */
+    private Object componentId;
+    
+    /** MST edge. */
+    private MSTEdge edge;
+    
+    /** Message timestamp. */
+    private long timestamp;
+
+    /**
+     * Constructor.
+     * @param type Message type
+     * @param sourceId Source vertex ID
+     * @param targetId Target vertex ID
+     * @param weight Edge weight
+     */
+    public MSTMessage(MessageType type, Object sourceId, Object targetId, double weight) {
+        this.type = type;
+        this.sourceId = sourceId;
+        this.targetId = targetId;
+        this.weight = weight;
+        this.timestamp = System.currentTimeMillis();
+    }
+
+    /**
+     * Constructor with component ID.
+     * @param type Message type
+     * @param sourceId Source vertex ID
+     * @param targetId Target vertex ID
+     * @param weight Edge weight
+     * @param componentId Component ID
+     */
+    public MSTMessage(MessageType type, Object sourceId, Object targetId, double weight, Object componentId) {
+        this(type, sourceId, targetId, weight);
+        this.componentId = componentId;
+    }
+
+    // Getters and Setters
+    
+    public MessageType getType() {
+        return type;
+    }
+
+    public void setType(MessageType type) {
+        this.type = type;
+    }
+
+    public Object getSourceId() {
+        return sourceId;
+    }
+
+    public void setSourceId(Object sourceId) {
+        this.sourceId = sourceId;
+    }
+
+    public Object getTargetId() {
+        return targetId;
+    }
+
+    public void setTargetId(Object targetId) {
+        this.targetId = targetId;
+    }
+
+    public double getWeight() {
+        return weight;
+    }
+
+    public void setWeight(double weight) {
+        this.weight = weight;
+    }
+
+    public Object getComponentId() {
+        return componentId;
+    }
+
+    public void setComponentId(Object componentId) {
+        this.componentId = componentId;
+    }
+
+    public MSTEdge getEdge() {
+        return edge;
+    }
+
+    public void setEdge(MSTEdge edge) {
+        this.edge = edge;
+    }
+
+    public long getTimestamp() {
+        return timestamp;
+    }
+
+    public void setTimestamp(long timestamp) {
+        this.timestamp = timestamp;
+    }
+
+    /**
+     * Check if this is a component update message.
+     * @return Whether this is a component update message
+     */
+    public boolean isComponentUpdate() {
+        return type == MessageType.COMPONENT_UPDATE;
+    }
+
+    /**
+     * Check if this is an edge proposal message.
+     * @return Whether this is an edge proposal message
+     */
+    public boolean isEdgeProposal() {
+        return type == MessageType.EDGE_PROPOSAL;
+    }
+
+    /**
+     * Check if this is an edge acceptance message.
+     * @return Whether this is an edge acceptance message
+     */
+    public boolean isEdgeAcceptance() {
+        return type == MessageType.EDGE_ACCEPTANCE;
+    }
+
+    /**
+     * Check if this is an edge rejection message.
+     * @return Whether this is an edge rejection message
+     */
+    public boolean isEdgeRejection() {
+        return type == MessageType.EDGE_REJECTION;
+    }
+
+    /**
+     * Check if this is an MST edge discovery message.
+     * @return Whether this is an MST edge discovery message
+     */
+    public boolean isMSTEdgeFound() {
+        return type == MessageType.MST_EDGE_FOUND;
+    }
+
+    /**
+     * Check if the message is expired.
+     * @param currentTime Current time
+     * @param timeout Timeout duration (milliseconds)
+     * @return Whether the message is expired
+     */
+    public boolean isExpired(long currentTime, long timeout) {
+        return (currentTime - timestamp) > timeout;
+    }
+
+    /**
+     * Create a copy of the message.
+     * @return Message copy
+     */
+    public MSTMessage copy() {
+        MSTMessage copy = new MSTMessage(type, sourceId, targetId, weight, componentId);
+        copy.setEdge(edge);
+        copy.setTimestamp(timestamp);
+        return copy;
+    }
+
+    /**
+     * Create a reverse message.
+     * @return Reverse message
+     */
+    public MSTMessage reverse() {
+        MSTMessage reverse = new MSTMessage(type, targetId, sourceId, weight, componentId);
+        reverse.setEdge(edge);
+        reverse.setTimestamp(timestamp);
+        return reverse;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+        MSTMessage message = (MSTMessage) obj;
+        return Double.compare(message.weight, weight) == 0
+            && timestamp == message.timestamp
+            && type == message.type
+            && Objects.equals(sourceId, message.sourceId)
+            && Objects.equals(targetId, message.targetId)
+            && Objects.equals(componentId, message.componentId)
+            && Objects.equals(edge, message.edge);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(type, sourceId, targetId, weight, componentId, edge, timestamp);
+    }
+
+    @Override
+    public String toString() {
+        return "MSTMessage{"
+                + "type=" + type
+                + ", sourceId=" + sourceId
+                + ", targetId=" + targetId
+                + ", weight=" + weight
+                + ", componentId=" + componentId
+                + ", edge=" + edge
+                + ", timestamp=" + timestamp
+                + '}';
+    }
+} 
\ No newline at end of file
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/udf/graph/mst/MSTVertexState.java b/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/udf/graph/mst/MSTVertexState.java
new file mode 100644
index 0000000..c7052c9
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/udf/graph/mst/MSTVertexState.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.udf.graph.mst;
+
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * MST vertex state class.
+ * Maintains state information for each vertex in the minimum spanning tree.
+ * 
+ * <p>Contains information:
+ * - parentId: Parent node ID in MST
+ * - componentId: Component ID it belongs to
+ * - minEdgeWeight: Edge weight to parent node
+ * - isRoot: Whether it is a root node
+ * - mstEdges: MST edge set
+ * - changed: Whether the state has changed
+ * 
+ * @author Geaflow Team
+ */
+public class MSTVertexState implements Serializable {
+    
+    private static final long serialVersionUID = 1L;
+    
+    /** Parent node ID in MST. */
+    private Object parentId;
+    
+    /** Component ID it belongs to. */
+    private Object componentId;
+    
+    /** Edge weight to parent node. */
+    private double minEdgeWeight;
+    
+    /** Whether it is a root node. */
+    private boolean isRoot;
+    
+    /** MST edge set with size limit to prevent memory overflow. */
+    private Set<MSTEdge> mstEdges;
+    
+    /** Maximum number of MST edges to store per vertex (memory optimization). */
+    private static final int MAX_MST_EDGES_PER_VERTEX = 100; // Reduced from 1000 to prevent memory overflow
+    
+    /** Whether the state has changed. */
+    private boolean changed;
+    
+    /** Vertex ID. */
+    private Object vertexId;
+
+    /**
+     * Constructor.
+     * @param vertexId Vertex ID
+     */
+    public MSTVertexState(Object vertexId) {
+        this.vertexId = vertexId;
+        this.parentId = vertexId; // Initially self as parent node
+        this.componentId = vertexId; // Initially self as independent component
+        this.minEdgeWeight = Double.MAX_VALUE; // Initial weight as infinity
+        this.isRoot = true; // Initially as root node
+        this.mstEdges = new HashSet<>(); // Initial MST edge set is empty
+        this.changed = false; // Initial state unchanged
+    }
+
+    // Getters and Setters
+    
+    public Object getParentId() {
+        return parentId;
+    }
+
+    public void setParentId(Object parentId) {
+        this.parentId = parentId;
+        this.changed = true;
+    }
+
+    public Object getComponentId() {
+        return componentId;
+    }
+
+    public void setComponentId(Object componentId) {
+        this.componentId = componentId;
+        this.changed = true;
+    }
+
+    public double getMinEdgeWeight() {
+        return minEdgeWeight;
+    }
+
+    public void setMinEdgeWeight(double minEdgeWeight) {
+        this.minEdgeWeight = minEdgeWeight;
+        this.changed = true;
+    }
+
+    public boolean isRoot() {
+        return isRoot;
+    }
+
+    public void setRoot(boolean root) {
+        this.isRoot = root;
+        this.changed = true;
+    }
+
+    public Set<MSTEdge> getMstEdges() {
+        return mstEdges;
+    }
+
+    public void setMstEdges(Set<MSTEdge> mstEdges) {
+        this.mstEdges = mstEdges;
+        this.changed = true;
+    }
+
+    public boolean isChanged() {
+        return changed;
+    }
+
+    public void setChanged(boolean changed) {
+        this.changed = changed;
+    }
+
+    public Object getVertexId() {
+        return vertexId;
+    }
+
+    public void setVertexId(Object vertexId) {
+        this.vertexId = vertexId;
+    }
+
+    /**
+     * Add MST edge with memory optimization.
+     * Prevents memory overflow by limiting the number of stored edges.
+     * @param edge MST edge
+     * @return Whether addition was successful
+     */
+    public boolean addMSTEdge(MSTEdge edge) {
+        // Memory optimization: limit the number of MST edges per vertex
+        if (this.mstEdges.size() >= MAX_MST_EDGES_PER_VERTEX) {
+            // Remove the edge with highest weight to make room for new edge
+            MSTEdge heaviestEdge = this.mstEdges.stream()
+                .max(MSTEdge::compareTo)
+                .orElse(null);
+            if (heaviestEdge != null && edge.getWeight() < heaviestEdge.getWeight()) {
+                this.mstEdges.remove(heaviestEdge);
+            } else {
+                // New edge is heavier than all existing edges, skip it
+                return false;
+            }
+        }
+        
+        boolean added = this.mstEdges.add(edge);
+        if (added) {
+            this.changed = true;
+        }
+        return added;
+    }
+
+    /**
+     * Remove MST edge.
+     * @param edge MST edge
+     * @return Whether removal was successful
+     */
+    public boolean removeMSTEdge(MSTEdge edge) {
+        boolean removed = this.mstEdges.remove(edge);
+        if (removed) {
+            this.changed = true;
+        }
+        return removed;
+    }
+
+    /**
+     * Check if contains the specified MST edge.
+     * @param edge MST edge
+     * @return Whether it contains the edge
+     */
+    public boolean containsMSTEdge(MSTEdge edge) {
+        return this.mstEdges.contains(edge);
+    }
+
+    /**
+     * Get the number of MST edges.
+     * @return Number of edges
+     */
+    public int getMSTEdgeCount() {
+        return this.mstEdges.size();
+    }
+
+    /**
+     * Clear MST edge set.
+     */
+    public void clearMSTEdges() {
+        if (!this.mstEdges.isEmpty()) {
+            this.mstEdges.clear();
+            this.changed = true;
+        }
+    }
+
+    /**
+     * Reset state change flag.
+     */
+    public void resetChanged() {
+        this.changed = false;
+    }
+    
+    /**
+     * Memory optimization: compact MST edges by removing redundant edges.
+     * Keeps only the most important edges to prevent memory overflow.
+     */
+    public void compactMSTEdges() {
+        if (this.mstEdges.size() > MAX_MST_EDGES_PER_VERTEX) {
+            // Convert to sorted list and keep only the lightest edges
+            Set<MSTEdge> compactedEdges = this.mstEdges.stream()
+                .sorted()
+                .limit(MAX_MST_EDGES_PER_VERTEX)
+                .collect(java.util.stream.Collectors.toSet());
+            
+            this.mstEdges.clear();
+            this.mstEdges.addAll(compactedEdges);
+            this.changed = true;
+        }
+    }
+    
+    /**
+     * Get memory usage estimate for this vertex state.
+     * @return Estimated memory usage in bytes
+     */
+    public long getMemoryUsageEstimate() {
+        long baseSize = 8 * 8; // Object overhead + 8 fields
+        long edgesSize = this.mstEdges.size() * 32; // Approximate size per MSTEdge
+        return baseSize + edgesSize;
+    }
+
+    /**
+     * Check if it is a leaf node (no child nodes).
+     * @return Whether it is a leaf node
+     */
+    public boolean isLeaf() {
+        return this.mstEdges.isEmpty();
+    }
+
+    /**
+     * Get edge weight to specified vertex.
+     * @param targetId Target vertex ID
+     * @return Edge weight, returns Double.MAX_VALUE if not exists
+     */
+    public double getEdgeWeightTo(Object targetId) {
+        for (MSTEdge edge : mstEdges) {
+            if (edge.getTargetId().equals(targetId) || edge.getSourceId().equals(targetId)) {
+                return edge.getWeight();
+            }
+        }
+        return Double.MAX_VALUE;
+    }
+
+    /**
+     * Check if connected to specified vertex.
+     * @param targetId Target vertex ID
+     * @return Whether connected
+     */
+    public boolean isConnectedTo(Object targetId) {
+        return getEdgeWeightTo(targetId) < Double.MAX_VALUE;
+    }
+
+    @Override
+    public String toString() {
+        return "MSTVertexState{"
+                + "vertexId=" + vertexId
+                + ", parentId=" + parentId
+                + ", componentId=" + componentId
+                + ", minEdgeWeight=" + minEdgeWeight
+                + ", isRoot=" + isRoot
+                + ", mstEdges=" + mstEdges
+                + ", changed=" + changed
+                + '}';
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+        MSTVertexState that = (MSTVertexState) obj;
+        return Double.compare(that.minEdgeWeight, minEdgeWeight) == 0
+            && isRoot == that.isRoot
+            && changed == that.changed
+            && Objects.equals(vertexId, that.vertexId)
+            && Objects.equals(parentId, that.parentId)
+            && Objects.equals(componentId, that.componentId)
+            && Objects.equals(mstEdges, that.mstEdges);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(vertexId, parentId, componentId, minEdgeWeight, isRoot, mstEdges, changed);
+    }
+} 
\ No newline at end of file
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/udf/graph/mst/UnionFindHelper.java b/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/udf/graph/mst/UnionFindHelper.java
new file mode 100644
index 0000000..7c5bf77
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/udf/graph/mst/UnionFindHelper.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.udf.graph.mst;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Union-Find data structure helper class.
+ * Used for managing union and find operations on disjoint sets.
+ * 
+ * <p>Supported operations:
+ * - makeSet: Create new set
+ * - find: Find the set an element belongs to
+ * - union: Merge two sets
+ * - getSetCount: Get number of sets
+ * - clear: Clear all sets
+ * 
+ * @author Geaflow Team
+ */
+public class UnionFindHelper implements Serializable {
+    
+    private static final long serialVersionUID = 1L;
+    
+    /** Parent node mapping. */
+    private Map<Object, Object> parent;
+    
+    /** Rank mapping (for path compression optimization). */
+    private Map<Object, Integer> rank;
+    
+    /** Set size mapping. */
+    private Map<Object, Integer> size;
+    
+    /** Number of sets. */
+    private int setCount;
+
+    /**
+     * Constructor.
+     */
+    public UnionFindHelper() {
+        this.parent = new HashMap<>();
+        this.rank = new HashMap<>();
+        this.size = new HashMap<>();
+        this.setCount = 0;
+    }
+
+    /**
+     * Create new set.
+     * @param x Element
+     */
+    public void makeSet(Object x) {
+        if (!parent.containsKey(x)) {
+            parent.put(x, x);
+            rank.put(x, 0);
+            size.put(x, 1);
+            setCount++;
+        }
+    }
+
+    /**
+     * Find the root node of the set an element belongs to.
+     * @param x Element
+     * @return Root node
+     */
+    public Object find(Object x) {
+        if (!parent.containsKey(x)) {
+            return null;
+        }
+        
+        if (!parent.get(x).equals(x)) {
+            parent.put(x, find(parent.get(x)));
+        }
+        return parent.get(x);
+    }
+
+    /**
+     * Merge two sets.
+     * @param x First element
+     * @param y Second element
+     * @return Whether merge was successful
+     */
+    public boolean union(Object x, Object y) {
+        Object rootX = find(x);
+        Object rootY = find(y);
+        
+        if (rootX == null || rootY == null) {
+            return false;
+        }
+        
+        if (rootX.equals(rootY)) {
+            return false; // Already in the same set
+        }
+        
+        // Union by rank
+        if (rank.get(rootX) < rank.get(rootY)) {
+            Object temp = rootX;
+            rootX = rootY;
+            rootY = temp;
+        }
+        
+        parent.put(rootY, rootX);
+        size.put(rootX, size.get(rootX) + size.get(rootY));
+        
+        if (rank.get(rootX).equals(rank.get(rootY))) {
+            rank.put(rootX, rank.get(rootX) + 1);
+        }
+        
+        setCount--;
+        return true;
+    }
+
+    /**
+     * Get number of sets.
+     * @return Number of sets
+     */
+    public int getSetCount() {
+        return setCount;
+    }
+
+    /**
+     * Get size of specified set.
+     * @param x Any element in the set
+     * @return Set size
+     */
+    public int getSetSize(Object x) {
+        Object root = find(x);
+        if (root == null) {
+            return 0;
+        }
+        return size.get(root);
+    }
+
+    /**
+     * Check if two elements are in the same set.
+     * @param x First element
+     * @param y Second element
+     * @return Whether they are in the same set
+     */
+    public boolean isConnected(Object x, Object y) {
+        Object rootX = find(x);
+        Object rootY = find(y);
+        return rootX != null && rootX.equals(rootY);
+    }
+
+    /**
+     * Clear all sets.
+     */
+    public void clear() {
+        parent.clear();
+        rank.clear();
+        size.clear();
+        setCount = 0;
+    }
+
+    /**
+     * Check if Union-Find structure is empty.
+     * @return Whether it is empty
+     */
+    public boolean isEmpty() {
+        return parent.isEmpty();
+    }
+
+    /**
+     * Get number of elements in Union-Find structure.
+     * @return Number of elements
+     */
+    public int size() {
+        return parent.size();
+    }
+
+    /**
+     * Check if element exists.
+     * @param x Element
+     * @return Whether it exists
+     */
+    public boolean contains(Object x) {
+        return parent.containsKey(x);
+    }
+
+    /**
+     * Remove element (and its set).
+     * @param x Element
+     * @return Whether removal was successful
+     */
+    public boolean remove(Object x) {
+        if (!parent.containsKey(x)) {
+            return false;
+        }
+
+        Object root = find(x);
+        int rootSize = size.get(root);
+        
+        if (rootSize == 1) {
+            // If set has only one element, remove directly
+            parent.remove(x);
+            rank.remove(x);
+            size.remove(x);
+            setCount--;
+        } else {
+            // If set has multiple elements, need to reorganize
+            // Simplified handling here, actual applications may need more complex logic
+            parent.remove(x);
+            size.put(root, rootSize - 1);
+        }
+        
+        return true;
+    }
+
+    /**
+     * Get all elements in specified set.
+     * @param root Set root node
+     * @return All elements in the set
+     */
+    public java.util.Set<Object> getSetElements(Object root) {
+        java.util.Set<Object> elements = new java.util.HashSet<>();
+        for (Object x : parent.keySet()) {
+            if (find(x).equals(root)) {
+                elements.add(x);
+            }
+        }
+        return elements;
+    }
+
+    @Override
+    public String toString() {
+        return "UnionFindHelper{"
+                + "parent=" + parent
+                + ", rank=" + rank
+                + ", size=" + size
+                + ", setCount=" + setCount
+                + '}';
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+        UnionFindHelper that = (UnionFindHelper) obj;
+        return setCount == that.setCount
+            && Objects.equals(parent, that.parent)
+            && Objects.equals(rank, that.rank)
+            && Objects.equals(size, that.size);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(parent, rank, size, setCount);
+    }
+} 
\ No newline at end of file
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/engine/GeaFlowAlgorithmDynamicRuntimeContext.java b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/engine/GeaFlowAlgorithmDynamicRuntimeContext.java
index 07105aa..d929ae4 100644
--- a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/engine/GeaFlowAlgorithmDynamicRuntimeContext.java
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/engine/GeaFlowAlgorithmDynamicRuntimeContext.java
@@ -268,6 +268,35 @@
         return incVCTraversalCtx;
     }
 
+    @Override
+    public void voteToTerminate(String terminationReason, Object voteValue) {
+        // Send termination vote to coordinator through aggregation context
+        if (aggContext != null) {
+            aggContext.aggregate(new AlgorithmTerminationVote(terminationReason, voteValue));
+        }
+    }
+
+    /**
+     * Internal class representing a termination vote sent to the coordinator.
+     */
+    private static class AlgorithmTerminationVote implements ITraversalAgg {
+        private final String terminationReason;
+        private final Object voteValue;
+
+        public AlgorithmTerminationVote(String terminationReason, Object voteValue) {
+            this.terminationReason = terminationReason;
+            this.voteValue = voteValue;
+        }
+
+        public String getTerminationReason() {
+            return terminationReason;
+        }
+
+        public Object getVoteValue() {
+            return voteValue;
+        }
+    }
+
     private static class AlgorithmResponse implements ITraversalResponse<Row> {
 
         private final Row row;
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/engine/GeaFlowAlgorithmRuntimeContext.java b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/engine/GeaFlowAlgorithmRuntimeContext.java
index 01794a7..7696b4f 100644
--- a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/engine/GeaFlowAlgorithmRuntimeContext.java
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/engine/GeaFlowAlgorithmRuntimeContext.java
@@ -197,6 +197,35 @@
         this.aggContext = Objects.requireNonNull(aggContext);
     }
 
+    @Override
+    public void voteToTerminate(String terminationReason, Object voteValue) {
+        // Send termination vote to coordinator through aggregation context
+        if (aggContext != null) {
+            aggContext.aggregate(new AlgorithmTerminationVote(terminationReason, voteValue));
+        }
+    }
+
+    /**
+     * Internal class representing a termination vote sent to the coordinator.
+     */
+    private static class AlgorithmTerminationVote implements ITraversalAgg {
+        private final String terminationReason;
+        private final Object voteValue;
+
+        public AlgorithmTerminationVote(String terminationReason, Object voteValue) {
+            this.terminationReason = terminationReason;
+            this.voteValue = voteValue;
+        }
+
+        public String getTerminationReason() {
+            return terminationReason;
+        }
+
+        public Object getVoteValue() {
+            return voteValue;
+        }
+    }
+
     private static class AlgorithmResponse implements ITraversalResponse<Row> {
 
         private final Row row;
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/IncMSTPerformanceTest.java b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/IncMSTPerformanceTest.java
new file mode 100644
index 0000000..44d7ab7
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/IncMSTPerformanceTest.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.runtime.query;
+
+import org.apache.geaflow.common.config.keys.DSLConfigKeys;
+import org.apache.geaflow.common.config.keys.FrameworkConfigKeys;
+import org.apache.geaflow.file.FileConfigKeys;
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.io.FileUtils;
+import org.testng.annotations.Test;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.AfterClass;
+
+/**
+ * IncMST algorithm performance test class
+ * Test algorithm performance in large graph scenarios
+ * 
+ * @author Geaflow Team
+ */
+public class IncMSTPerformanceTest {
+
+    private final String TEST_GRAPH_PATH = "/tmp/geaflow/dsl/inc_mst/perf_test/graph";
+    private long startTime;
+    private long endTime;
+
+    @BeforeClass
+    public void setUp() throws IOException {
+        // Clean up test directory
+        FileUtils.deleteDirectory(new File(TEST_GRAPH_PATH));
+        System.out.println("=== IncMST Performance Test Setup Complete ===");
+    }
+
+    @AfterClass
+    public void tearDown() throws IOException {
+        // Clean up test directory
+        FileUtils.deleteDirectory(new File(TEST_GRAPH_PATH));
+        System.out.println("=== IncMST Performance Test Cleanup Complete ===");
+    }
+
+    @Test
+    public void testIncMST_001_SmallGraphPerformance() throws Exception {
+        System.out.println("Starting Small Graph Performance Test...");
+        startTime = System.nanoTime();
+
+        QueryTester
+            .build()
+            .withGraphDefine("/query/modern_graph.sql")
+            .withQueryPath("/query/gql_inc_mst_perf_001.sql")
+            .execute()
+            .checkSinkResult();
+
+        endTime = System.nanoTime();
+        printPerformanceMetrics("Small Graph (Modern)", startTime, endTime);
+    }
+
+    @Test
+    public void testIncMST_002_MediumGraphPerformance() throws Exception {
+        System.out.println("Starting Medium Graph Performance Test...");
+        startTime = System.nanoTime();
+
+        QueryTester
+            .build()
+            .withGraphDefine("/query/medium_graph.sql")
+            .withQueryPath("/query/gql_inc_mst_perf_002.sql")
+            .execute()
+            .checkSinkResult();
+
+        endTime = System.nanoTime();
+        printPerformanceMetrics("Medium Graph (1K vertices)", startTime, endTime);
+    }
+
+    @Test
+    public void testIncMST_003_LargeGraphPerformance() throws Exception {
+        System.out.println("Starting Large Graph Performance Test...");
+        startTime = System.nanoTime();
+
+        QueryTester
+            .build()
+            .withGraphDefine("/query/large_graph.sql")
+            .withQueryPath("/query/gql_inc_mst_perf_003.sql")
+            .execute()
+            .checkSinkResult();
+
+        endTime = System.nanoTime();
+        printPerformanceMetrics("Large Graph (10K vertices)", startTime, endTime);
+    }
+
+    @Test
+    public void testIncMST_004_IncrementalUpdatePerformance() throws Exception {
+        System.out.println("Starting Incremental Update Performance Test...");
+        startTime = System.nanoTime();
+
+        QueryTester
+            .build()
+            .withGraphDefine("/query/dynamic_graph.sql")
+            .withQueryPath("/query/gql_inc_mst_perf_004.sql")
+            .execute()
+            .checkSinkResult();
+
+        endTime = System.nanoTime();
+        printPerformanceMetrics("Incremental Update", startTime, endTime);
+    }
+
+    @Test
+    public void testIncMST_005_ConvergencePerformance() throws Exception {
+        System.out.println("Starting Convergence Performance Test...");
+        startTime = System.nanoTime();
+
+        QueryTester
+            .build()
+            .withGraphDefine("/query/modern_graph.sql")
+            .withQueryPath("/query/gql_inc_mst_perf_005.sql")
+            .execute()
+            .checkSinkResult();
+
+        endTime = System.nanoTime();
+        printPerformanceMetrics("Convergence Test", startTime, endTime);
+    }
+
+    @Test
+    public void testIncMST_006_MemoryEfficiency() throws Exception {
+        System.out.println("Starting Memory Efficiency Test...");
+        long initialMemory = getCurrentMemoryUsage();
+        startTime = System.nanoTime();
+
+        QueryTester
+            .build()
+            .withGraphDefine("/query/large_graph.sql")
+            .withQueryPath("/query/gql_inc_mst_perf_006.sql")
+            .execute()
+            .checkSinkResult();
+
+        endTime = System.nanoTime();
+        long finalMemory = getCurrentMemoryUsage();
+        printPerformanceMetrics("Memory Efficiency", startTime, endTime);
+        printMemoryMetrics("Memory Efficiency", initialMemory, finalMemory);
+    }
+
+    @Test
+    public void testIncMST_007_ScalabilityTest() throws Exception {
+        System.out.println("Starting Scalability Test...");
+        startTime = System.nanoTime();
+
+        QueryTester
+            .build()
+            .withGraphDefine("/query/scalability_graph.sql")
+            .withQueryPath("/query/gql_inc_mst_perf_007.sql")
+            .execute()
+            .checkSinkResult();
+
+        endTime = System.nanoTime();
+        printPerformanceMetrics("Scalability Test (100K vertices)", startTime, endTime);
+    }
+
+    /**
+     * Print performance metrics
+     * @param testName Test name
+     * @param startTime Start time (nanoseconds)
+     * @param endTime End time (nanoseconds)
+     */
+    private void printPerformanceMetrics(String testName, long startTime, long endTime) {
+        long durationNano = endTime - startTime;
+        long durationMs = TimeUnit.NANOSECONDS.toMillis(durationNano);
+        long durationSec = TimeUnit.NANOSECONDS.toSeconds(durationNano);
+
+        System.out.println("=== Performance Metrics for " + testName + " ===");
+        System.out.println("Execution Time: " + durationMs + " ms (" + durationSec + " seconds)");
+        System.out.println("Throughput: " + String.format("%.2f", 1000.0 / durationMs) + " operations/ms");
+        System.out.println("========================================");
+    }
+
+    /**
+     * Print memory metrics
+     * @param testName Test name
+     * @param initialMemory Initial memory usage (bytes)
+     * @param finalMemory Final memory usage (bytes)
+     */
+    private void printMemoryMetrics(String testName, long initialMemory, long finalMemory) {
+        long memoryUsed = finalMemory - initialMemory;
+        double memoryUsedMB = memoryUsed / (1024.0 * 1024.0);
+
+        System.out.println("=== Memory Metrics for " + testName + " ===");
+        System.out.println("Memory Used: " + String.format("%.2f", memoryUsedMB) + " MB");
+        System.out.println("Initial Memory: " + formatMemorySize(initialMemory));
+        System.out.println("Final Memory: " + formatMemorySize(finalMemory));
+        System.out.println("========================================");
+    }
+
+    /**
+     * Get current memory usage
+     * @return Memory usage (bytes)
+     */
+    private long getCurrentMemoryUsage() {
+        Runtime runtime = Runtime.getRuntime();
+        return runtime.totalMemory() - runtime.freeMemory();
+    }
+
+    /**
+     * Format memory size
+     * @param bytes Number of bytes
+     * @return Formatted string
+     */
+    private String formatMemorySize(long bytes) {
+        if (bytes < 1024) {
+            return bytes + " B";
+        } else if (bytes < 1024 * 1024) {
+            return String.format("%.2f KB", bytes / 1024.0);
+        } else if (bytes < 1024 * 1024 * 1024) {
+            return String.format("%.2f MB", bytes / (1024.0 * 1024.0));
+        } else {
+            return String.format("%.2f GB", bytes / (1024.0 * 1024.0 * 1024.0));
+        }
+    }
+}
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/IncMSTTest.java b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/IncMSTTest.java
new file mode 100644
index 0000000..c165b62
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/IncMSTTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.runtime.query;
+
+import org.apache.geaflow.common.config.keys.DSLConfigKeys;
+import org.apache.geaflow.common.config.keys.FrameworkConfigKeys;
+import org.apache.geaflow.file.FileConfigKeys;
+import java.io.File;
+import java.io.IOException;
+import org.apache.commons.io.FileUtils;
+import org.testng.annotations.Test;
+
+/**
+ * Incremental Minimum Spanning Tree algorithm test class
+ * Includes basic functionality tests, incremental update tests, connectivity validation, etc.
+ * 
+ * @author Geaflow Team
+ */
+public class IncMSTTest {
+
+    private final String TEST_GRAPH_PATH = "/tmp/geaflow/dsl/inc_mst/test/graph";
+
+    @Test
+    public void testIncMST_001_Basic() throws Exception {
+        QueryTester
+            .build()
+            .withGraphDefine("/query/modern_graph.sql")
+            .withQueryPath("/query/gql_inc_mst_001.sql")
+            .execute()
+            .checkSinkResult();
+    }
+
+    @Test
+    public void testIncMST_002_IncrementalUpdate() throws Exception {
+        QueryTester
+            .build()
+            .withGraphDefine("/query/modern_graph.sql")
+            .withQueryPath("/query/gql_inc_mst_002.sql")
+            .execute()
+            .checkSinkResult();
+    }
+
+    @Test
+    public void testIncMST_003_LargeGraph() throws Exception {
+        QueryTester
+            .build()
+            .withGraphDefine("/query/large_graph.sql")
+            .withQueryPath("/query/gql_inc_mst_003.sql")
+            .execute()
+            .checkSinkResult();
+    }
+
+    @Test
+    public void testIncMST_004_EdgeAddition() throws Exception {
+        QueryTester
+            .build()
+            .withGraphDefine("/query/dynamic_graph.sql")
+            .withQueryPath("/query/gql_inc_mst_004.sql")
+            .execute()
+            .checkSinkResult();
+    }
+
+    @Test
+    public void testIncMST_005_EdgeDeletion() throws Exception {
+        QueryTester
+            .build()
+            .withGraphDefine("/query/dynamic_graph.sql")
+            .withQueryPath("/query/gql_inc_mst_005.sql")
+            .execute()
+            .checkSinkResult();
+    }
+
+    @Test
+    public void testIncMST_006_ConnectedComponents() throws Exception {
+        QueryTester
+            .build()
+            .withGraphDefine("/query/disconnected_graph.sql")
+            .withQueryPath("/query/gql_inc_mst_006.sql")
+            .execute()
+            .checkSinkResult();
+    }
+
+    @Test
+    public void testIncMST_007_Performance() throws Exception {
+        QueryTester
+            .build()
+            .withGraphDefine("/query/performance_graph.sql")
+            .withQueryPath("/query/gql_inc_mst_007.sql")
+            .execute()
+            .checkSinkResult();
+    }
+
+    @Test
+    public void testIncMST_008_Convergence() throws Exception {
+        QueryTester
+            .build()
+            .withGraphDefine("/query/modern_graph.sql")
+            .withQueryPath("/query/gql_inc_mst_008.sql")
+            .execute()
+            .checkSinkResult();
+    }
+
+    @Test
+    public void testIncMST_009_CustomParameters() throws Exception {
+        QueryTester
+            .build()
+            .withGraphDefine("/query/modern_graph.sql")
+            .withQueryPath("/query/gql_inc_mst_009.sql")
+            .execute()
+            .checkSinkResult();
+    }
+
+    @Test
+    public void testIncMST_010_ComplexTopology() throws Exception {
+        QueryTester
+            .build()
+            .withGraphDefine("/query/complex_graph.sql")
+            .withQueryPath("/query/gql_inc_mst_010.sql")
+            .execute()
+            .checkSinkResult();
+    }
+}
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/data/complex_edge.txt b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/data/complex_edge.txt
new file mode 100644
index 0000000..f1889fa
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/data/complex_edge.txt
@@ -0,0 +1,22 @@
+4001,4002,0.5
+4002,4003,0.8
+4003,4004,0.3
+4004,4005,0.6
+4005,4006,0.4
+4006,4007,0.7
+4007,4008,0.2
+4001,4003,0.4
+4002,4004,0.6
+4003,4005,0.8
+4004,4006,0.3
+4005,4007,0.5
+4006,4008,0.7
+4001,4004,0.6
+4002,4005,0.3
+4003,4006,0.7
+4004,4007,0.2
+4005,4008,0.4
+4001,4005,0.8
+4002,4006,0.1
+4003,4007,0.9
+4004,4008,0.3
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/data/complex_vertex.txt b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/data/complex_vertex.txt
new file mode 100644
index 0000000..f02c96f
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/data/complex_vertex.txt
@@ -0,0 +1,8 @@
+complex1,4001
+complex2,4002
+complex3,4003
+complex4,4004
+complex5,4005
+complex6,4006
+complex7,4007
+complex8,4008
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/data/disconnected_edge.txt b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/data/disconnected_edge.txt
new file mode 100644
index 0000000..302284c
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/data/disconnected_edge.txt
@@ -0,0 +1,6 @@
+2001,2002,0.5
+2002,2003,0.8
+2003,2001,0.3
+2004,2005,0.6
+2005,2006,0.4
+2006,2004,0.7
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/data/disconnected_vertex.txt b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/data/disconnected_vertex.txt
new file mode 100644
index 0000000..7022659
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/data/disconnected_vertex.txt
@@ -0,0 +1,6 @@
+disconnected1,2001
+disconnected2,2002
+disconnected3,2003
+disconnected4,2004
+disconnected5,2005
+disconnected6,2006
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/data/dynamic_edge.txt b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/data/dynamic_edge.txt
new file mode 100644
index 0000000..05ae8ef
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/data/dynamic_edge.txt
@@ -0,0 +1,10 @@
+1001,1002,0.5
+1002,1003,0.8
+1003,1004,0.3
+1004,1005,0.6
+1001,1003,0.4
+1002,1004,0.7
+1003,1005,0.2
+1001,1004,0.6
+1002,1005,0.3
+1001,1005,0.8
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/data/dynamic_vertex.txt b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/data/dynamic_vertex.txt
new file mode 100644
index 0000000..f56c4f7
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/data/dynamic_vertex.txt
@@ -0,0 +1,5 @@
+dynamic1,1001
+dynamic2,1002
+dynamic3,1003
+dynamic4,1004
+dynamic5,1005
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/data/large_edge.txt b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/data/large_edge.txt
new file mode 100644
index 0000000..75b485c
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/data/large_edge.txt
@@ -0,0 +1,20 @@
+1,2,0.5
+2,3,0.8
+3,4,0.3
+4,5,0.6
+5,6,0.4
+6,7,0.7
+7,8,0.2
+8,9,0.9
+9,10,0.1
+1,3,0.4
+2,4,0.6
+3,5,0.8
+4,6,0.3
+5,7,0.5
+6,8,0.7
+7,9,0.2
+8,10,0.4
+1,4,0.6
+2,5,0.3
+3,6,0.7
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/data/large_vertex.txt b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/data/large_vertex.txt
new file mode 100644
index 0000000..3ff64b7
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/data/large_vertex.txt
@@ -0,0 +1,10 @@
+node1,1
+node2,2
+node3,3
+node4,4
+node5,5
+node6,6
+node7,7
+node8,8
+node9,9
+node10,10
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/data/medium_edge.txt b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/data/medium_edge.txt
new file mode 100644
index 0000000..2d50bca
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/data/medium_edge.txt
@@ -0,0 +1,45 @@
+5001,5002,0.5
+5002,5003,0.8
+5003,5004,0.3
+5004,5005,0.6
+5005,5006,0.4
+5006,5007,0.7
+5007,5008,0.2
+5008,5009,0.9
+5009,5010,0.1
+5001,5003,0.4
+5002,5004,0.6
+5003,5005,0.8
+5004,5006,0.3
+5005,5007,0.5
+5006,5008,0.7
+5007,5009,0.2
+5008,5010,0.4
+5001,5004,0.6
+5002,5005,0.3
+5003,5006,0.7
+5004,5007,0.2
+5005,5008,0.4
+5006,5009,0.8
+5007,5010,0.3
+5001,5005,0.7
+5002,5006,0.1
+5003,5007,0.9
+5004,5008,0.2
+5005,5009,0.6
+5006,5010,0.4
+5001,5006,0.3
+5002,5007,0.8
+5003,5008,0.5
+5004,5009,0.7
+5005,5010,0.2
+5001,5007,0.6
+5002,5008,0.4
+5003,5009,0.8
+5004,5010,0.1
+5001,5008,0.9
+5002,5009,0.3
+5003,5010,0.7
+5001,5009,0.2
+5002,5010,0.5
+5001,5010,0.8
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/data/medium_vertex.txt b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/data/medium_vertex.txt
new file mode 100644
index 0000000..95ca83a
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/data/medium_vertex.txt
@@ -0,0 +1,10 @@
+medium1,5001
+medium2,5002
+medium3,5003
+medium4,5004
+medium5,5005
+medium6,5006
+medium7,5007
+medium8,5008
+medium9,5009
+medium10,5010
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/data/performance_edge.txt b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/data/performance_edge.txt
new file mode 100644
index 0000000..a1f6597
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/data/performance_edge.txt
@@ -0,0 +1,16 @@
+3001,3002,0.5
+3002,3003,0.8
+3003,3004,0.3
+3004,3005,0.6
+3005,3006,0.4
+3006,3007,0.7
+3007,3008,0.2
+3001,3003,0.4
+3002,3004,0.6
+3003,3005,0.8
+3004,3006,0.3
+3005,3007,0.5
+3006,3008,0.7
+3001,3004,0.6
+3002,3005,0.3
+3003,3006,0.7
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/data/performance_vertex.txt b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/data/performance_vertex.txt
new file mode 100644
index 0000000..83d88b3
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/data/performance_vertex.txt
@@ -0,0 +1,8 @@
+perf1,3001
+perf2,3002
+perf3,3003
+perf4,3004
+perf5,3005
+perf6,3006
+perf7,3007
+perf8,3008
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/data/scalability_edge.txt b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/data/scalability_edge.txt
new file mode 100644
index 0000000..ea8c654
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/data/scalability_edge.txt
@@ -0,0 +1,10 @@
+6001,6002,0.5
+6002,6003,0.8
+6003,6004,0.3
+6004,6005,0.6
+6001,6003,0.4
+6002,6004,0.6
+6003,6005,0.8
+6001,6004,0.6
+6002,6005,0.3
+6001,6005,0.8
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/data/scalability_vertex.txt b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/data/scalability_vertex.txt
new file mode 100644
index 0000000..62a86a9
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/data/scalability_vertex.txt
@@ -0,0 +1,5 @@
+scale1,6001
+scale2,6002
+scale3,6003
+scale4,6004
+scale5,6005
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_inc_mst_001.txt b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_inc_mst_001.txt
new file mode 100644
index 0000000..aef9f5a
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_inc_mst_001.txt
@@ -0,0 +1,6 @@
+4,4,0.4
+1,1,0.4
+1,1,1.0
+4,4,1.0
+1,1,0.5
+6,6,0.2
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_inc_mst_002.txt b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_inc_mst_002.txt
new file mode 100644
index 0000000..082e2cd
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_inc_mst_002.txt
@@ -0,0 +1,6 @@
+1,1,0.5
+6,6,0.2
+4,4,1.0
+1,1,0.4
+1,1,1.0
+4,4,0.4
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_inc_mst_003.txt b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_inc_mst_003.txt
new file mode 100644
index 0000000..0ee2ab3
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_inc_mst_003.txt
@@ -0,0 +1,18 @@
+1,1,0.6
+7,7,0.2
+2,2,0.6
+8,8,0.9
+5,5,0.4
+3,3,0.8
+2,2,0.3
+8,8,0.4
+1,1,0.5
+6,6,0.7
+3,3,0.7
+2,2,0.8
+4,4,0.3
+1,1,0.4
+5,5,0.5
+3,3,0.3
+4,4,0.6
+9,9,0.1
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_inc_mst_004.txt b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_inc_mst_004.txt
new file mode 100644
index 0000000..b128a53
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_inc_mst_004.txt
@@ -0,0 +1,10 @@
+1001,1001,0.5
+1001,1001,0.8
+1002,1002,0.3
+1003,1003,0.3
+1004,1004,0.6
+1001,1001,0.6
+1001,1001,0.4
+1002,1002,0.7
+1002,1002,0.8
+1003,1003,0.2
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_inc_mst_005.txt b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_inc_mst_005.txt
new file mode 100644
index 0000000..b128a53
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_inc_mst_005.txt
@@ -0,0 +1,10 @@
+1001,1001,0.5
+1001,1001,0.8
+1002,1002,0.3
+1003,1003,0.3
+1004,1004,0.6
+1001,1001,0.6
+1001,1001,0.4
+1002,1002,0.7
+1002,1002,0.8
+1003,1003,0.2
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_inc_mst_006.txt b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_inc_mst_006.txt
new file mode 100644
index 0000000..ccf52f3
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_inc_mst_006.txt
@@ -0,0 +1,6 @@
+2002,2002,0.8
+2004,2004,0.6
+2001,2001,0.5
+2003,2003,0.3
+2006,2006,0.7
+2005,2005,0.4
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_inc_mst_007.txt b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_inc_mst_007.txt
new file mode 100644
index 0000000..8998de0
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_inc_mst_007.txt
@@ -0,0 +1,15 @@
+3002,3002,0.6
+3003,3003,0.8
+3001,3001,0.6
+3002,3002,0.8
+3006,3006,0.7
+3001,3001,0.4
+3005,3005,0.4
+3002,3002,0.3
+3003,3003,0.3
+3004,3004,0.3
+3005,3005,0.5
+3001,3001,0.5
+3003,3003,0.7
+3007,3007,0.2
+3004,3004,0.6
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_inc_mst_008.txt b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_inc_mst_008.txt
new file mode 100644
index 0000000..082e2cd
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_inc_mst_008.txt
@@ -0,0 +1,6 @@
+1,1,0.5
+6,6,0.2
+4,4,1.0
+1,1,0.4
+1,1,1.0
+4,4,0.4
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_inc_mst_009.txt b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_inc_mst_009.txt
new file mode 100644
index 0000000..082e2cd
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_inc_mst_009.txt
@@ -0,0 +1,6 @@
+1,1,0.5
+6,6,0.2
+4,4,1.0
+1,1,0.4
+1,1,1.0
+4,4,0.4
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_inc_mst_010.txt b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_inc_mst_010.txt
new file mode 100644
index 0000000..9f8e1c3
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_inc_mst_010.txt
@@ -0,0 +1,19 @@
+4002,4002,0.6
+4004,4004,0.3
+4003,4003,0.8
+4006,4006,0.7
+4001,4001,0.4
+4002,4002,0.8
+4004,4004,0.2
+4003,4003,0.3
+4005,4005,0.5
+4001,4001,0.8
+4002,4002,0.3
+4004,4004,0.6
+4003,4003,0.9
+4007,4007,0.2
+4001,4001,0.6
+4002,4002,0.1
+4003,4003,0.7
+4005,4005,0.4
+4001,4001,0.5
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_inc_mst_perf_001.txt b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_inc_mst_perf_001.txt
new file mode 100644
index 0000000..006f441
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_inc_mst_perf_001.txt
@@ -0,0 +1,6 @@
+1,1,0.4
+1,1,1.0
+4,4,0.4
+1,1,0.5
+6,6,0.2
+4,4,1.0
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_inc_mst_perf_002.txt b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_inc_mst_perf_002.txt
new file mode 100644
index 0000000..50e7ff7
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_inc_mst_perf_002.txt
@@ -0,0 +1,36 @@
+5008,5008,0.4
+5003,5003,0.3
+5003,5003,0.5
+5003,5003,0.8
+5005,5005,0.5
+5005,5005,0.2
+5007,5007,0.2
+5001,5001,0.5
+5001,5001,0.3
+5001,5001,0.9
+5001,5001,0.8
+5002,5002,0.6
+5002,5002,0.5
+5002,5002,0.4
+5004,5004,0.3
+5004,5004,0.6
+5004,5004,0.1
+5006,5006,0.8
+5008,5008,0.9
+5009,5009,0.1
+5003,5003,0.7
+5003,5003,0.9
+5005,5005,0.6
+5005,5005,0.4
+5007,5007,0.3
+5001,5001,0.6
+5001,5001,0.7
+5001,5001,0.2
+5001,5001,0.4
+5002,5002,0.3
+5002,5002,0.1
+5002,5002,0.8
+5004,5004,0.7
+5004,5004,0.2
+5006,5006,0.7
+5006,5006,0.4
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_inc_mst_perf_003.txt b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_inc_mst_perf_003.txt
new file mode 100644
index 0000000..424f0bf
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_inc_mst_perf_003.txt
@@ -0,0 +1,18 @@
+5,5,0.4
+2,2,0.3
+8,8,0.4
+7,7,0.2
+1,1,0.4
+5,5,0.5
+4,4,0.6
+9,9,0.1
+3,3,0.8
+2,2,0.6
+8,8,0.9
+1,1,0.6
+3,3,0.7
+2,2,0.8
+4,4,0.3
+1,1,0.5
+6,6,0.7
+3,3,0.3
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_inc_mst_perf_004.txt b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_inc_mst_perf_004.txt
new file mode 100644
index 0000000..e177086
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_inc_mst_perf_004.txt
@@ -0,0 +1,10 @@
+1001,1001,0.6
+1001,1001,0.4
+1002,1002,0.7
+1002,1002,0.8
+1003,1003,0.3
+1004,1004,0.6
+1001,1001,0.5
+1001,1001,0.8
+1002,1002,0.3
+1003,1003,0.2
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_inc_mst_perf_005.txt b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_inc_mst_perf_005.txt
new file mode 100644
index 0000000..b33264a
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_inc_mst_perf_005.txt
@@ -0,0 +1,6 @@
+4,4,1.0
+1,1,0.5
+6,6,0.2
+4,4,0.4
+1,1,0.4
+1,1,1.0
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_inc_mst_perf_006.txt b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_inc_mst_perf_006.txt
new file mode 100644
index 0000000..ee47a0b
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_inc_mst_perf_006.txt
@@ -0,0 +1,18 @@
+3,3,0.3
+2,2,0.3
+8,8,0.4
+1,1,0.5
+6,6,0.7
+3,3,0.7
+4,4,0.6
+9,9,0.1
+1,1,0.6
+3,3,0.8
+2,2,0.6
+8,8,0.9
+1,1,0.4
+5,5,0.5
+7,7,0.2
+2,2,0.8
+4,4,0.3
+5,5,0.4
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_inc_mst_perf_007.txt b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_inc_mst_perf_007.txt
new file mode 100644
index 0000000..e2ff813
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_inc_mst_perf_007.txt
@@ -0,0 +1,10 @@
+6002,6002,0.3
+6001,6001,0.4
+6002,6002,0.6
+6004,6004,0.6
+6003,6003,0.8
+6001,6001,0.8
+6002,6002,0.8
+6001,6001,0.5
+6003,6003,0.3
+6001,6001,0.6
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/kafka_scan_002.txt b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/kafka_scan_002.txt
index 5825292..e69de29 100644
--- a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/kafka_scan_002.txt
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/kafka_scan_002.txt
@@ -1,5 +0,0 @@
-1,jim,15
-2,kate,16
-3,same,20
-4,lucy,21
-5,brown,22
\ No newline at end of file
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/complex_graph.sql b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/complex_graph.sql
new file mode 100644
index 0000000..ffd0018
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/complex_graph.sql
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+CREATE TABLE v_complex_node (
+  name varchar,
+  id bigint
+) WITH (
+	type='file',
+	geaflow.dsl.window.size = -1,
+	geaflow.dsl.file.path = 'resource:///data/complex_vertex.txt'
+);
+
+CREATE TABLE e_complex_edge (
+  srcId bigint,
+  targetId bigint,
+  weight double
+) WITH (
+	type='file',
+	geaflow.dsl.window.size = -1,
+	geaflow.dsl.file.path = 'resource:///data/complex_edge.txt'
+);
+
+CREATE GRAPH complex_graph (
+	Vertex node using v_complex_node WITH ID(id),
+	Edge connects using e_complex_edge WITH ID(srcId, targetId)
+) WITH (
+	storeType='memory',
+	shardCount = 4
+);
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/disconnected_graph.sql b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/disconnected_graph.sql
new file mode 100644
index 0000000..18b3616
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/disconnected_graph.sql
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+CREATE TABLE v_disconnected_node (
+  name varchar,
+  id bigint
+) WITH (
+	type='file',
+	geaflow.dsl.window.size = -1,
+	geaflow.dsl.file.path = 'resource:///data/disconnected_vertex.txt'
+);
+
+CREATE TABLE e_disconnected_edge (
+  srcId bigint,
+  targetId bigint,
+  weight double
+) WITH (
+	type='file',
+	geaflow.dsl.window.size = -1,
+	geaflow.dsl.file.path = 'resource:///data/disconnected_edge.txt'
+);
+
+CREATE GRAPH disconnected_graph (
+	Vertex node using v_disconnected_node WITH ID(id),
+	Edge connects using e_disconnected_edge WITH ID(srcId, targetId)
+) WITH (
+	storeType='memory',
+	shardCount = 2
+);
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/dynamic_graph.sql b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/dynamic_graph.sql
new file mode 100644
index 0000000..7fdba32
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/dynamic_graph.sql
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+CREATE TABLE v_dynamic_node (
+  name varchar,
+  id bigint
+) WITH (
+	type='file',
+	geaflow.dsl.window.size = -1,
+	geaflow.dsl.file.path = 'resource:///data/dynamic_vertex.txt'
+);
+
+CREATE TABLE e_dynamic_edge (
+  srcId bigint,
+  targetId bigint,
+  weight double
+) WITH (
+	type='file',
+	geaflow.dsl.window.size = -1,
+	geaflow.dsl.file.path = 'resource:///data/dynamic_edge.txt'
+);
+
+CREATE GRAPH dynamic_graph (
+	Vertex node using v_dynamic_node WITH ID(id),
+	Edge connects using e_dynamic_edge WITH ID(srcId, targetId)
+) WITH (
+	storeType='memory',
+	shardCount = 2
+);
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_inc_mst_001.sql b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_inc_mst_001.sql
new file mode 100644
index 0000000..a4eef64
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_inc_mst_001.sql
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Incremental Minimum Spanning Tree algorithm basic test
+ * Execute basic IncMST algorithm on modern graph
+ */
+CREATE TABLE inc_mst_result (
+  srcId int,
+  targetId int,
+  weight double
+) WITH (
+    type='file',
+    geaflow.dsl.file.path = '${target}'
+);
+
+USE GRAPH modern;
+
+INSERT INTO inc_mst_result
+CALL IncMST() YIELD (srcId, targetId, weight)
+RETURN srcId, targetId, weight;
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_inc_mst_002.sql b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_inc_mst_002.sql
new file mode 100644
index 0000000..2892564
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_inc_mst_002.sql
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Incremental Minimum Spanning Tree algorithm incremental update test
+ * Test incremental update scenarios
+ */
+CREATE TABLE inc_mst_incr_result (
+  srcId int,
+  targetId int,
+  weight double
+) WITH (
+    type='file',
+    geaflow.dsl.file.path = '${target}'
+);
+
+USE GRAPH modern;
+
+INSERT INTO inc_mst_incr_result
+CALL IncMST(30, 0.001, 'mst_incr_edges') YIELD (srcId, targetId, weight)
+RETURN srcId, targetId, weight;
\ No newline at end of file
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_inc_mst_003.sql b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_inc_mst_003.sql
new file mode 100644
index 0000000..a250953
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_inc_mst_003.sql
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Incremental Minimum Spanning Tree algorithm large graph test
+ * Execute IncMST algorithm on large graph
+ */
+CREATE TABLE inc_mst_large_result (
+  srcId int,
+  targetId int,
+  weight double
+) WITH (
+    type='file',
+    geaflow.dsl.file.path = '${target}'
+);
+
+USE GRAPH large_graph;
+
+INSERT INTO inc_mst_large_result
+CALL IncMST(100, 0.001, 'mst_large_edges') YIELD (srcId, targetId, weight)
+RETURN srcId, targetId, weight;
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_inc_mst_004.sql b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_inc_mst_004.sql
new file mode 100644
index 0000000..6a25822
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_inc_mst_004.sql
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Incremental Minimum Spanning Tree algorithm edge addition test
+ * Test dynamic edge addition scenarios
+ */
+CREATE TABLE inc_mst_edge_add_result (
+  srcId int,
+  targetId int,
+  weight double
+) WITH (
+    type='file',
+    geaflow.dsl.file.path = '${target}'
+);
+
+USE GRAPH dynamic_graph;
+
+INSERT INTO inc_mst_edge_add_result
+CALL IncMST(50, 0.001, 'mst_edge_add_edges') YIELD (srcId, targetId, weight)
+RETURN srcId, targetId, weight;
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_inc_mst_005.sql b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_inc_mst_005.sql
new file mode 100644
index 0000000..cbbd7c7
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_inc_mst_005.sql
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Incremental Minimum Spanning Tree algorithm edge deletion test
+ * Test dynamic edge deletion scenarios
+ */
+CREATE TABLE inc_mst_edge_del_result (
+  srcId int,
+  targetId int,
+  weight double
+) WITH (
+    type='file',
+    geaflow.dsl.file.path = '${target}'
+);
+
+USE GRAPH dynamic_graph;
+
+INSERT INTO inc_mst_edge_del_result
+CALL IncMST(50, 0.001, 'mst_edge_del_edges') YIELD (srcId, targetId, weight)
+RETURN srcId, targetId, weight;
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_inc_mst_006.sql b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_inc_mst_006.sql
new file mode 100644
index 0000000..877d92a
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_inc_mst_006.sql
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Incremental Minimum Spanning Tree algorithm connected components test
+ * Test disconnected graph scenarios
+ */
+CREATE TABLE inc_mst_conn_result (
+  srcId int,
+  targetId int,
+  weight double
+) WITH (
+    type='file',
+    geaflow.dsl.file.path = '${target}'
+);
+
+USE GRAPH disconnected_graph;
+
+INSERT INTO inc_mst_conn_result
+CALL IncMST(40, 0.001, 'mst_conn_edges') YIELD (srcId, targetId, weight)
+RETURN srcId, targetId, weight;
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_inc_mst_007.sql b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_inc_mst_007.sql
new file mode 100644
index 0000000..98e49b7
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_inc_mst_007.sql
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Incremental Minimum Spanning Tree algorithm performance test
+ * Test performance on large graphs
+ */
+CREATE TABLE inc_mst_perf_result (
+  srcId int,
+  targetId int,
+  weight double
+) WITH (
+    type='file',
+    geaflow.dsl.file.path = '${target}'
+);
+
+USE GRAPH performance_graph;
+
+INSERT INTO inc_mst_perf_result
+CALL IncMST(80, 0.001, 'mst_perf_edges') YIELD (srcId, targetId, weight)
+RETURN srcId, targetId, weight;
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_inc_mst_008.sql b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_inc_mst_008.sql
new file mode 100644
index 0000000..d50598d
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_inc_mst_008.sql
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Incremental Minimum Spanning Tree algorithm convergence test
+ * Test convergence behavior
+ */
+CREATE TABLE inc_mst_conv_result (
+  srcId int,
+  targetId int,
+  weight double
+) WITH (
+    type='file',
+    geaflow.dsl.file.path = '${target}'
+);
+
+USE GRAPH modern;
+
+INSERT INTO inc_mst_conv_result
+CALL IncMST(60, 0.0001, 'mst_conv_edges') YIELD (srcId, targetId, weight)
+RETURN srcId, targetId, weight;
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_inc_mst_009.sql b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_inc_mst_009.sql
new file mode 100644
index 0000000..ae95c14
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_inc_mst_009.sql
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Incremental Minimum Spanning Tree algorithm custom parameters test
+ * Test custom parameter configurations
+ */
+CREATE TABLE inc_mst_custom_result (
+  srcId int,
+  targetId int,
+  weight double
+) WITH (
+    type='file',
+    geaflow.dsl.file.path = '${target}'
+);
+
+USE GRAPH modern;
+
+INSERT INTO inc_mst_custom_result
+CALL IncMST(20, 0.005, 'mst_custom_edges') YIELD (srcId, targetId, weight)
+RETURN srcId, targetId, weight;
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_inc_mst_010.sql b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_inc_mst_010.sql
new file mode 100644
index 0000000..6cca743
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_inc_mst_010.sql
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Incremental Minimum Spanning Tree algorithm complex topology test
+ * Test complex graph topologies
+ */
+CREATE TABLE inc_mst_complex_result (
+  srcId int,
+  targetId int,
+  weight double
+) WITH (
+    type='file',
+    geaflow.dsl.file.path = '${target}'
+);
+
+USE GRAPH complex_graph;
+
+INSERT INTO inc_mst_complex_result
+CALL IncMST(70, 0.001, 'mst_complex_edges') YIELD (srcId, targetId, weight)
+RETURN srcId, targetId, weight;
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_inc_mst_perf_001.sql b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_inc_mst_perf_001.sql
new file mode 100644
index 0000000..690bdab
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_inc_mst_perf_001.sql
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Incremental Minimum Spanning Tree algorithm performance test
+ * Test small graph performance
+ */
+CREATE TABLE inc_mst_perf_small_result (
+  srcId int,
+  targetId int,
+  weight double
+) WITH (
+    type='file',
+    geaflow.dsl.file.path = '${target}'
+);
+
+USE GRAPH modern;
+
+INSERT INTO inc_mst_perf_small_result
+CALL IncMST(25, 0.001, 'mst_perf_small_edges') YIELD (srcId, targetId, weight)
+RETURN srcId, targetId, weight;
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_inc_mst_perf_002.sql b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_inc_mst_perf_002.sql
new file mode 100644
index 0000000..5511ba4
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_inc_mst_perf_002.sql
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Incremental Minimum Spanning Tree algorithm performance test - medium graph
+ * Test performance on medium-sized graphs
+ */
+CREATE TABLE inc_mst_perf_medium_result (
+  srcId int,
+  targetId int,
+  weight double
+) WITH (
+    type='file',
+    geaflow.dsl.file.path = '${target}'
+);
+
+USE GRAPH medium_graph;
+
+INSERT INTO inc_mst_perf_medium_result
+CALL IncMST(50, 0.001, 'mst_perf_medium_edges') YIELD (srcId, targetId, weight)
+RETURN srcId, targetId, weight;
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_inc_mst_perf_003.sql b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_inc_mst_perf_003.sql
new file mode 100644
index 0000000..d8ebce8
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_inc_mst_perf_003.sql
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Incremental Minimum Spanning Tree algorithm performance test - large graph
+ * Test performance on large graphs
+ */
+CREATE TABLE inc_mst_perf_large_result (
+  srcId int,
+  targetId int,
+  weight double
+) WITH (
+    type='file',
+    geaflow.dsl.file.path = '${target}'
+);
+
+USE GRAPH large_graph;
+
+INSERT INTO inc_mst_perf_large_result
+CALL IncMST(100, 0.001, 'mst_perf_large_edges') YIELD (srcId, targetId, weight)
+RETURN srcId, targetId, weight;
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_inc_mst_perf_004.sql b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_inc_mst_perf_004.sql
new file mode 100644
index 0000000..383a9b8
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_inc_mst_perf_004.sql
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Incremental Minimum Spanning Tree algorithm performance test - incremental update
+ * Test incremental update performance
+ */
+CREATE TABLE inc_mst_perf_incr_result (
+  srcId int,
+  targetId int,
+  weight double
+) WITH (
+    type='file',
+    geaflow.dsl.file.path = '${target}'
+);
+
+USE GRAPH dynamic_graph;
+
+INSERT INTO inc_mst_perf_incr_result
+CALL IncMST(60, 0.001, 'mst_perf_incr_edges') YIELD (srcId, targetId, weight)
+RETURN srcId, targetId, weight;
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_inc_mst_perf_005.sql b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_inc_mst_perf_005.sql
new file mode 100644
index 0000000..7ae6b8d
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_inc_mst_perf_005.sql
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Incremental Minimum Spanning Tree algorithm performance test - convergence
+ * Test convergence performance
+ */
+CREATE TABLE inc_mst_perf_conv_result (
+  srcId int,
+  targetId int,
+  weight double
+) WITH (
+    type='file',
+    geaflow.dsl.file.path = '${target}'
+);
+
+USE GRAPH modern;
+
+INSERT INTO inc_mst_perf_conv_result
+CALL IncMST(40, 0.0001, 'mst_perf_conv_edges') YIELD (srcId, targetId, weight)
+RETURN srcId, targetId, weight;
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_inc_mst_perf_006.sql b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_inc_mst_perf_006.sql
new file mode 100644
index 0000000..fe5e4f1
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_inc_mst_perf_006.sql
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Incremental Minimum Spanning Tree algorithm performance test - memory efficiency
+ * Test memory efficiency on large graphs
+ */
+CREATE TABLE inc_mst_perf_mem_result (
+  srcId int,
+  targetId int,
+  weight double
+) WITH (
+    type='file',
+    geaflow.dsl.file.path = '${target}'
+);
+
+USE GRAPH large_graph;
+
+INSERT INTO inc_mst_perf_mem_result
+CALL IncMST(80, 0.001, 'mst_perf_mem_edges') YIELD (srcId, targetId, weight)
+RETURN srcId, targetId, weight;
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_inc_mst_perf_007.sql b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_inc_mst_perf_007.sql
new file mode 100644
index 0000000..eee00ea
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_inc_mst_perf_007.sql
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Incremental Minimum Spanning Tree algorithm performance test - scalability
+ * Test scalability on very large graphs
+ */
+CREATE TABLE inc_mst_perf_scale_result (
+  srcId int,
+  targetId int,
+  weight double
+) WITH (
+    type='file',
+    geaflow.dsl.file.path = '${target}'
+);
+
+USE GRAPH scalability_graph;
+
+INSERT INTO inc_mst_perf_scale_result
+CALL IncMST(120, 0.001, 'mst_perf_scale_edges') YIELD (srcId, targetId, weight)
+RETURN srcId, targetId, weight;
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/kafka_scan_002.sql b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/kafka_scan_002.sql
index 443f5af..d0e5922 100644
--- a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/kafka_scan_002.sql
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/kafka_scan_002.sql
@@ -27,7 +27,7 @@
 	geaflow.dsl.kafka.topic = 'scan_002',
 	geaflow.dsl.kafka.data.operation.timeout.seconds = 5,
 	geaflow.dsl.time.window.size=10,
-	geaflow.dsl.start.time='${startTime}'
+	geaflow.dsl.start.time='${stTime}'
 );
 
 CREATE TABLE tbl_result (
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/large_graph.sql b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/large_graph.sql
new file mode 100644
index 0000000..0a25cf8
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/large_graph.sql
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+CREATE TABLE v_large_node (
+  name varchar,
+  id bigint
+) WITH (
+	type='file',
+	geaflow.dsl.window.size = -1,
+	geaflow.dsl.file.path = 'resource:///data/large_vertex.txt'
+);
+
+CREATE TABLE e_large_edge (
+  srcId bigint,
+  targetId bigint,
+  weight double
+) WITH (
+	type='file',
+	geaflow.dsl.window.size = -1,
+	geaflow.dsl.file.path = 'resource:///data/large_edge.txt'
+);
+
+CREATE GRAPH large_graph (
+	Vertex node using v_large_node WITH ID(id),
+	Edge connects using e_large_edge WITH ID(srcId, targetId)
+) WITH (
+	storeType='memory',
+	shardCount = 4
+);
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/medium_graph.sql b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/medium_graph.sql
new file mode 100644
index 0000000..533d3fb
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/medium_graph.sql
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+CREATE TABLE v_medium_node (
+  name varchar,
+  id bigint
+) WITH (
+	type='file',
+	geaflow.dsl.window.size = -1,
+	geaflow.dsl.file.path = 'resource:///data/medium_vertex.txt'
+);
+
+CREATE TABLE e_medium_edge (
+  srcId bigint,
+  targetId bigint,
+  weight double
+) WITH (
+	type='file',
+	geaflow.dsl.window.size = -1,
+	geaflow.dsl.file.path = 'resource:///data/medium_edge.txt'
+);
+
+CREATE GRAPH medium_graph (
+	Vertex node using v_medium_node WITH ID(id),
+	Edge connects using e_medium_edge WITH ID(srcId, targetId)
+) WITH (
+	storeType='memory',
+	shardCount = 2
+);
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/performance_graph.sql b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/performance_graph.sql
new file mode 100644
index 0000000..0d0680b
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/performance_graph.sql
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+CREATE TABLE v_performance_node (
+  name varchar,
+  id bigint
+) WITH (
+	type='file',
+	geaflow.dsl.window.size = -1,
+	geaflow.dsl.file.path = 'resource:///data/performance_vertex.txt'
+);
+
+CREATE TABLE e_performance_edge (
+  srcId bigint,
+  targetId bigint,
+  weight double
+) WITH (
+	type='file',
+	geaflow.dsl.window.size = -1,
+	geaflow.dsl.file.path = 'resource:///data/performance_edge.txt'
+);
+
+CREATE GRAPH performance_graph (
+	Vertex node using v_performance_node WITH ID(id),
+	Edge connects using e_performance_edge WITH ID(srcId, targetId)
+) WITH (
+	storeType='memory',
+	shardCount = 4
+);
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/scalability_graph.sql b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/scalability_graph.sql
new file mode 100644
index 0000000..d4dc177
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/scalability_graph.sql
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+CREATE TABLE v_scalability_node (
+  name varchar,
+  id bigint
+) WITH (
+	type='file',
+	geaflow.dsl.window.size = -1,
+	geaflow.dsl.file.path = 'resource:///data/scalability_vertex.txt'
+);
+
+CREATE TABLE e_scalability_edge (
+  srcId bigint,
+  targetId bigint,
+  weight double
+) WITH (
+	type='file',
+	geaflow.dsl.window.size = -1,
+	geaflow.dsl.file.path = 'resource:///data/scalability_edge.txt'
+);
+
+CREATE GRAPH scalability_graph (
+	Vertex node using v_scalability_node WITH ID(id),
+	Edge connects using e_scalability_edge WITH ID(srcId, targetId)
+) WITH (
+	storeType='memory',
+	shardCount = 8
+);
diff --git a/tools/checkstyle.xml b/tools/checkstyle.xml
index 26d9130..062c931 100644
--- a/tools/checkstyle.xml
+++ b/tools/checkstyle.xml
@@ -209,7 +209,7 @@
         <module name="OperatorWrap">
             <property name="option" value="NL"/>
             <property name="tokens"
-                      value="BAND, BOR, BSR, BXOR, DIV, EQUAL, GE, GT, LAND, LE, LITERAL_INSTANCEOF, LOR, LT, MINUS, MOD, NOT_EQUAL, PLUS, QUESTION, SL, SR, STAR "/>
+                      value="BAND, BOR, BSR, BXOR, DIV, EQUAL, GE, GT, LAND, LE, LITERAL_INSTANCEOF, LOR, LT, MINUS, MOD, NOT_EQUAL, PLUS, QUESTION, SL, SR, STAR"/>
         </module>
         <module name="AnnotationLocation">
             <property name="tokens"