---
title: "Core Library"
layout: default

# Sub-level navigation
sub-nav-group: user-guide
sub-nav-parent: apis
sub-nav-pos: 3
sub-nav-title: Core Library API

---

.. contents:: Core Library API

Core Library API
================

The distributedlog core library interacts with namespaces and logs directly.
It is written in Java.

Namespace API
-------------

A DL namespace is a collection of *log streams*. Applications could *create*
or *delete* logs under a DL namespace.

Namespace URI
~~~~~~~~~~~~~

A **URI** is used to locate the *namespace*. The *Namespace URI* is typically
comprised of *3* components:

* scheme: `distributedlog-<backend>`. The *backend* indicates which backend is used to store the log data.
* domain name: the domain name used to talk to the *backend*. In the example below, the domain name part is the *zookeeper server*, which is used to store the log metadata in the bookkeeper based backend implementation.
* path: the path that points to the location storing the logs. In the example below, it is a zookeeper path that points to the znode storing all the log metadata.

::

    distributedlog-bk://<zookeeper-server>/path/to/stream

Currently, the only available backend is the bookkeeper based backend.
The default `distributedlog` scheme is aliased to `distributedlog-bk`.
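
For example, an application could build the namespace URI in Java as below. The
zookeeper ensemble address (`127.0.0.1:2181`) and the path (`/messaging/my-namespace`)
are hypothetical values used purely for illustration.

::

    // a hypothetical zookeeper ensemble and znode path, for illustration only
    URI uri = URI.create("distributedlog://127.0.0.1:2181/messaging/my-namespace");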

Building a Namespace
~~~~~~~~~~~~~~~~~~~~

Once you have the *namespace uri*, you could build the namespace instance.
The namespace instance will be used for operating streams under it.

::

    // DistributedLog Configuration
    DistributedLogConfiguration conf = new DistributedLogConfiguration();
    // Namespace URI
    URI uri = ...; // create the namespace uri
    // create a builder to build namespace instances
    DistributedLogNamespaceBuilder builder = DistributedLogNamespaceBuilder.newBuilder();
    DistributedLogNamespace namespace = builder
        .conf(conf)             // configuration used by the namespace
        .uri(uri)               // namespace uri
        .statsLogger(...)       // stats logger to log stats
        .featureProvider(...)   // feature provider for controlling features
        .build();

Create a Log
~~~~~~~~~~~~

Creating a log is pretty straightforward: just call `DistributedLogNamespace#createLog(logName)`.
It only creates the log under the namespace but doesn't return any handle for operating the log.

::

    DistributedLogNamespace namespace = ...; // namespace
    try {
        namespace.createLog("test-log");
    } catch (IOException ioe) {
        // handle the exception on creating a log
    }

Open a Log
~~~~~~~~~~

A `DistributedLogManager` handle will be returned when opening a log via `#openLog(logName)`. The
handle could be used for writing data to or reading data from the log. If the log doesn't exist
and `createStreamIfNotExists` is set to true in the configuration, the log will be created
automatically when the first record is written.

::

    DistributedLogConfiguration conf = new DistributedLogConfiguration();
    conf.setCreateStreamIfNotExists(true);
    DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder()
        .conf(conf)
        ...
        .build();
    DistributedLogManager logManager = namespace.openLog("test-log");
    // use the log manager to open writer to write data or open reader to read data
    ...

Sometimes applications may want to open a log with different configuration settings. This could be done via
an overloaded `#openLog` method, as below:

::

    DistributedLogConfiguration conf = new DistributedLogConfiguration();
    // set the retention period to 24 hours
    conf.setRetentionPeriodHours(24);
    URI uri = ...;
    DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder()
        .conf(conf)
        .uri(uri)
        ...
        .build();

    // Per Log Configuration
    DistributedLogConfiguration logConf = new DistributedLogConfiguration();
    // set the retention period to 12 hours for this single stream
    logConf.setRetentionPeriodHours(12);

    // open the log with overridden settings
    DistributedLogManager logManager = namespace.openLog("test-log",
        Optional.of(logConf),
        Optional.absent());

Delete a Log
~~~~~~~~~~~~

`DistributedLogNamespace#deleteLog(logName)` deletes the log from the namespace. Deleting a log
will attempt to acquire a lock before deletion. If a log is being written to by an active writer,
the lock is already held by that writer, so the deletion will fail.

::

    DistributedLogNamespace namespace = ...;
    try {
        namespace.deleteLog("test-log");
    } catch (IOException ioe) {
        // handle the exceptions
    }

Log Existence
~~~~~~~~~~~~~

Applications could check whether a log exists in a namespace by calling `DistributedLogNamespace#logExists(logName)`.

::

    DistributedLogNamespace namespace = ...;
    if (namespace.logExists("test-log")) {
        // actions when log exists
    } else {
        // actions when log doesn't exist
    }

Get List of Logs
~~~~~~~~~~~~~~~~

Applications could list the logs under a namespace by calling `DistributedLogNamespace#getLogs()`.

::

    DistributedLogNamespace namespace = ...;
    Iterator<String> logs = namespace.getLogs();
    while (logs.hasNext()) {
        String logName = logs.next();
        // ... process the log
    }

Writer API
----------

There are two ways to write records into a log stream: one is using the synchronous `LogWriter`, while the other is using
the asynchronous `AsyncLogWriter`.

LogWriter
~~~~~~~~~

The first step in writing data into a log stream is constructing the writer instance. Please note that the distributedlog core library enforces single-writer
semantics via a zookeeper based locking mechanism. If there is already an active writer, subsequent calls to `#startLogSegmentNonPartitioned()` will
fail with `OwnershipAcquireFailedException`.

::

    DistributedLogNamespace namespace = ....;
    DistributedLogManager dlm = namespace.openLog("test-log");
    LogWriter writer = dlm.startLogSegmentNonPartitioned();
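
If another active writer already holds the lock on the log, the open attempt fails.
A minimal sketch of handling this case is shown below; the backoff-and-retry policy
is an assumption and is left to the application.

::

    LogWriter writer;
    try {
        writer = dlm.startLogSegmentNonPartitioned();
    } catch (OwnershipAcquireFailedException oafe) {
        // another active writer holds the ownership of this log:
        // back off and retry, or give up, depending on the application
        ...
    }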

.. _Construct Log Record:

Log records are constructed to represent the data written to a log stream. Each log record is associated with an application defined transaction id.
The transaction id has to be non-decreasing, otherwise writing the record will be rejected with `TransactionIdOutOfOrderException`. Applications are allowed to
bypass the transaction id sanity check by setting `maxIdSanityCheck` to false in the configuration. System time and atomic counters are good candidates for
transaction ids.

::

    long txid = 1L;
    byte[] data = ...;
    LogRecord record = new LogRecord(txid, data);

Applications could add either a single record (via `#write(LogRecord)`) or a batch of records (via `#writeBulk(List<LogRecord>)`) to the log stream.

::

    writer.write(record);
    // or
    List<LogRecord> records = Lists.newArrayList();
    records.add(record);
    writer.writeBulk(records);

The write calls return immediately after the records are added to the writer's output buffer, so the data isn't guaranteed to be durable until the writer
explicitly calls `#setReadyToFlush()` and `#flushAndSync()`. These two calls first transmit the buffered data to the backend, wait for the transmit
acknowledgements, and then commit the written data to make it visible to readers.

::

    // flush the records
    writer.setReadyToFlush();
    // commit the records to make them visible to readers
    writer.flushAndSync();

The DL log streams are endless streams unless they are sealed. 'Endless' means that writers can keep writing records to a log stream while readers keep
tailing reads from the end of the stream, and this never stops. An application could seal a log stream by calling `#markEndOfStream()`.

::

    // seal the log stream
    writer.markEndOfStream();

A complete example of writing records is shown below.

::

    DistributedLogNamespace namespace = ....;
    DistributedLogManager dlm = namespace.openLog("test-log");

    LogWriter writer = dlm.startLogSegmentNonPartitioned();
    for (long txid = 1L; txid <= 100L; txid++) {
        byte[] data = ...;
        LogRecord record = new LogRecord(txid, data);
        writer.write(record);
    }
    // flush the records
    writer.setReadyToFlush();
    // commit the records to make them visible to readers
    writer.flushAndSync();

    // seal the log stream
    writer.markEndOfStream();

AsyncLogWriter
~~~~~~~~~~~~~~

Constructing an asynchronous `AsyncLogWriter` is as simple as constructing a synchronous `LogWriter`.

::

    DistributedLogNamespace namespace = ....;
    DistributedLogManager dlm = namespace.openLog("test-log");
    AsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned();

All the writes to an `AsyncLogWriter` are asynchronous. The futures representing the write results are only satisfied when the data has been durably persisted in the stream.
A DLSN (distributedlog sequence number) is returned for each write, representing the position (aka offset) of the record in the log stream.
All the records added in order are guaranteed to be persisted in order.

.. _Async Write Records:

::

    List<Future<DLSN>> addFutures = Lists.newArrayList();
    for (long txid = 1L; txid <= 100L; txid++) {
        byte[] data = ...;
        LogRecord record = new LogRecord(txid, data);
        addFutures.add(writer.write(record));
    }
    List<DLSN> addResults = Await.result(Future.collect(addFutures));

The `AsyncLogWriter` also provides a method to truncate a stream up to a given DLSN. This is super helpful for building replicated state machines, which need
explicit control over when the data could be deleted.

::

    DLSN truncateDLSN = ...;
    Future<DLSN> truncateFuture = writer.truncate(truncateDLSN);
    // wait for truncation result
    Await.result(truncateFuture);

Reader API
----------

Sequence Numbers
~~~~~~~~~~~~~~~~

A log record is associated with sequence numbers. First of all, an application can assign its own sequence number (called `TransactionID`)
to a log record while writing it (see `Construct Log Record`_). Secondly, a log record will be assigned a unique, system generated sequence number
called `DLSN` (distributedlog sequence number) when it is written to a log (see `Async Write Records`_). Besides `DLSN` and `TransactionID`,
a monotonically increasing 64-bit `SequenceId` is assigned to the record at read time, indicating its position within the log.

:Transaction ID: Transaction ID is a positive 64-bit integer assigned by the application.
  Transaction ID is very helpful when an application wants to organize the records and position the readers using its own sequencing method. A typical
  use case of `Transaction ID` is the `DistributedLog Write Proxy`: it assigns non-decreasing timestamps to log records, and those timestamps
  could be used as `physical time` to implement a `TTL` (Time To Live) feature in a strongly consistent database.
:DLSN: DLSN (DistributedLog Sequence Number) is the sequence number generated at write time.
  DLSN is comparable and could be used to figure out the order between records. A DLSN comprises 3 components: the `Log Segment Sequence Number`,
  the `Entry Id` and the `Slot Id`. The DLSN is usually used for comparison, positioning and truncation.
:Sequence ID: Sequence ID is introduced to address a drawback of `DLSN`, namely answering questions like `how many records were written between two DLSNs`.
  Sequence ID is a 64-bit monotonically increasing number starting from zero. The sequence ids are computed at read time and are only accessible to readers,
  which means writers don't know the sequence ids of records at the time they write them.

Readers could be positioned to start reading from any position in the log, using either a `DLSN` or a `Transaction ID`.
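
As an illustration, the sketch below shows how the three sequence numbers could be
retrieved from a `LogRecordWithDLSN` returned by the reader APIs described below, and
how two `DLSN`s could be compared. The record and the second DLSN are placeholders
used for illustration.

::

    LogRecordWithDLSN record = ...; // a record returned by a reader
    long txid = record.getTransactionId(); // assigned by the application at write time
    DLSN dlsn = record.getDlsn();          // generated by the system at write time
    long seqId = record.getSequenceId();   // computed at read time

    // DLSNs are comparable: figure out the order between two records
    DLSN otherDLSN = ...;
    if (dlsn.compareTo(otherDLSN) < 0) {
        // this record is positioned before the other record in the log
        ...
    }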

LogReader
~~~~~~~~~

`LogReader` is a 'synchronous' sequential reader, reading records from a log stream starting from a given position. The position could be
a `DLSN` (via `#getInputStream(DLSN)`) or a `Transaction ID` (via `#getInputStream(long)`). After the reader is opened, it could call either
`#readNext(boolean)` or `#readBulk(boolean, int)` to read records out of the log stream sequentially. Closing the reader (via `#close()`)
will release all the resources occupied by the reader instance.

Exceptions could be thrown while reading records due to various issues. Once an exception is thrown, the reader is set to an error state
and isn't usable anymore. It is the application's responsibility to handle the exceptions and re-create readers if necessary.

::

    DistributedLogManager dlm = ...;
    long nextTxId = ...;
    LogReader reader = dlm.getInputStream(nextTxId);

    while (true) { // keep reading & processing records
        LogRecord record;
        try {
            record = reader.readNext(false);
            nextTxId = record.getTransactionId();
            // process the record
            ...
        } catch (IOException ioe) {
            // handle the exception
            ...
            reader = dlm.getInputStream(nextTxId + 1);
        }
    }

Reading records from an endless log stream in a `synchronous` way isn't as trivial as in an `asynchronous` way, because it lacks a callback mechanism.
Instead, `LogReader` introduces a flag `nonBlocking` to control the waiting behavior of `synchronous` reads. Blocking (`nonBlocking = false`)
means the reads will wait for records before returning, while non-blocking (`nonBlocking = true`) means the reads will only check the readahead
cache and return whatever records are available in the readahead cache.

The `waiting` period varies in `blocking` mode. If the reader is catching up with the writer (there are plenty of records in the log), the read call will
wait until records are read and returned. If the reader is already caught up with the writer (there are no more records in the log at read time), the read
call will wait for a small period of time (defined by `DistributedLogConfiguration#getReadAheadWaitTime()`) and return whatever records are available in the
readahead cache. In other words, if a reader sees no record on blocking reads, it means the reader is `caught up` with the writer.

See examples below on how to read records using `LogReader`.

::

    // Read individual records

    LogReader reader = ...;
    // keep reading records in blocking mode until no records available in the log
    LogRecord record = reader.readNext(false);
    while (null != record) {
        // process the record
        ...
        // read next record
        record = reader.readNext(false);
    }
    ...

    // reader is caught up with writer, doing non-blocking reads to tail the log
    while (true) {
        record = reader.readNext(true);
        if (null == record) {
            // no record available yet. backoff ?
            ...
        } else {
            // process the new record
            ...
        }
    }

::

    // Read records in batch

    LogReader reader = ...;
    int N = 10;

    // keep reading N records in blocking mode until no records available in the log
    List<LogRecord> records = reader.readBulk(false, N);
    while (!records.isEmpty()) {
        // process the list of records
        ...
        if (records.size() < N) { // no more records available in the log
            break;
        }
        // read next N records
        records = reader.readBulk(false, N);
    }

    ...

    // reader is caught up with writer, doing non-blocking reads to tail the log
    while (true) {
        records = reader.readBulk(true, N);
        // process the new records
        ...
    }


AsyncLogReader
~~~~~~~~~~~~~~

Similar to `LogReader`, applications could open an `AsyncLogReader` positioned at different positions, either a `DLSN` or a `Transaction ID`.

::

    DistributedLogManager dlm = ...;

    Future<AsyncLogReader> openFuture;

    // position the reader to transaction id `999`
    openFuture = dlm.openAsyncLogReader(999L);

    // or position the reader to a DLSN
    DLSN fromDLSN = ...;
    openFuture = dlm.openAsyncLogReader(fromDLSN);

    AsyncLogReader reader = Await.result(openFuture);

Reading records from an `AsyncLogReader` is asynchronous. The future returned by `#readNext()`, `#readBulk(int)` or `#readBulk(int, long, TimeUnit)`
represents the result of the read operation. The future is only satisfied when there are records available. Applications could chain the futures
to do sequential reads.

Reading records one by one from an `AsyncLogReader`:

::

    void readOneRecord(AsyncLogReader reader) {
        reader.readNext().addEventListener(new FutureEventListener<LogRecordWithDLSN>() {
            public void onSuccess(LogRecordWithDLSN record) {
                // process the record
                ...
                // read next
                readOneRecord(reader);
            }
            public void onFailure(Throwable cause) {
                // handle errors and re-create a reader
                AsyncLogReader newReader = ...;
                // read next
                readOneRecord(newReader);
            }
        });
    }

    AsyncLogReader reader = ...;
    readOneRecord(reader);

Reading records in batches from an `AsyncLogReader`:

::

    void readBulk(AsyncLogReader reader, int N) {
        reader.readBulk(N).addEventListener(new FutureEventListener<List<LogRecordWithDLSN>>() {
            public void onSuccess(List<LogRecordWithDLSN> records) {
                // process the records
                ...
                // read next
                readBulk(reader, N);
            }
            public void onFailure(Throwable cause) {
                // handle errors and re-create a reader
                AsyncLogReader newReader = ...;
                // read next
                readBulk(newReader, N);
            }
        });
    }

    AsyncLogReader reader = ...;
    int N = 10;
    readBulk(reader, N);