blob: c168fc3617fa6eaf683cdb8272f431a9f7bb91d4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.jdbc.testutils.databases.h2.xa;
import org.apache.flink.util.function.ThrowingRunnable;
import org.h2.api.ErrorCode;
import org.h2.jdbc.JdbcConnection;
import org.h2.jdbcx.JdbcXAConnection;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import java.lang.reflect.Field;
import java.sql.SQLException;
import java.util.HashSet;
import java.util.Set;
/**
* Wraps H2 {@link XAResource} to:
*
* <ol>
* <li>reset <code>currentTransaction</code> field after a call to {@link XAResource#prepare
* prepare}. This allows to {@link XAResource#start start} a new transaction after preparing
* the current one. (see this <a
* href="http://h2-database.66688.n3.nabble.com/Possible-XA-bug-and-fix-td3303095.html">discussion</a>)
* <li>prevent {@link NullPointerException} when there is no active XA transaction in {@link
* XAResource#start start}, {@link XAResource#end end} (and throw appropriate {@link
* XAException} instead)
* </ol>
*
* <p>These fixes are just enough to test Flink XA-related functionality. They aren't enough for
* proper XA support.
*
* <p>TODO: fix the issue in the upstream.
*/
public class H2XaResourceWrapper implements XAResource {
private static final Field CURRENT_TRANSACTION_FIELD;
private static final Field IS_PREPARED_FIELD;
private static final Field PHY;
static {
try {
CURRENT_TRANSACTION_FIELD =
JdbcXAConnection.class.getDeclaredField("currentTransaction");
CURRENT_TRANSACTION_FIELD.setAccessible(true);
IS_PREPARED_FIELD = JdbcXAConnection.class.getDeclaredField("prepared");
IS_PREPARED_FIELD.setAccessible(true);
PHY = JdbcXAConnection.class.getDeclaredField("physicalConn");
PHY.setAccessible(true);
} catch (NoSuchFieldException e) {
throw new RuntimeException(e);
}
}
private final XAResource wrapped;
private final Set<Xid> forgotten = new HashSet<>();
H2XaResourceWrapper(XAResource wrapped) {
this.wrapped = wrapped;
}
@Override
public void commit(Xid xid, boolean onePhase) throws XAException {
ensureNotForgotten(xid);
Object currentTransaction = getCurrentTransaction();
if (onePhase) {
if (currentTransaction == null) {
// prevent NPE in wrapped
throw new XAException(XAException.XAER_PROTO);
} else if (!currentTransaction.equals(xid)) {
// guard from committing a wrong transaction:
// if onePhase=true then the underlying impl. commits current tx, not the one
// specified by xid
// (according to spec it should commit the ended but not prepared one - Section
// 2.3.2)
throw new XAException(XAException.XAER_INVAL);
}
}
finalizeTx(() -> wrapped.commit(xid, onePhase));
}
@Override
public void rollback(Xid xid) throws XAException {
ensureNotForgotten(xid);
setPrepared(
true); // if prepared=false then the underlying impl. will rollback current tx, not
// the one identified by xid
finalizeTx(() -> wrapped.rollback(xid));
}
@Override
public void end(Xid xid, int i) throws XAException {
Object currentTransaction = getCurrentTransaction();
if (currentTransaction == null) {
// prevent NPE in wrapped
throw new XAException(XAException.XAER_PROTO);
} else if (!currentTransaction.equals(xid)) {
// fix error code according to spec p.37:
// The argument xid must point to the same XID that was either passed to the xa_start()
// call or returned
// from the ax_reg() call that established the thread’s association; otherwise, an
// error, [XAER_NOTA], is
// returned
throw new XAException(XAException.XAER_NOTA);
}
wrapped.end(xid, i);
}
@Override
public int prepare(Xid xid) throws XAException {
// prevent NPE in wrapped and preparing the wrong transaction
if (getCurrentTransaction() == null || !xid.equals(getCurrentTransaction())) {
throw new UnsupportedOperationException(
"preparing a transaction that is not currently active not supported");
}
int ret = wrapped.prepare(xid);
setCurrentTransaction(
null); // prevent failure in start(); don't reset in in end() because end() actually
// does nothing
return ret;
}
@Override
public void forget(Xid xid) throws XAException {
wrapped.forget(xid);
forgotten.add(xid); // underlying implementation doesn't actually forget transactions
}
@Override
public int getTransactionTimeout() throws XAException {
return wrapped.getTransactionTimeout();
}
@Override
public boolean isSameRM(XAResource xaResource) throws XAException {
return wrapped.isSameRM(xaResource);
}
@Override
public Xid[] recover(int i) throws XAException {
return wrapped.recover(i);
}
@Override
public boolean setTransactionTimeout(int i) throws XAException {
return wrapped.setTransactionTimeout(i);
}
@Override
public void start(Xid xid, int i) throws XAException {
wrapped.start(xid, i);
forgotten.remove(xid);
}
private boolean isPrepared() {
try {
return (boolean) IS_PREPARED_FIELD.get(wrapped);
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
}
}
private Object getCurrentTransaction() {
try {
return CURRENT_TRANSACTION_FIELD.get(wrapped);
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
}
}
private void ensureInTx() throws XAException {
if (getCurrentTransaction() == null) {
throw new XAException(XAException.XAER_OUTSIDE);
}
}
private void setCurrentTransaction(Object currentTransaction) {
try {
CURRENT_TRANSACTION_FIELD.set(wrapped, currentTransaction);
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
}
}
private void setPrepared(boolean prepared) {
try {
IS_PREPARED_FIELD.set(wrapped, prepared);
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
}
}
private XAException repackageXaException(XAException e) {
if (e.getCause() instanceof SQLException) {
if (((SQLException) e.getCause()).getErrorCode() == ErrorCode.TRANSACTION_NOT_FOUND_1) {
return new XAException(XAException.XAER_NOTA);
}
}
return e;
}
private JdbcConnection getJdbcConnection() {
try {
return (JdbcConnection) PHY.get(wrapped);
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
}
}
private void ensureNotForgotten(Xid xid) throws XAException {
if (forgotten.contains(xid)) {
throw new XAException(XAException.XAER_NOTA);
}
}
private void finalizeTx(ThrowingRunnable<XAException> runnable) throws XAException {
// underlying implementation nulls out current transaction just after commit
// or rollback
// which prevents it from being prepared afterwards
Object current = getCurrentTransaction();
try {
runnable.run();
} catch (XAException e) {
throw repackageXaException(e);
} finally {
setCurrentTransaction(current);
}
}
}