[SPARK-35689][SS][3.0] Add log warn when keyWithIndexToValue returns null value

### What changes were proposed in this pull request?

This patch adds log warn when `keyWithIndexToValue` returns null value in `SymmetricHashJoinStateManager`.

This is the backport of #32828 to branch-3.0.

### Why are the changes needed?

Once we get null from state store in SymmetricHashJoinStateManager, it is better to add meaningful logging for the case. It is better for debugging.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests.

Closes #32890 from viirya/fix-ss-joinstatemanager-followup-3.0.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
index 05b3e17..8c2c244 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
@@ -24,7 +24,7 @@
 import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, JoinedRow, Literal, SpecificInternalRow, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, JoinedRow, Literal, SafeProjection, SpecificInternalRow, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.execution.streaming.{StatefulOperatorStateInfo, StreamingSymmetricHashJoinExec}
 import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper._
 import org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager.KeyToValuePair
@@ -267,6 +267,10 @@
           if (valuePairAtMaxIndex != null) {
             keyWithIndexToValue.put(currentKey, index, valuePairAtMaxIndex.value,
               valuePairAtMaxIndex.matched)
+          } else {
+            val projectedKey = getInternalRowOfKeyWithIndex(currentKey)
+            logWarning(s"`keyWithIndexToValue` returns a null value for index ${numValues - 1} " +
+              s"at current key $projectedKey.")
           }
         }
         keyWithIndexToValue.remove(currentKey, numValues - 1)
@@ -280,6 +284,12 @@
     }
   }
 
+  // Unsafe row to internal row projection for key of `keyWithIndexToValue`.
+  lazy private val keyProjection = SafeProjection.create(keySchema)
+
+  /** Projects the key of unsafe row to internal row for printable log message. */
+  def getInternalRowOfKeyWithIndex(currentKey: UnsafeRow): InternalRow = keyProjection(currentKey)
+
   /** Commit all the changes to all the state stores */
   def commit(): Unit = {
     keyToNumValues.commit()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManagerSuite.scala
index ce1eabe..830abce 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManagerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManagerSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.streaming.state
 
+import java.sql.Timestamp
 import java.util.UUID
 
 import org.apache.hadoop.conf.Configuration
@@ -30,6 +31,7 @@
 import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper.LeftSide
 import org.apache.spark.sql.streaming.StreamTest
 import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
 
 class SymmetricHashJoinStateManagerSuite extends StreamTest with BeforeAndAfter {
 
@@ -44,6 +46,28 @@
     }
   }
 
+  SymmetricHashJoinStateManager.supportedVersions.foreach { version =>
+    test(s"SPARK-35689: StreamingJoinStateManager V${version} - " +
+      "printable key of keyWithIndexToValue") {
+
+      val keyExprs = Seq[Expression](
+        Literal(false),
+        Literal(10.0),
+        Literal("string"),
+        Literal(Timestamp.valueOf("2021-6-8 10:25:50")))
+      val keyGen = UnsafeProjection.create(keyExprs.map(_.dataType).toArray)
+
+      withJoinStateManager(inputValueAttribs, keyExprs, version) { manager =>
+        val currentKey = keyGen.apply(new GenericInternalRow(Array[Any](
+          false, 10.0, UTF8String.fromString("string"),
+          Timestamp.valueOf("2021-6-8 10:25:50").getTime)))
+
+        val projectedRow = manager.getInternalRowOfKeyWithIndex(currentKey)
+        assert(s"$projectedRow" == "[false,10.0,string,1623173150000]")
+      }
+    }
+  }
+
   private def testAllOperations(stateFormatVersion: Int): Unit = {
     withJoinStateManager(inputValueAttribs, joinKeyExprs, stateFormatVersion) { manager =>
       implicit val mgr = manager