Add optional pending dag runs check to auto refresh (#56014)
* Add optional pending dag runs check to auto refresh
* Readd hasActiveRun check for structure
diff --git a/airflow-core/src/airflow/ui/src/components/NeedsReviewButton.tsx b/airflow-core/src/airflow/ui/src/components/NeedsReviewButton.tsx
index 6ceb54a..c0ec70f 100644
--- a/airflow-core/src/airflow/ui/src/components/NeedsReviewButton.tsx
+++ b/airflow-core/src/airflow/ui/src/components/NeedsReviewButton.tsx
@@ -27,16 +27,15 @@
export const NeedsReviewButton = ({
dagId,
- refreshInterval,
runId,
taskId,
}: {
readonly dagId?: string;
- readonly refreshInterval?: number | false;
readonly runId?: string;
readonly taskId?: string;
}) => {
- const hookAutoRefresh = useAutoRefresh({ dagId });
+ const refetchInterval = useAutoRefresh({ checkPendingRuns: true, dagId });
+
const { data: hitlStatsData, isLoading } = useTaskInstanceServiceGetHitlDetails(
{
dagId: dagId ?? "~",
@@ -47,7 +46,7 @@
},
undefined,
{
- refetchInterval: refreshInterval ?? hookAutoRefresh,
+ refetchInterval,
},
);
diff --git a/airflow-core/src/airflow/ui/src/layouts/Details/Grid/Grid.tsx b/airflow-core/src/airflow/ui/src/layouts/Details/Grid/Grid.tsx
index efc3759..5817d75 100644
--- a/airflow-core/src/airflow/ui/src/layouts/Details/Grid/Grid.tsx
+++ b/airflow-core/src/airflow/ui/src/layouts/Details/Grid/Grid.tsx
@@ -52,7 +52,6 @@
const gridRef = useRef<HTMLDivElement>(null);
const [selectedIsVisible, setSelectedIsVisible] = useState<boolean | undefined>();
- const [hasActiveRun, setHasActiveRun] = useState<boolean | undefined>();
const { openGroupIds, toggleGroupId } = useOpenGroups();
const { dagId = "", runId = "" } = useParams();
@@ -70,25 +69,15 @@
}
}, [runId, gridRuns, selectedIsVisible, setSelectedIsVisible]);
- useEffect(() => {
- if (gridRuns) {
- const run = gridRuns.some((dr: GridRunsResponse) => isStatePending(dr.state));
-
- if (!run) {
- setHasActiveRun(false);
- }
- }
- }, [gridRuns, setHasActiveRun]);
-
const { data: dagStructure } = useGridStructure({
dagRunState,
- hasActiveRun,
+ hasActiveRun: gridRuns?.some((dr) => isStatePending(dr.state)),
limit,
runType,
triggeringUser,
});
- // calculate dag run bar heights relative to max
+ // calculate dag run bar heights relative to max
const max = Math.max.apply(
undefined,
gridRuns === undefined
diff --git a/airflow-core/src/airflow/ui/src/pages/Dag/Dag.tsx b/airflow-core/src/airflow/ui/src/pages/Dag/Dag.tsx
index ffcc37b..cd34789 100644
--- a/airflow-core/src/airflow/ui/src/pages/Dag/Dag.tsx
+++ b/airflow-core/src/airflow/ui/src/pages/Dag/Dag.tsx
@@ -92,7 +92,7 @@
);
const { tabs: processedTabs } = useRequiredActionTabs({ dagId }, tabs, {
- refetchInterval: isStatePending(latestRun?.state) ? refetchInterval : false,
+ refetchInterval,
});
const displayTabs = processedTabs.filter((tab) => {
diff --git a/airflow-core/src/airflow/ui/src/pages/Dag/Overview/Overview.tsx b/airflow-core/src/airflow/ui/src/pages/Dag/Overview/Overview.tsx
index 8de8220..8cebcc4 100644
--- a/airflow-core/src/airflow/ui/src/pages/Dag/Overview/Overview.tsx
+++ b/airflow-core/src/airflow/ui/src/pages/Dag/Overview/Overview.tsx
@@ -35,7 +35,6 @@
import { TrendCountButton } from "src/components/TrendCountButton";
import { SearchParamsKeys } from "src/constants/searchParams";
import { useGridRuns } from "src/queries/useGridRuns.ts";
-import { isStatePending, useAutoRefresh } from "src/utils";
const FailedLogs = lazy(() => import("./FailedLogs"));
@@ -76,14 +75,9 @@
timestampLte: endDate,
});
- const refetchInterval = useAutoRefresh({});
-
return (
<Box m={4} spaceY={4}>
- <NeedsReviewButton
- dagId={dagId}
- refreshInterval={gridRuns?.some((dr) => isStatePending(dr.state)) ? refetchInterval : false}
- />
+ <NeedsReviewButton dagId={dagId} />
<Box my={2}>
<TimeRangeSelector
defaultValue={defaultHour}
diff --git a/airflow-core/src/airflow/ui/src/pages/DagsList/DagCard.tsx b/airflow-core/src/airflow/ui/src/pages/DagsList/DagCard.tsx
index b9646d4..bda8b7d 100644
--- a/airflow-core/src/airflow/ui/src/pages/DagsList/DagCard.tsx
+++ b/airflow-core/src/airflow/ui/src/pages/DagsList/DagCard.tsx
@@ -43,7 +43,7 @@
const { t: translate } = useTranslation(["common", "dag"]);
const [latestRun] = dag.latest_dag_runs;
- const refetchInterval = useAutoRefresh({ isPaused: dag.is_paused });
+ const refetchInterval = useAutoRefresh({});
return (
<Box borderColor="border.emphasized" borderRadius={8} borderWidth={1} overflow="hidden">
@@ -95,7 +95,9 @@
startDate={latestRun.start_date}
state={latestRun.state}
/>
- {isStatePending(latestRun.state) && Boolean(refetchInterval) ? <Spinner /> : undefined}
+ {isStatePending(latestRun.state) && !dag.is_paused && Boolean(refetchInterval) ? (
+ <Spinner />
+ ) : undefined}
</RouterLink>
</Link>
) : undefined}
diff --git a/airflow-core/src/airflow/ui/src/pages/Dashboard/Health/Health.tsx b/airflow-core/src/airflow/ui/src/pages/Dashboard/Health/Health.tsx
index cb8d87f..054ceed 100644
--- a/airflow-core/src/airflow/ui/src/pages/Dashboard/Health/Health.tsx
+++ b/airflow-core/src/airflow/ui/src/pages/Dashboard/Health/Health.tsx
@@ -27,7 +27,7 @@
import { HealthBadge } from "./HealthBadge";
export const Health = () => {
- const refetchInterval = useAutoRefresh({});
+ const refetchInterval = useAutoRefresh({ checkPendingRuns: true });
const { data, error, isLoading } = useMonitorServiceGetHealth(undefined, {
refetchInterval,
diff --git a/airflow-core/src/airflow/ui/src/pages/Dashboard/HistoricalMetrics/HistoricalMetrics.tsx b/airflow-core/src/airflow/ui/src/pages/Dashboard/HistoricalMetrics/HistoricalMetrics.tsx
index 2ccae75..560102c 100644
--- a/airflow-core/src/airflow/ui/src/pages/Dashboard/HistoricalMetrics/HistoricalMetrics.tsx
+++ b/airflow-core/src/airflow/ui/src/pages/Dashboard/HistoricalMetrics/HistoricalMetrics.tsx
@@ -41,7 +41,7 @@
const [endDate, setEndDate] = useState(now.toISOString());
const [assetSortBy, setAssetSortBy] = useState("-timestamp");
- const refetchInterval = useAutoRefresh({});
+ const refetchInterval = useAutoRefresh({ checkPendingRuns: true });
const { data, error, isLoading } = useDashboardServiceHistoricalMetrics(
{
diff --git a/airflow-core/src/airflow/ui/src/pages/Dashboard/PoolSummary/PoolSummary.tsx b/airflow-core/src/airflow/ui/src/pages/Dashboard/PoolSummary/PoolSummary.tsx
index 3ce4e40..936e9bb 100644
--- a/airflow-core/src/airflow/ui/src/pages/Dashboard/PoolSummary/PoolSummary.tsx
+++ b/airflow-core/src/airflow/ui/src/pages/Dashboard/PoolSummary/PoolSummary.tsx
@@ -29,7 +29,7 @@
export const PoolSummary = () => {
const { t: translate } = useTranslation("dashboard");
- const refetchInterval = useAutoRefresh({});
+ const refetchInterval = useAutoRefresh({ checkPendingRuns: true });
const { data, isLoading } = usePoolServiceGetPools(undefined, undefined, {
refetchInterval,
});
diff --git a/airflow-core/src/airflow/ui/src/pages/Dashboard/Stats/Stats.tsx b/airflow-core/src/airflow/ui/src/pages/Dashboard/Stats/Stats.tsx
index 5cfc7b2..e1027f0 100644
--- a/airflow-core/src/airflow/ui/src/pages/Dashboard/Stats/Stats.tsx
+++ b/airflow-core/src/airflow/ui/src/pages/Dashboard/Stats/Stats.tsx
@@ -29,7 +29,7 @@
import { PluginImportErrors } from "./PluginImportErrors";
export const Stats = () => {
- const refetchInterval = useAutoRefresh({});
+ const refetchInterval = useAutoRefresh({ checkPendingRuns: true });
const { data: statsData, isLoading: isStatsLoading } = useDashboardServiceDagStats(undefined, {
refetchInterval,
});
diff --git a/airflow-core/src/airflow/ui/src/pages/Task/Overview/Overview.tsx b/airflow-core/src/airflow/ui/src/pages/Task/Overview/Overview.tsx
index 6eb82b7..d003174 100644
--- a/airflow-core/src/airflow/ui/src/pages/Task/Overview/Overview.tsx
+++ b/airflow-core/src/airflow/ui/src/pages/Task/Overview/Overview.tsx
@@ -54,7 +54,7 @@
taskId: Boolean(groupId) ? undefined : taskId,
});
- const { data: taskInstances, isLoading: isLoadingTaskInstances } = useTaskInstanceServiceGetTaskInstances(
+ const { data: tiData, isLoading: isLoadingTaskInstances } = useTaskInstanceServiceGetTaskInstances(
{
dagId,
dagRunId: "~",
@@ -72,12 +72,7 @@
return (
<Box m={4} spaceY={4}>
- <NeedsReviewButton
- refreshInterval={
- taskInstances?.task_instances.some((ti) => isStatePending(ti.state)) ? refetchInterval : false
- }
- taskId={taskId}
- />
+ <NeedsReviewButton taskId={taskId} />
<Box my={2}>
<TimeRangeSelector
defaultValue={defaultHour}
@@ -111,7 +106,7 @@
{isLoadingTaskInstances ? (
<Skeleton height="200px" w="full" />
) : (
- <DurationChart entries={taskInstances?.task_instances.slice().reverse()} kind="Task Instance" />
+ <DurationChart entries={tiData?.task_instances.slice().reverse()} kind="Task Instance" />
)}
</Box>
</SimpleGrid>
diff --git a/airflow-core/src/airflow/ui/src/queries/useGridRuns.ts b/airflow-core/src/airflow/ui/src/queries/useGridRuns.ts
index f5772ae..35508b8 100644
--- a/airflow-core/src/airflow/ui/src/queries/useGridRuns.ts
+++ b/airflow-core/src/airflow/ui/src/queries/useGridRuns.ts
@@ -35,7 +35,7 @@
}) => {
const { dagId = "" } = useParams();
- const defaultRefetchInterval = useAutoRefresh({ dagId });
+ const refetchInterval = useAutoRefresh({ dagId });
const { data: GridRuns, ...rest } = useGridServiceGetGridRuns(
{
@@ -50,7 +50,7 @@
{
placeholderData: (prev) => prev,
refetchInterval: (query) =>
- query.state.data?.some((run) => isStatePending(run.state)) && defaultRefetchInterval,
+ query.state.data?.some((run) => isStatePending(run.state)) && refetchInterval,
},
);
diff --git a/airflow-core/src/airflow/ui/src/queries/useGridStructure.ts b/airflow-core/src/airflow/ui/src/queries/useGridStructure.ts
index c7a2580..ce33475 100644
--- a/airflow-core/src/airflow/ui/src/queries/useGridStructure.ts
+++ b/airflow-core/src/airflow/ui/src/queries/useGridStructure.ts
@@ -24,7 +24,7 @@
export const useGridStructure = ({
dagRunState,
- hasActiveRun = undefined,
+ hasActiveRun,
limit,
runType,
triggeringUser,
@@ -50,7 +50,6 @@
},
undefined,
{
- placeholderData: (prev) => prev,
refetchInterval: hasActiveRun ? refetchInterval : false,
},
);
diff --git a/airflow-core/src/airflow/ui/src/utils/query.ts b/airflow-core/src/airflow/ui/src/utils/query.ts
index 5becaa0..10ca41d 100644
--- a/airflow-core/src/airflow/ui/src/utils/query.ts
+++ b/airflow-core/src/airflow/ui/src/utils/query.ts
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-import { useDagServiceGetDagDetails } from "openapi/queries";
+import { useDagRunServiceGetDagRuns, useDagServiceGetDagDetails } from "openapi/queries";
import type { TaskInstanceState } from "openapi/requests/types.gen";
import { useConfig } from "src/queries/useConfig";
@@ -30,7 +30,14 @@
state === "restarting" ||
!Boolean(state);
-export const useAutoRefresh = ({ dagId, isPaused }: { dagId?: string; isPaused?: boolean }) => {
+// checkPendingRuns=false assumes that the component is already handling pending, setting to true will have useAutoRefresh handle it
+export const useAutoRefresh = ({
+ checkPendingRuns,
+ dagId,
+}: {
+ checkPendingRuns?: boolean;
+ dagId?: string;
+}) => {
const autoRefreshInterval = useConfig("auto_refresh_interval") as number | undefined;
const { data: dag } = useDagServiceGetDagDetails(
{
@@ -40,9 +47,27 @@
{ enabled: dagId !== undefined },
);
- const paused = isPaused ?? dag?.is_paused;
+ const { data: dagRunData } = useDagRunServiceGetDagRuns(
+ {
+ dagId: dagId ?? "~",
+ state: ["running", "queued"],
+ },
+ undefined,
+ // Scale back refetching to 10x longer if there are no pending runs (eg: every 3 secs for active runs, otherwise 30 secs)
+ {
+ enabled: checkPendingRuns,
+ refetchInterval: (query) =>
+ autoRefreshInterval !== undefined &&
+ ((query.state.data?.dag_runs ?? []).length > 0
+ ? autoRefreshInterval * 1000
+ : autoRefreshInterval * 10 * 1000),
+ },
+ );
- const canRefresh = autoRefreshInterval !== undefined && !paused;
+ const pendingRuns = checkPendingRuns ? (dagRunData?.dag_runs ?? []).length > 1 : true;
+ const paused = Boolean(dagId) ? dag?.is_paused : false;
+
+ const canRefresh = autoRefreshInterval !== undefined && !paused && pendingRuns;
// eslint-disable-next-line @typescript-eslint/no-unnecessary-type-assertion
return (canRefresh ? autoRefreshInterval * 1000 : false) as number | false;