ISSUE #207: Support getFirstLogRecord
Descriptions of the changes in this PR:
- support getFirstLogRecord
Author: Sijie Guo <sijie@apache.org>
Reviewers: Jia Zhai <None>
This closes #208 from sijie/3_get_firstrecord_pr, closes #207
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogManager.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogManager.java
index d5d1477..5ca76d3 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogManager.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogManager.java
@@ -273,7 +273,8 @@
return Utils.ioResult(getLogSegmentsAsync());
}
- protected CompletableFuture<List<LogSegmentMetadata>> getLogSegmentsAsync() {
+ @Override
+ public CompletableFuture<List<LogSegmentMetadata>> getLogSegmentsAsync() {
final BKLogReadHandler readHandler = createReadHandler();
return readHandler.readLogSegmentsFromStore(
LogSegmentMetadata.COMPARATOR,
@@ -901,6 +902,16 @@
return getFirstRecordAsyncInternal().thenApply(RECORD_2_DLSN_FUNCTION);
}
+ @Override
+ public LogRecordWithDLSN getFirstLogRecord() throws IOException {
+ return Utils.ioResult(getFirstRecordAsyncInternal());
+ }
+
+ @Override
+ public CompletableFuture<LogRecordWithDLSN> getFirstLogRecordAsync() {
+ return getFirstRecordAsyncInternal();
+ }
+
private CompletableFuture<LogRecordWithDLSN> getFirstRecordAsyncInternal() {
return processReaderOperation(new Function<BKLogReadHandler, CompletableFuture<LogRecordWithDLSN>>() {
@Override
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/LogSegmentMetadata.java b/distributedlog-core/src/main/java/org/apache/distributedlog/LogSegmentMetadata.java
index c8907b5..0e4a2f0 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/LogSegmentMetadata.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/LogSegmentMetadata.java
@@ -83,8 +83,8 @@
/**
* TruncationStatus.
*/
- public enum TruncationStatus {
- ACTIVE (0), PARTIALLY_TRUNCATED(1), TRUNCATED (2);
+ public enum TruncationStatus {
+ UNKNOWN(-1), ACTIVE (0), PARTIALLY_TRUNCATED(1), TRUNCATED (2);
private final int value;
private TruncationStatus(int value) {
@@ -503,6 +503,19 @@
? startSequenceId : Long.MIN_VALUE + (getLogSegmentSequenceNumber() << 32L);
}
+ public TruncationStatus getTruncationStatus() {
+ switch ((int) (status & METADATA_TRUNCATION_STATUS_MASK)) {
+ case 0:
+ return TruncationStatus.ACTIVE;
+ case 1:
+ return TruncationStatus.PARTIALLY_TRUNCATED;
+ case 2:
+ return TruncationStatus.TRUNCATED;
+ default:
+ return TruncationStatus.UNKNOWN;
+ }
+ }
+
public boolean isTruncated() {
return ((status & METADATA_TRUNCATION_STATUS_MASK)
== TruncationStatus.TRUNCATED.value);
@@ -530,6 +543,14 @@
return minActiveDLSN;
}
+ public long getMinActiveEntryId() {
+ return minActiveDLSN.getEntryId();
+ }
+
+ public long getMinActiveSlotId() {
+ return minActiveDLSN.getSlotId();
+ }
+
public DLSN getFirstDLSN() {
return new DLSN(getLogSegmentSequenceNumber(), 0, 0);
}
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/api/DistributedLogManager.java b/distributedlog-core/src/main/java/org/apache/distributedlog/api/DistributedLogManager.java
index fb9c6be..db4b65e 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/api/DistributedLogManager.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/api/DistributedLogManager.java
@@ -71,6 +71,14 @@
*/
List<LogSegmentMetadata> getLogSegments() throws IOException;
+
+ /**
+ * Get the log segments asynchronously.
+ *
+ * @return the log segments
+ */
+ CompletableFuture<List<LogSegmentMetadata>> getLogSegmentsAsync();
+
/**
* Register <i>listener</i> on log segment updates of this stream.
*
@@ -296,6 +304,22 @@
CompletableFuture<LogRecordWithDLSN> getLastLogRecordAsync();
/**
+ * Get the first log record in the stream.
+ *
+ * @return the first log record in the stream
+ * @throws IOException if a stream cannot be found.
+ */
+ LogRecordWithDLSN getFirstLogRecord()
+ throws IOException;
+
+ /**
+ * Get first log record with DLSN in the log - async.
+ *
+ * @return latest log record with DLSN
+ */
+ CompletableFuture<LogRecordWithDLSN> getFirstLogRecordAsync();
+
+ /**
* Get the earliest Transaction Id available in the log.
*
* @return earliest transaction id