| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.table.codegen.agg |
| |
| import java.lang.{Long => JLong} |
| import org.apache.calcite.tools.RelBuilder |
| import org.apache.flink.table.api.scala._ |
| import org.apache.flink.table.api.TableException |
| import org.apache.flink.table.api.dataview.MapView |
| import org.apache.flink.table.api.types.{DataType, InternalType, RowType} |
| import org.apache.flink.table.codegen.CodeGenUtils._ |
| import org.apache.flink.table.codegen.agg.AggsHandlerCodeGenerator._ |
| import org.apache.flink.table.codegen.CodeGenUtils.newName |
| import org.apache.flink.table.codegen.{CodeGeneratorContext, ExprCodeGenerator, GeneratedExpression} |
| import org.apache.flink.table.codegen.GeneratedExpression._ |
| import org.apache.flink.table.dataformat.GenericRow |
| import org.apache.flink.table.expressions.Expression |
| import org.apache.flink.table.plan.util.DistinctInfo |
| import org.apache.flink.util.Preconditions |
| import org.apache.flink.util.Preconditions.checkArgument |
| |
| /** |
| * It is for code generate distinct aggregate. The distinct aggregate buffer is a MapView which |
| * is used to store the unique keys and the frequency of appearance. When a key is been seen the |
| * first time, we will trigger the inner aggregate function's accumulate() function. |
| * |
| * @param ctx the code gen context |
| * @param distinctInfo the distinct information |
| * @param distinctIndex the index of this distinct in all distincts |
| * @param innerAggCodeGens the code generator of inner aggregate |
| * @param filterExpressions filter argument access expression, none if no filter |
| * @param mergedAccOffset the mergedAcc may come from local aggregate, |
| * this is the first buffer offset in the row |
| * @param aggBufferOffset the offset in the buffers of this aggregate |
| * @param aggBufferSize the total size of aggregate buffers |
| * @param hasNamespace whether the accumulators state has namespace |
| * @param mergedAccOnHeap whether the merged accumulator is on heap, otherwise is on state |
| * @param consumeRetraction whether the distinct consumes retraction |
| * @param inputFieldCopy copy input field element if true (only mutable type will be copied) |
| * @param relBuilder the rel builder to translate expressions to calcite rex nodes |
| */ |
| class DistinctAggCodeGen( |
| ctx: CodeGeneratorContext, |
| distinctInfo: DistinctInfo, |
| distinctIndex: Int, |
| innerAggCodeGens: Array[AggCodeGen], |
| filterExpressions: Array[Option[Expression]], |
| mergedAccOffset: Int, |
| aggBufferOffset: Int, |
| aggBufferSize: Int, |
| hasNamespace: Boolean, |
| needMerge: Boolean, |
| mergedAccOnHeap: Boolean, |
| consumeRetraction: Boolean, |
| inputFieldCopy: Boolean, |
| relBuilder: RelBuilder) extends AggCodeGen { |
| |
| val MAP_VIEW: String = className[MapView[_, _]] |
| val MAP_ENTRY: String = className[java.util.Map.Entry[_, _]] |
| val ITERABLE: String = className[java.lang.Iterable[_]] |
| |
| val aggCount: Int = innerAggCodeGens.length |
| val externalAccType: DataType = distinctInfo.accType |
| val keyType: DataType = distinctInfo.keyType |
| val keyTypeTerm: String = keyType.getTypeClass.getCanonicalName |
| val distinctAccTerm: String = s"distinct_view_$distinctIndex" |
| val distinctBackupAccTerm: String = s"distinct_backup_view_$distinctIndex" |
| |
| val isValueChangedTerm: String = s"is_distinct_value_changed_$distinctIndex" |
| val isValueEmptyTerm: String = s"is_distinct_value_empty_$distinctIndex" |
| val valueGenerator: DistinctValueGenerator = createDistinctValueGenerator() |
| |
| addReusableDistinctAccumulator() |
| |
| /** |
| * Add the distinct accumulator to the member variable and open close methods. |
| */ |
| private def addReusableDistinctAccumulator(): Unit = { |
| // sanity check |
| if (distinctInfo.excludeAcc) { |
| // it only works in incremental mode when the distinct acc is excluded |
| // the distinct mapview must works on state mode when incremental mode |
| Preconditions.checkState(distinctInfo.dataViewSpec.nonEmpty) |
| } |
| |
| val enableBackupDataView = needMerge && !mergedAccOnHeap |
| |
| // add state mapview to member field |
| addReusableStateDataViews( |
| ctx, |
| distinctInfo.dataViewSpec.toArray, |
| hasNamespace, |
| enableBackupDataView) |
| |
| |
| // add distinctAccTerm to member field |
| ctx.addReusableMember(s"private $MAP_VIEW $distinctAccTerm;") |
| if (enableBackupDataView) { |
| ctx.addReusableMember(s"private $MAP_VIEW $distinctBackupAccTerm;") |
| } |
| |
| // when dataview works on state, assign the stateDataView to accTerm in open method |
| distinctInfo.dataViewSpec match { |
| case Some(spec) => |
| val dataviewTerm = createDataViewTerm(spec) |
| ctx.addReusableOpenStatement(s"$distinctAccTerm = $dataviewTerm;") |
| if (enableBackupDataView) { |
| val dataviewBackupTerm = createDataViewBackupTerm(spec) |
| ctx.addReusableOpenStatement(s"$distinctBackupAccTerm = $dataviewBackupTerm;") |
| } |
| case None => // do nothing |
| } |
| } |
| |
| override def createAccumulator(generator: ExprCodeGenerator): Seq[GeneratedExpression] = { |
| if (distinctInfo.excludeAcc) { |
| // when the distinct acc is excluded, no need to create distinct accumulator |
| Seq() |
| } else { |
| val accTerm = newName("distinct_acc") |
| val code = s"$MAP_VIEW $accTerm = new $MAP_VIEW();" |
| Seq(GeneratedExpression(accTerm, NEVER_NULL, code, externalAccType.toInternalType)) |
| } |
| } |
| |
| override def setAccumulator(generator: ExprCodeGenerator): String = { |
| generateAccumulatorAccess( |
| ctx, |
| generator.input1Type, |
| generator.input1Term, |
| aggBufferOffset, |
| useStateDataView = true, |
| useBackupDataView = false) |
| // return empty because the access code is set in ctx's ReusableInputUnboxingExprs |
| "" |
| } |
| |
| override def resetAccumulator(generator: ExprCodeGenerator): String = { |
| if (distinctInfo.excludeAcc) { |
| "" |
| } else { |
| s"$distinctAccTerm.clear();" |
| } |
| } |
| |
| override def getAccumulator(generator: ExprCodeGenerator): Seq[GeneratedExpression] = { |
| if (distinctInfo.excludeAcc) { |
| // when the distinct acc is excluded, the accumulator result shouldn't include distinct acc |
| Seq() |
| } else { |
| Seq(GeneratedExpression( |
| distinctAccTerm, |
| NEVER_NULL, |
| NO_CODE, |
| externalAccType.toInternalType)) |
| } |
| } |
| |
| override def accumulate(generator: ExprCodeGenerator): String = { |
| val keyExpr = generateKeyExpression(ctx, generator) |
| val key = keyExpr.resultTerm |
| val accumulateCode = innerAggCodeGens.map(_.accumulate(generator)) |
| val valueTerm = newName("value") |
| val valueTypeTerm = valueGenerator.valueTypeTerm |
| val filterResults = filterExpressions.map { |
| case None => None |
| case Some(f) => Some(generator.generateExpression(f.toRexNode(relBuilder)).resultTerm) |
| } |
| |
| val head = |
| s""" |
| |${keyExpr.code} |
| |$valueTypeTerm $valueTerm = ($valueTypeTerm) $distinctAccTerm.get($key); |
| |if ($valueTerm == null) { |
| | $valueTerm = ${valueGenerator.initialValue}; |
| |} |
| """.stripMargin |
| |
| val body = if (consumeRetraction) { |
| // input contains retraction, due to local/global, the value might be empty, and need remove |
| s""" |
| |$head |
| |boolean $isValueEmptyTerm = true; |
| |${valueGenerator.foreachAccumulate(valueTerm, accumulateCode, filterResults)} |
| |if ($isValueEmptyTerm) { |
| | $distinctAccTerm.remove($key); |
| |} else { |
| | $distinctAccTerm.put($key, $valueTerm); |
| |} |
| """.stripMargin |
| } else { |
| // input contains only append messages, update value only when value changed |
| s""" |
| |$head |
| |boolean $isValueChangedTerm = false; |
| |${valueGenerator.foreachAccumulate(valueTerm, accumulateCode, filterResults)} |
| |if ($isValueChangedTerm) { |
| | $distinctAccTerm.put($key, $valueTerm); |
| |} |
| """.stripMargin |
| } |
| |
| if (filterResults.exists(_.isDefined)) { |
| val condition = filterResults.flatten.mkString(" || ") |
| s""" |
| |if ($condition) { |
| | $body |
| |} |
| """.stripMargin |
| } else { |
| body |
| } |
| } |
| |
| override def retract(generator: ExprCodeGenerator): String = { |
| if (!consumeRetraction) { |
| throw new TableException("This should never happen, please file a issue.") |
| } |
| val keyExpr = generateKeyExpression(ctx, generator) |
| val key = keyExpr.resultTerm |
| val retractCodes = innerAggCodeGens.map(_.retract(generator)) |
| val valueTerm = newName("value") |
| val valueTypeTerm = valueGenerator.valueTypeTerm |
| val filterResults = filterExpressions.map { |
| case None => None |
| case Some(f) => Some(generator.generateExpression(f.toRexNode(relBuilder)).resultTerm) |
| } |
| |
| val head = |
| s""" |
| |${keyExpr.code} |
| |$valueTypeTerm $valueTerm = ($valueTypeTerm) $distinctAccTerm.get($key); |
| |if ($valueTerm == null) { |
| | $valueTerm = ${valueGenerator.initialValue}; |
| |} |
| """.stripMargin |
| |
| val body = |
| s""" |
| |$head |
| |boolean $isValueEmptyTerm = true; |
| |${valueGenerator.foreachRetract(valueTerm, retractCodes, filterResults)} |
| |if ($isValueEmptyTerm) { |
| | $distinctAccTerm.remove($key); |
| |} else { |
| | $distinctAccTerm.put($key, $valueTerm); |
| |} |
| """.stripMargin |
| |
| if (filterResults.exists(_.isDefined)) { |
| val condition = filterResults.flatten.mkString(" || ") |
| s""" |
| |if ($condition) { |
| | $body |
| |} |
| """.stripMargin |
| } else { |
| body |
| } |
| } |
| |
| override def merge(generator: ExprCodeGenerator): String = { |
| // generate other MapView acc field |
| val otherAccExpr = generateAccumulatorAccess( |
| ctx, |
| generator.input1Type, |
| generator.input1Term, |
| mergedAccOffset + aggBufferOffset, |
| useStateDataView = !mergedAccOnHeap, |
| useBackupDataView = true) |
| |
| val keyTerm = newName(DISTINCT_KEY_TERM) |
| val exprGenerator = new ExprCodeGenerator(ctx, INPUT_NOT_NULL, nullCheck = true) |
| .bindInput(keyType.toInternalType, inputTerm = keyTerm) |
| val accumulateCodes = innerAggCodeGens.map(_.accumulate(exprGenerator)) |
| val retractCodes = if (consumeRetraction) { |
| innerAggCodeGens.map(_.retract(exprGenerator)) |
| } else { |
| innerAggCodeGens.map(_ => |
| "throw new RuntimeException(\"This distinct aggregate do not consume" + |
| " retractions, " + |
| "but received retract message, which should never happen.\");") |
| } |
| |
| val otherAccTerm = otherAccExpr.resultTerm |
| val otherEntries = newName("otherEntries") |
| val valueTypeTerm = valueGenerator.valueTypeTerm |
| val thisValue = "thisValue" |
| val otherValue = "otherValue" |
| |
| s""" |
| |$ITERABLE<$MAP_ENTRY> $otherEntries = ($ITERABLE<$MAP_ENTRY>) $otherAccTerm.entries(); |
| |if ($otherEntries != null) { |
| | for ($MAP_ENTRY entry: $otherEntries) { |
| | $keyTypeTerm $keyTerm = ($keyTypeTerm) entry.getKey(); |
| | ${ctx.reuseInputUnboxingCode(Set(keyTerm))} |
| | $valueTypeTerm $otherValue = ($valueTypeTerm) entry.getValue(); |
| | $valueTypeTerm $thisValue = ($valueTypeTerm) $distinctAccTerm.get($keyTerm); |
| | if ($thisValue == null) { |
| | $thisValue = ${valueGenerator.initialValue}; |
| | } |
| | boolean $isValueChangedTerm = false; |
| | boolean $isValueEmptyTerm = false; |
| | ${valueGenerator.foreachMerge(thisValue, otherValue, accumulateCodes, retractCodes)} |
| | if ($isValueEmptyTerm) { |
| | $distinctAccTerm.remove($keyTerm); |
| | } else if ($isValueChangedTerm) { // value is not empty and is changed, do update |
| | $distinctAccTerm.put($keyTerm, $thisValue); |
| | } |
| | } // end foreach |
| |} // end otherEntries != null |
| """.stripMargin |
| } |
| |
| override def getValue(generator: ExprCodeGenerator): GeneratedExpression = { |
| throw new TableException( |
| "Distinct shouldn't return result value, this is a bug, please file a issue.") |
| } |
| |
| override def checkNeededMethods( |
| needAccumulate: Boolean, |
| needRetract: Boolean, |
| needMerge: Boolean, |
| needReset: Boolean): Unit = { |
| if (needMerge) { |
| // see merge method for more information |
| innerAggCodeGens |
| .foreach(_.checkNeededMethods(needAccumulate = true, needRetract = consumeRetraction)) |
| } else { |
| innerAggCodeGens |
| .foreach(_.checkNeededMethods(needAccumulate, needRetract, needMerge, needReset)) |
| } |
| } |
| |
| |
| private def generateKeyExpression( |
| ctx: CodeGeneratorContext, |
| generator: ExprCodeGenerator): GeneratedExpression = { |
| val fieldExprs = distinctInfo.argIndexes.map(generateInputAccess( |
| ctx, |
| generator.input1Type, |
| generator.input1Term, |
| _, |
| nullableInput = false, |
| nullCheck = true).copyResultIfNeeded(ctx, inputFieldCopy)) |
| |
| // the key expression of MapView |
| if (fieldExprs.length > 1) { |
| val keyTerm = newName(DISTINCT_KEY_TERM) |
| val valueType = new RowType( |
| fieldExprs.map(_.resultType): _*) |
| |
| // always create a new result row |
| generator.generateResultExpression( |
| fieldExprs, |
| valueType, |
| classOf[GenericRow], |
| outRow = keyTerm, |
| reusedOutRow = false) |
| } else { |
| val fieldExpr = fieldExprs.head |
| val keyTerm = newName(DISTINCT_KEY_TERM) |
| val bType = boxedTypeTermForType(fieldExpr.resultType) |
| val code = |
| s""" |
| |${fieldExpr.code} |
| |$bType $keyTerm = ($bType) ${fieldExpr.resultTerm}; |
| |if (${fieldExpr.nullTerm}) { |
| | $keyTerm = null; |
| |} |
| """.stripMargin |
| GeneratedExpression(keyTerm, fieldExpr.nullTerm, code, fieldExpr.resultType) |
| } |
| } |
| |
| /** |
| * This method is mainly the same as CodeGenUtils.generateFieldAccess(), the only difference is |
| * that this method using UpdatableRow to wrap BaseRow to handle DataViews. |
| */ |
| private def generateAccumulatorAccess( |
| ctx: CodeGeneratorContext, |
| inputType: InternalType, |
| inputTerm: String, |
| index: Int, |
| useStateDataView: Boolean, |
| useBackupDataView: Boolean, |
| nullableInput: Boolean = false, |
| nullCheck: Boolean = true): GeneratedExpression = { |
| |
| // if input has been used before, we can reuse the code that |
| // has already been generated |
| val inputExpr = ctx.getReusableInputUnboxingExprs(inputTerm, index) match { |
| // input access and unboxing has already been generated |
| case Some(expr) => expr |
| |
| // generate input access and unboxing if necessary |
| case None => |
| |
| val expr = if (distinctInfo.dataViewSpec.nonEmpty && useStateDataView) { |
| ctx.addAllReusableFields(Set(s"$BASE_ROW $CURRENT_KEY = ctx.currentKey();")) |
| val spec = distinctInfo.dataViewSpec.get |
| val dataViewTerm = if (useBackupDataView) { |
| createDataViewBackupTerm(spec) |
| } else { |
| createDataViewTerm(spec) |
| } |
| val resultTerm = if (useBackupDataView) { |
| distinctBackupAccTerm |
| } else { |
| distinctAccTerm |
| } |
| |
| val code = if (hasNamespace) { |
| val expr = generateFieldAccess(ctx, inputType, inputTerm, index, nullCheck = true) |
| s""" |
| |// when namespace is null, the dataview is used in heap, no key and namespace set |
| |if ($NAMESPACE_TERM != null) { |
| | $dataViewTerm.setCurrentKey($CURRENT_KEY); |
| | $dataViewTerm.setCurrentNamespace($NAMESPACE_TERM); |
| | $resultTerm = $dataViewTerm; |
| |} else { |
| | ${expr.code} |
| | $resultTerm = ${expr.resultTerm}; |
| |} |
| """.stripMargin |
| } else { |
| s""" |
| |$dataViewTerm.setCurrentKey($CURRENT_KEY); |
| |$resultTerm = $dataViewTerm; |
| """.stripMargin |
| } |
| GeneratedExpression(resultTerm, NEVER_NULL, code, externalAccType.toInternalType) |
| } else { |
| val expr = generateFieldAccess(ctx, inputType, inputTerm, index, nullCheck = true) |
| if (useBackupDataView) { |
| // this is called in the merge method |
| expr |
| } else { |
| val code = |
| s""" |
| |${expr.code} |
| |$distinctAccTerm = ${expr.resultTerm}; |
| """.stripMargin |
| GeneratedExpression(distinctAccTerm, NEVER_NULL, code, expr.resultType) |
| } |
| } |
| |
| ctx.addReusableInputUnboxingExprs(inputTerm, index, expr) |
| expr |
| } |
| // hide the generated code as it will be executed only once |
| GeneratedExpression(inputExpr.resultTerm, inputExpr.nullTerm, "", inputExpr.resultType) |
| } |
| |
| // ---------------------------- Distinct Value Code Generator --------------------------- |
| |
| /** |
| * The [[DistinctValueGenerator]] is an abstraction for generating codes about the |
| * distinct value. |
| * |
| * The value of distinct state maybe long or long[] depends on the input stream |
| * and the number of the distinct aggregates. |
| * |
| * 1. when the input is not a retraction stream, and the distinct agg number <= 64, |
| * then long is used as the value, each bit indicate whether each condition is satisfied. |
| * |
| * 2. when the input is not a retraction stream, and the distinct agg number > 64, |
| * then long[] is used as the value, each bit indicate whether each condition is satisfied. |
| * |
| * 3. when the input is a retraction stream, and the distinct agg number == 1, |
| * then long is used as the value, the long indicates the number of elements |
| * satisfy the aggregate condition. |
| * |
| * 4. when the input is a retraction stream, and the distinct agg number > 1, |
| * then long[] is used as the value, each long indicate the number of elements |
| * * satisfy each aggregate condition. |
| */ |
| trait DistinctValueGenerator { |
| /** the type of value of distinct state */ |
| def valueTypeTerm: String |
| |
| /** the default value of value of distinct state */ |
| def initialValue: String |
| |
| /** Accumulate the value of distinct state, |
| * and generates each accumulate codes for every aggregates. */ |
| def foreachAccumulate( |
| valueTerm: String, |
| innerAccumulateCodes: Array[String], |
| filterResults: Array[Option[String]]): String |
| |
| /** |
| * Retract the value of distinct state, |
| * and generates each retract codes for every aggregates. */ |
| def foreachRetract( |
| valueTerm: String, |
| innerRetractCodes: Array[String], |
| filterResults: Array[Option[String]]): String |
| |
| /** Merge the value of distinct state, |
| * and generates accumulate/retract codes when needed. */ |
| def foreachMerge( |
| thisValueTerm: String, |
| otherValueTerm: String, |
| innerAccumulateCodes: Array[String], |
| innerRetractCodes: Array[String]): String |
| } |
| |
| /** Create a [[DistinctValueGenerator]] instance for current [[DistinctAggCodeGen]] */ |
| private def createDistinctValueGenerator(): DistinctValueGenerator = { |
| if (!consumeRetraction) { |
| if (aggCount <= JLong.SIZE) { |
| new LongValueWithoutRetractionGenerator |
| } else { |
| new LongArrayValueWithoutRetractionGenerator |
| } |
| } else { |
| if (aggCount <= 1) { |
| new LongValueWithRetractionGenerator |
| } else { |
| new LongArrayValueWithRetractionGenerator |
| } |
| } |
| } |
| |
| /** The generator used in non-retraction stream and number of aggregate <= 64 */ |
| class LongValueWithoutRetractionGenerator extends DistinctValueGenerator { |
| checkArgument(aggCount <= JLong.SIZE) |
| |
| override def valueTypeTerm: String = "java.lang.Long" |
| |
| override def initialValue: String = "0L" |
| |
| override def foreachAccumulate( |
| valueTerm: String, |
| innerAccumulateCodes: Array[String], |
| filterResults: Array[Option[String]]): String = { |
| |
| val codes = for (index <- filterResults.indices) yield { |
| val existedTerm = newName("existed") |
| val code = |
| s""" |
| |long $existedTerm = ((long) $valueTerm) & (1L << $index); |
| |if ($existedTerm == 0) { // not existed |
| | $valueTerm = ((long) $valueTerm) | (1L << $index); |
| | $isValueChangedTerm = true; |
| | ${innerAccumulateCodes(index)} |
| |} |
| """.stripMargin |
| filterResults(index) match { |
| case None => code |
| case Some(f) => |
| s""" |
| |if ($f) { |
| | $code |
| |} |
| """.stripMargin |
| } |
| } |
| |
| codes.mkString("\n") |
| } |
| |
| override def foreachRetract( |
| valueTerm: String, |
| innerRetractCodes: Array[String], |
| filterResults: Array[Option[String]]): String = { |
| throw new TableException("LongValueAppendGenerator do not support retract, " + |
| "this method should never be called, please file a issue.") |
| } |
| |
| override def foreachMerge( |
| thisValueTerm: String, |
| otherValueTerm: String, |
| innerAccumulateCodes: Array[String], |
| innerRetractCodes: Array[String] /* retract code is not used here */ ): String = { |
| |
| val codes = for (index <- innerAccumulateCodes.indices) yield { |
| val existedTerm = newName("existed") |
| s""" |
| |long $existedTerm = ((long) $thisValueTerm) & (1L << $index); |
| |if ($existedTerm == 0) { // not existed |
| | long otherExisted = ((long) $otherValueTerm) & (1L << $index); |
| | if (otherExisted != 0) { // existed in other |
| | $isValueChangedTerm = true; |
| | // do accumulate |
| | ${innerAccumulateCodes(index)} |
| | } |
| |} |
| """.stripMargin |
| } |
| |
| s""" |
| |${codes.mkString("\n")} |
| |$thisValueTerm = ((long) $thisValueTerm) | ((long) $otherValueTerm); |
| |$isValueEmptyTerm = false; |
| """.stripMargin |
| } |
| } |
| |
| /** The generator used in non-retraction stream and number of aggregate > 64 */ |
| class LongArrayValueWithoutRetractionGenerator extends DistinctValueGenerator { |
| checkArgument(aggCount > JLong.SIZE) |
| |
| override def valueTypeTerm: String = "long[]" |
| |
| override def initialValue: String = s"new long[${aggCount / JLong.SIZE + 1}]" |
| |
| override def foreachAccumulate( |
| valueTerm: String, |
| innerAccumulateCodes: Array[String], |
| filterResults: Array[Option[String]]): String = { |
| |
| val codes = for (index <- filterResults.indices) yield { |
| val existedTerm = newName("existed") |
| val arrayIndex = index / JLong.SIZE |
| val bitIndex = index % JLong.SIZE |
| val code = |
| s""" |
| |long $existedTerm = $valueTerm[$arrayIndex] & (1L << $bitIndex); |
| |if ($existedTerm == 0) { // not existed |
| | $isValueChangedTerm = true; |
| | $valueTerm[$arrayIndex] = $valueTerm[$arrayIndex] | (1L << $bitIndex); |
| | ${innerAccumulateCodes(index)} |
| |} |
| """.stripMargin |
| filterResults(index) match { |
| case None => code |
| case Some(f) => |
| s""" |
| |if ($f) { |
| | $code |
| |} |
| """.stripMargin |
| } |
| } |
| |
| codes.mkString("\n") |
| } |
| |
| override def foreachRetract( |
| valueTerm: String, |
| innerRetractCodes: Array[String], |
| filterResults: Array[Option[String]]): String = { |
| throw new TableException("LongArrayValueAppendGenerator do not support retract, " + |
| "this method should never be called, please file a issue.") |
| } |
| |
| override def foreachMerge( |
| thisValueTerm: String, |
| otherValueTerm: String, |
| innerAccumulateCodes: Array[String], |
| innerRetractCodes: Array[String]): String = { |
| val codes = for (index <- innerAccumulateCodes.indices) yield { |
| val existedTerm = newName("thisExisted") |
| val arrayIndex = index / JLong.SIZE |
| val bitIndex = index % JLong.SIZE |
| s""" |
| |long $existedTerm = $thisValueTerm[$arrayIndex] & (1L << $bitIndex); |
| |if ($existedTerm == 0) { // not existed |
| | long otherExisted = $otherValueTerm[$arrayIndex] & (1L << $bitIndex); |
| | if (otherExisted != 0) { // existed in other |
| | $isValueChangedTerm = true; |
| | // do accumulate |
| | ${innerAccumulateCodes(index)} |
| | } |
| |} |
| """.stripMargin |
| } |
| |
| val setValueCodes = for (index <- 0 until (aggCount / JLong.SIZE + 1)) yield { |
| s"$thisValueTerm[$index] |= $otherValueTerm[$index];" |
| } |
| |
| s""" |
| |${codes.mkString("\n")} |
| |${setValueCodes.mkString("\n")} |
| |$isValueEmptyTerm = false; |
| """.stripMargin |
| } |
| } |
| |
| /** The generator used in retraction stream and only one aggregate */ |
| class LongValueWithRetractionGenerator extends DistinctValueGenerator { |
| checkArgument(aggCount == 1) |
| |
| override def valueTypeTerm: String = "java.lang.Long" |
| |
| override def initialValue: String = "0L" |
| |
| override def foreachAccumulate( |
| countTerm: String, |
| innerAccumulateCodes: Array[String], |
| filterResults: Array[Option[String]]): String = { |
| foreachAction(isAccumulate = true, countTerm, innerAccumulateCodes, filterResults) |
| } |
| |
| override def foreachRetract( |
| countTerm: String, |
| innerRetractCodes: Array[String], |
| filterResults: Array[Option[String]]): String = { |
| foreachAction(isAccumulate = false, countTerm, innerRetractCodes, filterResults) |
| } |
| |
| private def foreachAction( |
| isAccumulate: Boolean, |
| countTerm: String, |
| innerCodes: Array[String], |
| filterResults: Array[Option[String]]): String = { |
| |
| val code = if (isAccumulate) { |
| s""" |
| |$countTerm += 1; |
| |if ($countTerm == 1) { // cnt is 0 before |
| | ${innerCodes.head} |
| |} |
| """.stripMargin |
| } else { |
| s""" |
| |$countTerm -= 1; |
| |if ($countTerm == 0) { // cnt is +1 before |
| | ${innerCodes.head} |
| |} |
| """.stripMargin |
| } |
| |
| filterResults.head match { |
| case None => |
| s""" |
| |$code |
| |$isValueEmptyTerm = $countTerm == 0L; |
| """.stripMargin |
| case Some(f) => |
| s""" |
| |if ($f) { |
| | $code |
| |} |
| |$isValueEmptyTerm = $countTerm == 0L; |
| """.stripMargin |
| } |
| } |
| |
| override def foreachMerge( |
| thisCountTerm: String, |
| otherCountTerm: String, |
| innerAccumulateCodes: Array[String], |
| innerRetractCodes: Array[String]): String = { |
| |
| val mergedCntTerm = newName("mergedCnt") |
| s""" |
| |long $mergedCntTerm = $thisCountTerm + $otherCountTerm; |
| |if ($mergedCntTerm == 0) { |
| | $isValueEmptyTerm = true; |
| | if ($thisCountTerm > 0) { |
| | // origin is > 0, and retract to 0, do retract |
| | ${innerRetractCodes.mkString("\n")} |
| | } |
| |} else if ($mergedCntTerm < 0) { |
| | if ($thisCountTerm > 0) { |
| | // origin is > 0, and retract to < 0, do retract |
| | ${innerRetractCodes.mkString("\n")} |
| | } |
| |} else if ($mergedCntTerm > 0) { |
| | if ($thisCountTerm <= 0) { |
| | // origin is <= 0, and accumulate to > 0, do accumulate |
| | ${innerAccumulateCodes.mkString("\n")} |
| | } |
| |} |
| |$thisCountTerm = $mergedCntTerm; |
| |$isValueChangedTerm = true; |
| """.stripMargin |
| } |
| } |
| |
| /** The generator used in retraction stream and number of aggregate > 1 */ |
| class LongArrayValueWithRetractionGenerator extends DistinctValueGenerator { |
| checkArgument(aggCount > 1) |
| |
| override def valueTypeTerm: String = "long[]" |
| |
| override def initialValue: String = s"new long[$aggCount]" |
| |
| override def foreachAccumulate( |
| valueTerm: String, |
| innerAccumulateCodes: Array[String], |
| filterResults: Array[Option[String]]): String = { |
| foreachAction(isAccumulate = true, valueTerm, innerAccumulateCodes, filterResults) |
| } |
| |
| override def foreachRetract( |
| valueTerm: String, |
| innerRetractCodes: Array[String], |
| filterResults: Array[Option[String]]): String = { |
| foreachAction(isAccumulate = false, valueTerm, innerRetractCodes, filterResults) |
| } |
| |
| private def foreachAction( |
| isAccumulate: Boolean, |
| valueTerm: String, |
| innerCodes: Array[String], |
| filterResults: Array[Option[String]]): String = { |
| |
| val codes = for (index <- filterResults.indices) yield { |
| val countTerm = newName("count") |
| val code = if (isAccumulate) { |
| s""" |
| |long $countTerm = $valueTerm[$index] + 1; |
| |$valueTerm[$index] = $countTerm; |
| |if ($countTerm == 1) { // cnt is 0 before |
| | ${innerCodes(index)} |
| |} |
| """.stripMargin |
| } else { |
| s""" |
| |long $countTerm = $valueTerm[$index] - 1; |
| |$valueTerm[$index] = $countTerm; |
| |if ($countTerm == 0) { // cnt is +1 before |
| | ${innerCodes(index)} |
| |} |
| """.stripMargin |
| } |
| filterResults(index) match { |
| case None => code |
| case Some(f) => |
| s""" |
| |if ($f) { |
| | $code |
| |} |
| """.stripMargin |
| } |
| } |
| |
| val isEmptyCode = |
| s""" |
| |for (long cnt : $valueTerm) { |
| | if (cnt != 0) { |
| | $isValueEmptyTerm = false; |
| | break; |
| | } |
| |} |
| """.stripMargin |
| |
| s""" |
| |${codes.mkString("\n")} |
| |$isEmptyCode |
| """.stripMargin |
| } |
| |
| override def foreachMerge( |
| thisValueTerm: String, |
| otherValueTerm: String, |
| innerAccumulateCodes: Array[String], |
| innerRetractCodes: Array[String]): String = { |
| |
| val codes = for (index <- innerAccumulateCodes.indices) yield { |
| val thisCountTerm = newName("thisCnt") |
| val mergedCntTerm = newName("mergedCnt") |
| s""" |
| |long $thisCountTerm = $thisValueTerm[$index]; |
| |long $mergedCntTerm = $thisCountTerm + $otherValueTerm[$index]; |
| |if ($mergedCntTerm == 0) { |
| | if ($thisCountTerm > 0) { |
| | // origin is > 0, and retract to 0, do retract |
| | ${innerRetractCodes(index)} |
| | } |
| |} else if ($mergedCntTerm < 0) { |
| | if ($thisCountTerm > 0) { |
| | // origin is > 0, and retract to < 0, do retract |
| | ${innerRetractCodes(index)} |
| | } |
| |} else if ($mergedCntTerm > 0) { |
| | if ($thisCountTerm <= 0) { |
| | // origin is <= 0, and accumulate to > 0, do accumulate |
| | ${innerAccumulateCodes(index)} |
| | } |
| |} |
| |$thisValueTerm[$index] = $mergedCntTerm; |
| """.stripMargin |
| } |
| |
| val isEmptyCode = |
| s""" |
| |for (long cnt : $thisValueTerm) { |
| | if (cnt != 0) { |
| | $isValueEmptyTerm = false; |
| | break; |
| | } |
| |} |
| |$isValueChangedTerm = true; |
| """.stripMargin |
| |
| s""" |
| |${codes.mkString("\n")} |
| |$isEmptyCode |
| """.stripMargin |
| } |
| } |
| } |