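Fix broadcast bug in ArrowBroadcastExchangeExec: always generate a fresh runId and wait on nativeRelationFuture instead of relationFuture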
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/plan/ArrowBroadcastExchangeExec.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/plan/ArrowBroadcastExchangeExec.scala index 7b8f36a..388acb6 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/plan/ArrowBroadcastExchangeExec.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/plan/ArrowBroadcastExchangeExec.scala
@@ -73,15 +73,7 @@ getTagValue(ArrowBroadcastExchangeExec.nativeExecutionTag).getOrElse(false) } - private lazy val nonNativeBroadcastExec: BroadcastExchangeExec = { - BroadcastExchangeExec(mode, ConvertToUnsafeRowExec(child)) - } - - override lazy val runId: UUID = if (isNative) { - UUID.randomUUID() - } else { - nonNativeBroadcastExec.runId - } + override lazy val runId: UUID = UUID.randomUUID() override lazy val metrics: Map[String, SQLMetric] = NativeSupports.getDefaultNativeMetrics(sparkContext) ++ Map( @@ -131,13 +123,13 @@ val conf = SparkSession.getActiveSession.map(_.sqlContext.conf).orNull val timeout: Long = conf.broadcastTimeout try { - relationFuture.get(timeout, TimeUnit.SECONDS).asInstanceOf[broadcast.Broadcast[T]] + nativeRelationFuture.get(timeout, TimeUnit.SECONDS).asInstanceOf[broadcast.Broadcast[T]] } catch { case ex: TimeoutException => logError(s"Could not execute broadcast in $timeout secs.", ex) - if (!relationFuture.isDone) { + if (!nativeRelationFuture.isDone) { sparkContext.cancelJobGroup(runId.toString) - relationFuture.cancel(true) + nativeRelationFuture.cancel(true) } throw new SparkException("Native broadcast exchange timed out.", ex) } @@ -238,7 +230,12 @@ } @transient - lazy val nativeRelationFuture: Future[Broadcast[Array[InternalRow]]] = { + private lazy val nonNativeBroadcastExec: BroadcastExchangeExec = { + BroadcastExchangeExec(mode, ConvertToUnsafeRowExec(child)) + } + + @transient + private lazy val nativeRelationFuture: Future[Broadcast[Array[InternalRow]]] = { SQLExecution.withThreadLocalCaptured[Broadcast[Array[InternalRow]]]( sqlContext.sparkSession, BroadcastExchangeExec.executionContext) {