blob: 12c638a342b1a77b69252af59d0d14397520d109 [file] [log] [blame]
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import filecmp
import os
from pyflink.gen_protos import generate_proto_files
from pyflink.testing.test_case_utils import PyFlinkTestCase
class FlinkFnExecutionTests(PyFlinkTestCase):
"""
Tests whether flink_fn_exeution_pb2.py and flink_fn_exeution_pb2.pyi is synced with flink-fn-execution.proto.
"""
flink_fn_execution_pb2_file_name = "flink_fn_execution_pb2.py"
flink_fn_execution_pb2i_file_name = "flink_fn_execution_pb2.pyi"
gen_protos_script = "gen_protos.py"
flink_fn_execution_proto_file_name = "flink-fn-execution.proto"
def check_file_content(self, file_name):
expected = os.path.join(self.tempdir, file_name)
actual = os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', file_name)
self.assertTrue(filecmp.cmp(expected, actual),
'File %s should be re-generated by executing %s as %s has changed.'
% (file_name,
self.gen_protos_script,
self.flink_fn_execution_proto_file_name))
def test_flink_fn_execution_pb2_synced(self):
generate_proto_files('True', self.tempdir)
self.check_file_content(self.flink_fn_execution_pb2_file_name)
self.check_file_content(self.flink_fn_execution_pb2i_file_name)
def test_state_ttl_config_proto(self):
from pyflink.datastream.state import StateTtlConfig
from pyflink.common.time import Time
state_ttl_config = StateTtlConfig \
.new_builder(Time.milliseconds(1000)) \
.set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite) \
.set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired) \
.cleanup_full_snapshot() \
.cleanup_incrementally(10, True) \
.cleanup_in_rocksdb_compact_filter(1000) \
.build()
state_ttl_config_proto = state_ttl_config._to_proto()
state_ttl_config = StateTtlConfig._from_proto(state_ttl_config_proto)
self.assertEqual(state_ttl_config.get_ttl(), Time.milliseconds(1000))
self.assertEqual(
state_ttl_config.get_update_type(), StateTtlConfig.UpdateType.OnCreateAndWrite)
self.assertEqual(
state_ttl_config.get_state_visibility(),
StateTtlConfig.StateVisibility.NeverReturnExpired)
self.assertEqual(
state_ttl_config.get_ttl_time_characteristic(),
StateTtlConfig.TtlTimeCharacteristic.ProcessingTime)
cleanup_strategies = state_ttl_config.get_cleanup_strategies()
self.assertTrue(cleanup_strategies.is_cleanup_in_background())
self.assertTrue(cleanup_strategies.in_full_snapshot())
incremental_cleanup_strategy = cleanup_strategies.get_incremental_cleanup_strategy()
self.assertEqual(incremental_cleanup_strategy.get_cleanup_size(), 10)
self.assertTrue(incremental_cleanup_strategy.run_cleanup_for_every_record())
rocksdb_compact_filter_cleanup_strategy = \
cleanup_strategies.get_rocksdb_compact_filter_cleanup_strategy()
self.assertEqual(
rocksdb_compact_filter_cleanup_strategy.get_query_time_after_num_entries(), 1000)