blob: 7629c4df9dc15baf22ee5700a46c85f12633c609 [file] [log] [blame]
---
title: Tutorials - At-least-once Processing
top-nav-group: messaging
top-nav-pos: 3
top-nav-title: At-least-once Processing
layout: default
---
.. contents:: Messaging Tutorial - At-least-once Processing
At-least-once Processing
========================
Applications typically choose between `at-least-once` and `exactly-once` processing semantics.
`At-least-once` processing guarantees that the application will process all the log records,
however when the application resumes after failure, previously processed records may be re-processed
if they have not been acknowledged. With at least once processing guarantees the application can store
reader positions in an external store and update it periodically. Upon restart the application will
reprocess messages since the last updated reader position.
This tutorial shows how to do `at-least-once` processing by using a `offset-store` to track the reading positions.
.. sectnum::
How to track reading positions
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Applications typically choose an external storage (e.g key/value storage) or another log stream to store their
read positions. In this example, we used a local key/value store - `LevelDB` to store the read positions.
Open the offset store
---------------------
::
String offsetStoreFile = ...;
Options options = new Options();
options.createIfMissing(true);
DB offsetDB = factory.open(new File(offsetStoreFile), options);
Read the reader read position
-----------------------------
Read the reader read position from the offset store.
::
byte[] offset = offsetDB.get(readerId.getBytes(UTF_8));
DLSN dlsn;
if (null == offset) {
dlsn = DLSN.InitialDLSN;
} else {
dlsn = DLSN.deserializeBytes(offset);
}
Read records
------------
Start read from the read position that recorded in offset store.
::
final AsyncLogReader reader = FutureUtils.result(dlm.openAsyncLogReader(dlsn));
Track read position
-------------------
Track the last read position while reading using `AtomicReference`.
::
final AtomicReference<DLSN> lastReadDLSN = new AtomicReference<DLSN>(null);
reader.readNext().addEventListener(new FutureEventListener<LogRecordWithDLSN>() {
...
@Override
public void onSuccess(LogRecordWithDLSN record) {
lastReadDLSN.set(record.getDlsn());
// process the record
...
// read next record
reader.readNext().addEventListener(this);
}
});
Record read position
--------------------
Periodically update the last read position to the offset store.
::
final ScheduledExecutorService executorService =
Executors.newSingleThreadScheduledExecutor();
executorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
if (null != lastDLSN.get()) {
offsetDB.put(readerId.getBytes(UTF_8), lastDLSN.get().serializeBytes());
}
}
}, 10, 10, TimeUnit.SECONDS);
Check `distributedlog-tutorials/distributedlog-messaging/src/main/java/org/apache/distributedlog/messaging/ReaderWithOffsets`_ for more details.
.. _distributedlog-tutorials/distributedlog-messaging/src/main/java/org/apache/distributedlog/messaging/ReaderWithOffsets: https://github.com/apache/incubator-distributedlog/blob/master/distributedlog-tutorials/distributedlog-messaging/src/main/java/org/apache/distributedlog/messaging/ReaderWithOffsets.java