At-least-once Processing

Applications typically choose between at-least-once and exactly-once processing semantics. At-least-once processing guarantees that the application processes every log record. However, when the application resumes after a failure, records that were already processed may be processed again if they had not yet been acknowledged, that is, if their read position had not yet been saved. To get at-least-once processing, the application stores its reader position in an external store and updates it periodically. On restart, the application reprocesses the records read since the last saved reader position.
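The replay behaviour described above can be illustrated with a small, self-contained simulation. This is only a sketch and does not use the DistributedLog API: the class `AtLeastOnceSketch`, the `run` method, the in-memory `Map` standing in for the external offset store, and the choice to store the index of the next record to read are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation; none of these names come from DistributedLog.
class AtLeastOnceSketch {

    /**
     * Processes records starting at the position saved for readerId.
     * The position is saved every checkpointEvery records. If crashAfter is
     * non-negative, the run stops abruptly after that many records, which
     * loses any progress made since the last saved position.
     * Returns the records processed during this run.
     */
    static List<String> run(List<String> log, Map<String, Integer> offsetStore,
                            String readerId, int checkpointEvery, int crashAfter) {
        List<String> processed = new ArrayList<>();
        int next = offsetStore.getOrDefault(readerId, 0); // start of log if nothing saved
        int sinceCheckpoint = 0;
        while (next < log.size()) {
            if (crashAfter >= 0 && processed.size() == crashAfter) {
                return processed; // simulated crash: no final save
            }
            processed.add(log.get(next)); // "process" the record
            next++;
            sinceCheckpoint++;
            if (sinceCheckpoint == checkpointEvery) {
                offsetStore.put(readerId, next); // periodic save
                sinceCheckpoint = 0;
            }
        }
        offsetStore.put(readerId, next); // clean end of log: save final position
        return processed;
    }

    public static void main(String[] args) {
        List<String> log = Arrays.asList("r0", "r1", "r2", "r3", "r4", "r5");
        Map<String, Integer> offsetStore = new HashMap<>();

        // First run saves position 2, processes r2, then crashes.
        List<String> firstRun = run(log, offsetStore, "reader-1", 2, 3);
        // Second run resumes from the saved position, so r2 is processed again.
        List<String> secondRun = run(log, offsetStore, "reader-1", 2, -1);

        System.out.println("first run:  " + firstRun);
        System.out.println("second run: " + secondRun);
    }
}
```

In this sketch no record is skipped, but `r2` is processed in both runs, so the processing logic has to tolerate duplicates. Saving the position more often shrinks the replay window at the cost of more writes to the offset store.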

This tutorial shows how to do at-least-once processing by using an offset store to track the read positions.

How to track reading positions

Applications typically store their read positions in an external store (e.g. a key/value store) or in another log stream. This example uses LevelDB, a local key/value store, to store the read positions.

  • Open the offset store.

        String offsetStoreFile = ...;
        Options options = new Options();
        options.createIfMissing(true);
        DB offsetDB = factory.open(new File(offsetStoreFile), options);
    
  • Read the reader's last saved position from the offset store. If none is saved, start from the initial position.

        byte[] offset = offsetDB.get(readerId.getBytes(UTF_8));
        DLSN dlsn;
        if (null == offset) {
            dlsn = DLSN.InitialDLSN;
        } else {
            dlsn = DLSN.deserializeBytes(offset);
        }
    
  • Start reading from the position recorded in the offset store.

        final AsyncLogReader reader = FutureUtils.result(dlm.openAsyncLogReader(dlsn));
    
  • While reading, track the last read position in an AtomicReference.

        final AtomicReference<DLSN> lastReadDLSN = new AtomicReference<DLSN>(null);
        reader.readNext().addEventListener(new FutureEventListener<LogRecordWithDLSN>() {
            ...
    
            @Override
            public void onSuccess(LogRecordWithDLSN record) {
                lastReadDLSN.set(record.getDlsn());
                // process the record
                ...
                // read next record
                reader.readNext().addEventListener(this);
            }
        });
    
  • Periodically update the last read position to the offset store.

        final ScheduledExecutorService executorService =
                Executors.newSingleThreadScheduledExecutor();
        executorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
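                // Read the reference once so the null check and the write use the same value.
                DLSN lastRead = lastReadDLSN.get();
                if (null != lastRead) {
                    offsetDB.put(readerId.getBytes(UTF_8), lastRead.serializeBytes());
                }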
            }
        }, 10, 10, TimeUnit.SECONDS);
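The tracking and periodic-update steps can also be packaged into one small helper. The following is a minimal sketch under stated assumptions, not the tutorial's actual implementation: `OffsetCheckpointer`, `recordProcessed`, `flush`, and `storedPosition` are hypothetical names, a `Map<String, byte[]>` stands in for the LevelDB offset store, and a `long` stands in for a `DLSN`. The final flush in `close()` is an addition of this sketch to narrow the replay window on a clean shutdown.

```java
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical helper; a Map replaces LevelDB and a long replaces a DLSN.
class OffsetCheckpointer implements AutoCloseable {
    private final Map<String, byte[]> store;
    private final String readerId;
    private final AtomicReference<Long> lastRead = new AtomicReference<>(null);
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    OffsetCheckpointer(Map<String, byte[]> store, String readerId, long periodMillis) {
        this.store = store;
        this.readerId = readerId;
        // Save the latest position at a fixed rate, as in the step above.
        scheduler.scheduleAtFixedRate(this::flush, periodMillis, periodMillis,
                TimeUnit.MILLISECONDS);
    }

    /** Called by the reading thread for each record it reads. */
    void recordProcessed(long position) {
        lastRead.set(position);
    }

    /** Writes the latest position to the store; does nothing if no record was read. */
    synchronized void flush() {
        Long position = lastRead.get(); // read once: check and write use the same value
        if (position != null) {
            store.put(readerId,
                    ByteBuffer.allocate(Long.BYTES).putLong(position).array());
        }
    }

    /** Returns the saved position, or -1 if nothing has been saved yet. */
    static long storedPosition(Map<String, byte[]> store, String readerId) {
        byte[] bytes = store.get(readerId);
        return bytes == null ? -1L : ByteBuffer.wrap(bytes).getLong();
    }

    /** Stops the periodic task and saves the position one last time. */
    @Override
    public void close() {
        scheduler.shutdown();
        flush();
    }

    public static void main(String[] args) {
        Map<String, byte[]> store = new ConcurrentHashMap<>();
        try (OffsetCheckpointer checkpointer =
                     new OffsetCheckpointer(store, "reader-1", 10_000)) {
            checkpointer.recordProcessed(7L);
        }
        System.out.println("saved position: " + storedPosition(store, "reader-1"));
    }
}
```

A crash between two flushes still loses the positions recorded since the last flush, which is exactly the window of records that gets reprocessed on restart.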
    

Check distributedlog-tutorials/distributedlog-messaging/src/main/java/com/twitter/distributedlog/messaging/ReaderWithOffsets for more details.