| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa; |
| |
| import static com.google.common.base.Preconditions.checkNotNull; |
| import static com.google.common.base.Preconditions.checkState; |
| import static javax.transaction.xa.XAException.XAER_NOTA; |
| import static javax.transaction.xa.XAException.XAER_RMFAIL; |
| import static javax.transaction.xa.XAException.XA_HEURCOM; |
| import static javax.transaction.xa.XAException.XA_HEURHAZ; |
| import static javax.transaction.xa.XAException.XA_HEURMIX; |
| import static javax.transaction.xa.XAException.XA_HEURRB; |
| import static javax.transaction.xa.XAException.XA_RBBASE; |
| import static javax.transaction.xa.XAException.XA_RBTIMEOUT; |
| import static javax.transaction.xa.XAException.XA_RBTRANSIENT; |
| import static javax.transaction.xa.XAResource.TMENDRSCAN; |
| import static javax.transaction.xa.XAResource.TMNOFLAGS; |
| import static javax.transaction.xa.XAResource.TMSTARTRSCAN; |
| import static java.util.Optional.empty; |
| import static java.util.Optional.of; |
| |
| import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.DataSourceUtils; |
| import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectionOptions; |
| import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.ThrowingRunnable; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import javax.sql.XAConnection; |
| import javax.sql.XADataSource; |
| import javax.transaction.xa.XAException; |
| import javax.transaction.xa.XAResource; |
| import javax.transaction.xa.Xid; |
| |
| import java.sql.Connection; |
| import java.sql.SQLException; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Optional; |
| import java.util.Set; |
| import java.util.concurrent.Callable; |
| import java.util.function.Consumer; |
| import java.util.function.Function; |
| |
| /** |
| * Default {@link org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaFacade} implementation. |
| */ |
| public class XaFacadeImplAutoLoad |
| implements XaFacade { |
| |
| private static final long serialVersionUID = 1L; |
| |
| private static final Logger LOG = LoggerFactory.getLogger(XaFacadeImplAutoLoad.class); |
| private static final Set<Integer> TRANSIENT_ERR_CODES = |
| new HashSet<>(Arrays.asList(XA_RBTRANSIENT, XAER_RMFAIL)); |
| private static final Set<Integer> HEUR_ERR_CODES = |
| new HashSet<>(Arrays.asList(XA_HEURRB, XA_HEURCOM, XA_HEURHAZ, XA_HEURMIX)); |
| private static final int MAX_RECOVER_CALLS = 100; |
| |
| private final JdbcConnectionOptions jdbcConnectionOptions; |
| private transient XAResource xaResource; |
| private transient Connection connection; |
| private transient XAConnection xaConnection; |
| |
| XaFacadeImplAutoLoad(JdbcConnectionOptions jdbcConnectionOptions) { |
| this.jdbcConnectionOptions = jdbcConnectionOptions; |
| } |
| |
| @Override |
| public void open() throws SQLException { |
| checkState(!isOpen(), "already connected"); |
| XADataSource ds; |
| try { |
| ds = (XADataSource) DataSourceUtils.buildCommonDataSource(jdbcConnectionOptions); |
| } |
| catch (Exception e) { |
| throw new SQLException(e); |
| } |
| xaConnection = ds.getXAConnection(); |
| xaResource = xaConnection.getXAResource(); |
| if (jdbcConnectionOptions.getTransactionTimeoutSec().isPresent()) { |
| try { |
| xaResource.setTransactionTimeout(jdbcConnectionOptions.getTransactionTimeoutSec().get()); |
| } |
| catch (XAException e) { |
| throw new SQLException(e); |
| } |
| } |
| connection = xaConnection.getConnection(); |
| connection.setReadOnly(false); |
| connection.setAutoCommit(false); |
| checkState(!connection.getAutoCommit()); |
| } |
| |
| @Override |
| public void close() throws SQLException { |
| if (connection != null) { |
| connection.close(); // close connection - likely a wrapper |
| connection = null; |
| } |
| try { |
| xaConnection.close(); // close likely a pooled AND the underlying connection |
| } |
| catch (SQLException e) { |
| // Some databases (e.g. MySQL) rollback changes on normal client disconnect which |
| // causes an exception if an XA transaction was prepared. Note that resources are |
| // still released in case of an error. Pinning MySQL connections doesn't help as |
| // SuspendableXAConnection has the same close() logic. |
| // Other DBs don't rollback, e.g. for PgSql the previous connection.close() call |
| // disassociates the connection (and that call works because it has a check for XA) |
| // and rollback() is not called. |
| // In either case, not closing the XA connection here leads to the resource leak. |
| LOG.warn("unable to close XA connection", e); |
| } |
| xaResource = null; |
| } |
| |
| @Override |
| public Connection getConnection() { |
| checkNotNull(connection); |
| return connection; |
| } |
| |
| @Override |
| public boolean isConnectionValid() throws SQLException { |
| return isOpen() && connection.isValid(connection.getNetworkTimeout()); |
| } |
| |
| @Override |
| public Connection getOrEstablishConnection() throws SQLException { |
| if (!isOpen()) { |
| open(); |
| } |
| return connection; |
| } |
| |
| @Override |
| public void closeConnection() { |
| try { |
| close(); |
| } |
| catch (SQLException e) { |
| LOG.warn("Connection close failed.", e); |
| } |
| } |
| |
| @Override |
| public Connection reestablishConnection() { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public void start(Xid xid) { |
| execute(Command.fromRunnable("start", xid, () -> xaResource.start(xid, TMNOFLAGS))); |
| } |
| |
| @Override |
| public void endAndPrepare(Xid xid) { |
| execute(Command.fromRunnable("end", xid, () -> xaResource.end(xid, XAResource.TMSUCCESS))); |
| int prepResult = execute(new Command<>("prepare", of(xid), () -> xaResource.prepare(xid))); |
| if (prepResult == XAResource.XA_RDONLY) { |
| throw new EmptyXaTransactionException(xid); |
| } |
| else if (prepResult != XAResource.XA_OK) { |
| throw new RuntimeException( |
| formatErrorMessage("prepare", of(xid), empty(), "response: " + prepResult)); |
| } |
| } |
| |
| @Override |
| public void failAndRollback(Xid xid) { |
| execute( |
| Command.fromRunnable( |
| "end (fail)", |
| xid, |
| () -> { |
| xaResource.end(xid, XAResource.TMFAIL); |
| xaResource.rollback(xid); |
| }, |
| err -> { |
| if (err.errorCode >= XA_RBBASE) { |
| rollback(xid); |
| } |
| else { |
| LOG.warn( |
| formatErrorMessage( |
| "end (fail)", of(xid), of(err.errorCode))); |
| } |
| })); |
| } |
| |
| @Override |
| public void commit(Xid xid, boolean ignoreUnknown) { |
| execute( |
| Command.fromRunnableRecoverByWarn( |
| "commit", |
| xid, |
| () -> |
| xaResource.commit( |
| xid, |
| false /* not onePhase because the transaction should be prepared already */), |
| e -> buildCommitErrorDesc(e, ignoreUnknown))); |
| } |
| |
| @Override |
| public void rollback(Xid xid) { |
| execute( |
| Command.fromRunnableRecoverByWarn( |
| "rollback", |
| xid, |
| () -> xaResource.rollback(xid), |
| this::buildRollbackErrorDesc)); |
| } |
| |
| private void forget(Xid xid) { |
| execute( |
| Command.fromRunnableRecoverByWarn( |
| "forget", |
| xid, |
| () -> xaResource.forget(xid), |
| e -> of("manual cleanup may be required"))); |
| } |
| |
| @Override |
| public Collection<Xid> recover() { |
| return execute( |
| new Command<>( |
| "recover", |
| empty(), |
| () -> { |
| List<Xid> list = recover(TMSTARTRSCAN); |
| try { |
| for (int i = 0; list.addAll(recover(TMNOFLAGS)); i++) { |
| // H2 sometimes returns same tx list here - should probably use |
| // recover(TMSTARTRSCAN | TMENDRSCAN) |
| checkState( |
| i < MAX_RECOVER_CALLS, "too many xa_recover() calls"); |
| } |
| } |
| finally { |
| recover(TMENDRSCAN); |
| } |
| return list; |
| })); |
| } |
| |
| @Override |
| public boolean isOpen() { |
| return xaResource != null; |
| } |
| |
| private List<Xid> recover(int flags) throws XAException { |
| return Arrays.asList(xaResource.recover(flags)); |
| } |
| |
| private <T> T execute(Command<T> cmd) throws RuntimeException { |
| checkState(isOpen(), "not connected"); |
| LOG.debug("{}, xid={}", cmd.name, cmd.xid); |
| try { |
| T result = cmd.callable.call(); |
| LOG.trace("{} succeeded , xid={}", cmd.name, cmd.xid); |
| return result; |
| } |
| catch (XAException e) { |
| if (HEUR_ERR_CODES.contains(e.errorCode)) { |
| cmd.xid.ifPresent(this::forget); |
| } |
| return cmd.recover.apply(e).orElseThrow(() -> wrapException(cmd.name, cmd.xid, e)); |
| } |
| catch (RuntimeException e) { |
| throw e; |
| } |
| catch (Exception e) { |
| throw wrapException(cmd.name, cmd.xid, e); |
| } |
| } |
| |
| private static RuntimeException wrapException( |
| String action, Optional<Xid> xid, Exception ex) { |
| if (ex instanceof XAException) { |
| XAException xa = (XAException) ex; |
| if (TRANSIENT_ERR_CODES.contains(xa.errorCode)) { |
| throw new TransientXaException(xa); |
| } |
| else { |
| throw new RuntimeException( |
| formatErrorMessage(action, xid, of(xa.errorCode), xa.getMessage())); |
| } |
| } |
| else { |
| throw new RuntimeException( |
| formatErrorMessage(action, xid, empty(), ex.getMessage()), ex); |
| } |
| } |
| |
| private Optional<String> buildCommitErrorDesc(XAException err, boolean ignoreUnknown) { |
| if (err.errorCode == XA_HEURCOM) { |
| return Optional.of("transaction was heuristically committed earlier"); |
| } |
| else if (ignoreUnknown && err.errorCode == XAER_NOTA) { |
| return Optional.of("transaction is unknown to RM (ignoring)"); |
| } |
| else { |
| return empty(); |
| } |
| } |
| |
| private Optional<String> buildRollbackErrorDesc(XAException err) { |
| if (err.errorCode == XA_HEURRB) { |
| return Optional.of("transaction was already heuristically rolled back"); |
| } |
| else if (err.errorCode >= XA_RBBASE) { |
| return Optional.of("transaction was already marked for rollback"); |
| } |
| else { |
| return empty(); |
| } |
| } |
| |
| private static String formatErrorMessage( |
| String action, Optional<Xid> xid, Optional<Integer> errorCode, String... more) { |
| return String.format( |
| "unable to %s%s%s%s", |
| action, |
| xid.map(x -> " XA transaction, xid: " + x).orElse(""), |
| errorCode |
| .map(code -> String.format(", error %d: %s", code, descError(code))) |
| .orElse(""), |
| more == null || more.length == 0 ? "" : ". " + Arrays.toString(more)); |
| } |
| |
| /** |
| * @return error description from {@link XAException} javadoc from to ease debug. |
| */ |
| private static String descError(int code) { |
| switch (code) { |
| case XA_HEURCOM: |
| return "heuristic commit decision was made"; |
| case XAException.XA_HEURHAZ: |
| return "heuristic decision may have been made"; |
| case XAException.XA_HEURMIX: |
| return "heuristic mixed decision was made"; |
| case XA_HEURRB: |
| return "heuristic rollback decision was made"; |
| case XAException.XA_NOMIGRATE: |
| return "the transaction resumption must happen where the suspension occurred"; |
| case XAException.XA_RBCOMMFAIL: |
| return "rollback happened due to a communications failure"; |
| case XAException.XA_RBDEADLOCK: |
| return "rollback happened because deadlock was detected"; |
| case XAException.XA_RBINTEGRITY: |
| return "rollback happened because an internal integrity check failed"; |
| case XAException.XA_RBOTHER: |
| return "rollback happened for some reason not fitting any of the other rollback error codes"; |
| case XAException.XA_RBPROTO: |
| return "rollback happened due to a protocol error in the resource manager"; |
| case XAException.XA_RBROLLBACK: |
| return "rollback happened for an unspecified reason"; |
| case XA_RBTIMEOUT: |
| return "rollback happened because of a timeout"; |
| case XA_RBTRANSIENT: |
| return "rollback happened due to a transient failure"; |
| case XAException.XA_RDONLY: |
| return "the transaction branch was read-only, and has already been committed"; |
| case XAException.XA_RETRY: |
| return "the method invoked returned without having any effect, and that it may be invoked again"; |
| case XAException.XAER_ASYNC: |
| return "an asynchronous operation is outstanding"; |
| case XAException.XAER_DUPID: |
| return "Xid given as an argument is already known to the resource manager"; |
| case XAException.XAER_INVAL: |
| return "invalid arguments were passed"; |
| case XAER_NOTA: |
| return "Xid is not valid"; |
| case XAException.XAER_OUTSIDE: |
| return "the resource manager is doing work outside the global transaction"; |
| case XAException.XAER_PROTO: |
| return "protocol error"; |
| case XAException.XAER_RMERR: |
| return "resource manager error has occurred"; |
| case XAER_RMFAIL: |
| return "the resource manager has failed and is not available"; |
| default: |
| return ""; |
| } |
| } |
| |
| private static class Command<T> { |
| private final String name; |
| private final Optional<Xid> xid; |
| private final Callable<T> callable; |
| private final Function<XAException, Optional<T>> recover; |
| |
| static Command<Object> fromRunnable( |
| String action, Xid xid, ThrowingRunnable<XAException> runnable) { |
| return fromRunnable( |
| action, |
| xid, |
| runnable, |
| e -> { |
| throw wrapException(action, of(xid), e); |
| }); |
| } |
| |
| static Command<Object> fromRunnableRecoverByWarn( |
| String action, |
| Xid xid, |
| ThrowingRunnable<XAException> runnable, |
| Function<XAException, Optional<String>> err2msg) { |
| return fromRunnable( |
| action, |
| xid, |
| runnable, |
| e -> |
| LOG.warn( |
| formatErrorMessage( |
| action, |
| of(xid), |
| of(e.errorCode), |
| err2msg.apply(e) |
| .orElseThrow( |
| () -> |
| wrapException( |
| action, of(xid), e))))); |
| } |
| |
| private static Command<Object> fromRunnable( |
| String action, |
| Xid xid, |
| ThrowingRunnable<XAException> runnable, |
| Consumer<XAException> recover) { |
| return new Command<>( |
| action, |
| of(xid), |
| () -> { |
| runnable.run(); |
| return null; |
| }, |
| e -> { |
| recover.accept(e); |
| return Optional.of(""); |
| }); |
| } |
| |
| private Command(String name, Optional<Xid> xid, Callable<T> callable) { |
| this(name, xid, callable, e -> empty()); |
| } |
| |
| private Command( |
| String name, |
| Optional<Xid> xid, |
| Callable<T> callable, |
| Function<XAException, Optional<T>> recover) { |
| this.name = name; |
| this.xid = xid; |
| this.callable = callable; |
| this.recover = recover; |
| } |
| } |
| } |