blob: d5b78cee0c318ae1294979516263fa90e7f7ec64 [file] [log] [blame]
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import sys
import time
from abc import abstractmethod
from decimal import Decimal
from pyflink.table import AggregateFunction, MapView, ListView
# Sentinels named after the bounds of a Java ``long``; used below as
# "no key found" markers when scanning order maps. On 64-bit CPython
# ``sys.maxsize`` is 2**63 - 1 (smaller on 32-bit builds).
MAX_LONG_VALUE = sys.maxsize
# Two's-complement identity: ~x == -x - 1.
MIN_LONG_VALUE = ~MAX_LONG_VALUE
class AvgAggFunction(AggregateFunction):
    """Arithmetic mean of the non-null inputs.

    Accumulator layout: ``[count, sum]``.
    """

    def get_value(self, accumulator):
        """Return ``sum / count`` (true division), or ``None`` when empty."""
        count, total = accumulator[0], accumulator[1]
        if count == 0:
            return None
        return total / count

    def create_accumulator(self):
        # [count, sum]
        return [0, 0]

    def accumulate(self, accumulator, *args):
        value = args[0]
        if value is None:
            return
        accumulator[0] += 1
        accumulator[1] += value

    def retract(self, accumulator, *args):
        value = args[0]
        if value is None:
            return
        accumulator[0] -= 1
        accumulator[1] -= value

    def merge(self, accumulator, accumulators):
        for other in accumulators:
            # create_accumulator initialises the sum to 0, so this guard is
            # purely defensive
            if other[1] is None:
                continue
            accumulator[0] += other[0]
            accumulator[1] += other[1]
class Count1AggFunction(AggregateFunction):
    """COUNT(1): counts every input row, whatever its argument values.

    Accumulator layout: ``[count]``.
    """

    def get_value(self, accumulator):
        count = accumulator[0]
        return count

    def create_accumulator(self):
        return [0]

    def accumulate(self, accumulator, *args):
        # arguments are irrelevant: every row is counted
        accumulator[0] = accumulator[0] + 1

    def retract(self, accumulator, *args):
        accumulator[0] = accumulator[0] - 1

    def merge(self, accumulator, accumulators):
        accumulator[0] += sum(other[0] for other in accumulators)
class CountAggFunction(AggregateFunction):
    """COUNT(expr): counts the rows whose first argument is not null.

    Accumulator layout: ``[count]``.
    """

    def get_value(self, accumulator):
        count = accumulator[0]
        return count

    def create_accumulator(self):
        return [0]

    def accumulate(self, accumulator, *args):
        if args[0] is None:
            return
        accumulator[0] += 1

    def retract(self, accumulator, *args):
        if args[0] is None:
            return
        accumulator[0] -= 1

    def merge(self, accumulator, accumulators):
        accumulator[0] += sum(partial[0] for partial in accumulators)
class FirstValueAggFunction(AggregateFunction):
    """FIRST_VALUE without retraction: keeps the first non-null input.

    Accumulator layout: ``[first_value]``.
    """

    def get_value(self, accumulator):
        first = accumulator[0]
        return first

    def create_accumulator(self):
        # [first_value]
        return [None]

    def accumulate(self, accumulator, *args):
        # once a non-null value has been stored it is never replaced
        if accumulator[0] is not None:
            return
        candidate = args[0]
        if candidate is not None:
            accumulator[0] = candidate

    def retract(self, accumulator, *args):
        raise NotImplementedError("This function does not support retraction.")

    def merge(self, accumulator, accumulators):
        raise NotImplementedError("This function does not support merge.")
