[SPARK-48134][CORE] Spark core (java side): Migrate `error/warn/info` with variables to structured logging framework
### What changes were proposed in this pull request?
The pr aims to
1.migrate `error/warn/info` in module `core` with variables to `structured logging framework` for java side.
2.convert all dependencies on `org.slf4j.Logger & org.slf4j.LoggerFactory` to `org.apache.spark.internal.Logger & org.apache.spark.internal.LoggerFactory`, in order to completely `prohibit` importing `org.slf4j.Logger & org.slf4j.LoggerFactory` in java code later.
### Why are the changes needed?
To enhance Apache Spark's logging system by implementing structured logging.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
- Pass GA.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #46390 from panbingkun/core_java_sl.
Authored-by: panbingkun <panbingkun@baidu.com>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
diff --git a/common/utils/src/main/java/org/apache/spark/internal/Logger.java b/common/utils/src/main/java/org/apache/spark/internal/Logger.java
index 2b4dd3b..d8ab264 100644
--- a/common/utils/src/main/java/org/apache/spark/internal/Logger.java
+++ b/common/utils/src/main/java/org/apache/spark/internal/Logger.java
@@ -34,6 +34,10 @@
this.slf4jLogger = slf4jLogger;
}
+ public boolean isErrorEnabled() {
+ return slf4jLogger.isErrorEnabled();
+ }
+
public void error(String msg) {
slf4jLogger.error(msg);
}
@@ -58,6 +62,10 @@
}
}
+ public boolean isWarnEnabled() {
+ return slf4jLogger.isWarnEnabled();
+ }
+
public void warn(String msg) {
slf4jLogger.warn(msg);
}
@@ -82,6 +90,10 @@
}
}
+ public boolean isInfoEnabled() {
+ return slf4jLogger.isInfoEnabled();
+ }
+
public void info(String msg) {
slf4jLogger.info(msg);
}
@@ -106,6 +118,10 @@
}
}
+ public boolean isDebugEnabled() {
+ return slf4jLogger.isDebugEnabled();
+ }
+
public void debug(String msg) {
slf4jLogger.debug(msg);
}
@@ -126,6 +142,10 @@
slf4jLogger.debug(msg, throwable);
}
+ public boolean isTraceEnabled() {
+ return slf4jLogger.isTraceEnabled();
+ }
+
public void trace(String msg) {
slf4jLogger.trace(msg);
}
@@ -146,7 +166,6 @@
slf4jLogger.trace(msg, throwable);
}
-
private void withLogContext(
String pattern,
MDC[] mdcs,
diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
index d4e1d9f..c127f9c 100644
--- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
+++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
@@ -168,6 +168,7 @@
case object EXCEPTION extends LogKey
case object EXECUTE_INFO extends LogKey
case object EXECUTE_KEY extends LogKey
+ case object EXECUTION_MEMORY_SIZE extends LogKey
case object EXECUTION_PLAN_LEAVES extends LogKey
case object EXECUTOR_BACKEND extends LogKey
case object EXECUTOR_DESIRED_COUNT extends LogKey
@@ -302,6 +303,7 @@
case object MAX_SLOTS extends LogKey
case object MAX_SPLIT_BYTES extends LogKey
case object MAX_TABLE_PARTITION_METADATA_SIZE extends LogKey
+ case object MEMORY_CONSUMER extends LogKey
case object MEMORY_POOL_NAME extends LogKey
case object MEMORY_SIZE extends LogKey
case object MERGE_DIR_NAME extends LogKey
@@ -342,6 +344,7 @@
case object NUM_CONCURRENT_WRITER extends LogKey
case object NUM_CORES extends LogKey
case object NUM_DROPPED_PARTITIONS extends LogKey
+ case object NUM_ELEMENTS_SPILL_THRESHOLD extends LogKey
case object NUM_EVENTS extends LogKey
case object NUM_EXAMPLES extends LogKey
case object NUM_EXECUTOR_CORES extends LogKey
@@ -375,6 +378,8 @@
case object NUM_RIGHT_PARTITION_VALUES extends LogKey
case object NUM_SEQUENCES extends LogKey
case object NUM_SLOTS extends LogKey
+ case object NUM_SPILL_INFOS extends LogKey
+ case object NUM_SPILL_WRITERS extends LogKey
case object NUM_TASKS extends LogKey
case object NUM_TASK_CPUS extends LogKey
case object NUM_VERSIONS_RETAIN extends LogKey
@@ -394,6 +399,7 @@
case object OP_TYPE extends LogKey
case object OUTPUT extends LogKey
case object OVERHEAD_MEMORY_SIZE extends LogKey
+ case object PAGE_SIZE extends LogKey
case object PARSE_MODE extends LogKey
case object PARTITIONED_FILE_READER extends LogKey
case object PARTITIONER extends LogKey
@@ -502,6 +508,7 @@
case object SOCKET_ADDRESS extends LogKey
case object SPARK_DATA_STREAM extends LogKey
case object SPARK_PLAN_ID extends LogKey
+ case object SPILL_TIMES extends LogKey
case object SQL_TEXT extends LogKey
case object SRC_PATH extends LogKey
case object STAGE_ATTEMPT extends LogKey
@@ -516,6 +523,7 @@
case object STORAGE_LEVEL extends LogKey
case object STORAGE_LEVEL_DESERIALIZED extends LogKey
case object STORAGE_LEVEL_REPLICATION extends LogKey
+ case object STORAGE_MEMORY_SIZE extends LogKey
case object STORE_ID extends LogKey
case object STREAMING_DATA_SOURCE_DESCRIPTION extends LogKey
case object STREAMING_DATA_SOURCE_NAME extends LogKey
@@ -543,6 +551,7 @@
case object TEMP_PATH extends LogKey
case object TEST_SIZE extends LogKey
case object THREAD extends LogKey
+ case object THREAD_ID extends LogKey
case object THREAD_NAME extends LogKey
case object TID extends LogKey
case object TIME extends LogKey
diff --git a/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java b/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java
index 33dfa44..6d0bbc8 100644
--- a/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java
+++ b/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java
@@ -13,13 +13,6 @@
*/
package org.apache.spark.io;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import org.apache.spark.util.ThreadUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.concurrent.GuardedBy;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
@@ -30,6 +23,16 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
+import javax.annotation.concurrent.GuardedBy;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+
+import org.apache.spark.internal.Logger;
+import org.apache.spark.internal.LoggerFactory;
+import org.apache.spark.internal.LogKeys;
+import org.apache.spark.internal.MDC;
+import org.apache.spark.util.ThreadUtils;
/**
* {@link InputStream} implementation which asynchronously reads ahead from the underlying input
@@ -205,7 +208,7 @@
try {
underlyingInputStream.close();
} catch (IOException e) {
- logger.warn(e.getMessage(), e);
+ logger.warn("{}", e, MDC.of(LogKeys.ERROR$.MODULE$, e.getMessage()));
}
}
}
diff --git a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
index 8335261..aeabd35 100644
--- a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
+++ b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
@@ -29,9 +29,11 @@
import java.util.TreeMap;
import com.google.common.annotations.VisibleForTesting;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.spark.internal.Logger;
+import org.apache.spark.internal.LoggerFactory;
+import org.apache.spark.internal.LogKeys;
+import org.apache.spark.internal.MDC;
import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.util.Utils;
@@ -244,10 +246,12 @@
}
} catch (ClosedByInterruptException e) {
// This called by user to kill a task (e.g: speculative task).
- logger.error("error while calling spill() on " + consumerToSpill, e);
+ logger.error("error while calling spill() on {}", e,
+ MDC.of(LogKeys.MEMORY_CONSUMER$.MODULE$, consumerToSpill));
throw new RuntimeException(e.getMessage());
} catch (IOException e) {
- logger.error("error while calling spill() on " + consumerToSpill, e);
+ logger.error("error while calling spill() on {}", e,
+ MDC.of(LogKeys.MEMORY_CONSUMER$.MODULE$, consumerToSpill));
// checkstyle.off: RegexpSinglelineJava
throw new SparkOutOfMemoryError("error while calling spill() on " + consumerToSpill + " : "
+ e.getMessage());
@@ -270,24 +274,29 @@
* Dump the memory usage of all consumers.
*/
public void showMemoryUsage() {
- logger.info("Memory used in task " + taskAttemptId);
+ logger.info("Memory used in task {}",
+ MDC.of(LogKeys.TASK_ATTEMPT_ID$.MODULE$, taskAttemptId));
synchronized (this) {
long memoryAccountedForByConsumers = 0;
for (MemoryConsumer c: consumers) {
long totalMemUsage = c.getUsed();
memoryAccountedForByConsumers += totalMemUsage;
if (totalMemUsage > 0) {
- logger.info("Acquired by " + c + ": " + Utils.bytesToString(totalMemUsage));
+ logger.info("Acquired by {}: {}",
+ MDC.of(LogKeys.MEMORY_CONSUMER$.MODULE$, c),
+ MDC.of(LogKeys.MEMORY_SIZE$.MODULE$, Utils.bytesToString(totalMemUsage)));
}
}
long memoryNotAccountedFor =
memoryManager.getExecutionMemoryUsageForTask(taskAttemptId) - memoryAccountedForByConsumers;
logger.info(
"{} bytes of memory were used by task {} but are not associated with specific consumers",
- memoryNotAccountedFor, taskAttemptId);
+ MDC.of(LogKeys.MEMORY_SIZE$.MODULE$, memoryNotAccountedFor),
+ MDC.of(LogKeys.TASK_ATTEMPT_ID$.MODULE$, taskAttemptId));
logger.info(
"{} bytes of memory are used for execution and {} bytes of memory are used for storage",
- memoryManager.executionMemoryUsed(), memoryManager.storageMemoryUsed());
+ MDC.of(LogKeys.EXECUTION_MEMORY_SIZE$.MODULE$, memoryManager.executionMemoryUsed()),
+ MDC.of(LogKeys.STORAGE_MEMORY_SIZE$.MODULE$, memoryManager.storageMemoryUsed()));
}
}
@@ -333,7 +342,8 @@
try {
page = memoryManager.tungstenMemoryAllocator().allocate(acquired);
} catch (OutOfMemoryError e) {
- logger.warn("Failed to allocate a page ({} bytes), try again.", acquired);
+ logger.warn("Failed to allocate a page ({} bytes), try again.",
+ MDC.of(LogKeys.PAGE_SIZE$.MODULE$, acquired));
// there is no enough memory actually, it means the actual free memory is smaller than
// MemoryManager thought, we should keep the acquired memory.
synchronized (this) {
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
index d067c87..284d1dd 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
@@ -33,9 +33,11 @@
import scala.collection.Iterator;
import com.google.common.io.Closeables;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.spark.internal.Logger;
+import org.apache.spark.internal.LoggerFactory;
+import org.apache.spark.internal.LogKeys;
+import org.apache.spark.internal.MDC;
import org.apache.spark.Partitioner;
import org.apache.spark.ShuffleDependency;
import org.apache.spark.SparkConf;
@@ -223,7 +225,8 @@
writePartitionedDataWithStream(file, writer);
}
if (!file.delete()) {
- logger.error("Unable to delete file for partition {}", i);
+ logger.error("Unable to delete file for partition {}",
+ MDC.of(LogKeys.PARTITION_ID$.MODULE$, i));
}
}
}
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
index b097089..8fe432c 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
@@ -23,17 +23,19 @@
import java.util.LinkedList;
import java.util.zip.Checksum;
-import org.apache.spark.SparkException;
import scala.Tuple2;
import com.google.common.annotations.VisibleForTesting;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.spark.SparkConf;
+import org.apache.spark.SparkException;
import org.apache.spark.TaskContext;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.internal.config.package$;
+import org.apache.spark.internal.Logger;
+import org.apache.spark.internal.LoggerFactory;
+import org.apache.spark.internal.LogKeys;
+import org.apache.spark.internal.MDC;
import org.apache.spark.memory.MemoryConsumer;
import org.apache.spark.memory.SparkOutOfMemoryError;
import org.apache.spark.memory.TaskMemoryManager;
@@ -159,11 +161,11 @@
if (!isFinalFile) {
logger.info(
"Task {} on Thread {} spilling sort data of {} to disk ({} {} so far)",
- taskContext.taskAttemptId(),
- Thread.currentThread().getId(),
- Utils.bytesToString(getMemoryUsage()),
- spills.size(),
- spills.size() != 1 ? " times" : " time");
+ MDC.of(LogKeys.TASK_ATTEMPT_ID$.MODULE$, taskContext.taskAttemptId()),
+ MDC.of(LogKeys.THREAD_ID$.MODULE$, Thread.currentThread().getId()),
+ MDC.of(LogKeys.MEMORY_SIZE$.MODULE$, Utils.bytesToString(getMemoryUsage())),
+ MDC.of(LogKeys.NUM_SPILL_INFOS$.MODULE$, spills.size()),
+ MDC.of(LogKeys.SPILL_TIMES$.MODULE$, spills.size() != 1 ? "times" : "time"));
}
// This call performs the actual sort.
@@ -349,7 +351,8 @@
}
for (SpillInfo spill : spills) {
if (spill.file.exists() && !spill.file.delete()) {
- logger.error("Unable to delete spill file {}", spill.file.getPath());
+ logger.error("Unable to delete spill file {}",
+ MDC.of(LogKeys.PATH$.MODULE$, spill.file.getPath()));
}
}
}
@@ -416,8 +419,8 @@
// for tests
assert(inMemSorter != null);
if (inMemSorter.numRecords() >= numElementsForSpillThreshold) {
- logger.info("Spilling data because number of spilledRecords crossed the threshold " +
- numElementsForSpillThreshold);
+ logger.info("Spilling data because number of spilledRecords crossed the threshold {}" +
+ MDC.of(LogKeys.NUM_ELEMENTS_SPILL_THRESHOLD$.MODULE$, numElementsForSpillThreshold));
spill();
}
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
index f5949d6..6da9d3d 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
@@ -35,12 +35,14 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.io.ByteStreams;
import com.google.common.io.Closeables;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.spark.*;
import org.apache.spark.annotation.Private;
import org.apache.spark.internal.config.package$;
+import org.apache.spark.internal.Logger;
+import org.apache.spark.internal.LoggerFactory;
+import org.apache.spark.internal.LogKeys;
+import org.apache.spark.internal.MDC;
import org.apache.spark.io.CompressionCodec;
import org.apache.spark.io.CompressionCodec$;
import org.apache.spark.io.NioBufferedFileInputStream;
@@ -226,7 +228,8 @@
sorter = null;
for (SpillInfo spill : spills) {
if (spill.file.exists() && !spill.file.delete()) {
- logger.error("Error while deleting spill file {}", spill.file.getPath());
+ logger.error("Error while deleting spill file {}",
+ MDC.of(LogKeys.PATH$.MODULE$, spill.file.getPath()));
}
}
}
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java
index efe508d..fbf4abc 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java
@@ -26,10 +26,11 @@
import java.nio.channels.WritableByteChannel;
import java.util.Optional;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import org.apache.spark.SparkConf;
+import org.apache.spark.internal.Logger;
+import org.apache.spark.internal.LoggerFactory;
+import org.apache.spark.internal.LogKeys;
+import org.apache.spark.internal.MDC;
import org.apache.spark.shuffle.api.ShuffleMapOutputWriter;
import org.apache.spark.shuffle.api.ShufflePartitionWriter;
import org.apache.spark.shuffle.api.WritableByteChannelWrapper;
@@ -123,7 +124,8 @@
public void abort(Throwable error) throws IOException {
cleanUp();
if (outputTempFile != null && outputTempFile.exists() && !outputTempFile.delete()) {
- log.warn("Failed to delete temporary shuffle file at {}", outputTempFile.getAbsolutePath());
+ log.warn("Failed to delete temporary shuffle file at {}",
+ MDC.of(LogKeys.PATH$.MODULE$, outputTempFile.getAbsolutePath()));
}
}
diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 35c5efc..3506e2a 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -25,11 +25,13 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.io.Closeables;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.spark.SparkEnv;
import org.apache.spark.executor.ShuffleWriteMetrics;
+import org.apache.spark.internal.LogKeys;
+import org.apache.spark.internal.Logger;
+import org.apache.spark.internal.LoggerFactory;
+import org.apache.spark.internal.MDC;
import org.apache.spark.memory.MemoryConsumer;
import org.apache.spark.memory.SparkOutOfMemoryError;
import org.apache.spark.memory.TaskMemoryManager;
@@ -392,7 +394,8 @@
// remove the spill file from disk
File file = spillWriters.removeFirst().getFile();
if (file != null && file.exists() && !file.delete()) {
- logger.error("Was unable to delete spill file {}", file.getAbsolutePath());
+ logger.error("Was unable to delete spill file {}",
+ MDC.of(LogKeys.PATH$.MODULE$, file.getAbsolutePath()));
}
}
}
@@ -893,7 +896,8 @@
File file = spillWriters.removeFirst().getFile();
if (file != null && file.exists()) {
if (!file.delete()) {
- logger.error("Was unable to delete spill file {}", file.getAbsolutePath());
+ logger.error("Was unable to delete spill file {}",
+ MDC.of(LogKeys.PATH$.MODULE$, file.getAbsolutePath()));
}
}
}
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index 2f9e1a9..0be312d 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -28,11 +28,13 @@
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.io.IOUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.spark.TaskContext;
import org.apache.spark.executor.ShuffleWriteMetrics;
+import org.apache.spark.internal.LogKeys;
+import org.apache.spark.internal.Logger;
+import org.apache.spark.internal.LoggerFactory;
+import org.apache.spark.internal.MDC;
import org.apache.spark.memory.MemoryConsumer;
import org.apache.spark.memory.SparkOutOfMemoryError;
import org.apache.spark.memory.TaskMemoryManager;
@@ -217,10 +219,10 @@
}
logger.info("Thread {} spilling sort data of {} to disk ({} {} so far)",
- Thread.currentThread().getId(),
- Utils.bytesToString(getMemoryUsage()),
- spillWriters.size(),
- spillWriters.size() > 1 ? " times" : " time");
+ MDC.of(LogKeys.THREAD_ID$.MODULE$, Thread.currentThread().getId()),
+ MDC.of(LogKeys.MEMORY_SIZE$.MODULE$, Utils.bytesToString(getMemoryUsage())),
+ MDC.of(LogKeys.NUM_SPILL_WRITERS$.MODULE$, spillWriters.size()),
+ MDC.of(LogKeys.SPILL_TIMES$.MODULE$, spillWriters.size() > 1 ? "times" : "time"));
ShuffleWriteMetrics writeMetrics = new ShuffleWriteMetrics();
@@ -335,7 +337,8 @@
File file = spill.getFile();
if (file != null && file.exists()) {
if (!file.delete()) {
- logger.error("Was unable to delete spill file {}", file.getAbsolutePath());
+ logger.error("Was unable to delete spill file {}",
+ MDC.of(LogKeys.PATH$.MODULE$, file.getAbsolutePath()));
}
}
}
@@ -476,8 +479,8 @@
assert(inMemSorter != null);
if (inMemSorter.numRecords() >= numElementsForSpillThreshold) {
- logger.info("Spilling data because number of spilledRecords crossed the threshold " +
- numElementsForSpillThreshold);
+ logger.info("Spilling data because number of spilledRecords crossed the threshold {}",
+ MDC.of(LogKeys.NUM_ELEMENTS_SPILL_THRESHOLD$.MODULE$, numElementsForSpillThreshold));
spill();
}
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
index cf29835..4eff6a7 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
@@ -23,13 +23,13 @@
import org.apache.spark.TaskContext;
import org.apache.spark.internal.config.package$;
import org.apache.spark.internal.config.ConfigEntry;
+import org.apache.spark.internal.Logger;
+import org.apache.spark.internal.LoggerFactory;
import org.apache.spark.io.NioBufferedFileInputStream;
import org.apache.spark.io.ReadAheadInputStream;
import org.apache.spark.serializer.SerializerManager;
import org.apache.spark.storage.BlockId;
import org.apache.spark.unsafe.Platform;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.*;