| = Large Messages |
| :idprefix: |
| :idseparator: - |
| :docinfo: shared |
| |
| Apache ActiveMQ Artemis can be configured to give special treatment to messages which are beyond a configured size. |
| Instead of keeping the entire contents of these messages _in memory_ the broker will hold just a thin object on the queues with a reference to the content (e.g. in a file or a database table). |
| |
| This is supported on Core Protocol and on the AMQP Protocol. |
| |
| == Configuring the server |
| |
| When using the xref:persistence.adoc#file-journal-default[file journal] large messages are stored on disk on the server. |
| The configuration property `large-messages-directory` specifies where large messages are stored. |
| |
| [,xml] |
| ---- |
| <configuration...> |
| <core...> |
| ... |
| <large-messages-directory>data/large-messages</large-messages-directory> |
| ... |
| </core> |
| </configuration> |
| ---- |
| |
| By default `large-messages-directory` is `data/largemessages`. |
| |
| [NOTE] |
| ==== |
| For the best performance we recommend using the file journal with the large messages directory on a different physical volume to the message journal or paging directory. |
| ==== |
| |
| For xref:persistence.adoc#jdbc-persistence[JDBC persistence] the `large-message-table` should be configured. |
| |
| [,xml] |
| ---- |
| <configuration...> |
| <core...> |
| ... |
| <store> |
| <database-store> |
| ... |
| <large-message-table-name>LARGE_MESSAGES_TABLE</large-message-table-name> |
| ... |
| </database-store> |
| </store> |
| ... |
| </core> |
| </configuration> |
| ---- |
| |
| By default `large-message-table` is `LARGE_MESSAGE_TABLE`. |
| |
| By default when writing the final bytes to a large message all writes are synchronized to the storage medium. |
| This can be configured via `large-message-sync`, e.g.: |
| |
| [,xml] |
| ---- |
| <configuration...> |
| <core...> |
| ... |
| <large-message-sync>true</large-message-sync> |
| ... |
| </core> |
| </configuration> |
| ---- |
| |
| By default `large-message-sync` is `true`. |
| |
| == Configuring the Core Client |
| |
| Any message larger than a certain size is considered a large message. |
| Large messages will be split up and sent in fragments. |
| This is determined by the URL parameter `minLargeMessageSize` |
| |
| [NOTE] |
| ==== |
| Apache ActiveMQ Artemis messages are encoded using 2 bytes per character so if the message data is filled with ASCII characters (which are 1 byte) the size of the resulting Apache ActiveMQ Artemis message would roughly double. |
| This is important when calculating the size of a "large" message as it may appear to be less than the `minLargeMessageSize` before it is sent, but it then turns into a "large" message once it is encoded. |
| ==== |
| |
| The default value is 100KiB. |
| |
| xref:configuring-transports.adoc#configuring-the-transport-directly-from-the-client[Configuring the transport directly from the client side] will provide more information on how to instantiate the core session factory or JMS connection factory. |
| |
| == Compressed Large Messages on Core Protocol |
| |
| You can choose to send large messages in compressed form using `compressLargeMessage` URL parameter. |
| |
| If you specify the boolean URL parameter `compressLargeMessage` as true, the system will use the ZIP algorithm to compress the message body as the message is transferred to the server's side. |
| Notice that there's no special treatment at the server's side, all the compressing and uncompressing is done at the client. |
| |
| This behavior can be tuned further by setting an optional parameter: `compressionLevel`. |
| This will decide how much the message body should be compressed. |
| `compressionLevel` accepts an integer of `-1` or a value between `0-9`. |
| The default value is `-1` which corresponds to around level 6-7. |
| |
| If the compressed size of a large message is below `minLargeMessageSize`, it is sent to server as regular messages. |
| This means that the message won't be written into the server's large-message data directory, thus reducing the disk I/O. |
| |
| NOTE: A higher `compressionLevel` means the message body will get further compressed, but this is at the cost of speed and computational overhead. |
| Make sure to tune this value according to its specific use-case. |
| |
| == Streaming large messages from Core Protocol |
| |
| Apache ActiveMQ Artemis supports setting the body of messages using input and output streams (`java.lang.io`) |
| |
| These streams are then used directly for sending (input streams) and receiving (output streams) messages. |
| |
| When receiving messages there are 2 ways to deal with the output stream; |
| you may choose to block while the output stream is recovered using the method `ClientMessage.saveOutputStream` or alternatively using the method `ClientMessage.setOutputstream` which will asynchronously write the message to the stream. |
| If you choose the latter the consumer must be kept alive until the message has been fully received. |
| |
| You can use any kind of stream you like. |
| The most common use case is to send files stored in your disk, but you could also send things like JDBC Blobs, `SocketInputStream`, things you recovered from `HTTPRequests` etc. |
| Anything as long as it implements `java.io.InputStream` for sending messages or `java.io.OutputStream` for receiving them. |
| |
| === Streaming over Core API |
| |
| The following table shows a list of methods available at `ClientMessage` which are also available through JMS by the use of object properties. |
| |
| |=== |
| | Name | Description | JMS Equivalent |
| |
| | setBodyInputStream(InputStream) |
| | Set the InputStream used to read a message body when sending it. |
| | JMS_AMQ_InputStream |
| |
| | setOutputStream(OutputStream) |
| | Set the OutputStream that will receive the body of a message. |
| This method does not block. |
| | JMS_AMQ_OutputStream |
| |
| | saveOutputStream(OutputStream) |
| | Save the body of the message to the `OutputStream`. |
| It will block until the entire content is transferred to the `OutputStream`. |
| | JMS_AMQ_SaveStream |
| |=== |
| |
| To set the output stream when receiving a core message: |
| |
| [,java] |
| ---- |
| ClientMessage msg = consumer.receive(...); |
| |
| // This will block here until the stream was transferred |
| msg.saveOutputStream(someOutputStream); |
| |
| ClientMessage msg2 = consumer.receive(...); |
| |
| // This will not wait the transfer to finish |
| msg2.setOutputStream(someOtherOutputStream); |
| ---- |
| |
| Set the input stream when sending a core message: |
| |
| [,java] |
| ---- |
| ClientMessage msg = session.createMessage(); |
| msg.setInputStream(dataInputStream); |
| ---- |
| |
| Notice also that for messages with more than 2GiB the getBodySize() will return invalid values since this is an integer (which is also exposed to the JMS API). |
| On those cases you can use the message property _AMQ_LARGE_SIZE. |
| |
| === Streaming over JMS |
| |
| When using JMS, Apache ActiveMQ Artemis maps the streaming methods on the core API (see ClientMessage API table above) by setting object properties . You can use the method `Message.setObjectProperty` to set the input and output streams. |
| |
| The `InputStream` can be defined through the JMS Object Property JMS_AMQ_InputStream on messages being sent: |
| |
| [,java] |
| ---- |
| BytesMessage message = session.createBytesMessage(); |
| |
| FileInputStream fileInputStream = new FileInputStream(fileInput); |
| |
| BufferedInputStream bufferedInput = new BufferedInputStream(fileInputStream); |
| |
| message.setObjectProperty("JMS_AMQ_InputStream", bufferedInput); |
| |
| someProducer.send(message); |
| ---- |
| |
| The `OutputStream` can be set through the JMS Object Property JMS_AMQ_SaveStream on messages being received in a blocking way. |
| |
| [,java] |
| ---- |
| BytesMessage messageReceived = (BytesMessage)messageConsumer.receive(120000); |
| |
| File outputFile = new File("huge_message_received.dat"); |
| |
| FileOutputStream fileOutputStream = new FileOutputStream(outputFile); |
| |
| BufferedOutputStream bufferedOutput = new BufferedOutputStream(fileOutputStream); |
| |
| // This will block until the entire content is saved on disk |
| messageReceived.setObjectProperty("JMS_AMQ_SaveStream", bufferedOutput); |
| ---- |
| |
| Setting the `OutputStream` could also be done in a non blocking way using the property JMS_AMQ_OutputStream. |
| |
| [,java] |
| ---- |
| // This won't wait the stream to finish. You need to keep the consumer active. |
| messageReceived.setObjectProperty("JMS_AMQ_OutputStream", bufferedOutput); |
| ---- |
| |
| [NOTE] |
| ==== |
| |
| |
| When using JMS, Streaming large messages are only supported on `StreamMessage` and `BytesMessage`. |
| ==== |
| |
| === Streaming Alternative on Core Protocol |
| |
| If you choose not to use the `InputStream` or `OutputStream` capability of Apache ActiveMQ Artemis You could still access the data directly in an alternative fashion. |
| |
| On the Core API just get the bytes of the body as you normally would. |
| |
| [,java] |
| ---- |
| ClientMessage msg = consumer.receive(); |
| |
| byte[] bytes = new byte[1024]; |
| for (int i = 0 ; i < msg.getBodySize(); i += bytes.length) |
| { |
| msg.getBody().readBytes(bytes); |
| // Whatever you want to do with the bytes |
| } |
| ---- |
| |
| If using JMS API, `BytesMessage` and `StreamMessage` also supports it transparently. |
| |
| [,java] |
| ---- |
| BytesMessage rm = (BytesMessage)cons.receive(10000); |
| |
| byte data[] = new byte[1024]; |
| |
| for (int i = 0; i < rm.getBodyLength(); i += 1024) |
| { |
| int numberOfBytes = rm.readBytes(data); |
| // Do whatever you want with the data |
| } |
| ---- |
| |
| == Configuring AMQP Acceptor |
| |
| You can configure the property `amqpMinLargeMessageSize` at the acceptor. |
| |
| The default value is 102400 (100KBytes). |
| |
| Setting it to -1 will disable large message support. |
| |
| WARNING: setting amqpMinLargeMessageSize to -1, your AMQP message might be stored as a Core Large Message if the size of the message does not fit into the journal. |
| This is the former semantic of the broker and it is kept this way for compatibility reasons. |
| |
| [,xml] |
| ---- |
| <acceptors> |
| <!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.--> |
| <acceptor name="amqp">tcp://0.0.0.0:5672?; ..... amqpMinLargeMessageSize=102400; .... </acceptor> |
| </acceptors> |
| ---- |
| |
| == Large message example |
| |
| Please see the xref:examples.adoc#large-message[Large Message Example] which shows how large messages are configured and used with JMS. |