blob: fe905d302f2972caf858d162b06158833f521a4a [file] [log] [blame]
title: API - Atomic Write Multiple Records
layout: default
.. contents:: Basic Tutorial - Write Multi Records Atomic using Write Proxy Client
Write Multi Records Atomic using Write Proxy Client
This tutorial shows how to write multi records atomic using write proxy client.
.. sectnum::
Open a write proxy client
Create write proxy client builder
DistributedLogClientBuilder builder = DistributedLogClientBuilder.newBuilder();
Enable thrift mux
builder = builder.thriftmux(true);
Point the client to write proxy using finagle name
String finagleNameStr = "inet!";
builder = builder.finagleNameStr(finagleNameStr);
Build the write proxy client
DistributedLogClient client =;
Write Records
Create a RecordSet
Create a `RecordSet` for multiple records. The RecordSet has initial `16KB` buffer and its
compression codec is `NONE`.
LogRecordSet.Writer recordSetWriter = LogRecordSet.newWriter(16 * 1024, Type.NONE);
Write multiple records
Write multiple records into the `RecordSet`.
for (String msg : messages) {
ByteBuffer msgBuf = ByteBuffer.wrap(msg.getBytes(UTF_8));
Promise<DLSN> writeFuture = new Promise<DLSN>();
recordSetWriter.writeRecord(msgBuf, writeFuture);
Write the RecordSet
Write the `RecordSet` to a stream.
String streamName = "basic-stream-8";
Future<DLSN> writeFuture = client.writeRecordSet(streamName, recordSetWriter);
Register the write callback
Register a future listener on write completion. The writer will be notified once the write is completed.
writeFuture.addEventListener(new FutureEventListener<DLSN>() {
public void onFailure(Throwable cause) {
// executed when write failed.
public void onSuccess(DLSN value) {
// executed when write completed.
Close the write proxy client
Close the write proxy client after usage.
Run the tutorial
Run the example in the following steps:
Start the local bookkeeper cluster
You can use follow command to start the distributedlog stack locally.
After the distributedlog cluster is started, you could access it using
distributedlog uri *distributedlog://*.
// dlog local ${zk-port}
./distributedlog-core/bin/dlog local 7000
Start the write proxy
Start the write proxy, listening on port 8000.
// DistributedLogServerApp -p ${service-port} --shard-id ${shard-id} -sp ${stats-port} -u {distributedlog-uri} -mx -c ${conf-file}
./distributedlog-service/bin/dlog com.twitter.distributedlog.service.DistributedLogServerApp -p 8000 --shard-id 1 -sp 8001 -u distributedlog:// -mx -c ${distributedlog-repo}/distributedlog-service/conf/distributedlog_proxy.conf
Create the stream
Create the stream under the distributedlog uri.
// Create Stream `basic-stream-8`
// dlog tool create -u ${distributedlog-uri} -r ${stream-prefix} -e ${stream-regex}
./distributedlog-core/bin/dlog tool create -u distributedlog:// -r basic-stream- -e 8
Tail the stream
Tailing the stream using `TailReader` to wait for new records.
// Tailing Stream `basic-stream-8`
// runner run com.twitter.distributedlog.basic.TailReader ${distributedlog-uri} ${stream}
./distributedlog-tutorials/distributedlog-basic/bin/runner run com.twitter.distributedlog.basic.TailReader distributedlog:// basic-stream-8
Write records
Run the example to write multiple records to the stream.
// Write Records into Stream `basic-stream-8`
// runner run com.twitter.distributedlog.basic.AtomicWriter ${distributedlog-uri} ${stream} ${message}[, ${message}]
./distributedlog-tutorials/distributedlog-basic/bin/runner run com.twitter.distributedlog.basic.AtomicWriter 'inet!' basic-stream-8 "message-1" "message-2" "message-3" "message-4" "message-5"
Check the results
Example output from `AtomicWriter` and `TailReader`.
// Output of `AtomicWriter`
May 08, 2016 11:48:19 AM com.twitter.finagle.BaseResolver$$anonfun$resolvers$1 apply
INFO: Resolver[inet] = com.twitter.finagle.InetResolver(com.twitter.finagle.InetResolver@6c3e459e)
May 08, 2016 11:48:19 AM com.twitter.finagle.BaseResolver$$anonfun$resolvers$1 apply
INFO: Resolver[fixedinet] = com.twitter.finagle.FixedInetResolver(com.twitter.finagle.FixedInetResolver@4d5698f)
May 08, 2016 11:48:19 AM com.twitter.finagle.BaseResolver$$anonfun$resolvers$1 apply
INFO: Resolver[neg] = com.twitter.finagle.NegResolver$(com.twitter.finagle.NegResolver$@57052dc3)
May 08, 2016 11:48:19 AM com.twitter.finagle.BaseResolver$$anonfun$resolvers$1 apply
INFO: Resolver[nil] = com.twitter.finagle.NilResolver$(com.twitter.finagle.NilResolver$@14ff89d7)
May 08, 2016 11:48:19 AM com.twitter.finagle.BaseResolver$$anonfun$resolvers$1 apply
INFO: Resolver[fail] = com.twitter.finagle.FailResolver$(com.twitter.finagle.FailResolver$@14b28d06)
May 08, 2016 11:48:19 AM com.twitter.finagle.Init$$anonfun$1 apply$mcV$sp
Write 'message-1' as record DLSN{logSegmentSequenceNo=1, entryId=0, slotId=0}
Write 'message-2' as record DLSN{logSegmentSequenceNo=1, entryId=0, slotId=1}
Write 'message-3' as record DLSN{logSegmentSequenceNo=1, entryId=0, slotId=2}
Write 'message-4' as record DLSN{logSegmentSequenceNo=1, entryId=0, slotId=3}
Write 'message-5' as record DLSN{logSegmentSequenceNo=1, entryId=0, slotId=4}
// Output of `TailReader`
Opening log stream basic-stream-8
Log stream basic-stream-8 is empty.
Wait for records starting from DLSN{logSegmentSequenceNo=1, entryId=0, slotId=0}
Received record DLSN{logSegmentSequenceNo=1, entryId=0, slotId=0}
Received record DLSN{logSegmentSequenceNo=1, entryId=0, slotId=1}
Received record DLSN{logSegmentSequenceNo=1, entryId=0, slotId=2}
Received record DLSN{logSegmentSequenceNo=1, entryId=0, slotId=3}
Received record DLSN{logSegmentSequenceNo=1, entryId=0, slotId=4}