blob: d789084867804bbd00fbca9de1b09f58b452948c [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# pytype: skip-file
import unittest
from mock import patch
from apache_beam.io.components.adaptive_throttler import AdaptiveThrottler
class AdaptiveThrottlerTest(unittest.TestCase):
START_TIME = 1500000000000
SAMPLE_PERIOD = 60000
BUCKET = 1000
OVERLOAD_RATIO = 2
def setUp(self):
self._throttler = AdaptiveThrottler(
AdaptiveThrottlerTest.SAMPLE_PERIOD,
AdaptiveThrottlerTest.BUCKET,
AdaptiveThrottlerTest.OVERLOAD_RATIO)
# As far as practical, keep these tests aligned with
# AdaptiveThrottlerTest.java.
def test_no_initial_throttling(self):
self.assertEqual(
0,
self._throttler._throttling_probability(
AdaptiveThrottlerTest.START_TIME))
def test_no_throttling_if_no_errors(self):
for t in range(AdaptiveThrottlerTest.START_TIME,
AdaptiveThrottlerTest.START_TIME + 20):
self.assertFalse(self._throttler.throttle_request(t))
self._throttler.successful_request(t)
self.assertEqual(
0,
self._throttler._throttling_probability(
AdaptiveThrottlerTest.START_TIME + 20))
def test_no_throttling_after_errors_expire(self):
for t in range(AdaptiveThrottlerTest.START_TIME,
AdaptiveThrottlerTest.START_TIME +
AdaptiveThrottlerTest.SAMPLE_PERIOD,
100):
self._throttler.throttle_request(t)
# And no sucessful_request
self.assertLess(
0,
self._throttler._throttling_probability(
AdaptiveThrottlerTest.START_TIME +
AdaptiveThrottlerTest.SAMPLE_PERIOD))
for t in range(AdaptiveThrottlerTest.START_TIME +
AdaptiveThrottlerTest.SAMPLE_PERIOD,
AdaptiveThrottlerTest.START_TIME +
AdaptiveThrottlerTest.SAMPLE_PERIOD * 2,
100):
self._throttler.throttle_request(t)
self._throttler.successful_request(t)
self.assertEqual(
0,
self._throttler._throttling_probability(
AdaptiveThrottlerTest.START_TIME +
AdaptiveThrottlerTest.SAMPLE_PERIOD * 2))
@patch('random.Random')
def test_throttling_after_errors(self, mock_random):
mock_random().uniform.side_effect = [x / 10.0 for x in range(0, 10)] * 2
self._throttler = AdaptiveThrottler(
AdaptiveThrottlerTest.SAMPLE_PERIOD,
AdaptiveThrottlerTest.BUCKET,
AdaptiveThrottlerTest.OVERLOAD_RATIO)
for t in range(AdaptiveThrottlerTest.START_TIME,
AdaptiveThrottlerTest.START_TIME + 20):
throttled = self._throttler.throttle_request(t)
# 1/3rd of requests succeeding.
if t % 3 == 1:
self._throttler.successful_request(t)
if t > AdaptiveThrottlerTest.START_TIME + 10:
# Roughly 1/3rd succeeding, 1/3rd failing, 1/3rd throttled.
self.assertAlmostEqual(
0.33, self._throttler._throttling_probability(t), delta=0.1)
# Given the mocked random numbers, we expect 10..13 to be throttled and
# 14+ to be unthrottled.
self.assertEqual(t < AdaptiveThrottlerTest.START_TIME + 14, throttled)
if __name__ == '__main__':
unittest.main()