#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""For internal use only; no backwards-compatibility guarantees."""
from __future__ import absolute_import
from builtins import object
from builtins import range
globals()['INT64_MAX'] = 2**63 - 1
globals()['INT64_MIN'] = -2**63
POWER_TEN = [10e-1, 10e0, 10e1, 10e2, 10e3, 10e4, 10e5,
10e6, 10e7, 10e8, 10e9, 10e10, 10e11,
10e12, 10e13, 10e14, 10e15, 10e16, 10e17,
10e18]


def get_log10_round_to_floor(element):
  """Return floor(log10(element)) for a positive element."""
  power = 0
  while element >= POWER_TEN[power]:
    power += 1
  return power - 1
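

# Hedged worked example (not part of the original module): since
# POWER_TEN[n] == 10**n, the loop walks up the table until it overshoots,
# so the returned power is floor(log10(element)), e.g.:
#
#   get_log10_round_to_floor(1) == 0
#   get_log10_round_to_floor(99) == 1
#   get_log10_round_to_floor(1000) == 3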


class DataflowDistributionCounter(object):
  """Pure-Python DataflowDistributionCounter for when Cython is unavailable.

  Avoid the pure-Python mode if possible, since it is much slower than the
  Cythonized DataflowDistributionCounter defined in
  apache_beam.transforms.cy_dataflow_distribution_counter.

  Currently uses a special bucketing strategy suitable for Dataflow.

  Attributes:
    min: minimum value of all inputs.
    max: maximum value of all inputs.
    count: total count of all inputs.
    sum: sum of all inputs.
    buckets: histogram buckets of value counts for a distribution
      (1, 2, 5 bucketing). The max bucket_index is 58 (sys.maxint as input).
    is_cythonized: marks whether DataflowDistributionCounter is Cythonized.
  """
  # Assuming the max input is sys.maxint, the possible max bucket size is 59.
  MAX_BUCKET_SIZE = 59

  # 3 buckets for every power of ten -> 1, 2, 5
  BUCKET_PER_TEN = 3

  def __init__(self):
    global INT64_MAX  # pylint: disable=global-variable-not-assigned
    self.min = INT64_MAX
    self.max = 0
    self.count = 0
    self.sum = 0
    self.buckets = [0] * self.MAX_BUCKET_SIZE
    self.is_cythonized = False

  def add_input(self, element):
    if element < 0:
      raise ValueError(
          'Distribution counters support only non-negative values')
    self.min = min(self.min, element)
    self.max = max(self.max, element)
    self.count += 1
    self.sum += element
    bucket_index = self.calculate_bucket_index(element)
    self.buckets[bucket_index] += 1

  def calculate_bucket_index(self, element):
    """Calculate the bucket index for the given element."""
    if element == 0:
      return 0
    log10_floor = get_log10_round_to_floor(element)
    power_of_ten = POWER_TEN[log10_floor]
    if element < power_of_ten * 2:
      bucket_offset = 0
    elif element < power_of_ten * 5:
      bucket_offset = 1
    else:
      bucket_offset = 2
    return 1 + log10_floor * self.BUCKET_PER_TEN + bucket_offset
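
  # Hedged worked example (illustration only, not from the original source):
  # for element == 500, log10_floor == 2 and power_of_ten == 100.0; 500 is
  # neither < 200 nor < 500, so bucket_offset == 2 and the returned index is
  # 1 + 2 * BUCKET_PER_TEN + 2 == 9.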

  def translate_to_histogram(self, histogram):
    """Translate buckets into a Histogram.

    Args:
      histogram:
        apache_beam.runners.dataflow.internal.clients.dataflow.Histogram

    Ideally, only call this function when reporting the counter to the
    Dataflow service.
    """
    first_bucket_offset = 0
    last_bucket_offset = 0
    for index in range(0, self.MAX_BUCKET_SIZE):
      if self.buckets[index] != 0:
        first_bucket_offset = index
        break
    for index in range(self.MAX_BUCKET_SIZE - 1, -1, -1):
      if self.buckets[index] != 0:
        last_bucket_offset = index
        break
    histogram.firstBucketOffset = first_bucket_offset
    histogram.bucketCounts = (
        self.buckets[first_bucket_offset:last_bucket_offset + 1])
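    # Illustrative note (not in the original source): if the only nonzero
    # buckets are at indices 1 and 9, firstBucketOffset becomes 1 and
    # bucketCounts holds the nine counts for indices 1 through 9 inclusive.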

  def merge(self, accumulators):
    raise NotImplementedError()
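

if __name__ == '__main__':
  # Hedged usage sketch, not part of the original module: feeds a few values
  # through the counter and prints the resulting histogram fields.
  # `_FakeHistogram` is a hypothetical stand-in for the real
  # apache_beam.runners.dataflow.internal.clients.dataflow.Histogram message,
  # used here only to keep the example self-contained and runnable.
  class _FakeHistogram(object):
    firstBucketOffset = None
    bucketCounts = None

  counter = DataflowDistributionCounter()
  for value in (1, 2, 5, 10, 500):
    counter.add_input(value)

  histogram = _FakeHistogram()
  counter.translate_to_histogram(histogram)
  print('min=%s max=%s count=%s sum=%s' %
        (counter.min, counter.max, counter.count, counter.sum))
  # Expected: firstBucketOffset=1, bucketCounts=[1, 1, 1, 1, 0, 0, 0, 0, 1]
  print('firstBucketOffset=%s bucketCounts=%s' %
        (histogram.firstBucketOffset, histogram.bucketCounts))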