| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.qpid.protonj2.client.impl; |
| |
| import java.util.ArrayDeque; |
| import java.util.Deque; |
| import java.util.Map; |
| import java.util.concurrent.ScheduledFuture; |
| import java.util.concurrent.TimeUnit; |
| |
| import org.apache.qpid.protonj2.buffer.ProtonBuffer; |
| import org.apache.qpid.protonj2.client.AdvancedMessage; |
| import org.apache.qpid.protonj2.client.Message; |
| import org.apache.qpid.protonj2.client.StreamSender; |
| import org.apache.qpid.protonj2.client.StreamSenderOptions; |
| import org.apache.qpid.protonj2.client.StreamTracker; |
| import org.apache.qpid.protonj2.client.exceptions.ClientConnectionRemotelyClosedException; |
| import org.apache.qpid.protonj2.client.exceptions.ClientException; |
| import org.apache.qpid.protonj2.client.exceptions.ClientIllegalStateException; |
| import org.apache.qpid.protonj2.client.exceptions.ClientResourceRemotelyClosedException; |
| import org.apache.qpid.protonj2.client.exceptions.ClientSendTimedOutException; |
| import org.apache.qpid.protonj2.client.futures.ClientFuture; |
| import org.apache.qpid.protonj2.client.futures.ClientSynchronization; |
| import org.apache.qpid.protonj2.engine.OutgoingDelivery; |
| import org.apache.qpid.protonj2.engine.util.StringUtils; |
| import org.apache.qpid.protonj2.types.messaging.DeliveryAnnotations; |
| import org.apache.qpid.protonj2.types.transport.DeliveryState; |
| import org.apache.qpid.protonj2.types.transport.SenderSettleMode; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * Client implementation of a {@link StreamSender}. |
| */ |
| public final class ClientStreamSender extends ClientSenderLinkType<StreamSender> implements StreamSender { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(ClientStreamSender.class); |
| |
| private final StreamSenderOptions options; |
| private final Deque<ClientOutgoingEnvelope> blocked = new ArrayDeque<>(); |
| |
| ClientStreamSender(ClientSession session, StreamSenderOptions options, String senderId, org.apache.qpid.protonj2.engine.Sender protonSender) { |
| super(session, senderId, options, protonSender); |
| |
| this.options = new StreamSenderOptions(options); |
| } |
| |
| @Override |
| public StreamTracker send(Message<?> message) throws ClientException { |
| checkClosedOrFailed(); |
| return sendMessage(ClientMessageSupport.convertMessage(message), null, true); |
| } |
| |
| @Override |
| public StreamTracker send(Message<?> message, Map<String, Object> deliveryAnnotations) throws ClientException { |
| checkClosedOrFailed(); |
| return sendMessage(ClientMessageSupport.convertMessage(message), null, true); |
| } |
| |
| @Override |
| public StreamTracker trySend(Message<?> message) throws ClientException { |
| checkClosedOrFailed(); |
| return sendMessage(ClientMessageSupport.convertMessage(message), null, false); |
| } |
| |
| @Override |
| public StreamTracker trySend(Message<?> message, Map<String, Object> deliveryAnnotations) throws ClientException { |
| checkClosedOrFailed(); |
| return sendMessage(ClientMessageSupport.convertMessage(message), null, false); |
| } |
| |
| @Override |
| public ClientStreamSenderMessage beginMessage() throws ClientException { |
| return beginMessage(null); |
| } |
| |
| @Override |
| public ClientStreamSenderMessage beginMessage(Map<String, Object> deliveryAnnotations) throws ClientException { |
| checkClosedOrFailed(); |
| final ClientFuture<ClientStreamSenderMessage> request = session.getFutureFactory().createFuture(); |
| final DeliveryAnnotations annotations; |
| |
| if (deliveryAnnotations != null) { |
| annotations = new DeliveryAnnotations(StringUtils.toSymbolKeyedMap(deliveryAnnotations)); |
| } else { |
| annotations = null; |
| } |
| |
| executor.execute(() -> { |
| if (protonSender.current() != null) { |
| request.failed(new ClientIllegalStateException( |
| "Cannot initiate a new streaming send until the previous one is complete")); |
| } else { |
| // Grab the next delivery and hold for stream writes, no other sends |
| // can occur while we hold the delivery. |
| final OutgoingDelivery streamDelivery = protonSender.next(); |
| final ClientStreamTracker streamTracker = createTracker(streamDelivery); |
| |
| streamDelivery.setLinkedResource(streamTracker); |
| |
| request.complete(new ClientStreamSenderMessage(this, streamTracker, annotations)); |
| } |
| }); |
| |
| return session.request(this, request); |
| } |
| |
| //----- Internal API |
| |
| StreamSenderOptions options() { |
| return this.options; |
| } |
| |
| @Override |
| protected StreamSender self() { |
| return this; |
| } |
| |
| private void addToTailOfBlockedQueue(ClientOutgoingEnvelope send) { |
| if (options.sendTimeout() > 0 && send.sendTimeout() == null) { |
| send.sendTimeout(executor.schedule(() -> { |
| send.failed(send.createSendTimedOutException()); |
| }, options.sendTimeout(), TimeUnit.MILLISECONDS)); |
| } |
| |
| blocked.addLast(send); |
| } |
| |
| private void addToHeadOfBlockedQueue(ClientOutgoingEnvelope send) { |
| if (options.sendTimeout() > 0 && send.sendTimeout() == null) { |
| send.sendTimeout(executor.schedule(() -> { |
| send.failed(send.createSendTimedOutException()); |
| }, options.sendTimeout(), TimeUnit.MILLISECONDS)); |
| } |
| |
| blocked.addFirst(send); |
| } |
| |
| private StreamTracker sendMessage(AdvancedMessage<?> message, Map<String, Object> deliveryAnnotations, boolean waitForCredit) throws ClientException { |
| final ClientFuture<StreamTracker> operation = session.getFutureFactory().createFuture(); |
| final ProtonBuffer buffer = message.encode(deliveryAnnotations); |
| |
| executor.execute(() -> { |
| if (notClosedOrFailed(operation)) { |
| try { |
| final ClientOutgoingEnvelope envelope = new ClientOutgoingEnvelope(this, null, message.messageFormat(), buffer, true, operation); |
| |
| if (protonSender.isSendable() && protonSender.current() == null) { |
| session.getTransactionContext().send(envelope, null, protonSender.getSenderSettleMode() == SenderSettleMode.SETTLED); |
| } else if (waitForCredit) { |
| addToTailOfBlockedQueue(envelope); |
| } else { |
| operation.complete(null); |
| } |
| } catch (Exception error) { |
| operation.failed(ClientExceptionSupport.createNonFatalOrPassthrough(error)); |
| } |
| } |
| }); |
| |
| return session.request(this, operation); |
| } |
| |
| StreamTracker sendMessage(ClientStreamSenderMessage context, ProtonBuffer payload, int messageFormat) throws ClientException { |
| checkClosedOrFailed(); |
| |
| final ClientFuture<StreamTracker> operation = session.getFutureFactory().createFuture(); |
| final ProtonBuffer buffer = payload; |
| final ClientOutgoingEnvelope envelope = new ClientOutgoingEnvelope( |
| this, context.getProtonDelivery(), messageFormat, buffer, context.completed(), operation); |
| |
| executor.execute(() -> { |
| if (notClosedOrFailed(operation, context.getProtonDelivery().getLink())) { |
| try { |
| if (protonSender.isSendable()) { |
| session.getTransactionContext().send(envelope, null, isSendingSettled()); |
| } else { |
| addToHeadOfBlockedQueue(envelope); |
| } |
| } catch (Exception error) { |
| operation.failed(ClientExceptionSupport.createNonFatalOrPassthrough(error)); |
| } |
| } |
| }); |
| |
| return session.request(this, operation); |
| } |
| |
| private ClientStreamTracker createTracker(OutgoingDelivery delivery) { |
| return new ClientStreamTracker(this, delivery); |
| } |
| |
| private ClientNoOpStreamTracker createNoOpTracker() { |
| return new ClientNoOpStreamTracker(this); |
| } |
| |
| @Override |
| void disposition(OutgoingDelivery delivery, DeliveryState state, boolean settled) throws ClientException { |
| checkClosedOrFailed(); |
| executor.execute(() -> { |
| delivery.disposition(state, settled); |
| }); |
| } |
| |
| void abort(OutgoingDelivery delivery, ClientStreamTracker tracker) throws ClientException { |
| checkClosedOrFailed(); |
| ClientFuture<StreamTracker> request = session().getFutureFactory().createFuture(new ClientSynchronization<StreamTracker>() { |
| |
| @Override |
| public void onPendingSuccess(StreamTracker result) { |
| handleCreditStateUpdated(protonLink()); |
| } |
| |
| @Override |
| public void onPendingFailure(Throwable cause) { |
| handleCreditStateUpdated(protonLink()); |
| } |
| }); |
| |
| executor.execute(() -> { |
| if (delivery.getTransferCount() == 0) { |
| delivery.abort(); |
| request.complete(tracker); |
| } else { |
| ClientOutgoingEnvelope envelope = new ClientOutgoingEnvelope(this, delivery, delivery.getMessageFormat(), null, false, request).abort(); |
| try { |
| if (protonSender.isSendable() && (protonSender.current() == null || protonSender.current() == delivery)) { |
| envelope.send(delivery.getState(), delivery.isSettled()); |
| } else { |
| if (protonSender.current() == delivery) { |
| addToHeadOfBlockedQueue(envelope); |
| } else { |
| addToTailOfBlockedQueue(envelope); |
| } |
| } |
| } catch (Exception error) { |
| request.failed(ClientExceptionSupport.createNonFatalOrPassthrough(error)); |
| } |
| } |
| }); |
| |
| session.request(this, request); |
| } |
| |
| void complete(OutgoingDelivery delivery, ClientStreamTracker tracker) throws ClientException { |
| checkClosedOrFailed(); |
| ClientFuture<StreamTracker> request = session().getFutureFactory().createFuture(new ClientSynchronization<StreamTracker>() { |
| |
| @Override |
| public void onPendingSuccess(StreamTracker result) { |
| handleCreditStateUpdated(protonLink()); |
| } |
| |
| @Override |
| public void onPendingFailure(Throwable cause) { |
| handleCreditStateUpdated(protonLink()); |
| } |
| }); |
| |
| executor.execute(() -> { |
| ClientOutgoingEnvelope envelope = new ClientOutgoingEnvelope(this, delivery, delivery.getMessageFormat(), null, true, request); |
| try { |
| if (protonSender.isSendable() && (protonSender.current() == null || protonSender.current() == delivery)) { |
| envelope.send(delivery.getState(), delivery.isSettled()); |
| } else { |
| if (protonSender.current() == delivery) { |
| addToHeadOfBlockedQueue(envelope); |
| } else { |
| addToTailOfBlockedQueue(envelope); |
| } |
| } |
| } catch (Exception error) { |
| request.failed(ClientExceptionSupport.createNonFatalOrPassthrough(error)); |
| } |
| }); |
| |
| session.request(this, request); |
| } |
| |
| //----- Handlers for proton receiver events |
| |
| private void handleCreditStateUpdated(org.apache.qpid.protonj2.engine.Sender sender) { |
| if (!blocked.isEmpty()) { |
| while (sender.isSendable() && !blocked.isEmpty()) { |
| ClientOutgoingEnvelope held = blocked.peek(); |
| if (held.delivery() == protonSender.current()) { |
| LOG.trace("Dispatching previously held send"); |
| try { |
| // We don't currently allow a sender to define any outcome so we pass null for |
| // now, however a transaction context will apply its TransactionalState outcome |
| // and would wrap anything we passed in the future. |
| session.getTransactionContext().send(held, null, isSendingSettled()); |
| } catch (Exception error) { |
| held.failed(ClientExceptionSupport.createNonFatalOrPassthrough(error)); |
| } finally { |
| blocked.poll(); |
| } |
| } else { |
| break; |
| } |
| } |
| } |
| |
| if (sender.isDraining() && sender.current() == null && blocked.isEmpty()) { |
| sender.drained(); |
| } |
| } |
| |
| @Override |
| protected void linkSpecificLocalOpenHandler() { |
| protonSender.creditStateUpdateHandler(this::handleCreditStateUpdated); |
| } |
| |
| @Override |
| protected void recreateLinkForReconnect() { |
| protonSender.localCloseHandler(null); |
| protonSender.localDetachHandler(null); |
| protonSender.close(); |
| if (protonSender.hasUnsettled()) { |
| failPendingUnsettledAndBlockedSends( |
| new ClientConnectionRemotelyClosedException("Connection failed and send result is unknown")); |
| } |
| protonSender = ClientSenderBuilder.recreateSender(session, protonSender, options); |
| protonSender.setLinkedResource(this); |
| } |
| |
| @Override |
| protected void linkSpecificCleanupHandler(ClientException failureCause) { |
| // If the parent of this sender is a stream session than this sender owns it |
| // and must close it when it closes itself to ensure that the resources are |
| // cleaned up on the remote for the session. |
| if (session instanceof ClientStreamSession) { |
| session.closeAsync(); |
| } |
| |
| if (failureCause != null) { |
| failPendingUnsettledAndBlockedSends(failureCause); |
| } else { |
| failPendingUnsettledAndBlockedSends(new ClientResourceRemotelyClosedException("The sender link has closed")); |
| } |
| } |
| |
| private void failPendingUnsettledAndBlockedSends(ClientException cause) { |
| // Cancel all settlement futures for in-flight sends passing an appropriate error to the future |
| protonSender.unsettled().forEach((delivery) -> { |
| try { |
| final ClientTracker tracker = delivery.getLinkedResource(); |
| tracker.settlementFuture().failed(cause); |
| } catch (Exception e) { |
| } |
| }); |
| |
| // Cancel all blocked sends passing an appropriate error to the future |
| blocked.removeIf((held) -> { |
| held.failed(cause); |
| return true; |
| }); |
| } |
| |
| @Override |
| protected void linkSpecificLocalCloseHandler() { |
| // Nothing needed for sender handling |
| } |
| |
| @Override |
| protected void linkSpecificRemoteOpenHandler() { |
| // Nothing needed for sender handling |
| } |
| |
| @Override |
| protected void linkSpecificRemoteCloseHandler() { |
| // Nothing needed for sender handling |
| } |
| |
| //----- Internal envelope for deliveries to track potential partial sends etc. |
| |
| public static final class ClientOutgoingEnvelope implements ClientTransactionContext.Sendable { |
| |
| private final ProtonBuffer payload; |
| private final ClientFuture<StreamTracker> request; |
| private final ClientStreamSender sender; |
| private final boolean complete; |
| private final int messageFormat; |
| |
| private boolean aborted; |
| private ScheduledFuture<?> sendTimeout; |
| private OutgoingDelivery delivery; |
| |
| /** |
| * Create a new In-flight Send instance that is a continuation on an existing delivery. |
| * |
| * @param sender |
| * The {@link ClientSender} instance that is attempting to send this encoded message. |
| * @param messageFormat |
| * The message format code to assign the send if this is the first delivery. |
| * @param delivery |
| * The {@link OutgoingDelivery} context this envelope will be added to. |
| * @param payload |
| * The payload that comprises this portion of the send. |
| * @param complete |
| * Indicates if the encoded payload represents the complete transfer or if more is coming. |
| * @param request |
| * The requesting operation that initiated this send. |
| */ |
| public ClientOutgoingEnvelope(ClientStreamSender sender, OutgoingDelivery delivery, int messageFormat, ProtonBuffer payload, boolean complete, ClientFuture<StreamTracker> request) { |
| this.payload = payload; |
| this.request = request; |
| this.sender = sender; |
| this.complete = complete; |
| this.messageFormat = messageFormat; |
| this.delivery = delivery; |
| } |
| |
| /** |
| * @return the {@link ScheduledFuture} used to determine when the send should fail if no credit available to write. |
| */ |
| public ScheduledFuture<?> sendTimeout() { |
| return sendTimeout; |
| } |
| |
| /** |
| * Sets the {@link ScheduledFuture} which should be used when a send cannot be immediately performed. |
| * |
| * @param sendTimeout |
| * The {@link ScheduledFuture} that will fail the send if not cancelled once it has been performed. |
| */ |
| public void sendTimeout(ScheduledFuture<?> sendTimeout) { |
| this.sendTimeout = sendTimeout; |
| } |
| |
| public ProtonBuffer payload() { |
| return payload; |
| } |
| |
| public OutgoingDelivery delivery() { |
| return delivery; |
| } |
| |
| public ClientOutgoingEnvelope abort() { |
| this.aborted = true; |
| return this; |
| } |
| |
| public boolean aborted() { |
| return aborted; |
| } |
| |
| @Override |
| public void discard() { |
| if (sendTimeout != null) { |
| sendTimeout.cancel(true); |
| sendTimeout = null; |
| } |
| |
| if (delivery != null) { |
| ClientTracker tracker = delivery.getLinkedResource(); |
| if (tracker != null) { |
| tracker.settlementFuture().complete(tracker); |
| } |
| request.complete(delivery.getLinkedResource()); |
| } else { |
| request.complete(sender.createNoOpTracker()); |
| } |
| } |
| |
| public ClientOutgoingEnvelope succeeded() { |
| if (sendTimeout != null) { |
| sendTimeout.cancel(true); |
| } |
| |
| request.complete(delivery.getLinkedResource()); |
| |
| return this; |
| } |
| |
| public ClientOutgoingEnvelope failed(ClientException exception) { |
| if (sendTimeout != null) { |
| sendTimeout.cancel(true); |
| } |
| |
| request.failed(exception); |
| |
| return this; |
| } |
| |
| @Override |
| public void send(DeliveryState state, boolean settled) { |
| if (delivery == null) { |
| delivery = sender.protonLink().next(); |
| delivery.setLinkedResource(sender.createTracker(delivery)); |
| } |
| |
| if (delivery.getTransferCount() == 0) { |
| delivery.setMessageFormat(messageFormat); |
| delivery.disposition(state, settled); |
| } |
| |
| // We must check if the delivery was fully written and then complete the send operation otherwise |
| // if the session capacity limited the amount of payload data we need to hold the completion until |
| // the session capacity is refilled and we can fully write the remaining message payload. This |
| // area could use some enhancement to allow control of write and flush when dealing with delivery |
| // modes that have low assurance versus those that are strict. |
| if (aborted()) { |
| delivery.abort(); |
| succeeded(); |
| } else { |
| boolean wasAutoFlushOn = sender.connection().autoFlushOff(); |
| try { |
| delivery.streamBytes(payload, complete); |
| if (payload != null && payload.isReadable()) { |
| sender.addToHeadOfBlockedQueue(this); |
| } else { |
| succeeded(); |
| } |
| } finally { |
| if (wasAutoFlushOn) { |
| sender.connection().flush(); |
| sender.connection().autoFlushOn(); |
| } |
| } |
| } |
| } |
| |
| public ClientException createSendTimedOutException() { |
| return new ClientSendTimedOutException("Timed out waiting for credit to send"); |
| } |
| } |
| } |