| @startuml |
| |
| start |
| :messageList = transformMessageToConsume; |
| :ret = MessageListener(messageList); |
| if (ret.ConsumeStatus == 'CONSUME_SUCCESS'?) then (yes) |
| :ackIndex = ret.AckIndex; |
| else (no) |
| :ackIndex = -1; |
| endif |
| :successMessages = msgs[:ackIndex + 1]; |
| : i = ackIndex+1; |
| while (i < msgs.length?) |
| :sendBackResult = sendMessageBack(msgs[i]); |
| if(sendBackResult?) then (yes) |
| :successMessages.append(msgs[i]); |
| else |
| :failedMessages.append(msgs[i]); |
| endif |
| endwhile |
| :commitOffset = removeSuccessMessage(successMessages); |
| :submitConsumeRequest(failedMessages); |
| :updateOffsetInLocalMemory(commitOffset); |
| stop |
| @enduml |
| |