blob: 426b331382240a2f65e374a8f691de8e8e318c1e [file] [log] [blame]
/*
* Copyright 2004 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.kandula.coordinator.at;
import java.util.Iterator;
import org.apache.axis2.addressing.EndpointReference;
import org.apache.kandula.Constants;
import org.apache.kandula.Status;
import org.apache.kandula.Status.CoordinatorStatus;
import org.apache.kandula.context.AbstractContext;
import org.apache.kandula.context.impl.ATActivityContext;
import org.apache.kandula.coordinator.Registerable;
import org.apache.kandula.faults.AbstractKandulaException;
import org.apache.kandula.faults.InvalidStateException;
import org.apache.kandula.participant.Vote;
import org.apache.kandula.storage.StorageFactory;
import org.apache.kandula.storage.Store;
import org.apache.kandula.wsat.completion.CompletionInitiatorPortTypeRawXMLStub;
import org.apache.kandula.wsat.twopc.ParticipantPortTypeRawXMLStub;
/**
* @author <a href="mailto:thilina@opensource.lk"> Thilina Gunarathne </a>
*/
public class ATCoordinator implements Registerable {

    /** Store used to look up activity contexts by activity ID. */
    private final Store store;

    /**
     * Creates a coordinator backed by the store supplied by
     * {@link StorageFactory}.
     */
    public ATCoordinator() {
        StorageFactory storageFactory = StorageFactory.getInstance();
        store = storageFactory.getStore();
    }

    /**
     * Registers a participant for the given protocol with the activity.
     * <p>
     * Registration is accepted only while the coordinator is in the active or
     * volatile-preparing state. If the coordinator is already in the
     * durable-preparing state the activity is aborted before the request is
     * rejected. Every other state rejects the request without side effects.
     *
     * @param context
     *            the activity context; must be an {@link ATActivityContext}
     * @param protocol
     *            the protocol the participant registers for
     * @param participantEPR
     *            endpoint reference of the registering participant
     * @return the value returned by the context's <code>addParticipant</code>
     * @throws AbstractKandulaException
     *             an {@link InvalidStateException} when the coordinator state
     *             does not allow registration
     */
    public EndpointReference register(AbstractContext context, String protocol,
            EndpointReference participantEPR) throws AbstractKandulaException {
        ATActivityContext atContext = (ATActivityContext) context;

        // Snapshot the status under the lock; the lock is released before
        // acting on it because abortActivity() acquires the lock itself.
        int status;
        atContext.lock();
        try {
            status = atContext.getStatus();
        } finally {
            atContext.unlock();
        }

        switch (status) {
        case CoordinatorStatus.STATUS_PREPARING_DURABLE:
            try {
                this.abortActivity(atContext);
            } catch (Exception e) {
                throw new InvalidStateException(e);
            }
            throw new InvalidStateException(
                    "Coordinator is in preparing state - Durable ");
        case CoordinatorStatus.STATUS_PREPARED_SUCCESS:
            throw new InvalidStateException(
                    "Coordinator is in prepared success state");
        case CoordinatorStatus.STATUS_COMMITTING:
            throw new InvalidStateException(
                    "Coordinator is in committing state");
        case CoordinatorStatus.STATUS_ABORTING:
            throw new InvalidStateException("Coordinator is in Aborting state");
        case CoordinatorStatus.STATUS_ACTIVE:
        case CoordinatorStatus.STATUS_PREPARING_VOLATILE:
            return atContext.addParticipant(participantEPR, protocol);
        case CoordinatorStatus.STATUS_NONE:
        default:
            throw new InvalidStateException();
        }
    }

