| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| package org.apache.qpid.server.protocol.v1_0; |
| |
| import java.util.ArrayList; |
| import java.util.Iterator; |
| import java.util.LinkedHashMap; |
| import java.util.List; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.qpid.bytebuffer.QpidByteBuffer; |
| import org.apache.qpid.server.model.NamedAddressSpace; |
| import org.apache.qpid.server.protocol.v1_0.codec.QpidByteBufferUtils; |
| import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoder; |
| import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoderImpl; |
| import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException; |
| import org.apache.qpid.server.protocol.v1_0.type.Binary; |
| import org.apache.qpid.server.protocol.v1_0.type.DeliveryState; |
| import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger; |
| import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted; |
| import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValueSection; |
| import org.apache.qpid.server.protocol.v1_0.type.messaging.EncodingRetainingSection; |
| import org.apache.qpid.server.protocol.v1_0.type.transaction.Declare; |
| import org.apache.qpid.server.protocol.v1_0.type.transaction.Declared; |
| import org.apache.qpid.server.protocol.v1_0.type.transaction.Discharge; |
| import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError; |
| import org.apache.qpid.server.protocol.v1_0.type.transport.Detach; |
| import org.apache.qpid.server.protocol.v1_0.type.transport.Error; |
| import org.apache.qpid.server.protocol.v1_0.type.transport.LinkError; |
| import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer; |
| import org.apache.qpid.server.txn.LocalTransaction; |
| import org.apache.qpid.server.txn.ServerTransaction; |
| |
| public class TxnCoordinatorReceivingLink_1_0 implements ReceivingLink_1_0 |
| { |
| private static final Logger _logger = LoggerFactory.getLogger(TxnCoordinatorReceivingLink_1_0.class); |
| private NamedAddressSpace _namedAddressSpace; |
| private ReceivingLinkEndpoint _endpoint; |
| |
| private ArrayList<Transfer> _incompleteMessage; |
| private SectionDecoder _sectionDecoder; |
| private LinkedHashMap<Integer, ServerTransaction> _openTransactions; |
| private Session_1_0 _session; |
| |
| |
| public TxnCoordinatorReceivingLink_1_0(NamedAddressSpace namedAddressSpace, |
| Session_1_0 session_1_0, ReceivingLinkEndpoint endpoint, |
| LinkedHashMap<Integer, ServerTransaction> openTransactions) |
| { |
| _namedAddressSpace = namedAddressSpace; |
| _session = session_1_0; |
| _endpoint = endpoint; |
| _sectionDecoder = new SectionDecoderImpl(endpoint.getSession().getConnection().getDescribedTypeRegistry().getSectionDecoderRegistry()); |
| _openTransactions = openTransactions; |
| } |
| |
| public Error messageTransfer(Transfer xfr) |
| { |
| List<QpidByteBuffer> payload = new ArrayList<>(); |
| |
| final Binary deliveryTag = xfr.getDeliveryTag(); |
| |
| if(Boolean.TRUE.equals(xfr.getMore()) && _incompleteMessage == null) |
| { |
| _incompleteMessage = new ArrayList<Transfer>(); |
| _incompleteMessage.add(xfr); |
| return null; |
| } |
| else if(_incompleteMessage != null) |
| { |
| _incompleteMessage.add(xfr); |
| if(Boolean.TRUE.equals(xfr.getMore())) |
| { |
| return null; |
| } |
| |
| int size = 0; |
| for(Transfer t : _incompleteMessage) |
| { |
| final List<QpidByteBuffer> bufs = t.getPayload(); |
| if(bufs != null) |
| { |
| size += QpidByteBufferUtils.remaining(bufs); |
| payload.addAll(bufs); |
| } |
| t.dispose(); |
| } |
| _incompleteMessage=null; |
| |
| } |
| else |
| { |
| payload.addAll(xfr.getPayload()); |
| xfr.dispose(); |
| } |
| |
| // Only interested in the amqp-value section that holds the message to the coordinator |
| try |
| { |
| List<EncodingRetainingSection<?>> sections = _sectionDecoder.parseAll(payload); |
| for(EncodingRetainingSection section : sections) |
| { |
| if(section instanceof AmqpValueSection) |
| { |
| Object command = section.getValue(); |
| |
| |
| if(command instanceof Declare) |
| { |
| Integer txnId = Integer.valueOf(0); |
| Iterator<Integer> existingTxn = _openTransactions.keySet().iterator(); |
| while(existingTxn.hasNext()) |
| { |
| txnId = existingTxn.next(); |
| } |
| txnId = Integer.valueOf(txnId.intValue() + 1); |
| |
| _openTransactions.put(txnId, new LocalTransaction(_namedAddressSpace.getMessageStore())); |
| |
| Declared state = new Declared(); |
| |
| _session.incrementStartedTransactions(); |
| |
| state.setTxnId(_session.integerToBinary(txnId)); |
| _endpoint.updateDisposition(deliveryTag, state, true); |
| |
| } |
| else if(command instanceof Discharge) |
| { |
| Discharge discharge = (Discharge) command; |
| |
| final Error error = discharge(_session.binaryToInteger(discharge.getTxnId()), |
| Boolean.TRUE.equals(discharge.getFail())); |
| _endpoint.updateDisposition(deliveryTag, error == null ? new Accepted() : null, true); |
| return error; |
| } |
| else |
| { |
| // TODO error handling |
| |
| // also should panic if we receive more than one AmqpValue, or no AmqpValue section |
| } |
| } |
| } |
| |
| } |
| catch (AmqpErrorException e) |
| { |
| return e.getError(); |
| } |
| finally |
| { |
| for(QpidByteBuffer buf : payload) |
| { |
| buf.dispose(); |
| } |
| } |
| return null; |
| } |
| |
| public void remoteDetached(LinkEndpoint endpoint, Detach detach) |
| { |
| endpoint.detach(); |
| } |
| |
| @Override |
| public void handle(final Binary deliveryTag, final DeliveryState state, final Boolean settled) |
| { |
| |
| } |
| |
| private Error discharge(Integer transactionId, boolean fail) |
| { |
| Error error = null; |
| ServerTransaction txn = _openTransactions.get(transactionId); |
| if(txn != null) |
| { |
| if(fail) |
| { |
| txn.rollback(); |
| _session.incrementRolledBackTransactions(); |
| } |
| else if(!(txn instanceof LocalTransaction && ((LocalTransaction)txn).isRollbackOnly())) |
| { |
| txn.commit(); |
| _session.incrementCommittedTransactions(); |
| } |
| else |
| { |
| txn.rollback(); |
| _session.incrementRolledBackTransactions(); |
| error = new Error(); |
| error.setCondition(LinkError.DETACH_FORCED); |
| error.setDescription("The transaction was marked as rollback only due to an earlier issue (e.g. a published message was sent settled but could not be enqueued)"); |
| _openTransactions.remove(transactionId); |
| |
| return error; |
| } |
| _openTransactions.remove(transactionId); |
| } |
| else |
| { |
| error = new Error(); |
| error.setCondition(AmqpError.NOT_FOUND); |
| error.setDescription("Unknown transactionId" + transactionId); |
| } |
| return error; |
| } |
| |
| |
| |
| public void start() |
| { |
| _endpoint.setLinkCredit(UnsignedInteger.ONE); |
| _endpoint.setCreditWindow(); |
| } |
| } |