[SPARK-48126][CORE] Make `spark.log.structuredLogging.enabled` effective

### What changes were proposed in this pull request?

Currently, the spark conf `spark.log.structuredLogging.enabled` is not taking effect. The current code base checks this config in the method `prepareSubmitEnvironment`. However, Log4j is already initialized before that.

This PR is to fix it by checking the config `spark.log.structuredLogging.enabled` before the initialization of Log4j.
Also, this PR enhances the doc for this configuration.

### Why are the changes needed?

Bug fix. After the fix, the Spark conf `spark.log.structuredLogging.enabled` takes effect.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Manual test.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: GPT-4
I used GPT-4 to improve the documents.

Closes #46452 from gengliangwang/makeConfEffective.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 076aa83..5a7e554 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -69,10 +69,20 @@
 
   def doSubmit(args: Array[String]): Unit = {
     val appArgs = parseArguments(args)
+    val sparkConf = appArgs.toSparkConf()
+
     // For interpreters, structured logging is disabled by default to avoid generating mixed
     // plain text and structured logs on the same console.
     if (isShell(appArgs.primaryResource) || isSqlShell(appArgs.mainClass)) {
       Logging.disableStructuredLogging()
+    } else {
+      // For non-shell applications, enable structured logging if it's not explicitly disabled
+      // via the configuration `spark.log.structuredLogging.enabled`.
+      if (sparkConf.getBoolean(STRUCTURED_LOGGING_ENABLED.key, defaultValue = true)) {
+        Logging.enableStructuredLogging()
+      } else {
+        Logging.disableStructuredLogging()
+      }
     }
     // Initialize logging if it hasn't been done yet. Keep track of whether logging needs to
     // be reset before the application starts.
@@ -82,9 +92,9 @@
       logInfo(appArgs.toString)
     }
     appArgs.action match {
-      case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
-      case SparkSubmitAction.KILL => kill(appArgs)
-      case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
+      case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog, sparkConf)
+      case SparkSubmitAction.KILL => kill(appArgs, sparkConf)
+      case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs, sparkConf)
       case SparkSubmitAction.PRINT_VERSION => printVersion()
     }
   }
@@ -96,12 +106,11 @@
   /**
    * Kill an existing submission.
    */
