| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.qpid.protonj2.client.impl; |
| |
| import java.util.LinkedHashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.function.Consumer; |
| |
| import org.apache.qpid.protonj2.buffer.ProtonBuffer; |
| import org.apache.qpid.protonj2.buffer.ProtonBufferAllocator; |
| import org.apache.qpid.protonj2.buffer.ProtonByteBufferAllocator; |
| import org.apache.qpid.protonj2.client.AdvancedMessage; |
| import org.apache.qpid.protonj2.client.Message; |
| import org.apache.qpid.protonj2.client.exceptions.ClientException; |
| import org.apache.qpid.protonj2.codec.CodecFactory; |
| import org.apache.qpid.protonj2.codec.Decoder; |
| import org.apache.qpid.protonj2.codec.DecoderState; |
| import org.apache.qpid.protonj2.codec.Encoder; |
| import org.apache.qpid.protonj2.codec.EncoderState; |
| import org.apache.qpid.protonj2.engine.util.StringUtils; |
| import org.apache.qpid.protonj2.types.Binary; |
| import org.apache.qpid.protonj2.types.Symbol; |
| import org.apache.qpid.protonj2.types.messaging.AmqpSequence; |
| import org.apache.qpid.protonj2.types.messaging.AmqpValue; |
| import org.apache.qpid.protonj2.types.messaging.ApplicationProperties; |
| import org.apache.qpid.protonj2.types.messaging.Data; |
| import org.apache.qpid.protonj2.types.messaging.DeliveryAnnotations; |
| import org.apache.qpid.protonj2.types.messaging.Footer; |
| import org.apache.qpid.protonj2.types.messaging.Header; |
| import org.apache.qpid.protonj2.types.messaging.MessageAnnotations; |
| import org.apache.qpid.protonj2.types.messaging.Properties; |
| import org.apache.qpid.protonj2.types.messaging.Section; |
| |
| /** |
| * Support methods dealing with Message types and encode or decode operations. |
| */ |
| public abstract class ClientMessageSupport { |
| |
| private static final Encoder DEFAULT_ENCODER = CodecFactory.getDefaultEncoder(); |
| private static final Decoder DEFAULT_DECODER = CodecFactory.getDefaultDecoder(); |
| |
| private static final ThreadLocal<EncoderState> THREAD_LOCAL_ENCODER_STATE = |
| ThreadLocal.withInitial(() -> DEFAULT_ENCODER.newEncoderState()); |
| private static final ThreadLocal<DecoderState> THREAD_LOCAL_DECODER_STATE = |
| ThreadLocal.withInitial(() -> DEFAULT_DECODER.newDecoderState()); |
| |
| //----- Message Conversion |
| |
| /** |
| * Converts a {@link Message} instance into a {@link ClientMessage} instance |
| * either by cast or by construction of a new instance with a copy of the |
| * values carried in the given message. |
| * |
| * @param <E> the body type of the given message. |
| * |
| * @param message |
| * The {@link Message} type to attempt to convert to a {@link ClientMessage} instance. |
| * |
| * @return a {@link ClientMessage} that represents the given {@link Message} instance. |
| * |
| * @throws ClientException if an unrecoverable error occurs during message conversion. |
| */ |
| public static <E> AdvancedMessage<E> convertMessage(Message<E> message) throws ClientException { |
| if (message instanceof AdvancedMessage) { |
| return (AdvancedMessage<E>) message; |
| } else { |
| try { |
| return message.toAdvancedMessage(); |
| } catch (UnsupportedOperationException uoe) { |
| return convertFromOutsideMessage(message); |
| } |
| } |
| } |
| |
| //----- Message Encoding |
| |
| public static ProtonBuffer encodeSection(Section<?> section, ProtonBuffer buffer) { |
| DEFAULT_ENCODER.writeObject(buffer, DEFAULT_ENCODER.newEncoderState(), section); |
| return buffer; |
| } |
| |
| //----- Message Encoding |
| |
| public static ProtonBuffer encodeMessage(AdvancedMessage<?> message, Map<String, Object> deliveryAnnotations) throws ClientException { |
| return encodeMessage(DEFAULT_ENCODER, THREAD_LOCAL_ENCODER_STATE.get(), ProtonByteBufferAllocator.DEFAULT, message, deliveryAnnotations); |
| } |
| |
| public static ProtonBuffer encodeMessage(Encoder encoder, ProtonBufferAllocator allocator, AdvancedMessage<?> message, Map<String, Object> deliveryAnnotations) throws ClientException { |
| return encodeMessage(encoder, encoder.newEncoderState(), ProtonByteBufferAllocator.DEFAULT, message, deliveryAnnotations); |
| } |
| |
| public static ProtonBuffer encodeMessage(Encoder encoder, EncoderState encoderState, ProtonBufferAllocator allocator, AdvancedMessage<?> message, Map<String, Object> deliveryAnnotations) throws ClientException { |
| ProtonBuffer buffer = allocator.allocate(); |
| |
| Header header = message.header(); |
| MessageAnnotations messageAnnotations = message.annotations(); |
| Properties properties = message.properties(); |
| ApplicationProperties applicationProperties = message.applicationProperties(); |
| Footer footer = message.footer(); |
| |
| if (header != null) { |
| encoder.writeObject(buffer, encoderState, header); |
| } |
| if (deliveryAnnotations != null) { |
| encoder.writeObject(buffer, encoderState, new DeliveryAnnotations(StringUtils.toSymbolKeyedMap(deliveryAnnotations))); |
| } |
| if (messageAnnotations != null) { |
| encoder.writeObject(buffer, encoderState, messageAnnotations); |
| } |
| if (properties != null) { |
| encoder.writeObject(buffer, encoderState, properties); |
| } |
| if (applicationProperties != null) { |
| encoder.writeObject(buffer, encoderState, applicationProperties); |
| } |
| |
| message.forEachBodySection(section -> encoder.writeObject(buffer, encoderState, section)); |
| |
| if (footer != null) { |
| encoder.writeObject(buffer, encoderState, footer); |
| } |
| |
| return buffer; |
| } |
| |
| //----- Message Decoding |
| |
| public static Message<?> decodeMessage(ProtonBuffer buffer, Consumer<DeliveryAnnotations> daConsumer) throws ClientException { |
| return decodeMessage(DEFAULT_DECODER, THREAD_LOCAL_DECODER_STATE.get(), buffer, daConsumer); |
| } |
| |
| public static Message<?> decodeMessage(Decoder decoder, ProtonBuffer buffer, Consumer<DeliveryAnnotations> daConsumer) throws ClientException { |
| return decodeMessage(decoder, decoder.newDecoderState(), buffer, daConsumer); |
| } |
| |
| public static Message<?> decodeMessage(Decoder decoder, DecoderState decoderState, |
| ProtonBuffer buffer, Consumer<DeliveryAnnotations> daConsumer) throws ClientException { |
| |
| final ClientMessage<?> message = new ClientMessage<>(); |
| |
| Section<?> section = null; |
| |
| while (buffer.isReadable()) { |
| try { |
| section = (Section<?>) decoder.readObject(buffer, decoderState); |
| } catch (Exception e) { |
| throw ClientExceptionSupport.createNonFatalOrPassthrough(e); |
| } |
| |
| switch (section.getType()) { |
| case Header: |
| message.header((Header) section); |
| break; |
| case DeliveryAnnotations: |
| if (daConsumer != null) { |
| daConsumer.accept((DeliveryAnnotations) section); |
| } |
| break; |
| case MessageAnnotations: |
| message.annotations((MessageAnnotations) section); |
| break; |
| case Properties: |
| message.properties((Properties) section); |
| break; |
| case ApplicationProperties: |
| message.applicationProperties((ApplicationProperties) section); |
| break; |
| case Data: |
| case AmqpSequence: |
| case AmqpValue: |
| message.addBodySection(section); |
| break; |
| case Footer: |
| message.footer((Footer) section); |
| break; |
| default: |
| throw new ClientException("Unknown Message Section forced decode abort."); |
| } |
| } |
| |
| return message; |
| } |
| |
| @SuppressWarnings({ "rawtypes", "unchecked" }) |
| public static <E> Section<E> createSectionFromValue(E body) { |
| if (body == null) { |
| return null; |
| } else if (body instanceof byte[]) { |
| return (Section<E>) new Data((byte[]) body); |
| } else if (body instanceof List){ |
| return new AmqpSequence((List) body); |
| } else { |
| return new AmqpValue(body); |
| } |
| } |
| |
| //----- Internal Implementation |
| |
| private static <E> ClientMessage<E> convertFromOutsideMessage(Message<E> source) throws ClientException { |
| Header header = new Header(); |
| header.setDurable(source.durable()); |
| header.setPriority(source.priority()); |
| header.setTimeToLive(source.timeToLive()); |
| header.setFirstAcquirer(source.firstAcquirer()); |
| header.setDeliveryCount(source.deliveryCount()); |
| |
| Properties properties = new Properties(); |
| properties.setMessageId(source.messageId()); |
| properties.setUserId(source.userId() != null ? new Binary(source.userId()) : null); |
| properties.setTo(source.to()); |
| properties.setSubject(source.subject()); |
| properties.setReplyTo(source.replyTo()); |
| properties.setCorrelationId(source.correlationId()); |
| properties.setContentType(source.contentType()); |
| properties.setContentEncoding(source.contentEncoding()); |
| properties.setAbsoluteExpiryTime(source.absoluteExpiryTime()); |
| properties.setCreationTime(source.creationTime()); |
| properties.setGroupId(source.groupId()); |
| properties.setGroupSequence(source.groupSequence()); |
| properties.setReplyToGroupId(source.replyToGroupId()); |
| |
| final MessageAnnotations messageAnnotations; |
| if (source.hasAnnotations()) { |
| messageAnnotations = new MessageAnnotations(new LinkedHashMap<>()); |
| |
| source.forEachAnnotation((key, value) -> { |
| messageAnnotations.getValue().put(Symbol.valueOf(key), value); |
| }); |
| } else { |
| messageAnnotations = null; |
| } |
| |
| final ApplicationProperties applicationProperties; |
| if (source.hasProperties()) { |
| applicationProperties = new ApplicationProperties(new LinkedHashMap<>()); |
| |
| source.forEachProperty((key, value) -> { |
| applicationProperties.getValue().put(key, value); |
| }); |
| } else { |
| applicationProperties = null; |
| } |
| |
| final Footer footer; |
| if (source.hasFooters()) { |
| footer = new Footer(new LinkedHashMap<>()); |
| |
| source.forEachFooter((key, value) -> { |
| footer.getValue().put(Symbol.valueOf(key), value); |
| }); |
| } else { |
| footer = null; |
| } |
| |
| ClientMessage<E> message = new ClientMessage<>(createSectionFromValue(source.body())); |
| |
| message.header(header); |
| message.properties(properties); |
| message.annotations(messageAnnotations); |
| message.applicationProperties(applicationProperties); |
| message.footer(footer); |
| |
| return message; |
| } |
| } |