    /**
     * Handles the Commit operation defined in the WS-AtomicTransaction
     * specification for the activity with the given ID.
     * <p>
     * Depending on the coordinator status this either reports the already
     * known outcome to the completion initiator (aborted / committed), ignores
     * the request (while preparing or prepared), or - when the activity is
     * active - runs the volatile prepare, the durable prepare and finally the
     * commit, stopping as soon as the activity is seen in the aborting state.
     *
     * @param id
     *            the activity ID
     * @throws AbstractKandulaException
     *             if one of the outgoing messages fails
     * @throws IllegalStateException
     *             if no activity is stored under <code>id</code>
     */
    public void commitOperation(String id) throws AbstractKandulaException {
        ATActivityContext atContext = (ATActivityContext) store.get(id);
        if (atContext == null) {
            throw new IllegalStateException(
                    "No Activity Found for this Activity ID");
        }

        // Read the status and, for an active activity, move it to the
        // preparing state in one locked step.
        int status;
        atContext.lock();
        try {
            status = atContext.getStatus();
            if (status == CoordinatorStatus.STATUS_ACTIVE) {
                atContext.setStatus(CoordinatorStatus.STATUS_PREPARING);
            }
        } finally {
            atContext.unlock();
        }

        switch (status) {
        case CoordinatorStatus.STATUS_NONE:
        case CoordinatorStatus.STATUS_ABORTING:
            sendAborted(atContext);
            break;
        case CoordinatorStatus.STATUS_PREPARING_DURABLE:
        case CoordinatorStatus.STATUS_PREPARING_VOLATILE:
        case CoordinatorStatus.STATUS_PREPARED_SUCCESS:
            // Already preparing or prepared: ignore this message.
            break;
        case CoordinatorStatus.STATUS_COMMITTING:
            sendCommitted(atContext);
            break;
        case CoordinatorStatus.STATUS_ACTIVE:
            volatilePrepare(atContext);
            // wait till all the volatile prepare()'s are answered
            if (!awaitPrepareResponses(atContext)) {
                return;
            }
            durablePrepare(atContext);
            // wait till all the durable prepare()'s are answered
            if (!awaitPrepareResponses(atContext)) {
                return;
            }
            if (atContext.getStatus() != CoordinatorStatus.STATUS_ABORTING) {
                commitActivity(atContext);
            }
            break;
        default:
            break;
        }
    }

    /**
     * Handles a rollback request for the activity with the given ID.
     * <p>
     * An active activity is moved to the aborting state and aborted. For an
     * activity in the none, aborting or committing state the already known
     * outcome is reported to the completion initiator. In the preparing and
     * prepared states the request is ignored.
     *
     * @param id
     *            the activity ID
     * @throws Exception
     *             if one of the outgoing messages fails
     * @throws IllegalStateException
     *             if no activity is stored under <code>id</code>
     */
    public void rollbackOperation(String id) throws Exception {
        ATActivityContext atContext = (ATActivityContext) store.get(id);
        if (atContext == null) {
            throw new IllegalStateException(
                    "No Activity Found for this Activity ID");
        }

        // Read the status and, for an active activity, mark it as aborting in
        // one locked step.
        int status;
        atContext.lock();
        try {
            status = atContext.getStatus();
            if (status == CoordinatorStatus.STATUS_ACTIVE) {
                atContext.setStatus(CoordinatorStatus.STATUS_ABORTING);
            }
        } finally {
            atContext.unlock();
        }

        switch (status) {
        case CoordinatorStatus.STATUS_NONE:
        case CoordinatorStatus.STATUS_ABORTING:
            sendAborted(atContext);
            break;
        case CoordinatorStatus.STATUS_PREPARING_DURABLE:
        case CoordinatorStatus.STATUS_PREPARING_VOLATILE:
        case CoordinatorStatus.STATUS_PREPARED_SUCCESS:
            // Already preparing or prepared: ignore this message.
            break;
        case CoordinatorStatus.STATUS_COMMITTING:
            sendCommitted(atContext);
            break;
        case CoordinatorStatus.STATUS_ACTIVE:
            abortActivity(atContext);
            break;
        default:
            break;
        }
    }

