Simplify dataset subgraph logic (#27987)

* fix merging connected dataset graphs

* refactor graph calculation
diff --git a/airflow/www/static/js/api/useDatasetDependencies.ts b/airflow/www/static/js/api/useDatasetDependencies.ts
index bd61ee9..a4167c6 100644
--- a/airflow/www/static/js/api/useDatasetDependencies.ts
+++ b/airflow/www/static/js/api/useDatasetDependencies.ts
@@ -24,13 +24,16 @@
 import { getMetaValue } from 'src/utils';
 import type { DepEdge, DepNode } from 'src/types';
 import type { NodeType } from 'src/datasets/Graph/Node';
-import { unionBy } from 'lodash';
 
 interface DatasetDependencies {
   edges: DepEdge[];
   nodes: DepNode[];
 }
 
+interface EdgeGroup {
+  edges: DepEdge[];
+}
+
 interface GenerateProps {
   nodes: DepNode[];
   edges: DepEdge[];
@@ -82,111 +85,92 @@
   edges: edges.map((e) => ({ id: `${e.source}-${e.target}`, sources: [e.source], targets: [e.target] })),
 });
 
-interface SeparateGraphsProps extends DatasetDependencies {
-  graphs: DatasetDependencies[];
+interface SeparateGraphsProps {
+  edges: DepEdge[];
+  graphs: EdgeGroup[];
 }
 
-const graphIndicesToMerge: Record<number, number[]> = {};
-const indicesToRemove: number[] = [];
-
 // find the downstream graph of each upstream edge
 const findDownstreamGraph = (
-  { edges, nodes, graphs = [] }: SeparateGraphsProps,
-): DatasetDependencies[] => {
-  const newGraphs = [...graphs];
-  let filteredEdges = [...edges];
+  { edges, graphs = [] }: SeparateGraphsProps,
+): EdgeGroup[] => {
+  let unassignedEdges = [...edges];
 
-  graphs.forEach((g, i) => {
-    // find downstream edges
-    const downstreamEdges = edges.filter((e) => g.edges.some((ge) => ge.target === e.source));
-    const downstreamNodes: DepNode[] = [];
-
-    downstreamEdges.forEach((e) => {
-      const newNode = nodes.find((n) => n.id === e.target);
-      if (newNode) {
-        downstreamNodes.push(newNode);
-
-        // check if the node already exists in a different graph
-        const existingGraphIndex = newGraphs
-          .findIndex(((ng) => ng.nodes.some((n) => n.id === newNode.id)));
-
-        // mark if the graph needs to merge with another
-        if (existingGraphIndex > -1) {
-          indicesToRemove.push(existingGraphIndex);
-          graphIndicesToMerge[i] = [...(graphIndicesToMerge[i] || []), existingGraphIndex];
+  const mergedGraphs = graphs
+    .reduce(
+      (newGraphs, graph) => {
+        const otherGroupIndex = newGraphs.findIndex(
+          (otherGroup) => otherGroup.edges.some(
+            (otherEdge) => graph.edges.some(
+              (edge) => edge.target === otherEdge.target,
+            ),
+          ),
+        );
+        if (otherGroupIndex === -1) {
+          return [...newGraphs, graph];
         }
 
-        // add node and edge to the graph
-        newGraphs[i] = {
-          nodes: [...newGraphs[i].nodes, newNode],
-          edges: [...newGraphs[i].edges, e],
-        };
-
-        // remove edge from edge list
-        filteredEdges = filteredEdges
-          .filter((fe) => !(fe.source === e.source && fe.target === e.target));
-      }
-    });
-  });
-
-  // once there are no more filtered edges left, merge relevant graphs
-  // we merge afterwards to make sure we captured all nodes + edges
-  if (!filteredEdges.length) {
-    Object.keys(graphIndicesToMerge).forEach((key) => {
-      const realKey = key as unknown as number;
-      const values = graphIndicesToMerge[realKey];
-      values.forEach((v) => {
-        newGraphs[realKey] = {
-          nodes: unionBy(newGraphs[realKey].nodes, newGraphs[v].nodes, 'id'),
-          edges: [...newGraphs[realKey].edges, ...newGraphs[v].edges]
-            .filter((e, i, s) => (
-              i === s.findIndex((t) => t.source === e.source && t.target === e.target)
-            )),
-        };
+        const mergedEdges = [...newGraphs[otherGroupIndex].edges, ...graph.edges]
+          .filter((edge, edgeIndex, otherEdges) => (
+            edgeIndex === otherEdges.findIndex(
+              (otherEdge) => otherEdge.source === edge.source && otherEdge.target === edge.target,
+            )
+          ));
+        return [
+          ...newGraphs.filter((_, newGraphIndex) => newGraphIndex !== otherGroupIndex),
+          { edges: mergedEdges },
+        ];
+      },
+      [] as EdgeGroup[],
+    )
+    .map((graph) => {
+      // find the next set of downstream edges and filter them out of the unassigned edges list
+      const downstreamEdges: DepEdge[] = [];
+      unassignedEdges = unassignedEdges.filter((edge) => {
+        const isDownstream = graph.edges.some((graphEdge) => graphEdge.target === edge.source);
+        if (isDownstream) downstreamEdges.push(edge);
+        return !isDownstream;
       });
-    });
-    return newGraphs.filter((g, i) => !indicesToRemove.some((j) => i === j));
-  }
 
-  return findDownstreamGraph({ edges: filteredEdges, nodes, graphs: newGraphs });
+      return {
+        edges: [...graph.edges, ...downstreamEdges],
+      };
+    });
+
+  // recursively find downstream edges until there are no unassigned edges
+  return unassignedEdges.length
+    ? findDownstreamGraph({ edges: unassignedEdges, graphs: mergedGraphs })
+    : mergedGraphs;
 };
 
 // separate the list of nodes/edges into distinct dataset pipeline graphs
 const separateGraphs = ({ edges, nodes }: DatasetDependencies): DatasetDependencies[] => {
-  const separatedGraphs: DatasetDependencies[] = [];
-  let remainingEdges = [...edges];
-  let remainingNodes = [...nodes];
+  const separatedGraphs: EdgeGroup[] = [];
+  const remainingEdges: DepEdge[] = [];
 
-  edges.forEach((edge) => {
-    const isDownstream = edges.some((e) => e.target === edge.source);
-
-    // if the edge is not downstream of anything, then start building the graph
-    if (!isDownstream) {
-      const connectedNodes = nodes.filter((n) => n.id === edge.source || n.id === edge.target);
-
-      // check if one of the nodes is already connected to a separated graph
-      const nodesInUse = separatedGraphs
-        .findIndex((g) => g.nodes.some((n) => connectedNodes.some((nn) => nn.id === n.id)));
-
-      if (nodesInUse > -1) {
-        // if one of the nodes is already in use, merge the graphs
-        const { nodes: existingNodes, edges: existingEdges } = separatedGraphs[nodesInUse];
-        separatedGraphs[nodesInUse] = { nodes: unionBy(existingNodes, connectedNodes, 'id'), edges: [...existingEdges, edge] };
-      } else {
-        // else just add the new separated graph
-        separatedGraphs.push({ nodes: connectedNodes, edges: [edge] });
-      }
-
-      // filter out used nodes and edges
-      remainingEdges = remainingEdges.filter((e) => e.source !== edge.source);
-      remainingNodes = remainingNodes.filter((n) => !connectedNodes.some((nn) => nn.id === n.id));
+  edges.forEach((e) => {
+    // add a separate graph for each edge without an upstream
+    if (!edges.some((ee) => e.source === ee.target)) {
+      separatedGraphs.push({ edges: [e] });
+    } else {
+      remainingEdges.push(e);
     }
   });
 
-  if (remainingEdges.length) {
-    return findDownstreamGraph({ edges: remainingEdges, nodes, graphs: separatedGraphs });
-  }
-  return separatedGraphs;
+  const edgeGraphs = findDownstreamGraph({ edges: remainingEdges, graphs: separatedGraphs });
+
+  // once all the edges are found, add the nodes
+  return edgeGraphs.map((eg) => {
+    const graphNodes = nodes.filter(
+      (n) => eg.edges.some(
+        (e) => e.target === n.id || e.source === n.id,
+      ),
+    );
+    return ({
+      edges: eg.edges,
+      nodes: graphNodes,
+    });
+  });
 };
 
 const formatDependencies = async ({ edges, nodes }: DatasetDependencies) => {
@@ -202,6 +186,7 @@
     elk.layout(generateGraph({ nodes: g.nodes, edges: g.edges, font }))
   )));
   const fullGraph = await elk.layout(generateGraph({ nodes, edges, font }));
+
   return {
     fullGraph,
     subGraphs,