Explicitly shut down logging in tasks so concurrent.futures can be used (#13057)
This fixes three problems:
1. That remote logs weren't being uploaded due to the fork change
2. That the S3 hook attempted to fetch credentials from the DB, but the
ORM had already been disposed.
3. That even if forking was disabled, S3 logs would still fail due to the use
of concurrent.futures. See https://bugs.python.org/issue33097
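
For context on (3): the S3 upload path goes through concurrent.futures, and once
interpreter shutdown has begun, new futures can no longer be scheduled, so a
handler that only uploads when logging's own atexit hook finally closes it is
too late. A minimal standalone sketch of that failure mode -- not Airflow code,
and UploadingHandler is a made-up stand-in for a remote log handler:

    import logging
    from concurrent.futures import ThreadPoolExecutor

    class UploadingHandler(logging.Handler):
        """Hypothetical handler that uploads its buffered records on close()."""

        def __init__(self):
            super().__init__()
            self._uploaded = False

        def close(self):
            # When this only runs from logging's atexit hook, the interpreter
            # is already shutting down and (depending on the Python version)
            # the pool refuses the work, hangs, or drops it -- the logs are
            # never uploaded.
            if not self._uploaded:
                self._uploaded = True
                with ThreadPoolExecutor(max_workers=1) as pool:
                    pool.submit(print, "uploading buffered task logs ...").result()
            super().close()

    logging.getLogger("airflow.task").addHandler(UploadingHandler())

    # What the task command now does in its `finally` block: shut logging down
    # explicitly while the interpreter is still fully alive, so the upload runs.
    logging.shutdown()
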
diff --git a/airflow/cli/cli_parser.py b/airflow/cli/cli_parser.py
index 7fc00b9..61e8abf 100644
--- a/airflow/cli/cli_parser.py
+++ b/airflow/cli/cli_parser.py
@@ -89,7 +89,7 @@
class Arg:
"""Class to keep information about command line argument"""
- # pylint: disable=redefined-builtin,unused-argument
+ # pylint: disable=redefined-builtin,unused-argument,too-many-arguments
def __init__(
self,
flags=_UNSET,
@@ -101,6 +101,7 @@
choices=_UNSET,
required=_UNSET,
metavar=_UNSET,
+ dest=_UNSET,
):
self.flags = flags
self.kwargs = {}
@@ -112,7 +113,7 @@
self.kwargs[k] = v
- # pylint: enable=redefined-builtin,unused-argument
+ # pylint: enable=redefined-builtin,unused-argument,too-many-arguments
def add_to_parser(self, parser: argparse.ArgumentParser):
"""Add this argument to an ArgumentParser"""
@@ -308,6 +309,16 @@
# list_tasks
ARG_TREE = Arg(("-t", "--tree"), help="Tree view", action="store_true")
+# tasks_run
+# This is a hidden option -- not meant for users to set or know about
+ARG_SHUT_DOWN_LOGGING = Arg(
+ ("--no-shut-down-logging",),
+ help=argparse.SUPPRESS,
+ dest="shut_down_logging",
+ action="store_false",
+ default=True,
+)
+
# clear
ARG_UPSTREAM = Arg(("-u", "--upstream"), help="Include upstream tasks", action="store_true")
ARG_ONLY_FAILED = Arg(("-f", "--only-failed"), help="Only failed jobs", action="store_true")
@@ -943,6 +954,7 @@
ARG_PICKLE,
ARG_JOB_ID,
ARG_INTERACTIVE,
+ ARG_SHUT_DOWN_LOGGING,
),
),
ActionCommand(
diff --git a/airflow/cli/commands/task_command.py b/airflow/cli/commands/task_command.py
index 356f60c..b153d20 100644
--- a/airflow/cli/commands/task_command.py
+++ b/airflow/cli/commands/task_command.py
@@ -116,7 +116,12 @@
ignore_ti_state=args.force,
pool=args.pool,
)
- run_job.run()
+ try:
+ run_job.run()
+
+ finally:
+ if args.shut_down_logging:
+ logging.shutdown()
RAW_TASK_UNSUPPORTED_OPTION = [
diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py
index f4f3412..ad5c76e 100644
--- a/airflow/executors/celery_executor.py
+++ b/airflow/executors/celery_executor.py
@@ -106,6 +106,7 @@
parser = get_parser()
# [1:] - remove "airflow" from the start of the command
args = parser.parse_args(command_to_exec[1:])
+ args.shut_down_logging = False
setproctitle(f"airflow task supervisor: {command_to_exec}")
@@ -116,6 +117,7 @@
ret = 1
finally:
Sentry.flush()
+ logging.shutdown()
os._exit(ret) # pylint: disable=protected-access
diff --git a/airflow/executors/local_executor.py b/airflow/executors/local_executor.py
index 4057b14..7fcba8a 100644
--- a/airflow/executors/local_executor.py
+++ b/airflow/executors/local_executor.py
@@ -22,6 +22,7 @@
For more information on how the LocalExecutor works, take a look at the guide:
:ref:`executor:LocalExecutor`
"""
+import logging
import os
import subprocess
from abc import abstractmethod
@@ -115,6 +116,7 @@
parser = get_parser()
# [1:] - remove "airflow" from the start of the command
args = parser.parse_args(command[1:])
+ args.shut_down_logging = False
setproctitle(f"airflow task supervisor: {command}")
@@ -125,6 +127,7 @@
self.log.error("Failed to execute task %s.", str(e))
finally:
Sentry.flush()
+ logging.shutdown()
os._exit(ret) # pylint: disable=protected-access
raise RuntimeError('unreachable -- keep mypy happy')
diff --git a/airflow/task/task_runner/standard_task_runner.py b/airflow/task/task_runner/standard_task_runner.py
index 61ea43a..fee9b0d 100644
--- a/airflow/task/task_runner/standard_task_runner.py
+++ b/airflow/task/task_runner/standard_task_runner.py
@@ -16,6 +16,7 @@
# specific language governing permissions and limitations
# under the License.
"""Standard task runner"""
+import logging
import os
import psutil
@@ -87,6 +88,7 @@
finally:
# Explicitly flush any pending exception to Sentry if enabled
Sentry.flush()
+ logging.shutdown()
os._exit(return_code) # pylint: disable=protected-access
def return_code(self, timeout=0):
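
For reference (not part of the diff): the hidden flag and the executor override
fit together as below. This is a plain-argparse sketch with a hypothetical
parser, not Airflow's Arg/ActionCommand machinery:

    import argparse
    import logging

    parser = argparse.ArgumentParser(prog="airflow tasks run")
    parser.add_argument(
        "--no-shut-down-logging",
        help=argparse.SUPPRESS,   # hidden from --help, like ARG_SHUT_DOWN_LOGGING
        dest="shut_down_logging",
        action="store_false",
        default=True,
    )

    args = parser.parse_args([])  # a normal CLI run: shut_down_logging stays True
    assert args.shut_down_logging

    # The Local/Celery executors parse the command in-process and flip the
    # attribute instead, because they call logging.shutdown() themselves right
    # before os._exit() in their own `finally` blocks.
    args.shut_down_logging = False

    try:
        pass  # run_job.run() would execute the task here
    finally:
        if args.shut_down_logging:
            logging.shutdown()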