# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# cython: profile=True
# cython: language_level=3
""" For internal use only. No backwards compatibility guarantees."""
cimport cython
from libc.stdint cimport int64_t, INT64_MAX
from libc.stdlib cimport calloc, free
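
# Note: MAX_BUCKET_SIZE and BUCKET_PER_TEN used below are not defined in this
# file; they are expected to be declared in the matching .pxd definition file
# for this module.
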
cdef unsigned long long* POWER_TEN = [10e-1, 10e0, 10e1, 10e2, 10e3, 10e4, 10e5,
10e6, 10e7, 10e8, 10e9, 10e10, 10e11,
10e12, 10e13, 10e14, 10e15, 10e16, 10e17,
10e18]
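# POWER_TEN[i] == 10**i for i in range(20); the float literals above are exact
# powers of ten and are coerced to unsigned long long. POWER_TEN[19] (10**19)
# exceeds INT64_MAX, which guarantees the lookup loop below terminates for any
# int64 input.
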
cdef int64_t get_log10_round_to_floor(int64_t element):
  """Return floor(log10(element)) for a positive int64 element.

  For element == 0 the loop never runs and -1 is returned, so callers
  guard against 0 before calling.
  """
  cdef int power = 0
  while element >= POWER_TEN[power]:
    power += 1
  return power - 1
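
# For illustration, with the table above:
#   get_log10_round_to_floor(1)    == 0
#   get_log10_round_to_floor(999)  == 2
#   get_log10_round_to_floor(1000) == 3
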
cdef class DataflowDistributionCounter(object):
"""Distribution Counter:
Contains value distribution statistics and methods for incrementing.
Currently using special bucketing strategy suitable for Dataflow
Attributes:
min: minimum value of all inputs.
max: maximum value of all inputs.
count: total count of all inputs.
sum: sum of all inputs.
buckets: histogram buckets of value counts for a
distribution(1,2,5 bucketing). Max bucket_index is 58( sys.maxint as input).
is_cythonized: mark whether DataflowDistributionCounter cythonized.
"""
  def __init__(self):
    self.min = INT64_MAX
    self.max = 0
    self.count = 0
    self.sum = 0
    self.buckets = <int64_t*> calloc(MAX_BUCKET_SIZE, sizeof(int64_t))
    if self.buckets == NULL:
      raise MemoryError('Could not allocate the histogram buckets')
    self.is_cythonized = True
def __dealloc__(self):
"""free allocated memory"""
free(self.buckets)
cpdef bint add_input(self, int64_t element) except -1:
if element < 0:
      raise ValueError('Distribution counters support only non-negative values')
self.min = min(self.min, element)
self.max = max(self.max, element)
self.count += 1
self.sum += element
cdef int64_t bucket_index = self._fast_calculate_bucket_index(element)
self.buckets[bucket_index] += 1
cdef int64_t _fast_calculate_bucket_index(self, int64_t element):
"""Calculate the bucket index for the given element.
Declare calculate_bucket_index as cdef in order to improve performance,
since cpdef will have significant overhead.
"""
if element == 0:
return 0
cdef int64_t log10_floor = get_log10_round_to_floor(element)
cdef int64_t power_of_ten = POWER_TEN[log10_floor]
cdef int64_t bucket_offset = 0
if element < power_of_ten * 2:
bucket_offset = 0
elif element < power_of_ten * 5:
bucket_offset = 1
else:
bucket_offset = 2
return 1 + log10_floor * BUCKET_PER_TEN + bucket_offset
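
  # Worked examples of the (1, 2, 5) bucketing above (illustrative; assumes
  # BUCKET_PER_TEN == 3, matching the three offsets per decade):
  #   0     -> bucket 0
  #   1     -> bucket 1,   2-4   -> bucket 2,   5-9   -> bucket 3
  #   10-19 -> bucket 4,   20-49 -> bucket 5,   50-99 -> bucket 6
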
cpdef void translate_to_histogram(self, histogram):
"""Translate buckets into Histogram.
Args:
histogram: apache_beam.runners.dataflow.internal.clents.dataflow.Histogram
Ideally, only call this function when reporting counter to
dataflow service.
"""
cdef int first_bucket_offset = 0
cdef int last_bucket_offset = 0
cdef int index = 0
for index in range(0, MAX_BUCKET_SIZE):
if self.buckets[index] != 0:
first_bucket_offset = index
break
for index in range(MAX_BUCKET_SIZE - 1, -1, -1):
if self.buckets[index] != 0:
last_bucket_offset = index
break
histogram.firstBucketOffset = first_bucket_offset
histogram.bucketCounts = []
for index in range(first_bucket_offset, last_bucket_offset + 1):
histogram.bucketCounts.append(self.buckets[index])
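
  # For illustration: if only buckets 3 and 5 are non-zero, with counts 2 and 1,
  # the result is histogram.firstBucketOffset == 3 and
  # histogram.bucketCounts == [2, 0, 1]; leading and trailing empty buckets are
  # trimmed while interior gaps are kept as zero counts.
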
cpdef bint add_inputs_for_test(self, elements) except -1:
"""Used for performance microbenchmark.
During runtime, add_input will be called through c-call, so we want to have
the same calling routine when running microbenchmark as application runtime.
Directly calling cpdef from def will cause significant overhead.
"""
for element in elements:
self.add_input(element)
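
  # Illustrative call (hypothetical input values):
  #   counter.add_inputs_for_test([0, 1, 2, 5, 10, 100] * 1000)
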
cpdef int64_t calculate_bucket_index(self, int64_t element):
"""Used for unit tests.
cdef calculate_bucket_index cannot be called directly from def.
"""
return self._fast_calculate_bucket_index(element)
cpdef merge(self, accumulators):
raise NotImplementedError()