blob: b1487f8a2183d2ce623cb9ec7c9ef140cbd336cb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIESOR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.aries.tx.control.service.xa.impl;
import static javax.transaction.xa.XAResource.XA_OK;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.osgi.service.transaction.control.TransactionStatus.ROLLED_BACK;
import java.io.File;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import javax.sql.XAConnection;
import javax.transaction.xa.XAResource;
import org.h2.jdbcx.JdbcDataSource;
import org.h2.tools.Server;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceEvent;
import org.osgi.framework.ServiceListener;
import org.osgi.framework.ServiceReference;
import org.osgi.service.transaction.control.TransactionException;
import org.osgi.service.transaction.control.TransactionRolledBackException;
import org.osgi.service.transaction.control.TransactionStatus;
import org.osgi.service.transaction.control.recovery.RecoverableXAResource;
/**
* The tests in this class look a little odd because we're using an
* unmanaged resource. This is to avoid creating a dependency on a
* JDBCResourceProvider just for the tests, and to give explicit
* control of when things get registered
*
*/
@RunWith(MockitoJUnitRunner.class)
public class TransactionLogTest {
@Mock
BundleContext ctx;
@Mock
ServiceReference<RecoverableXAResource> serviceRef;
TransactionControlImpl txControl;
JdbcDataSource dataSource;
Server server;
@Before
public void setUp() throws Exception {
Map<String, Object> config = new HashMap<>();
config.put("recovery.log.enabled", true);
config.put("recovery.log.dir", "target/recovery-test/recoverylog");
txControl = new TransactionControlImpl(ctx, config);
setupServerAndDataSource();
try (Connection conn = dataSource.getConnection()) {
Statement s = conn.createStatement();
try {s.execute("DROP TABLE TEST_TABLE");} catch (SQLException sqle) {}
s.execute("CREATE TABLE TEST_TABLE ( message varchar(255) )");
}
}
private void setupServerAndDataSource() throws SQLException {
server = Server.createTcpServer("-tcpPort", "0");
server.start();
File dbPath = new File("target/recovery-test/database");
dataSource = new JdbcDataSource();
dataSource.setUrl("jdbc:h2:tcp://127.0.0.1:" + server.getPort() + "/" + dbPath.getAbsolutePath());
}
@After
public void destroy() {
txControl.destroy();
try (Connection conn = dataSource.getConnection()) {
conn.createStatement().execute("shutdown immediately");
} catch (SQLException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
delete(new File("target/recovery-test"));
}
private void delete(File file) {
if(file.isDirectory()) {
for(File f : file.listFiles()) {
delete(f);
}
}
file.delete();
}
@Test
public void testRequiredNoRecovery() throws Exception {
XAConnection xaConn = dataSource.getXAConnection();
try {
txControl.required(() -> {
txControl.getCurrentContext().registerXAResource(xaConn.getXAResource(), null);
Connection conn = xaConn.getConnection();
// conn.setAutoCommit(false);
return conn.createStatement()
.execute("Insert into TEST_TABLE values ( 'Hello World!' )");
});
} finally {
xaConn.close();
}
try (Connection conn = dataSource.getConnection()) {
ResultSet rs = conn.createStatement()
.executeQuery("Select * from TEST_TABLE");
rs.next();
assertEquals("Hello World!", rs.getString(1));
assertFalse(rs.next());
}
}
@Test
public void testRequired2PCNoRecovery() throws Exception {
XAConnection xaConn = dataSource.getXAConnection();
XAConnection xaConn2 = dataSource.getXAConnection();
try {
txControl.required(() -> {
txControl.getCurrentContext().registerXAResource(xaConn.getXAResource(), null);
txControl.getCurrentContext().registerXAResource(xaConn2.getXAResource(), null);
Connection conn = xaConn.getConnection();
// conn.setAutoCommit(false);
Connection conn2 = xaConn2.getConnection();
conn2.setAutoCommit(false);
conn.createStatement()
.execute("Insert into TEST_TABLE values ( 'Hello World!' )");
return conn2.createStatement()
.execute("Insert into TEST_TABLE values ( 'Hello World 2!' )");
});
} finally {
xaConn.close();
}
try (Connection conn = dataSource.getConnection()) {
ResultSet rs = conn.createStatement()
.executeQuery("Select * from TEST_TABLE order by message DESC");
rs.next();
assertEquals("Hello World!", rs.getString(1));
rs.next();
assertEquals("Hello World 2!", rs.getString(1));
assertFalse(rs.next());
}
}
@Test
public void testRequiredRecoverable() throws Exception {
XAConnection xaConn = dataSource.getXAConnection();
try {
txControl.required(() -> {
txControl.getCurrentContext().registerXAResource(xaConn.getXAResource(), "foo");
Connection conn = xaConn.getConnection();
// conn.setAutoCommit(false);
return conn.createStatement()
.execute("Insert into TEST_TABLE values ( 'Hello World!' )");
});
} finally {
xaConn.close();
}
try (Connection conn = dataSource.getConnection()) {
ResultSet rs = conn.createStatement()
.executeQuery("Select * from TEST_TABLE");
rs.next();
assertEquals("Hello World!", rs.getString(1));
}
}
@Test
public void testRequiredRecoveryRequiredPrePrepare() throws Exception {
doRecoveryRequired((good, poison) -> {
txControl.getCurrentContext().registerXAResource(poison, null);
txControl.getCurrentContext().registerXAResource(good, "foo");
}, TransactionStatus.ROLLED_BACK);
boolean success = false;
XAConnection conn = dataSource.getXAConnection();
for(int i=0; i < 5; i++) {
if(conn.getXAResource().recover(XAResource.TMSTARTRSCAN).length == 0) {
success = true;
break;
} else {
// Wait for recovery to happen!
Thread.sleep(500);
}
}
assertTrue("No recovery in time", success);
}
@Test
public void testRequiredRecoveryRequiredPostPrepare() throws Exception {
doRecoveryRequired((good, poison) -> {
txControl.getCurrentContext().registerXAResource(good, "foo");
txControl.getCurrentContext().registerXAResource(poison, null);
}, TransactionStatus.COMMITTED);
boolean success = false;
for(int i=0; i < 5; i++) {
try (Connection conn = dataSource.getConnection()) {
ResultSet rs = conn.createStatement()
.executeQuery("Select * from TEST_TABLE");
if(rs.next()) {
assertEquals("Hello World!", rs.getString(1));
success = true;
break;
} else {
// Wait for recovery to happen!
Thread.sleep(500);
}
}
}
assertTrue("No recovery in time", success);
}
public void doRecoveryRequired(BiConsumer<XAResource, XAResource> ordering,
TransactionStatus expectedFinalState) throws Exception {
//Register the recoverable resource
ArgumentCaptor<ServiceListener> captor = ArgumentCaptor.forClass(ServiceListener.class);
Mockito.verify(ctx).addServiceListener(captor.capture(), Mockito.anyString());
Mockito.when(ctx.getService(serviceRef)).thenReturn(new TestRecoverableResource("foo", dataSource));
captor.getValue().serviceChanged(new ServiceEvent(ServiceEvent.REGISTERED, serviceRef));
XAConnection xaConn = dataSource.getXAConnection();
AtomicReference<TransactionStatus> ref = new AtomicReference<TransactionStatus>();
try {
txControl.required(() -> {
txControl.getCurrentContext().postCompletion(ref::set);
Connection conn = xaConn.getConnection();
// conn.setAutoCommit(false);
XAResource dsResource = xaConn.getXAResource();
XAResource poison = Mockito.mock(XAResource.class);
Mockito.when(poison.prepare(Mockito.any())).thenAnswer(i -> {
// Now kill the db server before it commits!
conn.createStatement().execute("shutdown immediately");
Thread.sleep(1000);
return XA_OK;
});
ordering.accept(dsResource, poison);
return conn.createStatement()
.execute("Insert into TEST_TABLE values ( 'Hello World!' )");
});
} catch (TransactionException te) {
assertEquals(expectedFinalState, ref.get());
assertEquals(expectedFinalState == ROLLED_BACK, te instanceof TransactionRolledBackException);
} finally {
try {
xaConn.close();
} catch (SQLException sqle) {}
}
setupServerAndDataSource();
}
static class TestRecoverableResource implements RecoverableXAResource {
private final String id;
private final JdbcDataSource dataSource;
public TestRecoverableResource(String id, JdbcDataSource dataSource) {
this.id = id;
this.dataSource = dataSource;
}
@Override
public String getId() {
return id;
}
@Override
public XAResource getXAResource() throws Exception {
XAConnection xaConnection = dataSource.getXAConnection();
if(xaConnection.getConnection().isValid(2)) {
return xaConnection.getXAResource();
} else {
return null;
}
}
@Override
public void releaseXAResource(XAResource xaRes) {
// This is valid for H2;
try {
((XAConnection) xaRes).close();
} catch (SQLException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
@Test
public void testRequiredWithRollback() throws Exception {
XAConnection xaConn = dataSource.getXAConnection();
try {
txControl.required(() -> {
txControl.getCurrentContext().registerXAResource(xaConn.getXAResource(), null);
Connection conn = xaConn.getConnection();
// conn.setAutoCommit(false);
conn.createStatement()
.execute("Insert into TEST_TABLE values ( 'Hello World!' )");
txControl.setRollbackOnly();
return null;
});
} finally {
xaConn.close();
}
try (Connection conn = dataSource.getConnection()) {
ResultSet rs = conn.createStatement()
.executeQuery("Select * from TEST_TABLE");
assertFalse(rs.next());
}
}
}