class FirstValueWithRetractAggFunction(AggregateFunction):
    """FIRST_VALUE with retraction support.

    Each accumulated value is tagged with an "order": the wall-clock time in
    milliseconds at which ``accumulate`` ran. The first value is the one with
    the smallest order. Two MapViews index the accumulated occurrences in both
    directions so that a retraction can locate and remove one occurrence and,
    if it removed the current first value, find the next one.

    Lists fetched from the MapViews are always assigned back after being
    mutated. NOTE(review): presumably because a state-backed MapView does not
    observe in-place mutation of a fetched value — confirm.
    """

    def get_value(self, accumulator):
        return accumulator[0]

    def create_accumulator(self):
        # [first_value, first_order, value_to_order_map, order_to_value_map]
        return [None, None, MapView(), MapView()]

    def accumulate(self, accumulator, *args):
        """Record a non-null value; it becomes first if its order is earlier."""
        if args[0] is not None:
            value = args[0]
            prev_order = accumulator[1]
            value_to_order_map = accumulator[2]
            order_to_value_map = accumulator[3]
            # calculate the order of current value (epoch milliseconds)
            order = int(round(time.time() * 1000))
            if value in value_to_order_map:
                order_list = value_to_order_map[value]
            else:
                order_list = []
            order_list.append(order)
            value_to_order_map[value] = order_list
            # strict '>' keeps the existing first value when a later input
            # lands in the same millisecond
            if prev_order is None or prev_order > order:
                accumulator[0] = value
                accumulator[1] = order
            if order in order_to_value_map:
                value_list = order_to_value_map[order]
            else:
                value_list = []
            value_list.append(value)
            order_to_value_map[order] = value_list

    def retract(self, accumulator, *args):
        """Remove the earliest recorded occurrence of a non-null value.

        Retractions of values that were never accumulated are ignored. If the
        retracted value equals the current first value, the first value is
        recomputed from the remaining occurrences.
        """
        if args[0] is not None:
            value = args[0]
            prev_value = accumulator[0]
            prev_order = accumulator[1]
            value_to_order_map = accumulator[2]
            order_to_value_map = accumulator[3]
            # calculate the order of current value
            if value in value_to_order_map and value_to_order_map[value]:
                order_list = value_to_order_map[value]
            else:
                # this data has not been accumulated
                return
            # get and remove current order in value_to_order_map
            # (index 0 is the earliest order appended for this value)
            order = order_list.pop(0)
            if order_list:
                value_to_order_map[value] = order_list
            else:
                del value_to_order_map[value]
            # remove current value in order_to_value_map
            if order in order_to_value_map:
                value_list = order_to_value_map[order]
            else:
                # this data has not been accumulated
                return
            if value in value_list:
                value_list.remove(value)
                if value_list:
                    order_to_value_map[order] = value_list
                else:
                    del order_to_value_map[order]
            if value == prev_value:
                # find the smallest remaining order >= the previous first
                # order; MAX_LONG_VALUE is the "nothing found" sentinel
                start_key = prev_order
                next_key = MAX_LONG_VALUE
                for key in order_to_value_map:
                    if start_key <= key < next_key:
                        next_key = key
                if next_key != MAX_LONG_VALUE:
                    # the earliest-appended value at that order becomes first
                    accumulator[0] = order_to_value_map[next_key][0]
                    accumulator[1] = next_key
                else:
                    accumulator[0] = None
                    accumulator[1] = None

    def merge(self, accumulator, accumulators):
        raise NotImplementedError("This function does not support merge.")
class LastValueAggFunction(AggregateFunction):
    """LAST_VALUE without retraction: keeps the most recent non-null input.

    Accumulator layout: ``[last_value]``.
    """

    def get_value(self, accumulator):
        last = accumulator[0]
        return last

    def create_accumulator(self):
        # [last_value]
        return [None]

    def accumulate(self, accumulator, *args):
        latest = args[0]
        if latest is None:
            return
        accumulator[0] = latest

    def retract(self, accumulator, *args):
        raise NotImplementedError("This function does not support retraction.")

    def merge(self, accumulator, accumulators):
        raise NotImplementedError("This function does not support merge.")
