Move prohibited warnings detection in deferrable pytest plugin (#39411)

diff --git a/pyproject.toml b/pyproject.toml
index 54c6f2a..fadfa13 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -529,6 +529,14 @@
     'ignore:jsonschema\.exceptions\.RefResolutionError:DeprecationWarning:connexion.json_schema',
     'ignore:Accessing jsonschema\.draft4_format_checker:DeprecationWarning:connexion.decorators.validation',
 ]
+# We cannot add warnings from the airflow package into `filterwarnings`,
+# because it invokes import airflow before we set up test environment which breaks the tests.
+# Instead of that, we use a separate parameter and dynamically add it into `filterwarnings` marker.
+forbidden_warnings = [
+    "airflow.exceptions.RemovedInAirflow3Warning",
+    "airflow.utils.context.AirflowContextDeprecationWarning",
+    "airflow.exceptions.AirflowProviderDeprecationWarning",
+]
 python_files = [
     "test_*.py",
     "example_*.py",
diff --git a/tests/_internals/forbidden_warnings.py b/tests/_internals/forbidden_warnings.py
new file mode 100644
index 0000000..c78e4b0
--- /dev/null
+++ b/tests/_internals/forbidden_warnings.py
@@ -0,0 +1,133 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from pathlib import Path
+
+import pytest
+import yaml
+
+TESTS_DIR = Path(__file__).parents[1].resolve()
+
+
+class ForbiddenWarningsPlugin:
+    """Internal plugin for restricting warnings during the tests run."""
+
+    node_key: str = "forbidden_warnings_node"
+    deprecations_ignore: Path = (TESTS_DIR / "deprecations_ignore.yml").resolve(strict=True)
+
+    def __init__(self, config: pytest.Config, forbidden_warnings: tuple[str, ...]):
+        excluded_cases = {
+            # Skip: Integration and System Tests
+            "tests/integration/",
+            "tests/system/",
+            # Skip: DAGs for tests
+            "tests/dags/",
+            "tests/dags_corrupted/",
+            "tests/dags_with_system_exit/",
+        }
+        with self.deprecations_ignore.open() as fp:
+            excluded_cases.update(yaml.safe_load(fp))
+
+        self.config = config
+        self.forbidden_warnings = forbidden_warnings
+        self.is_worker_node = hasattr(config, "workerinput")
+        self.detected_cases: set[str] = set()
+        self.excluded_cases: tuple[str, ...] = tuple(sorted(excluded_cases))
+
+    @staticmethod
+    def prune_params_node_id(node_id: str) -> str:
+        """Remove parametrized parts from node id."""
+        return node_id.partition("[")[0]
+
+    def pytest_itemcollected(self, item: pytest.Item):
+        if item.nodeid.startswith(self.excluded_cases):
+            return
+        for fw in self.forbidden_warnings:
+            # Add marker at the beginning of the markers list. In this case, it does not conflict with
+            # filterwarnings markers, which are set explicitly in the test suite.
+            item.add_marker(pytest.mark.filterwarnings(f"error::{fw}"), append=False)
+
+    @pytest.hookimpl(hookwrapper=True, trylast=True)
+    def pytest_sessionfinish(self, session: pytest.Session, exitstatus: int):
+        """Save set of test node ids in the session finish on xdist worker node"""
+        yield
+        if self.is_worker_node and self.detected_cases and hasattr(self.config, "workeroutput"):
+            self.config.workeroutput[self.node_key] = frozenset(self.detected_cases)
+
+    @pytest.hookimpl(optionalhook=True)
+    def pytest_testnodedown(self, node, error):
+        """Get a set of test node ids from the xdist worker node."""
+        if not (workeroutput := getattr(node, "workeroutput", {})):
+            return
+
+        node_detected_cases: tuple[tuple[str, int]] = workeroutput.get(self.node_key)
+        if not node_detected_cases:
+            return
+
+        self.detected_cases |= node_detected_cases
+
+    def pytest_exception_interact(self, node: pytest.Item, call: pytest.CallInfo, report: pytest.TestReport):
+        if not call.excinfo or call.when not in ["setup", "call", "teardown"]:
+            # Skip analyze exception if there is no exception exists
+            # or exception happens outside of tests or fixtures
+            return
+
+        exc = call.excinfo.type
+        exception_qualname = exc.__name__
+        if (exception_module := exc.__module__) != "builtins":
+            exception_qualname = f"{exception_module}.{exception_qualname}"
+        if exception_qualname in self.forbidden_warnings:
+            self.detected_cases.add(node.nodeid)
+
+    @pytest.hookimpl(hookwrapper=True, tryfirst=True)
+    def pytest_terminal_summary(self, terminalreporter, exitstatus: int, config: pytest.Config):
+        yield
+        if not self.detected_cases or self.is_worker_node:  # No need to print report on worker node
+            return
+
+        total_cases = len(self.detected_cases)
+        uniq_tests_cases = len(set(map(self.prune_params_node_id, self.detected_cases)))
+        terminalreporter.section(f"{total_cases:,} prohibited warning(s) detected", red=True, bold=True)
+
+        report_message = "By default selected warnings are prohibited during tests runs:\n * "
+        report_message += "\n * ".join(self.forbidden_warnings)
+        report_message += "\n\n"
+        report_message += (
+            "Please make sure that you follow Airflow Unit test developer guidelines:\n"
+            "https://github.com/apache/airflow/blob/main/contributing-docs/testing/unit_tests.rst#handling-warnings"
+        )
+        if total_cases <= 20:
+            # Print tests node ids only if there is a small amount of it,
+            # otherwise it could turn into a mess in the terminal
+            report_message += "\n\nWarnings detected in test case(s):\n - "
+            report_message += "\n - ".join(sorted(self.detected_cases))
+        if uniq_tests_cases >= 15:
+            # If there are a lot of unique tests where this happens,
+            # we might suggest adding it into the exclusion list
+            report_message += (
+                "\n\nIf this is significant change in code base you might add tests cases ids into the "
+                f"{self.deprecations_ignore} file, please make sure that you also create "
+                "follow up Issue/Task in https://github.com/apache/airflow/issues"
+            )
+        terminalreporter.write_line(report_message.rstrip(), red=True)
+        terminalreporter.write_line(
+            "You could disable this check by provide the `--disable-forbidden-warnings` flag, "
+            "however this check always turned on in the Airflow's CI.",
+            white=True,
+        )
diff --git a/tests/conftest.py b/tests/conftest.py
index e0aab55..efff93f 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -16,7 +16,6 @@
 # under the License.
 from __future__ import annotations
 
-import functools
 import json
 import os
 import platform
@@ -30,11 +29,11 @@
 
 import pytest
 import time_machine
-import yaml
 from itsdangerous import URLSafeSerializer
 
 if TYPE_CHECKING:
     from tests._internals.capture_warnings import CaptureWarningsPlugin  # noqa: F401
+    from tests._internals.forbidden_warnings import ForbiddenWarningsPlugin  # noqa: F401
 
 # We should set these before loading _any_ of the rest of airflow so that the
 # unit test mode config is set as early as possible.
@@ -121,6 +120,7 @@
 
 # https://docs.pytest.org/en/stable/reference/reference.html#stash
 capture_warnings_key = pytest.StashKey["CaptureWarningsPlugin"]()
+forbidden_warnings_key = pytest.StashKey["ForbiddenWarningsPlugin"]()
 
 
 @pytest.fixture
@@ -213,7 +213,7 @@
         yield
 
 
-def pytest_addoption(parser):
+def pytest_addoption(parser: pytest.Parser):
     """Add options parser for custom plugins."""
     group = parser.getgroup("airflow")
     group.addoption(
@@ -298,6 +298,12 @@
         help="Disable DB clear before each test module.",
     )
     group.addoption(
+        "--disable-forbidden-warnings",
+        action="store_true",
+        dest="disable_forbidden_warnings",
+        help="Disable raising an error if forbidden warnings detected.",
+    )
+    group.addoption(
         "--disable-capture-warnings",
         action="store_true",
         dest="disable_capture_warnings",
@@ -314,6 +320,11 @@
             "then 'warnings.txt' will be used."
         ),
     )
