Explicitly shutdown logging in tasks so concurrent.futures can be used (#13057)

This fixes three problems:

1. That remote logs weren't being uploaded due to the fork change
2. That the S3 hook attempted to fetch credentials from the DB, but the
   ORM had already been disposed.
3. That even if forking was disabled, that S3 logs would fail due to use
   of concurrent.futures. See https://bugs.python.org/issue33097
diff --git a/airflow/cli/cli_parser.py b/airflow/cli/cli_parser.py
index 7fc00b9..61e8abf 100644
--- a/airflow/cli/cli_parser.py
+++ b/airflow/cli/cli_parser.py
@@ -89,7 +89,7 @@
 class Arg:
     """Class to keep information about command line argument"""
 
-    # pylint: disable=redefined-builtin,unused-argument
+    # pylint: disable=redefined-builtin,unused-argument,too-many-arguments
     def __init__(
         self,
         flags=_UNSET,
@@ -101,6 +101,7 @@
         choices=_UNSET,
         required=_UNSET,
         metavar=_UNSET,
+        dest=_UNSET,
     ):
         self.flags = flags
         self.kwargs = {}
@@ -112,7 +113,7 @@
 
             self.kwargs[k] = v
 
-    # pylint: enable=redefined-builtin,unused-argument
+    # pylint: enable=redefined-builtin,unused-argument,too-many-arguments
 
     def add_to_parser(self, parser: argparse.ArgumentParser):
         """Add this argument to an ArgumentParser"""
@@ -308,6 +309,16 @@
 # list_tasks
 ARG_TREE = Arg(("-t", "--tree"), help="Tree view", action="store_true")
 
+# tasks_run
+# This is a hidden option -- not meant for users to set or know about
+ARG_SHUT_DOWN_LOGGING = Arg(
+    ("--no-shut-down-logging",),
+    help=argparse.SUPPRESS,
+    dest="shut_down_logging",
+    action="store_false",
+    default=True,
+)
+
 # clear
 ARG_UPSTREAM = Arg(("-u", "--upstream"), help="Include upstream tasks", action="store_true")
 ARG_ONLY_FAILED = Arg(("-f", "--only-failed"), help="Only failed jobs", action="store_true")
@@ -943,6 +954,7 @@
             ARG_PICKLE,
             ARG_JOB_ID,
             ARG_INTERACTIVE,
+            ARG_SHUT_DOWN_LOGGING,
         ),
     ),
     ActionCommand(
diff --git a/airflow/cli/commands/task_command.py b/airflow/cli/commands/task_command.py
index 356f60c..b153d20 100644
--- a/airflow/cli/commands/task_command.py
+++ b/airflow/cli/commands/task_command.py
@@ -116,7 +116,12 @@
         ignore_ti_state=args.force,
         pool=args.pool,
     )
-    run_job.run()
+    try:
+        run_job.run()
+
+    finally:
+        if args.shut_down_logging:
+            logging.shutdown()
 
 
 RAW_TASK_UNSUPPORTED_OPTION = [
diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py
index f4f3412..ad5c76e 100644
--- a/airflow/executors/celery_executor.py
+++ b/airflow/executors/celery_executor.py
@@ -106,6 +106,7 @@
         parser = get_parser()
         # [1:] - remove "airflow" from the start of the command
         args = parser.parse_args(command_to_exec[1:])
+        args.shut_down_logging = False
 
         setproctitle(f"airflow task supervisor: {command_to_exec}")
 
@@ -116,6 +117,7 @@
         ret = 1
     finally:
         Sentry.flush()
+        logging.shutdown()
         os._exit(ret)  # pylint: disable=protected-access
 
 
diff --git a/airflow/executors/local_executor.py b/airflow/executors/local_executor.py
index 4057b14..7fcba8a 100644
--- a/airflow/executors/local_executor.py
+++ b/airflow/executors/local_executor.py
@@ -22,6 +22,7 @@
     For more information on how the LocalExecutor works, take a look at the guide:
     :ref:`executor:LocalExecutor`
 """
+import logging
 import os
 import subprocess
 from abc import abstractmethod
@@ -115,6 +116,7 @@
             parser = get_parser()
             # [1:] - remove "airflow" from the start of the command
             args = parser.parse_args(command[1:])
+            args.shut_down_logging = False
 
             setproctitle(f"airflow task supervisor: {command}")
 
@@ -125,6 +127,7 @@
             self.log.error("Failed to execute task %s.", str(e))
         finally:
             Sentry.flush()
+            logging.shutdown()
             os._exit(ret)  # pylint: disable=protected-access
             raise RuntimeError('unreachable -- keep mypy happy')
 
diff --git a/airflow/task/task_runner/standard_task_runner.py b/airflow/task/task_runner/standard_task_runner.py
index 61ea43a..fee9b0d 100644
--- a/airflow/task/task_runner/standard_task_runner.py
+++ b/airflow/task/task_runner/standard_task_runner.py
@@ -16,6 +16,7 @@
 # specific language governing permissions and limitations
 # under the License.
 """Standard task runner"""
+import logging
 import os
 
 import psutil
@@ -87,6 +88,7 @@
             finally:
                 # Explicitly flush any pending exception to Sentry if enabled
                 Sentry.flush()
+                logging.shutdown()
                 os._exit(return_code)  # pylint: disable=protected-access
 
     def return_code(self, timeout=0):