class LastValueWithRetractAggFunction(AggregateFunction):
    """LAST_VALUE with retraction support.

    Each accumulated value is tagged with an "order": the wall-clock time in
    milliseconds (truncated) at which ``accumulate`` ran. The last value is
    the one with the largest order. Two MapViews index the accumulated
    occurrences in both directions so that a retraction can remove one
    occurrence and, if it removed the current last value, find the previous
    one.

    Lists fetched from the MapViews are always assigned back after being
    mutated. NOTE(review): presumably because a state-backed MapView does not
    observe in-place mutation of a fetched value — confirm.
    """

    def get_value(self, accumulator):
        return accumulator[0]

    def create_accumulator(self):
        # [last_value, last_order, value_to_order_map, order_to_value_map]
        return [None, None, MapView(), MapView()]

    def accumulate(self, accumulator, *args):
        """Record a non-null value; it becomes last unless its order is earlier."""
        if args[0] is not None:
            value = args[0]
            prev_order = accumulator[1]
            value_to_order_map = accumulator[2]
            order_to_value_map = accumulator[3]
            # calculate the order of current value (epoch milliseconds)
            order = int(time.time() * 1000)
            if value in value_to_order_map:
                order_list = value_to_order_map[value]
            else:
                order_list = []
            order_list.append(order)
            value_to_order_map[value] = order_list
            # '<=' lets a later input in the same millisecond replace the
            # current last value
            if prev_order is None or prev_order <= order:
                accumulator[0] = value
                accumulator[1] = order
            if order in order_to_value_map:
                value_list = order_to_value_map[order]
            else:
                value_list = []
            value_list.append(value)
            order_to_value_map[order] = value_list

    def retract(self, accumulator, *args):
        """Remove the earliest recorded occurrence of a non-null value.

        Retractions of values that were never accumulated are ignored. If the
        retracted value equals the current last value, the last value is
        recomputed from the remaining occurrences.
        """
        if args[0] is not None:
            value = args[0]
            prev_value = accumulator[0]
            prev_order = accumulator[1]
            value_to_order_map = accumulator[2]
            order_to_value_map = accumulator[3]
            # calculate the order of current value
            if value in value_to_order_map and value_to_order_map[value]:
                order_list = value_to_order_map[value]
            else:
                # this data has not been accumulated
                return
            # get and remove current order in value_to_order_map
            # (index 0 is the earliest order appended for this value)
            order = order_list.pop(0)
            if order_list:
                value_to_order_map[value] = order_list
            else:
                del value_to_order_map[value]
            # remove current value in order_to_value_map
            if order in order_to_value_map:
                value_list = order_to_value_map[order]
            else:
                return
            if value in value_list:
                value_list.remove(value)
                if value_list:
                    order_to_value_map[order] = value_list
                else:
                    del order_to_value_map[order]
            if value == prev_value:
                # find the largest remaining order <= the previous last
                # order; MIN_LONG_VALUE is the "nothing found" sentinel
                start_key = prev_order
                next_key = MIN_LONG_VALUE
                for key in order_to_value_map:
                    if start_key >= key > next_key:
                        next_key = key
                if next_key != MIN_LONG_VALUE:
                    # the most recently appended value at that order is last
                    values = order_to_value_map[next_key]
                    accumulator[0] = values[len(values) - 1]
                    accumulator[1] = next_key
                else:
                    accumulator[0] = None
                    accumulator[1] = None

    def merge(self, accumulator, accumulators):
        raise NotImplementedError("This function does not support merge.")
class ListAggFunction(AggregateFunction):
    """LISTAGG without retraction: joins the non-null inputs with a delimiter.

    Accumulator layout: ``[delimiter, values]``. The delimiter defaults to
    ``','`` and is overwritten by the optional second argument of each
    accumulated non-null row.
    """

    def get_value(self, accumulator):
        """Return the joined string, or ``None`` if nothing was accumulated."""
        if accumulator[1]:
            return accumulator[0].join(accumulator[1])
        else:
            return None

    def create_accumulator(self):
        # delimiter, values
        return [',', []]

    def accumulate(self, accumulator, *args):
        # args: (value[, delimiter])
        if args[0] is not None:
            if len(args) > 1:
                accumulator[0] = args[1]
            accumulator[1].append(args[0])

    def retract(self, accumulator, *args):
        raise NotImplementedError("This function does not support retraction.")

    def merge(self, accumulator, accumulators):
        """Append the values of every non-empty partial accumulator.

        Defined explicitly so partial results are not lost. The inherited
        ``merge`` is not visible in this file. NOTE(review): in pyflink it
        appears to be a no-op — confirm.
        """
        for acc in accumulators:
            if not acc[1]:
                # empty partial: nothing to add, and its delimiter is just
                # the default, so it must not override ours
                continue
            # the delimiter is only ever set alongside a value, so a
            # non-empty partial carries the delimiter it was built with
            accumulator[0] = acc[0]
            accumulator[1].extend(acc[1])
