title: "Howto test a batch source with the new Source framework"
date: "2023-05-12T08:00:00.000Z"
authors:
The Flink community has recently designed a new Source framework based on FLIP-27. This article is the continuation of the howto create a batch source with the new Source framework article. Now it is time to test the created source! Like the previous article, this one was written while implementing the Flink batch source for Cassandra.
Example Cassandra SplitSerializer and SplitEnumeratorStateSerializer
In the previous article, we created serializers for Split and SplitEnumeratorState. We should now cover them with unit tests. To test serde, we create an object, serialize it using the serializer, deserialize it using the same serializer, and finally assert on the equality of the two objects. Thus, hashCode() and equals() need to be implemented for the serialized objects.
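As an illustration, a serde round-trip unit test could look like the following sketch; CassandraSplit, its constructor arguments, and CassandraSplitSerializer stand in for your own split and serializer classes:

```java
import static org.assertj.core.api.Assertions.assertThat;

import java.io.IOException;
import java.math.BigInteger;
import org.junit.jupiter.api.Test;

class CassandraSplitSerializerTest {

    @Test
    void testSplitSerde() throws IOException {
        // Create a split, serialize it and deserialize it with the same serializer.
        final CassandraSplit split = new CassandraSplit(BigInteger.ZERO, BigInteger.TEN);
        final CassandraSplitSerializer serializer = new CassandraSplitSerializer();

        final byte[] serialized = serializer.serialize(split);
        final CassandraSplit deserialized =
                serializer.deserialize(serializer.getVersion(), serialized);

        // Relies on equals()/hashCode() being implemented on the split.
        assertThat(deserialized).isEqualTo(split);
    }
}
```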
Of course, we also need to unit test low-level processing, such as query building, or any other processing that does not require a running backend.
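For example, a hypothetical low-level query-building helper could be tested without any backend; generateRangeQuery() and the expected string below are purely illustrative:

```java
import static org.assertj.core.api.Assertions.assertThat;

import java.math.BigInteger;
import org.junit.jupiter.api.Test;

class QueryBuilderTest {

    @Test
    void testRangeQueryGeneration() {
        // Hypothetical low-level helper of the source: appends the token range
        // of a split to the user query.
        final String query =
                QueryBuilder.generateRangeQuery(
                        "SELECT field FROM ks.table;", BigInteger.ZERO, BigInteger.TEN);

        assertThat(query)
                .isEqualTo(
                        "SELECT field FROM ks.table WHERE (token(pk) >= 0) AND (token(pk) < 10);");
    }
}
```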
For tests that require a running backend, Flink provides a JUnit5 source test framework. It is composed of different parts gathered in a test suite: the source ITCase itself, the Flink test environment, the backend test environment, the checkpointing semantics, and the test context. Each of these parts is detailed below.
Example Cassandra SourceITCase
For the test to be integrated into the Flink CI, the test class must be named *ITCase. It can be named differently if the test lives elsewhere. The class extends SourceTestSuiteBase. This test suite already provides all the necessary tests (single split, multiple splits, idle reader, etc.). It is targeted at both batch and streaming sources, so for our batch source the streaming-oriented tests need to be disabled. They can be disabled by overriding them in the ITCase and annotating them with @Disabled:
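As a sketch, disabling one of the streaming-oriented tests could look like the following; the exact method names and signatures to override depend on the version of SourceTestSuiteBase in use:

```java
// Illustrative override: the signature must match the one declared in the
// SourceTestSuiteBase version you compile against.
@Disabled("Not applicable: savepoints are a streaming concept and this source is bounded")
@Override
public void testSavepoint(
        TestEnvironment testEnv,
        DataStreamSourceExternalContext<Pojo> externalContext,
        CheckpointingMode semantic) {}
```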
Of course, we can add our own integration test cases, for example tests on limits, tests on low-level splitting, or any test that requires a running backend. But for most cases we only need to provide Flink test environment classes to configure the ITCase. We add this annotated field to our ITCase and we're done:
```java
@TestEnv
MiniClusterTestEnvironment flinkTestEnvironment = new MiniClusterTestEnvironment();
```
Example Cassandra TestEnvironment
To test the connector, we need a backend to run the connector against. This TestEnvironment provides everything related to the backend: the container, its configuration, the session to connect to it, and all the elements bound to the whole test suite (table space, initialization requests, etc.). We add this annotated field to our ITCase:
```java
@TestExternalSystem
MyBackendTestEnvironment backendTestEnvironment = new MyBackendTestEnvironment();
```
To integrate with JUnit5, BackendTestEnvironment implements TestResource. This environment is scoped to the test suite, so it is where we set up the backend and the shared resources (session, table space, etc.) by implementing the startUp() and tearDown() methods. For that we advise using Testcontainers, which relies on Docker images to provide a real backend instance (not a mock) that is representative for integration tests. Several backends are supported out of the box by Testcontainers. We need to configure Testcontainers like this:
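Assuming the Testcontainers Cassandra module and the DataStax Java driver, a minimal sketch of such a backend test environment could look like the following; the class name, image tag and keyspace are illustrative, and the import paths are those of recent Flink and Testcontainers releases:

```java
import com.datastax.oss.driver.api.core.CqlSession;
import org.apache.flink.connector.testframe.TestResource;
import org.testcontainers.containers.CassandraContainer;

public class CassandraTestEnvironment implements TestResource {

    private static final String CASSANDRA_IMAGE = "cassandra:4.0";

    private CassandraContainer<?> cassandraContainer;
    private CqlSession session;

    @Override
    public void startUp() throws Exception {
        // Start a real Cassandra instance in a Docker container.
        cassandraContainer = new CassandraContainer<>(CASSANDRA_IMAGE);
        cassandraContainer.start();

        // Open a session shared by the whole test suite and create the keyspace once.
        session =
                CqlSession.builder()
                        .addContactPoint(cassandraContainer.getContactPoint())
                        .withLocalDatacenter(cassandraContainer.getLocalDatacenter())
                        .build();
        session.execute(
                "CREATE KEYSPACE IF NOT EXISTS flink "
                        + "WITH replication = {'class':'SimpleStrategy','replication_factor':1};");
    }

    @Override
    public void tearDown() throws Exception {
        if (session != null) {
            session.close();
        }
        if (cassandraContainer != null) {
            cassandraContainer.stop();
        }
    }

    public CqlSession getSession() {
        return session;
    }
}
```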
In big data execution engines, there are two levels of guarantee regarding sources and sinks: at-least-once (after a failure, a record may be processed again and thus appear more than once in the results) and exactly-once (even after a failure, every record is reflected exactly once in the results). With the following code, we verify that the source supports exactly-once semantics:
```java
@TestSemantics
CheckpointingMode[] semantics = new CheckpointingMode[] {CheckpointingMode.EXACTLY_ONCE};
```
That being said, we could encounter a problem while running the tests: the default assertions in the Flink source test framework assume that the data is read in the same order it was written. This does not hold for most big data backends, where ordering is usually not deterministic. To support unordered checks and still use all the framework-provided tests, we need to override SourceTestSuiteBase#checkResultWithSemantic in our ITCase:
```java
@Override
protected void checkResultWithSemantic(
        CloseableIterator<Pojo> resultIterator,
        List<List<Pojo>> testData,
        CheckpointingMode semantic,
        Integer limit) {
    if (limit != null) {
        Runnable runnable =
                () ->
                        CollectIteratorAssertions.assertUnordered(resultIterator)
                                .withNumRecordsLimit(limit)
                                .matchesRecordsFromSource(testData, semantic);

        assertThat(runAsync(runnable)).succeedsWithin(DEFAULT_COLLECT_DATA_TIMEOUT);
    } else {
        CollectIteratorAssertions.assertUnordered(resultIterator)
                .matchesRecordsFromSource(testData, semantic);
    }
}
```
This is a copy-paste of the parent method where CollectIteratorAssertions.assertOrdered() is replaced by CollectIteratorAssertions.assertUnordered().
Example Cassandra TestContext
The test context provides Flink with the means to interact with the backend: inserting test data, creating tables, constructing the source, and so on. It is scoped to the test case (and not to the test suite). It is linked to the ITCase through a factory of TestContext, as shown below.
```java
@TestContext
TestContextFactory contextFactory = new TestContextFactory(testEnvironment);
```
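Putting the annotated fields together, a skeleton of the ITCase could look like this; the Cassandra-specific class names are illustrative, the field names follow the snippets above, and the import paths are those of recent Flink releases:

```java
import org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment;
import org.apache.flink.connector.testframe.junit.annotations.TestContext;
import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase;
import org.apache.flink.streaming.api.CheckpointingMode;

class CassandraSourceITCase extends SourceTestSuiteBase<Pojo> {

    @TestEnv
    MiniClusterTestEnvironment flinkTestEnvironment = new MiniClusterTestEnvironment();

    @TestExternalSystem
    CassandraTestEnvironment backendTestEnvironment = new CassandraTestEnvironment();

    @TestSemantics
    CheckpointingMode[] semantics = new CheckpointingMode[] {CheckpointingMode.EXACTLY_ONCE};

    @TestContext
    TestContextFactory contextFactory = new TestContextFactory(backendTestEnvironment);

    // Streaming-only tests of SourceTestSuiteBase are overridden and @Disabled here,
    // and custom batch test cases can be added as regular test methods.
}
```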
TestContext implements DataStreamSourceExternalContext:
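Below is a rough sketch of what such a test context could look like; Pojo, CassandraSource (with simplified constructor arguments), and CassandraTestEnvironment are placeholders for your own connector classes, and the exact interface methods may vary slightly between Flink versions:

```java
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.connector.testframe.external.ExternalSystemSplitDataWriter;
import org.apache.flink.connector.testframe.external.source.DataStreamSourceExternalContext;
import org.apache.flink.connector.testframe.external.source.TestingSourceSettings;

public class CassandraTestContext implements DataStreamSourceExternalContext<Pojo> {

    private final CassandraTestEnvironment testEnvironment;

    public CassandraTestContext(CassandraTestEnvironment testEnvironment) {
        this.testEnvironment = testEnvironment;
        // Scoped to the test case: create the table that the test will read.
        testEnvironment
                .getSession()
                .execute("CREATE TABLE IF NOT EXISTS flink.pojo (id bigint PRIMARY KEY, value text);");
    }

    @Override
    public Source<Pojo, ?, ?> createSource(TestingSourceSettings sourceSettings) {
        // Construction simplified for the sketch; use the real source constructor here.
        return new CassandraSource<>(Pojo.class, "SELECT * FROM flink.pojo;");
    }

    @Override
    public ExternalSystemSplitDataWriter<Pojo> createSourceSplitDataWriter(
            TestingSourceSettings sourceSettings) {
        // Used by the test suite to insert the generated test data into the backend.
        return new ExternalSystemSplitDataWriter<Pojo>() {
            @Override
            public void writeRecords(List<Pojo> records) {
                for (Pojo pojo : records) {
                    testEnvironment
                            .getSession()
                            .execute(
                                    "INSERT INTO flink.pojo (id, value) VALUES ("
                                            + pojo.getId() + ", '" + pojo.getValue() + "');");
                }
            }

            @Override
            public void close() {}
        };
    }

    @Override
    public List<Pojo> generateTestData(
            TestingSourceSettings sourceSettings, int splitIndex, long seed) {
        // Deterministic data generation based on the provided seed.
        final Random random = new Random(seed);
        final List<Pojo> data = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            data.add(new Pojo(splitIndex * 1_000L + i, "value-" + random.nextInt()));
        }
        return data;
    }

    @Override
    public TypeInformation<Pojo> getProducedType() {
        return TypeInformation.of(Pojo.class);
    }

    @Override
    public List<URL> getConnectorJarPaths() {
        return Collections.emptyList();
    }

    @Override
    public void close() {
        // Scoped to the test case: drop the table so that each test starts clean.
        testEnvironment.getSession().execute("DROP TABLE IF EXISTS flink.pojo;");
    }
}
```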
Lately, the Flink community has externalized all the connectors to external repositories that are sub-repositories of the official Apache Flink repository. This is mainly to decouple the release cycle of Flink from the release cycle of the connectors. To distribute the created source, we need to follow this official wiki page.
This concludes the series of articles about creating a batch source with the new Flink framework. These articles were needed because, apart from the Javadocs, the documentation on testing sources is missing for now. I hope you enjoyed reading them, and I hope the Flink community will soon receive a source PR from you :)