| # 批量消息发送 |
| 批量消息发送能够提高发送效率,提升系统吞吐量。同一批批量消息的topic、waitStoreMsgOK属性必须保持一致,批量消息不支持延迟消息。批量消息发送一次最多可以发送 4MiB 的消息,但是如果需要发送更大的消息,建议将较大的消息分成多个不超过 1MiB 的小消息。 |
| |
| ### 1 发送批量消息 |
| 如果你一次只发送不超过 4MiB 的消息,使用批处理很容易: |
| ```java |
| String topic = "BatchTest"; |
| List<Message> messages = new ArrayList<>(); |
| messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes())); |
| messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes())); |
| messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes())); |
| try { |
| producer.send(messages); |
| } catch (Exception e) { |
| e.printStackTrace(); |
| //handle the error |
| } |
| ``` |
| ### 2 拆分 |
| 当您发送较大的消息时,复杂性会增加,如果您不确定它是否超过 4MiB的限制。 这时候,您最好将较大的消息分成多个不超过 1MiB 的小消息: |
| |
| ```java |
| public class ListSplitter implements Iterator<List<Message>> { |
| private final int SIZE_LIMIT = 1024 * 1024 * 4; |
| private final List<Message> messages; |
| private int currIndex; |
| public ListSplitter(List<Message> messages) { |
| this.messages = messages; |
| } |
| @Override |
| public boolean hasNext() { |
| return currIndex < messages.size(); |
| } |
| @Override |
| public List<Message> next() { |
| int startIndex = getStartIndex(); |
| int nextIndex = startIndex; |
| int totalSize = 0; |
| for (; nextIndex < messages.size(); nextIndex++) { |
| Message message = messages.get(nextIndex); |
| int tmpSize = calcMessageSize(message); |
| if (tmpSize + totalSize > SIZE_LIMIT) { |
| break; |
| } else { |
| totalSize += tmpSize; |
| } |
| } |
| List<Message> subList = messages.subList(startIndex, nextIndex); |
| currIndex = nextIndex; |
| return subList; |
| } |
| private int getStartIndex() { |
| Message currMessage = messages.get(currIndex); |
| int tmpSize = calcMessageSize(currMessage); |
| while(tmpSize > SIZE_LIMIT) { |
| currIndex += 1; |
| Message message = messages.get(curIndex); |
| tmpSize = calcMessageSize(message); |
| } |
| return currIndex; |
| } |
| private int calcMessageSize(Message message) { |
| int tmpSize = message.getTopic().length() + message.getBody().length(); |
| Map<String, String> properties = message.getProperties(); |
| for (Map.Entry<String, String> entry : properties.entrySet()) { |
| tmpSize += entry.getKey().length() + entry.getValue().length(); |
| } |
| tmpSize = tmpSize + 20; // Increase the log overhead by 20 bytes |
| return tmpSize; |
| } |
| } |
| |
| // then you could split the large list into small ones: |
| ListSplitter splitter = new ListSplitter(messages); |
| while (splitter.hasNext()) { |
| try { |
| List<Message> listItem = splitter.next(); |
| producer.send(listItem); |
| } catch (Exception e) { |
| e.printStackTrace(); |
| // handle the error |
| } |
| } |
| ``` |