// blob: 07a06d8b8c774624cb3c3b481c26c318ffcc2069 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.internal.cache;
import org.apache.logging.log4j.Logger;
import org.apache.geode.CancelException;
import org.apache.geode.cache.CommitConflictException;
import org.apache.geode.cache.TransactionDataRebalancedException;
import org.apache.geode.cache.TransactionException;
import org.apache.geode.cache.TransactionInDoubtException;
import org.apache.geode.cache.client.internal.ServerRegionDataAccess;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.ReliableReplyException;
import org.apache.geode.distributed.internal.ReliableReplyProcessor21;
import org.apache.geode.distributed.internal.ReplyException;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.TXRemoteCommitMessage.RemoteCommitResponse;
import org.apache.geode.internal.cache.tx.BucketTXRegionStub;
import org.apache.geode.internal.cache.tx.DistributedTXRegionStub;
import org.apache.geode.internal.cache.tx.PartitionedTXRegionStub;
import org.apache.geode.internal.cache.tx.TXRegionStub;
import org.apache.geode.internal.cache.tx.TransactionalOperation.ServerRegionOperation;
import org.apache.geode.logging.internal.log4j.api.LogService;
public class PeerTXStateStub extends TXStateStub {
protected static final Logger logger = LogService.getLogger();
private InternalDistributedMember originatingMember = null;
protected TXCommitMessage commitMessage = null;
public PeerTXStateStub(TXStateProxy stateProxy, DistributedMember target,
InternalDistributedMember onBehalfOfClient) {
super(stateProxy, target);
originatingMember = onBehalfOfClient;
* (non-Javadoc)
* @see org.apache.geode.internal.cache.TXStateInterface#rollback()
public void rollback() {
* txtodo: work this into client realm
ReliableReplyProcessor21 response = TXRemoteRollbackMessage.send(proxy.getCache(),
proxy.getTxId().getUniqId(), getOriginatingMember(), target);
if (internalAfterSendRollback != null) {;
try {
} catch (PrimaryBucketException pbe) {
// ignore this
} catch (ReplyException e) {
if (e.getCause() != null && e.getCause() instanceof CancelException) {
// other cache must have closed (bug #43649), so the transaction is lost
if (internalAfterSendRollback != null) {;
} else {
throw new TransactionException(
String.format("Rollback operation on node %s failed", target), e);
} catch (Exception e) {
throw new TransactionException(
String.format("Rollback operation on node %s failed", target), e);
} finally {
public void commit() throws CommitConflictException {
assert target != null;
* txtodo: Going to need to deal with client here
RemoteCommitResponse message = TXRemoteCommitMessage.send(proxy.getCache(),
proxy.getTxId().getUniqId(), getOriginatingMember(), target);
if (internalAfterSendCommit != null) {;
try {
commitMessage = message.waitForResponse();
} catch (CommitConflictException e) {
throw e;
} catch (TransactionException te) {
throw te;
} catch (ReliableReplyException e) {
if (e.getCause() != null) {
throw new TransactionInDoubtException(e.getCause());
} else {
throw new TransactionInDoubtException(e);
} catch (ReplyException e) {
if (e.getCause() instanceof CommitConflictException) {
throw (CommitConflictException) e.getCause();
} else if (e.getCause() instanceof TransactionException) {
throw (TransactionException) e.getCause();
* if(e.getCause()!=null) { throw new CommitConflictException(e.getCause()); } else { throw
* new CommitConflictException(e); }
if (e.getCause() != null) {
throw new TransactionInDoubtException(e.getCause());
} else {
throw new TransactionInDoubtException(e);
} catch (Exception e) {
Throwable eCause = e.getCause();
if (eCause != null) {
if (eCause instanceof ForceReattemptException) {
if (eCause.getCause() instanceof PrimaryBucketException) {
// data rebalanced
TransactionDataRebalancedException tdnce =
new TransactionDataRebalancedException(eCause.getCause().getMessage(),
throw tdnce;
} else {
// We cannot be sure that the member departed starting to process commit request,
// so throw a TransactionInDoubtException rather than a TransactionDataNodeHasDeparted.
// fixes 44939
TransactionInDoubtException tdnce =
new TransactionInDoubtException(e.getCause().getMessage(), eCause);
throw tdnce;
throw new TransactionInDoubtException(eCause);
} else {
throw new TransactionInDoubtException(e);
} finally {
protected void cleanup() {
for (TXRegionStub regionStub : regionStubs.values()) {
protected TXRegionStub generateRegionStub(InternalRegion region) {
TXRegionStub stub = null;
if (region.getPartitionAttributes() != null) {
// a partitioned region
stub = new PartitionedTXRegionStub(this, (PartitionedRegion) region);
} else if (region.getScope().isLocal()) {
// GEODE-3744 Local region should not be involved in a transaction on a PeerTXStateStub
throw new TransactionException(
"Local region " + region + " should not participate in a transaction not hosted locally");
} else if (region.isUsedForPartitionedRegionBucket()) {
stub = new BucketTXRegionStub(this, (BucketRegion) region);
} else {
// This is a dist region
stub = new DistributedTXRegionStub(this, (DistributedRegion) region);
return stub;
protected void validateRegionCanJoinTransaction(InternalRegion region)
throws TransactionException {
* Ok is this region legit to enter into tx?
if (region.hasServerProxy()) {
* This is a c/s region in a peer tx. nope!
throw new TransactionException("Can't involve c/s region in peer tx");
public void afterCompletion(int status) {
RemoteCommitResponse response = JtaAfterCompletionMessage.send(proxy.getCache(),
proxy.getTxId().getUniqId(), getOriginatingMember(), status, target);
try {
commitMessage = response.waitForResponse();
if (logger.isDebugEnabled()) {
logger.debug("afterCompletion received commit response of {}", commitMessage);
} catch (Exception e) {
throw new TransactionException(e);
// TODO throw a better exception
} finally {
public InternalDistributedMember getOriginatingMember() {
* This needs to be set to the clients member id if the client originated the tx
return originatingMember;
public void setOriginatingMember(InternalDistributedMember clientMemberId) {
* This TX is on behalf of a client, so we have to send the client's member id around
originatingMember = clientMemberId;
public boolean isMemberIdForwardingRequired() {
return getOriginatingMember() != null;
public TXCommitMessage getCommitMessage() {
return commitMessage;
public void suspend() {
// no special tasks to perform
public void resume() {
// no special tasks to perform
public void recordTXOperation(ServerRegionDataAccess region, ServerRegionOperation op, Object key,
Object[] arguments) {
// no-op here