| # Batch Message Sample |
| ------ |
| Sending messages in batch improves performance of delivering small messages. Messages of the same batch should have: same topic, same waitStoreMsgOK and no schedule support. You can send messages up to 4MiB at a time, but if you need to send a larger message, it is recommended to divide the larger messages into multiple small messages of no more than 1MiB. |
| ### 1 Send Batch Messages |
| If you just send messages of no more than 4MiB at a time, it is easy to use batch: |
| ```java |
| String topic = "BatchTest"; |
| List<Message> messages = new ArrayList<>(); |
| messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes())); |
| messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes())); |
| messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes())); |
| try { |
| producer.send(messages); |
| } catch (Exception e) { |
| e.printStackTrace(); |
| //handle the error |
| } |
| ``` |
| ### 2 Split into Lists |
| The complexity only grow when you send large batch and you may not sure if it exceeds the size limit (4MiB). At this time, you’d better split the lists: |
| ```java |
| public class ListSplitter implements Iterator<List<Message>> { |
| private final int SIZE_LIMIT = 1024 * 1024 * 4; |
| private final List<Message> messages; |
| private int currIndex; |
| public ListSplitter(List<Message> messages) { |
| this.messages = messages; |
| } |
| @Override |
| public boolean hasNext() { |
| return currIndex < messages.size(); |
| } |
| @Override |
| public List<Message> next() { |
| int startIndex = getStartIndex(); |
| int nextIndex = startIndex; |
| int totalSize = 0; |
| for (; nextIndex < messages.size(); nextIndex++) { |
| Message message = messages.get(nextIndex); |
| int tmpSize = calcMessageSize(message); |
| if (tmpSize + totalSize > SIZE_LIMIT) { |
| break; |
| } else { |
| totalSize += tmpSize; |
| } |
| } |
| List<Message> subList = messages.subList(startIndex, nextIndex); |
| currIndex = nextIndex; |
| return subList; |
| } |
| private int getStartIndex() { |
| Message currMessage = messages.get(currIndex); |
| int tmpSize = calcMessageSize(currMessage); |
| while(tmpSize > SIZE_LIMIT) { |
| currIndex += 1; |
| Message message = messages.get(curIndex); |
| tmpSize = calcMessageSize(message); |
| } |
| return currIndex; |
| } |
| private int calcMessageSize(Message message) { |
| int tmpSize = message.getTopic().length() + message.getBody().length; |
| Map<String, String> properties = message.getProperties(); |
| for (Map.Entry<String, String> entry : properties.entrySet()) { |
| tmpSize += entry.getKey().length() + entry.getValue().length(); |
| } |
| tmpSize = tmpSize + 20; // Increase the log overhead by 20 bytes |
| return tmpSize; |
| } |
| } |
| |
| // then you could split the large list into small ones: |
| ListSplitter splitter = new ListSplitter(messages); |
| while (splitter.hasNext()) { |
| try { |
| List<Message> listItem = splitter.next(); |
| producer.send(listItem); |
| } catch (Exception e) { |
| e.printStackTrace(); |
| // handle the error |
| } |
| } |
| ``` |