class ListAggWithRetractAggFunction(AggregateFunction):
    """LISTAGG with retraction support, always joined with ``','``.

    Accumulator layout: ``[values, pending_retractions]``. A retraction whose
    value is not currently in ``values`` is parked in ``pending_retractions``
    and cancelled against values during ``merge``.
    """

    def get_value(self, accumulator):
        items = [item for item in accumulator[0]]
        return ','.join(items) if items else None

    def create_accumulator(self):
        # [list, retract_list]
        return [ListView(), ListView()]

    def accumulate(self, accumulator, *args):
        incoming = args[0]
        if incoming is None:
            return
        accumulator[0].add(incoming)

    def retract(self, accumulator, *args):
        target = args[0]
        if target is None:
            return
        current = [item for item in accumulator[0]]
        try:
            current.remove(target)
        except ValueError:
            # not present: remember it so that merge can cancel it later
            accumulator[1].add(target)
        else:
            accumulator[0].clear()
            accumulator[0].add_all(current)

    def merge(self, accumulator, accumulators):
        for other in accumulators:
            merged = [item for item in other[0]]
            pending = [item for item in other[1]]
            if not merged and not pending:
                continue
            # the other partial's entries come first, then our own
            merged.extend(item for item in accumulator[0])
            pending.extend(item for item in accumulator[1])
            # each pending retraction cancels one matching value; unmatched
            # ones stay pending
            leftover = []
            for item in pending:
                if item in merged:
                    merged.remove(item)
                else:
                    leftover.append(item)
            accumulator[0].clear()
            accumulator[0].add_all(merged)
            accumulator[1].clear()
            accumulator[1].add_all(leftover)
class ListAggWsWithRetractAggFunction(AggregateFunction):
    """LISTAGG with an explicit separator and retraction support.

    Accumulator layout: ``[values, pending_retractions, delimiter]``. A
    retraction whose value is not currently in ``values`` is parked in
    ``pending_retractions`` and cancelled against values during ``merge``.
    """

    def get_value(self, accumulator):
        """Return the values joined by the stored delimiter, or ``None``."""
        values = [i for i in accumulator[0]]
        if values:
            return accumulator[2].join(values)
        else:
            return None

    def create_accumulator(self):
        # [list, retract_list, delimiter]
        return [ListView(), ListView(), ',']

    def accumulate(self, accumulator, *args):
        # args: (value, delimiter)
        if args[0] is not None:
            accumulator[2] = args[1]
            accumulator[0].add(args[0])

    def retract(self, accumulator, *args):
        # args: (value, delimiter)
        if args[0] is not None:
            accumulator[2] = args[1]
            values = [i for i in accumulator[0]]
            if args[0] in values:
                values.remove(args[0])
                accumulator[0].clear()
                accumulator[0].add_all(values)
            else:
                # not present: park it so that merge can cancel it later
                accumulator[1].add(args[0])

    def merge(self, accumulator, accumulators):
        """Merge partial accumulators and cancel pending retractions."""
        for acc in accumulators:
            buffer = [e for e in acc[0]]
            retract_buffer = [e for e in acc[1]]
            if not buffer and not retract_buffer:
                # empty partial: keep our delimiter and state untouched
                continue
            accumulator[2] = acc[2]
            for e in accumulator[0]:
                buffer.append(e)
            for e in accumulator[1]:
                retract_buffer.append(e)
            # merge list & retract list: each pending retraction cancels one
            # matching value; only unmatched retractions remain pending
            new_retract_buffer = []
            for e in retract_buffer:
                if e in buffer:
                    buffer.remove(e)
                else:
                    new_retract_buffer.append(e)
            accumulator[0].clear()
            accumulator[0].add_all(buffer)
            accumulator[1].clear()
            # store only the unmatched retractions; re-storing the full
            # retract_buffer would let already-cancelled retractions remove
            # a second value in a later merge
            accumulator[1].add_all(new_retract_buffer)
