blob: d05824db38eebf6c31493c5db7a785f25f0aa937 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.qpid.jms.provider.amqp;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import javax.jms.JMSException;
import org.apache.qpid.jms.JmsDestination;
import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
import org.apache.qpid.jms.message.facade.JmsMessageFacade;
import org.apache.qpid.jms.meta.JmsProducerInfo;
import org.apache.qpid.jms.provider.AsyncResult;
import org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper;
import org.apache.qpid.jms.provider.amqp.message.AmqpJmsMessageFacade;
import org.apache.qpid.jms.util.IOExceptionSupport;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Outcome;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.transaction.TransactionalState;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* AMQP Producer object that is used to manage JMS MessageProducer semantics.
*
* This Producer is fixed to a given JmsDestination and can only produce messages to it.
*/
public class AmqpFixedProducer extends AmqpProducer {
private static final Logger LOG = LoggerFactory.getLogger(AmqpFixedProducer.class);
private static final byte[] EMPTY_BYTE_ARRAY = new byte[] {};
private final AmqpTransferTagGenerator tagGenerator = new AmqpTransferTagGenerator(true);
private final Set<Delivery> pending = new LinkedHashSet<Delivery>();
private final LinkedList<PendingSend> pendingSends = new LinkedList<PendingSend>();
private byte[] encodeBuffer = new byte[1024 * 8];
private boolean presettle = false;
public AmqpFixedProducer(AmqpSession session, JmsProducerInfo info) {
super(session, info);
}
@Override
public void close(AsyncResult request) {
// If any sends are held we need to wait for them to complete.
if (!pendingSends.isEmpty()) {
this.closeRequest = request;
return;
}
super.close(request);
}
@Override
public boolean send(JmsOutboundMessageDispatch envelope, AsyncResult request) throws IOException, JMSException {
// TODO - Handle the case where remote has no credit which means we can't send to it.
// We need to hold the send until remote credit becomes available but we should
// also have a send timeout option and filter timed out sends.
if (getEndpoint().getCredit() <= 0) {
LOG.trace("Holding Message send until credit is available.");
// Once a message goes into a held mode we no longer can send it async, so
// we clear the async flag if set to avoid the sender never getting notified.
envelope.setSendAsync(false);
this.pendingSends.addLast(new PendingSend(envelope, request));
return false;
} else {
doSend(envelope, request);
return true;
}
}
private void doSend(JmsOutboundMessageDispatch envelope, AsyncResult request) throws IOException, JMSException {
JmsMessageFacade facade = envelope.getMessage().getFacade();
LOG.trace("Producer sending message: {}", envelope);
byte[] tag = tagGenerator.getNextTag();
Delivery delivery = null;
if (presettle) {
delivery = getEndpoint().delivery(EMPTY_BYTE_ARRAY, 0, 0);
} else {
delivery = getEndpoint().delivery(tag, 0, tag.length);
}
delivery.setContext(request);
if (session.isTransacted()) {
Binary amqpTxId = session.getTransactionContext().getAmqpTransactionId();
TransactionalState state = new TransactionalState();
state.setTxnId(amqpTxId);
delivery.disposition(state);
}
AmqpJmsMessageFacade amqpMessageFacade = (AmqpJmsMessageFacade) facade;
encodeAndSend(amqpMessageFacade.getAmqpMessage(), delivery);
if (presettle) {
delivery.settle();
} else {
pending.add(delivery);
getEndpoint().advance();
}
if (envelope.isSendAsync() || presettle) {
request.onSuccess();
}
}
private void encodeAndSend(Message message, Delivery delivery) throws IOException {
int encodedSize;
while (true) {
try {
encodedSize = message.encode(encodeBuffer, 0, encodeBuffer.length);
break;
} catch (java.nio.BufferOverflowException e) {
encodeBuffer = new byte[encodeBuffer.length * 2];
}
}
int sentSoFar = 0;
while (true) {
int sent = getEndpoint().send(encodeBuffer, sentSoFar, encodedSize - sentSoFar);
if (sent > 0) {
sentSoFar += sent;
if ((encodedSize - sentSoFar) == 0) {
break;
}
} else {
LOG.warn("{} failed to send any data from current Message.", this);
}
}
}
@Override
public void processFlowUpdates() throws IOException {
if (!pendingSends.isEmpty() && getEndpoint().getCredit() > 0) {
while (getEndpoint().getCredit() > 0 && !pendingSends.isEmpty()) {
LOG.trace("Dispatching previously held send");
PendingSend held = pendingSends.pop();
try {
doSend(held.envelope, held.request);
} catch (JMSException e) {
throw IOExceptionSupport.create(e);
}
}
}
// Once the pending sends queue is drained we can propagate the close request.
if (pendingSends.isEmpty() && isAwaitingClose()) {
super.close(closeRequest);
}
}
@Override
public void processDeliveryUpdates() {
List<Delivery> toRemove = new ArrayList<Delivery>();
for (Delivery delivery : pending) {
DeliveryState state = delivery.getRemoteState();
if (state == null) {
continue;
}
Outcome outcome = null;
if (state instanceof TransactionalState) {
LOG.trace("State of delivery is Transactional, retrieving outcome: {}", state);
outcome = ((TransactionalState) state).getOutcome();
} else if (state instanceof Outcome) {
outcome = (Outcome) state;
} else {
LOG.warn("Message send updated with unsupported state: {}", state);
continue;
}
AsyncResult request = (AsyncResult) delivery.getContext();
if (outcome instanceof Accepted) {
toRemove.add(delivery);
LOG.trace("Outcome of delivery was accepted: {}", delivery);
tagGenerator.returnTag(delivery.getTag());
if (request != null && !request.isComplete()) {
request.onSuccess();
}
} else if (outcome instanceof Rejected) {
Exception remoteError = getRemoteError();
toRemove.add(delivery);
LOG.trace("Outcome of delivery was rejected: {}", delivery);
tagGenerator.returnTag(delivery.getTag());
if (request != null && !request.isComplete()) {
request.onFailure(remoteError);
} else {
connection.getProvider().fireProviderException(remoteError);
}
} else {
LOG.warn("Message send updated with unsupported outcome: {}", outcome);
}
}
pending.removeAll(toRemove);
}
@Override
protected void doOpen() {
String targetAddress = null;
if (resource.getDestination() != null) {
JmsDestination destination = resource.getDestination();
targetAddress = AmqpDestinationHelper.INSTANCE.getDestinationAddress(destination, session.getConnection());
}
Symbol[] outcomes = new Symbol[]{Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL};
String sourceAddress = getProducerId().toString();
Source source = new Source();
source.setAddress(sourceAddress);
source.setOutcomes(outcomes);
//TODO: default outcome. Accepted normally, Rejected for transaction controller?
Target target = new Target();
target.setAddress(targetAddress);
String senderName = sourceAddress + ":" + targetAddress;
Sender sender = session.getProtonSession().sender(senderName);
sender.setSource(source);
sender.setTarget(target);
if (presettle) {
sender.setSenderSettleMode(SenderSettleMode.SETTLED);
} else {
sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
}
sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
setEndpoint(sender);
super.doOpen();
}
@Override
protected void doOpenCompletion() {
// Verify the attach response contained a non-null target
org.apache.qpid.proton.amqp.transport.Target t = getEndpoint().getRemoteTarget();
if (t == null) {
// No link terminus was created, the peer should now detach us. Producer creation has failed.
failed(new RuntimeException("link was refused")); //TODO: proper exception.
} else {
super.doOpenCompletion();
}
}
public AmqpSession getSession() {
return this.session;
}
public Sender getProtonSender() {
return this.getEndpoint();
}
@Override
public boolean isAnonymous() {
return this.resource.getDestination() == null;
}
@Override
public void setPresettle(boolean presettle) {
this.presettle = presettle;
}
@Override
public boolean isPresettle() {
return this.presettle;
}
@Override
public String toString() {
return "AmqpFixedProducer { " + getProducerId() + " }";
}
private static class PendingSend {
public JmsOutboundMessageDispatch envelope;
public AsyncResult request;
public PendingSend(JmsOutboundMessageDispatch envelope, AsyncResult request) {
this.envelope = envelope;
this.request = request;
}
}
}