[SPARK-47240][CORE][PART1] Migrate logInfo with variables to structured logging framework

### What changes were proposed in this pull request?

This PR migrates `logInfo` calls with variables in the Core module to the structured logging framework.
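
The migration replaces `s"..."` string interpolation with the `log"..."` interpolator, wrapping each variable in `MDC` with a key defined in `LogKeys` (new keys such as `SPARK_VERSION`, `BARRIER_EPOCH`, and `TIMESTAMP` are added in `LogKey.scala`). An illustrative before/after sketch, adapted from the `SparkContext.scala` hunk in this diff:

```scala
// Requires MDC and LogKeys from org.apache.spark.internal (see the import changes in the diff).

// Before: plain string interpolation; the variable is baked into the message text.
logInfo(s"Running Spark version $SPARK_VERSION")

// After: the log"..." interpolator tags the value with LogKeys.SPARK_VERSION via MDC,
// so structured log output can carry it as a separate field.
logInfo(log"Running Spark version ${MDC(LogKeys.SPARK_VERSION, SPARK_VERSION)}")
```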

### Why are the changes needed?

To enhance Apache Spark's logging system by implementing structured logging.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

- Pass GA.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #46362 from zeotuan/coreInfo.

Lead-authored-by: Tuan Pham <Tuan.Pham@wisetechglobal.com>
Co-authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
index c127f9c..14e822c 100644
--- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
+++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
@@ -26,13 +26,15 @@
 }
 
 /**
- * Various keys used for mapped diagnostic contexts(MDC) in logging.
- * All structured logging keys should be defined here for standardization.
+ * Various keys used for mapped diagnostic contexts (MDC) in logging. All structured logging keys
+ * should be defined here for standardization.
  */
 object LogKeys {
   case object ACCUMULATOR_ID extends LogKey
+  case object ACTUAL_BROADCAST_OUTPUT_STATUS_SIZE extends LogKey
   case object ACTUAL_NUM_FILES extends LogKey
   case object ACTUAL_PARTITION_COLUMN extends LogKey
+  case object ADDED_JARS extends LogKey
   case object AGGREGATE_FUNCTIONS extends LogKey
   case object ALPHA extends LogKey
   case object ANALYSIS_ERROR extends LogKey
@@ -42,7 +44,10 @@
   case object APP_NAME extends LogKey
   case object APP_STATE extends LogKey
   case object ARGS extends LogKey
+  case object AUTH_ENABLED extends LogKey
   case object BACKUP_FILE extends LogKey
+  case object BARRIER_EPOCH extends LogKey
+  case object BARRIER_ID extends LogKey
   case object BATCH_ID extends LogKey
   case object BATCH_NAME extends LogKey
   case object BATCH_TIMESTAMP extends LogKey
@@ -55,6 +60,7 @@
   case object BOOT extends LogKey
   case object BROADCAST extends LogKey
   case object BROADCAST_ID extends LogKey
+  case object BROADCAST_OUTPUT_STATUS_SIZE extends LogKey
   case object BUCKET extends LogKey
   case object BYTECODE_SIZE extends LogKey
   case object CACHED_TABLE_PARTITION_METADATA_SIZE extends LogKey
@@ -62,6 +68,7 @@
   case object CACHE_UNTIL_HIGHEST_CONSUMED_SIZE extends LogKey
   case object CACHE_UNTIL_LAST_PRODUCED_SIZE extends LogKey
   case object CALL_SITE_LONG_FORM extends LogKey
+  case object CALL_SITE_SHORT_FORM extends LogKey
   case object CATALOG_NAME extends LogKey
   case object CATEGORICAL_FEATURES extends LogKey
   case object CHECKPOINT_FILE extends LogKey
@@ -142,11 +149,13 @@
   case object DEPRECATED_KEY extends LogKey
   case object DESCRIPTION extends LogKey
   case object DESIRED_NUM_PARTITIONS extends LogKey
+  case object DESTINATION_PATH extends LogKey
   case object DFS_FILE extends LogKey
   case object DIFF_DELTA extends LogKey
   case object DIVISIBLE_CLUSTER_INDICES_SIZE extends LogKey
   case object DRIVER_ID extends LogKey
   case object DRIVER_LIBRARY_PATH_KEY extends LogKey
+  case object DRIVER_STATE extends LogKey
   case object DROPPED_PARTITIONS extends LogKey
   case object DURATION extends LogKey
   case object EARLIEST_LOADED_VERSION extends LogKey
@@ -161,6 +170,7 @@
   case object ERROR extends LogKey
   case object ESTIMATOR_PARAMETER_MAP extends LogKey
   case object EVALUATED_FILTERS extends LogKey
+  case object EVENT extends LogKey
   case object EVENT_LOG_DESTINATION extends LogKey
   case object EVENT_LOOP extends LogKey
   case object EVENT_NAME extends LogKey
@@ -184,8 +194,10 @@
   case object EXECUTOR_TIMEOUT extends LogKey
   case object EXEC_AMOUNT extends LogKey
   case object EXISTING_FILE extends LogKey
+  case object EXISTING_JARS extends LogKey
   case object EXISTING_PATH extends LogKey
   case object EXIT_CODE extends LogKey
+  case object EXPECTED_ANSWER extends LogKey
   case object EXPECTED_NUM_FILES extends LogKey
   case object EXPECTED_PARTITION_COLUMN extends LogKey
   case object EXPIRY_TIMESTAMP extends LogKey
@@ -211,6 +223,7 @@
   case object FILTER extends LogKey
   case object FILTERS extends LogKey
   case object FINAL_CONTEXT extends LogKey
+  case object FINAL_OUTPUT_PATH extends LogKey
   case object FINAL_PATH extends LogKey
   case object FINISH_TRIGGER_DURATION extends LogKey
   case object FROM_OFFSET extends LogKey
@@ -252,6 +265,7 @@
   case object IS_NETWORK_REQUEST_DONE extends LogKey
   case object JAR_ENTRY extends LogKey
   case object JAR_MESSAGE extends LogKey
+  case object JAVA_VERSION extends LogKey
   case object JOB_ID extends LogKey
   case object JOIN_CONDITION extends LogKey
   case object JOIN_CONDITION_SUB_EXPR extends LogKey
@@ -280,6 +294,7 @@
   case object LOGICAL_PLAN_COLUMNS extends LogKey
   case object LOGICAL_PLAN_LEAVES extends LogKey
   case object LOG_ID extends LogKey
+  case object LOG_KEY_FILE extends LogKey
   case object LOG_OFFSET extends LogKey
   case object LOG_TYPE extends LogKey
   case object LOWER_BOUND extends LogKey
@@ -326,13 +341,16 @@
   case object NEW_FEATURE_COLUMN_NAME extends LogKey
   case object NEW_LABEL_COLUMN_NAME extends LogKey
   case object NEW_PATH extends LogKey
+  case object NEW_RDD_ID extends LogKey
   case object NEW_STATE extends LogKey
   case object NEW_VALUE extends LogKey
   case object NODES extends LogKey
   case object NODE_LOCATION extends LogKey
   case object NON_BUILT_IN_CONNECTORS extends LogKey
   case object NORM extends LogKey
+  case object NUM_ADDED_MASTERS extends LogKey
   case object NUM_ADDED_PARTITIONS extends LogKey
+  case object NUM_ADDED_WORKERS extends LogKey
   case object NUM_BIN extends LogKey
   case object NUM_BYTES extends LogKey
   case object NUM_BYTES_CURRENT extends LogKey
@@ -373,6 +391,7 @@
   case object NUM_PRUNED extends LogKey
   case object NUM_REPLICAS extends LogKey
   case object NUM_REQUESTS extends LogKey
+  case object NUM_REQUEST_SYNC_TASK extends LogKey
   case object NUM_RESOURCE_SLOTS extends LogKey
   case object NUM_RETRIES extends LogKey
   case object NUM_RIGHT_PARTITION_VALUES extends LogKey
@@ -397,7 +416,12 @@
   case object OPTIONS extends LogKey
   case object OP_ID extends LogKey
   case object OP_TYPE extends LogKey
+  case object OS_ARCH extends LogKey
+  case object OS_NAME extends LogKey
+  case object OS_VERSION extends LogKey
   case object OUTPUT extends LogKey
+  case object OUTPUT_LINE extends LogKey
+  case object OUTPUT_LINE_NUMBER extends LogKey
   case object OVERHEAD_MEMORY_SIZE extends LogKey
   case object PAGE_SIZE extends LogKey
   case object PARSE_MODE extends LogKey
@@ -438,6 +462,9 @@
   case object PUSHED_FILTERS extends LogKey
   case object PVC_METADATA_NAME extends LogKey
   case object PYTHON_EXEC extends LogKey
+  case object PYTHON_VERSION extends LogKey
+  case object PYTHON_WORKER_MODULE extends LogKey
+  case object PYTHON_WORKER_RESPONSE extends LogKey
   case object QUERY_CACHE_VALUE extends LogKey
   case object QUERY_HINT extends LogKey
   case object QUERY_ID extends LogKey
@@ -447,6 +474,8 @@
   case object QUERY_PLAN_LENGTH_MAX extends LogKey
   case object QUERY_RUN_ID extends LogKey
   case object RANGE extends LogKey
+  case object RDD_CHECKPOINT_DIR extends LogKey
+  case object RDD_DEBUG_STRING extends LogKey
   case object RDD_DESCRIPTION extends LogKey
   case object RDD_ID extends LogKey
   case object READ_LIMIT extends LogKey
@@ -466,6 +495,7 @@
   case object REMOTE_ADDRESS extends LogKey
   case object REMOVE_FROM_MASTER extends LogKey
   case object REPORT_DETAILS extends LogKey
+  case object REQUESTER_SIZE extends LogKey
   case object RESOURCE extends LogKey
   case object RESOURCE_NAME extends LogKey
   case object RESOURCE_PROFILE_ID extends LogKey
@@ -489,13 +519,18 @@
   case object SCHEDULER_POOL_NAME extends LogKey
   case object SCHEMA extends LogKey
   case object SCHEMA2 extends LogKey
+  case object SERIALIZE_OUTPUT_LENGTH extends LogKey
+  case object SERVER_NAME extends LogKey
   case object SERVICE_NAME extends LogKey
+  case object SERVLET_CONTEXT_HANDLER_PATH extends LogKey
   case object SESSION_HOLD_INFO extends LogKey
   case object SESSION_ID extends LogKey
   case object SESSION_KEY extends LogKey
   case object SHARD_ID extends LogKey
   case object SHELL_COMMAND extends LogKey
   case object SHUFFLE_BLOCK_INFO extends LogKey
+  case object SHUFFLE_DB_BACKEND_KEY extends LogKey
+  case object SHUFFLE_DB_BACKEND_NAME extends LogKey
   case object SHUFFLE_ID extends LogKey
   case object SHUFFLE_MERGE_ID extends LogKey
   case object SHUFFLE_SERVICE_NAME extends LogKey
@@ -506,8 +541,10 @@
   case object SMALLEST_CLUSTER_INDEX extends LogKey
   case object SNAPSHOT_VERSION extends LogKey
   case object SOCKET_ADDRESS extends LogKey
+  case object SOURCE_PATH extends LogKey
   case object SPARK_DATA_STREAM extends LogKey
   case object SPARK_PLAN_ID extends LogKey
+  case object SPARK_VERSION extends LogKey
   case object SPILL_TIMES extends LogKey
   case object SQL_TEXT extends LogKey
   case object SRC_PATH extends LogKey
@@ -520,6 +557,7 @@
   case object STATE_STORE_VERSION extends LogKey
   case object STATUS extends LogKey
   case object STDERR extends LogKey
+  case object STOP_SITE_SHORT_FORM extends LogKey
   case object STORAGE_LEVEL extends LogKey
   case object STORAGE_LEVEL_DESERIALIZED extends LogKey
   case object STORAGE_LEVEL_REPLICATION extends LogKey
@@ -535,11 +573,14 @@
   case object STREAMING_WRITE extends LogKey
   case object STREAM_ID extends LogKey
   case object STREAM_NAME extends LogKey
+  case object STREAM_SOURCE extends LogKey
   case object SUBMISSION_ID extends LogKey
   case object SUBSAMPLING_RATE extends LogKey
   case object SUB_QUERY extends LogKey
   case object TABLE_NAME extends LogKey
   case object TABLE_TYPES extends LogKey
+  case object TARGET_NUM_EXECUTOR extends LogKey
+  case object TARGET_NUM_EXECUTOR_DELTA extends LogKey
   case object TARGET_PATH extends LogKey
   case object TASK_ATTEMPT_ID extends LogKey
   case object TASK_ID extends LogKey
@@ -548,6 +589,7 @@
   case object TASK_SET_NAME extends LogKey
   case object TASK_STATE extends LogKey
   case object TEMP_FILE extends LogKey
+  case object TEMP_OUTPUT_PATH extends LogKey
   case object TEMP_PATH extends LogKey
   case object TEST_SIZE extends LogKey
   case object THREAD extends LogKey
@@ -557,6 +599,7 @@
   case object TIME extends LogKey
   case object TIMEOUT extends LogKey
   case object TIMER extends LogKey
+  case object TIMESTAMP extends LogKey
   case object TIME_UNITS extends LogKey
   case object TIP extends LogKey
   case object TOKEN_KIND extends LogKey
@@ -598,6 +641,7 @@
   case object WAIT_SEND_TIME extends LogKey
   case object WAIT_TIME extends LogKey
   case object WATERMARK_CONSTRAINT extends LogKey
+  case object WEB_URL extends LogKey
   case object WEIGHT extends LogKey
   case object WEIGHTED_NUM extends LogKey
   case object WORKER extends LogKey
diff --git a/core/src/main/scala/org/apache/spark/BarrierCoordinator.scala b/core/src/main/scala/org/apache/spark/BarrierCoordinator.scala
index 9422421..adce6c3 100644
--- a/core/src/main/scala/org/apache/spark/BarrierCoordinator.scala
+++ b/core/src/main/scala/org/apache/spark/BarrierCoordinator.scala
@@ -23,7 +23,8 @@
 
 import scala.collection.mutable.{ArrayBuffer, HashSet}
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKeys._
 import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}
 import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted}
 import org.apache.spark.util.ThreadUtils
@@ -161,7 +162,8 @@
         s"${request.numTasks} from Task $taskId, previously it was $numTasks.")
 
       // Check whether the epoch from the barrier tasks matches current barrierEpoch.
-      logInfo(s"Current barrier epoch for $barrierId is $barrierEpoch.")
+      logInfo(log"Current barrier epoch for ${MDC(BARRIER_ID, barrierId)}" +
+        log" is ${MDC(BARRIER_EPOCH, barrierEpoch)}.")
       if (epoch != barrierEpoch) {
         requester.sendFailure(new SparkException(s"The request to sync of $barrierId with " +
           s"barrier epoch $barrierEpoch has already finished. Maybe task $taskId is not " +
@@ -176,14 +178,17 @@
         // Add the requester to array of RPCCallContexts pending for reply.
         requesters += requester
         messages(request.partitionId) = request.message
-        logInfo(s"Barrier sync epoch $barrierEpoch from $barrierId received update from Task " +
-          s"$taskId, current progress: ${requesters.size}/$numTasks.")
+        logInfo(log"Barrier sync epoch ${MDC(BARRIER_EPOCH, barrierEpoch)}" +
+          log" from ${MDC(BARRIER_ID, barrierId)} received update from Task" +
+          log" ${MDC(TASK_ID, taskId)}, current progress:" +
+          log" ${MDC(REQUESTER_SIZE, requesters.size)}/${MDC(NUM_REQUEST_SYNC_TASK, numTasks)}.")
         if (requesters.size == numTasks) {
           requesters.foreach(_.reply(messages.clone()))
           // Finished current barrier() call successfully, clean up ContextBarrierState and
           // increase the barrier epoch.
-          logInfo(s"Barrier sync epoch $barrierEpoch from $barrierId received all updates from " +
-            s"tasks, finished successfully.")
+          logInfo(log"Barrier sync epoch ${MDC(BARRIER_EPOCH, barrierEpoch)}" +
+            log" from ${MDC(BARRIER_ID, barrierId)} received all updates from" +
+            log" tasks, finished successfully.")
           barrierEpoch += 1
           requesters.clear()
           requestMethods.clear()
diff --git a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
index e083ece..c8d6000 100644
--- a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
@@ -26,7 +26,8 @@
 
 import org.apache.spark.annotation.{Experimental, Since}
 import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC, MessageWithContext}
+import org.apache.spark.internal.LogKeys._
 import org.apache.spark.memory.TaskMemoryManager
 import org.apache.spark.metrics.source.Source
 import org.apache.spark.resource.ResourceInformation
@@ -56,19 +57,27 @@
   // with the driver side epoch.
   private var barrierEpoch = 0
 
+  private def logProgressInfo(msg: MessageWithContext, startTime: Option[Long]): Unit = {
+    val waitMsg = startTime.fold(log",")(st => log", waited " +
+      log"for ${MDC(TOTAL_TIME, System.currentTimeMillis() - st)} ms,")
+    logInfo(log"Task ${MDC(TASK_ATTEMPT_ID, taskAttemptId())}" +
+      log" from Stage ${MDC(STAGE_ID, stageId())}" +
+      log"(Attempt ${MDC(STAGE_ATTEMPT, stageAttemptNumber())}) " +
+      msg + waitMsg +
+      log" current barrier epoch is ${MDC(BARRIER_EPOCH, barrierEpoch)}.")
+  }
+
   private def runBarrier(message: String, requestMethod: RequestMethod.Value): Array[String] = {
-    logInfo(s"Task ${taskAttemptId()} from Stage ${stageId()}(Attempt ${stageAttemptNumber()}) " +
-      s"has entered the global sync, current barrier epoch is $barrierEpoch.")
+    logProgressInfo(log"has entered the global sync", None)
     logTrace("Current callSite: " + Utils.getCallSite())
 
     val startTime = System.currentTimeMillis()
     val timerTask = new TimerTask {
       override def run(): Unit = {
-        logInfo(s"Task ${taskAttemptId()} from Stage ${stageId()}(Attempt " +
-          s"${stageAttemptNumber()}) waiting " +
-          s"under the global sync since $startTime, has been waiting for " +
-          s"${MILLISECONDS.toSeconds(System.currentTimeMillis() - startTime)} seconds, " +
-          s"current barrier epoch is $barrierEpoch.")
+        logProgressInfo(
+          log"waiting under the global sync since ${MDC(TIME, startTime)}",
+          Some(startTime)
+        )
       }
     }
     // Log the update of global sync every 1 minute.
@@ -104,17 +113,11 @@
       val messages = abortableRpcFuture.future.value.get.get
 
       barrierEpoch += 1
-      logInfo(s"Task ${taskAttemptId()} from Stage ${stageId()}(Attempt ${stageAttemptNumber()}) " +
-        s"finished global sync successfully, waited for " +
-        s"${MILLISECONDS.toSeconds(System.currentTimeMillis() - startTime)} seconds, " +
-        s"current barrier epoch is $barrierEpoch.")
+      logProgressInfo(log"finished global sync successfully", Some(startTime))
       messages
     } catch {
       case e: SparkException =>
-        logInfo(s"Task ${taskAttemptId()} from Stage ${stageId()}(Attempt " +
-          s"${stageAttemptNumber()}) failed to perform global sync, waited for " +
-          s"${MILLISECONDS.toSeconds(System.currentTimeMillis() - startTime)} seconds, " +
-          s"current barrier epoch is $barrierEpoch.")
+        logProgressInfo(log"failed to perform global sync", Some(startTime))
         throw e
     } finally {
       timerTask.cancel()
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index d156d74..3bfa1ae 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -446,10 +446,12 @@
           val delta = targetNum.delta
           totalDelta += delta
           if (delta > 0) {
-            val executorsString = "executor" + { if (delta > 1) "s" else "" }
-            logInfo(s"Requesting $delta new $executorsString because tasks are backlogged " +
-              s"(new desired total will be ${numExecutorsTargetPerResourceProfileId(rpId)} " +
-              s"for resource profile id: ${rpId})")
+            val executorsString = log" new executor" + { if (delta > 1) log"s" else log"" }
+            logInfo(log"Requesting ${MDC(TARGET_NUM_EXECUTOR_DELTA, delta)}" +
+              executorsString + log" because tasks are backlogged " +
+              log"(new desired total will be" +
+              log" ${MDC(TARGET_NUM_EXECUTOR, numExecutorsTargetPerResourceProfileId(rpId))} " +
+              log"for resource profile id: ${MDC(RESOURCE_PROFILE_ID, rpId)})")
             numExecutorsToAddPerResourceProfileId(rpId) =
               if (delta == numExecutorsToAddPerResourceProfileId(rpId)) {
                 numExecutorsToAddPerResourceProfileId(rpId) * 2
@@ -604,7 +606,8 @@
       } else {
         executorMonitor.executorsKilled(executorsRemoved.toSeq)
       }
-      logInfo(s"Executors ${executorsRemoved.mkString(",")} removed due to idle timeout.")
+      logInfo(log"Executors ${MDC(EXECUTOR_IDS, executorsRemoved.mkString(","))}" +
+        log"removed due to idle timeout.")
       executorsRemoved.toSeq
     } else {
       logWarning(log"Unable to reach the cluster manager to kill executor/s " +
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 6f3a354..fdc2b0a 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -34,7 +34,7 @@
 import org.roaringbitmap.RoaringBitmap
 
 import org.apache.spark.broadcast.{Broadcast, BroadcastManager}
-import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.{Logging, MDC, MessageWithContext}
 import org.apache.spark.internal.LogKeys._
 import org.apache.spark.internal.config._
 import org.apache.spark.io.CompressionCodec
@@ -188,7 +188,8 @@
       val mapStatusOpt = mapIndex.map(mapStatuses(_)).flatMap(Option(_))
       mapStatusOpt match {
         case Some(mapStatus) =>
-          logInfo(s"Updating map output for ${mapId} to ${bmAddress}")
+          logInfo(log"Updating map output for ${MDC(MAP_ID, mapId)}" +
+            log" to ${MDC(BLOCK_MANAGER_ID, bmAddress)}")
           mapStatus.updateLocation(bmAddress)
           invalidateSerializedMapOutputStatusCache()
         case None =>
@@ -200,7 +201,8 @@
             _numAvailableMapOutputs += 1
             invalidateSerializedMapOutputStatusCache()
             mapStatusesDeleted(index) = null
-            logInfo(s"Recover ${mapStatus.mapId} ${mapStatus.location}")
+            logInfo(log"Recover ${MDC(MAP_ID, mapStatus.mapId)}" +
+              log" ${MDC(BLOCK_MANAGER_ID, mapStatus.location)}")
           } else {
             logWarning(log"Asked to update map output ${MDC(MAP_ID, mapId)} " +
               log"for untracked map status.")
@@ -490,20 +492,24 @@
 
   logDebug("init") // force eager creation of logger
 
+  private def logInfoMsg(msg: MessageWithContext, shuffleId: Int, context: RpcCallContext): Unit = {
+    val hostPort = context.senderAddress.hostPort
+    logInfo(log"Asked to send " +
+      msg +
+      log" locations for shuffle ${MDC(SHUFFLE_ID, shuffleId)} to ${MDC(HOST_PORT, hostPort)}")
+  }
+
   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
     case GetMapOutputStatuses(shuffleId: Int) =>
-      val hostPort = context.senderAddress.hostPort
-      logInfo(s"Asked to send map output locations for shuffle $shuffleId to $hostPort")
+      logInfoMsg(log"map output", shuffleId, context)
       tracker.post(GetMapOutputMessage(shuffleId, context))
 
     case GetMapAndMergeResultStatuses(shuffleId: Int) =>
-      val hostPort = context.senderAddress.hostPort
-      logInfo(s"Asked to send map/merge result locations for shuffle $shuffleId to $hostPort")
+      logInfoMsg(log"map/merge result", shuffleId, context)
       tracker.post(GetMapAndMergeOutputMessage(shuffleId, context))
 
     case GetShufflePushMergerLocations(shuffleId: Int) =>
-      logInfo(s"Asked to send shuffle push merger locations for shuffle" +
-        s" $shuffleId to ${context.senderAddress.hostPort}")
+      logInfoMsg(log"shuffle push merger", shuffleId, context)
       tracker.post(GetShufflePushMergersMessage(shuffleId, context))
 
     case StopMapOutputTracker =>
@@ -1422,13 +1428,15 @@
       val mergeOutputStatuses = mergeStatuses.get(shuffleId).orNull
 
       if (mapOutputStatuses == null || mergeOutputStatuses == null) {
-        logInfo("Don't have map/merge outputs for shuffle " + shuffleId + ", fetching them")
+        logInfo(log"Don't have map/merge outputs for" +
+          log" shuffle ${MDC(SHUFFLE_ID, shuffleId)}, fetching them")
         val startTimeNs = System.nanoTime()
         fetchingLock.withLock(shuffleId) {
           var fetchedMapStatuses = mapStatuses.get(shuffleId).orNull
           var fetchedMergeStatuses = mergeStatuses.get(shuffleId).orNull
           if (fetchedMapStatuses == null || fetchedMergeStatuses == null) {
-            logInfo("Doing the fetch; tracker endpoint = " + trackerEndpoint)
+            logInfo(log"Doing the fetch; tracker endpoint = " +
+              log"${MDC(RPC_ENDPOINT_REF, trackerEndpoint)}")
             val fetchedBytes =
               askTracker[(Array[Byte], Array[Byte])](GetMapAndMergeResultStatuses(shuffleId))
             try {
@@ -1456,12 +1464,14 @@
     } else {
       val statuses = mapStatuses.get(shuffleId).orNull
       if (statuses == null) {
-        logInfo("Don't have map outputs for shuffle " + shuffleId + ", fetching them")
+        logInfo(log"Don't have map outputs for shuffle ${MDC(SHUFFLE_ID, shuffleId)}," +
+          log" fetching them")
         val startTimeNs = System.nanoTime()
         fetchingLock.withLock(shuffleId) {
           var fetchedStatuses = mapStatuses.get(shuffleId).orNull
           if (fetchedStatuses == null) {
-            logInfo("Doing the fetch; tracker endpoint = " + trackerEndpoint)
+            logInfo(log"Doing the fetch; tracker endpoint =" +
+              log" ${MDC(RPC_ENDPOINT_REF, trackerEndpoint)}")
             val fetchedBytes = askTracker[Array[Byte]](GetMapOutputStatuses(shuffleId))
             try {
               fetchedStatuses =
@@ -1500,7 +1510,7 @@
   def updateEpoch(newEpoch: Long): Unit = {
     epochLock.synchronized {
       if (newEpoch > epoch) {
-        logInfo("Updating epoch to " + newEpoch + " and clearing cache")
+        logInfo(log"Updating epoch to ${MDC(EPOCH, newEpoch)} and clearing cache")
         epoch = newEpoch
         mapStatuses.clear()
         mergeStatuses.clear()
@@ -1561,7 +1571,9 @@
         oos.close()
       }
       val outArr = out.toByteArray
-      logInfo("Broadcast outputstatuses size = " + outArr.length + ", actual size = " + arrSize)
+      logInfo(log"Broadcast outputstatuses size = " +
+        log"${MDC(BROADCAST_OUTPUT_STATUS_SIZE, outArr.length)}," +
+        log" actual size = ${MDC(BROADCAST_OUTPUT_STATUS_SIZE, arrSize)}")
       (outArr, bcast)
     } else {
       (chunkedByteBuf.toArray, null)
@@ -1594,8 +1606,10 @@
         try {
           // deserialize the Broadcast, pull .value array out of it, and then deserialize that
           val bcast = deserializeObject(in).asInstanceOf[Broadcast[Array[Array[Byte]]]]
-          logInfo("Broadcast outputstatuses size = " + bytes.length +
-            ", actual size = " + bcast.value.foldLeft(0L)(_ + _.length))
+          val actualSize = bcast.value.foldLeft(0L)(_ + _.length)
+          logInfo(log"Broadcast outputstatuses size =" +
+            log" ${MDC(BROADCAST_OUTPUT_STATUS_SIZE, bytes.length)}" +
+            log", actual size = ${MDC(BROADCAST_OUTPUT_STATUS_SIZE, actualSize)}")
           val bcastIn = new ChunkedByteBuffer(bcast.value.map(ByteBuffer.wrap)).toInputStream()
           // Important - ignore the DIRECT tag ! Start from offset 1
           bcastIn.skip(1)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 385f9cb..0dbac45 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -199,10 +199,11 @@
     this(master, appName, sparkHome, jars, Map())
 
   // log out Spark Version in Spark driver log
-  logInfo(s"Running Spark version $SPARK_VERSION")
-  logInfo(s"OS info ${System.getProperty("os.name")}, ${System.getProperty("os.version")}, " +
-    s"${System.getProperty("os.arch")}")
-  logInfo(s"Java version ${System.getProperty("java.version")}")
+  logInfo(log"Running Spark version ${MDC(LogKeys.SPARK_VERSION, SPARK_VERSION)}")
+  logInfo(log"OS info ${MDC(LogKeys.OS_NAME, System.getProperty("os.name"))}," +
+    log" ${MDC(LogKeys.OS_VERSION, System.getProperty("os.version"))}, " +
+    log"${MDC(LogKeys.OS_ARCH, System.getProperty("os.arch"))}")
+  logInfo(log"Java version ${MDC(LogKeys.JAVA_VERSION, System.getProperty("java.version"))}")
 
   /* ------------------------------------------------------------------------------------- *
    | Private variables. These variables keep the internal state of the context, and are    |
@@ -439,7 +440,7 @@
     logResourceInfo(SPARK_DRIVER_PREFIX, _resources)
 
     // log out spark.app.name in the Spark driver logs
-    logInfo(s"Submitted application: $appName")
+    logInfo(log"Submitted application: ${MDC(LogKeys.APP_NAME, appName)}")
 
     // System property spark.yarn.app.id must be set if user code ran by AM on a YARN cluster
     if (master == "yarn" && deployMode == "cluster" && !_conf.contains("spark.yarn.app.id")) {
@@ -448,7 +449,7 @@
     }
 
     if (_conf.getBoolean("spark.logConf", false)) {
-      logInfo("Spark configuration:\n" + _conf.toDebugString)
+      logInfo(log"Spark configuration:\n${MDC(LogKeys.CONFIG, _conf.toDebugString)}")
     }
 
     // Set Spark driver host and port system properties. This explicitly sets the configuration
@@ -1704,7 +1705,8 @@
       "Can not directly broadcast RDDs; instead, call collect() and broadcast the result.")
     val bc = env.broadcastManager.newBroadcast[T](value, isLocal, serializedOnly)
     val callSite = getCallSite()
-    logInfo("Created broadcast " + bc.id + " from " + callSite.shortForm)
+    logInfo(log"Created broadcast ${MDC(LogKeys.BROADCAST_ID, bc.id)}" +
+      log" from ${MDC(LogKeys.CALL_SITE_SHORT_FORM, callSite.shortForm)}")
     cleaner.foreach(_.registerBroadcastForCleanup(bc))
     bc
   }
@@ -1833,7 +1835,8 @@
         addedFiles
           .getOrElseUpdate(jobArtifactUUID, new ConcurrentHashMap[String, Long]().asScala)
           .putIfAbsent(key, timestamp).isEmpty) {
-      logInfo(s"Added file $path at $key with timestamp $timestamp")
+      logInfo(log"Added file ${MDC(LogKeys.PATH, path)} at ${MDC(LogKeys.KEY, key)} with" +
+        log" timestamp ${MDC(LogKeys.TIMESTAMP, timestamp)}")
       // Fetch the file locally so that closures which are run on the driver can still use the
       // SparkFiles API to access files.
       Utils.fetchFile(uri.toString, root, conf, hadoopConfiguration, timestamp, useCache = false)
@@ -1845,7 +1848,8 @@
           .putIfAbsent(
           Utils.getUriBuilder(new URI(key)).fragment(uri.getFragment).build().toString,
           timestamp).isEmpty) {
-      logInfo(s"Added archive $path at $key with timestamp $timestamp")
+      logInfo(log"Added archive ${MDC(LogKeys.PATH, path)} at ${MDC(LogKeys.KEY, key)}" +
+        log" with timestamp ${MDC(LogKeys.TIMESTAMP, timestamp)}")
       // If the scheme is file, use URI to simply copy instead of downloading.
       val uriToUse = if (!isLocal && scheme == "file") uri else new URI(key)
       val uriToDownload = Utils.getUriBuilder(uriToUse).fragment(null).build()
@@ -1855,7 +1859,9 @@
         root,
         if (uri.getFragment != null) uri.getFragment else source.getName)
       logInfo(
-        s"Unpacking an archive $path from ${source.getAbsolutePath} to ${dest.getAbsolutePath}")
+        log"Unpacking an archive ${MDC(LogKeys.PATH, path)}" +
+          log" from ${MDC(LogKeys.SOURCE_PATH, source.getAbsolutePath)}" +
+          log" to ${MDC(LogKeys.DESTINATION_PATH, dest.getAbsolutePath)}")
       Utils.deleteRecursively(dest)
       Utils.unpack(source, dest)
       postEnvironmentUpdate()
@@ -2216,8 +2222,14 @@
           .getOrElseUpdate(jobArtifactUUID, new ConcurrentHashMap[String, Long]().asScala)
           .putIfAbsent(_, timestamp).isEmpty)
         if (added.nonEmpty) {
-          val jarMessage = if (scheme != "ivy") "JAR" else "dependency jars of Ivy URI"
-          logInfo(s"Added $jarMessage $path at ${added.mkString(",")} with timestamp $timestamp")
+          val jarMessage = if (scheme != "ivy") {
+            log"Added JAR"
+          } else {
+            log"Added dependency jars of Ivy URI"
+          }
+          logInfo(jarMessage + log" ${MDC(LogKeys.PATH, path)}" +
+            log" at ${MDC(LogKeys.ADDED_JARS, added.mkString(","))}" +
+            log" with timestamp ${MDC(LogKeys.TIMESTAMP, timestamp)}")
           postEnvironmentUpdate()
         }
         if (existed.nonEmpty) {
@@ -2274,7 +2286,8 @@
    */
   def stop(exitCode: Int): Unit = {
     stopSite = Some(getCallSite())
-    logInfo(s"SparkContext is stopping with exitCode $exitCode from ${stopSite.get.shortForm}.")
+    logInfo(log"SparkContext is stopping with exitCode ${MDC(LogKeys.EXIT_CODE, exitCode)}" +
+      log" from ${MDC(LogKeys.STOP_SITE_SHORT_FORM, stopSite.get.shortForm)}.")
     if (LiveListenerBus.withinListenerThread.value) {
       throw new SparkException(s"Cannot stop SparkContext within listener bus thread.")
     }
@@ -2438,9 +2451,10 @@
     }
     val callSite = getCallSite()
     val cleanedFunc = clean(func)
-    logInfo("Starting job: " + callSite.shortForm)
+    logInfo(log"Starting job: ${MDC(LogKeys.CALL_SITE_SHORT_FORM, callSite.shortForm)}")
     if (conf.getBoolean("spark.logLineage", false)) {
-      logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
+      logInfo(log"RDD's recursive dependencies:\n" +
+        log"${MDC(LogKeys.RDD_DEBUG_STRING, rdd.toDebugString)}")
     }
     dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
     progressBar.foreach(_.finishAll())
@@ -2559,13 +2573,14 @@
       timeout: Long): PartialResult[R] = {
     assertNotStopped()
     val callSite = getCallSite()
-    logInfo("Starting job: " + callSite.shortForm)
-    val start = System.nanoTime
+    logInfo(log"Starting job: ${MDC(LogKeys.CALL_SITE_SHORT_FORM, callSite.shortForm)}")
+    val start = System.currentTimeMillis()
     val cleanedFunc = clean(func)
     val result = dagScheduler.runApproximateJob(rdd, cleanedFunc, evaluator, callSite, timeout,
       localProperties.get)
     logInfo(
-      "Job finished: " + callSite.shortForm + ", took " + (System.nanoTime - start) / 1e9 + " s")
+      log"Job finished: ${MDC(LogKeys.CALL_SITE_SHORT_FORM, callSite.shortForm)}," +
+        log" took ${MDC(LogKeys.TOTAL_TIME, System.currentTimeMillis() - start)}ms")
     result
   }
 
@@ -2793,7 +2808,8 @@
         val listeners = Utils.loadExtensions(classOf[SparkListenerInterface], classNames, conf)
         listeners.foreach { listener =>
           listenerBus.addToSharedQueue(listener)
-          logInfo(s"Registered listener ${listener.getClass().getName()}")
+          logInfo(log"Registered listener" +
+            log"${MDC(LogKeys.CLASS_NAME, listener.getClass().getName())}")
         }
       }
     } catch {
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
index 46dab16..5e2b555 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
@@ -45,7 +45,7 @@
     converterClass.map { cc =>
       Try {
         val c = Utils.classForName[Converter[T, U]](cc).getConstructor().newInstance()
-        logInfo(s"Loaded converter: $cc")
+        logInfo(log"Loaded converter: ${MDC(CLASS_NAME, cc)}")
         c
       } match {
         case Success(c) => c
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 5aa080b..d643983 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -37,7 +37,8 @@
 import org.apache.spark.api.python.PythonFunction.PythonAccumulator
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.input.PortableDataStream
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKeys.{HOST, PORT}
 import org.apache.spark.internal.config.BUFFER_SIZE
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.rdd.RDD
@@ -733,7 +734,8 @@
   private def openSocket(): Socket = synchronized {
     if (socket == null || socket.isClosed) {
       socket = new Socket(serverHost, serverPort)
-      logInfo(s"Connected to AccumulatorServer at host: $serverHost port: $serverPort")
+      logInfo(log"Connected to AccumulatorServer at host: ${MDC(HOST, serverHost)}" +
+        log" port: ${MDC(PORT, serverPort)}")
       // send the secret just for the initial authentication when opening a new connection
       socket.getOutputStream.write(secretToken.getBytes(StandardCharsets.UTF_8))
     }
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index 16f8714..b2571ff 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -32,7 +32,7 @@
 import org.apache.spark._
 import org.apache.spark.api.python.PythonFunction.PythonAccumulator
 import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKeys._
+import org.apache.spark.internal.LogKeys.TASK_NAME
 import org.apache.spark.internal.config.{BUFFER_SIZE, EXECUTOR_CORES, Python}
 import org.apache.spark.internal.config.Python._
 import org.apache.spark.rdd.InputFileBlockHolder
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
index 26c790a..fc6403f 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
@@ -28,7 +28,8 @@
 
 import org.apache.spark.{SparkContext, SparkEnv}
 import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKeys.{PATH, PYTHON_VERSION}
 import org.apache.spark.util.ArrayImplicits.SparkArrayOps
 import org.apache.spark.util.Utils
 
@@ -122,11 +123,11 @@
         PythonUtils.sparkPythonPath,
         sys.env.getOrElse("PYTHONPATH", ""))
       val environment = Map("PYTHONPATH" -> pythonPath)
-      logInfo(s"Python path $pythonPath")
+      logInfo(log"Python path ${MDC(PATH, pythonPath)}")
 
       val processPythonVer = Process(pythonVersionCMD, None, environment.toSeq: _*)
       val output = runCommand(processPythonVer)
-      logInfo(s"Python version: ${output.getOrElse("Unable to determine")}")
+      logInfo(log"Python version: ${MDC(PYTHON_VERSION, output.getOrElse("Unable to determine"))}")
 
       val pythonCode =
         """
diff --git a/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala
index b238e2b..0ff2b79 100644
--- a/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala
@@ -22,7 +22,8 @@
 import scala.jdk.CollectionConverters._
 
 import org.apache.spark.{SparkEnv, SparkPythonException}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKeys.{PYTHON_EXEC, PYTHON_WORKER_MODULE, PYTHON_WORKER_RESPONSE, SESSION_ID}
 import org.apache.spark.internal.config.BUFFER_SIZE
 import org.apache.spark.internal.config.Python.PYTHON_AUTH_SOCKET_TIMEOUT
 
@@ -58,7 +59,8 @@
    * to be used with the functions.
    */
   def init(): (DataOutputStream, DataInputStream) = {
-    logInfo(s"Initializing Python runner (session: $sessionId, pythonExec: $pythonExec)")
+    logInfo(log"Initializing Python runner (session: ${MDC(SESSION_ID, sessionId)}," +
+      log" pythonExec: ${MDC(PYTHON_EXEC, pythonExec)})")
     val env = SparkEnv.get
 
     val localdir = env.blockManager.diskBlockManager.localDirs.map(f => f.getPath()).mkString(",")
@@ -95,7 +97,8 @@
       val errMessage = PythonWorkerUtils.readUTF(dataIn)
       throw streamingPythonRunnerInitializationFailure(resFromPython, errMessage)
     }
-    logInfo(s"Runner initialization succeeded (returned $resFromPython).")
+    logInfo(log"Runner initialization succeeded (returned" +
+      log" ${MDC(PYTHON_WORKER_RESPONSE, resFromPython)}).")
 
     (dataOut, dataIn)
   }
@@ -116,7 +119,8 @@
    * Stops the Python worker.
    */
   def stop(): Unit = {
-    logInfo(s"Stopping streaming runner for sessionId: $sessionId, module: $workerModule.")
+    logInfo(log"Stopping streaming runner for sessionId: ${MDC(SESSION_ID, sessionId)}," +
+      log" module: ${MDC(PYTHON_WORKER_MODULE, workerModule)}.")
 
     try {
       pythonWorkerFactory.foreach { factory =>
diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala
index 50ce6f8..226a6dc 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -164,11 +164,12 @@
       // logs again when waitAppCompletion is set to true
       if (!driverStatusReported) {
         driverStatusReported = true
-        logInfo(s"State of $submittedDriverID is ${state.get}")
+        logInfo(log"State of ${MDC(DRIVER_ID, submittedDriverID)}" +
+          log" is ${MDC(DRIVER_STATE, state.get)}")
         // Worker node, if present
         (workerId, workerHostPort, state) match {
           case (Some(id), Some(hostPort), Some(DriverState.RUNNING)) =>
-            logInfo(s"Driver running on $hostPort ($id)")
+            logInfo(log"Driver running on ${MDC(HOST, hostPort)} (${MDC(WORKER_ID, id)})")
           case _ =>
         }
       }
@@ -181,17 +182,18 @@
           state.get match {
             case DriverState.FINISHED | DriverState.FAILED |
                  DriverState.ERROR | DriverState.KILLED =>
-              logInfo(s"State of driver $submittedDriverID is ${state.get}, " +
-                s"exiting spark-submit JVM.")
+              logInfo(log"State of driver ${MDC(DRIVER_ID, submittedDriverID)}" +
+                log" is ${MDC(DRIVER_STATE, state.get)}, exiting spark-submit JVM.")
               System.exit(0)
             case _ =>
               if (!waitAppCompletion) {
-                logInfo(s"spark-submit not configured to wait for completion, " +
-                  s"exiting spark-submit JVM.")
+                logInfo("spark-submit not configured to wait for completion, " +
+                  " exiting spark-submit JVM.")
                 System.exit(0)
               } else {
-                logDebug(s"State of driver $submittedDriverID is ${state.get}, " +
-                  s"continue monitoring driver status.")
+                logDebug(log"State of driver ${MDC(DRIVER_ID, submittedDriverID)}" +
+                  log" is ${MDC(DRIVER_STATE, state.get)}, " +
+                  log"continue monitoring driver status.")
               }
           }
       }
diff --git a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
index a56fbd5..3ce5e2d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
@@ -23,7 +23,8 @@
 import scala.jdk.CollectionConverters._
 
 import org.apache.spark.{SecurityManager, SparkConf}
-import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.internal.{config, Logging, MDC}
+import org.apache.spark.internal.LogKeys.{AUTH_ENABLED, PORT, SHUFFLE_DB_BACKEND_KEY, SHUFFLE_DB_BACKEND_NAME}
 import org.apache.spark.metrics.{MetricsSystem, MetricsSystemInstances}
 import org.apache.spark.network.TransportContext
 import org.apache.spark.network.crypto.AuthServerBootstrap
@@ -86,8 +87,8 @@
     if (sparkConf.get(config.SHUFFLE_SERVICE_DB_ENABLED) && enabled) {
       val shuffleDBName = sparkConf.get(config.SHUFFLE_SERVICE_DB_BACKEND)
       val dbBackend = DBBackend.byName(shuffleDBName)
-      logInfo(s"Use ${dbBackend.name()} as the implementation of " +
-        s"${config.SHUFFLE_SERVICE_DB_BACKEND.key}")
+      logInfo(log"Use ${MDC(SHUFFLE_DB_BACKEND_NAME, dbBackend.name())} as the implementation of " +
+        log"${MDC(SHUFFLE_DB_BACKEND_KEY, config.SHUFFLE_SERVICE_DB_BACKEND.key)}")
       new ExternalBlockHandler(conf,
         findRegisteredExecutorsDBFile(dbBackend.fileName(registeredExecutorsDB)))
     } else {
@@ -106,7 +107,8 @@
   def start(): Unit = {
     require(server == null, "Shuffle server already started")
     val authEnabled = securityManager.isAuthenticationEnabled()
-    logInfo(s"Starting shuffle service on port $port (auth enabled = $authEnabled)")
+    logInfo(log"Starting shuffle service on port ${MDC(PORT, port)}" +
+      log" (auth enabled = ${MDC(AUTH_ENABLED, authEnabled)})")
     val bootstraps: Seq[TransportServerBootstrap] =
       if (authEnabled) {
         Seq(new AuthServerBootstrap(transportConf, securityManager))
diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
index 4bd2de6..cc77765 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
@@ -173,7 +173,7 @@
 
     val checkpointDurationMs =
       TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - checkpointStartTimeNs)
-    logInfo(s"Checkpointing took $checkpointDurationMs ms.")
+    logInfo(log"Checkpointing took ${MDC(TOTAL_TIME, checkpointDurationMs)} ms.")
 
     val newRDD = new ReliableCheckpointRDD[T](
       sc, checkpointDirPath.toString, originalRDD.partitioner)
@@ -220,7 +220,7 @@
     } (catchBlock = {
       val deleted = fs.delete(tempOutputPath, false)
       if (!deleted) {
-        logInfo(s"Failed to delete tempOutputPath $tempOutputPath.")
+        logInfo(log"Failed to delete tempOutputPath ${MDC(TEMP_OUTPUT_PATH, tempOutputPath)}.")
       }
     }, finallyBlock = {
       serializeStream.close()
@@ -228,12 +228,13 @@
 
     if (!fs.rename(tempOutputPath, finalOutputPath)) {
       if (!fs.exists(finalOutputPath)) {
-        logInfo(s"Deleting tempOutputPath $tempOutputPath")
+        logInfo(log"Deleting tempOutputPath ${MDC(TEMP_OUTPUT_PATH, tempOutputPath)}")
         fs.delete(tempOutputPath, false)
         throw SparkCoreErrors.checkpointFailedToSaveError(ctx.attemptNumber(), finalOutputPath)
       } else {
         // Some other copy of this task must've finished before us and renamed it
-        logInfo(s"Final output path $finalOutputPath already exists; not overwriting it")
+        logInfo(log"Final output path" +
+          log" ${MDC(FINAL_OUTPUT_PATH, finalOutputPath)} already exists; not overwriting it")
         if (!fs.delete(tempOutputPath, false)) {
           logWarning(log"Error deleting ${MDC(PATH, tempOutputPath)}")
         }
diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
index 0d1bc14..b468a38 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
@@ -23,7 +23,8 @@
 
 import org.apache.spark._
 import org.apache.spark.errors.SparkCoreErrors
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKeys.{NEW_RDD_ID, RDD_CHECKPOINT_DIR, RDD_ID}
 import org.apache.spark.internal.config.CLEANER_REFERENCE_TRACKING_CLEAN_CHECKPOINTS
 
 /**
@@ -66,7 +67,8 @@
       }
     }
 
-    logInfo(s"Done checkpointing RDD ${rdd.id} to $cpDir, new parent is RDD ${newRDD.id}")
+    logInfo(log"Done checkpointing RDD ${MDC(RDD_ID, rdd.id)}" +
+      log" to ${MDC(RDD_CHECKPOINT_DIR, cpDir)}, new parent is RDD ${MDC(NEW_RDD_ID, newRDD.id)}")
     newRDD
   }
 
diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
index 3d0e477..f503be9 100644
--- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -250,7 +250,8 @@
       poolSize: Int = 200): ServerInfo = {
 
     val stopTimeout = conf.get(UI_JETTY_STOP_TIMEOUT)
-    logInfo(s"Start Jetty $hostName:$port for $serverName")
+    logInfo(log"Start Jetty ${MDC(HOST, hostName)}:${MDC(PORT, port)}" +
+      log" for ${MDC(SERVER_NAME, serverName)}")
     // Start the server first, with no connectors.
     val pool = new QueuedThreadPool(poolSize)
     if (serverName.nonEmpty) {
@@ -558,7 +559,9 @@
    */
   private def addFilters(handler: ServletContextHandler, securityMgr: SecurityManager): Unit = {
     conf.get(UI_FILTERS).foreach { filter =>
-      logInfo(s"Adding filter to ${handler.getContextPath()}: $filter")
+      logInfo(log"Adding filter to" +
+        log" ${MDC(SERVLET_CONTEXT_HANDLER_PATH, handler.getContextPath())}:" +
+        log" ${MDC(UI_FILTER, filter)}")
       val oldParams = conf.getOption(s"spark.$filter.params").toSeq
         .flatMap(Utils.stringToSeq)
         .flatMap { param =>
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index ac76f74..b8d422c 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -24,7 +24,7 @@
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkContext}
 import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKeys.CLASS_NAME
+import org.apache.spark.internal.LogKeys.{CLASS_NAME, WEB_URL}
 import org.apache.spark.internal.config.DRIVER_LOG_LOCAL_DIR
 import org.apache.spark.internal.config.UI._
 import org.apache.spark.scheduler._
@@ -164,7 +164,7 @@
   /** Stop the server behind this web interface. Only valid after bind(). */
   override def stop(): Unit = {
     super.stop()
-    logInfo(s"Stopped Spark web UI at $webUrl")
+    logInfo(log"Stopped Spark web UI at ${MDC(WEB_URL, webUrl)}")
   }
 
   override def withSparkUI[T](appId: String, attemptId: Option[String])(fn: SparkUI => T): T = {
diff --git a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
index 5149987..4f01cd6 100644
--- a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
@@ -27,7 +27,7 @@
 
 import org.apache.spark.SparkEnv
 import org.apache.spark.internal.{config, Logging, MDC}
-import org.apache.spark.internal.LogKeys.LISTENER
+import org.apache.spark.internal.LogKeys.{EVENT, LISTENER, TOTAL_TIME}
 import org.apache.spark.scheduler.EventLoggingListener
 import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate
 
@@ -132,8 +132,9 @@
         if (maybeTimerContext != null) {
           val elapsed = maybeTimerContext.stop()
           if (logSlowEventEnabled && elapsed > logSlowEventThreshold) {
-            logInfo(s"Process of event ${redactEvent(event)} by listener ${listenerName} took " +
-              s"${elapsed / 1000000000d}s.")
+            logInfo(log"Process of event ${MDC(EVENT, redactEvent(event))} by" +
+              log"listener ${MDC(LISTENER, listenerName)} took " +
+              log"${MDC(TOTAL_TIME, elapsed / 1000000d)}ms.")
           }
         }
       }
diff --git a/core/src/main/scala/org/apache/spark/util/SignalUtils.scala b/core/src/main/scala/org/apache/spark/util/SignalUtils.scala
index 098f8b6..b41166a 100644
--- a/core/src/main/scala/org/apache/spark/util/SignalUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/SignalUtils.scala
@@ -80,7 +80,7 @@
       action: => Boolean): Unit = synchronized {
     try {
       val handler = handlers.getOrElseUpdate(signal, {
-        logInfo(s"Registering signal handler for $signal")
+        logInfo(log"Registering signal handler for ${MDC(SIGNAL, signal)}")
         new ActionHandler(new Signal(signal))
       })
       handler.register(action)