class MaxAggFunction(AggregateFunction):
    """MAX without retraction.

    Accumulator layout: ``[max]``; ``None`` until a non-null input arrives.
    """

    def get_value(self, accumulator):
        return accumulator[0]

    def create_accumulator(self):
        return [None]

    @staticmethod
    def _keep_larger(accumulator, candidate):
        # nulls are ignored; otherwise replace an absent or smaller maximum
        if candidate is None:
            return
        current = accumulator[0]
        if current is None or candidate > current:
            accumulator[0] = candidate

    def accumulate(self, accumulator, *args):
        self._keep_larger(accumulator, args[0])

    def retract(self, accumulator, *args):
        raise NotImplementedError("This function does not support retraction.")

    def merge(self, accumulator, accumulators):
        for other in accumulators:
            self._keep_larger(accumulator, other[0])
class MaxWithRetractAggFunction(AggregateFunction):
    """MAX with retraction support.

    Accumulator layout: ``[max, map_size, value_to_count_map]``.
    ``value_to_count_map`` maps each value to its net count (accumulations
    minus retractions; zero counts are deleted). A negative count records
    retractions that arrived before the matching accumulations.
    ``map_size`` is the number of values whose net count is positive, and
    ``max`` is only meaningful while ``map_size > 0``.
    """

    def get_value(self, accumulator):
        if accumulator[1] > 0:
            return accumulator[0]
        else:
            return None

    def create_accumulator(self):
        # [max, map_size, value_to_count_map]
        return [None, 0, MapView()]

    def accumulate(self, accumulator, *args):
        if args[0] is not None:
            value = args[0]
            if value in accumulator[2]:
                count = accumulator[2][value]
            else:
                count = 0
            count += 1
            if count == 0:
                # an earlier (pending) retraction is now cancelled out
                del accumulator[2][value]
            else:
                accumulator[2][value] = count
            # only a value that is actually present (positive net count) may
            # become the max; map_size is still the pre-increment size here
            if count > 0 and (accumulator[1] == 0 or accumulator[0] is None
                              or accumulator[0] < value):
                accumulator[0] = value
            if count == 1:
                # first positive occurrence of this value
                accumulator[1] += 1

    def retract(self, accumulator, *args):
        if args[0] is not None:
            value = args[0]
            if value in accumulator[2]:
                count = accumulator[2][value]
            else:
                count = 0
            count -= 1
            if count != 0:
                # persist the decremented count: positive means occurrences
                # remain, negative records a retraction ahead of its value
                accumulator[2][value] = count
                return
            # count went from 1 to 0: the value is no longer present
            del accumulator[2][value]
            accumulator[1] -= 1
            if accumulator[1] == 0:
                accumulator[0] = None
                return
            if value == accumulator[0]:
                self.update_max(accumulator)

    @staticmethod
    def update_max(acc):
        """Recompute ``acc[0]`` from the values with a positive net count."""
        has_max = False
        for value, count in acc[2].items():
            if count <= 0:
                # pending retraction, not a present value
                continue
            if not has_max or acc[0] < value:
                acc[0] = value
                has_max = True
        # The behavior of deleting expired data in the state backend is uncertain.
        # so `mapSize` data may exist, while `map` data may have been deleted
        # when both of them are expired.
        if not has_max:
            acc[0] = None
            # we should also override max value, because it may have an old value.
            acc[1] = 0

    def merge(self, acc, accumulators):
        need_update_max = False
        for a in accumulators:
            # set max element
            if acc[1] == 0 or (a[1] > 0 and a[0] is not None and acc[0] < a[0]):
                acc[0] = a[0]
            # merge the count for each key
            for value, count in a[2].items():
                if value in acc[2]:
                    this_count = acc[2][value]
                else:
                    this_count = 0
                merged_count = count + this_count
                if merged_count == 0:
                    # remove it when count is increased from -1 to 0
                    del acc[2][value]
                    # origin is > 0, and retract to 0
                    if this_count > 0:
                        acc[1] -= 1
                        if value == acc[0]:
                            need_update_max = True
                elif merged_count < 0:
                    acc[2][value] = merged_count
                    if this_count > 0:
                        # origin is > 0, and retract to < 0
                        acc[1] -= 1
                        if value == acc[0]:
                            need_update_max = True
                else:  # merged_count > 0
                    acc[2][value] = merged_count
                    if this_count <= 0:
                        # origin is <= 0, and accumulate to > 0
                        acc[1] += 1
        if need_update_max:
            self.update_max(acc)