    /**
     * Issues the one-way prepare() message to every participant registered
     * for the volatile 2PC protocol. Does not wait for the participants to
     * respond; callers can poll the context's <code>hasMorePreparing()</code>
     * to find out whether responses are still outstanding.
     * <p>
     * When there is at least one such participant the coordinator status is
     * set to volatile-preparing first; otherwise the status is left untouched.
     *
     * @param context
     *            the activity context; must be an {@link ATActivityContext}
     * @throws AbstractKandulaException
     *             if sending a prepare message fails
     */
    public void volatilePrepare(AbstractContext context)
            throws AbstractKandulaException {
        ParticipantPortTypeRawXMLStub stub = new ParticipantPortTypeRawXMLStub();
        ATActivityContext atContext = (ATActivityContext) context;
        Iterator volatileParticipants = atContext
                .getRegistered2PCParticipants(Constants.WS_AT_VOLATILE2PC);
        if (!volatileParticipants.hasNext()) {
            return;
        }

        atContext.lock();
        try {
            atContext
                    .setStatus(Status.CoordinatorStatus.STATUS_PREPARING_VOLATILE);
        } finally {
            atContext.unlock();
        }

        while (volatileParticipants.hasNext()) {
            // Count the outstanding response before the message goes out.
            atContext.countPreparing();
            ATParticipantInformation participant =
                    (ATParticipantInformation) volatileParticipants.next();
            stub.prepareOperation(participant.getEpr());
        }
    }

    /**
     * Records the vote a participant returned in response to prepare().
     * <p>
     * The participant's status is updated according to the vote; an abort
     * vote additionally aborts the whole activity. Afterwards the context's
     * count of outstanding prepare responses is decremented.
     *
     * @param activityID
     *            ID of the activity the vote belongs to
     * @param vote
     *            the participant's vote
     * @param enlistmentID
     *            ID under which the voting participant is enlisted
     * @throws AbstractKandulaException
     *             if aborting the activity fails
     * @throws IllegalStateException
     *             if the activity or the participant cannot be found
     */
    public void countVote(String activityID, Vote vote, String enlistmentID)
            throws AbstractKandulaException {
        ATActivityContext context = (ATActivityContext) store.get(activityID);
        if (context == null) {
            throw new IllegalStateException(
                    "No Activity Found for this Activity ID");
        }
        ATParticipantInformation participant = context
                .getParticipant(enlistmentID);
        if (participant == null) {
            throw new IllegalStateException(
                    "No Participant Found for this Enlistment ID");
        }

        if (Vote.PREPARED.equals(vote)) {
            participant.setStatus(Status.CoordinatorStatus.STATUS_PREPARED);
        } else if (Vote.READ_ONLY.equals(vote)) {
            participant.setStatus(Status.CoordinatorStatus.STATUS_READ_ONLY);
        } else if (Vote.ABORT.equals(vote)) {
            participant.setStatus(Status.ParticipantStatus.STATUS_ABORTED);
            abortActivity(context);
        }
        context.decrementPreparing();
    }

    /**
     * Issues the one-way prepare() message to every participant registered
     * for the durable 2PC protocol. Does not wait for the participants to
     * respond; callers can poll the context's <code>hasMorePreparing()</code>
     * to find out whether responses are still outstanding.
     * <p>
     * When there is at least one such participant the coordinator status is
     * set to durable-preparing first; otherwise the status is left untouched.
     *
     * @param context
     *            the activity context; must be an {@link ATActivityContext}
     * @throws AbstractKandulaException
     *             if sending a prepare message fails
     */
    public void durablePrepare(AbstractContext context)
            throws AbstractKandulaException {
        ParticipantPortTypeRawXMLStub stub = new ParticipantPortTypeRawXMLStub();
        ATActivityContext atContext = (ATActivityContext) context;
        Iterator durableParticipants = atContext
                .getRegistered2PCParticipants(Constants.WS_AT_DURABLE2PC);
        if (!durableParticipants.hasNext()) {
            return;
        }

        atContext.lock();
        try {
            atContext
                    .setStatus(Status.CoordinatorStatus.STATUS_PREPARING_DURABLE);
        } finally {
            atContext.unlock();
        }

        while (durableParticipants.hasNext()) {
            // Count the outstanding response before the message goes out.
            atContext.countPreparing();
            ATParticipantInformation participant =
                    (ATParticipantInformation) durableParticipants.next();
            stub.prepareOperation(participant.getEpr());
        }
    }

