blob: 6f5df4d4ec7bdf4ef944b71dd90b503ff8554e95 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.qpid.protonj2.client.impl;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.apache.qpid.protonj2.buffer.ProtonBuffer;
import org.apache.qpid.protonj2.client.AdvancedMessage;
import org.apache.qpid.protonj2.client.Message;
import org.apache.qpid.protonj2.client.exceptions.ClientException;
import org.apache.qpid.protonj2.types.Binary;
import org.apache.qpid.protonj2.types.Symbol;
import org.apache.qpid.protonj2.types.messaging.ApplicationProperties;
import org.apache.qpid.protonj2.types.messaging.Footer;
import org.apache.qpid.protonj2.types.messaging.Header;
import org.apache.qpid.protonj2.types.messaging.MessageAnnotations;
import org.apache.qpid.protonj2.types.messaging.Properties;
import org.apache.qpid.protonj2.types.messaging.Section;
import org.apache.qpid.protonj2.types.messaging.Section.SectionType;
/**
* Client provided {@link AdvancedMessage} implementation that is used when sending messages
* from a {@link ClientSender} or when decoding an AMQP Transfer for which all frames have
* arrived.
*
* @param <E> the body type that the {@link Message} carries
*/
public final class ClientMessage<E> implements AdvancedMessage<E> {
private Header header;
private MessageAnnotations messageAnnotations;
private Properties properties;
private ApplicationProperties applicationProperties;
private Section<E> body;
private List<Section<?>> bodySections;
private Footer footer;
private int messageFormat;
/**
* Create a new {@link ClientMessage} instance with no default body section.
*/
ClientMessage() {
this.body = null;
}
/**
* Create a new {@link ClientMessage} instance with a {@link Section} value that
* will comprise the body of this message instance.
*
* @param body
* The object that comprises the value portion of the body {@link Section}.
*/
ClientMessage(Section<E> body) {
this.body = body;
}
@Override
public AdvancedMessage<E> toAdvancedMessage() {
return this;
}
//----- Entry point for creating new ClientMessage instances.
/**
* Creates an empty {@link ClientMessage} instance.
*
* @param <V> The type of the body value carried in this message.
*
* @return a new empty {@link ClientMessage} instance.
*/
public static <V> ClientMessage<V> create() {
return new ClientMessage<V>();
}
/**
* Creates an {@link ClientMessage} instance with the given body {@link Section} value.
*
* @param <V> The type of the body value carried in this message body section.
*
* @param body
* The body {@link Section} to assign to the created message instance.
*
* @return a new {@link ClientMessage} instance with the given body.
*/
public static <V> ClientMessage<V> create(Section<V> body) {
return new ClientMessage<V>(body);
}
/**
* Creates an empty {@link ClientMessage} instance.
*
* @param <V> The type of the body value carried in this message.
*
* @return a new empty {@link ClientMessage} instance.
*/
public static <V> ClientMessage<V> createAdvancedMessage() {
return new ClientMessage<V>();
}
//----- Message Header API
@Override
public boolean durable() {
return header == null ? Header.DEFAULT_DURABILITY : header.isDurable();
}
@Override
public ClientMessage<E> durable(boolean durable) {
lazyCreateHeader().setDurable(durable);
return this;
}
@Override
public byte priority() {
return header == null ? Header.DEFAULT_PRIORITY : header.getPriority();
}
@Override
public ClientMessage<E> priority(byte priority) {
lazyCreateHeader().setPriority(priority);
return this;
}
@Override
public long timeToLive() {
return header == null ? Header.DEFAULT_TIME_TO_LIVE : header.getTimeToLive();
}
@Override
public ClientMessage<E> timeToLive(long timeToLive) {
lazyCreateHeader().setTimeToLive(timeToLive);
return this;
}
@Override
public boolean firstAcquirer() {
return header == null ? Header.DEFAULT_FIRST_ACQUIRER : header.isFirstAcquirer();
}
@Override
public ClientMessage<E> firstAcquirer(boolean firstAcquirer) {
lazyCreateHeader().setFirstAcquirer(firstAcquirer);
return this;
}
@Override
public long deliveryCount() {
return header == null ? Header.DEFAULT_DELIVERY_COUNT : header.getDeliveryCount();
}
@Override
public ClientMessage<E> deliveryCount(long deliveryCount) {
lazyCreateHeader().setDeliveryCount(deliveryCount);
return this;
}
//----- Message Properties access
@Override
public Object messageId() {
return properties != null ? properties.getMessageId() : null;
}
@Override
public Message<E> messageId(Object messageId) {
lazyCreateProperties().setMessageId(messageId);
return this;
}
@Override
public byte[] userId() {
byte[] copyOfUserId = null;
if (properties != null && properties.getUserId() != null) {
copyOfUserId = properties.getUserId().arrayCopy();
}
return copyOfUserId;
}
@Override
public Message<E> userId(byte[] userId) {
lazyCreateProperties().setUserId(new Binary(Arrays.copyOf(userId, userId.length)));
return this;
}
@Override
public String to() {
return properties != null ? properties.getTo() : null;
}
@Override
public Message<E> to(String to) {
lazyCreateProperties().setTo(to);
return this;
}
@Override
public String subject() {
return properties != null ? properties.getSubject() : null;
}
@Override
public Message<E> subject(String subject) {
lazyCreateProperties().setSubject(subject);
return this;
}
@Override
public String replyTo() {
return properties != null ? properties.getReplyTo() : null;
}
@Override
public Message<E> replyTo(String replyTo) {
lazyCreateProperties().setReplyTo(replyTo);
return this;
}
@Override
public Object correlationId() {
return properties != null ? properties.getCorrelationId() : null;
}
@Override
public Message<E> correlationId(Object correlationId) {
lazyCreateProperties().setCorrelationId(correlationId);
return this;
}
@Override
public String contentType() {
return properties != null ? properties.getContentType() : null;
}
@Override
public Message<E> contentType(String contentType) {
lazyCreateProperties().setContentType(contentType);
return this;
}
@Override
public String contentEncoding() {
return properties != null ? properties.getContentEncoding() : null;
}
@Override
public Message<E> contentEncoding(String contentEncoding) {
lazyCreateProperties().setContentEncoding(contentEncoding);
return this;
}
@Override
public long absoluteExpiryTime() {
return properties != null ? properties.getAbsoluteExpiryTime() : 0;
}
@Override
public Message<E> absoluteExpiryTime(long expiryTime) {
lazyCreateProperties().setAbsoluteExpiryTime(expiryTime);
return this;
}
@Override
public long creationTime() {
return properties != null ? properties.getCreationTime() : 0;
}
@Override
public Message<E> creationTime(long createTime) {
lazyCreateProperties().setCreationTime(createTime);
return this;
}
@Override
public String groupId() {
return properties != null ? properties.getGroupId() : null;
}
@Override
public Message<E> groupId(String groupId) {
lazyCreateProperties().setGroupId(groupId);
return this;
}
@Override
public int groupSequence() {
return properties != null ? (int) properties.getGroupSequence() : 0;
}
@Override
public Message<E> groupSequence(int groupSequence) {
lazyCreateProperties().setGroupSequence(groupSequence);
return this;
}
@Override
public String replyToGroupId() {
return properties != null ? properties.getReplyToGroupId() : null;
}
@Override
public Message<E> replyToGroupId(String replyToGroupId) {
lazyCreateProperties().setReplyToGroupId(replyToGroupId);
return this;
}
//----- Message Annotations Access
@Override
public Object annotation(String key) {
if (hasAnnotations()) {
return messageAnnotations.getValue().get(Symbol.valueOf(key));
} else {
return null;
}
}
@Override
public boolean hasAnnotation(String key) {
if (hasAnnotations()) {
return messageAnnotations.getValue().containsKey(Symbol.valueOf(key));
} else {
return false;
}
}
@Override
public boolean hasAnnotations() {
return messageAnnotations != null &&
messageAnnotations.getValue() != null &&
messageAnnotations.getValue().size() > 0;
}
@Override
public Object removeAnnotation(String key) {
if (hasAnnotations()) {
return messageAnnotations.getValue().remove(Symbol.valueOf(key));
} else {
return null;
}
}
@Override
public Message<E> forEachAnnotation(BiConsumer<String, Object> action) {
if (hasAnnotations()) {
messageAnnotations.getValue().forEach((key, value) -> {
action.accept(key.toString(), value);
});
}
return this;
}
@Override
public ClientMessage<E> annotation(String key, Object value) {
lazyCreateMessageAnnotations().getValue().put(Symbol.valueOf(key),value);
return this;
}
//----- Application Properties Access
@Override
public Object property(String key) {
if (hasProperties()) {
return applicationProperties.getValue().get(key);
} else {
return null;
}
}
@Override
public boolean hasProperty(String key) {
if (hasProperties()) {
return applicationProperties.getValue().containsKey(key);
} else {
return false;
}
}
@Override
public boolean hasProperties() {
return applicationProperties != null &&
applicationProperties.getValue() != null &&
applicationProperties.getValue().size() > 0;
}
@Override
public Object removeProperty(String key) {
if (hasProperties()) {
return applicationProperties.getValue().remove(key);
} else {
return null;
}
}
@Override
public Message<E> forEachProperty(BiConsumer<String, Object> action) {
if (hasProperties()) {
applicationProperties.getValue().forEach(action);
}
return this;
}
@Override
public ClientMessage<E> property(String key, Object value) {
lazyCreateApplicationProperties().getValue().put(key,value);
return this;
}
//----- Footer Access
@Override
public Object footer(String key) {
if (hasFooters()) {
return footer.getValue().get(Symbol.valueOf(key));
} else {
return null;
}
}
@Override
public boolean hasFooter(String key) {
if (hasFooters()) {
return footer.getValue().containsKey(Symbol.valueOf(key));
} else {
return false;
}
}
@Override
public boolean hasFooters() {
return footer != null &&
footer.getValue() != null &&
footer.getValue().size() > 0;
}
@Override
public Object removeFooter(String key) {
if (hasFooters()) {
return footer.getValue().remove(Symbol.valueOf(key));
} else {
return null;
}
}
@Override
public Message<E> forEachFooter(BiConsumer<String, Object> action) {
if (hasFooters()) {
footer.getValue().forEach((key, value) -> {
action.accept(key.toString(), value);
});
}
return this;
}
@Override
public ClientMessage<E> footer(String key, Object value) {
lazyCreateFooter().getValue().put(Symbol.valueOf(key),value);
return this;
}
//----- Message body access
@SuppressWarnings("unchecked")
@Override
public E body() {
Section<E> section = body;
if (bodySections != null) {
section = (Section<E>) bodySections.get(0);
}
return section != null ? section.getValue() : null;
}
@Override
public ClientMessage<E> body(E value) {
clearBodySections();
body = ClientMessageSupport.createSectionFromValue(value);
return this;
}
//----- Internal API
private Header lazyCreateHeader() {
if (header == null) {
header = new Header();
}
return header;
}
private Properties lazyCreateProperties() {
if (properties == null) {
properties = new Properties();
}
return properties;
}
private ApplicationProperties lazyCreateApplicationProperties() {
if (applicationProperties == null) {
applicationProperties = new ApplicationProperties(new LinkedHashMap<>());
}
return applicationProperties;
}
private MessageAnnotations lazyCreateMessageAnnotations() {
if (messageAnnotations == null) {
messageAnnotations = new MessageAnnotations(new LinkedHashMap<>());
}
return messageAnnotations;
}
private Footer lazyCreateFooter() {
if (footer == null) {
footer = new Footer(new LinkedHashMap<>());
}
return footer;
}
//----- AdvancedMessage interface implementation
@Override
public Header header() {
return header;
}
@Override
public ClientMessage<E> header(Header header) {
this.header = header;
return this;
}
@Override
public MessageAnnotations annotations() {
return messageAnnotations;
}
@Override
public ClientMessage<E> annotations(MessageAnnotations messageAnnotations) {
this.messageAnnotations = messageAnnotations;
return this;
}
@Override
public Properties properties() {
return properties;
}
@Override
public ClientMessage<E> properties(Properties properties) {
this.properties = properties;
return this;
}
@Override
public ApplicationProperties applicationProperties() {
return applicationProperties;
}
@Override
public ClientMessage<E> applicationProperties(ApplicationProperties applicationProperties) {
this.applicationProperties = applicationProperties;
return this;
}
@Override
public Footer footer() {
return footer;
}
@Override
public ClientMessage<E> footer(Footer footer) {
this.footer = footer;
return this;
}
@Override
public int messageFormat() {
return messageFormat;
}
@Override
public ClientMessage<E> messageFormat(int messageFormat) {
this.messageFormat = messageFormat;
return this;
}
@Override
public ProtonBuffer encode(Map<String, Object> deliveryAnnotations) throws ClientException {
return ClientMessageSupport.encodeMessage(this, deliveryAnnotations);
}
@SuppressWarnings({ "unchecked" })
@Override
public ClientMessage<E> addBodySection(Section<?> bodySection) {
Objects.requireNonNull(bodySection, "Additional Body Section cannot be null");
if (body == null && bodySections == null) {
body = (Section<E>) bodySection;
} else {
if (bodySections == null) {
bodySections = new ArrayList<>();
// Preserve older section from original message creation.
if (body != null) {
bodySections.add(body);
body = null;
}
}
bodySections.add(validateBodySections(messageFormat, bodySections, bodySection));
}
return this;
}
@Override
public ClientMessage<E> bodySections(Collection<Section<?>> sections) {
if (sections == null || sections.isEmpty()) {
bodySections = null;
} else {
List<Section<?>> result = new ArrayList<>(sections.size());
sections.forEach(section -> result.add(validateBodySections(messageFormat, result, section)));
bodySections = result;
}
body = null; // Body is always cleared by this assignment.
return this;
}
@SuppressWarnings("unchecked")
@Override
public Collection<Section<?>> bodySections() {
if (bodySections == null && body == null) {
return Collections.EMPTY_LIST;
} else if (body != null) {
return Collections.singletonList(body);
} else {
final Collection<Section<?>> result = new ArrayList<>();
forEachBodySection(section -> result.add(section));
return Collections.unmodifiableCollection(result);
}
}
@Override
public ClientMessage<E> forEachBodySection(Consumer<Section<?>> consumer) {
if (bodySections != null) {
bodySections.forEach(section -> {
consumer.accept(section);
});
} else {
if (body != null) {
consumer.accept(body);
}
}
return this;
}
@Override
public ClientMessage<E> clearBodySections() {
bodySections = null;
body = null;
return this;
}
private static Section<?> validateBodySections(int messageFormat, List<Section<?>> target, Section<?> section) {
if (messageFormat == 0 && target != null && !target.isEmpty()) {
switch (section.getType()) {
case AmqpSequence:
if (target.get(0).getType() != SectionType.AmqpSequence) {
throw new IllegalArgumentException(
"Message Format violation: AmqpSequence expected but got type: " + section.getType());
}
break;
case AmqpValue:
throw new IllegalArgumentException(
"Message Format violation: Only one AmqpValue section allowed");
case Data:
if (target.get(0).getType() != SectionType.Data) {
throw new IllegalArgumentException(
"Message Format violation: Data Section expected but got type: " + section.getType());
}
break;
default:
break;
}
}
return section;
}
}