class MinAggFunction(AggregateFunction):
    """MIN without retraction.

    Accumulator layout: ``[min]``; ``None`` until a non-null input arrives.
    """

    def get_value(self, accumulator):
        return accumulator[0]

    def create_accumulator(self):
        # [min]
        return [None]

    @staticmethod
    def _keep_smaller(accumulator, candidate):
        # nulls are ignored; otherwise replace an absent or larger minimum
        if candidate is None:
            return
        current = accumulator[0]
        if current is None or current > candidate:
            accumulator[0] = candidate

    def accumulate(self, accumulator, *args):
        self._keep_smaller(accumulator, args[0])

    def retract(self, accumulator, *args):
        raise NotImplementedError("This function does not support retraction.")

    def merge(self, accumulator, accumulators):
        for other in accumulators:
            self._keep_smaller(accumulator, other[0])
class MinWithRetractAggFunction(AggregateFunction):
    """MIN with retraction support.

    Accumulator layout: ``[min, map_size, value_to_count_map]``.
    ``value_to_count_map`` maps each value to its net count (accumulations
    minus retractions; zero counts are deleted). A negative count records
    retractions that arrived before the matching accumulations.
    ``map_size`` is the number of values whose net count is positive, and
    ``min`` is only meaningful while ``map_size > 0``.
    """

    def get_value(self, accumulator):
        if accumulator[1] > 0:
            return accumulator[0]
        else:
            return None

    def create_accumulator(self):
        # [min, map_size, value_to_count_map]
        return [None, 0, MapView()]

    def accumulate(self, accumulator, *args):
        if args[0] is not None:
            value = args[0]
            if value in accumulator[2]:
                count = accumulator[2][value]
            else:
                count = 0
            count += 1
            if count == 0:
                # an earlier (pending) retraction is now cancelled out
                del accumulator[2][value]
            else:
                accumulator[2][value] = count
            # only a value that is actually present (positive net count) may
            # become the min; map_size is still the pre-increment size here
            if count > 0 and (accumulator[1] == 0 or accumulator[0] is None
                              or accumulator[0] > value):
                accumulator[0] = value
            if count == 1:
                # first positive occurrence of this value
                accumulator[1] += 1

    def retract(self, accumulator, *args):
        if args[0] is not None:
            value = args[0]
            if value in accumulator[2]:
                count = accumulator[2][value]
            else:
                count = 0
            count -= 1
            if count != 0:
                # persist the decremented count: positive means occurrences
                # remain, negative records a retraction ahead of its value
                accumulator[2][value] = count
                return
            # count went from 1 to 0: the value is no longer present
            del accumulator[2][value]
            accumulator[1] -= 1
            if accumulator[1] == 0:
                accumulator[0] = None
                return
            if value == accumulator[0]:
                self.update_min(accumulator)

    @staticmethod
    def update_min(acc):
        """Recompute ``acc[0]`` from the values with a positive net count."""
        has_min = False
        for value, count in acc[2].items():
            if count <= 0:
                # pending retraction, not a present value
                continue
            if not has_min or acc[0] > value:
                acc[0] = value
                has_min = True
        # The behavior of deleting expired data in the state backend is uncertain.
        # so `mapSize` data may exist, while `map` data may have been deleted
        # when both of them are expired.
        if not has_min:
            acc[0] = None
            # we should also override min value, because it may have an old value.
            acc[1] = 0

    def merge(self, acc, accumulators):
        need_update_min = False
        for a in accumulators:
            # set min element
            if acc[1] == 0 or (a[1] > 0 and a[0] is not None and acc[0] > a[0]):
                acc[0] = a[0]
            # merge the count for each key
            for value, count in a[2].items():
                if value in acc[2]:
                    this_count = acc[2][value]
                else:
                    this_count = 0
                merged_count = count + this_count
                if merged_count == 0:
                    # remove it when count is increased from -1 to 0
                    del acc[2][value]
                    # origin is > 0, and retract to 0
                    if this_count > 0:
                        acc[1] -= 1
                        if value == acc[0]:
                            need_update_min = True
                elif merged_count < 0:
                    acc[2][value] = merged_count
                    if this_count > 0:
                        # origin is > 0, and retract to < 0
                        acc[1] -= 1
                        if value == acc[0]:
                            need_update_min = True
                else:  # merged_count > 0
                    acc[2][value] = merged_count
                    if this_count <= 0:
                        # origin is <= 0, and accumulate to > 0
                        acc[1] += 1
        if need_update_min:
            self.update_min(acc)
