blob: 1c54acc8da1823871fda4ed4180ed323de9c8713 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.functions.aggfunctions
import java.lang.{Long => JLong}
import java.lang.{Iterable => JIterable}
import java.util.{Map => JMap}
import org.apache.flink.table.api.dataview.MapView
import org.apache.flink.types.Row
/**
* Wraps an accumulator and adds a map to filter distinct values.
*
* @param distinctValueMap the [[MapView]] that stores the distinct filter hash map.
*/
class DistinctAccumulator(var distinctValueMap: MapView[Row, JLong]) {
def this() {
this(new MapView[Row, JLong]())
}
def canEqual(a: Any): Boolean = a.isInstanceOf[DistinctAccumulator]
override def equals(that: Any): Boolean =
that match {
case that: DistinctAccumulator => that.canEqual(this) &&
this.distinctValueMap == that.distinctValueMap
case _ => false
}
/**
* Checks if the parameters are unique and adds the parameters to the distinct map.
* Returns true if the parameters are unique (haven't been in the map yet), false otherwise.
*
* @param params the parameters to check.
* @return true if the parameters are unique (haven't been in the map yet), false otherwise.
*/
def add(params: Row): Boolean = {
val currentCnt = distinctValueMap.get(params)
if (currentCnt != null) {
distinctValueMap.put(params, currentCnt + 1L)
false
} else {
distinctValueMap.put(params, 1L)
true
}
}
/**
* Checks if the parameters are unique and adds the parameters to the distinct map.
* Returns true if the parameters are unique (haven't been in the map yet), false otherwise.
*
* @param params the parameters to check.
* @return true if the parameters are unique (haven't been in the map yet), false otherwise.
*/
def add(params: Row, count: JLong): Boolean = {
val currentCnt = distinctValueMap.get(params)
if (currentCnt != null) {
distinctValueMap.put(params, currentCnt + count)
false
} else {
distinctValueMap.put(params, count)
true
}
}
/**
* Removes one instance of the parameters from the distinct map and checks if this was the last
* instance.
* Returns true if no instances of the parameters remain in the map, false otherwise.
*
* @param params the parameters to check.
* @return true if no instances of the parameters remain in the map, false otherwise.
*/
def remove(params: Row): Boolean = {
val currentCnt = distinctValueMap.get(params)
if (currentCnt == 1) {
distinctValueMap.remove(params)
true
} else {
distinctValueMap.put(params, currentCnt - 1L)
false
}
}
def reset(): Unit = {
distinctValueMap.clear()
}
def elements(): JIterable[JMap.Entry[Row, JLong]] = {
distinctValueMap.map.entrySet()
}
}