blob: 1f899af76321acf24cf7ae7e9e506c564fe2f5b8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.jdbc.xa;
import org.apache.flink.connector.jdbc.JdbcTestFixture;
import org.junit.jupiter.api.Test;
import static org.apache.flink.connector.jdbc.JdbcTestFixture.TEST_DATA;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/**
* {@link JdbcXaSinkFunction} tests using Derby DB. Derby supports XA but doesn't use MVCC, so we
* can't check anything before all transactions are completed.
*/
class JdbcXaSinkDerbyTest extends JdbcXaSinkTestBase {
/**
* checkpoint > capture state > emit > snapshot > close > init(captured state), open > emit >
* checkpoint.
*/
@Test
void noDuplication() throws Exception {
sinkHelper.notifyCheckpointComplete(0);
TestXaSinkStateHandler newState = new TestXaSinkStateHandler();
newState.store(sinkHelper.getState().load(null));
sinkHelper.emitAndSnapshot(JdbcTestFixture.CP0);
sinkHelper.close(); // todo: test without close
sinkHelper = buildSinkHelper(newState);
sinkHelper.emitAndCheckpoint(JdbcTestFixture.CP0);
xaHelper.assertDbContentsEquals(JdbcTestFixture.CP0);
}
@Test
void testTxEndedOnClose() throws Exception {
sinkHelper.emit(
TEST_DATA[0]); // don't snapshotState to prevent transaction from being prepared
sinkHelper.close();
xaHelper.assertPreparedTxCountEquals(0);
}
@Test
void testTxRollbackOnStartup() throws Exception {
sinkHelper.emitAndSnapshot(JdbcTestFixture.CP0);
xaHelper.assertPreparedTxCountEquals(1);
sinkHelper.close();
xaHelper.assertPreparedTxCountEquals(1);
TestXaSinkStateHandler state =
new TestXaSinkStateHandler(); // forget about prepared tx, so it will not commit
// them and recover and rollback instead
xaHelper.assertPreparedTxCountEquals(1); // tx should still be there
buildAndInit(); // should cleanup on startup
xaHelper.assertPreparedTxCountEquals(0);
assertThat(xaHelper.countInDb()).isEqualTo(0);
}
@Test
void testRestoreWithNotificationMissing() throws Exception {
sinkHelper.emitAndSnapshot(JdbcTestFixture.CP0);
sinkHelper.close();
sinkHelper = buildSinkHelper(sinkHelper.getState());
sinkHelper.emitAndCheckpoint(JdbcTestFixture.CP1);
xaHelper.assertDbContentsEquals(JdbcTestFixture.CP0, JdbcTestFixture.CP1);
}
@Test
void testCommitUponStart() throws Exception {
sinkHelper.emitAndSnapshot(JdbcTestFixture.CP0);
sinkHelper.close();
buildAndInit(
0,
XaFacadeImpl.fromXaDataSource(getMetadata().buildXaDataSource()),
sinkHelper.getState());
xaHelper.assertDbContentsEquals(JdbcTestFixture.CP0);
}
/** RM may return {@link javax.transaction.xa.XAResource#XA_RDONLY XA_RDONLY} error. */
@Test
void testEmptyCheckpoint() throws Exception {
sinkHelper.snapshotState(0);
}
@Test
void testTwoCheckpointsComplete1st() throws Exception {
sinkHelper.emitAndSnapshot(JdbcTestFixture.CP0);
sinkHelper.emitAndSnapshot(JdbcTestFixture.CP1);
JdbcXaSinkTestHelper sinkHelper = this.sinkHelper;
long checkpointId = JdbcTestFixture.CP0.id;
sinkHelper.notifyCheckpointComplete(checkpointId);
xaHelper.cancelAllTx(); // cancel 2nd tx to prevent the following read from being blocked
xaHelper.assertDbContentsEquals(JdbcTestFixture.CP0);
}
@Test
void testTwoCheckpointsComplete2nd() throws Exception {
sinkHelper.emitAndSnapshot(JdbcTestFixture.CP0);
sinkHelper.emitAndCheckpoint(JdbcTestFixture.CP1);
xaHelper.assertDbContentsEquals(
JdbcTestFixture.CP0,
JdbcTestFixture.CP1); // "both records should be inserted after the last snapshot
// completed."
}
@Test
void testTwoCheckpointsCompleteBoth() throws Exception {
sinkHelper.emitAndSnapshot(JdbcTestFixture.CP0);
sinkHelper.emitAndSnapshot(JdbcTestFixture.CP1);
sinkHelper.notifyCheckpointComplete(JdbcTestFixture.CP0.id);
sinkHelper.notifyCheckpointComplete(JdbcTestFixture.CP1.id);
xaHelper.assertDbContentsEquals(
JdbcTestFixture.CP0,
JdbcTestFixture.CP1); // "both records should be inserted after the last snapshot
// completed."
}
@Test
void testTwoCheckpointsCompleteBothOutOfOrder() throws Exception {
sinkHelper.emitAndSnapshot(JdbcTestFixture.CP0);
sinkHelper.emitAndSnapshot(JdbcTestFixture.CP1);
sinkHelper.notifyCheckpointComplete(JdbcTestFixture.CP1.id);
sinkHelper.notifyCheckpointComplete(JdbcTestFixture.CP0.id);
xaHelper.assertDbContentsEquals(
JdbcTestFixture.CP0,
JdbcTestFixture.CP1); // "both records should be inserted after the last snapshot
// completed."
}
@Test
void testRestore() throws Exception {
sinkHelper.emitAndCheckpoint(JdbcTestFixture.CP0);
sinkHelper.close();
sinkHelper = new JdbcXaSinkTestHelper(buildAndInit(), new TestXaSinkStateHandler());
sinkHelper.emitAndCheckpoint(JdbcTestFixture.CP1);
xaHelper.assertDbContentsEquals(JdbcTestFixture.CP0, JdbcTestFixture.CP1);
}
@Test
void testFailurePropagation() throws Exception {
/* big enough flush interval to cause error in snapshotState rather than in invoke*/
sinkHelper =
new JdbcXaSinkTestHelper(
buildAndInit(
Integer.MAX_VALUE,
XaFacadeImpl.fromXaDataSource(getMetadata().buildXaDataSource())),
new TestXaSinkStateHandler());
sinkHelper.emit(TEST_DATA[0]);
sinkHelper.emit(TEST_DATA[0]); // duplicate
assertThatThrownBy(() -> sinkHelper.snapshotState(0)).isInstanceOf(Exception.class);
}
}