class Sum0AggFunction(AggregateFunction):
    """SUM0 base: sum of the non-null inputs, starting from a typed zero.

    Subclasses supply ``create_accumulator`` returning ``[zero]`` of the
    appropriate numeric type. Accumulator layout: ``[sum]``.
    """

    def get_value(self, accumulator):
        total = accumulator[0]
        return total

    @abstractmethod
    def create_accumulator(self):
        """Return ``[zero]`` for the concrete numeric type."""

    def accumulate(self, accumulator, *args):
        addend = args[0]
        if addend is None:
            return
        accumulator[0] += addend

    def retract(self, accumulator, *args):
        subtrahend = args[0]
        if subtrahend is None:
            return
        accumulator[0] -= subtrahend

    def merge(self, accumulator, accumulators):
        # added one at a time (not via sum()) to keep float addition order
        for partial in accumulators:
            accumulator[0] += partial[0]
class IntSum0AggFunction(Sum0AggFunction):
    """SUM0 over integer inputs; an empty group sums to ``0``."""

    def create_accumulator(self):
        zero = 0
        return [zero]
class FloatSum0AggFunction(Sum0AggFunction):
    """SUM0 over floating-point inputs; an empty group sums to ``0.0``."""

    def create_accumulator(self):
        zero = 0.0
        return [zero]
class DecimalSum0AggFunction(Sum0AggFunction):
    """SUM0 over ``Decimal`` inputs; an empty group sums to ``Decimal('0')``."""

    def create_accumulator(self):
        zero = Decimal('0')
        return [zero]
class SumAggFunction(AggregateFunction):
    """SUM without retraction; ``None`` until a non-null input arrives.

    Accumulator layout: ``[sum]``.
    """

    def get_value(self, accumulator):
        return accumulator[0]

    def create_accumulator(self):
        # [sum]
        return [None]

    @staticmethod
    def _add(accumulator, addend):
        # nulls are skipped; the first non-null value seeds the sum
        if addend is None:
            return
        current = accumulator[0]
        accumulator[0] = addend if current is None else current + addend

    def accumulate(self, accumulator, *args):
        self._add(accumulator, args[0])

    def retract(self, accumulator, *args):
        raise NotImplementedError("This function does not support retraction.")

    def merge(self, accumulator, accumulators):
        for partial in accumulators:
            self._add(accumulator, partial[0])
class SumWithRetractAggFunction(AggregateFunction):
    """SUM with retraction support.

    Accumulator layout: ``[sum, count]``. ``count`` tracks how many non-null
    values are currently included, so a group with none yields ``None``.
    """

    def get_value(self, accumulator):
        return None if accumulator[1] == 0 else accumulator[0]

    def create_accumulator(self):
        # [sum, count]
        return [0, 0]

    def accumulate(self, accumulator, *args):
        value = args[0]
        if value is None:
            return
        accumulator[0] += value
        accumulator[1] += 1

    def retract(self, accumulator, *args):
        value = args[0]
        if value is None:
            return
        accumulator[0] -= value
        accumulator[1] -= 1

    def merge(self, accumulator, accumulators):
        for partial in accumulators:
            # create_accumulator initialises the sum to 0, so this guard is
            # purely defensive
            if partial[0] is None:
                continue
            accumulator[0] += partial[0]
            accumulator[1] += partial[1]