blob: 12d3762d1f1eba769bf0804b8256e642e18a5070 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.jdbc.xa;
import org.apache.flink.connector.jdbc.JdbcTestBase;
import org.apache.flink.connector.jdbc.JdbcTestFixture;
import org.junit.jupiter.api.Test;
import javax.sql.XAConnection;
import javax.sql.XADataSource;
import javax.transaction.xa.Xid;
import java.sql.Connection;
import java.sql.Statement;
import java.util.Collection;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/** {@link XaFacadeImpl} tests. */
class JdbcXaFacadeImplTest extends JdbcTestBase {
private static final Xid XID =
new Xid() {
@Override
public int getFormatId() {
return 0;
}
@Override
public byte[] getGlobalTransactionId() {
return "01234".getBytes();
}
@Override
public byte[] getBranchQualifier() {
return "56789".getBytes();
}
};
@Test
void testRecover() throws Exception {
try (XaFacade f = XaFacadeImpl.fromXaDataSource(getMetadata().buildXaDataSource())) {
f.open();
assertThat(f.recover()).isEmpty();
f.start(XID);
// insert some data to prevent database from ignoring the transaction
try (Connection c = f.getConnection()) {
try (Statement s = c.createStatement()) {
s.executeUpdate(JdbcTestFixture.getInsertQuery());
}
}
f.endAndPrepare(XID);
}
try (XaFacade f = XaFacadeImpl.fromXaDataSource(getMetadata().buildXaDataSource())) {
f.open();
Collection<Xid> recovered = f.recover();
recovered.forEach(f::rollback);
assertThat(recovered).hasSize(1);
}
}
@Test
void testClose() throws Exception {
// some drivers (derby, H2) close both connection on either
// connection.close/xaConnection.close() call, so use mocks here to:
// a) prevent closing XA connection from connection.close()
// b) verify that both connections were closed
XADataSource xaDataSource = mock(XADataSource.class);
XAConnection xaConnection = mock(XAConnection.class);
Connection connection = mock(Connection.class);
when(xaDataSource.getXAConnection()).thenReturn(xaConnection);
when(xaConnection.getConnection()).thenReturn(connection);
try (XaFacade f = XaFacadeImpl.fromXaDataSource(xaDataSource)) {
f.open();
}
verify(connection).close();
verify(xaConnection).close();
}
}