[SPARK-47240][CORE][PART1] Migrate logInfo with variables to structured logging framework
The PR aims to migrate `logInfo` in Core module with variables to structured logging framework.
### Why are the changes needed?
To enhance Apache Spark's logging system by implementing structured logging.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
- Pass GA.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #46362 from zeotuan/coreInfo.
Lead-authored-by: Tuan Pham <Tuan.Pham@wisetechglobal.com>
Co-authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
index c127f9c..14e822c 100644
--- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
+++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
@@ -26,13 +26,15 @@
}
/**
- * Various keys used for mapped diagnostic contexts(MDC) in logging.
- * All structured logging keys should be defined here for standardization.
+ * Various keys used for mapped diagnostic contexts(MDC) in logging. All structured logging keys
+ * should be defined here for standardization.
*/
object LogKeys {
case object ACCUMULATOR_ID extends LogKey
+ case object ACTUAL_BROADCAST_OUTPUT_STATUS_SIZE extends LogKey
case object ACTUAL_NUM_FILES extends LogKey
case object ACTUAL_PARTITION_COLUMN extends LogKey
+ case object ADDED_JARS extends LogKey
case object AGGREGATE_FUNCTIONS extends LogKey
case object ALPHA extends LogKey
case object ANALYSIS_ERROR extends LogKey
@@ -42,7 +44,10 @@
case object APP_NAME extends LogKey
case object APP_STATE extends LogKey
case object ARGS extends LogKey
+ case object AUTH_ENABLED extends LogKey
case object BACKUP_FILE extends LogKey
+ case object BARRIER_EPOCH extends LogKey
+ case object BARRIER_ID extends LogKey
case object BATCH_ID extends LogKey
case object BATCH_NAME extends LogKey
case object BATCH_TIMESTAMP extends LogKey
@@ -55,6 +60,7 @@
case object BOOT extends LogKey
case object BROADCAST extends LogKey
case object BROADCAST_ID extends LogKey
+ case object BROADCAST_OUTPUT_STATUS_SIZE extends LogKey
case object BUCKET extends LogKey
case object BYTECODE_SIZE extends LogKey
case object CACHED_TABLE_PARTITION_METADATA_SIZE extends LogKey
@@ -62,6 +68,7 @@
case object CACHE_UNTIL_HIGHEST_CONSUMED_SIZE extends LogKey
case object CACHE_UNTIL_LAST_PRODUCED_SIZE extends LogKey
case object CALL_SITE_LONG_FORM extends LogKey
+ case object CALL_SITE_SHORT_FORM extends LogKey
case object CATALOG_NAME extends LogKey
case object CATEGORICAL_FEATURES extends LogKey
case object CHECKPOINT_FILE extends LogKey
@@ -142,11 +149,13 @@
case object DEPRECATED_KEY extends LogKey
case object DESCRIPTION extends LogKey
case object DESIRED_NUM_PARTITIONS extends LogKey
+ case object DESTINATION_PATH extends LogKey
case object DFS_FILE extends LogKey
case object DIFF_DELTA extends LogKey
case object DIVISIBLE_CLUSTER_INDICES_SIZE extends LogKey
case object DRIVER_ID extends LogKey
case object DRIVER_LIBRARY_PATH_KEY extends LogKey
+ case object DRIVER_STATE extends LogKey
case object DROPPED_PARTITIONS extends LogKey
case object DURATION extends LogKey
case object EARLIEST_LOADED_VERSION extends LogKey
@@ -161,6 +170,7 @@
case object ERROR extends LogKey
case object ESTIMATOR_PARAMETER_MAP extends LogKey
case object EVALUATED_FILTERS extends LogKey
+ case object EVENT extends LogKey
case object EVENT_LOG_DESTINATION extends LogKey
case object EVENT_LOOP extends LogKey
case object EVENT_NAME extends LogKey
@@ -184,8 +194,10 @@
case object EXECUTOR_TIMEOUT extends LogKey
case object EXEC_AMOUNT extends LogKey
case object EXISTING_FILE extends LogKey
+ case object EXISTING_JARS extends LogKey
case object EXISTING_PATH extends LogKey
case object EXIT_CODE extends LogKey
+ case object EXPECTED_ANSWER extends LogKey
case object EXPECTED_NUM_FILES extends LogKey
case object EXPECTED_PARTITION_COLUMN extends LogKey
case object EXPIRY_TIMESTAMP extends LogKey
@@ -211,6 +223,7 @@
case object FILTER extends LogKey
case object FILTERS extends LogKey
case object FINAL_CONTEXT extends LogKey
+ case object FINAL_OUTPUT_PATH extends LogKey
case object FINAL_PATH extends LogKey
case object FINISH_TRIGGER_DURATION extends LogKey
case object FROM_OFFSET extends LogKey
@@ -252,6 +265,7 @@
case object IS_NETWORK_REQUEST_DONE extends LogKey
case object JAR_ENTRY extends LogKey
case object JAR_MESSAGE extends LogKey
+ case object JAVA_VERSION extends LogKey
case object JOB_ID extends LogKey
case object JOIN_CONDITION extends LogKey
case object JOIN_CONDITION_SUB_EXPR extends LogKey
@@ -280,6 +294,7 @@
case object LOGICAL_PLAN_COLUMNS extends LogKey
case object LOGICAL_PLAN_LEAVES extends LogKey
case object LOG_ID extends LogKey
+ case object LOG_KEY_FILE extends LogKey
case object LOG_OFFSET extends LogKey
case object LOG_TYPE extends LogKey
case object LOWER_BOUND extends LogKey
@@ -326,13 +341,16 @@
case object NEW_FEATURE_COLUMN_NAME extends LogKey
case object NEW_LABEL_COLUMN_NAME extends LogKey
case object NEW_PATH extends LogKey
+ case object NEW_RDD_ID extends LogKey
case object NEW_STATE extends LogKey
case object NEW_VALUE extends LogKey
case object NODES extends LogKey
case object NODE_LOCATION extends LogKey
case object NON_BUILT_IN_CONNECTORS extends LogKey
case object NORM extends LogKey
+ case object NUM_ADDED_MASTERS extends LogKey
case object NUM_ADDED_PARTITIONS extends LogKey
+ case object NUM_ADDED_WORKERS extends LogKey
case object NUM_BIN extends LogKey
case object NUM_BYTES extends LogKey
case object NUM_BYTES_CURRENT extends LogKey
@@ -373,6 +391,7 @@
case object NUM_PRUNED extends LogKey
case object NUM_REPLICAS extends LogKey
case object NUM_REQUESTS extends LogKey
+ case object NUM_REQUEST_SYNC_TASK extends LogKey
case object NUM_RESOURCE_SLOTS extends LogKey
case object NUM_RETRIES extends LogKey
case object NUM_RIGHT_PARTITION_VALUES extends LogKey
@@ -397,7 +416,12 @@
case object OPTIONS extends LogKey
case object OP_ID extends LogKey
case object OP_TYPE extends LogKey
+ case object OS_ARCH extends LogKey
+ case object OS_NAME extends LogKey
+ case object OS_VERSION extends LogKey
case object OUTPUT extends LogKey
+ case object OUTPUT_LINE extends LogKey
+ case object OUTPUT_LINE_NUMBER extends LogKey
case object OVERHEAD_MEMORY_SIZE extends LogKey
case object PAGE_SIZE extends LogKey
case object PARSE_MODE extends LogKey
@@ -438,6 +462,9 @@
case object PUSHED_FILTERS extends LogKey
case object PVC_METADATA_NAME extends LogKey
case object PYTHON_EXEC extends LogKey
+ case object PYTHON_VERSION extends LogKey
+ case object PYTHON_WORKER_MODULE extends LogKey
+ case object PYTHON_WORKER_RESPONSE extends LogKey
case object QUERY_CACHE_VALUE extends LogKey
case object QUERY_HINT extends LogKey
case object QUERY_ID extends LogKey
@@ -447,6 +474,8 @@
case object QUERY_PLAN_LENGTH_MAX extends LogKey
case object QUERY_RUN_ID extends LogKey
case object RANGE extends LogKey
+ case object RDD_CHECKPOINT_DIR extends LogKey
+ case object RDD_DEBUG_STRING extends LogKey
case object RDD_DESCRIPTION extends LogKey
case object RDD_ID extends LogKey
case object READ_LIMIT extends LogKey
@@ -466,6 +495,7 @@
case object REMOTE_ADDRESS extends LogKey
case object REMOVE_FROM_MASTER extends LogKey
case object REPORT_DETAILS extends LogKey
+ case object REQUESTER_SIZE extends LogKey
case object RESOURCE extends LogKey
case object RESOURCE_NAME extends LogKey
case object RESOURCE_PROFILE_ID extends LogKey
@@ -489,13 +519,18 @@
case object SCHEDULER_POOL_NAME extends LogKey
case object SCHEMA extends LogKey
case object SCHEMA2 extends LogKey
+ case object SERIALIZE_OUTPUT_LENGTH extends LogKey
+ case object SERVER_NAME extends LogKey
case object SERVICE_NAME extends LogKey
+ case object SERVLET_CONTEXT_HANDLER_PATH extends LogKey
case object SESSION_HOLD_INFO extends LogKey
case object SESSION_ID extends LogKey
case object SESSION_KEY extends LogKey
case object SHARD_ID extends LogKey
case object SHELL_COMMAND extends LogKey
case object SHUFFLE_BLOCK_INFO extends LogKey
+ case object SHUFFLE_DB_BACKEND_KEY extends LogKey
+ case object SHUFFLE_DB_BACKEND_NAME extends LogKey
case object SHUFFLE_ID extends LogKey
case object SHUFFLE_MERGE_ID extends LogKey
case object SHUFFLE_SERVICE_NAME extends LogKey
@@ -506,8 +541,10 @@
case object SMALLEST_CLUSTER_INDEX extends LogKey
case object SNAPSHOT_VERSION extends LogKey
case object SOCKET_ADDRESS extends LogKey
+ case object SOURCE_PATH extends LogKey
case object SPARK_DATA_STREAM extends LogKey
case object SPARK_PLAN_ID extends LogKey
+ case object SPARK_VERSION extends LogKey
case object SPILL_TIMES extends LogKey
case object SQL_TEXT extends LogKey
case object SRC_PATH extends LogKey
@@ -520,6 +557,7 @@
case object STATE_STORE_VERSION extends LogKey
case object STATUS extends LogKey
case object STDERR extends LogKey
+ case object STOP_SITE_SHORT_FORM extends LogKey
case object STORAGE_LEVEL extends LogKey
case object STORAGE_LEVEL_DESERIALIZED extends LogKey
case object STORAGE_LEVEL_REPLICATION extends LogKey
@@ -535,11 +573,14 @@
case object STREAMING_WRITE extends LogKey
case object STREAM_ID extends LogKey
case object STREAM_NAME extends LogKey
+ case object STREAM_SOURCE extends LogKey
case object SUBMISSION_ID extends LogKey
case object SUBSAMPLING_RATE extends LogKey
case object SUB_QUERY extends LogKey
case object TABLE_NAME extends LogKey
case object TABLE_TYPES extends LogKey
+ case object TARGET_NUM_EXECUTOR extends LogKey
+ case object TARGET_NUM_EXECUTOR_DELTA extends LogKey
case object TARGET_PATH extends LogKey
case object TASK_ATTEMPT_ID extends LogKey
case object TASK_ID extends LogKey
@@ -548,6 +589,7 @@
case object TASK_SET_NAME extends LogKey
case object TASK_STATE extends LogKey
case object TEMP_FILE extends LogKey
+ case object TEMP_OUTPUT_PATH extends LogKey
case object TEMP_PATH extends LogKey
case object TEST_SIZE extends LogKey
case object THREAD extends LogKey
@@ -557,6 +599,7 @@
case object TIME extends LogKey
case object TIMEOUT extends LogKey
case object TIMER extends LogKey
+ case object TIMESTAMP extends LogKey
case object TIME_UNITS extends LogKey
case object TIP extends LogKey
case object TOKEN_KIND extends LogKey
@@ -598,6 +641,7 @@
case object WAIT_SEND_TIME extends LogKey
case object WAIT_TIME extends LogKey
case object WATERMARK_CONSTRAINT extends LogKey
+ case object WEB_URL extends LogKey
case object WEIGHT extends LogKey
case object WEIGHTED_NUM extends LogKey
case object WORKER extends LogKey
diff --git a/core/src/main/scala/org/apache/spark/BarrierCoordinator.scala b/core/src/main/scala/org/apache/spark/BarrierCoordinator.scala
index 9422421..adce6c3 100644
--- a/core/src/main/scala/org/apache/spark/BarrierCoordinator.scala
+++ b/core/src/main/scala/org/apache/spark/BarrierCoordinator.scala
@@ -23,7 +23,8 @@
import scala.collection.mutable.{ArrayBuffer, HashSet}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKeys._
import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}
import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted}
import org.apache.spark.util.ThreadUtils
@@ -161,7 +162,8 @@
s"${request.numTasks} from Task $taskId, previously it was $numTasks.")
// Check whether the epoch from the barrier tasks matches current barrierEpoch.
- logInfo(s"Current barrier epoch for $barrierId is $barrierEpoch.")
+ logInfo(log"Current barrier epoch for ${MDC(BARRIER_ID, barrierId)}" +
+ log" is ${MDC(BARRIER_EPOCH, barrierEpoch)}.")
if (epoch != barrierEpoch) {
requester.sendFailure(new SparkException(s"The request to sync of $barrierId with " +
s"barrier epoch $barrierEpoch has already finished. Maybe task $taskId is not " +
@@ -176,14 +178,17 @@
// Add the requester to array of RPCCallContexts pending for reply.
requesters += requester
messages(request.partitionId) = request.message
- logInfo(s"Barrier sync epoch $barrierEpoch from $barrierId received update from Task " +
- s"$taskId, current progress: ${requesters.size}/$numTasks.")
+ logInfo(log"Barrier sync epoch ${MDC(BARRIER_EPOCH, barrierEpoch)}" +
+ log" from ${MDC(BARRIER_ID, barrierId)} received update from Task" +
+ log" ${MDC(TASK_ID, taskId)}, current progress:" +
+ log" ${MDC(REQUESTER_SIZE, requesters.size)}/${MDC(NUM_REQUEST_SYNC_TASK, numTasks)}.")
if (requesters.size == numTasks) {
requesters.foreach(_.reply(messages.clone()))
// Finished current barrier() call successfully, clean up ContextBarrierState and
// increase the barrier epoch.
- logInfo(s"Barrier sync epoch $barrierEpoch from $barrierId received all updates from " +
- s"tasks, finished successfully.")
+ logInfo(log"Barrier sync epoch ${MDC(BARRIER_EPOCH, barrierEpoch)}" +
+ log" from ${MDC(BARRIER_ID, barrierId)} received all updates from" +
+ log" tasks, finished successfully.")
barrierEpoch += 1
requesters.clear()
requestMethods.clear()
diff --git a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
index e083ece..c8d6000 100644
--- a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
@@ -26,7 +26,8 @@
import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC, MessageWithContext}
+import org.apache.spark.internal.LogKeys._
import org.apache.spark.memory.TaskMemoryManager
import org.apache.spark.metrics.source.Source
import org.apache.spark.resource.ResourceInformation
@@ -56,19 +57,27 @@
// with the driver side epoch.
private var barrierEpoch = 0
+ private def logProgressInfo(msg: MessageWithContext, startTime: Option[Long]): Unit = {
+ val waitMsg = startTime.fold(log",")(st => log", waited " +
+ log"for ${MDC(TOTAL_TIME, System.currentTimeMillis() - st)} ms,")
+ logInfo(log"Task ${MDC(TASK_ATTEMPT_ID, taskAttemptId())}" +
+ log" from Stage ${MDC(STAGE_ID, stageId())}" +
+ log"(Attempt ${MDC(STAGE_ATTEMPT, stageAttemptNumber())}) " +
+ msg + waitMsg +
+ log" current barrier epoch is ${MDC(BARRIER_EPOCH, barrierEpoch)}.")
+ }
+
private def runBarrier(message: String, requestMethod: RequestMethod.Value): Array[String] = {
- logInfo(s"Task ${taskAttemptId()} from Stage ${stageId()}(Attempt ${stageAttemptNumber()}) " +
- s"has entered the global sync, current barrier epoch is $barrierEpoch.")
+ logProgressInfo(log"has entered the global sync", None)
logTrace("Current callSite: " + Utils.getCallSite())
val startTime = System.currentTimeMillis()
val timerTask = new TimerTask {
override def run(): Unit = {
- logInfo(s"Task ${taskAttemptId()} from Stage ${stageId()}(Attempt " +
- s"${stageAttemptNumber()}) waiting " +
- s"under the global sync since $startTime, has been waiting for " +
- s"${MILLISECONDS.toSeconds(System.currentTimeMillis() - startTime)} seconds, " +
- s"current barrier epoch is $barrierEpoch.")
+ logProgressInfo(
+ log"waiting under the global sync since ${MDC(TIME, startTime)}",
+ Some(startTime)
+ )
}
}
// Log the update of global sync every 1 minute.
@@ -104,17 +113,11 @@
val messages = abortableRpcFuture.future.value.get.get
barrierEpoch += 1
- logInfo(s"Task ${taskAttemptId()} from Stage ${stageId()}(Attempt ${stageAttemptNumber()}) " +
- s"finished global sync successfully, waited for " +
- s"${MILLISECONDS.toSeconds(System.currentTimeMillis() - startTime)} seconds, " +
- s"current barrier epoch is $barrierEpoch.")
+ logProgressInfo(log"finished global sync successfully", Some(startTime))
messages
} catch {
case e: SparkException =>
- logInfo(s"Task ${taskAttemptId()} from Stage ${stageId()}(Attempt " +
- s"${stageAttemptNumber()}) failed to perform global sync, waited for " +
- s"${MILLISECONDS.toSeconds(System.currentTimeMillis() - startTime)} seconds, " +
- s"current barrier epoch is $barrierEpoch.")
+ logProgressInfo(log"failed to perform global sync", Some(startTime))
throw e
} finally {
timerTask.cancel()
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index d156d74..3bfa1ae 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -446,10 +446,12 @@
val delta = targetNum.delta
totalDelta += delta
if (delta > 0) {
- val executorsString = "executor" + { if (delta > 1) "s" else "" }
- logInfo(s"Requesting $delta new $executorsString because tasks are backlogged " +
- s"(new desired total will be ${numExecutorsTargetPerResourceProfileId(rpId)} " +
- s"for resource profile id: ${rpId})")
+ val executorsString = log" new executor" + { if (delta > 1) log"s" else log"" }
+ logInfo(log"Requesting ${MDC(TARGET_NUM_EXECUTOR_DELTA, delta)}" +
+ executorsString + log" because tasks are backlogged " +
+ log"(new desired total will be" +
+ log" ${MDC(TARGET_NUM_EXECUTOR, numExecutorsTargetPerResourceProfileId(rpId))} " +
+ log"for resource profile id: ${MDC(RESOURCE_PROFILE_ID, rpId)})")
numExecutorsToAddPerResourceProfileId(rpId) =
if (delta == numExecutorsToAddPerResourceProfileId(rpId)) {
numExecutorsToAddPerResourceProfileId(rpId) * 2
@@ -604,7 +606,8 @@
} else {
executorMonitor.executorsKilled(executorsRemoved.toSeq)
}
- logInfo(s"Executors ${executorsRemoved.mkString(",")} removed due to idle timeout.")
+ logInfo(log"Executors ${MDC(EXECUTOR_IDS, executorsRemoved.mkString(","))}" +
+ log" removed due to idle timeout.")
executorsRemoved.toSeq
} else {
logWarning(log"Unable to reach the cluster manager to kill executor/s " +
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 6f3a354..fdc2b0a 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -34,7 +34,7 @@
import org.roaringbitmap.RoaringBitmap
import org.apache.spark.broadcast.{Broadcast, BroadcastManager}
-import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.{Logging, MDC, MessageWithContext}
import org.apache.spark.internal.LogKeys._
import org.apache.spark.internal.config._
import org.apache.spark.io.CompressionCodec
@@ -188,7 +188,8 @@
val mapStatusOpt = mapIndex.map(mapStatuses(_)).flatMap(Option(_))
mapStatusOpt match {
case Some(mapStatus) =>
- logInfo(s"Updating map output for ${mapId} to ${bmAddress}")
+ logInfo(log"Updating map output for ${MDC(MAP_ID, mapId)}" +
+ log" to ${MDC(BLOCK_MANAGER_ID, bmAddress)}")
mapStatus.updateLocation(bmAddress)
invalidateSerializedMapOutputStatusCache()
case None =>
@@ -200,7 +201,8 @@
_numAvailableMapOutputs += 1
invalidateSerializedMapOutputStatusCache()
mapStatusesDeleted(index) = null
- logInfo(s"Recover ${mapStatus.mapId} ${mapStatus.location}")
+ logInfo(log"Recover ${MDC(MAP_ID, mapStatus.mapId)}" +
+ log" ${MDC(BLOCK_MANAGER_ID, mapStatus.location)}")
} else {
logWarning(log"Asked to update map output ${MDC(MAP_ID, mapId)} " +
log"for untracked map status.")
@@ -490,20 +492,24 @@
logDebug("init") // force eager creation of logger
+ private def logInfoMsg(msg: MessageWithContext, shuffleId: Int, context: RpcCallContext): Unit = {
+ val hostPort = context.senderAddress.hostPort
+ logInfo(log"Asked to send " +
+ msg +
+ log" locations for shuffle ${MDC(SHUFFLE_ID, shuffleId)} to ${MDC(HOST_PORT, hostPort)}")
+ }
+
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case GetMapOutputStatuses(shuffleId: Int) =>
- val hostPort = context.senderAddress.hostPort
- logInfo(s"Asked to send map output locations for shuffle $shuffleId to $hostPort")
+ logInfoMsg(log"map output", shuffleId, context)
tracker.post(GetMapOutputMessage(shuffleId, context))
case GetMapAndMergeResultStatuses(shuffleId: Int) =>
- val hostPort = context.senderAddress.hostPort
- logInfo(s"Asked to send map/merge result locations for shuffle $shuffleId to $hostPort")
+ logInfoMsg(log"map/merge result", shuffleId, context)
tracker.post(GetMapAndMergeOutputMessage(shuffleId, context))
case GetShufflePushMergerLocations(shuffleId: Int) =>
- logInfo(s"Asked to send shuffle push merger locations for shuffle" +
- s" $shuffleId to ${context.senderAddress.hostPort}")
+ logInfoMsg(log"shuffle push merger", shuffleId, context)
tracker.post(GetShufflePushMergersMessage(shuffleId, context))
case StopMapOutputTracker =>
@@ -1422,13 +1428,15 @@
val mergeOutputStatuses = mergeStatuses.get(shuffleId).orNull
if (mapOutputStatuses == null || mergeOutputStatuses == null) {
- logInfo("Don't have map/merge outputs for shuffle " + shuffleId + ", fetching them")
+ logInfo(log"Don't have map/merge outputs for" +
+ log" shuffle ${MDC(SHUFFLE_ID, shuffleId)}, fetching them")
val startTimeNs = System.nanoTime()
fetchingLock.withLock(shuffleId) {
var fetchedMapStatuses = mapStatuses.get(shuffleId).orNull
var fetchedMergeStatuses = mergeStatuses.get(shuffleId).orNull
if (fetchedMapStatuses == null || fetchedMergeStatuses == null) {
- logInfo("Doing the fetch; tracker endpoint = " + trackerEndpoint)
+ logInfo(log"Doing the fetch; tracker endpoint = " +
+ log"${MDC(RPC_ENDPOINT_REF, trackerEndpoint)}")
val fetchedBytes =
askTracker[(Array[Byte], Array[Byte])](GetMapAndMergeResultStatuses(shuffleId))
try {
@@ -1456,12 +1464,14 @@
} else {
val statuses = mapStatuses.get(shuffleId).orNull
if (statuses == null) {
- logInfo("Don't have map outputs for shuffle " + shuffleId + ", fetching them")
+ logInfo(log"Don't have map outputs for shuffle ${MDC(SHUFFLE_ID, shuffleId)}," +
+ log" fetching them")
val startTimeNs = System.nanoTime()
fetchingLock.withLock(shuffleId) {
var fetchedStatuses = mapStatuses.get(shuffleId).orNull
if (fetchedStatuses == null) {
- logInfo("Doing the fetch; tracker endpoint = " + trackerEndpoint)
+ logInfo(log"Doing the fetch; tracker endpoint =" +
+ log" ${MDC(RPC_ENDPOINT_REF, trackerEndpoint)}")
val fetchedBytes = askTracker[Array[Byte]](GetMapOutputStatuses(shuffleId))
try {
fetchedStatuses =
@@ -1500,7 +1510,7 @@
def updateEpoch(newEpoch: Long): Unit = {
epochLock.synchronized {
if (newEpoch > epoch) {
- logInfo("Updating epoch to " + newEpoch + " and clearing cache")
+ logInfo(log"Updating epoch to ${MDC(EPOCH, newEpoch)} and clearing cache")
epoch = newEpoch
mapStatuses.clear()
mergeStatuses.clear()
@@ -1561,7 +1571,9 @@
oos.close()
}
val outArr = out.toByteArray
- logInfo("Broadcast outputstatuses size = " + outArr.length + ", actual size = " + arrSize)
+ logInfo(log"Broadcast outputstatuses size = " +
+ log"${MDC(BROADCAST_OUTPUT_STATUS_SIZE, outArr.length)}," +
+ log" actual size = ${MDC(ACTUAL_BROADCAST_OUTPUT_STATUS_SIZE, arrSize)}")
(outArr, bcast)
} else {
(chunkedByteBuf.toArray, null)
@@ -1594,8 +1606,10 @@
try {
// deserialize the Broadcast, pull .value array out of it, and then deserialize that
val bcast = deserializeObject(in).asInstanceOf[Broadcast[Array[Array[Byte]]]]
- logInfo("Broadcast outputstatuses size = " + bytes.length +
- ", actual size = " + bcast.value.foldLeft(0L)(_ + _.length))
+ val actualSize = bcast.value.foldLeft(0L)(_ + _.length)
+ logInfo(log"Broadcast outputstatuses size =" +
+ log" ${MDC(BROADCAST_OUTPUT_STATUS_SIZE, bytes.length)}" +
+ log", actual size = ${MDC(ACTUAL_BROADCAST_OUTPUT_STATUS_SIZE, actualSize)}")
val bcastIn = new ChunkedByteBuffer(bcast.value.map(ByteBuffer.wrap)).toInputStream()
// Important - ignore the DIRECT tag ! Start from offset 1
bcastIn.skip(1)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 385f9cb..0dbac45 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -199,10 +199,11 @@
this(master, appName, sparkHome, jars, Map())
// log out Spark Version in Spark driver log
- logInfo(s"Running Spark version $SPARK_VERSION")
- logInfo(s"OS info ${System.getProperty("os.name")}, ${System.getProperty("os.version")}, " +
- s"${System.getProperty("os.arch")}")
- logInfo(s"Java version ${System.getProperty("java.version")}")
+ logInfo(log"Running Spark version ${MDC(LogKeys.SPARK_VERSION, SPARK_VERSION)}")
+ logInfo(log"OS info ${MDC(LogKeys.OS_NAME, System.getProperty("os.name"))}," +
+ log" ${MDC(LogKeys.OS_VERSION, System.getProperty("os.version"))}, " +
+ log"${MDC(LogKeys.OS_ARCH, System.getProperty("os.arch"))}")
+ logInfo(log"Java version ${MDC(LogKeys.JAVA_VERSION, System.getProperty("java.version"))}")
/* ------------------------------------------------------------------------------------- *
| Private variables. These variables keep the internal state of the context, and are |
@@ -439,7 +440,7 @@
logResourceInfo(SPARK_DRIVER_PREFIX, _resources)
// log out spark.app.name in the Spark driver logs
- logInfo(s"Submitted application: $appName")
+ logInfo(log"Submitted application: ${MDC(LogKeys.APP_NAME, appName)}")
// System property spark.yarn.app.id must be set if user code ran by AM on a YARN cluster
if (master == "yarn" && deployMode == "cluster" && !_conf.contains("spark.yarn.app.id")) {
@@ -448,7 +449,7 @@
}
if (_conf.getBoolean("spark.logConf", false)) {
- logInfo("Spark configuration:\n" + _conf.toDebugString)
+ logInfo(log"Spark configuration:\n${MDC(LogKeys.CONFIG, _conf.toDebugString)}")
}
// Set Spark driver host and port system properties. This explicitly sets the configuration
@@ -1704,7 +1705,8 @@
"Can not directly broadcast RDDs; instead, call collect() and broadcast the result.")
val bc = env.broadcastManager.newBroadcast[T](value, isLocal, serializedOnly)
val callSite = getCallSite()
- logInfo("Created broadcast " + bc.id + " from " + callSite.shortForm)
+ logInfo(log"Created broadcast ${MDC(LogKeys.BROADCAST_ID, bc.id)}" +
+ log" from ${MDC(LogKeys.CALL_SITE_SHORT_FORM, callSite.shortForm)}")
cleaner.foreach(_.registerBroadcastForCleanup(bc))
bc
}
@@ -1833,7 +1835,8 @@
addedFiles
.getOrElseUpdate(jobArtifactUUID, new ConcurrentHashMap[String, Long]().asScala)
.putIfAbsent(key, timestamp).isEmpty) {
- logInfo(s"Added file $path at $key with timestamp $timestamp")
+ logInfo(log"Added file ${MDC(LogKeys.PATH, path)} at ${MDC(LogKeys.KEY, key)} with" +
+ log" timestamp ${MDC(LogKeys.TIMESTAMP, timestamp)}")
// Fetch the file locally so that closures which are run on the driver can still use the
// SparkFiles API to access files.
Utils.fetchFile(uri.toString, root, conf, hadoopConfiguration, timestamp, useCache = false)
@@ -1845,7 +1848,8 @@
.putIfAbsent(
Utils.getUriBuilder(new URI(key)).fragment(uri.getFragment).build().toString,
timestamp).isEmpty) {
- logInfo(s"Added archive $path at $key with timestamp $timestamp")
+ logInfo(log"Added archive ${MDC(LogKeys.PATH, path)} at ${MDC(LogKeys.KEY, key)}" +
+ log" with timestamp ${MDC(LogKeys.TIMESTAMP, timestamp)}")
// If the scheme is file, use URI to simply copy instead of downloading.
val uriToUse = if (!isLocal && scheme == "file") uri else new URI(key)
val uriToDownload = Utils.getUriBuilder(uriToUse).fragment(null).build()
@@ -1855,7 +1859,9 @@
root,
if (uri.getFragment != null) uri.getFragment else source.getName)
logInfo(
- s"Unpacking an archive $path from ${source.getAbsolutePath} to ${dest.getAbsolutePath}")
+ log"Unpacking an archive ${MDC(LogKeys.PATH, path)}" +
+ log" from ${MDC(LogKeys.SOURCE_PATH, source.getAbsolutePath)}" +
+ log" to ${MDC(LogKeys.DESTINATION_PATH, dest.getAbsolutePath)}")
Utils.deleteRecursively(dest)
Utils.unpack(source, dest)
postEnvironmentUpdate()
@@ -2216,8 +2222,14 @@
.getOrElseUpdate(jobArtifactUUID, new ConcurrentHashMap[String, Long]().asScala)
.putIfAbsent(_, timestamp).isEmpty)
if (added.nonEmpty) {
- val jarMessage = if (scheme != "ivy") "JAR" else "dependency jars of Ivy URI"
- logInfo(s"Added $jarMessage $path at ${added.mkString(",")} with timestamp $timestamp")
+ val jarMessage = if (scheme != "ivy") {
+ log"Added JAR"
+ } else {
+ log"Added dependency jars of Ivy URI"
+ }
+ logInfo(jarMessage + log" ${MDC(LogKeys.PATH, path)}" +
+ log" at ${MDC(LogKeys.ADDED_JARS, added.mkString(","))}" +
+ log" with timestamp ${MDC(LogKeys.TIMESTAMP, timestamp)}")
postEnvironmentUpdate()
}
if (existed.nonEmpty) {
@@ -2274,7 +2286,8 @@
*/
def stop(exitCode: Int): Unit = {
stopSite = Some(getCallSite())
- logInfo(s"SparkContext is stopping with exitCode $exitCode from ${stopSite.get.shortForm}.")
+ logInfo(log"SparkContext is stopping with exitCode ${MDC(LogKeys.EXIT_CODE, exitCode)}" +
+ log" from ${MDC(LogKeys.STOP_SITE_SHORT_FORM, stopSite.get.shortForm)}.")
if (LiveListenerBus.withinListenerThread.value) {
throw new SparkException(s"Cannot stop SparkContext within listener bus thread.")
}
@@ -2438,9 +2451,10 @@
}
val callSite = getCallSite()
val cleanedFunc = clean(func)
- logInfo("Starting job: " + callSite.shortForm)
+ logInfo(log"Starting job: ${MDC(LogKeys.CALL_SITE_SHORT_FORM, callSite.shortForm)}")
if (conf.getBoolean("spark.logLineage", false)) {
- logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
+ logInfo(log"RDD's recursive dependencies:\n" +
+ log"${MDC(LogKeys.RDD_DEBUG_STRING, rdd.toDebugString)}")
}
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
progressBar.foreach(_.finishAll())
@@ -2559,13 +2573,14 @@
timeout: Long): PartialResult[R] = {
assertNotStopped()
val callSite = getCallSite()
- logInfo("Starting job: " + callSite.shortForm)
- val start = System.nanoTime
+ logInfo(log"Starting job: ${MDC(LogKeys.CALL_SITE_SHORT_FORM, callSite.shortForm)}")
+ val start = System.currentTimeMillis()
val cleanedFunc = clean(func)
val result = dagScheduler.runApproximateJob(rdd, cleanedFunc, evaluator, callSite, timeout,
localProperties.get)
logInfo(
- "Job finished: " + callSite.shortForm + ", took " + (System.nanoTime - start) / 1e9 + " s")
+ log"Job finished: ${MDC(LogKeys.CALL_SITE_SHORT_FORM, callSite.shortForm)}," +
+ log" took ${MDC(LogKeys.TOTAL_TIME, System.currentTimeMillis() - start)}ms")
result
}
@@ -2793,7 +2808,8 @@
val listeners = Utils.loadExtensions(classOf[SparkListenerInterface], classNames, conf)
listeners.foreach { listener =>
listenerBus.addToSharedQueue(listener)
- logInfo(s"Registered listener ${listener.getClass().getName()}")
+ logInfo(log"Registered listener " +
+ log"${MDC(LogKeys.CLASS_NAME, listener.getClass().getName())}")
}
}
} catch {
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
index 46dab16..5e2b555 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
@@ -45,7 +45,7 @@
converterClass.map { cc =>
Try {
val c = Utils.classForName[Converter[T, U]](cc).getConstructor().newInstance()
- logInfo(s"Loaded converter: $cc")
+ logInfo(log"Loaded converter: ${MDC(CLASS_NAME, cc)}")
c
} match {
case Success(c) => c
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 5aa080b..d643983 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -37,7 +37,8 @@
import org.apache.spark.api.python.PythonFunction.PythonAccumulator
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.input.PortableDataStream
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKeys.{HOST, PORT}
import org.apache.spark.internal.config.BUFFER_SIZE
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.rdd.RDD
@@ -733,7 +734,8 @@
private def openSocket(): Socket = synchronized {
if (socket == null || socket.isClosed) {
socket = new Socket(serverHost, serverPort)
- logInfo(s"Connected to AccumulatorServer at host: $serverHost port: $serverPort")
+ logInfo(log"Connected to AccumulatorServer at host: ${MDC(HOST, serverHost)}" +
+ log" port: ${MDC(PORT, serverPort)}")
// send the secret just for the initial authentication when opening a new connection
socket.getOutputStream.write(secretToken.getBytes(StandardCharsets.UTF_8))
}
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index 16f8714..b2571ff 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -32,7 +32,7 @@
import org.apache.spark._
import org.apache.spark.api.python.PythonFunction.PythonAccumulator
import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKeys._
+import org.apache.spark.internal.LogKeys.TASK_NAME
import org.apache.spark.internal.config.{BUFFER_SIZE, EXECUTOR_CORES, Python}
import org.apache.spark.internal.config.Python._
import org.apache.spark.rdd.InputFileBlockHolder
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
index 26c790a..fc6403f 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
@@ -28,7 +28,8 @@
import org.apache.spark.{SparkContext, SparkEnv}
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKeys.{PATH, PYTHON_VERSION}
import org.apache.spark.util.ArrayImplicits.SparkArrayOps
import org.apache.spark.util.Utils
@@ -122,11 +123,11 @@
PythonUtils.sparkPythonPath,
sys.env.getOrElse("PYTHONPATH", ""))
val environment = Map("PYTHONPATH" -> pythonPath)
- logInfo(s"Python path $pythonPath")
+ logInfo(log"Python path ${MDC(PATH, pythonPath)}")
val processPythonVer = Process(pythonVersionCMD, None, environment.toSeq: _*)
val output = runCommand(processPythonVer)
- logInfo(s"Python version: ${output.getOrElse("Unable to determine")}")
+ logInfo(log"Python version: ${MDC(PYTHON_VERSION, output.getOrElse("Unable to determine"))}")
val pythonCode =
"""
diff --git a/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala
index b238e2b..0ff2b79 100644
--- a/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala
@@ -22,7 +22,8 @@
import scala.jdk.CollectionConverters._
import org.apache.spark.{SparkEnv, SparkPythonException}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKeys.{PYTHON_EXEC, PYTHON_WORKER_MODULE, PYTHON_WORKER_RESPONSE, SESSION_ID}
import org.apache.spark.internal.config.BUFFER_SIZE
import org.apache.spark.internal.config.Python.PYTHON_AUTH_SOCKET_TIMEOUT
@@ -58,7 +59,8 @@
* to be used with the functions.
*/
def init(): (DataOutputStream, DataInputStream) = {
- logInfo(s"Initializing Python runner (session: $sessionId, pythonExec: $pythonExec)")
+ logInfo(log"Initializing Python runner (session: ${MDC(SESSION_ID, sessionId)}," +
+ log" pythonExec: ${MDC(PYTHON_EXEC, pythonExec)})")
val env = SparkEnv.get
val localdir = env.blockManager.diskBlockManager.localDirs.map(f => f.getPath()).mkString(",")
@@ -95,7 +97,8 @@
val errMessage = PythonWorkerUtils.readUTF(dataIn)
throw streamingPythonRunnerInitializationFailure(resFromPython, errMessage)
}
- logInfo(s"Runner initialization succeeded (returned $resFromPython).")
+ logInfo(log"Runner initialization succeeded (returned" +
+ log" ${MDC(PYTHON_WORKER_RESPONSE, resFromPython)}).")
(dataOut, dataIn)
}
@@ -116,7 +119,8 @@
* Stops the Python worker.
*/
def stop(): Unit = {
- logInfo(s"Stopping streaming runner for sessionId: $sessionId, module: $workerModule.")
+ logInfo(log"Stopping streaming runner for sessionId: ${MDC(SESSION_ID, sessionId)}," +
+ log" module: ${MDC(PYTHON_WORKER_MODULE, workerModule)}.")
try {
pythonWorkerFactory.foreach { factory =>
diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala
index 50ce6f8..226a6dc 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -164,11 +164,12 @@
// logs again when waitAppCompletion is set to true
if (!driverStatusReported) {
driverStatusReported = true
- logInfo(s"State of $submittedDriverID is ${state.get}")
+ logInfo(log"State of ${MDC(DRIVER_ID, submittedDriverID)}" +
+ log" is ${MDC(DRIVER_STATE, state.get)}")
// Worker node, if present
(workerId, workerHostPort, state) match {
case (Some(id), Some(hostPort), Some(DriverState.RUNNING)) =>
- logInfo(s"Driver running on $hostPort ($id)")
+ logInfo(log"Driver running on ${MDC(HOST, hostPort)} (${MDC(WORKER_ID, id)})")
case _ =>
}
}
@@ -181,17 +182,18 @@
state.get match {
case DriverState.FINISHED | DriverState.FAILED |
DriverState.ERROR | DriverState.KILLED =>
- logInfo(s"State of driver $submittedDriverID is ${state.get}, " +
- s"exiting spark-submit JVM.")
+ logInfo(log"State of driver ${MDC(DRIVER_ID, submittedDriverID)}" +
+ log" is ${MDC(DRIVER_STATE, state.get)}, exiting spark-submit JVM.")
System.exit(0)
case _ =>
if (!waitAppCompletion) {
- logInfo(s"spark-submit not configured to wait for completion, " +
- s"exiting spark-submit JVM.")
+ logInfo("spark-submit not configured to wait for completion, " +
+ "exiting spark-submit JVM.")
System.exit(0)
} else {
- logDebug(s"State of driver $submittedDriverID is ${state.get}, " +
- s"continue monitoring driver status.")
+ logDebug(log"State of driver ${MDC(DRIVER_ID, submittedDriverID)}" +
+ log" is ${MDC(DRIVER_STATE, state.get)}, " +
+ log"continue monitoring driver status.")
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
index a56fbd5..3ce5e2d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
@@ -23,7 +23,8 @@
import scala.jdk.CollectionConverters._
import org.apache.spark.{SecurityManager, SparkConf}
-import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.internal.{config, Logging, MDC}
+import org.apache.spark.internal.LogKeys.{AUTH_ENABLED, PORT, SHUFFLE_DB_BACKEND_KEY, SHUFFLE_DB_BACKEND_NAME}
import org.apache.spark.metrics.{MetricsSystem, MetricsSystemInstances}
import org.apache.spark.network.TransportContext
import org.apache.spark.network.crypto.AuthServerBootstrap
@@ -86,8 +87,8 @@
if (sparkConf.get(config.SHUFFLE_SERVICE_DB_ENABLED) && enabled) {
val shuffleDBName = sparkConf.get(config.SHUFFLE_SERVICE_DB_BACKEND)
val dbBackend = DBBackend.byName(shuffleDBName)
- logInfo(s"Use ${dbBackend.name()} as the implementation of " +
- s"${config.SHUFFLE_SERVICE_DB_BACKEND.key}")
+ logInfo(log"Use ${MDC(SHUFFLE_DB_BACKEND_NAME, dbBackend.name())} as the implementation of " +
+ log"${MDC(SHUFFLE_DB_BACKEND_KEY, config.SHUFFLE_SERVICE_DB_BACKEND.key)}")
new ExternalBlockHandler(conf,
findRegisteredExecutorsDBFile(dbBackend.fileName(registeredExecutorsDB)))
} else {
@@ -106,7 +107,8 @@
def start(): Unit = {
require(server == null, "Shuffle server already started")
val authEnabled = securityManager.isAuthenticationEnabled()
- logInfo(s"Starting shuffle service on port $port (auth enabled = $authEnabled)")
+ logInfo(log"Starting shuffle service on port ${MDC(PORT, port)}" +
+ log" (auth enabled = ${MDC(AUTH_ENABLED, authEnabled)})")
val bootstraps: Seq[TransportServerBootstrap] =
if (authEnabled) {
Seq(new AuthServerBootstrap(transportConf, securityManager))
diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
index 4bd2de6..cc77765 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
@@ -173,7 +173,7 @@
val checkpointDurationMs =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - checkpointStartTimeNs)
- logInfo(s"Checkpointing took $checkpointDurationMs ms.")
+ logInfo(log"Checkpointing took ${MDC(TOTAL_TIME, checkpointDurationMs)} ms.")
val newRDD = new ReliableCheckpointRDD[T](
sc, checkpointDirPath.toString, originalRDD.partitioner)
@@ -220,7 +220,7 @@
} (catchBlock = {
val deleted = fs.delete(tempOutputPath, false)
if (!deleted) {
- logInfo(s"Failed to delete tempOutputPath $tempOutputPath.")
+ logInfo(log"Failed to delete tempOutputPath ${MDC(TEMP_OUTPUT_PATH, tempOutputPath)}.")
}
}, finallyBlock = {
serializeStream.close()
@@ -228,12 +228,13 @@
if (!fs.rename(tempOutputPath, finalOutputPath)) {
if (!fs.exists(finalOutputPath)) {
- logInfo(s"Deleting tempOutputPath $tempOutputPath")
+ logInfo(log"Deleting tempOutputPath ${MDC(TEMP_OUTPUT_PATH, tempOutputPath)}")
fs.delete(tempOutputPath, false)
throw SparkCoreErrors.checkpointFailedToSaveError(ctx.attemptNumber(), finalOutputPath)
} else {
// Some other copy of this task must've finished before us and renamed it
- logInfo(s"Final output path $finalOutputPath already exists; not overwriting it")
+ logInfo(log"Final output path" +
+ log" ${MDC(FINAL_OUTPUT_PATH, finalOutputPath)} already exists; not overwriting it")
if (!fs.delete(tempOutputPath, false)) {
logWarning(log"Error deleting ${MDC(PATH, tempOutputPath)}")
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
index 0d1bc14..b468a38 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
@@ -23,7 +23,8 @@
import org.apache.spark._
import org.apache.spark.errors.SparkCoreErrors
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKeys.{NEW_RDD_ID, RDD_CHECKPOINT_DIR, RDD_ID}
import org.apache.spark.internal.config.CLEANER_REFERENCE_TRACKING_CLEAN_CHECKPOINTS
/**
@@ -66,7 +67,8 @@
}
}
- logInfo(s"Done checkpointing RDD ${rdd.id} to $cpDir, new parent is RDD ${newRDD.id}")
+ logInfo(log"Done checkpointing RDD ${MDC(RDD_ID, rdd.id)}" +
+ log" to ${MDC(RDD_CHECKPOINT_DIR, cpDir)}, new parent is RDD ${MDC(NEW_RDD_ID, newRDD.id)}")
newRDD
}
diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
index 3d0e477..f503be9 100644
--- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -250,7 +250,8 @@
poolSize: Int = 200): ServerInfo = {
val stopTimeout = conf.get(UI_JETTY_STOP_TIMEOUT)
- logInfo(s"Start Jetty $hostName:$port for $serverName")
+ logInfo(log"Start Jetty ${MDC(HOST, hostName)}:${MDC(PORT, port)}" +
+ log" for ${MDC(SERVER_NAME, serverName)}")
// Start the server first, with no connectors.
val pool = new QueuedThreadPool(poolSize)
if (serverName.nonEmpty) {
@@ -558,7 +559,9 @@
*/
private def addFilters(handler: ServletContextHandler, securityMgr: SecurityManager): Unit = {
conf.get(UI_FILTERS).foreach { filter =>
- logInfo(s"Adding filter to ${handler.getContextPath()}: $filter")
+ logInfo(log"Adding filter to" +
+ log" ${MDC(SERVLET_CONTEXT_HANDLER_PATH, handler.getContextPath())}:" +
+ log" ${MDC(UI_FILTER, filter)}")
val oldParams = conf.getOption(s"spark.$filter.params").toSeq
.flatMap(Utils.stringToSeq)
.flatMap { param =>
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index ac76f74..b8d422c 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -24,7 +24,7 @@
import org.apache.spark.{SecurityManager, SparkConf, SparkContext}
import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKeys.CLASS_NAME
+import org.apache.spark.internal.LogKeys.{CLASS_NAME, WEB_URL}
import org.apache.spark.internal.config.DRIVER_LOG_LOCAL_DIR
import org.apache.spark.internal.config.UI._
import org.apache.spark.scheduler._
@@ -164,7 +164,7 @@
/** Stop the server behind this web interface. Only valid after bind(). */
override def stop(): Unit = {
super.stop()
- logInfo(s"Stopped Spark web UI at $webUrl")
+ logInfo(log"Stopped Spark web UI at ${MDC(WEB_URL, webUrl)}")
}
override def withSparkUI[T](appId: String, attemptId: Option[String])(fn: SparkUI => T): T = {
diff --git a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
index 5149987..4f01cd6 100644
--- a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
@@ -27,7 +27,7 @@
import org.apache.spark.SparkEnv
import org.apache.spark.internal.{config, Logging, MDC}
-import org.apache.spark.internal.LogKeys.LISTENER
+import org.apache.spark.internal.LogKeys.{EVENT, LISTENER, TOTAL_TIME}
import org.apache.spark.scheduler.EventLoggingListener
import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate
@@ -132,8 +132,9 @@
if (maybeTimerContext != null) {
val elapsed = maybeTimerContext.stop()
if (logSlowEventEnabled && elapsed > logSlowEventThreshold) {
- logInfo(s"Process of event ${redactEvent(event)} by listener ${listenerName} took " +
- s"${elapsed / 1000000000d}s.")
+ logInfo(log"Process of event ${MDC(EVENT, redactEvent(event))} by" +
+ log" listener ${MDC(LISTENER, listenerName)} took " +
+ log"${MDC(TOTAL_TIME, elapsed / 1000000d)}ms.")
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/util/SignalUtils.scala b/core/src/main/scala/org/apache/spark/util/SignalUtils.scala
index 098f8b6..b41166a 100644
--- a/core/src/main/scala/org/apache/spark/util/SignalUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/SignalUtils.scala
@@ -80,7 +80,7 @@
action: => Boolean): Unit = synchronized {
try {
val handler = handlers.getOrElseUpdate(signal, {
- logInfo(s"Registering signal handler for $signal")
+ logInfo(log"Registering signal handler for ${MDC(SIGNAL, signal)}")
new ActionHandler(new Signal(signal))
})
handler.register(action)