#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Tests for state caching."""
# pytype: skip-file

from __future__ import absolute_import

import logging
import unittest

from apache_beam.metrics.monitoring_infos import LATEST_INT64_TYPE
from apache_beam.metrics.monitoring_infos import SUM_INT64_TYPE
from apache_beam.runners.worker.statecache import StateCache


class StateCacheTest(unittest.TestCase):
  def test_empty_cache_get(self):
    cache = self.get_cache(5)
    self.assertEqual(cache.get("key", 'cache_token'), None)
    with self.assertRaises(Exception):
      # Invalid cache token provided
      self.assertEqual(cache.get("key", None), None)
    self.verify_metrics(cache, {'get': 1, 'put': 0, 'extend': 0,
                                'miss': 1, 'hit': 0, 'clear': 0,
                                'evict': 0,
                                'size': 0, 'capacity': 5})

  def test_put_get(self):
    cache = self.get_cache(5)
    cache.put("key", "cache_token", "value")
    self.assertEqual(cache.size(), 1)
    self.assertEqual(cache.get("key", "cache_token"), "value")
    self.assertEqual(cache.get("key", "cache_token2"), None)
    with self.assertRaises(Exception):
      self.assertEqual(cache.get("key", None), None)
    self.verify_metrics(cache, {'get': 2, 'put': 1, 'extend': 0,
                                'miss': 1, 'hit': 1, 'clear': 0,
                                'evict': 0,
                                'size': 1, 'capacity': 5})

  def test_overwrite(self):
    cache = self.get_cache(2)
    cache.put("key", "cache_token", "value")
    # putting with a new cache token replaces the entry; the old token no
    # longer yields a value
    cache.put("key", "cache_token2", "value2")
    self.assertEqual(cache.size(), 1)
    self.assertEqual(cache.get("key", "cache_token"), None)
    self.assertEqual(cache.get("key", "cache_token2"), "value2")
    self.verify_metrics(cache, {'get': 2, 'put': 2, 'extend': 0,
                                'miss': 1, 'hit': 1, 'clear': 0,
                                'evict': 0,
                                'size': 1, 'capacity': 2})

  def test_extend(self):
    cache = self.get_cache(3)
    cache.put("key", "cache_token", ['val'])
    # test extend for existing key
    cache.extend("key", "cache_token", ['yet', 'another', 'val'])
    self.assertEqual(cache.size(), 1)
    self.assertEqual(cache.get("key", "cache_token"),
                     ['val', 'yet', 'another', 'val'])
    # test extend without existing key
    cache.extend("key2", "cache_token", ['another', 'val'])
    self.assertEqual(cache.size(), 2)
    self.assertEqual(cache.get("key2", "cache_token"), ['another', 'val'])
    # test eviction in case the cache token changes
    cache.extend("key2", "new_token", ['new_value'])
    self.assertEqual(cache.get("key2", "new_token"), None)
    self.assertEqual(cache.size(), 1)
    self.verify_metrics(cache, {'get': 3, 'put': 1, 'extend': 3,
                                'miss': 1, 'hit': 2, 'clear': 0,
                                'evict': 1,
                                'size': 1, 'capacity': 3})

  def test_clear(self):
    cache = self.get_cache(5)
    cache.clear("new-key", "cache_token")
    cache.put("key", "cache_token", ["value"])
    self.assertEqual(cache.size(), 2)
    self.assertEqual(cache.get("new-key", "new_token"), None)
    self.assertEqual(cache.get("key", "cache_token"), ['value'])
    # test clear without existing key/token
    cache.clear("non-existing", "token")
    self.assertEqual(cache.size(), 3)
    self.assertEqual(cache.get("non-existing", "token"), [])
    # test eviction in case the cache token changes
    cache.clear("new-key", "wrong_token")
    self.assertEqual(cache.size(), 2)
    self.assertEqual(cache.get("new-key", "cache_token"), None)
    self.assertEqual(cache.get("new-key", "wrong_token"), None)
    self.verify_metrics(cache, {'get': 5, 'put': 1, 'extend': 0,
                                'miss': 3, 'hit': 2, 'clear': 3,
                                'evict': 1,
                                'size': 2, 'capacity': 5})

  def test_max_size(self):
    cache = self.get_cache(2)
    cache.put("key", "cache_token", "value")
    cache.put("key2", "cache_token", "value")
    self.assertEqual(cache.size(), 2)
    # re-inserting existing keys must not grow the cache beyond its capacity
    cache.put("key2", "cache_token", "value")
    self.assertEqual(cache.size(), 2)
    cache.put("key", "cache_token", "value")
    self.assertEqual(cache.size(), 2)
    self.verify_metrics(cache, {'get': 0, 'put': 4, 'extend': 0,
                                'miss': 0, 'hit': 0, 'clear': 0,
                                'evict': 0,
                                'size': 2, 'capacity': 2})

  def test_evict_all(self):
    cache = self.get_cache(5)
    cache.put("key", "cache_token", "value")
    cache.put("key2", "cache_token", "value2")
    self.assertEqual(cache.size(), 2)
    cache.evict_all()
    self.assertEqual(cache.size(), 0)
    self.assertEqual(cache.get("key", "cache_token"), None)
    self.assertEqual(cache.get("key2", "cache_token"), None)
    self.verify_metrics(cache, {'get': 2, 'put': 2, 'extend': 0,
                                'miss': 2, 'hit': 0, 'clear': 0,
                                'evict': 0,
                                'size': 0, 'capacity': 5})

  def test_lru(self):
    cache = self.get_cache(5)
    cache.put("key", "cache_token", "value")
    cache.put("key2", "cache_token2", "value2")
    cache.put("key3", "cache_token", "value0")
    cache.put("key3", "cache_token", "value3")
    cache.put("key4", "cache_token4", "value4")
    cache.put("key5", "cache_token", "value0")
    cache.put("key5", "cache_token", ["value5"])
    self.assertEqual(cache.size(), 5)
    self.assertEqual(cache.get("key", "cache_token"), "value")
    self.assertEqual(cache.get("key2", "cache_token2"), "value2")
    self.assertEqual(cache.get("key3", "cache_token"), "value3")
    self.assertEqual(cache.get("key4", "cache_token4"), "value4")
    self.assertEqual(cache.get("key5", "cache_token"), ["value5"])
    # insert another key to trigger cache eviction
    cache.put("key6", "cache_token2", "value7")
    self.assertEqual(cache.size(), 5)
    # least recently used key should be gone ("key")
    self.assertEqual(cache.get("key", "cache_token"), None)
    # trigger a read on "key2"
    cache.get("key2", "cache_token")
    # insert another key to trigger cache eviction
    cache.put("key7", "cache_token", "value7")
    self.assertEqual(cache.size(), 5)
    # least recently used key should be gone ("key3")
    self.assertEqual(cache.get("key3", "cache_token"), None)
    # trigger a put on "key2"
    cache.put("key2", "cache_token", "put")
    self.assertEqual(cache.size(), 5)
    # insert another key to trigger cache eviction
    cache.put("key8", "cache_token", "value8")
    self.assertEqual(cache.size(), 5)
    # least recently used key should be gone ("key4")
    self.assertEqual(cache.get("key4", "cache_token"), None)
    # make "key5" used by appending to it
    cache.extend("key5", "cache_token", ["another"])
    # least recently used key should be gone ("key6")
    self.assertEqual(cache.get("key6", "cache_token"), None)
    self.verify_metrics(cache, {'get': 10, 'put': 11, 'extend': 1,
                                'miss': 5, 'hit': 5, 'clear': 0,
                                'evict': 0,
                                'size': 5, 'capacity': 5})

  def test_is_cached_enabled(self):
    cache = self.get_cache(1)
    self.assertEqual(cache.is_cache_enabled(), True)
    self.verify_metrics(cache, {})
    cache = self.get_cache(0)
    self.assertEqual(cache.is_cache_enabled(), False)
    self.verify_metrics(cache, {})

  def verify_metrics(self, cache, expected_metrics):
    infos = cache.get_monitoring_infos()
    # Reconstruct metrics dictionary from monitoring infos
    metrics = {
        info.urn.rsplit(':', 1)[1]: info.metric.counter_data.int64_value
        for info in infos
        if "_total" not in info.urn and info.type == LATEST_INT64_TYPE
    }
    self.assertDictEqual(metrics, expected_metrics)
    # Metrics and total metrics should be identical for a single bundle.
    # The following two gauges are not part of the total metrics:
    try:
      del metrics['capacity']
      del metrics['size']
    except KeyError:
      pass
    total_metrics = {
        info.urn.rsplit(':', 1)[1].rsplit("_total")[0]:
            info.metric.counter_data.int64_value
        for info in infos
        if "_total" in info.urn and info.type == SUM_INT64_TYPE
    }
    self.assertDictEqual(metrics, total_metrics)

  @staticmethod
  def get_cache(size):
    cache = StateCache(size)
    cache.initialize_metrics()
    return cache


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  unittest.main()