-  private def kill(args: SparkSubmitArguments): Unit = {
+  private def kill(args: SparkSubmitArguments, sparkConf: SparkConf): Unit = {
     if (RestSubmissionClient.supportsRestClient(args.master)) {
       new RestSubmissionClient(args.master)
         .killSubmission(args.submissionToKill)
     } else {
-      val sparkConf = args.toSparkConf()
       sparkConf.set("spark.master", args.master)
       SparkSubmitUtils
         .getSubmitOperations(args.master)
@@ -112,12 +121,11 @@
   /**
    * Request the status of an existing submission.
    */
-  private def requestStatus(args: SparkSubmitArguments): Unit = {
+  private def requestStatus(args: SparkSubmitArguments, sparkConf: SparkConf): Unit = {
     if (RestSubmissionClient.supportsRestClient(args.master)) {
       new RestSubmissionClient(args.master)
         .requestSubmissionStatus(args.submissionToRequestStatusFor)
     } else {
-      val sparkConf = args.toSparkConf()
       sparkConf.set("spark.master", args.master)
       SparkSubmitUtils
         .getSubmitOperations(args.master)
@@ -148,7 +156,7 @@
    * in a doAs when --proxy-user is specified.
    */
   @tailrec
-  private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
+  private def submit(args: SparkSubmitArguments, uninitLog: Boolean, sparkConf: SparkConf): Unit = {
 
     def doRunMain(): Unit = {
       if (args.proxyUser != null) {
@@ -157,7 +165,7 @@
         // is done in client mode.
         val isKubernetesClusterModeDriver = args.master.startsWith("k8s") &&
           "client".equals(args.deployMode) &&
-          args.toSparkConf().getBoolean("spark.kubernetes.submitInDriver", false)
+          sparkConf.getBoolean("spark.kubernetes.submitInDriver", false)
         if (isKubernetesClusterModeDriver) {
           logInfo("Running driver with proxy user. Cluster manager: Kubernetes")
           SparkHadoopUtil.get.runAsSparkUser(() => runMain(args, uninitLog))
@@ -204,7 +212,7 @@
           logWarning(log"Master endpoint ${MDC(LogKeys.MASTER_URL, args.master)} " +
             log"was not a REST server. Falling back to legacy submission gateway instead.")
           args.useRest = false
-          submit(args, false)
+          submit(args, false, sparkConf)
       }
     // In all other modes, just run the main class as prepared
     } else {
@@ -234,11 +242,6 @@
     val childClasspath = new ArrayBuffer[String]()
     val sparkConf = args.toSparkConf()
     if (sparkConf.contains("spark.local.connect")) sparkConf.remove("spark.remote")
-    if (sparkConf.getBoolean(STRUCTURED_LOGGING_ENABLED.key, defaultValue = true)) {
-      Logging.enableStructuredLogging()
-    } else {
-      Logging.disableStructuredLogging()
-    }
     var childMainClass = ""
 
     // Set the cluster manager
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 2e20742..a5be608 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -143,10 +143,11 @@
 
   private[spark] val STRUCTURED_LOGGING_ENABLED =
     ConfigBuilder("spark.log.structuredLogging.enabled")
-      .doc("When true, the default log4j output format is structured JSON lines, and there will " +
-        "be Mapped Diagnostic Context (MDC) from Spark added to the logs. This is useful for log " +
-        "aggregation and analysis tools. When false, the default log4j output will be plain " +
-        "text and no MDC from Spark will be set.")
+      .doc("When true, Spark logs are output as structured JSON lines with added Spark " +
+        "Mapped Diagnostic Context (MDC), facilitating easier integration with log aggregation " +
+        "and analysis tools. When false, logs are plain text without MDC. This configuration " +
+        "does not apply to interactive environments such as spark-shell, spark-sql, and " +
+        "PySpark shell.")
       .version("4.0.0")
       .booleanConf
       .createWithDefault(true)
diff --git a/docs/configuration.md b/docs/configuration.md
index fb14af6..c018b9f 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -3693,7 +3693,11 @@
 ```
 
 ## Plain Text Logging
-If you prefer plain text logging, you can use the `log4j2.properties.pattern-layout-template` file as a starting point. This is the default configuration used by Spark before the 4.0.0 release. This configuration uses the [PatternLayout](https://logging.apache.org/log4j/2.x/manual/layouts.html#PatternLayout) to log all the logs in plain text. MDC information is not included by default. In order to print it in the logs, you can update the patternLayout in the file. For example, you can add `%X{task_name}` to print the task name in the logs.
+If you prefer plain text logging, you have two options:
+- Disable structured JSON logging by setting the Spark configuration `spark.log.structuredLogging.enabled` to `false`.
+- Use a custom log4j configuration file. Rename `conf/log4j2.properties.pattern-layout-template` to `conf/log4j2.properties`. This reverts to the default configuration prior to Spark 4.0, which utilizes [PatternLayout](https://logging.apache.org/log4j/2.x/manual/layouts.html#PatternLayout) for logging all messages in plain text.
+
+MDC information is not included by default when with plain text logging. In order to print it in the logs, you can update the patternLayout in the file. For example, you can add `%X{task_name}` to print the task name in the logs.
 Moreover, you can use `spark.sparkContext.setLocalProperty(s"mdc.$name", "value")` to add user specific data into MDC.
 The key in MDC will be the string of `mdc.$name`.
 
diff --git a/docs/core-migration-guide.md b/docs/core-migration-guide.md
index 5979006..95c7929 100644
--- a/docs/core-migration-guide.md
+++ b/docs/core-migration-guide.md
@@ -42,7 +42,9 @@
 
 - Since Spark 4.0, Spark uses the external shuffle service for deleting shuffle blocks for deallocated executors when the shuffle is no longer needed. To restore the legacy behavior, you can set `spark.shuffle.service.removeShuffle` to `false`.
 
-- Since Spark 4.0, the default log4j output of `spark-submit` has shifted from plain text to JSON lines to enhance analyzability. To revert to plain text output, you can rename the file `conf/log4j2.properties.pattern-layout-template` as `conf/log4j2.properties`, or use a custom log4j configuration file.
+- Starting with Spark 4.0, the default logging format for `spark-submit` has changed from plain text to JSON lines to improve log analysis. If you prefer plain text logs, you have two options:
+  - Set the Spark configuration `spark.log.structuredLogging.enabled` to `false`. 
+  - Use a custom log4j configuration file, such as renaming the template file `conf/log4j2.properties.pattern-layout-template` to `conf/log4j2.properties`.
 
 - Since Spark 4.0, Spark performs speculative executions less agressively with `spark.speculation.multiplier=3` and `spark.speculation.quantile=0.9`. To restore the legacy behavior, you can set `spark.speculation.multiplier=1.5` and `spark.speculation.quantile=0.75`.