+    parser.addini(
+        name="forbidden_warnings",
+        type="linelist",
+        help="List of internal Airflow warnings which are prohibited during tests execution.",
+    )
 
 
 def initial_db_init():
@@ -417,25 +428,43 @@
 
     os.environ["_AIRFLOW__SKIP_DATABASE_EXECUTOR_COMPATIBILITY_CHECK"] = "1"
 
-    # Setup capture warnings
+    # Setup internal warnings plugins
     if "ignore" in sys.warnoptions:
+        config.option.disable_forbidden_warnings = True
         config.option.disable_capture_warnings = True
+    if not config.pluginmanager.get_plugin("warnings"):
+        # Internal forbidden warnings plugin depends on builtin pytest warnings plugin
+        config.option.disable_forbidden_warnings = True
+
+    forbidden_warnings: list[str] | None = config.getini("forbidden_warnings")
+    if not config.option.disable_forbidden_warnings and forbidden_warnings:
+        from tests._internals.forbidden_warnings import ForbiddenWarningsPlugin
+
+        forbidden_warnings_plugin = ForbiddenWarningsPlugin(
+            config=config,
+            forbidden_warnings=tuple(map(str.strip, forbidden_warnings)),
+        )
+        config.pluginmanager.register(forbidden_warnings_plugin)
+        config.stash[forbidden_warnings_key] = forbidden_warnings_plugin
+
     if not config.option.disable_capture_warnings:
         from tests._internals.capture_warnings import CaptureWarningsPlugin
 
