blob: 4ad9bce4cca26f8a4d3d3f3707a586b44a1608de [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.qpid.server.protocol.v1_0;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValueSection;
import org.apache.qpid.server.protocol.v1_0.type.messaging.EncodingRetainingSection;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
import org.apache.qpid.server.protocol.v1_0.type.transaction.Coordinator;
import org.apache.qpid.server.protocol.v1_0.type.transaction.Declare;
import org.apache.qpid.server.protocol.v1_0.type.transaction.Declared;
import org.apache.qpid.server.protocol.v1_0.type.transaction.Discharge;
import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
import org.apache.qpid.server.protocol.v1_0.type.transport.LinkError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
public class TxnCoordinatorReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint<Coordinator>
{
private final LinkedHashMap<Integer, ServerTransaction> _createdTransactions = new LinkedHashMap<>();
private ArrayList<Transfer> _incompleteMessage;
public TxnCoordinatorReceivingLinkEndpoint(final Session_1_0 session, final Link_1_0<Source, Coordinator> link)
{
super(session, link);
}
@Override
public void start()
{
setLinkCredit(UnsignedInteger.ONE);
setCreditWindow();
}
@Override
protected Error messageTransfer(Transfer xfr)
{
List<QpidByteBuffer> payload = new ArrayList<>();
final Binary deliveryTag = xfr.getDeliveryTag();
if(Boolean.TRUE.equals(xfr.getMore()) && _incompleteMessage == null)
{
_incompleteMessage = new ArrayList<Transfer>();
_incompleteMessage.add(xfr);
return null;
}
else if(_incompleteMessage != null)
{
_incompleteMessage.add(xfr);
if(Boolean.TRUE.equals(xfr.getMore()))
{
return null;
}
for(Transfer t : _incompleteMessage)
{
final List<QpidByteBuffer> bufs = t.getPayload();
if(bufs != null)
{
payload.addAll(bufs);
}
t.dispose();
}
_incompleteMessage=null;
}
else
{
payload.addAll(xfr.getPayload());
xfr.dispose();
}
// Only interested in the amqp-value section that holds the message to the coordinator
try
{
List<EncodingRetainingSection<?>> sections = getSectionDecoder().parseAll(payload);
boolean amqpValueSectionFound = false;
for(EncodingRetainingSection section : sections)
{
if(section instanceof AmqpValueSection)
{
if (amqpValueSectionFound)
{
throw new ConnectionScopedRuntimeException("Received more than one AmqpValue sections");
}
amqpValueSectionFound = true;
Object command = section.getValue();
Session_1_0 session = getSession();
if(command instanceof Declare)
{
final IdentifiedTransaction txn = session.getConnection().createLocalTransaction();
_createdTransactions.put(txn.getId(), txn.getServerTransaction());
Declared state = new Declared();
session.incrementStartedTransactions();
state.setTxnId(session.integerToBinary(txn.getId()));
updateDisposition(deliveryTag, state, true);
}
else if(command instanceof Discharge)
{
Discharge discharge = (Discharge) command;
final Error error = discharge(session.binaryToInteger(discharge.getTxnId()),
Boolean.TRUE.equals(discharge.getFail()));
updateDisposition(deliveryTag, error == null ? new Accepted() : null, true);
return error;
}
else
{
throw new ConnectionScopedRuntimeException(String.format("Received unknown command '%s'",
command.getClass().getSimpleName()));
}
}
}
if (!amqpValueSectionFound)
{
throw new ConnectionScopedRuntimeException("Received no AmqpValue section");
}
}
catch (AmqpErrorException e)
{
return e.getError();
}
finally
{
for(QpidByteBuffer buf : payload)
{
buf.dispose();
}
}
return null;
}
private Error discharge(Integer transactionId, boolean fail)
{
Error error = null;
ServerTransaction txn = _createdTransactions.get(transactionId);
if(txn != null)
{
if(fail)
{
txn.rollback();
getSession().incrementRolledBackTransactions();
}
else if(!(txn instanceof LocalTransaction && ((LocalTransaction)txn).isRollbackOnly()))
{
txn.commit();
getSession().incrementCommittedTransactions();
}
else
{
txn.rollback();
getSession().incrementRolledBackTransactions();
error = new Error();
error.setCondition(LinkError.DETACH_FORCED);
error.setDescription("The transaction was marked as rollback only due to an earlier issue (e.g. a published message was sent settled but could not be enqueued)");
}
_createdTransactions.remove(transactionId);
getSession().getConnection().removeTransaction(transactionId);
}
else
{
error = new Error();
error.setCondition(AmqpError.NOT_FOUND);
error.setDescription("Unknown transactionId" + transactionId);
}
return error;
}
@Override
protected void remoteDetachedPerformDetach(Detach detach)
{
// force rollback of open transactions
for(Map.Entry<Integer, ServerTransaction> entry : _createdTransactions.entrySet())
{
entry.getValue().rollback();
getSession().incrementRolledBackTransactions();
getSession().getConnection().removeTransaction(entry.getKey());
}
detach();
}
@Override
protected void reattachLink(final Attach attach) throws AmqpErrorException
{
throw new AmqpErrorException(new Error(AmqpError.NOT_IMPLEMENTED, "Cannot reattach a Coordinator Link."));
}
@Override
protected void resumeLink(final Attach attach) throws AmqpErrorException
{
throw new AmqpErrorException(new Error(AmqpError.NOT_IMPLEMENTED, "Cannot resume a Coordinator Link."));
}
@Override
protected void establishLink(final Attach attach) throws AmqpErrorException
{
if (getSource() != null || getTarget() != null)
{
throw new IllegalStateException("LinkEndpoint and Termini should be null when establishing a Link.");
}
Coordinator target = new Coordinator();
Source source = (Source) attach.getSource();
getLink().setTermini(source, target);
attachReceived(attach);
}
@Override
protected void recoverLink(final Attach attach) throws AmqpErrorException
{
throw new AmqpErrorException(new Error(AmqpError.NOT_IMPLEMENTED, "Cannot recover a Coordinator Link."));
}
@Override
protected void handle(final Binary deliveryTag, final DeliveryState state, final Boolean settled)
{
}
@Override
public void attachReceived(final Attach attach) throws AmqpErrorException
{
super.attachReceived(attach);
setDeliveryCount(attach.getInitialDeliveryCount());
}
@Override
public void initialiseUnsettled()
{
}
}