blob: 0edebac48ed30176e50caac7c05a7c79b140dee2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.qpid.protonj2.client.impl;
import java.util.Map;
import org.apache.qpid.protonj2.buffer.ProtonBuffer;
import org.apache.qpid.protonj2.client.Message;
import org.apache.qpid.protonj2.client.StreamSender;
import org.apache.qpid.protonj2.client.StreamSenderOptions;
import org.apache.qpid.protonj2.client.StreamTracker;
import org.apache.qpid.protonj2.client.Tracker;
import org.apache.qpid.protonj2.client.exceptions.ClientException;
import org.apache.qpid.protonj2.client.exceptions.ClientIllegalStateException;
import org.apache.qpid.protonj2.client.futures.ClientFuture;
import org.apache.qpid.protonj2.engine.OutgoingDelivery;
import org.apache.qpid.protonj2.engine.util.StringUtils;
import org.apache.qpid.protonj2.types.messaging.DeliveryAnnotations;
/**
* Client implementation of a {@link StreamSender}.
*/
public final class ClientStreamSender extends ClientSender implements StreamSender {
private final StreamSenderOptions options;
ClientStreamSender(ClientSession session, StreamSenderOptions options, String senderId, org.apache.qpid.protonj2.engine.Sender protonSender) {
super(session, options, senderId, protonSender);
this.options = new StreamSenderOptions(options);
}
@Override
public StreamTracker send(Message<?> message) throws ClientException {
checkClosedOrFailed();
return (StreamTracker) sendMessage(ClientMessageSupport.convertMessage(message), null, true);
}
@Override
public StreamTracker send(Message<?> message, Map<String, Object> deliveryAnnotations) throws ClientException {
checkClosedOrFailed();
return (StreamTracker) sendMessage(ClientMessageSupport.convertMessage(message), null, true);
}
@Override
public StreamTracker trySend(Message<?> message) throws ClientException {
checkClosedOrFailed();
return (StreamTracker) sendMessage(ClientMessageSupport.convertMessage(message), null, false);
}
@Override
public StreamTracker trySend(Message<?> message, Map<String, Object> deliveryAnnotations) throws ClientException {
checkClosedOrFailed();
return (StreamTracker) sendMessage(ClientMessageSupport.convertMessage(message), null, false);
}
@Override
public ClientStreamSenderMessage beginMessage() throws ClientException {
return beginMessage(null);
}
@Override
public ClientStreamSenderMessage beginMessage(Map<String, Object> deliveryAnnotations) throws ClientException {
checkClosedOrFailed();
final ClientFuture<ClientStreamSenderMessage> request = session.getFutureFactory().createFuture();
final DeliveryAnnotations annotations;
if (deliveryAnnotations != null) {
annotations = new DeliveryAnnotations(StringUtils.toSymbolKeyedMap(deliveryAnnotations));
} else {
annotations = null;
}
executor.execute(() -> {
if (protonSender.current() != null) {
request.failed(new ClientIllegalStateException(
"Cannot initiate a new streaming send until the previous one is complete"));
} else {
// Grab the next delivery and hold for stream writes, no other sends
// can occur while we hold the delivery.
final OutgoingDelivery streamDelivery = protonSender.next();
final ClientStreamTracker streamTracker = createTracker(streamDelivery);
streamDelivery.setLinkedResource(streamTracker);
request.complete(new ClientStreamSenderMessage(this, streamTracker, annotations));
}
});
return session.request(this, request);
}
//----- Internal API
@Override
StreamSenderOptions options() {
return this.options;
}
@Override
ClientStreamSender open() {
return (ClientStreamSender) super.open();
}
StreamTracker sendMessage(ClientStreamSenderMessage context, ProtonBuffer payload, int messageFormat) throws ClientException {
checkClosedOrFailed();
final ClientFuture<Tracker> operation = session.getFutureFactory().createFuture();
final ProtonBuffer buffer = payload;
final ClientOutgoingEnvelope envelope = new ClientOutgoingEnvelope(
this, context.getProtonDelivery(), messageFormat, buffer, context.completed(), operation);
executor.execute(() -> {
if (notClosedOrFailed(operation)) {
try {
if (protonSender.isSendable()) {
session.getTransactionContext().send(envelope, null, isSendingSettled());
} else {
addToHeadOfBlockedQueue(envelope);
}
} catch (Exception error) {
operation.failed(ClientExceptionSupport.createNonFatalOrPassthrough(error));
}
}
});
return (StreamTracker) session.request(this, operation);
}
@Override
protected ClientStreamTracker createTracker(OutgoingDelivery delivery) {
return new ClientStreamTracker(this, delivery);
}
@Override
protected ClientNoOpStreamTracker createNoOpTracker() {
return new ClientNoOpStreamTracker(this);
}
}