-        plugin = CaptureWarningsPlugin(
+        capture_warnings_plugin = CaptureWarningsPlugin(
             config=config, output_path=config.getoption("warning_output_path", default=None)
         )
-        config.pluginmanager.register(plugin)
-        config.stash[capture_warnings_key] = plugin
+        config.pluginmanager.register(capture_warnings_plugin)
+        config.stash[capture_warnings_key] = capture_warnings_plugin
 
 
 def pytest_unconfigure(config: pytest.Config) -> None:
     os.environ.pop("_AIRFLOW__SKIP_DATABASE_EXECUTOR_COMPATIBILITY_CHECK", None)
-    capture_warnings = config.stash.get(capture_warnings_key, None)
-    if capture_warnings:
+    if forbidden_warnings_plugin := config.stash.get(forbidden_warnings_key, None):
+        del config.stash[forbidden_warnings_key]
+        config.pluginmanager.unregister(forbidden_warnings_plugin)
+    if capture_warnings_plugin := config.stash.get(capture_warnings_key, None):
         del config.stash[capture_warnings_key]
-        config.pluginmanager.unregister(capture_warnings)
+        config.pluginmanager.unregister(capture_warnings_plugin)
 
 
 def skip_if_not_marked_with_integration(selected_integrations, item):
@@ -614,34 +643,6 @@
             pytest.skip(f"The test requires credential file {credential_path}: {item}")
 
 
-@functools.lru_cache(maxsize=None)
-def deprecations_ignore() -> tuple[str, ...]:
-    with open(Path(__file__).resolve().parent / "deprecations_ignore.yml") as fp:
-        return tuple(yaml.safe_load(fp))
-
-
-def setup_error_warnings(item: pytest.Item):
-    if item.nodeid.startswith(deprecations_ignore()):
-        return
-
-    # We cannot add everything related to the airflow package it into `filterwarnings`
-    # in the pyproject.toml sections, because it invokes airflow import before we setup test environment.
-    # Instead of that, we are dynamically adding as `filterwarnings` marker.
-    prohibited_warnings = (
-        "airflow.exceptions.RemovedInAirflow3Warning",
-        "airflow.utils.context.AirflowContextDeprecationWarning",
-        "airflow.exceptions.AirflowProviderDeprecationWarning",
-    )
-    for w in prohibited_warnings:
-        # Add marker at the beginning of the markers list. In this case, it does not conflict with
-        # filterwarnings markers, which are set explicitly in the test suite.
-        item.add_marker(pytest.mark.filterwarnings(f"error::{w}"), append=False)
-
-
-def pytest_itemcollected(item: pytest.Item):
-    setup_error_warnings(item)
-
-
 def pytest_runtest_setup(item):
     selected_integrations_list = item.config.option.integration
     selected_systems_list = item.config.option.system
diff --git a/tests/deprecations_ignore.yml b/tests/deprecations_ignore.yml
index 5956bab..1a8c822 100644
--- a/tests/deprecations_ignore.yml
+++ b/tests/deprecations_ignore.yml
@@ -16,15 +16,6 @@
 # under the License.
 ---
 
-# Skip: Integration and System Tests
-- tests/integration/
-- tests/system/
-# Skip: DAGs for tests
-- tests/dags/
-- tests/dags_corrupted/
-- tests/dags_with_system_exit/
-
-
 # API
 - tests/api_connexion/endpoints/test_connection_endpoint.py::TestGetConnection::test_should_respond_200
 - tests/api_connexion/endpoints/test_connection_endpoint.py::TestPatchConnection::test_patch_should_respond_200