Simplify dataset subgraph logic (#27987)
* fix merging connected dataset graphs
* refactor graph calculation
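The reworked flow in this diff boils down to: seed one edge group per "root" edge (an edge whose source is never a target), merge groups whose edges point at a shared target, repeatedly pull the remaining edges into whichever group they hang off, and only attach nodes once every edge has been placed. The sketch below is a rough standalone illustration of that grouping idea, not the PR's code; the Edge/EdgeGroup shapes and the groupEdges/sameEdge helpers are simplified stand-ins for the real DepEdge/DepNode types and the findDownstreamGraph/separateGraphs recursion.

interface Edge { source: string; target: string }
interface EdgeGroup { edges: Edge[] }

const sameEdge = (a: Edge, b: Edge) =>
  a.source === b.source && a.target === b.target;

const groupEdges = (edges: Edge[]): EdgeGroup[] => {
  // seed one group per root edge (an edge whose source is never a target)
  let groups: EdgeGroup[] = [];
  let unassigned: Edge[] = [];
  edges.forEach((e) => {
    if (edges.some((other) => other.target === e.source)) unassigned.push(e);
    else groups.push({ edges: [e] });
  });

  while (unassigned.length) {
    // merge groups whose edges share a target: they feed the same dataset,
    // so they belong to one pipeline graph
    groups = groups.reduce<EdgeGroup[]>((acc, group) => {
      const matchIndex = acc.findIndex((g) => g.edges.some(
        (ge) => group.edges.some((e) => e.target === ge.target),
      ));
      if (matchIndex === -1) return [...acc, group];
      const merged = [...acc[matchIndex].edges, ...group.edges]
        .filter((e, i, all) => i === all.findIndex((o) => sameEdge(o, e)));
      return [...acc.filter((_, i) => i !== matchIndex), { edges: merged }];
    }, []);

    // attach every edge that hangs off an existing group to that group
    const stillUnassigned: Edge[] = [];
    unassigned.forEach((e) => {
      const group = groups.find((g) => g.edges.some((ge) => ge.target === e.source));
      if (group) group.edges.push(e);
      else stillUnassigned.push(e);
    });

    // nothing attached this round: remaining edges are unreachable, stop
    if (stillUnassigned.length === unassigned.length) break;
    unassigned = stillUnassigned;
  }
  return groups;
};

// two pipelines that feed the shared dataset "c" collapse into one group,
// while "x" -> "y" stays separate:
// [{ edges: [a->c, b->c, c->d] }, { edges: [x->y] }]
console.log(groupEdges([
  { source: 'a', target: 'c' },
  { source: 'b', target: 'c' },
  { source: 'c', target: 'd' },
  { source: 'x', target: 'y' },
]));

Merging on a shared target is presumably what the "fix merging connected dataset graphs" bullet refers to: two root edges that produce the same dataset now end up in a single subgraph instead of two overlapping ones.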
diff --git a/airflow/www/static/js/api/useDatasetDependencies.ts b/airflow/www/static/js/api/useDatasetDependencies.ts
index bd61ee9..a4167c6 100644
--- a/airflow/www/static/js/api/useDatasetDependencies.ts
+++ b/airflow/www/static/js/api/useDatasetDependencies.ts
@@ -24,13 +24,16 @@
import { getMetaValue } from 'src/utils';
import type { DepEdge, DepNode } from 'src/types';
import type { NodeType } from 'src/datasets/Graph/Node';
-import { unionBy } from 'lodash';
interface DatasetDependencies {
edges: DepEdge[];
nodes: DepNode[];
}
+interface EdgeGroup {
+ edges: DepEdge[];
+}
+
interface GenerateProps {
nodes: DepNode[];
edges: DepEdge[];
@@ -82,111 +85,92 @@
edges: edges.map((e) => ({ id: `${e.source}-${e.target}`, sources: [e.source], targets: [e.target] })),
});
-interface SeparateGraphsProps extends DatasetDependencies {
- graphs: DatasetDependencies[];
+interface SeparateGraphsProps {
+ edges: DepEdge[];
+ graphs: EdgeGroup[];
}
-const graphIndicesToMerge: Record<number, number[]> = {};
-const indicesToRemove: number[] = [];
-
// find the downstream graph of each upstream edge
const findDownstreamGraph = (
- { edges, nodes, graphs = [] }: SeparateGraphsProps,
-): DatasetDependencies[] => {
- const newGraphs = [...graphs];
- let filteredEdges = [...edges];
+ { edges, graphs = [] }: SeparateGraphsProps,
+): EdgeGroup[] => {
+ let unassignedEdges = [...edges];
- graphs.forEach((g, i) => {
- // find downstream edges
- const downstreamEdges = edges.filter((e) => g.edges.some((ge) => ge.target === e.source));
- const downstreamNodes: DepNode[] = [];
-
- downstreamEdges.forEach((e) => {
- const newNode = nodes.find((n) => n.id === e.target);
- if (newNode) {
- downstreamNodes.push(newNode);
-
- // check if the node already exists in a different graph
- const existingGraphIndex = newGraphs
- .findIndex(((ng) => ng.nodes.some((n) => n.id === newNode.id)));
-
- // mark if the graph needs to merge with another
- if (existingGraphIndex > -1) {
- indicesToRemove.push(existingGraphIndex);
- graphIndicesToMerge[i] = [...(graphIndicesToMerge[i] || []), existingGraphIndex];
+ const mergedGraphs = graphs
+ .reduce(
+ (newGraphs, graph) => {
+ const otherGroupIndex = newGraphs.findIndex(
+ (otherGroup) => otherGroup.edges.some(
+ (otherEdge) => graph.edges.some(
+ (edge) => edge.target === otherEdge.target,
+ ),
+ ),
+ );
+ if (otherGroupIndex === -1) {
+ return [...newGraphs, graph];
}
- // add node and edge to the graph
- newGraphs[i] = {
- nodes: [...newGraphs[i].nodes, newNode],
- edges: [...newGraphs[i].edges, e],
- };
-
- // remove edge from edge list
- filteredEdges = filteredEdges
- .filter((fe) => !(fe.source === e.source && fe.target === e.target));
- }
- });
- });
-
- // once there are no more filtered edges left, merge relevant graphs
- // we merge afterwards to make sure we captured all nodes + edges
- if (!filteredEdges.length) {
- Object.keys(graphIndicesToMerge).forEach((key) => {
- const realKey = key as unknown as number;
- const values = graphIndicesToMerge[realKey];
- values.forEach((v) => {
- newGraphs[realKey] = {
- nodes: unionBy(newGraphs[realKey].nodes, newGraphs[v].nodes, 'id'),
- edges: [...newGraphs[realKey].edges, ...newGraphs[v].edges]
- .filter((e, i, s) => (
- i === s.findIndex((t) => t.source === e.source && t.target === e.target)
- )),
- };
+ const mergedEdges = [...newGraphs[otherGroupIndex].edges, ...graph.edges]
+ .filter((edge, edgeIndex, otherEdges) => (
+ edgeIndex === otherEdges.findIndex(
+ (otherEdge) => otherEdge.source === edge.source && otherEdge.target === edge.target,
+ )
+ ));
+ return [
+ ...newGraphs.filter((_, newGraphIndex) => newGraphIndex !== otherGroupIndex),
+ { edges: mergedEdges },
+ ];
+ },
+ [] as EdgeGroup[],
+ )
+ .map((graph) => {
+ // find the next set of downstream edges and filter them out of the unassigned edges list
+ const downstreamEdges: DepEdge[] = [];
+ unassignedEdges = unassignedEdges.filter((edge) => {
+ const isDownstream = graph.edges.some((graphEdge) => graphEdge.target === edge.source);
+ if (isDownstream) downstreamEdges.push(edge);
+ return !isDownstream;
});
- });
- return newGraphs.filter((g, i) => !indicesToRemove.some((j) => i === j));
- }
- return findDownstreamGraph({ edges: filteredEdges, nodes, graphs: newGraphs });
+ return {
+ edges: [...graph.edges, ...downstreamEdges],
+ };
+ });
+
+ // recursively find downstream edges until there are no unassigned edges
+ return unassignedEdges.length
+ ? findDownstreamGraph({ edges: unassignedEdges, graphs: mergedGraphs })
+ : mergedGraphs;
};
// separate the list of nodes/edges into distinct dataset pipeline graphs
const separateGraphs = ({ edges, nodes }: DatasetDependencies): DatasetDependencies[] => {
- const separatedGraphs: DatasetDependencies[] = [];
- let remainingEdges = [...edges];
- let remainingNodes = [...nodes];
+ const separatedGraphs: EdgeGroup[] = [];
+ const remainingEdges: DepEdge[] = [];
- edges.forEach((edge) => {
- const isDownstream = edges.some((e) => e.target === edge.source);
-
- // if the edge is not downstream of anything, then start building the graph
- if (!isDownstream) {
- const connectedNodes = nodes.filter((n) => n.id === edge.source || n.id === edge.target);
-
- // check if one of the nodes is already connected to a separated graph
- const nodesInUse = separatedGraphs
- .findIndex((g) => g.nodes.some((n) => connectedNodes.some((nn) => nn.id === n.id)));
-
- if (nodesInUse > -1) {
- // if one of the nodes is already in use, merge the graphs
- const { nodes: existingNodes, edges: existingEdges } = separatedGraphs[nodesInUse];
- separatedGraphs[nodesInUse] = { nodes: unionBy(existingNodes, connectedNodes, 'id'), edges: [...existingEdges, edge] };
- } else {
- // else just add the new separated graph
- separatedGraphs.push({ nodes: connectedNodes, edges: [edge] });
- }
-
- // filter out used nodes and edges
- remainingEdges = remainingEdges.filter((e) => e.source !== edge.source);
- remainingNodes = remainingNodes.filter((n) => !connectedNodes.some((nn) => nn.id === n.id));
+ edges.forEach((e) => {
+ // add a separate graph for each edge without an upstream
+ if (!edges.some((ee) => e.source === ee.target)) {
+ separatedGraphs.push({ edges: [e] });
+ } else {
+ remainingEdges.push(e);
}
});
- if (remainingEdges.length) {
- return findDownstreamGraph({ edges: remainingEdges, nodes, graphs: separatedGraphs });
- }
- return separatedGraphs;
+ const edgeGraphs = findDownstreamGraph({ edges: remainingEdges, graphs: separatedGraphs });
+
+ // once all the edges are found, add the nodes
+ return edgeGraphs.map((eg) => {
+ const graphNodes = nodes.filter(
+ (n) => eg.edges.some(
+ (e) => e.target === n.id || e.source === n.id,
+ ),
+ );
+ return ({
+ edges: eg.edges,
+ nodes: graphNodes,
+ });
+ });
};
const formatDependencies = async ({ edges, nodes }: DatasetDependencies) => {
@@ -202,6 +186,7 @@
elk.layout(generateGraph({ nodes: g.nodes, edges: g.edges, font }))
)));
const fullGraph = await elk.layout(generateGraph({ nodes, edges, font }));
+
return {
fullGraph,
subGraphs,