    /**
     * Polls the context until no prepare responses are outstanding.
     *
     * @param atContext
     *            the activity being prepared
     * @return <code>true</code> when all responses have arrived,
     *         <code>false</code> when the activity was seen in the aborting
     *         state while responses were still outstanding
     * @throws AbstractKandulaException
     *             declared so that context accessors may propagate it
     */
    private boolean awaitPrepareResponses(ATActivityContext atContext)
            throws AbstractKandulaException {
        while (atContext.hasMorePreparing()) {
            if (atContext.getStatus() == CoordinatorStatus.STATUS_ABORTING) {
                return false;
            }
            // Give the threads delivering the votes a chance to run instead
            // of spinning flat out.
            Thread.yield();
        }
        return true;
    }

    /**
     * Tells the completion initiator of the activity that it has aborted.
     */
    private void sendAborted(ATActivityContext atContext)
            throws AbstractKandulaException {
        CompletionInitiatorPortTypeRawXMLStub completionStub = new CompletionInitiatorPortTypeRawXMLStub(
                atContext.getCompletionParticipant());
        completionStub.abortedOperation();
    }

    /**
     * Tells the completion initiator of the activity that it has committed.
     */
    private void sendCommitted(ATActivityContext atContext)
            throws AbstractKandulaException {
        CompletionInitiatorPortTypeRawXMLStub completionStub = new CompletionInitiatorPortTypeRawXMLStub(
                atContext.getCompletionParticipant());
        completionStub.committedOperation();
    }

    /**
     * Sets the coordinator status to committing, sends commit() to every 2PC
     * participant that did not vote read-only and then reports the commit to
     * the completion initiator. The caller is responsible for making sure all
     * participants have replied to prepare() beforehand.
     *
     * @param context
     *            the activity context; must be an {@link ATActivityContext}
     * @throws AbstractKandulaException
     *             if one of the outgoing messages fails
     */
    private void commitActivity(AbstractContext context)
            throws AbstractKandulaException {
        ParticipantPortTypeRawXMLStub stub = new ParticipantPortTypeRawXMLStub();
        ATActivityContext atContext = (ATActivityContext) context;

        atContext.lock();
        try {
            atContext.setStatus(Status.CoordinatorStatus.STATUS_COMMITTING);
        } finally {
            atContext.unlock();
        }

        Iterator participants = atContext.getAll2PCParticipants();
        while (participants.hasNext()) {
            ATParticipantInformation participant = (ATParticipantInformation) participants
                    .next();
            // Read-only participants are not sent a commit message.
            if (Status.CoordinatorStatus.STATUS_READ_ONLY != participant
                    .getStatus()) {
                stub.commitOperation(participant.getEpr());
            }
        }
        sendCommitted(atContext);
    }

    /**
     * Sets the coordinator status to aborting, sends rollback() to every 2PC
     * participant and then reports the abort to the completion initiator.
     * Does not check whether the participants have replied to prepare().
     *
     * @param context
     *            the activity context; must be an {@link ATActivityContext}
     * @throws AbstractKandulaException
     *             if one of the outgoing messages fails
     */
    private void abortActivity(AbstractContext context)
            throws AbstractKandulaException {
        ParticipantPortTypeRawXMLStub stub = new ParticipantPortTypeRawXMLStub();
        ATActivityContext atContext = (ATActivityContext) context;

        atContext.lock();
        try {
            atContext.setStatus(Status.CoordinatorStatus.STATUS_ABORTING);
        } finally {
            atContext.unlock();
        }

        Iterator participants = atContext.getAll2PCParticipants();
        while (participants.hasNext()) {
            ATParticipantInformation participant = (ATParticipantInformation) participants
                    .next();
            stub.rollbackOperation(participant.getEpr());
        }
        sendAborted(atContext);
    }
}