blob: ac6a44d39528be98f15f0130c247c16db054a4aa [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.qpid.protonj2.client.impl;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.apache.qpid.protonj2.buffer.ProtonBuffer;
import org.apache.qpid.protonj2.client.Message;
import org.apache.qpid.protonj2.client.StreamReceiverMessage;
import org.apache.qpid.protonj2.client.exceptions.ClientException;
import org.apache.qpid.protonj2.client.exceptions.ClientIllegalStateException;
import org.apache.qpid.protonj2.client.exceptions.ClientMessageFormatViolationException;
import org.apache.qpid.protonj2.client.exceptions.ClientUnsupportedOperationException;
import org.apache.qpid.protonj2.codec.DecodeEOFException;
import org.apache.qpid.protonj2.codec.DecodeException;
import org.apache.qpid.protonj2.codec.StreamDecoder;
import org.apache.qpid.protonj2.codec.StreamDecoderState;
import org.apache.qpid.protonj2.codec.StreamTypeDecoder;
import org.apache.qpid.protonj2.codec.decoders.ProtonStreamDecoderFactory;
import org.apache.qpid.protonj2.codec.decoders.primitives.BinaryTypeDecoder;
import org.apache.qpid.protonj2.engine.IncomingDelivery;
import org.apache.qpid.protonj2.types.Binary;
import org.apache.qpid.protonj2.types.Symbol;
import org.apache.qpid.protonj2.types.messaging.AmqpSequence;
import org.apache.qpid.protonj2.types.messaging.AmqpValue;
import org.apache.qpid.protonj2.types.messaging.ApplicationProperties;
import org.apache.qpid.protonj2.types.messaging.Data;
import org.apache.qpid.protonj2.types.messaging.DeliveryAnnotations;
import org.apache.qpid.protonj2.types.messaging.Footer;
import org.apache.qpid.protonj2.types.messaging.Header;
import org.apache.qpid.protonj2.types.messaging.MessageAnnotations;
import org.apache.qpid.protonj2.types.messaging.Properties;
import org.apache.qpid.protonj2.types.messaging.Section;
import org.apache.qpid.protonj2.types.transport.Transfer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Streamed message delivery context used to request reads of the possibly split-framed
* {@link Transfer} payloads that together comprise a single large overall message.
*/
public final class ClientStreamReceiverMessage implements StreamReceiverMessage {
private static final Logger LOG = LoggerFactory.getLogger(ClientStreamReceiverMessage.class);
/**
 * Tracks how far into the incoming message payload decoding has advanced.
 * <p>
 * The declaration order is significant: the decoding logic compares states by
 * {@code ordinal()}, so constants must stay ordered from earliest to latest.
 */
private enum StreamState {
    // Initial state, nothing has been decoded from the delivery stream yet.
    IDLE,
    // A Header section has been decoded.
    HEADER_READ,
    // A DeliveryAnnotations section has been decoded.
    DELIVERY_ANNOTATIONS_READ,
    // A MessageAnnotations section has been decoded.
    MESSAGE_ANNOTATIONS_READ,
    // A Properties section has been decoded.
    PROPERTIES_READ,
    // An ApplicationProperties section has been decoded.
    APPLICATION_PROPERTIES_READ,
    // Set by the body stream when it needs the decoder to look for another body section.
    BODY_PENDING,
    // A body section (Data, AmqpValue or AmqpSequence) marker has been decoded.
    BODY_READABLE,
    // A Footer section was decoded or the end of the payload was reached.
    FOOTER_READ,
    // Decoding failed; the payload can no longer be read.
    DECODE_ERROR
}
private final ClientStreamReceiver receiver;
private final ClientStreamDelivery delivery;
private final InputStream deliveryStream;
private final IncomingDelivery protonDelivery;
private final StreamDecoder protonDecoder = ProtonStreamDecoderFactory.create();
private final StreamDecoderState decoderState = protonDecoder.newDecoderState();
private Header header;
private DeliveryAnnotations deliveryAnnotations;
private MessageAnnotations annotations;
private Properties properties;
private ApplicationProperties applicationProperties;
private Footer footer;
private StreamState currentState = StreamState.IDLE;
private MessageBodyInputStream bodyStream;
/**
 * Creates a streamed message view over the given delivery.
 *
 * @param receiver
 *      the receiver that this message arrived on.
 * @param delivery
 *      the client delivery that owns this message; its proton delivery is captured here.
 * @param deliveryStream
 *      the raw stream the message sections are decoded from.
 */
ClientStreamReceiverMessage(ClientStreamReceiver receiver, ClientStreamDelivery delivery, InputStream deliveryStream) {
    this.receiver = receiver;
    this.deliveryStream = deliveryStream;
    this.delivery = delivery;
    this.protonDelivery = this.delivery.protonDelivery();
}
/**
 * @return the receiver that this streamed message was created for.
 */
@Override
public ClientStreamReceiver receiver() {
    return this.receiver;
}
/**
 * @return the client delivery that this streamed message was created for.
 */
@Override
public ClientStreamDelivery delivery() {
    return this.delivery;
}
/**
 * @return true when a proton delivery is present and it reports being aborted.
 */
@Override
public boolean aborted() {
    return protonDelivery != null && protonDelivery.isAborted();
}
/**
 * @return true when a proton delivery is present and it is neither partial nor aborted.
 */
@Override
public boolean completed() {
    if (protonDelivery == null) {
        return false;
    }

    return !(protonDelivery.isPartial() || protonDelivery.isAborted());
}
/**
 * @return the message format reported by the proton delivery, or zero if there is no delivery.
 */
@Override
public int messageFormat() throws ClientException {
    if (protonDelivery == null) {
        return 0;
    }

    return protonDelivery.getMessageFormat();
}

/**
 * Always fails: a received streamed message is read-only.
 */
@Override
public StreamReceiverMessage messageFormat(int messageFormat) throws ClientUnsupportedOperationException {
    final String reason = "Cannot write to a StreamReceiverMessage";
    throw new ClientUnsupportedOperationException(reason);
}
//----- Header API implementation
/**
 * Decodes the stream up to the Header section if needed and reads the durable flag.
 *
 * @return the Header durable value, or false when no Header section was decoded.
 *
 * @throws ClientException if the stream cannot be decoded up to the Header.
 */
@Override
public boolean durable() throws ClientException {
    final Header decoded = header();
    return decoded != null && decoded.isDurable();
}

/**
 * Always fails: a received streamed message is read-only.
 */
@Override
public StreamReceiverMessage durable(boolean durable) throws ClientUnsupportedOperationException {
    throw new ClientUnsupportedOperationException("Cannot write to a StreamReceiverMessage");
}
/**
 * Decodes the stream up to the Header section if needed and reads the priority.
 *
 * @return the Header priority, or {@link Header#DEFAULT_PRIORITY} when no Header was decoded.
 *
 * @throws ClientException if the stream cannot be decoded up to the Header.
 */
@Override
public byte priority() throws ClientException {
    final Header decoded = header();
    if (decoded == null) {
        return Header.DEFAULT_PRIORITY;
    }

    return decoded.getPriority();
}

/**
 * Always fails: a received streamed message is read-only.
 */
@Override
public StreamReceiverMessage priority(byte priority) throws ClientUnsupportedOperationException {
    throw new ClientUnsupportedOperationException("Cannot write to a StreamReceiverMessage");
}
/**
 * Decodes the stream up to the Header section if needed and reads the time to live.
 *
 * @return the Header TTL, or {@link Header#DEFAULT_TIME_TO_LIVE} when no Header was decoded.
 *
 * @throws ClientException if the stream cannot be decoded up to the Header.
 */
@Override
public long timeToLive() throws ClientException {
    final Header decoded = header();
    if (decoded == null) {
        return Header.DEFAULT_TIME_TO_LIVE;
    }

    return decoded.getTimeToLive();
}

/**
 * Always fails: a received streamed message is read-only.
 */
@Override
public StreamReceiverMessage timeToLive(long timeToLive) throws ClientUnsupportedOperationException {
    throw new ClientUnsupportedOperationException("Cannot write to a StreamReceiverMessage");
}
/**
 * Decodes the stream up to the Header section if needed and reads the first acquirer flag.
 *
 * @return the Header value, or {@link Header#DEFAULT_FIRST_ACQUIRER} when no Header was decoded.
 *
 * @throws ClientException if the stream cannot be decoded up to the Header.
 */
@Override
public boolean firstAcquirer() throws ClientException {
    final Header decoded = header();
    if (decoded == null) {
        return Header.DEFAULT_FIRST_ACQUIRER;
    }

    return decoded.isFirstAcquirer();
}

/**
 * Always fails: a received streamed message is read-only.
 */
@Override
public StreamReceiverMessage firstAcquirer(boolean firstAcquirer) throws ClientUnsupportedOperationException {
    throw new ClientUnsupportedOperationException("Cannot write to a StreamReceiverMessage");
}
/**
 * Decodes the stream up to the Header section if needed and reads the delivery count.
 *
 * @return the Header value, or {@link Header#DEFAULT_DELIVERY_COUNT} when no Header was decoded.
 *
 * @throws ClientException if the stream cannot be decoded up to the Header.
 */
@Override
public long deliveryCount() throws ClientException {
    final Header decoded = header();
    if (decoded == null) {
        return Header.DEFAULT_DELIVERY_COUNT;
    }

    return decoded.getDeliveryCount();
}

/**
 * Always fails: a received streamed message is read-only.
 */
@Override
public StreamReceiverMessage deliveryCount(long deliveryCount) throws ClientUnsupportedOperationException {
    throw new ClientUnsupportedOperationException("Cannot write to a StreamReceiverMessage");
}
/**
 * Ensures the stream has been decoded at least as far as the Header section.
 *
 * @return the decoded Header, or null if decoding moved past that point without finding one.
 *
 * @throws ClientException if the stream cannot be decoded up to the Header.
 */
@Override
public Header header() throws ClientException {
    ensureStreamDecodedTo(StreamState.HEADER_READ);
    return header;
}

/**
 * Always fails: a received streamed message is read-only.
 */
@Override
public StreamReceiverMessage header(Header header) throws ClientUnsupportedOperationException {
    throw new ClientUnsupportedOperationException("Cannot write to a StreamReceiverMessage");
}
//----- Properties API implementation
/**
 * Decodes the stream up to the Properties section if needed and reads the message-id.
 *
 * @return the message-id, or null when no Properties section was decoded.
 *
 * @throws ClientException if the stream cannot be decoded up to the Properties.
 */
@Override
public Object messageId() throws ClientException {
    // Decode once; a second properties() call would repeat the closed/aborted checks.
    final Properties decoded = properties();
    return decoded != null ? decoded.getMessageId() : null;
}

/**
 * Always fails: a received streamed message is read-only.
 */
@Override
public StreamReceiverMessage messageId(Object messageId) throws ClientUnsupportedOperationException {
    throw new ClientUnsupportedOperationException("Cannot write to a StreamReceiverMessage");
}
/**
 * Decodes the stream up to the Properties section if needed and reads the user-id.
 *
 * @return a copy of the user-id bytes, or null when there are no Properties or no user-id.
 *
 * @throws ClientException if the stream cannot be decoded up to the Properties.
 */
@Override
public byte[] userId() throws ClientException {
    // Decode once; repeated properties() calls would repeat the closed/aborted checks.
    final Properties decoded = properties();
    if (decoded == null || decoded.getUserId() == null) {
        return null;
    }

    return decoded.getUserId().arrayCopy();
}

/**
 * Always fails: a received streamed message is read-only.
 */
@Override
public StreamReceiverMessage userId(byte[] userId) throws ClientUnsupportedOperationException {
    throw new ClientUnsupportedOperationException("Cannot write to a StreamReceiverMessage");
}
/**
 * Decodes the stream up to the Properties section if needed and reads the to address.
 *
 * @return the to value, or null when no Properties section was decoded.
 *
 * @throws ClientException if the stream cannot be decoded up to the Properties.
 */
@Override
public String to() throws ClientException {
    // Decode once; a second properties() call would repeat the closed/aborted checks.
    final Properties decoded = properties();
    return decoded != null ? decoded.getTo() : null;
}

/**
 * Always fails: a received streamed message is read-only.
 */
@Override
public StreamReceiverMessage to(String to) throws ClientUnsupportedOperationException {
    throw new ClientUnsupportedOperationException("Cannot write to a StreamReceiverMessage");
}
/**
 * Decodes the stream up to the Properties section if needed and reads the subject.
 *
 * @return the subject, or null when no Properties section was decoded.
 *
 * @throws ClientException if the stream cannot be decoded up to the Properties.
 */
@Override
public String subject() throws ClientException {
    // Decode once; a second properties() call would repeat the closed/aborted checks.
    final Properties decoded = properties();
    return decoded != null ? decoded.getSubject() : null;
}

/**
 * Always fails: a received streamed message is read-only.
 */
@Override
public StreamReceiverMessage subject(String subject) throws ClientUnsupportedOperationException {
    throw new ClientUnsupportedOperationException("Cannot write to a StreamReceiverMessage");
}
/**
 * Decodes the stream up to the Properties section if needed and reads the reply-to address.
 *
 * @return the reply-to value, or null when no Properties section was decoded.
 *
 * @throws ClientException if the stream cannot be decoded up to the Properties.
 */
@Override
public String replyTo() throws ClientException {
    // Decode once; a second properties() call would repeat the closed/aborted checks.
    final Properties decoded = properties();
    return decoded != null ? decoded.getReplyTo() : null;
}

/**
 * Always fails: a received streamed message is read-only.
 */
@Override
public StreamReceiverMessage replyTo(String replyTo) throws ClientUnsupportedOperationException {
    throw new ClientUnsupportedOperationException("Cannot write to a StreamReceiverMessage");
}
/**
 * Decodes the stream up to the Properties section if needed and reads the correlation-id.
 *
 * @return the correlation-id, or null when no Properties section was decoded.
 *
 * @throws ClientException if the stream cannot be decoded up to the Properties.
 */
@Override
public Object correlationId() throws ClientException {
    // Decode once; a second properties() call would repeat the closed/aborted checks.
    final Properties decoded = properties();
    return decoded != null ? decoded.getCorrelationId() : null;
}

/**
 * Always fails: a received streamed message is read-only.
 */
@Override
public StreamReceiverMessage correlationId(Object correlationId) throws ClientUnsupportedOperationException {
    throw new ClientUnsupportedOperationException("Cannot write to a StreamReceiverMessage");
}
/**
 * Decodes the stream up to the Properties section if needed and reads the content-type.
 *
 * @return the content-type, or null when no Properties section was decoded.
 *
 * @throws ClientException if the stream cannot be decoded up to the Properties.
 */
@Override
public String contentType() throws ClientException {
    // Decode once; a second properties() call would repeat the closed/aborted checks.
    final Properties decoded = properties();
    return decoded != null ? decoded.getContentType() : null;
}

/**
 * Always fails: a received streamed message is read-only.
 */
@Override
public StreamReceiverMessage contentType(String contentType) throws ClientUnsupportedOperationException {
    throw new ClientUnsupportedOperationException("Cannot write to a StreamReceiverMessage");
}
/**
 * Decodes the stream up to the Properties section if needed and reads the content-encoding.
 *
 * @return the content-encoding, or null when no Properties section was decoded.
 *
 * @throws ClientException if the stream cannot be decoded up to the Properties.
 */
@Override
public String contentEncoding() throws ClientException {
    // Decode once; a second properties() call would repeat the closed/aborted checks.
    final Properties decoded = properties();
    return decoded != null ? decoded.getContentEncoding() : null;
}

/**
 * Always fails: a received streamed message is read-only.
 */
@Override
public Message<?> contentEncoding(String contentEncoding) throws ClientUnsupportedOperationException {
    throw new ClientUnsupportedOperationException("Cannot write to a StreamReceiverMessage");
}
/**
 * Decodes the stream up to the Properties section if needed and reads the absolute expiry time.
 *
 * @return the absolute expiry time, or zero when no Properties section was decoded.
 *
 * @throws ClientException if the stream cannot be decoded up to the Properties.
 */
@Override
public long absoluteExpiryTime() throws ClientException {
    // Decode once; a second properties() call would repeat the closed/aborted checks.
    final Properties decoded = properties();
    return decoded != null ? decoded.getAbsoluteExpiryTime() : 0L;
}

/**
 * Always fails: a received streamed message is read-only.
 */
@Override
public StreamReceiverMessage absoluteExpiryTime(long expiryTime) throws ClientUnsupportedOperationException {
    throw new ClientUnsupportedOperationException("Cannot write to a StreamReceiverMessage");
}
/**
 * Decodes the stream up to the Properties section if needed and reads the creation time.
 *
 * @return the creation time, or zero when no Properties section was decoded.
 *
 * @throws ClientException if the stream cannot be decoded up to the Properties.
 */
@Override
public long creationTime() throws ClientException {
    // Decode once; a second properties() call would repeat the closed/aborted checks.
    final Properties decoded = properties();
    return decoded != null ? decoded.getCreationTime() : 0L;
}

/**
 * Always fails: a received streamed message is read-only.
 */
@Override
public StreamReceiverMessage creationTime(long createTime) throws ClientUnsupportedOperationException {
    throw new ClientUnsupportedOperationException("Cannot write to a StreamReceiverMessage");
}
/**
 * Decodes the stream up to the Properties section if needed and reads the group-id.
 *
 * @return the group-id, or null when no Properties section was decoded.
 *
 * @throws ClientException if the stream cannot be decoded up to the Properties.
 */
@Override
public String groupId() throws ClientException {
    // Decode once; a second properties() call would repeat the closed/aborted checks.
    final Properties decoded = properties();
    return decoded != null ? decoded.getGroupId() : null;
}

/**
 * Always fails: a received streamed message is read-only.
 */
@Override
public StreamReceiverMessage groupId(String groupId) throws ClientUnsupportedOperationException {
    throw new ClientUnsupportedOperationException("Cannot write to a StreamReceiverMessage");
}
/**
 * Decodes the stream up to the Properties section if needed and reads the group sequence.
 *
 * @return the group sequence narrowed to an int, or zero when no Properties section was decoded.
 *
 * @throws ClientException if the stream cannot be decoded up to the Properties.
 */
@Override
public int groupSequence() throws ClientException {
    // Decode once; a second properties() call would repeat the closed/aborted checks.
    final Properties decoded = properties();
    return decoded != null ? (int) decoded.getGroupSequence() : 0;
}

/**
 * Always fails: a received streamed message is read-only.
 */
@Override
public StreamReceiverMessage groupSequence(int groupSequence) throws ClientUnsupportedOperationException {
    throw new ClientUnsupportedOperationException("Cannot write to a StreamReceiverMessage");
}
/**
 * Decodes the stream up to the Properties section if needed and reads the reply-to-group-id.
 *
 * @return the reply-to-group-id, or null when no Properties section was decoded.
 *
 * @throws ClientException if the stream cannot be decoded up to the Properties.
 */
@Override
public String replyToGroupId() throws ClientException {
    // Decode once; a second properties() call would repeat the closed/aborted checks.
    final Properties decoded = properties();
    return decoded != null ? decoded.getReplyToGroupId() : null;
}

/**
 * Always fails: a received streamed message is read-only.
 */
@Override
public StreamReceiverMessage replyToGroupId(String replyToGroupId) throws ClientUnsupportedOperationException {
    throw new ClientUnsupportedOperationException("Cannot write to a StreamReceiverMessage");
}
/**
 * Ensures the stream has been decoded at least as far as the Properties section.
 *
 * @return the decoded Properties, or null if decoding moved past that point without finding one.
 *
 * @throws ClientException if the stream cannot be decoded up to the Properties.
 */
@Override
public Properties properties() throws ClientException {
    ensureStreamDecodedTo(StreamState.PROPERTIES_READ);
    return properties;
}

/**
 * Always fails: a received streamed message is read-only.
 */
@Override
public StreamReceiverMessage properties(Properties properties) throws ClientUnsupportedOperationException {
    throw new ClientUnsupportedOperationException("Cannot write to a StreamReceiverMessage");
}
//----- Delivery Annotations API (Internal Access Only)
/**
 * Ensures the stream has been decoded at least as far as the DeliveryAnnotations section.
 *
 * @return the decoded DeliveryAnnotations, or null if none were found up to that point.
 *
 * @throws ClientException if the stream cannot be decoded up to the DeliveryAnnotations.
 */
DeliveryAnnotations deliveryAnnotations() throws ClientException {
    ensureStreamDecodedTo(StreamState.DELIVERY_ANNOTATIONS_READ);

    return this.deliveryAnnotations;
}
//----- Message Annotations API
/**
 * Looks up a single message annotation by name.
 *
 * @return the annotation value, or null when there are no annotations or the key is absent.
 */
@Override
public Object annotation(String key) throws ClientException {
    if (!hasAnnotations()) {
        return null;
    }

    return annotations.getValue().get(Symbol.valueOf(key));
}
/**
 * @return true when message annotations exist and contain the given key.
 */
@Override
public boolean hasAnnotation(String key) throws ClientException {
    return hasAnnotations() && annotations.getValue().containsKey(Symbol.valueOf(key));
}
/**
 * Decodes the stream up to the MessageAnnotations section if needed.
 *
 * @return true when a MessageAnnotations section was decoded and holds at least one entry.
 */
@Override
public boolean hasAnnotations() throws ClientException {
    ensureStreamDecodedTo(StreamState.MESSAGE_ANNOTATIONS_READ);

    if (annotations == null || annotations.getValue() == null) {
        return false;
    }

    return annotations.getValue().size() > 0;
}
/**
 * Always fails: a received streamed message is read-only.
 */
@Override
public Object removeAnnotation(String key) throws ClientUnsupportedOperationException {
    throw new ClientUnsupportedOperationException("Cannot write to a StreamReceiverMessage");
}
/**
 * Invokes the action once per message annotation, passing the key as a String.
 *
 * @return this message instance.
 */
@Override
public StreamReceiverMessage forEachAnnotation(BiConsumer<String, Object> action) throws ClientException {
    if (!hasAnnotations()) {
        return this;
    }

    annotations.getValue().forEach((symbol, entryValue) -> action.accept(symbol.toString(), entryValue));

    return this;
}
/**
 * Always fails: a received streamed message is read-only.
 */
@Override
public StreamReceiverMessage annotation(String key, Object value) throws ClientUnsupportedOperationException {
    throw new ClientUnsupportedOperationException("Cannot write to a StreamReceiverMessage");
}
/**
 * @return the decoded MessageAnnotations when it holds at least one entry, otherwise null.
 *
 * @throws ClientException if the stream cannot be decoded up to the MessageAnnotations.
 */
@Override
public MessageAnnotations annotations() throws ClientException {
    return hasAnnotations() ? annotations : null;
}

/**
 * Always fails: a received streamed message is read-only.
 */
@Override
public StreamReceiverMessage annotations(MessageAnnotations messageAnnotations) throws ClientUnsupportedOperationException {
    throw new ClientUnsupportedOperationException("Cannot write to a StreamReceiverMessage");
}
//----- Application Properties API
/**
 * Looks up a single application property by name.
 *
 * @return the property value, or null when there are no properties or the key is absent.
 */
@Override
public Object property(String key) throws ClientException {
    if (!hasProperties()) {
        return null;
    }

    return applicationProperties.getValue().get(key);
}
/**
 * @return true when application properties exist and contain the given key.
 */
@Override
public boolean hasProperty(String key) throws ClientException {
    return hasProperties() && applicationProperties.getValue().containsKey(key);
}
/**
 * Decodes the stream up to the ApplicationProperties section if needed.
 *
 * @return true when an ApplicationProperties section was decoded and holds at least one entry.
 */
@Override
public boolean hasProperties() throws ClientException {
    ensureStreamDecodedTo(StreamState.APPLICATION_PROPERTIES_READ);

    if (applicationProperties == null || applicationProperties.getValue() == null) {
        return false;
    }

    return applicationProperties.getValue().size() > 0;
}
/**
 * Always fails: a received streamed message is read-only.
 */
@Override
public Object removeProperty(String key) throws ClientUnsupportedOperationException {
    throw new ClientUnsupportedOperationException("Cannot write to a StreamReceiverMessage");
}
/**
 * Invokes the action once per application property.
 *
 * @return this message instance.
 */
@Override
public StreamReceiverMessage forEachProperty(BiConsumer<String, Object> action) throws ClientException {
    if (!hasProperties()) {
        return this;
    }

    applicationProperties.getValue().forEach(action);

    return this;
}
/**
 * Always fails: a received streamed message is read-only.
 */
@Override
public StreamReceiverMessage property(String key, Object value) throws ClientUnsupportedOperationException {
    throw new ClientUnsupportedOperationException("Cannot write to a StreamReceiverMessage");
}
/**
 * @return the decoded ApplicationProperties when it holds at least one entry, otherwise null.
 *
 * @throws ClientException if the stream cannot be decoded up to the ApplicationProperties.
 */
@Override
public ApplicationProperties applicationProperties() throws ClientException {
    return hasProperties() ? applicationProperties : null;
}

/**
 * Always fails: a received streamed message is read-only.
 */
@Override
public StreamReceiverMessage applicationProperties(ApplicationProperties applicationProperties) throws ClientUnsupportedOperationException {
    throw new ClientUnsupportedOperationException("Cannot write to a StreamReceiverMessage");
}
//----- Message Footer API
/**
 * Looks up a single footer entry by name.
 *
 * @return the footer value, or null when there are no footers or the key is absent.
 */
@Override
public Object footer(String key) throws ClientException {
    if (!hasFooters()) {
        return null;
    }

    return footer.getValue().get(Symbol.valueOf(key));
}
/**
 * @return true when footers exist and contain the given key.
 */
@Override
public boolean hasFooter(String key) throws ClientException {
    return hasFooters() && footer.getValue().containsKey(Symbol.valueOf(key));
}
/**
 * Reports whether a non-empty Footer was decoded. The Footer is only available once
 * decoding has reached the footer-read state.
 *
 * @return true when a Footer section was decoded and holds at least one entry.
 *
 * @throws ClientException if the payload failed to decode.
 * @throws ClientIllegalStateException if decoding has not yet reached the footer-read state.
 */
@Override
public boolean hasFooters() throws ClientException {
    ensureStreamDecodedTo(StreamState.BODY_READABLE);

    if (currentState == StreamState.DECODE_ERROR) {
        throw new ClientException("Cannot read Footer due to decoding error in message payload");
    }

    if (currentState != StreamState.FOOTER_READ) {
        throw new ClientIllegalStateException("Cannot read message Footer until message body fully read");
    }

    if (footer == null || footer.getValue() == null) {
        return false;
    }

    return footer.getValue().size() > 0;
}
/**
 * Always fails: a received streamed message is read-only.
 */
@Override
public Object removeFooter(String key) throws ClientUnsupportedOperationException {
    throw new ClientUnsupportedOperationException("Cannot write to a StreamReceiverMessage");
}
/**
 * Invokes the action once per footer entry, passing the key as a String.
 *
 * @return this message instance.
 */
@Override
public StreamReceiverMessage forEachFooter(BiConsumer<String, Object> action) throws ClientException {
    if (!hasFooters()) {
        return this;
    }

    footer.getValue().forEach((symbol, entryValue) -> action.accept(symbol.toString(), entryValue));

    return this;
}
/**
 * Always fails: a received streamed message is read-only.
 */
@Override
public StreamReceiverMessage footer(String key, Object value) throws ClientUnsupportedOperationException {
    throw new ClientUnsupportedOperationException("Cannot write to a StreamReceiverMessage");
}
/**
 * @return the decoded Footer when it holds at least one entry, otherwise null.
 *
 * @throws ClientException if the footer cannot be read in the current decode state.
 */
@Override
public Footer footer() throws ClientException {
    return hasFooters() ? footer : null;
}

/**
 * Always fails: a received streamed message is read-only.
 */
@Override
public StreamReceiverMessage footer(Footer footer) throws ClientUnsupportedOperationException {
    throw new ClientUnsupportedOperationException("Cannot write to a StreamReceiverMessage");
}
//----- Message Body Access API
/**
 * Always fails: body sections cannot be added to a received streamed message.
 */
@Override
public StreamReceiverMessage addBodySection(Section<?> bodySection) throws ClientUnsupportedOperationException {
    final String reason = "Cannot encode from an StreamReceiverMessage instance.";
    throw new ClientUnsupportedOperationException(reason);
}
/**
 * Always fails: body sections cannot be assigned to a received streamed message.
 */
@Override
public StreamReceiverMessage bodySections(Collection<Section<?>> sections) throws ClientUnsupportedOperationException {
    final String reason = "Cannot encode from an StreamReceiverMessage instance.";
    throw new ClientUnsupportedOperationException(reason);
}

/**
 * Always fails: the body of a streamed message is only exposed through {@link #body()}.
 */
@Override
public Collection<Section<?>> bodySections() throws ClientUnsupportedOperationException {
    final String reason = "Cannot decode all body sections from a StreamReceiverMessage instance.";
    throw new ClientUnsupportedOperationException(reason);
}
/**
 * Always fails: the body of a streamed message is only exposed through {@link #body()}.
 */
@Override
public StreamReceiverMessage forEachBodySection(Consumer<Section<?>> consumer) throws ClientUnsupportedOperationException {
    final String reason = "Cannot decode all body sections from a StreamReceiverMessage instance.";
    throw new ClientUnsupportedOperationException(reason);
}
/**
 * Always fails: body sections cannot be cleared on a received streamed message.
 */
@Override
public StreamReceiverMessage clearBodySections() throws ClientUnsupportedOperationException {
    final String reason = "Cannot encode from an StreamReceiverMessage instance.";
    throw new ClientUnsupportedOperationException(reason);
}
/**
 * Decodes the stream up to the first body section if needed and returns the body stream.
 *
 * @return the body stream; this is null if decoding ran to the end of the payload
 *         without ever creating one.
 *
 * @throws ClientException if the payload failed to decode.
 * @throws ClientIllegalStateException if a body stream exists and the footer-read state
 *         has already been reached.
 */
@Override
public InputStream body() throws ClientException {
    // Only FOOTER_READ and DECODE_ERROR lie beyond BODY_READABLE.
    if (currentState == StreamState.DECODE_ERROR) {
        throw new ClientException("Cannot read body due to decoding error in message payload");
    }

    if (currentState == StreamState.FOOTER_READ && bodyStream != null) {
        throw new ClientIllegalStateException("Cannot read body from message whose body has already been read.");
    }

    ensureStreamDecodedTo(StreamState.BODY_READABLE);

    return bodyStream;
}

/**
 * Always fails: a body cannot be assigned to a received streamed message.
 */
@Override
public StreamReceiverMessage body(InputStream value) throws ClientUnsupportedOperationException {
    final String reason = "Cannot encode from an StreamReceiverMessage instance.";
    throw new ClientUnsupportedOperationException(reason);
}
//----- AdvancedMessage encoding API implementation.
/**
 * Always fails: a received streamed message cannot be encoded.
 */
@Override
public ProtonBuffer encode(Map<String, Object> deliveryAnnotations) throws ClientUnsupportedOperationException {
    final String reason = "Cannot encode from an StreamReceiverMessage instance.";
    throw new ClientUnsupportedOperationException(reason);
}
//----- Internal Streamed Delivery API and support methods
/**
 * Guards decode operations: fails when the parent receiver is closed or, failing that,
 * when the incoming delivery has been aborted.
 *
 * @throws ClientIllegalStateException if either condition holds.
 */
private void checkClosedOrAborted() throws ClientIllegalStateException {
    final boolean receiverClosed = receiver.isClosed();
    if (receiverClosed) {
        throw new ClientIllegalStateException("The parent Receiver instance has already been closed.");
    }

    final boolean deliveryAborted = protonDelivery != null && protonDelivery.isAborted();
    if (deliveryAborted) {
        throw new ClientIllegalStateException("The incoming delivery was aborted.");
    }
}
/**
 * Decodes message sections from the delivery stream until the current state is at
 * least the desired state. States are compared by declaration order.
 * <p>
 * Reaching the end of the payload moves the state to FOOTER_READ. Encountering a body
 * section creates the matching body stream the first time one is seen. Any decode
 * failure moves the state to DECODE_ERROR and closes the delivery stream.
 *
 * @param desiredState
 *      the minimum state that decoding must reach before this method returns.
 *
 * @throws ClientException if the receiver is closed, the delivery was aborted, or
 *                         the payload cannot be decoded.
 */
private void ensureStreamDecodedTo(StreamState desiredState) throws ClientException {
    checkClosedOrAborted();

    while (currentState.ordinal() < desiredState.ordinal()) {
        try {
            final StreamTypeDecoder<?> decoder;
            try {
                decoder = protonDecoder.readNextTypeDecoder(deliveryStream, decoderState);
            } catch (DecodeEOFException eof) {
                // Nothing more to decode, treat the message as fully consumed.
                currentState = StreamState.FOOTER_READ;
                break;
            }

            final Class<?> typeClass = decoder.getTypeClass();

            if (typeClass == Header.class) {
                header = (Header) decoder.readValue(deliveryStream, decoderState);
                currentState = StreamState.HEADER_READ;
            } else if (typeClass == DeliveryAnnotations.class) {
                deliveryAnnotations = (DeliveryAnnotations) decoder.readValue(deliveryStream, decoderState);
                currentState = StreamState.DELIVERY_ANNOTATIONS_READ;
            } else if (typeClass == MessageAnnotations.class) {
                annotations = (MessageAnnotations) decoder.readValue(deliveryStream, decoderState);
                currentState = StreamState.MESSAGE_ANNOTATIONS_READ;
            } else if (typeClass == Properties.class) {
                properties = (Properties) decoder.readValue(deliveryStream, decoderState);
                currentState = StreamState.PROPERTIES_READ;
            } else if (typeClass == ApplicationProperties.class) {
                applicationProperties = (ApplicationProperties) decoder.readValue(deliveryStream, decoderState);
                currentState = StreamState.APPLICATION_PROPERTIES_READ;
            } else if (typeClass == AmqpSequence.class) {
                // Body sections are not decoded here, the body stream reads their content.
                currentState = StreamState.BODY_READABLE;
                if (bodyStream == null) {
                    bodyStream = new AmqpSequenceInputStream(deliveryStream);
                }
            } else if (typeClass == AmqpValue.class) {
                currentState = StreamState.BODY_READABLE;
                if (bodyStream == null) {
                    bodyStream = new AmqpValueInputStream(deliveryStream);
                }
            } else if (typeClass == Data.class) {
                currentState = StreamState.BODY_READABLE;
                if (bodyStream == null) {
                    bodyStream = new DataSectionInputStream(deliveryStream);
                }
            } else if (typeClass == Footer.class) {
                footer = (Footer) decoder.readValue(deliveryStream, decoderState);
                currentState = StreamState.FOOTER_READ;
            } else {
                throw new ClientMessageFormatViolationException("Incoming message carries unknown Section");
            }
        } catch (ClientMessageFormatViolationException | DecodeException ex) {
            currentState = StreamState.DECODE_ERROR;
            if (deliveryStream != null) {
                try {
                    deliveryStream.close();
                } catch (IOException closeError) {
                    // Best effort close, the decode failure below is the error that matters.
                    LOG.trace("Ignoring error from close of delivery stream after decode failure", closeError);
                }
            }

            // TODO: At the moment there is no automatic rejection or release etc
            //       of the delivery.  The user is expected to apply a disposition in
            //       response to this error that initiates the desired outcome.  We
            //       could look to add auto settlement with a configured outcome in
            //       the future.

            throw ClientExceptionSupport.createNonFatalOrPassthrough(ex);
        }
    }
}
//----- Internal InputStream implementations
/**
 * Base class for the body streams handed to the user. It wraps the raw delivery
 * stream and limits reads to the bytes of the current body section, asking the
 * enclosing message to decode forward when a section has been fully consumed.
 */
private abstract class MessageBodyInputStream extends FilterInputStream {

    protected boolean closed;
    // Number of payload bytes left unread in the body section currently being read.
    protected long remainingSectionBytes = 0;

    /**
     * Wraps the delivery stream and immediately scans the first body section.
     *
     * @param deliveryStream
     *      the raw stream positioned just after a body section marker.
     *
     * @throws ClientException if the section cannot be validated.
     */
    protected MessageBodyInputStream(InputStream deliveryStream) throws ClientException {
        super(deliveryStream);

        validateAndScanNextSection();
    }

    /**
     * Closes this stream and the wrapped delivery stream. When the current section has
     * no unread bytes the message is first decoded forward to the footer-read state.
     *
     * @throws IOException if decoding forward fails or the wrapped stream fails to close.
     */
    @Override
    public void close() throws IOException {
        try {
            // This will check if another body section is present or if there was a footer and if
            // a Footer is present it will be decoded and the message payload should be fully consumed
            // at that point. Otherwise the underlying raw InputStream will handle the task of
            // discarding pending bytes for the message to ensure the receiver does not stall
            // waiting for the session window to be opened.
            if (remainingSectionBytes == 0) {
                ensureStreamDecodedTo(StreamState.FOOTER_READ);
            }
        } catch (ClientException e) {
            // Keep the original failure attached so the reason is not lost.
            throw new IOException("Caught error while attempting to advance past remaining message body", e);
        } finally {
            this.closed = true;
            super.close();
        }
    }

    /**
     * Reads a single body byte, moving into following body sections as needed.
     *
     * @return the byte read, or -1 when no further body bytes are available.
     */
    @Override
    public int read() throws IOException {
        checkClosed();

        // A newly entered section can itself be empty so keep advancing until one
        // with content is found or there are no more body sections.
        while (remainingSectionBytes == 0) {
            if (!tryMoveToNextBodySection()) {
                return -1; // Cannot read any further.
            }
        }

        final int value = super.read();
        if (value >= 0) {
            // Only account for bytes that were actually delivered.
            remainingSectionBytes--;
        }

        return value;
    }

    /**
     * Reads body bytes into the target array, moving into following body sections as needed.
     *
     * @return the number of bytes read, or -1 if none could be read because the end
     *         of the body was reached.
     */
    @Override
    public int read(byte target[], int offset, int length) throws IOException {
        checkClosed();

        int bytesRead = 0;
        boolean endReached = false;

        while (bytesRead != length) {
            if (remainingSectionBytes == 0) {
                if (!tryMoveToNextBodySection()) {
                    endReached = true; // We are at the end of the body sections
                    break;
                }
                continue; // The new section may be empty, check again before reading.
            }

            final int readChunk = (int) Math.min(remainingSectionBytes, length - bytesRead);
            final int actualRead = super.read(target, offset + bytesRead, readChunk);

            if (actualRead > 0) {
                bytesRead += actualRead;
                remainingSectionBytes -= actualRead;
            } else if (actualRead < 0) {
                // Wrapped stream ended inside a section, stop instead of spinning forever.
                endReached = true;
                break;
            }
        }

        return endReached && bytesRead == 0 ? -1 : bytesRead;
    }

    /**
     * Skips body bytes, moving into following body sections as needed.
     *
     * @return the number of bytes skipped, zero for a non-positive request, or -1 when
     *         nothing was skipped because the end of the body sections was reached (this
     *         value is retained from the previous behavior of this stream).
     */
    @Override
    public long skip(long skipSize) throws IOException {
        checkClosed();

        if (skipSize <= 0) {
            return 0;
        }

        long bytesSkipped = 0;

        while (bytesSkipped < skipSize) {
            if (remainingSectionBytes == 0) {
                if (!tryMoveToNextBodySection()) {
                    // We are at the end of the body sections
                    return bytesSkipped > 0 ? bytesSkipped : -1;
                }
                continue; // The new section may be empty, check again before skipping.
            }

            final long skipChunk = Math.min(remainingSectionBytes, skipSize - bytesSkipped);
            long actualSkip = super.skip(skipChunk);

            if (actualSkip <= 0) {
                // The wrapped stream made no progress, read one byte to tell a stalled
                // skip apart from end of stream rather than looping forever.
                if (super.read() < 0) {
                    break;
                }
                actualSkip = 1;
            }

            bytesSkipped += actualSkip;
            remainingSectionBytes -= actualSkip;
        }

        return bytesSkipped;
    }

    /**
     * @return the Java type that describes the content of the body being streamed.
     */
    public abstract Class<?> getBodyTypeClass();

    /**
     * Called each time a body section marker has been decoded so the subclass can check
     * the section content and update {@code remainingSectionBytes}.
     *
     * @throws ClientException if the section cannot be validated.
     */
    protected abstract void validateAndScanNextSection() throws ClientException;

    /**
     * Asks the enclosing message to decode forward to the next body section.
     *
     * @return true if another body section was found and scanned, false otherwise.
     *
     * @throws IOException wrapping any ClientException raised while decoding.
     */
    protected boolean tryMoveToNextBodySection() throws IOException {
        try {
            if (currentState != StreamState.FOOTER_READ) {
                // Step the state back so that the decode loop runs again.
                currentState = StreamState.BODY_PENDING;
                ensureStreamDecodedTo(StreamState.BODY_READABLE);
                if (currentState == StreamState.BODY_READABLE) {
                    validateAndScanNextSection();
                    return true;
                }
            }

            return false;
        } catch (ClientException e) {
            throw new IOException(e);
        }
    }

    /**
     * @throws IOException if this stream has already been closed.
     */
    protected void checkClosed() throws IOException {
        if (closed) {
            throw new IOException("Stream was closed previously");
        }
    }
}
/**
 * Body stream for messages whose body is carried in Data sections; exposes the
 * Binary payload of each section as raw bytes.
 */
private class DataSectionInputStream extends MessageBodyInputStream {

    public DataSectionInputStream(InputStream deliveryStream) throws ClientException {
        super(deliveryStream);
    }

    @Override
    public Class<?> getBodyTypeClass() {
        return byte[].class;
    }

    /**
     * Reads the type that follows a Data section marker and records how many payload
     * bytes the section holds. A null payload is treated as an empty section.
     *
     * @throws DecodeException if the Data section holds anything other than Binary or null.
     */
    @Override
    protected void validateAndScanNextSection() throws ClientException {
        final StreamTypeDecoder<?> typeDecoder =
            protonDecoder.readNextTypeDecoder(deliveryStream, decoderState);

        if (typeDecoder.getTypeClass() == Binary.class) {
            BinaryTypeDecoder binaryDecoder = (BinaryTypeDecoder) typeDecoder;
            remainingSectionBytes = binaryDecoder.readSize(deliveryStream);
            // Log after the size is read so the trace shows this section's size.
            LOG.trace("Data Section of size {} ready for read.", remainingSectionBytes);
        } else if (typeDecoder.getTypeClass() == Void.class) {
            // Null body in the Data section which can be skipped.
            LOG.trace("Data Section with no Binary payload read and skipped.");
            remainingSectionBytes = 0;
        } else {
            throw new DecodeException("Unknown payload in body of Data Section encoding.");
        }
    }
}
/**
 * Body stream placeholder for AmqpSequence bodies. Scanning the section always fails,
 * so constructing this stream always throws.
 */
private class AmqpSequenceInputStream extends MessageBodyInputStream {

    public AmqpSequenceInputStream(InputStream source) throws ClientException {
        super(source);
    }

    @Override
    public Class<?> getBodyTypeClass() {
        return List.class;
    }

    @Override
    protected void validateAndScanNextSection() throws ClientException {
        final String reason = "Cannot read the binary payload of an AMQP Sequence body.";
        throw new DecodeException(reason);
    }
}
/**
 * Body stream placeholder for AmqpValue bodies. Scanning the section always fails,
 * so constructing this stream always throws.
 */
private class AmqpValueInputStream extends MessageBodyInputStream {

    private Class<?> bodyTypeClass = Void.class;

    public AmqpValueInputStream(InputStream source) throws ClientException {
        super(source);
    }

    @Override
    public Class<?> getBodyTypeClass() {
        return this.bodyTypeClass;
    }

    @Override
    protected void validateAndScanNextSection() throws ClientException {
        final String reason = "Cannot read the binary payload of an AMQP Value body.";
        throw new DecodeException(reason);
    }
}
}