[SPARK-35689][SS][3.0] Add log warn when keyWithIndexToValue returns null value
### What changes were proposed in this pull request?
This patch logs a warning when `keyWithIndexToValue` returns a null value in `SymmetricHashJoinStateManager`.
This is the backport of #32828 to branch-3.0.
### Why are the changes needed?
When we get a null value from the state store in `SymmetricHashJoinStateManager`, we should log a meaningful message for that case, which makes debugging easier.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
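Existing tests, plus a new unit test in `SymmetricHashJoinStateManagerSuite` that checks the printable form of the `keyWithIndexToValue` key.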
Closes #32890 from viirya/fix-ss-joinstatemanager-followup-3.0.
Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
index 05b3e17..8c2c244 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
@@ -24,7 +24,7 @@
import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, JoinedRow, Literal, SpecificInternalRow, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, JoinedRow, Literal, SafeProjection, SpecificInternalRow, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.execution.streaming.{StatefulOperatorStateInfo, StreamingSymmetricHashJoinExec}
import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper._
import org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager.KeyToValuePair
@@ -267,6 +267,10 @@
if (valuePairAtMaxIndex != null) {
keyWithIndexToValue.put(currentKey, index, valuePairAtMaxIndex.value,
valuePairAtMaxIndex.matched)
+ } else {
+ val projectedKey = getInternalRowOfKeyWithIndex(currentKey)
+ logWarning(s"`keyWithIndexToValue` returns a null value for index ${numValues - 1} " +
+ s"at current key $projectedKey.")
}
}
keyWithIndexToValue.remove(currentKey, numValues - 1)
@@ -280,6 +284,12 @@
}
}
+ // Projection from unsafe row to internal row for the key of `keyWithIndexToValue`.
+ lazy private val keyProjection = SafeProjection.create(keySchema)
+
+ /** Projects the unsafe-row key to an internal row so it can be printed in log messages. */
+ def getInternalRowOfKeyWithIndex(currentKey: UnsafeRow): InternalRow = keyProjection(currentKey)
+
/** Commit all the changes to all the state stores */
def commit(): Unit = {
keyToNumValues.commit()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManagerSuite.scala
index ce1eabe..830abce 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManagerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManagerSuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.execution.streaming.state
+import java.sql.Timestamp
import java.util.UUID
import org.apache.hadoop.conf.Configuration
@@ -30,6 +31,7 @@
import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper.LeftSide
import org.apache.spark.sql.streaming.StreamTest
import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
class SymmetricHashJoinStateManagerSuite extends StreamTest with BeforeAndAfter {
@@ -44,6 +46,28 @@
}
}
+ SymmetricHashJoinStateManager.supportedVersions.foreach { version =>
+ test(s"SPARK-35689: StreamingJoinStateManager V${version} - " +
+ "printable key of keyWithIndexToValue") {
+
+ val keyExprs = Seq[Expression](
+ Literal(false),
+ Literal(10.0),
+ Literal("string"),
+ Literal(Timestamp.valueOf("2021-6-8 10:25:50")))
+ val keyGen = UnsafeProjection.create(keyExprs.map(_.dataType).toArray)
+
+ withJoinStateManager(inputValueAttribs, keyExprs, version) { manager =>
+ val currentKey = keyGen.apply(new GenericInternalRow(Array[Any](
+ false, 10.0, UTF8String.fromString("string"),
+ Timestamp.valueOf("2021-6-8 10:25:50").getTime)))
+
+ val projectedRow = manager.getInternalRowOfKeyWithIndex(currentKey)
+ assert(s"$projectedRow" == "[false,10.0,string,1623173150000]")
+ }
+ }
+ }
+
private def testAllOperations(stateFormatVersion: Int): Unit = {
withJoinStateManager(inputValueAttribs, joinKeyExprs, stateFormatVersion) { manager =>
implicit val mgr = manager