blob: cf412da334e73e9baf2cf30afa7568c6ac55dc7d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa;
import static javax.transaction.xa.XAResource.TMSTARTRSCAN;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.sink.DefaultSinkWriterContext;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.DataSourceUtils;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectionOptions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.DockerImageName;
import javax.sql.XAConnection;
import javax.sql.XADataSource;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import java.util.stream.Stream;
/**
 * Integration test for {@link XaGroupOpsImpl} running against a MySQL 8.0.29 container.
 *
 * <p>Every test gets a fresh container, an opened {@link XaFacade} that is used to create
 * prepared XA transactions, and a separate {@link XAConnection} that is used only to inspect
 * which transactions the database still reports as prepared.
 */
@Disabled("Temporary fast fix, reason: JdbcDatabaseContainer: ClassNotFoundException: com.mysql.jdbc.Driver")
class XaGroupOpsImplIT {
    private static final Logger LOGGER = LoggerFactory.getLogger(XaGroupOpsImplIT.class);

    private MySQLContainer<?> mc;
    private XaGroupOps xaGroupOps;
    private SemanticXidGenerator xidGenerator;
    private JdbcConnectionOptions jdbcConnectionOptions;
    private XaFacade xaFacade;
    /** Verification-only connection; kept as a field so that it can be closed after the test. */
    private XAConnection xaConnection;
    private XAResource xaResource;

    /**
     * Starts the MySQL container and wires up the objects under test.
     *
     * @throws Exception if the container, the XA facade or the verification connection
     *     cannot be set up
     */
    @BeforeEach
    void before() throws Exception {
        // Connect as root: non-root users need to be granted the XA_RECOVER_ADMIN permission.
        mc = new MySQLContainer<>(DockerImageName.parse("mysql:8.0.29"))
            .withUsername("root")
            .withLogConsumer(new Slf4jLogConsumer(LOGGER));
        Startables.deepStart(Stream.of(mc)).join();

        jdbcConnectionOptions = JdbcConnectionOptions.builder()
            .withUrl(mc.getJdbcUrl())
            .withUsername(mc.getUsername())
            .withPassword(mc.getPassword())
            .withXaDataSourceClassName("com.mysql.cj.jdbc.MysqlXADataSource")
            .build();

        xidGenerator = new SemanticXidGenerator();
        xidGenerator.open();

        xaFacade = new XaFacadeImplAutoLoad(jdbcConnectionOptions);
        xaFacade.open();
        xaGroupOps = new XaGroupOpsImpl(xaFacade);

        // Independent XA connection used by checkPreparedXid to query prepared transactions.
        XADataSource xaDataSource =
            (XADataSource) DataSourceUtils.buildCommonDataSource(jdbcConnectionOptions);
        xaConnection = xaDataSource.getXAConnection();
        xaResource = xaConnection.getXAResource();
    }

    /**
     * Prepares two XA transactions for the same job and writer, invokes
     * {@code recoverAndRollback} passing the second xid as its last argument, and expects
     * that afterwards only the second transaction is still reported as prepared.
     */
    @Test
    void testRecoverAndRollback() throws Exception {
        JobContext jobContext = new JobContext();
        SinkWriter.Context writerContext1 = new DefaultSinkWriterContext(1);
        // The two xids are generated from different checkpoint values so that they differ.
        Xid xid1 =
            xidGenerator.generateXid(jobContext, writerContext1, System.currentTimeMillis());
        Xid xid2 =
            xidGenerator.generateXid(jobContext, writerContext1, System.currentTimeMillis() + 1);

        xaFacade.start(xid1);
        xaFacade.endAndPrepare(xid1);
        xaFacade.start(xid2);
        xaFacade.endAndPrepare(xid2);

        // Both transactions must be visible as prepared before the recovery runs.
        Assertions.assertTrue(checkPreparedXid(xid1));
        Assertions.assertTrue(checkPreparedXid(xid2));

        xaGroupOps.recoverAndRollback(jobContext, writerContext1, xidGenerator, xid2);

        Assertions.assertFalse(checkPreparedXid(xid1));
        Assertions.assertTrue(checkPreparedXid(xid2));
    }

    /**
     * Tells whether the database currently lists the given xid among the transactions
     * returned by {@link XAResource#recover(int)}.
     *
     * @param xidCrr the xid to look for
     * @return {@code true} if a recovered xid equals {@code xidCrr}, {@code false} otherwise
     * @throws XAException if the recover scan fails
     */
    private boolean checkPreparedXid(Xid xidCrr) throws XAException {
        Xid[] recovered = xaResource.recover(TMSTARTRSCAN);
        for (Xid candidate : recovered) {
            // Copy into XidImpl so that the comparison uses XidImpl#equals instead of whatever
            // equality the driver's own Xid class provides.
            XidImpl xid = new XidImpl(candidate.getFormatId(), candidate.getGlobalTransactionId(),
                candidate.getBranchQualifier());
            if (xid.equals(xidCrr)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Releases everything acquired in {@link #before()}: the verification connection, the
     * XA facade and finally the MySQL container.
     *
     * <p>The steps are nested in {@code finally} blocks so that the container is stopped even
     * when closing one of the connections fails. Every field is null-checked because
     * {@link #before()} may have failed part-way through.
     *
     * @throws Exception if closing the verification connection or the XA facade fails
     */
    @AfterEach
    void after() throws Exception {
        try {
            if (xaConnection != null) {
                xaConnection.close();
            }
        } finally {
            try {
                if (xaFacade != null && xaFacade.isOpen()) {
                    xaFacade.close();
                }
            } finally {
                if (mc != null) {
                    mc.stop();
                }
            }
        }
    }
}