[SPARK-53961][SQL][TESTS] Fix `FileStreamSinkSuite` flakiness by using `walkFileTree` instead of `walk`
### What changes were proposed in this pull request?
This PR aims to fix `FileStreamSinkSuite` flakiness by using `walkFileTree` instead of `walk`.
### Why are the changes needed?
`Files.walk` is flaky like the following when the directory has a race condition. `walkFileTree` has more robust error handling.
https://github.com/apache/spark/blob/2bb73fbdeb19f0a972786d3ea33d3263bf84ab66/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala#L543-L547
```
[info] - cleanup complete but invalid output for aborted job *** FAILED *** (438 milliseconds)
     [info]   java.io.UncheckedIOException: java.nio.file.NoSuchFileException: ***/spark-4c7ad10b-5848-45d7-ba43-dae4020ad011/output #output/part-00007-e582f3e3-87e3-40fa-8269-7fac9b545775-c000.snappy.parquet
     [info]   at java.base/java.nio.file.FileTreeIterator.fetchNextIfNeeded(FileTreeIterator.java:87)
     [info]   at java.base/java.nio.file.FileTreeIterator.hasNext(FileTreeIterator.java:103)
     [info]   at java.base/java.util.Spliterators$IteratorSpliterator.tryAdvance(Spliterators.java:1855)
     [info]   at java.base/java.util.stream.StreamSpliterators$WrappingSpliterator.lambda$initPartialTraversalState$0(StreamSpliterators.java:292)
     [info]   at java.base/java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.fillBuffer(StreamSpliterators.java:206)
     [info]   at java.base/java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.doAdvance(StreamSpliterators.java:169)
     [info]   at java.base/java.util.stream.StreamSpliterators$WrappingSpliterator.tryAdvance(StreamSpliterators.java:298)
     [info]   at java.base/java.util.Spliterators$1Adapter.hasNext(Spliterators.java:681)
     [info]   at scala.collection.convert.JavaCollectionWrappers$JIteratorWrapper.hasNext(JavaCollectionWrappers.scala:46)
     [info]   at scala.collection.Iterator$$anon$6.hasNext(Iterator.scala:480)
     [info]   at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
     [info]   at scala.collection.mutable.Growable.addAll(Growable.scala:61)
     [info]   at scala.collection.mutable.Growable.addAll$(Growable.scala:57)
     [info]   at scala.collection.immutable.SetBuilderImpl.addAll(Set.scala:405)
     [info]   at scala.collection.immutable.Set$.from(Set.scala:362)
     [info]   at scala.collection.IterableOnceOps.toSet(IterableOnce.scala:1469)
     [info]   at scala.collection.IterableOnceOps.toSet$(IterableOnce.scala:1469)
     [info]   at scala.collection.AbstractIterator.toSet(Iterator.scala:1306)
     [info]   at org.apache.spark.sql.streaming.FileStreamSinkSuite.$anonfun$new$52(FileStreamSinkSuite.scala:537)
```
### Does this PR introduce _any_ user-facing change?
No, this is a test case change.
### How was this patch tested?
Pass the CIs.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #52671 from dongjoon-hyun/SPARK-53961.
Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index b0aa71a..4c06a01 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -19,6 +19,7 @@
 
 import java.io.{File, IOException}
 import java.nio.file.{Files, Paths}
+import java.nio.file.attribute.BasicFileAttributes
 import java.util.Locale
 
 import scala.collection.mutable.ArrayBuffer
@@ -534,10 +535,25 @@
         }
 
         import PendingCommitFilesTrackingManifestFileCommitProtocol._
-        val outputFileNames = Files.walk(outputDir.toPath).iterator().asScala
-          .filter(_.toString.endsWith(".parquet"))
-          .map(_.getFileName.toString)
-          .toSet
+        import java.nio.file.{Path, _}
+        val outputFileNames = scala.collection.mutable.Set.empty[String]
+        Files.walkFileTree(
+          outputDir.toPath,
+          new SimpleFileVisitor[Path] {
+            override def visitFile(file: Path, attrs: BasicFileAttributes): FileVisitResult = {
+              val fileName = file.getFileName.toString
+              if (fileName.endsWith(".parquet")) outputFileNames += fileName
+              FileVisitResult.CONTINUE
+            }
+            override def visitFileFailed(file: Path, exc: IOException): FileVisitResult = {
+              exc match {
+                case _: NoSuchFileException =>
+                  FileVisitResult.CONTINUE
+                case _ =>
+                  FileVisitResult.TERMINATE
+              }
+            }
+          })
         val trackingFileNames = tracking.map(SparkPath.fromUrlString(_).toPath.getName).toSet
 
         // there would be possible to have race condition: