# blob: b50cdebbdeb3807d22fd12d07c72a05c0e0e8516 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# cython: profile=True
""" For internal use only. No backwards compatibility guarantees."""
cimport cython
from libc.stdint cimport int64_t, INT64_MAX
from libc.stdlib cimport calloc, free
# Lookup table of powers of ten: POWER_TEN[k] == 10 ** k for k in 0..19
# (the float literals read as 10e-1 == 1, 10e0 == 10, ..., 10e18 == 10 ** 19).
# The last entry is larger than INT64_MAX, so a forward scan comparing an
# int64 value against successive entries always stops inside the table.
cdef unsigned long long* POWER_TEN = [10e-1, 10e0, 10e1, 10e2, 10e3, 10e4, 10e5,
                                      10e6, 10e7, 10e8, 10e9, 10e10, 10e11,
                                      10e12, 10e13, 10e14, 10e15, 10e16, 10e17,
                                      10e18]
cdef int64_t get_log10_round_to_floor(int64_t element):
  """Return floor(log10(element)) by scanning the POWER_TEN lookup table.

  Args:
    element: the value whose base-10 magnitude is wanted.

  Returns:
    The largest ``power`` for which ``POWER_TEN[power] <= element``, or -1
    when ``element`` is smaller than 1.
  """
  cdef int power = 0
  # Zero already yielded -1 from the loop below. A negative value, however,
  # is compared against unsigned table entries, so it would be treated as a
  # huge unsigned number and the scan would run past the end of the table.
  if element < 1:
    return -1
  # NOTE(review): termination relies on POWER_TEN ending with an entry that
  # is larger than any int64 value -- confirm against the table definition.
  while element >= POWER_TEN[power]:
    power += 1
  return power - 1
cdef class DataflowDistributionCounter(object):
  """Distribution Counter:

  Contains value distribution statistics and methods for incrementing.

  Currently using special bucketing strategy suitable for Dataflow

  Attributes:
    min: minimum value of all inputs.
    max: maximum value of all inputs.
    count: total count of all inputs.
    sum: sum of all inputs.
    buckets: histogram buckets of value counts for a
      distribution (1,2,5 bucketing). Max bucket_index is 58 (sys.maxint as
      input).
    is_cythonized: mark whether DataflowDistributionCounter cythonized.
  """
  def __init__(self):
    """Reset the statistics and allocate the zero-filled bucket array.

    Raises:
      MemoryError: if the bucket array cannot be allocated.
    """
    self.min = INT64_MAX
    self.max = 0
    self.count = 0
    self.sum = 0
    # One int64 counter per bucket; calloc returns them already zeroed.
    self.buckets = <int64_t*> calloc(MAX_BUCKET_SIZE, sizeof(int64_t))
    if self.buckets == NULL:
      # Fail here rather than dereferencing NULL on the first add_input.
      raise MemoryError()
    self.is_cythonized = True

  def __dealloc__(self):
    """free allocated memory"""
    # free(NULL) is a no-op, so this is safe even if allocation never ran.
    free(self.buckets)

  cpdef bint add_input(self, int64_t element) except -1:
    """Record one value in the running statistics and its histogram bucket.

    Args:
      element: the value to add; must be non-negative.

    Raises:
      ValueError: if ``element`` is negative.
    """
    cdef int64_t bucket_index
    if element < 0:
      raise ValueError('Distribution counters support only non-negative value')
    self.min = min(self.min, element)
    self.max = max(self.max, element)
    self.count += 1
    self.sum += element
    bucket_index = self._fast_calculate_bucket_index(element)
    self.buckets[bucket_index] += 1

  cdef int64_t _fast_calculate_bucket_index(self, int64_t element):
    """Calculate the bucket index for the given element.

    Declare calculate_bucket_index as cdef in order to improve performance,
    since cpdef will have significant overhead.

    Args:
      element: a non-negative value (callers are expected to have validated
        it, as add_input does).

    Returns:
      0 for an element of 0; otherwise
      ``1 + floor(log10(element)) * BUCKET_PER_TEN + offset`` where offset is
      0, 1 or 2 for the [1x, 2x), [2x, 5x) and [5x, 10x) sub-ranges of the
      element's power of ten.
    """
    cdef int64_t log10_floor
    cdef int64_t power_of_ten
    cdef int64_t bucket_offset
    # Zero has no logarithm; it gets the dedicated first bucket.
    if element == 0:
      return 0
    log10_floor = get_log10_round_to_floor(element)
    power_of_ten = POWER_TEN[log10_floor]
    if element < power_of_ten * 2:
      bucket_offset = 0
    elif element < power_of_ten * 5:
      bucket_offset = 1
    else:
      bucket_offset = 2
    return 1 + log10_floor * BUCKET_PER_TEN + bucket_offset

  cpdef void translate_to_histogram(self, histogram):
    """Translate buckets into Histogram.

    Args:
      histogram: apache_beam.runners.dataflow.internal.clients.dataflow.Histogram
        Ideally, only call this function when reporting counter to
        dataflow service.

    Sets ``histogram.firstBucketOffset`` to the index of the first non-empty
    bucket and ``histogram.bucketCounts`` to the counts from that bucket
    through the last non-empty one. When every bucket is empty both offsets
    stay 0, so a single zero count is reported.
    """
    cdef int first_bucket_offset = 0
    cdef int last_bucket_offset = 0
    cdef int index = 0
    # Scan forward and stop at the first non-empty bucket.
    for index in range(0, MAX_BUCKET_SIZE):
      if self.buckets[index] != 0:
        first_bucket_offset = index
        break
    # Scan backward and stop at the last non-empty bucket.
    for index in range(MAX_BUCKET_SIZE - 1, -1, -1):
      if self.buckets[index] != 0:
        last_bucket_offset = index
        break
    histogram.firstBucketOffset = first_bucket_offset
    histogram.bucketCounts = []
    for index in range(first_bucket_offset, last_bucket_offset + 1):
      histogram.bucketCounts.append(self.buckets[index])

  cpdef bint add_inputs_for_test(self, elements) except -1:
    """Used for performance microbenchmark.

    During runtime, add_input will be called through c-call, so we want to have
    the same calling routine when running microbenchmark as application runtime.
    Directly calling cpdef from def will cause significant overhead.

    Args:
      elements: iterable of values, each passed to add_input.
    """
    for element in elements:
      self.add_input(element)

  cpdef int64_t calculate_bucket_index(self, int64_t element):
    """Used for unit tests.

    cdef calculate_bucket_index cannot be called directly from def.

    Args:
      element: value to bucket; not validated here, so pass only
        non-negative values.

    Returns:
      The bucket index computed by _fast_calculate_bucket_index.
    """
    return self._fast_calculate_bucket_index(element)

  cpdef merge(self, accumulators):
    """Merging accumulators is not supported; always raises.

    Raises:
      NotImplementedError: unconditionally.
    """
    raise NotImplementedError()