blob: 6f5df4d4ec7bdf4ef944b71dd90b503ff8554e95 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.qpid.protonj2.client.impl;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.apache.qpid.protonj2.buffer.ProtonBuffer;
import org.apache.qpid.protonj2.client.AdvancedMessage;
import org.apache.qpid.protonj2.client.Message;
import org.apache.qpid.protonj2.client.exceptions.ClientException;
import org.apache.qpid.protonj2.types.Binary;
import org.apache.qpid.protonj2.types.Symbol;
import org.apache.qpid.protonj2.types.messaging.ApplicationProperties;
import org.apache.qpid.protonj2.types.messaging.Footer;
import org.apache.qpid.protonj2.types.messaging.Header;
import org.apache.qpid.protonj2.types.messaging.MessageAnnotations;
import org.apache.qpid.protonj2.types.messaging.Properties;
import org.apache.qpid.protonj2.types.messaging.Section;
import org.apache.qpid.protonj2.types.messaging.Section.SectionType;
* Client provided {@link AdvancedMessage} implementation that is used when sending messages
* from a {@link ClientSender} or when decoding an AMQP Transfer for which all frames have
* arrived.
* @param <E> the body type that the {@link Message} carries
public final class ClientMessage<E> implements AdvancedMessage<E> {
private Header header;
private MessageAnnotations messageAnnotations;
private Properties properties;
private ApplicationProperties applicationProperties;
private Section<E> body;
private List<Section<?>> bodySections;
private Footer footer;
private int messageFormat;
* Create a new {@link ClientMessage} instance with no default body section.
ClientMessage() {
this.body = null;
* Create a new {@link ClientMessage} instance with a {@link Section} value that
* will comprise the body of this message instance.
* @param body
* The object that comprises the value portion of the body {@link Section}.
ClientMessage(Section<E> body) {
this.body = body;
public AdvancedMessage<E> toAdvancedMessage() {
return this;
//----- Entry point for creating new ClientMessage instances.
* Creates an empty {@link ClientMessage} instance.
* @param <V> The type of the body value carried in this message.
* @return a new empty {@link ClientMessage} instance.
public static <V> ClientMessage<V> create() {
return new ClientMessage<V>();
* Creates an {@link ClientMessage} instance with the given body {@link Section} value.
* @param <V> The type of the body value carried in this message body section.
* @param body
* The body {@link Section} to assign to the created message instance.
* @return a new {@link ClientMessage} instance with the given body.
public static <V> ClientMessage<V> create(Section<V> body) {
return new ClientMessage<V>(body);
* Creates an empty {@link ClientMessage} instance.
* @param <V> The type of the body value carried in this message.
* @return a new empty {@link ClientMessage} instance.
public static <V> ClientMessage<V> createAdvancedMessage() {
return new ClientMessage<V>();
//----- Message Header API
public boolean durable() {
return header == null ? Header.DEFAULT_DURABILITY : header.isDurable();
public ClientMessage<E> durable(boolean durable) {
return this;
public byte priority() {
return header == null ? Header.DEFAULT_PRIORITY : header.getPriority();
public ClientMessage<E> priority(byte priority) {
return this;
public long timeToLive() {
return header == null ? Header.DEFAULT_TIME_TO_LIVE : header.getTimeToLive();
public ClientMessage<E> timeToLive(long timeToLive) {
return this;
public boolean firstAcquirer() {
return header == null ? Header.DEFAULT_FIRST_ACQUIRER : header.isFirstAcquirer();
public ClientMessage<E> firstAcquirer(boolean firstAcquirer) {
return this;
public long deliveryCount() {
return header == null ? Header.DEFAULT_DELIVERY_COUNT : header.getDeliveryCount();
public ClientMessage<E> deliveryCount(long deliveryCount) {
return this;
//----- Message Properties access
public Object messageId() {
return properties != null ? properties.getMessageId() : null;
public Message<E> messageId(Object messageId) {
return this;
public byte[] userId() {
byte[] copyOfUserId = null;
if (properties != null && properties.getUserId() != null) {
copyOfUserId = properties.getUserId().arrayCopy();
return copyOfUserId;
public Message<E> userId(byte[] userId) {
lazyCreateProperties().setUserId(new Binary(Arrays.copyOf(userId, userId.length)));
return this;
public String to() {
return properties != null ? properties.getTo() : null;
public Message<E> to(String to) {
return this;
public String subject() {
return properties != null ? properties.getSubject() : null;
public Message<E> subject(String subject) {
return this;
public String replyTo() {
return properties != null ? properties.getReplyTo() : null;
public Message<E> replyTo(String replyTo) {
return this;
public Object correlationId() {
return properties != null ? properties.getCorrelationId() : null;
public Message<E> correlationId(Object correlationId) {
return this;
public String contentType() {
return properties != null ? properties.getContentType() : null;
public Message<E> contentType(String contentType) {
return this;
public String contentEncoding() {
return properties != null ? properties.getContentEncoding() : null;
public Message<E> contentEncoding(String contentEncoding) {
return this;
public long absoluteExpiryTime() {
return properties != null ? properties.getAbsoluteExpiryTime() : 0;
public Message<E> absoluteExpiryTime(long expiryTime) {
return this;
public long creationTime() {
return properties != null ? properties.getCreationTime() : 0;
public Message<E> creationTime(long createTime) {
return this;
public String groupId() {
return properties != null ? properties.getGroupId() : null;
public Message<E> groupId(String groupId) {
return this;
public int groupSequence() {
return properties != null ? (int) properties.getGroupSequence() : 0;
public Message<E> groupSequence(int groupSequence) {
return this;
public String replyToGroupId() {
return properties != null ? properties.getReplyToGroupId() : null;
public Message<E> replyToGroupId(String replyToGroupId) {
return this;
//----- Message Annotations Access
public Object annotation(String key) {
if (hasAnnotations()) {
return messageAnnotations.getValue().get(Symbol.valueOf(key));
} else {
return null;
public boolean hasAnnotation(String key) {
if (hasAnnotations()) {
return messageAnnotations.getValue().containsKey(Symbol.valueOf(key));
} else {
return false;
public boolean hasAnnotations() {
return messageAnnotations != null &&
messageAnnotations.getValue() != null &&
messageAnnotations.getValue().size() > 0;
public Object removeAnnotation(String key) {
if (hasAnnotations()) {
return messageAnnotations.getValue().remove(Symbol.valueOf(key));
} else {
return null;
public Message<E> forEachAnnotation(BiConsumer<String, Object> action) {
if (hasAnnotations()) {
messageAnnotations.getValue().forEach((key, value) -> {
action.accept(key.toString(), value);
return this;
public ClientMessage<E> annotation(String key, Object value) {
return this;
//----- Application Properties Access
public Object property(String key) {
if (hasProperties()) {
return applicationProperties.getValue().get(key);
} else {
return null;
public boolean hasProperty(String key) {
if (hasProperties()) {
return applicationProperties.getValue().containsKey(key);
} else {
return false;
public boolean hasProperties() {
return applicationProperties != null &&
applicationProperties.getValue() != null &&
applicationProperties.getValue().size() > 0;
public Object removeProperty(String key) {
if (hasProperties()) {
return applicationProperties.getValue().remove(key);
} else {
return null;
public Message<E> forEachProperty(BiConsumer<String, Object> action) {
if (hasProperties()) {
return this;
public ClientMessage<E> property(String key, Object value) {
return this;
//----- Footer Access
public Object footer(String key) {
if (hasFooters()) {
return footer.getValue().get(Symbol.valueOf(key));
} else {
return null;
public boolean hasFooter(String key) {
if (hasFooters()) {
return footer.getValue().containsKey(Symbol.valueOf(key));
} else {
return false;
public boolean hasFooters() {
return footer != null &&
footer.getValue() != null &&
footer.getValue().size() > 0;
public Object removeFooter(String key) {
if (hasFooters()) {
return footer.getValue().remove(Symbol.valueOf(key));
} else {
return null;
public Message<E> forEachFooter(BiConsumer<String, Object> action) {
if (hasFooters()) {
footer.getValue().forEach((key, value) -> {
action.accept(key.toString(), value);
return this;
public ClientMessage<E> footer(String key, Object value) {
return this;
//----- Message body access
public E body() {
Section<E> section = body;
if (bodySections != null) {
section = (Section<E>) bodySections.get(0);
return section != null ? section.getValue() : null;
public ClientMessage<E> body(E value) {
body = ClientMessageSupport.createSectionFromValue(value);
return this;
//----- Internal API
private Header lazyCreateHeader() {
if (header == null) {
header = new Header();
return header;
private Properties lazyCreateProperties() {
if (properties == null) {
properties = new Properties();
return properties;
private ApplicationProperties lazyCreateApplicationProperties() {
if (applicationProperties == null) {
applicationProperties = new ApplicationProperties(new LinkedHashMap<>());
return applicationProperties;
private MessageAnnotations lazyCreateMessageAnnotations() {
if (messageAnnotations == null) {
messageAnnotations = new MessageAnnotations(new LinkedHashMap<>());
return messageAnnotations;
private Footer lazyCreateFooter() {
if (footer == null) {
footer = new Footer(new LinkedHashMap<>());
return footer;
//----- AdvancedMessage interface implementation
public Header header() {
return header;
public ClientMessage<E> header(Header header) {
this.header = header;
return this;
public MessageAnnotations annotations() {
return messageAnnotations;
public ClientMessage<E> annotations(MessageAnnotations messageAnnotations) {
this.messageAnnotations = messageAnnotations;
return this;
public Properties properties() {
return properties;
public ClientMessage<E> properties(Properties properties) { = properties;
return this;
public ApplicationProperties applicationProperties() {
return applicationProperties;
public ClientMessage<E> applicationProperties(ApplicationProperties applicationProperties) {
this.applicationProperties = applicationProperties;
return this;
public Footer footer() {
return footer;
public ClientMessage<E> footer(Footer footer) {
this.footer = footer;
return this;
public int messageFormat() {
return messageFormat;
public ClientMessage<E> messageFormat(int messageFormat) {
this.messageFormat = messageFormat;
return this;
public ProtonBuffer encode(Map<String, Object> deliveryAnnotations) throws ClientException {
return ClientMessageSupport.encodeMessage(this, deliveryAnnotations);
@SuppressWarnings({ "unchecked" })
public ClientMessage<E> addBodySection(Section<?> bodySection) {
Objects.requireNonNull(bodySection, "Additional Body Section cannot be null");
if (body == null && bodySections == null) {
body = (Section<E>) bodySection;
} else {
if (bodySections == null) {
bodySections = new ArrayList<>();
// Preserve older section from original message creation.
if (body != null) {
body = null;
bodySections.add(validateBodySections(messageFormat, bodySections, bodySection));
return this;
public ClientMessage<E> bodySections(Collection<Section<?>> sections) {
if (sections == null || sections.isEmpty()) {
bodySections = null;
} else {
List<Section<?>> result = new ArrayList<>(sections.size());
sections.forEach(section -> result.add(validateBodySections(messageFormat, result, section)));
bodySections = result;
body = null; // Body is always cleared by this assignment.
return this;
public Collection<Section<?>> bodySections() {
if (bodySections == null && body == null) {
return Collections.EMPTY_LIST;
} else if (body != null) {
return Collections.singletonList(body);
} else {
final Collection<Section<?>> result = new ArrayList<>();
forEachBodySection(section -> result.add(section));
return Collections.unmodifiableCollection(result);
public ClientMessage<E> forEachBodySection(Consumer<Section<?>> consumer) {
if (bodySections != null) {
bodySections.forEach(section -> {
} else {
if (body != null) {
return this;
public ClientMessage<E> clearBodySections() {
bodySections = null;
body = null;
return this;
private static Section<?> validateBodySections(int messageFormat, List<Section<?>> target, Section<?> section) {
if (messageFormat == 0 && target != null && !target.isEmpty()) {
switch (section.getType()) {
case AmqpSequence:
if (target.get(0).getType() != SectionType.AmqpSequence) {
throw new IllegalArgumentException(
"Message Format violation: AmqpSequence expected but got type: " + section.getType());
case AmqpValue:
throw new IllegalArgumentException(
"Message Format violation: Only one AmqpValue section allowed");
case Data:
if (target.get(0).getType() != SectionType.Data) {
throw new IllegalArgumentException(
"Message Format violation: Data Section expected but got type: " + section.getType());
return section;