blob: 67b22d2062b12270df28f50fa5e4d0164a656a1d [file] [log] [blame]
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from pyflink.datastream import (CheckpointConfig, CheckpointingMode, ExternalizedCheckpointCleanup,
StreamExecutionEnvironment)
from pyflink.java_gateway import get_gateway
from pyflink.testing.test_case_utils import PyFlinkTestCase
class CheckpointConfigTests(PyFlinkTestCase):
def setUp(self):
self.env = StreamExecutionEnvironment\
.get_execution_environment()
self.checkpoint_config = self.env.get_checkpoint_config()
def test_constant(self):
gateway = get_gateway()
JCheckpointConfig = gateway.jvm.org.apache.flink.streaming.api.environment.CheckpointConfig
self.assertEqual(CheckpointConfig.DEFAULT_MAX_CONCURRENT_CHECKPOINTS,
JCheckpointConfig.DEFAULT_MAX_CONCURRENT_CHECKPOINTS)
self.assertEqual(CheckpointConfig.DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS,
JCheckpointConfig.DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS)
self.assertEqual(CheckpointConfig.DEFAULT_TIMEOUT, JCheckpointConfig.DEFAULT_TIMEOUT)
self.assertEqual(CheckpointConfig.DEFAULT_MODE,
CheckpointingMode._from_j_checkpointing_mode(
JCheckpointConfig.DEFAULT_MODE))
def test_is_checkpointing_enabled(self):
self.assertFalse(self.checkpoint_config.is_checkpointing_enabled())
self.env.enable_checkpointing(1000)
self.assertTrue(self.checkpoint_config.is_checkpointing_enabled())
def test_get_set_checkpointing_mode(self):
self.assertEqual(self.checkpoint_config.get_checkpointing_mode(),
CheckpointingMode.EXACTLY_ONCE)
self.checkpoint_config.set_checkpointing_mode(CheckpointingMode.AT_LEAST_ONCE)
self.assertEqual(self.checkpoint_config.get_checkpointing_mode(),
CheckpointingMode.AT_LEAST_ONCE)
self.checkpoint_config.set_checkpointing_mode(CheckpointingMode.EXACTLY_ONCE)
self.assertEqual(self.checkpoint_config.get_checkpointing_mode(),
CheckpointingMode.EXACTLY_ONCE)
def test_get_set_checkpoint_interval(self):
self.assertEqual(self.checkpoint_config.get_checkpoint_interval(), -1)
self.checkpoint_config.set_checkpoint_interval(1000)
self.assertEqual(self.checkpoint_config.get_checkpoint_interval(), 1000)
def test_get_set_checkpoint_timeout(self):
self.assertEqual(self.checkpoint_config.get_checkpoint_timeout(), 600000)
self.checkpoint_config.set_checkpoint_timeout(300000)
self.assertEqual(self.checkpoint_config.get_checkpoint_timeout(), 300000)
def test_get_set_min_pause_between_checkpoints(self):
self.assertEqual(self.checkpoint_config.get_min_pause_between_checkpoints(), 0)
self.checkpoint_config.set_min_pause_between_checkpoints(100000)
self.assertEqual(self.checkpoint_config.get_min_pause_between_checkpoints(), 100000)
def test_get_set_max_concurrent_checkpoints(self):
self.assertEqual(self.checkpoint_config.get_max_concurrent_checkpoints(), 1)
self.checkpoint_config.set_max_concurrent_checkpoints(2)
self.assertEqual(self.checkpoint_config.get_max_concurrent_checkpoints(), 2)
def test_get_set_fail_on_checkpointing_errors(self):
self.assertTrue(self.checkpoint_config.is_fail_on_checkpointing_errors())
self.checkpoint_config.set_fail_on_checkpointing_errors(False)
self.assertFalse(self.checkpoint_config.is_fail_on_checkpointing_errors())
def test_get_set_externalized_checkpoints_cleanup(self):
self.assertFalse(self.checkpoint_config.is_externalized_checkpoints_enabled())
self.assertIsNone(self.checkpoint_config.get_externalized_checkpoint_cleanup())
self.checkpoint_config.enable_externalized_checkpoints(
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
self.assertTrue(self.checkpoint_config.is_externalized_checkpoints_enabled())
self.assertEqual(self.checkpoint_config.get_externalized_checkpoint_cleanup(),
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
self.checkpoint_config.enable_externalized_checkpoints(
ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION)
self.assertEqual(self.checkpoint_config.get_externalized_checkpoint_cleanup(),
ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION)
def test_get_set_prefer_checkpoint_for_recovery(self):
self.assertFalse(self.checkpoint_config.is_prefer_checkpoint_for_recovery())
self.checkpoint_config.set_prefer_checkpoint_for_recovery(True)
self.assertTrue(self.checkpoint_config.is_prefer_checkpoint_for_recovery())
def test_is_unaligned_checkpointing_enabled(self):
self.assertFalse(self.checkpoint_config.is_unaligned_checkpoints_enabled())
self.checkpoint_config.enable_unaligned_checkpoints()
self.assertTrue(self.checkpoint_config.is_unaligned_checkpoints_enabled())
self.checkpoint_config.disable_unaligned_checkpoints()
self.assertFalse(self.checkpoint_config.is_unaligned_checkpoints_enabled())
self.checkpoint_config.enable_unaligned_checkpoints(True)
self.assertTrue(self.checkpoint_config.is_unaligned_checkpoints_enabled())