Enable CheckStyle Plugin in managed-ledger module (#13619)
Motivation
Enable CheckStyle Plugin in managed-ledger module.
Documentation
- [x] `no-need-doc`
diff --git a/docker/pulsar/scripts/gen-yml-from-env.py b/docker/pulsar/scripts/gen-yml-from-env.py
index 8aee68b..f2e1031 100755
--- a/docker/pulsar/scripts/gen-yml-from-env.py
+++ b/docker/pulsar/scripts/gen-yml-from-env.py
@@ -104,6 +104,6 @@
conf.pop('processContainerFactory', None)
# Store back the updated config in the same file
- f = open(conf_filename , 'w')
+ f = open(conf_filename, 'w')
yaml.dump(conf, f, default_flow_style=False)
f.close()
diff --git a/managed-ledger/pom.xml b/managed-ledger/pom.xml
index 663c73d..aa57e16 100644
--- a/managed-ledger/pom.xml
+++ b/managed-ledger/pom.xml
@@ -144,6 +144,19 @@
</execution>
</executions>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>checkstyle</id>
+ <phase>verify</phase>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
</project>
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java
index 7205b77..e5fd0dd 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java
@@ -18,11 +18,10 @@
*/
package org.apache.bookkeeper.mledger;
+import io.netty.buffer.ByteBuf;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-
-import io.netty.buffer.ByteBuf;
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
import org.apache.bookkeeper.common.annotation.InterfaceStability;
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
index ead9db7..194bfe4 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
@@ -77,12 +77,12 @@
CompletableFuture<OffloadResult> getOffloadResultAsync();
/**
- * Manually close current offloading segment
+ * Manually close current offloading segment.
* @return true if the segment is not already closed
*/
boolean close();
- default CompletableFuture<Boolean> AsyncClose() {
+ default CompletableFuture<Boolean> asyncClose() {
return CompletableFuture.completedFuture(close());
}
}
@@ -158,7 +158,8 @@
* ensuring that subsequent calls will not attempt to offload the same ledger
* again.
*
- * @return an OffloaderHandle, which when `completeFuture()` completed, denotes that the offload has been successful.
+ * @return an OffloaderHandle, which when `completeFuture()` completed, denotes that the offload has been
+ * successful.
*/
default CompletableFuture<OffloadHandle> streamingOffload(ManagedLedger ml, UUID uid, long beginLedger,
long beginEntry,
@@ -201,20 +202,19 @@
throw new UnsupportedOperationException();
}
- default CompletableFuture<Void> deleteOffloaded(UUID uid,
- Map<String, String> offloadDriverMetadata) {
+ default CompletableFuture<Void> deleteOffloaded(UUID uid, Map<String, String> offloadDriverMetadata) {
throw new UnsupportedOperationException();
}
/**
- * Get offload policies of this LedgerOffloader
+ * Get offload policies of this LedgerOffloader.
*
* @return offload policies
*/
OffloadPoliciesImpl getOffloadPolicies();
/**
- * Close the resources if necessary
+ * Close the resources if necessary.
*/
void close();
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
index d1fb90a..6930db1 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
@@ -20,11 +20,9 @@
import com.google.common.base.Predicate;
import com.google.common.collect.Range;
-
import java.util.List;
import java.util.Map;
import java.util.Set;
-
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
import org.apache.bookkeeper.common.annotation.InterfaceStability;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback;
@@ -71,7 +69,7 @@
long getLastActive();
/**
- * Update the last active time of the cursor
+ * Update the last active time of the cursor.
*
*/
void updateLastActive();
@@ -476,7 +474,7 @@
* opaque context
*/
void asyncSkipEntries(int numEntriesToSkip, IndividualDeletedEntries deletedEntries,
- final SkipEntriesCallback callback, Object ctx);
+ SkipEntriesCallback callback, Object ctx);
/**
* Find the newest entry that matches the given predicate. Will only search among active entries
@@ -501,7 +499,8 @@
* @throws InterruptedException
* @throws ManagedLedgerException
*/
- Position findNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition) throws InterruptedException, ManagedLedgerException;
+ Position findNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition)
+ throws InterruptedException, ManagedLedgerException;
/**
* Find the newest entry that matches the given predicate.
@@ -526,7 +525,7 @@
* @throws InterruptedException
* @throws ManagedLedgerException
*/
- void resetCursor(final Position position) throws InterruptedException, ManagedLedgerException;
+ void resetCursor(Position position) throws InterruptedException, ManagedLedgerException;
/**
* reset the cursor to specified position to enable replay of messages.
@@ -536,7 +535,7 @@
* @param callback
* callback object
*/
- void asyncResetCursor(final Position position, AsyncCallbacks.ResetCursorCallback callback);
+ void asyncResetCursor(Position position, AsyncCallbacks.ResetCursorCallback callback);
/**
* Read the specified set of positions from ManagedLedger.
@@ -657,7 +656,7 @@
int getNonContiguousDeletedMessagesRangeSerializedSize();
/**
- * Returns the estimated size of the unacknowledged backlog for this cursor
+ * Returns the estimated size of the unacknowledged backlog for this cursor.
*
* @return the estimated size from the mark delete position of the cursor
*/
@@ -677,20 +676,20 @@
void setThrottleMarkDelete(double throttleMarkDelete);
/**
- * Get {@link ManagedLedger} attached with cursor
+ * Get {@link ManagedLedger} attached with cursor.
*
* @return ManagedLedger
*/
ManagedLedger getManagedLedger();
/**
- * Get last individual deleted range
+ * Get last individual deleted range.
* @return range
*/
Range<PositionImpl> getLastIndividualDeletedRange();
/**
- * Trim delete entries for the given entries
+ * Trim delete entries for the given entries.
*/
void trimDeletedEntries(List<Entry> entries);
@@ -715,5 +714,5 @@
* Checks if the cursor is closed.
* @return whether this cursor is closed.
*/
- public boolean isClosed();
+ boolean isClosed();
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
index 74ab2ce..0e58d1e 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
@@ -129,7 +129,8 @@
* @return the Position at which the entry has been inserted
* @throws ManagedLedgerException
*/
- Position addEntry(byte[] data, int numberOfMessages, int offset, int length) throws InterruptedException, ManagedLedgerException;
+ Position addEntry(byte[] data, int numberOfMessages, int offset, int length) throws InterruptedException,
+ ManagedLedgerException;
/**
* Append a new entry asynchronously.
@@ -165,7 +166,8 @@
* @param ctx
* opaque context
*/
- void asyncAddEntry(byte[] data, int numberOfMessages, int offset, int length, AddEntryCallback callback, Object ctx);
+ void asyncAddEntry(byte[] data, int numberOfMessages, int offset, int length, AddEntryCallback callback,
+ Object ctx);
/**
@@ -222,7 +224,8 @@
* @return the ManagedCursor
* @throws ManagedLedgerException
*/
- ManagedCursor openCursor(String name, InitialPosition initialPosition) throws InterruptedException, ManagedLedgerException;
+ ManagedCursor openCursor(String name, InitialPosition initialPosition) throws InterruptedException,
+ ManagedLedgerException;
/**
* Open a ManagedCursor in this ManagedLedger.
@@ -240,7 +243,8 @@
* @return the ManagedCursor
* @throws ManagedLedgerException
*/
- ManagedCursor openCursor(String name, InitialPosition initialPosition, Map<String, Long> properties) throws InterruptedException, ManagedLedgerException;
+ ManagedCursor openCursor(String name, InitialPosition initialPosition, Map<String, Long> properties)
+ throws InterruptedException, ManagedLedgerException;
/**
* Creates a new cursor whose metadata is not backed by durable storage. A caller can treat the non-durable cursor
@@ -331,10 +335,11 @@
* @param ctx
* opaque context
*/
- void asyncOpenCursor(String name, InitialPosition initialPosition, Map<String, Long> properties, OpenCursorCallback callback, Object ctx);
+ void asyncOpenCursor(String name, InitialPosition initialPosition, Map<String, Long> properties,
+ OpenCursorCallback callback, Object ctx);
/**
- * Get a list of all the cursors reading from this ManagedLedger
+ * Get a list of all the cursors reading from this ManagedLedger.
*
* @return a list of cursors
*/
@@ -397,7 +402,7 @@
CompletableFuture<Long> getEarliestMessagePublishTimeInBacklog();
/**
- * Return the size of all ledgers offloaded to 2nd tier storage
+ * Return the size of all ledgers offloaded to 2nd tier storage.
*/
long getOffloadedSize();
@@ -535,7 +540,7 @@
Position getLastConfirmedEntry();
/**
- * Signaling managed ledger that we can resume after BK write failure
+ * Signaling managed ledger that we can resume after BK write failure.
*/
void readyToCreateNewLedger();
@@ -564,7 +569,7 @@
* @param callback a callback which will be supplied with the newest properties in managedLedger.
* @param ctx a context object which will be passed to the callback on completion.
**/
- void asyncSetProperty(String key, String value, final AsyncCallbacks.UpdatePropertiesCallback callback, Object ctx);
+ void asyncSetProperty(String key, String value, AsyncCallbacks.UpdatePropertiesCallback callback, Object ctx);
/**
* Delete the property by key.
@@ -582,7 +587,7 @@
* @param callback a callback which will be supplied with the newest properties in managedLedger.
* @param ctx a context object which will be passed to the callback on completion.
*/
- void asyncDeleteProperty(String key, final AsyncCallbacks.UpdatePropertiesCallback callback, Object ctx);
+ void asyncDeleteProperty(String key, AsyncCallbacks.UpdatePropertiesCallback callback, Object ctx);
/**
* Update managed-ledger's properties.
@@ -600,17 +605,17 @@
* @param callback a callback which will be supplied with the newest properties in managedLedger.
* @param ctx a context object which will be passed to the callback on completion.
*/
- void asyncSetProperties(Map<String, String> properties, final AsyncCallbacks.UpdatePropertiesCallback callback,
+ void asyncSetProperties(Map<String, String> properties, AsyncCallbacks.UpdatePropertiesCallback callback,
Object ctx);
/**
- * Trim consumed ledgers in background
+ * Trim consumed ledgers in background.
* @param promise
*/
void trimConsumedLedgersInBackground(CompletableFuture<?> promise);
/**
- * Roll current ledger if it is full
+ * Roll current ledger if it is full.
*/
@Deprecated
void rollCurrentLedgerIfFull();
@@ -638,7 +643,7 @@
CompletableFuture<Void> asyncTruncate();
/**
- * Get managed ledger internal stats
+ * Get managed ledger internal stats.
*
* @param includeLedgerMetadata the flag to control managed ledger internal stats include ledger metadata
* @return the future of managed ledger internal stats
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
index 2f4e098..e18e337 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
@@ -19,20 +19,16 @@
package org.apache.bookkeeper.mledger;
import static com.google.common.base.Preconditions.checkArgument;
-
import com.google.common.base.Charsets;
import java.time.Clock;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.TimeUnit;
-
import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
import org.apache.bookkeeper.client.api.DigestType;
-
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
import org.apache.bookkeeper.common.annotation.InterfaceStability;
import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
-
import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
import org.apache.pulsar.common.util.collections.ConcurrentOpenLongPairRangeSet;
@@ -73,7 +69,7 @@
private boolean unackedRangesOpenCacheSetEnabled = true;
private Class<? extends EnsemblePlacementPolicy> bookKeeperEnsemblePlacementPolicyClassName;
private Map<String, Object> bookKeeperEnsemblePlacementPolicyProperties;
- private LedgerOffloader ledgerOffloader = NullLedgerOffloader.INSTANCE;
+ private LedgerOffloader ledgerOffloader = NullLedgerOffloader.instance_;
private int newEntriesCheckDelayInMillis = 10;
private Clock clock = Clock.systemUTC();
private ManagedLedgerInterceptor managedLedgerInterceptor;
@@ -391,7 +387,7 @@
* <p>
* A retention time of 0 (default) will make data to be deleted immediately.
* <p>
- * A retention time of -1 , means to have an unlimited retention time.
+ * A retention time of -1, means to have an unlimited retention time.
*
* @param retentionTime
* duration for which messages should be retained
@@ -421,7 +417,7 @@
* <p>
* A retention size of 0 (default) will make data to be deleted immediately.
* <p>
- * A retention size of -1 , means to have an unlimited retention size.
+ * A retention size of -1, means to have an unlimited retention size.
*
* @param retentionSizeInMB
* quota for message retention
@@ -510,7 +506,7 @@
}
/**
- * Get clock to use to time operations
+ * Get clock to use to time operations.
*
* @return a clock
*/
@@ -519,7 +515,7 @@
}
/**
- * Set clock to use for time operations
+ * Set clock to use for time operations.
*
* @param clock the clock to use
*/
@@ -530,7 +526,7 @@
/**
*
- * Ledger-Op (Create/Delete) timeout
+ * Ledger-Op (Create/Delete) timeout.
*
* @return
*/
@@ -539,7 +535,7 @@
}
/**
- * Ledger-Op (Create/Delete) timeout after which callback will be completed with failure
+ * Ledger-Op (Create/Delete) timeout after which callback will be completed with failure.
*
* @param metadataOperationsTimeoutSeconds
*/
@@ -549,7 +545,7 @@
}
/**
- * Ledger read-entry timeout
+ * Ledger read-entry timeout.
*
* @return
*/
@@ -558,7 +554,7 @@
}
/**
- * Ledger read entry timeout after which callback will be completed with failure. (disable timeout by setting
+ * Ledger read entry timeout after which callback will be completed with failure. (disable timeout by setting.
* readTimeoutSeconds <= 0)
*
* @param readEntryTimeoutSeconds
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java
index 682ce90..1b6d80d 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java
@@ -91,7 +91,7 @@
Supplier<Boolean> mlOwnershipChecker, Object ctx);
/**
- * Open a {@link ReadOnlyCursor} positioned to the earliest entry for the specified managed ledger
+ * Open a {@link ReadOnlyCursor} positioned to the earliest entry for the specified managed ledger.
*
* @param managedLedgerName
* @param startPosition
@@ -103,7 +103,7 @@
throws InterruptedException, ManagedLedgerException;
/**
- * Open a {@link ReadOnlyCursor} positioned to the earliest entry for the specified managed ledger
+ * Open a {@link ReadOnlyCursor} positioned to the earliest entry for the specified managed ledger.
*
* @param managedLedgerName
* @param startPosition
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ReadOnlyCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ReadOnlyCursor.java
index 202ecda..28454d6 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ReadOnlyCursor.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ReadOnlyCursor.java
@@ -18,10 +18,9 @@
*/
package org.apache.bookkeeper.mledger;
-import java.util.List;
-
import com.google.common.base.Predicate;
import com.google.common.collect.Range;
+import java.util.List;
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
import org.apache.bookkeeper.common.annotation.InterfaceStability;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
@@ -106,7 +105,8 @@
* @throws InterruptedException
* @throws ManagedLedgerException
*/
- Position findNewestMatching(ManagedCursor.FindPositionConstraint constraint, Predicate<Entry> condition) throws InterruptedException, ManagedLedgerException;
+ Position findNewestMatching(ManagedCursor.FindPositionConstraint constraint, Predicate<Entry> condition)
+ throws InterruptedException, ManagedLedgerException;
/**
* Return the number of messages that this cursor still has to read.
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheDefaultEvictionPolicy.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheDefaultEvictionPolicy.java
index df70188..4c26a93 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheDefaultEvictionPolicy.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheDefaultEvictionPolicy.java
@@ -20,7 +20,6 @@
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Collections.reverseOrder;
-
import com.google.common.collect.Lists;
import java.util.List;
import org.apache.commons.lang3.tuple.Pair;
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java
index 515cf2f..1a6986a 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java
@@ -21,17 +21,13 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException;
-
import com.google.common.collect.Lists;
import com.google.common.primitives.Longs;
-
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
-
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
-
import org.apache.bookkeeper.client.api.BKException;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.client.api.ReadHandle;
@@ -301,8 +297,7 @@
try {
// We got the entries, we need to transform them to a List<> type
long totalSize = 0;
- final List<EntryImpl> entriesToReturn
- = Lists.newArrayListWithExpectedSize(entriesToRead);
+ final List<EntryImpl> entriesToReturn = Lists.newArrayListWithExpectedSize(entriesToRead);
for (LedgerEntry e : ledgerEntries) {
EntryImpl entry = EntryCacheManager.create(e, interceptor);
@@ -319,7 +314,7 @@
}
}, ml.getExecutor().chooseThread(ml.getName())).exceptionally(exception -> {
if (exception instanceof BKException
- && ((BKException)exception).getCode() == BKException.Code.TooManyRequestsException) {
+ && ((BKException) exception).getCode() == BKException.Code.TooManyRequestsException) {
callback.readEntriesFailed(createManagedLedgerException(exception), ctx);
} else {
ml.invalidateLedgerHandle(lh);
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java
index 9a1d631..ab71ea5 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java
@@ -20,7 +20,6 @@
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException;
import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
-
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.primitives.Longs;
@@ -242,7 +241,8 @@
ml.getMBean().addReadEntriesSample(1, returnEntry.getLength());
callback.readEntryComplete(returnEntry, ctx);
} else {
- callback.readEntryFailed(new ManagedLedgerException("Could not read given position"), ctx);
+ callback.readEntryFailed(new ManagedLedgerException("Could not read given position"),
+ ctx);
}
} finally {
ledgerEntries.close();
@@ -273,8 +273,8 @@
processorHandle = interceptor
.processPayloadBeforeEntryCache(duplicateBuffer);
if (processorHandle != null) {
- ledgerEntry = LedgerEntryImpl.create(ledgerEntry.getLedgerId(),ledgerEntry.getEntryId(),
- ledgerEntry.getLength(),processorHandle.getProcessedPayload());
+ ledgerEntry = LedgerEntryImpl.create(ledgerEntry.getLedgerId(), ledgerEntry.getEntryId(),
+ ledgerEntry.getLength(), processorHandle.getProcessedPayload());
} else {
duplicateBuffer.release();
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java
index e25b52a..3050309 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java
@@ -20,18 +20,17 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ComparisonChain;
-
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import io.netty.util.ReferenceCounted;
-
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.util.AbstractCASReferenceCounted;
-public final class EntryImpl extends AbstractCASReferenceCounted implements Entry, Comparable<EntryImpl>, ReferenceCounted {
+public final class EntryImpl extends AbstractCASReferenceCounted implements Entry, Comparable<EntryImpl>,
+ ReferenceCounted {
private static final Recycler<EntryImpl> RECYCLER = new Recycler<EntryImpl>() {
@Override
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerMetadataUtils.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerMetadataUtils.java
index 5f85b6e..8049220 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerMetadataUtils.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerMetadataUtils.java
@@ -19,29 +19,26 @@
package org.apache.bookkeeper.mledger.impl;
import com.google.common.collect.ImmutableMap;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
import org.apache.bookkeeper.common.util.JsonUtil.ParseJsonException;
import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
-import java.nio.charset.StandardCharsets;
-import java.util.Map;
-
/**
* Utilities for managing BookKeeper Ledgers custom metadata.
*/
public final class LedgerMetadataUtils {
private static final String METADATA_PROPERTY_APPLICATION = "application";
- private static final byte[] METADATA_PROPERTY_APPLICATION_PULSAR
- = "pulsar".getBytes(StandardCharsets.UTF_8);
+ private static final byte[] METADATA_PROPERTY_APPLICATION_PULSAR = "pulsar".getBytes(StandardCharsets.UTF_8);
private static final String METADATA_PROPERTY_COMPONENT = "component";
- private static final byte[] METADATA_PROPERTY_COMPONENT_MANAGED_LEDGER
- = "managed-ledger".getBytes(StandardCharsets.UTF_8);
- private static final byte[] METADATA_PROPERTY_COMPONENT_COMPACTED_LEDGER
- = "compacted-ledger".getBytes(StandardCharsets.UTF_8);
- private static final byte[] METADATA_PROPERTY_COMPONENT_SCHEMA
- = "schema".getBytes(StandardCharsets.UTF_8);
+ private static final byte[] METADATA_PROPERTY_COMPONENT_MANAGED_LEDGER =
+ "managed-ledger".getBytes(StandardCharsets.UTF_8);
+ private static final byte[] METADATA_PROPERTY_COMPONENT_COMPACTED_LEDGER =
+ "compacted-ledger".getBytes(StandardCharsets.UTF_8);
+ private static final byte[] METADATA_PROPERTY_COMPONENT_SCHEMA = "schema".getBytes(StandardCharsets.UTF_8);
private static final String METADATA_PROPERTY_MANAGED_LEDGER_NAME = "pulsar/managed-ledger";
private static final String METADATA_PROPERTY_CURSOR_NAME = "pulsar/cursor";
@@ -80,7 +77,8 @@
* @param compactedToMessageId last mesasgeId.
* @return an immutable map which describes the compacted ledger
*/
- public static Map<String, byte[]> buildMetadataForCompactedLedger(String compactedTopic, byte[] compactedToMessageId) {
+ public static Map<String, byte[]> buildMetadataForCompactedLedger(String compactedTopic,
+ byte[] compactedToMessageId) {
return ImmutableMap.of(
METADATA_PROPERTY_APPLICATION, METADATA_PROPERTY_APPLICATION_PULSAR,
METADATA_PROPERTY_COMPONENT, METADATA_PROPERTY_COMPONENT_COMPACTED_LEDGER,
@@ -90,7 +88,7 @@
}
/**
- * Build additional metadata for a Schema
+ * Build additional metadata for a Schema.
*
* @param schemaId id of the schema
* @return an immutable map which describes the schema
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java
index c631cdc..65d2541 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java
@@ -199,7 +199,7 @@
}
/**
- * Check whether there are any cursors
+ * Check whether there are any cursors.
* @return true is there are no cursors and false if there are
*/
public boolean isEmpty() {
@@ -219,7 +219,7 @@
}
/**
- * Check whether that are any durable cursors
+ * Check whether that are any durable cursors.
* @return true if there are durable cursors and false if there are not
*/
public boolean hasDurableCursors() {
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 5c1455b..70f0efe 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -26,7 +26,6 @@
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException;
import static org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException;
import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
-
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Predicate;
@@ -38,9 +37,7 @@
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.RateLimiter;
import com.google.protobuf.InvalidProtocolBufferException;
-
import io.netty.util.concurrent.FastThreadLocal;
-
import java.time.Clock;
import java.util.ArrayDeque;
import java.util.ArrayList;
@@ -62,7 +59,6 @@
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-
import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback;
import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
@@ -80,13 +76,13 @@
import org.apache.bookkeeper.mledger.AsyncCallbacks.SkipEntriesCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedCursorMXBean;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.NoMoreEntriesToReadException;
-import org.apache.bookkeeper.mledger.ManagedCursorMXBean;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.PositionBound;
import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
@@ -125,8 +121,9 @@
// keeps sample of last read-position for validation and monitoring if read-position is not moving forward.
protected volatile PositionImpl statsLastReadPosition;
- protected static final AtomicReferenceFieldUpdater<ManagedCursorImpl, MarkDeleteEntry> LAST_MARK_DELETE_ENTRY_UPDATER =
- AtomicReferenceFieldUpdater.newUpdater(ManagedCursorImpl.class, MarkDeleteEntry.class, "lastMarkDeleteEntry");
+ protected static final AtomicReferenceFieldUpdater<ManagedCursorImpl, MarkDeleteEntry>
+ LAST_MARK_DELETE_ENTRY_UPDATER = AtomicReferenceFieldUpdater.newUpdater(ManagedCursorImpl.class,
+ MarkDeleteEntry.class, "lastMarkDeleteEntry");
protected volatile MarkDeleteEntry lastMarkDeleteEntry;
protected static final AtomicReferenceFieldUpdater<ManagedCursorImpl, OpReadEntry> WAITING_READ_OP_UPDATER =
@@ -183,7 +180,7 @@
private boolean alwaysInactive = false;
- /** used temporary variables to {@link #getNumIndividualDeletedEntriesToSkip(long)} **/
+ /** used temporary variables to {@link #getNumIndividualDeletedEntriesToSkip(long)}. **/
private static final FastThreadLocal<Long> tempTotalEntriesToSkip = new FastThreadLocal<>();
private static final FastThreadLocal<Long> tempDeletedMessages = new FastThreadLocal<>();
private static final FastThreadLocal<PositionImpl> tempStartPosition = new FastThreadLocal<>();
@@ -499,7 +496,8 @@
}
}
- private void recoverBatchDeletedIndexes (List<MLDataFormats.BatchedEntryDeletionIndexInfo> batchDeletedIndexInfoList) {
+ private void recoverBatchDeletedIndexes (
+ List<MLDataFormats.BatchedEntryDeletionIndexInfo> batchDeletedIndexInfoList) {
lock.writeLock().lock();
try {
this.batchDeletedIndexes.clear();
@@ -594,7 +592,7 @@
counter.countDown();
}
- }, null, PositionImpl.latest);
+ }, null, PositionImpl.LATEST);
counter.await();
@@ -726,7 +724,7 @@
counter.countDown();
}
- }, null, PositionImpl.latest);
+ }, null, PositionImpl.LATEST);
counter.await();
@@ -942,7 +940,8 @@
}
@Override
- public Position findNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition) throws InterruptedException, ManagedLedgerException {
+ public Position findNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition)
+ throws InterruptedException, ManagedLedgerException {
final CountDownLatch counter = new CountDownLatch(1);
class Result {
ManagedLedgerException exception = null;
@@ -1031,9 +1030,9 @@
}
protected void internalResetCursor(PositionImpl position, AsyncCallbacks.ResetCursorCallback resetCursorCallback) {
- if (position.equals(PositionImpl.earliest)) {
+ if (position.equals(PositionImpl.EARLIEST)) {
position = ledger.getFirstPosition();
- } else if (position.equals(PositionImpl.latest)) {
+ } else if (position.equals(PositionImpl.LATEST)) {
position = ledger.getLastPosition().getNext();
}
@@ -1070,9 +1069,8 @@
Range.closedOpen(markDeletePosition, newMarkDeletePosition)));
}
markDeletePosition = newMarkDeletePosition;
- lastMarkDeleteEntry = new MarkDeleteEntry(newMarkDeletePosition, isCompactionCursor() ?
- getProperties() : Collections.emptyMap(),
- null, null);
+ lastMarkDeleteEntry = new MarkDeleteEntry(newMarkDeletePosition, isCompactionCursor()
+ ? getProperties() : Collections.emptyMap(), null, null);
individualDeletedMessages.clear();
if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null) {
batchDeletedIndexes.values().forEach(BitSetRecyclable::recycle);
@@ -1121,7 +1119,8 @@
};
- internalAsyncMarkDelete(newPosition, isCompactionCursor() ? getProperties() : Collections.emptyMap(), new MarkDeleteCallback() {
+ internalAsyncMarkDelete(newPosition, isCompactionCursor() ? getProperties() : Collections.emptyMap(),
+ new MarkDeleteCallback() {
@Override
public void markDeleteComplete(Object ctx) {
finalCallback.operationComplete();
@@ -1143,15 +1142,15 @@
ledger.getExecutor().executeOrdered(ledger.getName(), safeRun(() -> {
PositionImpl actualPosition = newPosition;
- if (!ledger.isValidPosition(actualPosition) &&
- !actualPosition.equals(PositionImpl.earliest) &&
- !actualPosition.equals(PositionImpl.latest)) {
+ if (!ledger.isValidPosition(actualPosition)
+ && !actualPosition.equals(PositionImpl.EARLIEST)
+ && !actualPosition.equals(PositionImpl.LATEST)) {
actualPosition = ledger.getNextValidPosition(actualPosition);
if (actualPosition == null) {
// next valid position would only return null when newPos
// is larger than all available positions, then it's latest in effect.
- actualPosition = PositionImpl.latest;
+ actualPosition = PositionImpl.LATEST;
}
}
@@ -1631,8 +1630,8 @@
// read position forward
PositionImpl newReadPosition = ledger.getNextValidPosition(markDeletePosition);
if (log.isDebugEnabled()) {
- log.debug("[{}] Moved read position from: {} to: {}, and new mark-delete position {}", ledger.getName(),
- currentReadPosition, newReadPosition, markDeletePosition);
+ log.debug("[{}] Moved read position from: {} to: {}, and new mark-delete position {}",
+ ledger.getName(), currentReadPosition, newReadPosition, markDeletePosition);
}
return newReadPosition;
} else {
@@ -1683,7 +1682,7 @@
batchDeletedIndexes.put(newPosition, BitSetRecyclable.create().resetWords(newPosition.ackSet));
newPosition = ledger.getPreviousPosition(newPosition);
}
- Map<PositionImpl, BitSetRecyclable> subMap = batchDeletedIndexes.subMap(PositionImpl.earliest, newPosition);
+ Map<PositionImpl, BitSetRecyclable> subMap = batchDeletedIndexes.subMap(PositionImpl.EARLIEST, newPosition);
subMap.values().forEach(BitSetRecyclable::recycle);
subMap.clear();
} else if (newPosition.ackSet != null) {
@@ -1707,9 +1706,8 @@
ledger.getName(), markDeletePosition, newPosition);
} else {
if (log.isDebugEnabled()) {
- log.debug(
- "[{}] Failed mark delete due to invalid markDelete {} is ahead of last-confirmed-entry {} for cursor [{}]",
- ledger.getName(), position, ledger.getLastConfirmedEntry(), name);
+ log.debug("[{}] Failed mark delete due to invalid markDelete {} is ahead of last-confirmed-entry {}"
+ + " for cursor [{}]", ledger.getName(), position, ledger.getLastConfirmedEntry(), name);
}
callback.markDeleteFailed(new ManagedLedgerException("Invalid mark deleted position"), ctx);
return;
@@ -1800,7 +1798,9 @@
individualDeletedMessages.removeAtMost(mdEntry.newPosition.getLedgerId(),
mdEntry.newPosition.getEntryId());
if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null) {
- Map<PositionImpl, BitSetRecyclable> subMap = batchDeletedIndexes.subMap(PositionImpl.earliest, false, PositionImpl.get(mdEntry.newPosition.getLedgerId(), mdEntry.newPosition.getEntryId()), true);
+ Map<PositionImpl, BitSetRecyclable> subMap = batchDeletedIndexes.subMap(PositionImpl.EARLIEST,
+ false, PositionImpl.get(mdEntry.newPosition.getLedgerId(),
+ mdEntry.newPosition.getEntryId()), true);
subMap.values().forEach(BitSetRecyclable::recycle);
subMap.clear();
}
@@ -1935,8 +1935,8 @@
if (((PositionImpl) ledger.getLastConfirmedEntry()).compareTo(position) < 0) {
if (log.isDebugEnabled()) {
log.debug(
- "[{}] Failed mark delete due to invalid markDelete {} is ahead of last-confirmed-entry {} for cursor [{}]",
- ledger.getName(), position, ledger.getLastConfirmedEntry(), name);
+ "[{}] Failed mark delete due to invalid markDelete {} is ahead of last-confirmed-entry {} "
+ + "for cursor [{}]", ledger.getName(), position, ledger.getLastConfirmedEntry(), name);
}
callback.deleteFailed(new ManagedLedgerException("Invalid mark deleted position"), ctx);
return;
@@ -1962,11 +1962,11 @@
bitSetRecyclable.recycle();
}
}
- // Add a range (prev, pos] to the set. Adding the previous entry as an open limit to the range will make
- // the RangeSet recognize the "continuity" between adjacent Positions
+ // Add a range (prev, pos] to the set. Adding the previous entry as an open limit to the range will
+ // make the RangeSet recognize the "continuity" between adjacent Positions.
PositionImpl previousPosition = ledger.getPreviousPosition(position);
- individualDeletedMessages.addOpenClosed(previousPosition.getLedgerId(), previousPosition.getEntryId(),
- position.getLedgerId(), position.getEntryId());
+ individualDeletedMessages.addOpenClosed(previousPosition.getLedgerId(),
+ previousPosition.getEntryId(), position.getLedgerId(), position.getEntryId());
MSG_CONSUMED_COUNTER_UPDATER.incrementAndGet(this);
if (log.isDebugEnabled()) {
@@ -1974,13 +1974,15 @@
individualDeletedMessages);
}
} else if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null) {
- BitSetRecyclable bitSet = batchDeletedIndexes.computeIfAbsent(position, (v) -> BitSetRecyclable.create().resetWords(position.ackSet));
+ BitSetRecyclable bitSet = batchDeletedIndexes.computeIfAbsent(position, (v) ->
+ BitSetRecyclable.create().resetWords(position.ackSet));
BitSetRecyclable givenBitSet = BitSetRecyclable.create().resetWords(position.ackSet);
bitSet.and(givenBitSet);
givenBitSet.recycle();
if (bitSet.isEmpty()) {
PositionImpl previousPosition = ledger.getPreviousPosition(position);
- individualDeletedMessages.addOpenClosed(previousPosition.getLedgerId(), previousPosition.getEntryId(),
+ individualDeletedMessages.addOpenClosed(previousPosition.getLedgerId(),
+ previousPosition.getEntryId(),
position.getLedgerId(), position.getEntryId());
MSG_CONSUMED_COUNTER_UPDATER.incrementAndGet(this);
BitSetRecyclable bitSetRecyclable = batchDeletedIndexes.remove(position);
@@ -2082,8 +2084,8 @@
log.debug("[{}] [{}] Filtering entries {} - alreadyDeleted: {}", ledger.getName(), name, entriesRange,
individualDeletedMessages);
}
- if (individualDeletedMessages.isEmpty() || individualDeletedMessages.span() == null ||
- !entriesRange.isConnected(individualDeletedMessages.span())) {
+ if (individualDeletedMessages.isEmpty() || individualDeletedMessages.span() == null
+ || !entriesRange.isConnected(individualDeletedMessages.span())) {
// There are no individually deleted messages in this entry list, no need to perform filtering
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] No filtering needed for entries {}", ledger.getName(), name, entriesRange);
@@ -2330,8 +2332,8 @@
public void operationFailed(MetaStoreException e) {
if (log.isDebugEnabled()) {
log.debug(
- "[{}] Failed to refresh cursor metadata-version for {} due to {}",
- ledger.name, name, e.getMessage());
+ "[{}] Failed to refresh cursor metadata-version for {} due "
+ + "to {}", ledger.name, name, e.getMessage());
}
}
});
@@ -2548,13 +2550,14 @@
private List<MLDataFormats.BatchedEntryDeletionIndexInfo> buildBatchEntryDeletionIndexInfoList() {
lock.readLock().lock();
try {
- if (!config.isDeletionAtBatchIndexLevelEnabled() || batchDeletedIndexes == null || batchDeletedIndexes.isEmpty()) {
+ if (!config.isDeletionAtBatchIndexLevelEnabled() || batchDeletedIndexes == null
+ || batchDeletedIndexes.isEmpty()) {
return Collections.emptyList();
}
MLDataFormats.NestedPositionInfo.Builder nestedPositionBuilder = MLDataFormats.NestedPositionInfo
.newBuilder();
- MLDataFormats.BatchedEntryDeletionIndexInfo.Builder batchDeletedIndexInfoBuilder = MLDataFormats.BatchedEntryDeletionIndexInfo
- .newBuilder();
+ MLDataFormats.BatchedEntryDeletionIndexInfo.Builder batchDeletedIndexInfoBuilder = MLDataFormats
+ .BatchedEntryDeletionIndexInfo.newBuilder();
List<MLDataFormats.BatchedEntryDeletionIndexInfo> result = Lists.newArrayList();
Iterator<Map.Entry<PositionImpl, BitSetRecyclable>> iterator = batchDeletedIndexes.entrySet().iterator();
while (iterator.hasNext() && result.size() < config.getMaxBatchDeletedIndexToPersist()) {
@@ -2624,8 +2627,8 @@
public void operationComplete(Void result, Stat stat) {
if (log.isDebugEnabled()) {
log.debug(
- "[{}][{}] Updated cursor in meta store after previous failure in ledger at position {}",
- ledger.getName(), name, position);
+ "[{}][{}] Updated cursor in meta store after previous failure in ledger at position"
+ + " {}", ledger.getName(), name, position);
}
mbean.persistToZookeeper(true);
callback.operationComplete();
@@ -2645,8 +2648,8 @@
boolean shouldCloseLedger(LedgerHandle lh) {
long now = clock.millis();
- if (ledger.factory.isMetadataServiceAvailable() &&
- (lh.getLastAddConfirmed() >= config.getMetadataMaxEntriesPerLedger()
+ if (ledger.factory.isMetadataServiceAvailable()
+ && (lh.getLastAddConfirmed() >= config.getMetadataMaxEntriesPerLedger()
|| lastLedgerSwitchTimestamp < (now - config.getLedgerRolloverTimeout() * 1000))
&& (STATE_UPDATER.get(this) != State.Closed && STATE_UPDATER.get(this) != State.Closing)) {
// It's safe to modify the timestamp since this method will be only called from a callback, implying that
@@ -2896,7 +2899,8 @@
public boolean isMessageDeleted(Position position) {
checkArgument(position instanceof PositionImpl);
return individualDeletedMessages.contains(((PositionImpl) position).getLedgerId(),
- ((PositionImpl) position).getEntryId()) || ((PositionImpl) position).compareTo(markDeletePosition) <= 0 ;
+ ((PositionImpl) position).getEntryId())
+ || ((PositionImpl) position).compareTo(markDeletePosition) <= 0;
}
//this method will return a copy of the position's ack set
@@ -2925,7 +2929,8 @@
* @return next available position
*/
public PositionImpl getNextAvailablePosition(PositionImpl position) {
- Range<PositionImpl> range = individualDeletedMessages.rangeContaining(position.getLedgerId(), position.getEntryId());
+ Range<PositionImpl> range = individualDeletedMessages.rangeContaining(position.getLedgerId(),
+ position.getEntryId());
if (range != null) {
PositionImpl nextPosition = range.upperEndpoint().getNext();
return (nextPosition != null && nextPosition.compareTo(position) > 0) ? nextPosition : position.getNext();
@@ -3052,7 +3057,7 @@
return 1;
}
- int maxEntriesBasedOnSize = (int)(maxSizeBytes / avgEntrySize);
+ int maxEntriesBasedOnSize = (int) (maxSizeBytes / avgEntrySize);
if (maxEntriesBasedOnSize < 1) {
// We need to read at least one entry
return 1;
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index e176181..49728ec 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -111,7 +111,7 @@
private volatile boolean closed;
/**
- * Keep a flag to indicate whether we're currently connected to the metadata service
+ * Keep a flag to indicate whether we're currently connected to the metadata service.
*/
@Getter
private boolean metadataServiceAvailable;
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index fc42fe2..04bfd71 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -197,15 +197,15 @@
private ScheduledFuture<?> checkLedgerRollTask;
/**
- * This lock is held while the ledgers list or propertiesMap is updated asynchronously on the metadata store. Since we use the store
- * version, we cannot have multiple concurrent updates.
+ * This lock is held while the ledgers list or propertiesMap is updated asynchronously on the metadata store.
+ * Since we use the store version, we cannot have multiple concurrent updates.
*/
private final CallbackMutex metadataMutex = new CallbackMutex();
private final CallbackMutex trimmerMutex = new CallbackMutex();
private final CallbackMutex offloadMutex = new CallbackMutex();
private static final CompletableFuture<PositionImpl> NULL_OFFLOAD_PROMISE = CompletableFuture
- .completedFuture(PositionImpl.latest);
+ .completedFuture(PositionImpl.LATEST);
private volatile LedgerHandle currentLedger;
private long currentLedgerEntries = 0;
private long currentLedgerSize = 0;
@@ -251,8 +251,8 @@
startIncluded, startExcluded
}
- private static final AtomicReferenceFieldUpdater<ManagedLedgerImpl, State> STATE_UPDATER = AtomicReferenceFieldUpdater
- .newUpdater(ManagedLedgerImpl.class, State.class, "state");
+ private static final AtomicReferenceFieldUpdater<ManagedLedgerImpl, State> STATE_UPDATER =
+ AtomicReferenceFieldUpdater.newUpdater(ManagedLedgerImpl.class, State.class, "state");
protected volatile State state = null;
private final OrderedScheduler scheduledExecutor;
@@ -270,17 +270,18 @@
// last read-operation's callback to check read-timeout on it.
private volatile ReadEntryCallbackWrapper lastReadCallback = null;
- private static final AtomicReferenceFieldUpdater<ManagedLedgerImpl, ReadEntryCallbackWrapper> LAST_READ_CALLBACK_UPDATER = AtomicReferenceFieldUpdater
+ private static final AtomicReferenceFieldUpdater<ManagedLedgerImpl, ReadEntryCallbackWrapper>
+ LAST_READ_CALLBACK_UPDATER = AtomicReferenceFieldUpdater
.newUpdater(ManagedLedgerImpl.class, ReadEntryCallbackWrapper.class, "lastReadCallback");
/**
- * Queue of pending entries to be added to the managed ledger. Typically entries are queued when a new ledger is
+ * Queue of pending entries to be added to the managed ledger. Typically entries are queued when a new ledger is.
* created asynchronously and hence there is no ready ledger to write into.
*/
final ConcurrentLinkedQueue<OpAddEntry> pendingAddEntries = new ConcurrentLinkedQueue<>();
/**
- * This variable is used for testing the tests
+ * This variable is used for testing the tests.
* {@link ManagedLedgerTest#testManagedLedgerWithPlacementPolicyInCustomMetadata()}
*/
@VisibleForTesting
@@ -551,7 +552,7 @@
// Lazily recover cursors by put them to uninitializedCursors map.
for (final String cursorName : consumers) {
if (log.isDebugEnabled()) {
- log.debug("[{}] Recovering cursor {} lazily" , name, cursorName);
+ log.debug("[{}] Recovering cursor {} lazily", name, cursorName);
}
final ManagedCursorImpl cursor;
cursor = new ManagedCursorImpl(bookKeeper, config, ManagedLedgerImpl.this, cursorName);
@@ -561,8 +562,8 @@
cursor.recover(new VoidCallback() {
@Override
public void operationComplete() {
- log.info("[{}] Lazy recovery for cursor {} completed. pos={} -- todo={}", name, cursorName,
- cursor.getMarkDeletedPosition(), cursorCount.get() - 1);
+ log.info("[{}] Lazy recovery for cursor {} completed. pos={} -- todo={}", name,
+ cursorName, cursor.getMarkDeletedPosition(), cursorCount.get() - 1);
cursor.setActive();
synchronized (ManagedLedgerImpl.this) {
cursors.add(cursor);
@@ -643,7 +644,8 @@
}
@Override
- public Position addEntry(byte[] data, int numberOfMessages, int offset, int length) throws InterruptedException, ManagedLedgerException {
+ public Position addEntry(byte[] data, int numberOfMessages, int offset, int length) throws InterruptedException,
+ ManagedLedgerException {
final CountDownLatch counter = new CountDownLatch(1);
// Result list will contain the status exception and the resulting
// position
@@ -690,8 +692,8 @@
}
@Override
- public void asyncAddEntry(final byte[] data, int numberOfMessages, int offset, int length, final AddEntryCallback callback,
- final Object ctx) {
+ public void asyncAddEntry(final byte[] data, int numberOfMessages, int offset, int length,
+ final AddEntryCallback callback, final Object ctx) {
ByteBuf buffer = Unpooled.wrappedBuffer(data, offset, length);
asyncAddEntry(buffer, numberOfMessages, callback, ctx);
}
@@ -757,9 +759,8 @@
if (State.CreatingLedger == state) {
long elapsedMs = System.currentTimeMillis() - this.lastLedgerCreationInitiationTimestamp;
if (elapsedMs > TimeUnit.SECONDS.toMillis(2 * config.getMetadataOperationsTimeoutSeconds())) {
- log.info("[{}] Ledger creation was initiated {} ms ago but it never completed" +
- " and creation timeout task didn't kick in as well. Force to fail the create ledger operation.",
- name, elapsedMs);
+ log.info("[{}] Ledger creation was initiated {} ms ago but it never completed and creation timeout"
+ + " task didn't kick in as well. Force to fail the create ledger operation.", name, elapsedMs);
this.createComplete(Code.TimeoutException, null, null);
}
}
@@ -894,7 +895,8 @@
}
if (uninitializedCursors.containsKey(cursorName)) {
- uninitializedCursors.get(cursorName).thenAccept(cursor -> callback.openCursorComplete(cursor, ctx)).exceptionally(ex -> {
+ uninitializedCursors.get(cursorName).thenAccept(cursor -> callback.openCursorComplete(cursor, ctx))
+ .exceptionally(ex -> {
callback.openCursorFailed((ManagedLedgerException) ex, ctx);
return null;
});
@@ -1032,13 +1034,14 @@
}
@Override
- public ManagedCursor newNonDurableCursor(Position startPosition, String subscriptionName) throws ManagedLedgerException {
+ public ManagedCursor newNonDurableCursor(Position startPosition, String subscriptionName)
+ throws ManagedLedgerException {
return newNonDurableCursor(startPosition, subscriptionName, InitialPosition.Latest, false);
}
@Override
- public ManagedCursor newNonDurableCursor(Position startCursorPosition, String cursorName, InitialPosition initialPosition,
- boolean isReadCompacted)
+ public ManagedCursor newNonDurableCursor(Position startCursorPosition, String cursorName,
+ InitialPosition initialPosition, boolean isReadCompacted)
throws ManagedLedgerException {
Objects.requireNonNull(cursorName, "cursor name can't be null");
checkManagedLedgerIsOpen();
@@ -1469,8 +1472,8 @@
metadataMutex.unlock();
updateLedgersIdsComplete(stat);
synchronized (ManagedLedgerImpl.this) {
- mbean.addLedgerSwitchLatencySample(System.currentTimeMillis() - lastLedgerCreationInitiationTimestamp,
- TimeUnit.MILLISECONDS);
+ mbean.addLedgerSwitchLatencySample(System.currentTimeMillis()
+ - lastLedgerCreationInitiationTimestamp, TimeUnit.MILLISECONDS);
}
// May need to update the cursor position
@@ -1501,7 +1504,8 @@
// Return ManagedLedgerFencedException to addFailed callback
// to indicate that the ledger is now fenced and topic needs to be closed
clearPendingAddEntries(new ManagedLedgerFencedException(e));
- // Do not need to unlock ledgersListMutex here because we are going to close to topic anyways
+ // Do not need to unlock ledgersListMutex here because we are going to close to topic
+ // anyways
return;
}
}
@@ -1550,7 +1554,8 @@
// If op is used by another ledger handle, we need to close it and create a new one
if (existsOp.ledger != null) {
existsOp.close();
- existsOp = OpAddEntry.createNoRetainBuffer(existsOp.ml, existsOp.data, existsOp.getNumberOfMessages(), existsOp.callback, existsOp.ctx);
+ existsOp = OpAddEntry.createNoRetainBuffer(existsOp.ml, existsOp.data,
+ existsOp.getNumberOfMessages(), existsOp.callback, existsOp.ctx);
}
existsOp.setLedger(currentLedger);
pendingAddEntries.add(existsOp);
@@ -1628,7 +1633,7 @@
}
synchronized void createLedgerAfterClosed() {
- if(isNeededCreateNewLedgerAfterCloseLedger()) {
+ if (isNeededCreateNewLedgerAfterCloseLedger()) {
log.info("[{}] Creating a new ledger", name);
STATE_UPDATER.set(this, State.CreatingLedger);
this.lastLedgerCreationInitiationTimestamp = System.currentTimeMillis();
@@ -1654,9 +1659,8 @@
currentLedger.asyncClose(new AsyncCallback.CloseCallback() {
@Override
public void closeComplete(int rc, LedgerHandle lh, Object o) {
- checkArgument(currentLedger.getId() == lh.getId(), "ledgerId %s doesn't match with acked ledgerId %s",
- currentLedger.getId(),
- lh.getId());
+ checkArgument(currentLedger.getId() == lh.getId(), "ledgerId %s doesn't match with "
+ + "acked ledgerId %s", currentLedger.getId(), lh.getId());
if (rc == BKException.Code.OK) {
log.debug("Successfully closed ledger {}", lh.getId());
@@ -1692,7 +1696,8 @@
future.complete(null);
return;
}
- log.info("[{}] Unable to find position for predicate {}. Use the first position {} instead.", name, predicate, startPosition);
+ log.info("[{}] Unable to find position for predicate {}. Use the first position {} instead.", name,
+ predicate, startPosition);
} else {
finalPosition = getNextValidPosition((PositionImpl) position);
}
@@ -1700,7 +1705,8 @@
}
@Override
- public void findEntryFailed(ManagedLedgerException exception, Optional<Position> failedReadPosition, Object ctx) {
+ public void findEntryFailed(ManagedLedgerException exception, Optional<Position> failedReadPosition,
+ Object ctx) {
log.warn("[{}] Unable to find position for predicate {}.", name, predicate);
future.complete(null);
}
@@ -1750,7 +1756,8 @@
}
// Get a ledger handle to read from
- getLedgerHandle(ledgerId).thenAccept(ledger -> internalReadFromLedger(ledger, opReadEntry)).exceptionally(ex -> {
+ getLedgerHandle(ledgerId).thenAccept(ledger -> internalReadFromLedger(ledger, opReadEntry)).exceptionally(ex
+ -> {
log.error("[{}] Error opening ledger for reading at position {} - {}", name, opReadEntry.readPosition,
ex.getMessage());
opReadEntry.readEntriesFailed(ManagedLedgerException.getManagedLedgerException(ex.getCause()),
@@ -1869,7 +1876,8 @@
if (position.getLedgerId() == currentLedger.getId()) {
asyncReadEntry(currentLedger, position, callback, ctx);
} else if (ledgers.containsKey(position.getLedgerId())) {
- getLedgerHandle(position.getLedgerId()).thenAccept(ledger -> asyncReadEntry(ledger, position, callback, ctx)).exceptionally(ex -> {
+ getLedgerHandle(position.getLedgerId()).thenAccept(ledger -> asyncReadEntry(ledger, position, callback,
+ ctx)).exceptionally(ex -> {
log.error("[{}] Error opening ledger for reading at position {} - {}", name, position, ex.getMessage());
callback.readEntryFailed(ManagedLedgerException.getManagedLedgerException(ex.getCause()), ctx);
return null;
@@ -1978,8 +1986,8 @@
long ledgerId;
long entryId;
volatile long readOpCount = -1;
- private static final AtomicLongFieldUpdater<ReadEntryCallbackWrapper> READ_OP_COUNT_UPDATER = AtomicLongFieldUpdater
- .newUpdater(ReadEntryCallbackWrapper.class, "readOpCount");
+ private static final AtomicLongFieldUpdater<ReadEntryCallbackWrapper> READ_OP_COUNT_UPDATER =
+ AtomicLongFieldUpdater.newUpdater(ReadEntryCallbackWrapper.class, "readOpCount");
volatile long createdTime = -1;
volatile Object cntx;
@@ -2190,8 +2198,9 @@
PositionImpl startReadOperationOnLedger(PositionImpl position, OpReadEntry opReadEntry) {
Long ledgerId = ledgers.ceilingKey(position.getLedgerId());
if (null == ledgerId) {
- opReadEntry.readEntriesFailed(new ManagedLedgerException.NoMoreEntriesToReadException("The ceilingKey(K key) method is used to return the " +
- "least key greater than or equal to the given key, or null if there is no such key"), null);
+ opReadEntry.readEntriesFailed(new ManagedLedgerException.NoMoreEntriesToReadException("The ceilingKey(K key"
+ + ") method is used to return the least key greater than or equal to the given key, "
+ + "or null if there is no such key"), null);
}
if (ledgerId != position.getLedgerId()) {
@@ -2238,8 +2247,8 @@
if (currPointedLedger != null) {
if (nextPointedLedger != null) {
- if (lastAckedPosition.getEntryId() != -1 &&
- lastAckedPosition.getEntryId() + 1 >= currPointedLedger.getEntries()) {
+ if (lastAckedPosition.getEntryId() != -1
+ && lastAckedPosition.getEntryId() + 1 >= currPointedLedger.getEntries()) {
lastAckedPosition = new PositionImpl(nextPointedLedger.getLedgerId(), -1);
}
} else {
@@ -2263,7 +2272,7 @@
}
private void trimConsumedLedgersInBackground() {
- trimConsumedLedgersInBackground(Futures.NULL_PROMISE);
+ trimConsumedLedgersInBackground(Futures.nullPromise_);
}
@Override
@@ -2276,12 +2285,13 @@
}
private void scheduleDeferredTrimming(boolean isTruncate, CompletableFuture<?> promise) {
- scheduledExecutor.schedule(safeRun(() -> trimConsumedLedgersInBackground(isTruncate, promise)), 100, TimeUnit.MILLISECONDS);
+ scheduledExecutor.schedule(safeRun(() -> trimConsumedLedgersInBackground(isTruncate, promise)), 100,
+ TimeUnit.MILLISECONDS);
}
private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
if (config.getLedgerOffloader() != null
- && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
+ && config.getLedgerOffloader() != NullLedgerOffloader.instance_
&& config.getLedgerOffloader().getOffloadPolicies() != null
&& config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() != null
&& config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() >= 0) {
@@ -2305,10 +2315,12 @@
});
if (config.getLedgerOffloader() != null
- && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
+ && config.getLedgerOffloader() != NullLedgerOffloader.instance_
&& config.getLedgerOffloader().getOffloadPolicies() != null
- && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() != null) {
- long threshold = config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes();
+ && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
+ != null) {
+ long threshold = config.getLedgerOffloader().getOffloadPolicies()
+ .getManagedLedgerOffloadThresholdInBytes();
long sizeSummed = 0;
long alreadyOffloadedSize = 0;
@@ -2341,7 +2353,7 @@
name, sizeSummed, alreadyOffloadedSize, threshold);
}
- offloadLoop(unlockingPromise, toOffload, PositionImpl.latest, Optional.empty());
+ offloadLoop(unlockingPromise, toOffload, PositionImpl.LATEST, Optional.empty());
}
}
}
@@ -2389,7 +2401,7 @@
List<LedgerInfo> ledgersToDelete = Lists.newArrayList();
List<LedgerInfo> offloadedLedgersToDelete = Lists.newArrayList();
Optional<OffloadPolicies> optionalOffloadPolicies = Optional.ofNullable(config.getLedgerOffloader() != null
- && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
+ && config.getLedgerOffloader() != NullLedgerOffloader.instance_
? config.getLedgerOffloader().getOffloadPolicies()
: null);
synchronized (this) {
@@ -2502,8 +2514,8 @@
for (LedgerInfo ls : ledgersToDelete) {
if (currentLastConfirmedEntry != null && ls.getLedgerId() == currentLastConfirmedEntry.getLedgerId()) {
// this info is relevant because the lastMessageId won't be available anymore
- log.info("[{}] Ledger {} contains the current last confirmed entry {}, and it is going to be deleted", name,
- ls.getLedgerId(), currentLastConfirmedEntry);
+ log.info("[{}] Ledger {} contains the current last confirmed entry {}, and it is going to be "
+ + "deleted", name, ls.getLedgerId(), currentLastConfirmedEntry);
}
invalidateReadHandle(ls.getLedgerId());
@@ -2564,11 +2576,12 @@
/**
* Non-durable cursors have to be moved forward when data is trimmed since they are not retain that data.
- * This method also addresses a corner case for durable cursors in which the cursor is caught up, i.e. the mark delete position
- * happens to be the last entry in a ledger. If the ledger is deleted, then subsequent calculations for backlog
- * size may not be accurate since the method getNumberOfEntries we use in backlog calculation will not be able to fetch
- * the ledger info of a deleted ledger. Thus, we need to update the mark delete position to the "-1" entry of the first ledger
- * that is not marked for deletion.
+ * This method also addresses a corner case for durable cursors in which the cursor is caught up, i.e. the mark
+ * delete position happens to be the last entry in a ledger. If the ledger is deleted, then subsequent
+ * calculations for backlog
+ * size may not be accurate since the method getNumberOfEntries we use in backlog calculation will not be able to
+ * fetch the ledger info of a deleted ledger. Thus, we need to update the mark delete position to the "-1" entry
+ * of the first ledger that is not marked for deletion.
* This is to make sure that the `consumedEntries` counter is correctly updated with the number of skipped
* entries and the stats are reported correctly.
*/
@@ -2578,16 +2591,19 @@
}
// need to move mark delete for non-durable cursors to the first ledger NOT marked for deletion
- // calling getNumberOfEntries latter for a ledger that is already deleted will be problematic and return incorrect results
+ // calling getNumberOfEntries latter for a ledger that is already deleted will be problematic and return
+ // incorrect results
long firstNonDeletedLedger = ledgers.higherKey(ledgersToDelete.get(ledgersToDelete.size() - 1).getLedgerId());
PositionImpl highestPositionToDelete = new PositionImpl(firstNonDeletedLedger, -1);
cursors.forEach(cursor -> {
// move the mark delete position to the highestPositionToDelete only if it is smaller than the add confirmed
- // to prevent the edge case where the cursor is caught up to the latest and highestPositionToDelete may be larger than the last add confirmed
+ // to prevent the edge case where the cursor is caught up to the latest and highestPositionToDelete may be
+ // larger than the last add confirmed
if (highestPositionToDelete.compareTo((PositionImpl) cursor.getMarkDeletedPosition()) > 0
- && highestPositionToDelete.compareTo((PositionImpl) cursor.getManagedLedger().getLastConfirmedEntry()) <= 0
- && !(!cursor.isDurable() && cursor instanceof NonDurableCursorImpl && ((NonDurableCursorImpl) cursor).isReadCompacted())) {
+ && highestPositionToDelete.compareTo((PositionImpl) cursor.getManagedLedger()
+ .getLastConfirmedEntry()) <= 0 && !(!cursor.isDurable() && cursor instanceof NonDurableCursorImpl
+ && ((NonDurableCursorImpl) cursor).isReadCompacted())) {
cursor.asyncMarkDelete(highestPositionToDelete, cursor.getProperties(), new MarkDeleteCallback() {
@Override
public void markDeleteComplete(Object ctx) {
@@ -2718,7 +2734,8 @@
log.warn("[{}] Ledger was already deleted {}", name, ledgerId);
} else if (rc != BKException.Code.OK) {
log.error("[{}] Error deleting ledger {} : {}", name, ledgerId, BKException.getMessage(rc));
- scheduledExecutor.schedule(safeRun(() -> asyncDeleteLedger(ledgerId, retry - 1)), DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC, TimeUnit.SECONDS);
+ scheduledExecutor.schedule(safeRun(() -> asyncDeleteLedger(ledgerId, retry - 1)),
+ DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC, TimeUnit.SECONDS);
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] Deleted ledger {}", name, ledgerId);
@@ -2816,14 +2833,15 @@
@Override
public void asyncOffloadPrefix(Position pos, OffloadCallback callback, Object ctx) {
PositionImpl requestOffloadTo = (PositionImpl) pos;
- if (!isValidPosition(requestOffloadTo) &&
+ if (!isValidPosition(requestOffloadTo)
// Also consider the case where the last ledger is currently
// empty. In this the passed position is not technically
// "valid", per the above check, given that it's not written
// yes, but it will be valid for the logic here
- !(requestOffloadTo.getLedgerId() == currentLedger.getId()
+ && !(requestOffloadTo.getLedgerId() == currentLedger.getId()
&& requestOffloadTo.getEntryId() == 0)) {
- log.warn("[{}] Cannot start offload at position {} - LastConfirmedEntry: {}", name, pos, lastConfirmedEntry);
+ log.warn("[{}] Cannot start offload at position {} - LastConfirmedEntry: {}", name, pos,
+ lastConfirmedEntry);
callback.offloadFailed(new InvalidCursorPositionException("Invalid position for offload: " + pos), ctx);
return;
}
@@ -2919,7 +2937,7 @@
.thenCompose((ignore) -> {
return Retries.run(Backoff.exponentialJittered(TimeUnit.SECONDS.toMillis(1),
TimeUnit.SECONDS.toHours(1)).limit(10),
- FAIL_ON_CONFLICT,
+ failOnConflict_,
() -> completeLedgerInfoForOffloaded(ledgerId, uuid),
scheduledExecutor, name)
.whenComplete((ignore2, exception) -> {
@@ -2936,7 +2954,8 @@
.whenComplete((ignore, exception) -> {
if (exception != null) {
lastOffloadFailureTimestamp = System.currentTimeMillis();
- log.warn("[{}] Exception occurred for ledgerId {} timestamp {} during offload", name, ledgerId, lastOffloadFailureTimestamp, exception);
+ log.warn("[{}] Exception occurred for ledgerId {} timestamp {} during offload", name,
+ ledgerId, lastOffloadFailureTimestamp, exception);
PositionImpl newFirstUnoffloaded = PositionImpl.get(ledgerId, 0);
if (newFirstUnoffloaded.compareTo(firstUnoffloaded) > 0) {
newFirstUnoffloaded = firstUnoffloaded;
@@ -2954,7 +2973,8 @@
errorToReport);
} else {
lastOffloadSuccessTimestamp = System.currentTimeMillis();
- log.info("[{}] offload for ledgerId {} timestamp {} succeed", name, ledgerId, lastOffloadSuccessTimestamp);
+ log.info("[{}] offload for ledgerId {} timestamp {} succeed", name, ledgerId,
+ lastOffloadSuccessTimestamp);
lastOffloadLedgerId = ledgerId;
invalidateReadHandle(ledgerId);
offloadLoop(promise, ledgersToOffload, firstUnoffloaded, firstError);
@@ -2967,7 +2987,7 @@
LedgerInfo transform(LedgerInfo oldInfo) throws ManagedLedgerException;
}
- static Predicate<Throwable> FAIL_ON_CONFLICT = (throwable) -> {
+ static Predicate<Throwable> failOnConflict_ = (throwable) -> {
return !(throwable instanceof OffloadConflict) && Retries.NonFatalPredicate.test(throwable);
};
@@ -3343,7 +3363,8 @@
} catch (NullPointerException e) {
next = lastConfirmedEntry.getNext();
if (log.isDebugEnabled()) {
- log.debug("[{}] Can't find next valid position : {}, fall back to the next position of the last position : {}.", position, name, next, e);
+ log.debug("[{}] Can't find next valid position : {}, fall back to the next position of the last "
+ + "position : {}.", position, name, next, e);
}
}
return next;
@@ -3405,7 +3426,7 @@
}
/**
- * Get the first position written in the managed ledger, alongside with the associated counter
+ * Get the first position written in the managed ledger, alongside with the associated counter.
*/
Pair<PositionImpl, Long> getFirstPositionAndCounter() {
PositionImpl pos;
@@ -3690,8 +3711,8 @@
log.info("[{}] Creating ledger, metadata: {} - metadata ops timeout : {} seconds",
name, finalMetadata, config.getMetadataOperationsTimeoutSeconds());
try {
- bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(), config.getAckQuorumSize(),
- digestType, config.getPassword(), cb, ledgerCreated, finalMetadata);
+ bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(),
+ config.getAckQuorumSize(), digestType, config.getPassword(), cb, ledgerCreated, finalMetadata);
} catch (Throwable cause) {
log.error("[{}] Encountered unexpected error when creating ledger",
name, cause);
@@ -3778,7 +3799,8 @@
}
ReadEntryCallbackWrapper callback = this.lastReadCallback;
long readOpCount = callback != null ? callback.readOpCount : 0;
- boolean timeout = callback != null && (TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - callback.createdTime) >= timeoutSec);
+ boolean timeout = callback != null && (TimeUnit.NANOSECONDS
+ .toSeconds(System.nanoTime() - callback.createdTime) >= timeoutSec);
if (readOpCount > 0 && timeout) {
log.warn("[{}]-{}-{} read entry timeout after {} sec", this.name, this.lastReadCallback.ledgerId,
this.lastReadCallback.entryId, timeoutSec);
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java
index 9f1563b..aca8e4e 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java
@@ -20,7 +20,6 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
-
import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
@@ -52,7 +51,8 @@
* whether the managed ledger metadata should be created if it doesn't exist already
* @throws MetaStoreException
*/
- void getManagedLedgerInfo(String ledgerName, boolean createIfMissing, MetaStoreCallback<ManagedLedgerInfo> callback);
+ void getManagedLedgerInfo(String ledgerName, boolean createIfMissing,
+ MetaStoreCallback<ManagedLedgerInfo> callback);
/**
*
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
index 5ad62b2..ac2e746 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
@@ -19,19 +19,16 @@
package org.apache.bookkeeper.mledger.impl;
import com.google.protobuf.InvalidProtocolBufferException;
-
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.buffer.Unpooled;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.CompositeByteBuf;
-import io.netty.buffer.Unpooled;
import java.util.concurrent.RejectedExecutionException;
import lombok.extern.slf4j.Slf4j;
-
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
@@ -140,9 +137,11 @@
String path = PREFIX + ledgerName;
store.put(path, compressLedgerInfo(mlInfo), Optional.of(stat.getVersion()))
- .thenAcceptAsync(newVersion -> callback.operationComplete(null, newVersion), executor.chooseThread(ledgerName))
+ .thenAcceptAsync(newVersion -> callback.operationComplete(null, newVersion),
+ executor.chooseThread(ledgerName))
.exceptionally(ex -> {
- executor.executeOrdered(ledgerName, SafeRunnable.safeRun(() -> callback.operationFailed(getException(ex))));
+ executor.executeOrdered(ledgerName, SafeRunnable.safeRun(() -> callback
+ .operationFailed(getException(ex))));
return null;
});
}
@@ -155,9 +154,11 @@
String path = PREFIX + ledgerName;
store.getChildren(path)
- .thenAcceptAsync(cursors -> callback.operationComplete(cursors, null), executor.chooseThread(ledgerName))
+ .thenAcceptAsync(cursors -> callback.operationComplete(cursors, null), executor
+ .chooseThread(ledgerName))
.exceptionally(ex -> {
- executor.executeOrdered(ledgerName, SafeRunnable.safeRun(() -> callback.operationFailed(getException(ex))));
+ executor.executeOrdered(ledgerName, SafeRunnable.safeRun(() -> callback
+ .operationFailed(getException(ex))));
return null;
});
}
@@ -184,7 +185,8 @@
}
}, executor.chooseThread(ledgerName))
.exceptionally(ex -> {
- executor.executeOrdered(ledgerName, SafeRunnable.safeRun(() -> callback.operationFailed(getException(ex))));
+ executor.executeOrdered(ledgerName, SafeRunnable.safeRun(() -> callback
+ .operationFailed(getException(ex))));
return null;
});
}
@@ -213,9 +215,11 @@
}
store.put(path, content, Optional.of(expectedVersion))
- .thenAcceptAsync(optStat -> callback.operationComplete(null, optStat), executor.chooseThread(ledgerName))
+ .thenAcceptAsync(optStat -> callback.operationComplete(null, optStat), executor
+ .chooseThread(ledgerName))
.exceptionally(ex -> {
- executor.executeOrdered(ledgerName, SafeRunnable.safeRun(() -> callback.operationFailed(getException(ex))));
+ executor.executeOrdered(ledgerName, SafeRunnable.safeRun(() -> callback
+ .operationFailed(getException(ex))));
return null;
});
}
@@ -233,7 +237,8 @@
callback.operationComplete(null, null);
}, executor.chooseThread(ledgerName))
.exceptionally(ex -> {
- executor.executeOrdered(ledgerName, SafeRunnable.safeRun(() -> callback.operationFailed(getException(ex))));
+ executor.executeOrdered(ledgerName, SafeRunnable.safeRun(() -> callback
+ .operationFailed(getException(ex))));
return null;
});
}
@@ -251,7 +256,8 @@
callback.operationComplete(null, null);
}, executor.chooseThread(ledgerName))
.exceptionally(ex -> {
- executor.executeOrdered(ledgerName, SafeRunnable.safeRun(() -> callback.operationFailed(getException(ex))));
+ executor.executeOrdered(ledgerName, SafeRunnable.safeRun(() -> callback
+ .operationFailed(getException(ex))));
return null;
});
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
index 1d545bd..436d397 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
@@ -36,7 +36,8 @@
private final boolean readCompacted;
NonDurableCursorImpl(BookKeeper bookkeeper, ManagedLedgerConfig config, ManagedLedgerImpl ledger, String cursorName,
- PositionImpl startCursorPosition, CommandSubscribe.InitialPosition initialPosition, boolean isReadCompacted) {
+ PositionImpl startCursorPosition, CommandSubscribe.InitialPosition initialPosition,
+ boolean isReadCompacted) {
super(bookkeeper, config, ledger, cursorName);
this.readCompacted = isReadCompacted;
@@ -53,7 +54,7 @@
initializeCursorPosition(ledger.getFirstPositionAndCounter());
break;
}
- } else if (startCursorPosition.getLedgerId() == PositionImpl.earliest.getLedgerId()) {
+ } else if (startCursorPosition.getLedgerId() == PositionImpl.EARLIEST.getLedgerId()) {
// Start from invalid ledger to read from first available entry
recoverCursor(ledger.getPreviousPosition(ledger.getFirstPosition()));
} else {
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NullLedgerOffloader.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NullLedgerOffloader.java
index b9a4df5..d9da6f5 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NullLedgerOffloader.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NullLedgerOffloader.java
@@ -21,7 +21,6 @@
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
@@ -30,7 +29,7 @@
* Null implementation that throws an error on any invokation.
*/
public class NullLedgerOffloader implements LedgerOffloader {
- public static NullLedgerOffloader INSTANCE = new NullLedgerOffloader();
+ public static NullLedgerOffloader instance_ = new NullLedgerOffloader();
@Override
public String getOffloadDriverName() {
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OffloadSegmentInfoImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OffloadSegmentInfoImpl.java
index cca1676..8716d5a 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OffloadSegmentInfoImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OffloadSegmentInfoImpl.java
@@ -40,8 +40,8 @@
public final long beginLedgerId;
public final long beginEntryId;
public final String driverName;
- volatile private long endLedgerId;
- volatile private long endEntryId;
+ private volatile long endLedgerId;
+ private volatile long endEntryId;
volatile boolean closed = false;
public final Map<String, String> driverMetadata;
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
index e390621..226ad04 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
@@ -18,10 +18,14 @@
*/
package org.apache.bookkeeper.mledger.impl;
+import static com.google.common.base.Preconditions.checkArgument;
import io.netty.buffer.ByteBuf;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import io.netty.util.ReferenceCountUtil;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
@@ -33,11 +37,6 @@
import org.apache.bookkeeper.mledger.util.SafeRun;
import org.apache.bookkeeper.util.SafeRunnable;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLongFieldUpdater;
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-
-import static com.google.common.base.Preconditions.checkArgument;
/**
* Handles the life-cycle of an addEntry() operation.
@@ -66,8 +65,8 @@
private int dataLength;
private ManagedLedgerInterceptor.PayloadProcessorHandle payloadProcessorHandle = null;
- private static final AtomicReferenceFieldUpdater<OpAddEntry, OpAddEntry.State> STATE_UPDATER = AtomicReferenceFieldUpdater
- .newUpdater(OpAddEntry.class, OpAddEntry.State.class, "state");
+ private static final AtomicReferenceFieldUpdater<OpAddEntry, OpAddEntry.State> STATE_UPDATER =
+ AtomicReferenceFieldUpdater.newUpdater(OpAddEntry.class, OpAddEntry.State.class, "state");
volatile State state;
enum State {
@@ -77,7 +76,8 @@
CLOSED
}
- public static OpAddEntry createNoRetainBuffer(ManagedLedgerImpl ml, ByteBuf data, AddEntryCallback callback, Object ctx) {
+ public static OpAddEntry createNoRetainBuffer(ManagedLedgerImpl ml, ByteBuf data, AddEntryCallback callback,
+ Object ctx) {
OpAddEntry op = createOpAddEntryNoRetainBuffer(ml, data, callback, ctx);
if (log.isDebugEnabled()) {
log.debug("Created new OpAddEntry {}", op);
@@ -85,7 +85,8 @@
return op;
}
- public static OpAddEntry createNoRetainBuffer(ManagedLedgerImpl ml, ByteBuf data, int numberOfMessages, AddEntryCallback callback, Object ctx) {
+ public static OpAddEntry createNoRetainBuffer(ManagedLedgerImpl ml, ByteBuf data, int numberOfMessages,
+ AddEntryCallback callback, Object ctx) {
OpAddEntry op = createOpAddEntryNoRetainBuffer(ml, data, callback, ctx);
op.numberOfMessages = numberOfMessages;
if (log.isDebugEnabled()) {
@@ -94,7 +95,8 @@
return op;
}
- private static OpAddEntry createOpAddEntryNoRetainBuffer(ManagedLedgerImpl ml, ByteBuf data, AddEntryCallback callback, Object ctx) {
+ private static OpAddEntry createOpAddEntryNoRetainBuffer(ManagedLedgerImpl ml, ByteBuf data,
+ AddEntryCallback callback, Object ctx) {
OpAddEntry op = RECYCLER.get();
op.ml = ml;
op.ledger = null;
@@ -129,7 +131,8 @@
addOpCount = ManagedLedgerImpl.ADD_OP_COUNT_UPDATER.incrementAndGet(ml);
lastInitTime = System.nanoTime();
if (ml.getManagedLedgerInterceptor() != null) {
- payloadProcessorHandle = ml.getManagedLedgerInterceptor().processPayloadBeforeLedgerWrite(this, duplicateBuffer);
+ payloadProcessorHandle = ml.getManagedLedgerInterceptor().processPayloadBeforeLedgerWrite(this,
+ duplicateBuffer);
if (payloadProcessorHandle != null) {
duplicateBuffer = payloadProcessorHandle.getProcessedPayload();
}
@@ -156,7 +159,8 @@
public void addComplete(int rc, final LedgerHandle lh, long entryId, Object ctx) {
if (!STATE_UPDATER.compareAndSet(OpAddEntry.this, State.INITIATED, State.COMPLETED)) {
- log.warn("[{}] The add op is terminal legacy callback for entry {}-{} adding.", ml.getName(), lh.getId(), entryId);
+ log.warn("[{}] The add op is terminal legacy callback for entry {}-{} adding.", ml.getName(), lh.getId(),
+ entryId);
OpAddEntry.this.recycle();
return;
}
@@ -268,7 +272,7 @@
}
/**
- * Checks if add-operation is completed
+ * Checks if add-operation is completed.
*
* @return true if task is not already completed else returns false.
*/
@@ -370,12 +374,12 @@
public String toString() {
ManagedLedgerImpl ml = this.ml;
LedgerHandle ledger = this.ledger;
- return "OpAddEntry{" +
- "mlName=" + ml != null ? ml.getName() : "null" +
- ", ledgerId=" + ledger != null ? String.valueOf(ledger.getId()) : "null" +
- ", entryId=" + entryId +
- ", startTime=" + startTime +
- ", dataLength=" + dataLength +
- '}';
+ return "OpAddEntry{"
+ + "mlName=" + ml != null ? ml.getName() : "null"
+ + ", ledgerId=" + ledger != null ? String.valueOf(ledger.getId()) : "null"
+ + ", entryId=" + entryId
+ + ", startTime=" + startTime
+ + ", dataLength=" + dataLength
+ + '}';
}
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java
index 91567fc..812c535 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java
@@ -19,12 +19,10 @@
package org.apache.bookkeeper.mledger.impl;
import com.google.common.base.Predicate;
-import org.apache.bookkeeper.mledger.AsyncCallbacks.FindEntryCallback;
-import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
-
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
-
+import org.apache.bookkeeper.mledger.AsyncCallbacks.FindEntryCallback;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
@@ -100,7 +98,8 @@
searchPosition = ledger.getPositionAfterN(searchPosition, max, PositionBound.startExcluded);
if (lastPosition.compareTo(searchPosition) < 0) {
if (log.isDebugEnabled()) {
- log.debug("first position {} matches, last should be {}, but moving to lastPos {}", position, searchPosition, lastPosition);
+ log.debug("first position {} matches, last should be {}, but moving to lastPos {}", position,
+ searchPosition, lastPosition);
}
searchPosition = lastPosition;
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
index 91a6e26..210ad31 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
@@ -54,7 +54,7 @@
op.callback = callback;
op.entries = Lists.newArrayList();
if (maxPosition == null) {
- maxPosition = PositionImpl.latest;
+ maxPosition = PositionImpl.LATEST;
}
op.maxPosition = maxPosition;
op.ctx = ctx;
@@ -134,8 +134,8 @@
}
void checkReadCompletion() {
- if (entries.size() < count && cursor.hasMoreEntries() &&
- ((PositionImpl) cursor.getReadPosition()).compareTo(maxPosition) < 0) {
+ if (entries.size() < count && cursor.hasMoreEntries()
+ && ((PositionImpl) cursor.getReadPosition()).compareTo(maxPosition) < 0) {
// We still have more entries to read from the next ledger, schedule a new async operation
if (nextReadPosition.getLedgerId() != readPosition.getLedgerId()) {
cursor.ledger.startReadOperationOnLedger(nextReadPosition, OpReadEntry.this);
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java
index 92baf42..2672c23 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java
@@ -31,8 +31,8 @@
protected long entryId;
protected long[] ackSet;
- public static final PositionImpl earliest = new PositionImpl(-1, -1);
- public static final PositionImpl latest = new PositionImpl(Long.MAX_VALUE, Long.MAX_VALUE);
+ public static final PositionImpl EARLIEST = new PositionImpl(-1, -1);
+ public static final PositionImpl LATEST = new PositionImpl(Long.MAX_VALUE, Long.MAX_VALUE);
public PositionImpl(PositionInfo pi) {
this.ledgerId = pi.getLedgerId();
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImplRecyclable.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImplRecyclable.java
index ad95520..d2114ac 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImplRecyclable.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImplRecyclable.java
@@ -18,10 +18,9 @@
*/
package org.apache.bookkeeper.mledger.impl;
-import org.apache.bookkeeper.mledger.Position;
-
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
+import org.apache.bookkeeper.mledger.Position;
public class PositionImplRecyclable extends PositionImpl implements Position {
@@ -35,7 +34,7 @@
};
private PositionImplRecyclable(Handle<PositionImplRecyclable> recyclerHandle) {
- super(PositionImpl.earliest);
+ super(PositionImpl.EARLIEST);
this.recyclerHandle = recyclerHandle;
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java
index 7a0445e..8c20e0d 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java
@@ -19,9 +19,7 @@
package org.apache.bookkeeper.mledger.impl;
import com.google.common.collect.Range;
-
import lombok.extern.slf4j.Slf4j;
-
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
@@ -36,7 +34,7 @@
PositionImpl startPosition, String cursorName) {
super(bookkeeper, config, ledger, cursorName);
- if (startPosition.equals(PositionImpl.earliest)) {
+ if (startPosition.equals(PositionImpl.EARLIEST)) {
readPosition = ledger.getFirstPosition().getNext();
} else {
readPosition = startPosition;
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java
index 13cff9f..214c3af 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java
@@ -18,10 +18,9 @@
*/
package org.apache.bookkeeper.mledger.impl;
+import com.google.common.collect.Range;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
-
-import com.google.common.collect.Range;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.common.util.OrderedScheduler;
@@ -121,7 +120,7 @@
private ReadOnlyCursor createReadOnlyCursor(PositionImpl startPosition) {
if (ledgers.isEmpty()) {
- lastConfirmedEntry = PositionImpl.earliest;
+ lastConfirmedEntry = PositionImpl.EARLIEST;
} else if (ledgers.lastEntry().getValue().getEntries() > 0) {
// Last ledger has some of the entries
lastConfirmedEntry = new PositionImpl(ledgers.lastKey(), ledgers.lastEntry().getValue().getEntries() - 1);
@@ -132,7 +131,7 @@
LedgerInfo li = ledgers.headMap(lastLedgerId, false).lastEntry().getValue();
lastConfirmedEntry = new PositionImpl(li.getLedgerId(), li.getEntries() - 1);
} else {
- lastConfirmedEntry = PositionImpl.earliest;
+ lastConfirmedEntry = PositionImpl.EARLIEST;
}
}
@@ -144,7 +143,8 @@
this.getLedgerHandle(position.getLedgerId())
.thenAccept((ledger) -> asyncReadEntry(ledger, position, callback, ctx))
.exceptionally((ex) -> {
- log.error("[{}] Error opening ledger for reading at position {} - {}", this.name, position, ex.getMessage());
+ log.error("[{}] Error opening ledger for reading at position {} - {}", this.name, position,
+ ex.getMessage());
callback.readEntryFailed(ManagedLedgerException.getManagedLedgerException(ex.getCause()), ctx);
return null;
});
@@ -152,7 +152,7 @@
@Override
public long getNumberOfEntries() {
- return getNumberOfEntries(Range.openClosed(PositionImpl.earliest, getLastPosition()));
+ return getNumberOfEntries(Range.openClosed(PositionImpl.EARLIEST, getLastPosition()));
}
@Override
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/intercept/ManagedLedgerInterceptor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/intercept/ManagedLedgerInterceptor.java
index 13fc530..cb5ba0c 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/intercept/ManagedLedgerInterceptor.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/intercept/ManagedLedgerInterceptor.java
@@ -19,14 +19,13 @@
package org.apache.bookkeeper.mledger.intercept;
import io.netty.buffer.ByteBuf;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
import org.apache.bookkeeper.common.annotation.InterfaceStability;
import org.apache.bookkeeper.mledger.impl.OpAddEntry;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-
/**
* Interceptor for ManagedLedger.
* */
@@ -61,17 +60,17 @@
void onUpdateManagedLedgerInfo(Map<String, String> propertiesMap);
/**
- * A reference handle to the payload processor
+ * A reference handle to the payload processor.
*/
interface PayloadProcessorHandle {
/**
- * To obtain the processed data
+ * To obtain the processed data.
* @return processed data
*/
ByteBuf getProcessedPayload();
/**
- * To release resources used in processor, if any
+ * To release resources used in processor, if any.
*/
void release();
}
@@ -85,12 +84,13 @@
}
/**
- * Intercept before payload gets written to ledger
+ * Intercept before payload gets written to ledger.
* @param ledgerWriteOp OpAddEntry used to trigger ledger write.
* @param dataToBeStoredInLedger data to be stored in ledger
* @return handle to the processor
*/
- default PayloadProcessorHandle processPayloadBeforeLedgerWrite(OpAddEntry ledgerWriteOp, ByteBuf dataToBeStoredInLedger){
+ default PayloadProcessorHandle processPayloadBeforeLedgerWrite(OpAddEntry ledgerWriteOp,
+ ByteBuf dataToBeStoredInLedger){
return null;
}
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/intercept/package-info.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/intercept/package-info.java
new file mode 100644
index 0000000..58ef67b
--- /dev/null
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/intercept/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.intercept;
\ No newline at end of file
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadUtils.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadUtils.java
index f4064e7..4e019bd 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadUtils.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadUtils.java
@@ -19,14 +19,12 @@
package org.apache.bookkeeper.mledger.offload;
import com.google.common.collect.Maps;
-
+import com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
-
-import com.google.protobuf.ByteString;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerMetadataBuilder;
@@ -109,7 +107,8 @@
.setAckQuorumSize(metadata.getAckQuorumSize())
.setEnsembleSize(metadata.getEnsembleSize())
.setLength(metadata.getLength())
- .setState(metadata.isClosed() ? DataFormats.LedgerMetadataFormat.State.CLOSED : DataFormats.LedgerMetadataFormat.State.OPEN)
+ .setState(metadata.isClosed() ? DataFormats.LedgerMetadataFormat.State.CLOSED :
+ DataFormats.LedgerMetadataFormat.State.OPEN)
.setLastEntryId(metadata.getLastEntryId())
.setCtime(metadata.getCtime())
.setDigestType(BookKeeper.DigestType.toProtoDigestType(
@@ -130,7 +129,8 @@
}
public static LedgerMetadata parseLedgerMetadata(long id, byte[] bytes) throws IOException {
- DataFormats.LedgerMetadataFormat ledgerMetadataFormat = DataFormats.LedgerMetadataFormat.newBuilder().mergeFrom(bytes).build();
+ DataFormats.LedgerMetadataFormat ledgerMetadataFormat = DataFormats.LedgerMetadataFormat.newBuilder()
+ .mergeFrom(bytes).build();
LedgerMetadataBuilder builder = LedgerMetadataBuilder.create()
.withLastEntryId(ledgerMetadataFormat.getLastEntryId())
.withPassword(ledgerMetadataFormat.getPassword().toByteArray())
@@ -175,7 +175,8 @@
builder.withDigestType(DigestType.DUMMY);
break;
default:
- throw new IllegalArgumentException("Unable to convert digest type " + ledgerMetadataFormat.getDigestType());
+ throw new IllegalArgumentException("Unable to convert digest type "
+ + ledgerMetadataFormat.getDigestType());
}
return builder.build();
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloaderUtils.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloaderUtils.java
index 8eabccd..f229603 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloaderUtils.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloaderUtils.java
@@ -34,7 +34,7 @@
import org.apache.pulsar.common.util.ObjectMapperFactory;
/**
- * Utils to load offloaders
+ * Utils to load offloaders.
*/
@Slf4j
public class OffloaderUtils {
@@ -48,7 +48,9 @@
* @return the offloader class name
* @throws IOException when fail to retrieve the pulsar offloader class
*/
- static Pair<NarClassLoader, LedgerOffloaderFactory> getOffloaderFactory(String narPath, String narExtractionDirectory) throws IOException {
+ static Pair<NarClassLoader, LedgerOffloaderFactory> getOffloaderFactory(String narPath,
+ String narExtractionDirectory)
+ throws IOException {
// need to load offloader NAR to the classloader that also loaded LedgerOffloaderFactory in case
// LedgerOffloaderFactory is loaded by a classloader that is not the default classloader
// as is the case for the pulsar presto plugin
@@ -73,8 +75,8 @@
try {
Object offloader = factoryClass.getDeclaredConstructor().newInstance();
if (!(offloader instanceof LedgerOffloaderFactory)) {
- throw new IOException("Class " + conf.getOffloaderFactoryClass() + " does not implement interface "
- + LedgerOffloaderFactory.class.getName());
+ throw new IOException("Class " + conf.getOffloaderFactoryClass() + " does not implement "
+ + "interface " + LedgerOffloaderFactory.class.getName());
}
loadFuture.complete((LedgerOffloaderFactory) offloader);
} catch (Throwable t) {
@@ -106,15 +108,18 @@
}
}
- public static OffloaderDefinition getOffloaderDefinition(String narPath, String narExtractionDirectory) throws IOException {
- try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet(), narExtractionDirectory)) {
+ public static OffloaderDefinition getOffloaderDefinition(String narPath, String narExtractionDirectory)
+ throws IOException {
+ try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet(),
+ narExtractionDirectory)) {
String configStr = ncl.getServiceDefinition(PULSAR_OFFLOADER_SERVICE_NAME);
return ObjectMapperFactory.getThreadLocalYaml().readValue(configStr, OffloaderDefinition.class);
}
}
- public static Offloaders searchForOffloaders(String offloadersPath, String narExtractionDirectory) throws IOException {
+ public static Offloaders searchForOffloaders(String offloadersPath, String narExtractionDirectory)
+ throws IOException {
Path path = Paths.get(offloadersPath).toAbsolutePath();
log.info("Searching for offloaders in {}", path);
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/Offloaders.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/Offloaders.java
index dfe4c31..caf8d5e 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/Offloaders.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/Offloaders.java
@@ -39,8 +39,8 @@
return factory.getRight();
}
}
- throw new IOException("No offloader found for driver '" + driverName + "'." +
- " Please make sure you dropped the offloader nar packages under `${PULSAR_HOME}/offloaders`.");
+ throw new IOException("No offloader found for driver '" + driverName + "'."
+ + " Please make sure you dropped the offloader nar packages under `${PULSAR_HOME}/offloaders`.");
}
@Override
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadersCache.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadersCache.java
index e80c75b..307b7a5 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadersCache.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadersCache.java
@@ -18,11 +18,10 @@
*/
package org.apache.bookkeeper.mledger.offload;
-import lombok.extern.slf4j.Slf4j;
-
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import lombok.extern.slf4j.Slf4j;
/**
* Implementation of an Offloaders. The main purpose of this class is to
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/package-info.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/package-info.java
new file mode 100644
index 0000000..1be0b86
--- /dev/null
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload;
\ No newline at end of file
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/AbstractCASReferenceCounted.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/AbstractCASReferenceCounted.java
index 6adc2ee..8b0e25f 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/AbstractCASReferenceCounted.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/AbstractCASReferenceCounted.java
@@ -23,16 +23,14 @@
* The semantic was changed in https://github.com/netty/netty/commit/83a19d565064ee36998eb94f946e5a4264001065#diff-b9443e2689a46b3647fe6a8de0fdf3b2
*/
-import static io.netty.util.internal.ObjectUtil.checkPositive;
-
import io.netty.util.IllegalReferenceCountException;
import io.netty.util.ReferenceCounted;
-
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
/**
* Abstract base class for classes wants to implement {@link ReferenceCounted}.
*/
+
public abstract class AbstractCASReferenceCounted implements ReferenceCounted {
private static final AtomicIntegerFieldUpdater<AbstractCASReferenceCounted> refCntUpdater =
AtomicIntegerFieldUpdater.newUpdater(AbstractCASReferenceCounted.class, "refCnt");
@@ -45,7 +43,7 @@
}
/**
- * An unsafe operation intended for use by a subclass that sets the reference count of the buffer directly
+ * An unsafe operation intended for use by a subclass that sets the reference count of the buffer directly.
*/
protected final void setRefCnt(int refCnt) {
refCntUpdater.set(this, refCnt);
@@ -58,7 +56,7 @@
@Override
public ReferenceCounted retain(int increment) {
- return retain0(checkPositive(increment, "increment"));
+ return retain0(io.netty.util.internal.ObjectUtil.checkPositive(increment, "increment"));
}
private ReferenceCounted retain0(int increment) {
@@ -89,7 +87,7 @@
@Override
public boolean release(int decrement) {
- return release0(checkPositive(decrement, "decrement"));
+ return release0(io.netty.util.internal.ObjectUtil.checkPositive(decrement, "decrement"));
}
private boolean release0(int decrement) {
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Futures.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Futures.java
index 9fd4a07..de0a5f5 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Futures.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Futures.java
@@ -29,7 +29,7 @@
*/
public class Futures {
- public static CompletableFuture<Void> NULL_PROMISE = CompletableFuture.completedFuture(null);
+ public static CompletableFuture<Void> nullPromise_ = CompletableFuture.completedFuture(null);
/**
* Adapts a {@link CloseCallback} to a {@link CompletableFuture}.
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/PositionAckSetUtil.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/PositionAckSetUtil.java
index 958f706..47d4bc2 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/PositionAckSetUtil.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/PositionAckSetUtil.java
@@ -57,10 +57,10 @@
//This method is compare two position which position is bigger than another one.
//When the ledgerId and entryId in this position is same to another one and two position all have ack set, it will
//compare the ack set next bit index is bigger than another one.
- public static int compareToWithAckSet(PositionImpl currentPosition,PositionImpl otherPosition) {
- if (currentPosition == null || otherPosition ==null) {
- throw new IllegalArgumentException("Two positions can't be null! " +
- "current position : [" + currentPosition + "] other position : [" + otherPosition + "]");
+ public static int compareToWithAckSet(PositionImpl currentPosition, PositionImpl otherPosition) {
+ if (currentPosition == null || otherPosition == null) {
+ throw new IllegalArgumentException("Two positions can't be null! "
+ + "current position : [" + currentPosition + "] other position : [" + otherPosition + "]");
}
int result = ComparisonChain.start().compare(currentPosition.getLedgerId(),
otherPosition.getLedgerId()).compare(currentPosition.getEntryId(), otherPosition.getEntryId())
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java
index a5786ad..a5b3133 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java
@@ -19,7 +19,6 @@
package org.apache.bookkeeper.mledger.util;
import static com.google.common.base.Preconditions.checkArgument;
-
import com.google.common.collect.Lists;
import io.netty.util.ReferenceCounted;
import java.util.Collection;
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/StatsBuckets.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/StatsBuckets.java
index dd77988..02021c5 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/StatsBuckets.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/StatsBuckets.java
@@ -19,7 +19,6 @@
package org.apache.bookkeeper.mledger.util;
import static com.google.common.base.Preconditions.checkArgument;
-
import java.util.Arrays;
import java.util.concurrent.atomic.LongAdder;
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java
index 315eafa..364b2fb 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java
@@ -369,7 +369,7 @@
EntryCache entryCache = cacheManager.getEntryCache(ml1);
final CountDownLatch counter = new CountDownLatch(1);
- entryCache.asyncReadEntry(lh, new PositionImpl(1L ,1L), new AsyncCallbacks.ReadEntryCallback() {
+ entryCache.asyncReadEntry(lh, new PositionImpl(1L,1L), new AsyncCallbacks.ReadEntryCallback() {
public void readEntryComplete(Entry entry, Object ctx) {
Assert.assertNotEquals(entry, null);
entry.release();
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
index af30c9c..bbcfd9e 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
@@ -414,13 +414,13 @@
container.add(cursor2);
assertEquals(container.getSlowestReadPositionForActiveCursors(), new PositionImpl(1, 1));
- // Move forward cursor, cursor1 = 5:5 , cursor2 = 5:6, slowest is 5:5
+ // Move forward cursor, cursor1 = 5:5, cursor2 = 5:6, slowest is 5:5
position = PositionImpl.get(5,6);
container.cursorUpdated(cursor2, position);
doReturn(position).when(cursor2).getReadPosition();
assertEquals(container.getSlowestReadPositionForActiveCursors(), new PositionImpl(5, 5));
- // Move forward cursor, cursor1 = 5:8 , cursor2 = 5:6, slowest is 5:6
+ // Move forward cursor, cursor1 = 5:8, cursor2 = 5:6, slowest is 5:6
position = PositionImpl.get(5,8);
doReturn(position).when(cursor1).getReadPosition();
container.cursorUpdated(cursor1, position);
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 2f80bc8..9a4d117 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -388,7 +388,7 @@
fail(exception.getMessage());
}
- }, null, PositionImpl.latest);
+ }, null, PositionImpl.LATEST);
counter.await();
}
@@ -416,7 +416,7 @@
fail("async-call should not have failed");
}
- }, null, PositionImpl.latest);
+ }, null, PositionImpl.LATEST);
counter.await();
@@ -438,7 +438,7 @@
counter2.countDown();
}
- }, null, PositionImpl.latest);
+ }, null, PositionImpl.LATEST);
counter2.await();
}
@@ -465,7 +465,7 @@
counter.countDown();
}
- }, null, PositionImpl.latest);
+ }, null, PositionImpl.LATEST);
counter.await();
}
@@ -621,7 +621,7 @@
final AtomicBoolean moveStatus = new AtomicBoolean(false);
// reset to earliest
- PositionImpl earliest = PositionImpl.earliest;
+ PositionImpl earliest = PositionImpl.EARLIEST;
try {
cursor.resetCursor(earliest);
moveStatus.set(true);
@@ -659,7 +659,7 @@
moveStatus.set(false);
// reset to latest should point to the first non-exist entry in the last ledger
- PositionImpl anotherLast = PositionImpl.latest;
+ PositionImpl anotherLast = PositionImpl.LATEST;
try {
cursor.resetCursor(anotherLast);
moveStatus.set(true);
@@ -1757,7 +1757,7 @@
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
log.error("Error reading", exception);
}
- }, null, PositionImpl.latest);
+ }, null, PositionImpl.LATEST);
}
ledger.addEntry("test".getBytes());
@@ -2439,17 +2439,17 @@
for(int i = 0; i < 10; i++) {
ledger.addEntry(("entry" + i).getBytes(Encoding));
}
- PositionImpl p1 = PositionImpl.get(markDeletedPosition.getLedgerId() , markDeletedPosition.getEntryId() + 1);
- PositionImpl p2 = PositionImpl.get(markDeletedPosition.getLedgerId() , markDeletedPosition.getEntryId() + 2);
- PositionImpl p3 = PositionImpl.get(markDeletedPosition.getLedgerId() , markDeletedPosition.getEntryId() + 5);
- PositionImpl p4 = PositionImpl.get(markDeletedPosition.getLedgerId() , markDeletedPosition.getEntryId() + 6);
+ PositionImpl p1 = PositionImpl.get(markDeletedPosition.getLedgerId(), markDeletedPosition.getEntryId() + 1);
+ PositionImpl p2 = PositionImpl.get(markDeletedPosition.getLedgerId(), markDeletedPosition.getEntryId() + 2);
+ PositionImpl p3 = PositionImpl.get(markDeletedPosition.getLedgerId(), markDeletedPosition.getEntryId() + 5);
+ PositionImpl p4 = PositionImpl.get(markDeletedPosition.getLedgerId(), markDeletedPosition.getEntryId() + 6);
c1.delete(Lists.newArrayList(p1, p2, p3, p4));
assertEquals(c1.getLastIndividualDeletedRange(), Range.openClosed(PositionImpl.get(p3.getLedgerId(),
p3.getEntryId() - 1), p4));
- PositionImpl p5 = PositionImpl.get(markDeletedPosition.getLedgerId() , markDeletedPosition.getEntryId() + 8);
+ PositionImpl p5 = PositionImpl.get(markDeletedPosition.getLedgerId(), markDeletedPosition.getEntryId() + 8);
c1.delete(p5);
assertEquals(c1.getLastIndividualDeletedRange(), Range.openClosed(PositionImpl.get(p5.getLedgerId(),
@@ -2466,10 +2466,10 @@
for(int i = 0; i < 10; i++) {
ledger.addEntry(("entry" + i).getBytes(Encoding));
}
- PositionImpl p1 = PositionImpl.get(markDeletedPosition.getLedgerId() , markDeletedPosition.getEntryId() + 1);
- PositionImpl p2 = PositionImpl.get(markDeletedPosition.getLedgerId() , markDeletedPosition.getEntryId() + 2);
- PositionImpl p3 = PositionImpl.get(markDeletedPosition.getLedgerId() , markDeletedPosition.getEntryId() + 5);
- PositionImpl p4 = PositionImpl.get(markDeletedPosition.getLedgerId() , markDeletedPosition.getEntryId() + 6);
+ PositionImpl p1 = PositionImpl.get(markDeletedPosition.getLedgerId(), markDeletedPosition.getEntryId() + 1);
+ PositionImpl p2 = PositionImpl.get(markDeletedPosition.getLedgerId(), markDeletedPosition.getEntryId() + 2);
+ PositionImpl p3 = PositionImpl.get(markDeletedPosition.getLedgerId(), markDeletedPosition.getEntryId() + 5);
+ PositionImpl p4 = PositionImpl.get(markDeletedPosition.getLedgerId(), markDeletedPosition.getEntryId() + 6);
c1.delete(Lists.newArrayList(p1, p2, p3, p4));
@@ -2477,12 +2477,12 @@
EntryImpl entry2 = EntryImpl.create(p2, ByteBufAllocator.DEFAULT.buffer(0));
EntryImpl entry3 = EntryImpl.create(p3, ByteBufAllocator.DEFAULT.buffer(0));
EntryImpl entry4 = EntryImpl.create(p4, ByteBufAllocator.DEFAULT.buffer(0));
- EntryImpl entry5 = EntryImpl.create(markDeletedPosition.getLedgerId() , markDeletedPosition.getEntryId() + 7,
+ EntryImpl entry5 = EntryImpl.create(markDeletedPosition.getLedgerId(), markDeletedPosition.getEntryId() + 7,
ByteBufAllocator.DEFAULT.buffer(0));
List<Entry> entries = Lists.newArrayList(entry1, entry2, entry3, entry4, entry5);
c1.trimDeletedEntries(entries);
assertEquals(entries.size(), 1);
- assertEquals(entries.get(0).getPosition(), PositionImpl.get(markDeletedPosition.getLedgerId() ,
+ assertEquals(entries.get(0).getPosition(), PositionImpl.get(markDeletedPosition.getLedgerId(),
markDeletedPosition.getEntryId() + 7));
}
@@ -2634,7 +2634,7 @@
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
counter.countDown();
}
- }, null, PositionImpl.latest);
+ }, null, PositionImpl.LATEST);
assertTrue(c1.cancelPendingReadRequest());
@@ -2650,7 +2650,7 @@
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
counter2.countDown();
}
- }, null, PositionImpl.latest);
+ }, null, PositionImpl.LATEST);
ledger.addEntry("entry-1".getBytes(Encoding));
@@ -3131,7 +3131,7 @@
@Test
void testNonDurableCursorActive() throws Exception {
ManagedLedger ml = factory.open("testInactive");
- ManagedCursor cursor = ml.newNonDurableCursor(PositionImpl.latest, "c1");
+ ManagedCursor cursor = ml.newNonDurableCursor(PositionImpl.LATEST, "c1");
assertTrue(cursor.isActive());
@@ -3353,8 +3353,8 @@
int sendNumber = 20;
ManagedLedger ledger = factory.open("testReadEntriesOrWaitWithMaxPosition");
ManagedCursor c = ledger.openCursor("c");
- Position position = PositionImpl.earliest;
- Position maxCanReadPosition = PositionImpl.earliest;
+ Position position = PositionImpl.EARLIEST;
+ Position maxCanReadPosition = PositionImpl.EARLIEST;
for (int i = 0; i < sendNumber; i++) {
if (i == readMaxNumber - 1) {
position = ledger.addEntry(new byte[1024]);
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryShutdownTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryShutdownTest.java
index 8d1d4ea..9dc88ef 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryShutdownTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryShutdownTest.java
@@ -147,7 +147,7 @@
}
}, null);
- factory.asyncOpenReadOnlyCursor(ledgerName, PositionImpl.earliest, new ManagedLedgerConfig(),
+ factory.asyncOpenReadOnlyCursor(ledgerName, PositionImpl.EARLIEST, new ManagedLedgerConfig(),
new AsyncCallbacks.OpenReadOnlyCursorCallback() {
@Override
public void openReadOnlyCursorComplete(ReadOnlyCursor cursor, Object ctx) {
@@ -174,6 +174,6 @@
Assert.assertThrows(ManagedLedgerException.ManagedLedgerFactoryClosedException.class,
() -> factory.open(ledgerName));
Assert.assertThrows(ManagedLedgerException.ManagedLedgerFactoryClosedException.class,
- () -> factory.openReadOnlyCursor(ledgerName, PositionImpl.earliest, new ManagedLedgerConfig()));
+ () -> factory.openReadOnlyCursor(ledgerName, PositionImpl.EARLIEST, new ManagedLedgerConfig()));
}
}
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTerminationTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTerminationTest.java
index 2fb4964..85755db 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTerminationTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTerminationTest.java
@@ -141,7 +141,7 @@
assertTrue(ledger.isTerminated());
assertEquals(lastPosition, p1);
- ManagedCursor c1 = ledger.newNonDurableCursor(PositionImpl.earliest);
+ ManagedCursor c1 = ledger.newNonDurableCursor(PositionImpl.EARLIEST);
List<Entry> entries = c1.readEntries(10);
assertEquals(entries.size(), 2);
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index d837651..79eab77 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -343,7 +343,7 @@
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
fail(exception.getMessage());
}
- }, cursor, PositionImpl.latest);
+ }, cursor, PositionImpl.LATEST);
}
@Override
@@ -2706,11 +2706,11 @@
String ctxStr = "timeoutCtx";
CompletableFuture<LedgerEntries> entriesFuture = new CompletableFuture<>();
ReadHandle ledgerHandle = mock(ReadHandle.class);
- doReturn(entriesFuture).when(ledgerHandle).readAsync(PositionImpl.earliest.getLedgerId(),
- PositionImpl.earliest.getEntryId());
+ doReturn(entriesFuture).when(ledgerHandle).readAsync(PositionImpl.EARLIEST.getLedgerId(),
+ PositionImpl.EARLIEST.getEntryId());
// (1) test read-timeout for: ManagedLedger.asyncReadEntry(..)
- ledger.asyncReadEntry(ledgerHandle, PositionImpl.earliest, new ReadEntryCallback() {
+ ledger.asyncReadEntry(ledgerHandle, PositionImpl.EARLIEST, new ReadEntryCallback() {
@Override
public void readEntryComplete(Entry entry, Object ctx) {
responseException1.set(null);
@@ -2729,7 +2729,7 @@
// (2) test read-timeout for: ManagedLedger.asyncReadEntry(..)
AtomicReference<ManagedLedgerException> responseException2 = new AtomicReference<>();
- PositionImpl readPositionRef = PositionImpl.earliest;
+ PositionImpl readPositionRef = PositionImpl.EARLIEST;
ManagedCursorImpl cursor = new ManagedCursorImpl(bk, config, ledger, "cursor1");
OpReadEntry opReadEntry = OpReadEntry.create(cursor, readPositionRef, 1, new ReadEntriesCallback() {
@@ -2743,8 +2743,8 @@
responseException2.set(exception);
}
- }, null, PositionImpl.latest);
- ledger.asyncReadEntry(ledgerHandle, PositionImpl.earliest.getEntryId(), PositionImpl.earliest.getEntryId(),
+ }, null, PositionImpl.LATEST);
+ ledger.asyncReadEntry(ledgerHandle, PositionImpl.EARLIEST.getEntryId(), PositionImpl.EARLIEST.getEntryId(),
false, opReadEntry, ctxStr);
retryStrategically((test) -> {
return responseException2.get() != null;
@@ -3128,7 +3128,7 @@
entries.forEach(Entry::release);
// Now we update the cursors that are still subscribing to ledgers that has been consumed completely
managedLedger.maybeUpdateCursorBeforeTrimmingConsumedLedger();
- managedLedger.internalTrimConsumedLedgers(Futures.NULL_PROMISE);
+ managedLedger.internalTrimConsumedLedgers(Futures.nullPromise_);
ManagedLedgerImpl finalManagedLedger = managedLedger;
Awaitility.await().untilAsserted(() -> {
// We only have one empty ledger at last [{entries=0}]
@@ -3224,7 +3224,7 @@
assertEquals(ledger.ledgerCache.size(), 2);
cursor.clearBacklog();
cursor2.clearBacklog();
- ledger.trimConsumedLedgersInBackground(Futures.NULL_PROMISE);
+ ledger.trimConsumedLedgersInBackground(Futures.nullPromise_);
Awaitility.await().untilAsserted(() -> {
assertEquals(ledger.ledgers.size(), 1);
assertEquals(ledger.ledgerCache.size(), 0);
@@ -3256,7 +3256,7 @@
assertEquals(ledger.ledgerCache.size(), 2);
cursor.clearBacklog();
cursor2.clearBacklog();
- ledger.trimConsumedLedgersInBackground(Futures.NULL_PROMISE);
+ ledger.trimConsumedLedgersInBackground(Futures.nullPromise_);
Awaitility.await().untilAsserted(() -> {
assertEquals(ledger.ledgers.size(), 3);
assertEquals(ledger.ledgerCache.size(), 0);
@@ -3268,7 +3268,7 @@
assertEquals(entryList.size(), 3);
assertEquals(ledger.ledgerCache.size(), 2);
cursor3.clearBacklog();
- ledger.trimConsumedLedgersInBackground(Futures.NULL_PROMISE);
+ ledger.trimConsumedLedgersInBackground(Futures.nullPromise_);
Awaitility.await().untilAsserted(() -> {
assertEquals(ledger.ledgers.size(), 3);
assertEquals(ledger.ledgerCache.size(), 0);
@@ -3307,7 +3307,7 @@
ledgerOffloader = mock(NullLedgerOffloader.class);
config.setLedgerOffloader(ledgerOffloader);
- ledger.internalTrimConsumedLedgers(Futures.NULL_PROMISE);
+ ledger.internalTrimConsumedLedgers(Futures.nullPromise_);
verify(ledgerOffloader, times(1)).getOffloadPolicies();
}
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
index 4c2944f..392f777 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
@@ -64,7 +64,7 @@
void readFromEmptyLedger() throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger");
- ManagedCursor c1 = ledger.newNonDurableCursor(PositionImpl.earliest);
+ ManagedCursor c1 = ledger.newNonDurableCursor(PositionImpl.EARLIEST);
List<Entry> entries = c1.readEntries(10);
assertEquals(entries.size(), 0);
entries.forEach(e -> e.release());
@@ -107,7 +107,7 @@
void testZNodeBypassed() throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger");
- ManagedCursor c1 = ledger.newNonDurableCursor(PositionImpl.earliest);
+ ManagedCursor c1 = ledger.newNonDurableCursor(PositionImpl.EARLIEST);
assertFalse(Iterables.isEmpty(ledger.getCursors()));
c1.close();
@@ -123,8 +123,8 @@
ManagedLedger ledger = factory.open("my_test_ledger",
new ManagedLedgerConfig().setRetentionTime(1, TimeUnit.HOURS).setRetentionSizeInMB(1));
- ManagedCursor c1 = ledger.newNonDurableCursor(PositionImpl.latest);
- ManagedCursor c2 = ledger.newNonDurableCursor(PositionImpl.latest);
+ ManagedCursor c1 = ledger.newNonDurableCursor(PositionImpl.LATEST);
+ ManagedCursor c2 = ledger.newNonDurableCursor(PositionImpl.LATEST);
ledger.addEntry("entry-1".getBytes(Encoding));
ledger.addEntry("entry-2".getBytes(Encoding));
@@ -156,8 +156,8 @@
ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(1)
.setRetentionTime(1, TimeUnit.HOURS).setRetentionSizeInMB(1));
- ManagedCursor c1 = ledger.newNonDurableCursor(PositionImpl.latest);
- ManagedCursor c2 = ledger.newNonDurableCursor(PositionImpl.latest);
+ ManagedCursor c1 = ledger.newNonDurableCursor(PositionImpl.LATEST);
+ ManagedCursor c2 = ledger.newNonDurableCursor(PositionImpl.LATEST);
ledger.addEntry("entry-1".getBytes(Encoding));
ledger.addEntry("entry-2".getBytes(Encoding));
@@ -186,7 +186,7 @@
ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(1)
.setRetentionTime(1, TimeUnit.HOURS).setRetentionSizeInMB(1));
- ManagedCursor c1 = ledger.newNonDurableCursor(PositionImpl.latest);
+ ManagedCursor c1 = ledger.newNonDurableCursor(PositionImpl.LATEST);
ledger.close();
@@ -203,15 +203,15 @@
ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(2)
.setRetentionTime(1, TimeUnit.HOURS).setRetentionSizeInMB(1));
- ManagedCursor c1 = ledger.newNonDurableCursor(PositionImpl.latest);
+ ManagedCursor c1 = ledger.newNonDurableCursor(PositionImpl.LATEST);
ledger.addEntry("dummy-entry-1".getBytes(Encoding));
- ManagedCursor c2 = ledger.newNonDurableCursor(PositionImpl.latest);
+ ManagedCursor c2 = ledger.newNonDurableCursor(PositionImpl.LATEST);
ledger.addEntry("dummy-entry-2".getBytes(Encoding));
- ManagedCursor c3 = ledger.newNonDurableCursor(PositionImpl.latest);
+ ManagedCursor c3 = ledger.newNonDurableCursor(PositionImpl.LATEST);
ledger.addEntry("dummy-entry-3".getBytes(Encoding));
- ManagedCursor c4 = ledger.newNonDurableCursor(PositionImpl.latest);
+ ManagedCursor c4 = ledger.newNonDurableCursor(PositionImpl.LATEST);
ledger.addEntry("dummy-entry-4".getBytes(Encoding));
- ManagedCursor c5 = ledger.newNonDurableCursor(PositionImpl.latest);
+ ManagedCursor c5 = ledger.newNonDurableCursor(PositionImpl.LATEST);
assertEquals(c1.getNumberOfEntries(), 4);
assertTrue(c1.hasMoreEntries());
@@ -240,15 +240,15 @@
ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(2)
.setRetentionTime(1, TimeUnit.HOURS).setRetentionSizeInMB(1));
- ManagedCursor c1 = ledger.newNonDurableCursor(PositionImpl.latest);
+ ManagedCursor c1 = ledger.newNonDurableCursor(PositionImpl.LATEST);
Position p1 = ledger.addEntry("dummy-entry-1".getBytes(Encoding));
- ManagedCursor c2 = ledger.newNonDurableCursor(PositionImpl.latest);
+ ManagedCursor c2 = ledger.newNonDurableCursor(PositionImpl.LATEST);
ledger.addEntry("dummy-entry-2".getBytes(Encoding));
- ManagedCursor c3 = ledger.newNonDurableCursor(PositionImpl.latest);
+ ManagedCursor c3 = ledger.newNonDurableCursor(PositionImpl.LATEST);
Position p3 = ledger.addEntry("dummy-entry-3".getBytes(Encoding));
- ManagedCursor c4 = ledger.newNonDurableCursor(PositionImpl.latest);
+ ManagedCursor c4 = ledger.newNonDurableCursor(PositionImpl.LATEST);
Position p4 = ledger.addEntry("dummy-entry-4".getBytes(Encoding));
- ManagedCursor c5 = ledger.newNonDurableCursor(PositionImpl.latest);
+ ManagedCursor c5 = ledger.newNonDurableCursor(PositionImpl.LATEST);
assertEquals(c1.getNumberOfEntriesInBacklog(false), 4);
assertEquals(c2.getNumberOfEntriesInBacklog(false), 3);
@@ -338,7 +338,7 @@
void testResetCursor() throws Exception {
ManagedLedger ledger = factory.open("my_test_move_cursor_ledger",
new ManagedLedgerConfig().setMaxEntriesPerLedger(10));
- ManagedCursor cursor = ledger.newNonDurableCursor(PositionImpl.latest);
+ ManagedCursor cursor = ledger.newNonDurableCursor(PositionImpl.LATEST);
ledger.addEntry("dummy-entry-1".getBytes(Encoding));
ledger.addEntry("dummy-entry-2".getBytes(Encoding));
ledger.addEntry("dummy-entry-3".getBytes(Encoding));
@@ -362,7 +362,7 @@
void testasyncResetCursor() throws Exception {
ManagedLedger ledger = factory.open("my_test_move_cursor_ledger",
new ManagedLedgerConfig().setMaxEntriesPerLedger(10));
- ManagedCursor cursor = ledger.newNonDurableCursor(PositionImpl.latest);
+ ManagedCursor cursor = ledger.newNonDurableCursor(PositionImpl.LATEST);
ledger.addEntry("dummy-entry-1".getBytes(Encoding));
ledger.addEntry("dummy-entry-2".getBytes(Encoding));
ledger.addEntry("dummy-entry-3".getBytes(Encoding));
@@ -395,7 +395,7 @@
void rewind() throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(2)
.setRetentionTime(1, TimeUnit.HOURS).setRetentionSizeInMB(1));
- ManagedCursor c1 = ledger.newNonDurableCursor(PositionImpl.earliest);
+ ManagedCursor c1 = ledger.newNonDurableCursor(PositionImpl.EARLIEST);
Position p1 = ledger.addEntry("dummy-entry-1".getBytes(Encoding));
Position p2 = ledger.addEntry("dummy-entry-2".getBytes(Encoding));
Position p3 = ledger.addEntry("dummy-entry-3".getBytes(Encoding));
@@ -446,7 +446,7 @@
@Test(timeOut = 20000)
void markDeleteSkippingMessage() throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(10));
- ManagedCursor cursor = ledger.newNonDurableCursor(PositionImpl.earliest);
+ ManagedCursor cursor = ledger.newNonDurableCursor(PositionImpl.EARLIEST);
Position p1 = ledger.addEntry("dummy-entry-1".getBytes(Encoding));
Position p2 = ledger.addEntry("dummy-entry-2".getBytes(Encoding));
ledger.addEntry("dummy-entry-3".getBytes(Encoding));
@@ -542,7 +542,7 @@
void testSingleDelete() throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(3)
.setRetentionTime(1, TimeUnit.HOURS).setRetentionSizeInMB(1));
- ManagedCursor cursor = ledger.newNonDurableCursor(PositionImpl.latest);
+ ManagedCursor cursor = ledger.newNonDurableCursor(PositionImpl.LATEST);
Position p1 = ledger.addEntry("entry1".getBytes());
Position p2 = ledger.addEntry("entry2".getBytes());
@@ -588,7 +588,7 @@
Position p3 = ledger.addEntry("entry-3".getBytes());
Thread.sleep(300);
- ManagedCursor c1 = ledger.newNonDurableCursor(PositionImpl.earliest);
+ ManagedCursor c1 = ledger.newNonDurableCursor(PositionImpl.EARLIEST);
assertEquals(c1.getReadPosition(), p3);
assertEquals(c1.getMarkDeletedPosition(), new PositionImpl(5, -1));
}
@@ -605,7 +605,7 @@
/* Position p5 = */ ledger.addEntry("entry-5".getBytes());
/* Position p6 = */ ledger.addEntry("entry-6".getBytes());
- ManagedCursor c1 = ledger.newNonDurableCursor(PositionImpl.earliest);
+ ManagedCursor c1 = ledger.newNonDurableCursor(PositionImpl.EARLIEST);
assertEquals(c1.getReadPosition(), p1);
assertEquals(c1.getMarkDeletedPosition(), new PositionImpl(3, -1));
assertEquals(c1.getNumberOfEntries(), 6);
@@ -698,7 +698,7 @@
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testBacklogStatsWhenDroppingData",
new ManagedLedgerConfig().setMaxEntriesPerLedger(1));
ManagedCursor c1 = ledger.openCursor("c1");
- ManagedCursor nonDurableCursor = ledger.newNonDurableCursor(PositionImpl.earliest);
+ ManagedCursor nonDurableCursor = ledger.newNonDurableCursor(PositionImpl.EARLIEST);
assertEquals(nonDurableCursor.getNumberOfEntries(), 0);
assertEquals(nonDurableCursor.getNumberOfEntriesInBacklog(true), 0);
@@ -756,7 +756,7 @@
void deleteNonDurableCursorWithName() throws Exception {
ManagedLedger ledger = factory.open("deleteManagedLedgerWithNonDurableCursor");
- ManagedCursor c = ledger.newNonDurableCursor(PositionImpl.earliest, "custom-name");
+ ManagedCursor c = ledger.newNonDurableCursor(PositionImpl.EARLIEST, "custom-name");
assertEquals(Iterables.size(ledger.getCursors()), 1);
ledger.deleteCursor(c.getName());
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
index e761b04..bbaef69 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
@@ -93,7 +93,7 @@
UUID secondLedgerUUID = new UUID(ledger.getLedgersInfoAsList().get(1).getOffloadContext().getUidMsb(),
ledger.getLedgersInfoAsList().get(1).getOffloadContext().getUidLsb());
- ManagedCursor cursor = ledger.newNonDurableCursor(PositionImpl.earliest);
+ ManagedCursor cursor = ledger.newNonDurableCursor(PositionImpl.EARLIEST);
int i = 0;
for (Entry e : cursor.readEntries(10)) {
assertEquals(new String(e.getData()), "entry-" + i++);
@@ -163,7 +163,7 @@
UUID secondLedgerUUID = new UUID(secondLedger.getOffloadContext().getUidMsb(),
secondLedger.getOffloadContext().getUidLsb());
- ManagedCursor cursor = ledger.newNonDurableCursor(PositionImpl.earliest);
+ ManagedCursor cursor = ledger.newNonDurableCursor(PositionImpl.EARLIEST);
int i = 0;
for (Entry e : cursor.readEntries(10)) {
Assert.assertEquals(new String(e.getData()), "entry-" + i++);
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
index a7092e4..d0da8e6 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
@@ -147,13 +147,13 @@
assertEquals(ledger.getLedgersInfoAsList().size(), 3);
try {
- ledger.offloadPrefix(PositionImpl.earliest);
+ ledger.offloadPrefix(PositionImpl.EARLIEST);
fail("Should have thrown an exception");
} catch (ManagedLedgerException.InvalidCursorPositionException e) {
// expected
}
try {
- ledger.offloadPrefix(PositionImpl.latest);
+ ledger.offloadPrefix(PositionImpl.LATEST);
fail("Should have thrown an exception");
} catch (ManagedLedgerException.InvalidCursorPositionException e) {
// expected
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorTest.java
index 05ddd1d..340acfe 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorTest.java
@@ -40,7 +40,7 @@
@Test
void notFound() throws Exception {
try {
- factory.openReadOnlyCursor("notFound", PositionImpl.earliest, new ManagedLedgerConfig());
+ factory.openReadOnlyCursor("notFound", PositionImpl.EARLIEST, new ManagedLedgerConfig());
fail("Should have failed");
} catch (ManagedLedgerNotFoundException e) {
// Expected
@@ -59,7 +59,7 @@
ledger.addEntry(("entry-" + i).getBytes());
}
- ReadOnlyCursor cursor = factory.openReadOnlyCursor("simple", PositionImpl.earliest, new ManagedLedgerConfig());
+ ReadOnlyCursor cursor = factory.openReadOnlyCursor("simple", PositionImpl.EARLIEST, new ManagedLedgerConfig());
assertEquals(cursor.getNumberOfEntries(), N);
assertTrue(cursor.hasMoreEntries());
@@ -78,7 +78,7 @@
}
// Open a new cursor
- cursor = factory.openReadOnlyCursor("simple", PositionImpl.earliest, new ManagedLedgerConfig());
+ cursor = factory.openReadOnlyCursor("simple", PositionImpl.EARLIEST, new ManagedLedgerConfig());
assertEquals(cursor.getNumberOfEntries(), 2 * N);
assertTrue(cursor.hasMoreEntries());
@@ -114,7 +114,7 @@
ledger.addEntry(("entry-" + i).getBytes());
}
- ReadOnlyCursor cursor = factory.openReadOnlyCursor("skip", PositionImpl.earliest, new ManagedLedgerConfig());
+ ReadOnlyCursor cursor = factory.openReadOnlyCursor("skip", PositionImpl.EARLIEST, new ManagedLedgerConfig());
assertEquals(cursor.getNumberOfEntries(), N);
assertTrue(cursor.hasMoreEntries());
@@ -138,7 +138,7 @@
ledger.addEntry(("entry-" + i).getBytes());
}
- ReadOnlyCursor cursor = factory.openReadOnlyCursor("skip-all", PositionImpl.earliest,
+ ReadOnlyCursor cursor = factory.openReadOnlyCursor("skip-all", PositionImpl.EARLIEST,
new ManagedLedgerConfig());
assertEquals(cursor.getNumberOfEntries(), N);
@@ -166,7 +166,7 @@
ledger.addEntry(("entry-" + i).getBytes());
}
- ReadOnlyCursor cursor = factory.openReadOnlyCursor("skip", PositionImpl.earliest, new ManagedLedgerConfig());
+ ReadOnlyCursor cursor = factory.openReadOnlyCursor("skip", PositionImpl.EARLIEST, new ManagedLedgerConfig());
assertEquals(cursor.getNumberOfEntries(), N);
assertTrue(cursor.hasMoreEntries());
@@ -188,7 +188,7 @@
void empty() throws Exception {
factory.open("empty", new ManagedLedgerConfig().setRetentionTime(1, TimeUnit.HOURS));
- ReadOnlyCursor cursor = factory.openReadOnlyCursor("empty", PositionImpl.earliest, new ManagedLedgerConfig());
+ ReadOnlyCursor cursor = factory.openReadOnlyCursor("empty", PositionImpl.EARLIEST, new ManagedLedgerConfig());
assertEquals(cursor.getNumberOfEntries(), 0);
assertFalse(cursor.hasMoreEntries());
@@ -206,7 +206,7 @@
ledger.addEntry(("entry-" + i).getBytes());
}
- ReadOnlyCursor cursor = factory.openReadOnlyCursor("simple", PositionImpl.earliest, new ManagedLedgerConfig());
+ ReadOnlyCursor cursor = factory.openReadOnlyCursor("simple", PositionImpl.EARLIEST, new ManagedLedgerConfig());
assertEquals(cursor.getNumberOfEntries(), N);
assertTrue(cursor.hasMoreEntries());
diff --git a/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderSasl.java b/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderSasl.java
index f1c0a4a..fd2b4ca 100644
--- a/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderSasl.java
+++ b/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderSasl.java
@@ -90,14 +90,14 @@
loginContextName = config.getSaslJaasServerSectionName();
if (jaasCredentialsContainer == null) {
- log.info("JAAS loginContext is: {}." , loginContextName);
+ log.info("JAAS loginContext is: {}.", loginContextName);
try {
jaasCredentialsContainer = new JAASCredentialsContainer(
loginContextName,
new PulsarSaslServer.SaslServerCallbackHandler(allowedIdsPattern),
configuration);
} catch (LoginException e) {
- log.error("JAAS login in broker failed" , e);
+ log.error("JAAS login in broker failed", e);
throw new IOException(e);
}
}
@@ -122,7 +122,7 @@
PulsarSaslServer server = new PulsarSaslServer(jaasCredentialsContainer.getSubject(), allowedIdsPattern);
return new SaslAuthenticationState(server);
} catch (Throwable t) {
- log.error("Failed create sasl auth state" , t);
+ log.error("Failed create sasl auth state", t);
throw new AuthenticationException(t.getMessage());
}
}
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
index 7cf1a8b..3d0f995 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
@@ -247,7 +247,7 @@
// if primary-isolated-bookies are not enough then add consider secondary isolated bookie group as well.
if (totalAvailableBookiesInPrimaryGroup < ensembleSize) {
log.info(
- "Not found enough available-bookies from primary isolation group [{}] , checking secondary group [{}]",
+ "Not found enough available-bookies from primary isolation group [{}], checking secondary group [{}]",
primaryIsolationGroup, secondaryIsolationGroup);
for (String group : secondaryIsolationGroup) {
Map<String, BookieInfo> bookieGroup = allGroupsBookieMapping.get(group);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 76d2d6c..eed476c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -263,7 +263,7 @@
private final ReentrantLock mutex = new ReentrantLock();
private final Condition isClosedCondition = mutex.newCondition();
private volatile CompletableFuture<Void> closeFuture;
- // key is listener name , value is pulsar address and pulsar ssl address
+ // key is listener name, value is pulsar address and pulsar ssl address
private Map<String, AdvertisedListener> advertisedListeners;
private NamespaceName heartbeatNamespaceV2;
@@ -1218,7 +1218,7 @@
}
} else {
LOG.info("No ledger offloader configured, using NULL instance");
- return NullLedgerOffloader.INSTANCE;
+ return NullLedgerOffloader.instance_;
}
} catch (Throwable t) {
throw new PulsarServerException(t);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index df0f380..e54bb59 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2546,7 +2546,7 @@
} catch (Exception exception) {
exception.printStackTrace();
log.error("[{}] Failed to examine message at position {} from {} due to {}", clientAppId(), messagePosition,
- topicName , exception);
+ topicName, exception);
throw new RestException(exception);
}
}
@@ -4239,7 +4239,7 @@
}));
}
- protected CompletableFuture<Void> internalSetCompactionThreshold(Long compactionThreshold , boolean isGlobal) {
+ protected CompletableFuture<Void> internalSetCompactionThreshold(Long compactionThreshold, boolean isGlobal) {
if (compactionThreshold != null && compactionThreshold < 0) {
throw new RestException(Status.PRECONDITION_FAILED, "Invalid value for compactionThreshold");
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 9d26e30..211707d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -894,7 +894,7 @@
//both namespace-level and topic-level policy are not set, try to use broker-level policy
ServiceConfiguration serviceConfiguration = brokerService.pulsar().getConfiguration();
if (publishRate != null) {
- //publishRate is not null , use namespace-level policy
+ //publishRate is not null, use namespace-level policy
updatePublishDispatcher(publishRate);
} else {
PublishRate brokerPublishRate = new PublishRate(serviceConfiguration.getMaxPublishRatePerTopicInMessages()
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 0451571..c2a27ad 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1683,7 +1683,7 @@
if (t instanceof PersistentTopic) {
Optional.ofNullable(((PersistentTopic) t).getManagedLedger()).ifPresent(
managedLedger -> {
- managedLedger.trimConsumedLedgersInBackground(Futures.NULL_PROMISE);
+ managedLedger.trimConsumedLedgersInBackground(Futures.nullPromise_);
}
);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 468946d..dc4019d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -235,7 +235,7 @@
}
// Note
- // Must ensure that the message is written to the pendingAcks before sent is first , because this consumer
+ // Must ensure that the message is written to the pendingAcks before sent is first, because this consumer
// is possible to disconnect at this time.
if (pendingAcks != null) {
for (int i = 0; i < entries.size(); i++) {
@@ -347,7 +347,7 @@
log.warn("[{}] [{}] Received cumulative ack on shared subscription, ignoring",
subscription, consumerId);
}
- PositionImpl position = PositionImpl.earliest;
+ PositionImpl position = PositionImpl.EARLIEST;
if (ack.getMessageIdsCount() == 1) {
MessageIdData msgId = ack.getMessageIdAt(0);
if (msgId.getAckSetsCount() > 0) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 5b4d1f5..f4c8335 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -2195,7 +2195,7 @@
txnID.getLeastSigBits(), txnID.getMostSigBits()));
}
}).exceptionally(e -> {
- log.error("handleEndTxnOnPartition fail ! topic {} , "
+ log.error("handleEndTxnOnPartition fail ! topic {}, "
+ "txnId: [{}], txnAction: [{}]", topic, txnID,
TxnAction.valueOf(txnAction), e.getCause());
ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(
@@ -2205,7 +2205,7 @@
});
}
}).exceptionally(e -> {
- log.error("handleEndTxnOnPartition fail ! topic {} , "
+ log.error("handleEndTxnOnPartition fail ! topic {}, "
+ "txnId: [{}], txnAction: [{}]", topic, txnID,
TxnAction.valueOf(txnAction), e.getCause());
ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(
@@ -2248,7 +2248,7 @@
subscription.endTxn(txnidMostBits, txnidLeastBits, txnAction, lowWaterMark);
completableFuture.whenComplete((ignored, e) -> {
if (e != null) {
- log.error("handleEndTxnOnSubscription fail ! topic: {} , subscription: {}"
+ log.error("handleEndTxnOnSubscription fail ! topic: {}, subscription: {}"
+ "txnId: [{}], txnAction: [{}]", topic, subName,
txnID, TxnAction.valueOf(txnAction), e.getCause());
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
@@ -2266,7 +2266,7 @@
.thenAccept((b) -> {
if (b) {
log.error("handleEndTxnOnSubscription fail! The topic {} does not exist in broker, "
- + "subscription: {} ,txnId: [{}], txnAction: [{}]", topic, subName,
+ + "subscription: {}, txnId: [{}], txnAction: [{}]", topic, subName,
new TxnID(txnidMostBits, txnidLeastBits), TxnAction.valueOf(txnAction));
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(),
@@ -2280,7 +2280,7 @@
txnID.getLeastSigBits(), txnID.getMostSigBits()));
}
}).exceptionally(e -> {
- log.error("handleEndTxnOnSubscription fail ! topic {} , subscription: {}"
+ log.error("handleEndTxnOnSubscription fail ! topic {}, subscription: {}"
+ "txnId: [{}], txnAction: [{}]", topic, subName,
txnID, TxnAction.valueOf(txnAction), e.getCause());
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
@@ -2290,7 +2290,7 @@
});
}
}).exceptionally(e -> {
- log.error("handleEndTxnOnSubscription fail ! topic: {} , subscription: {}"
+ log.error("handleEndTxnOnSubscription fail ! topic: {}, subscription: {}"
+ "txnId: [{}], txnAction: [{}]", topic, subName,
txnID, TxnAction.valueOf(txnAction), e.getCause());
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index 555da88..af9907d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -187,7 +187,7 @@
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
future.completeExceptionally(exception);
}
- }, null, PositionImpl.latest);
+ }, null, PositionImpl.LATEST);
}
public Status getStatus() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index 4d79c9a..f7aebc4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -242,7 +242,7 @@
messagesToRead);
}
cursor.asyncReadEntriesOrWait(messagesToRead, readMaxSizeBytes, this,
- null, PositionImpl.latest);
+ null, PositionImpl.LATEST);
} else {
if (log.isDebugEnabled()) {
log.debug("[{}][{} -> {}] Not scheduling read due to pending read. Messages To Read {}", topicName,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
index d8e48f9..88a9419 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
@@ -162,7 +162,7 @@
try {
locatorEntry = getLocator(key).get();
} catch (Exception e) {
- log.warn("Failed to get list of schema-storage ledger for {} , the exception as follow: \n {}", key,
+ log.warn("Failed to get list of schema-storage ledger for {}, the exception as follow: \n {}", key,
(e instanceof ExecutionException ? e.getCause() : e));
throw new IOException("Failed to get schema ledger for" + key);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/ProtobufNativeSchemaDataValidator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/ProtobufNativeSchemaDataValidator.java
index cdd4081..b0ab159 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/ProtobufNativeSchemaDataValidator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/ProtobufNativeSchemaDataValidator.java
@@ -35,7 +35,7 @@
}
if (descriptor == null) {
throw new InvalidSchemaDataException(
- "protobuf root message descriptor is null ,"
+ "protobuf root message descriptor is null,"
+ " please recheck rootMessageTypeName or rootFileDescriptorName conf. ");
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
index 98d572c..261b059 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
@@ -371,7 +371,7 @@
@Override
public PositionImpl getMaxReadPosition() {
- return PositionImpl.latest;
+ return PositionImpl.LATEST;
}
@Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index 9978f6f..d6b053c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -472,7 +472,7 @@
if (checkIfReady() || checkIfNoSnapshot()) {
return this.maxReadPosition;
} else {
- return PositionImpl.earliest;
+ return PositionImpl.EARLIEST;
}
}
@@ -510,7 +510,7 @@
private final TopicTransactionBufferRecoverCallBack callBack;
- private Position startReadCursorPosition = PositionImpl.earliest;
+ private Position startReadCursorPosition = PositionImpl.EARLIEST;
private final SpscArrayQueue<Entry> entryQueue;
@@ -653,7 +653,7 @@
if (entryQueue.size() < entryQueue.capacity() && outstandingReadsRequests.get() == 0) {
if (cursor.hasMoreEntries()) {
outstandingReadsRequests.incrementAndGet();
- cursor.asyncReadEntries(100, this, System.nanoTime(), PositionImpl.latest);
+ cursor.asyncReadEntries(100, this, System.nanoTime(), PositionImpl.LATEST);
}
}
return isReadable;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
index f7e147d..5ccec5a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
@@ -90,7 +90,7 @@
@Override
public PositionImpl getMaxReadPosition() {
- return PositionImpl.latest;
+ return PositionImpl.LATEST;
}
@Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
index 1592318..1b63c11 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
@@ -110,7 +110,7 @@
//TODO can control the number of entry to read
private void readAsync(int numberOfEntriesToRead,
AsyncCallbacks.ReadEntriesCallback readEntriesCallback) {
- cursor.asyncReadEntries(numberOfEntriesToRead, readEntriesCallback, System.nanoTime(), PositionImpl.latest);
+ cursor.asyncReadEntries(numberOfEntriesToRead, readEntriesCallback, System.nanoTime(), PositionImpl.LATEST);
}
@Override
@@ -285,7 +285,7 @@
buf.release();
completableFuture.completeExceptionally(new PersistenceException(exception));
}
- } , null);
+ }, null);
return completableFuture;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index aac213f..f379500 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -90,13 +90,13 @@
synchronized (this) {
PositionImpl cursorPosition;
if (isFirstRead && MessageId.earliest.equals(consumer.getStartMessageId())){
- cursorPosition = PositionImpl.earliest;
+ cursorPosition = PositionImpl.EARLIEST;
} else {
cursorPosition = (PositionImpl) cursor.getReadPosition();
}
if (compactionHorizon == null
|| compactionHorizon.compareTo(cursorPosition) < 0) {
- cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, callback, consumer, PositionImpl.latest);
+ cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, callback, consumer, PositionImpl.LATEST);
} else {
compactedTopicContext.thenCompose(
(context) -> findStartPoint(cursorPosition, context.ledger.getLastAddConfirmed(), context.cache)
@@ -110,7 +110,7 @@
}
if (startPoint == NEWER_THAN_COMPACTED && compactionHorizon.compareTo(cursorPosition) < 0) {
cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, callback, consumer,
- PositionImpl.latest);
+ PositionImpl.LATEST);
return CompletableFuture.completedFuture(null);
} else {
long endPoint = Math.min(context.ledger.getLastAddConfirmed(),
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index 6cd362e..4870aa6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -1210,7 +1210,7 @@
Awaitility.await()
.untilAsserted(() -> Assert.assertNull(admin.topicPolicies().getDispatchRate(topic)));
- //2 Remove level policy ,DispatchRateLimiter should us ns level policy
+ //2 Remove level policy, DispatchRateLimiter should us ns level policy
Awaitility.await()
.untilAsserted(() -> {
DispatchRateLimiter limiter = pulsar.getBrokerService().getTopicIfExists(topic).get().get().getDispatchRateLimiter().get();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
index 1e44ea0..2acb24b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
@@ -434,7 +434,7 @@
admin.topics().removeInactiveTopicPolicies(topic);
//Only the broker-level policies is set, so after removing the topic-level policies
- // , the topic will use the broker-level policies
+ //, the topic will use the broker-level policies
Awaitility.await().untilAsserted(()
-> assertEquals(((PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).get().get()).getInactiveTopicPolicies()
, defaultPolicy));
@@ -446,7 +446,7 @@
assertEquals(policies, admin.topics().getInactiveTopicPolicies(topic2));
inactiveTopicPolicies.setMaxInactiveDurationSeconds(999);
//Both broker level and namespace level policies are set, so after removing the topic level policies
- // , the topic will use the namespace level policies
+ //, the topic will use the namespace level policies
admin.namespaces().setInactiveTopicPolicies(namespace, inactiveTopicPolicies);
//wait for zk
Awaitility.await().until(() -> {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
index 786d7a2..570c6cd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
@@ -1547,9 +1547,9 @@
assertNotNull(map);
assertEquals((long) map.get("brk_connection_created_total_count"), 2);
assertEquals((long) map.get("brk_active_connections"), 0);
- assertEquals((long) map.get("brk_connection_closed_total_count") , 2);
+ assertEquals((long) map.get("brk_connection_closed_total_count"), 2);
assertEquals((long) map.get("brk_connection_create_success_count"), 1);
- assertEquals((long) map.get("brk_connection_create_fail_count") , 1);
+ assertEquals((long) map.get("brk_connection_create_fail_count"), 1);
}
@Test
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index cd3f3cc..95e9741 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -312,7 +312,7 @@
final ByteBuf payload = (ByteBuf) invocationOnMock.getArguments()[0];
final AddEntryCallback callback = (AddEntryCallback) invocationOnMock.getArguments()[1];
final Topic.PublishContext ctx = (Topic.PublishContext) invocationOnMock.getArguments()[2];
- callback.addComplete(PositionImpl.latest, payload, ctx);
+ callback.addComplete(PositionImpl.LATEST, payload, ctx);
return null;
}).when(ledgerMock).asyncAddEntry(any(ByteBuf.class), any(AddEntryCallback.class), any());
@@ -329,8 +329,8 @@
final Topic.PublishContext publishContext = new Topic.PublishContext() {
@Override
public void completed(Exception e, long ledgerId, long entryId) {
- assertEquals(ledgerId, PositionImpl.latest.getLedgerId());
- assertEquals(entryId, PositionImpl.latest.getEntryId());
+ assertEquals(ledgerId, PositionImpl.LATEST.getLedgerId());
+ assertEquals(entryId, PositionImpl.LATEST.getEntryId());
latch.countDown();
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index c88a464..0c6e7e7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -1120,7 +1120,7 @@
@DataProvider(name = "topicPrefix")
public static Object[][] topicPrefix() {
- return new Object[][] { { "persistent://" , "/persistent" }, { "non-persistent://" , "/non-persistent" } };
+ return new Object[][] { { "persistent://", "/persistent" }, { "non-persistent://", "/non-persistent" } };
}
@Test(dataProvider = "topicPrefix")
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
index 8c5e969..6c27e2c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
@@ -339,7 +339,7 @@
received = consumer.receive();
assertEquals(received.getMessageId(), messageIds.get(messageIds.size() - 1));
- admin.topics().resetCursor(topicName, subscriptionName, new BatchMessageIdImpl(-1, -1, -1 ,10), true);
+ admin.topics().resetCursor(topicName, subscriptionName, new BatchMessageIdImpl(-1, -1, -1,10), true);
// Wait consumer reconnect
Awaitility.await().until(consumer::isConnected);
received = consumer.receive();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java
index c8f09fa..7b63ea5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java
@@ -160,7 +160,7 @@
Awaitility.await().untilAsserted(() -> assertFalse(messageDeduplication.isEnabled()));
producer.newMessage().value("msg").sequenceId(1).send();
checkDeduplicationDisabled(producerName, messageDeduplication);
- //remove namespace-level , use broker-level
+ //remove namespace-level, use broker-level
admin.namespaces().removeDeduplicationStatus(myNamespace);
Awaitility.await().untilAsserted(() -> assertNull(admin.namespaces().getDeduplicationStatus(myNamespace)));
Awaitility.await().untilAsserted(() -> assertTrue(messageDeduplication.isEnabled()));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
index 0c629f1..0d1bbda 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
@@ -209,7 +209,7 @@
}
return getPulsarServiceList().get(0).getManagedLedgerFactory().openReadOnlyCursor(
TopicName.get(topic).getPersistenceNamingEncoding(),
- PositionImpl.earliest, new ManagedLedgerConfig());
+ PositionImpl.EARLIEST, new ManagedLedgerConfig());
} catch (Exception e) {
log.error("Failed to get origin topic readonly cursor.", e);
Assert.fail("Failed to get origin topic readonly cursor.");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
index 3cc44ba..97907fc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
@@ -133,7 +133,7 @@
}
txn.abort().get();
- // commit this txn , normalAckMessageIds are in pending ack state
+ // commit this txn, normalAckMessageIds are in pending ack state
commitTxn.commit().get();
// abort this txn, pendingAckMessageIds are delete from pending ack state
abortTxn.abort().get();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
index dbcaae1..cca5b77 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
@@ -92,7 +92,7 @@
do {
Message<byte[]> message = consumer.receive();
log.info("consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
- consumer.reconsumeLater(message, 1 , TimeUnit.SECONDS);
+ consumer.reconsumeLater(message, 1, TimeUnit.SECONDS);
totalReceived++;
} while (totalReceived < sendMessages * (maxRedeliveryCount + 1));
@@ -275,7 +275,7 @@
do {
Message<byte[]> message = consumer.receive();
log.info("consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
- consumer.reconsumeLater(message, 1 , TimeUnit.SECONDS);
+ consumer.reconsumeLater(message, 1, TimeUnit.SECONDS);
totalReceived++;
} while (totalReceived < sendMessages * (maxRedeliveryCount + 1));
@@ -429,7 +429,7 @@
do {
Message<byte[]> message = consumer.receive();
log.info("consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
- consumer.reconsumeLater(message, 1 , TimeUnit.SECONDS);
+ consumer.reconsumeLater(message, 1, TimeUnit.SECONDS);
totalReceived++;
} while (totalReceived < sendMessages * (maxRedeliveryCount + 1));
int totalInDeadLetter = 0;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 63dbb8f..6728fc5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -3168,7 +3168,7 @@
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName)
.enableBatching(false)
- .autoUpdatePartitionsInterval(2 ,TimeUnit.SECONDS)
+ .autoUpdatePartitionsInterval(2, TimeUnit.SECONDS)
.create();
// 1. produce 5 messages
@@ -3181,7 +3181,7 @@
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.receiverQueueSize(receiverQueueSize)
- .autoUpdatePartitionsInterval(2 ,TimeUnit.SECONDS)
+ .autoUpdatePartitionsInterval(2, TimeUnit.SECONDS)
.subscriptionName("test-multi-topic-consumer").subscribe();
int counter = 0;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageParserTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageParserTest.java
index 641b9a0..95cfdd9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageParserTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageParserTest.java
@@ -99,7 +99,7 @@
.create();
ManagedCursor cursor = ((PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get())
- .getManagedLedger().newNonDurableCursor(PositionImpl.earliest);
+ .getManagedLedger().newNonDurableCursor(PositionImpl.EARLIEST);
if (batchEnabled) {
for (int i = 0; i < n - 1; i++) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
index b7215f4..5eaeea1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
@@ -752,7 +752,7 @@
doReturn(CompletableFuture.completedFuture(topicNames)).when(nss)
.getListOfPersistentTopics(NamespaceName.get("my-property/my-ns"));
- // 7. call recheckTopics to unsubscribe topic 1,3 , verify topics number: 2=6-1-3
+ // 7. call recheckTopics to unsubscribe topic 1,3, verify topics number: 2=6-1-3
log.debug("recheck topics change");
PatternMultiTopicsConsumerImpl<byte[]> consumer1 = ((PatternMultiTopicsConsumerImpl<byte[]>) consumer);
consumer1.run(consumer1.getRecheckPatternTimeout());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
index 69886a6..14db9a4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
@@ -660,7 +660,7 @@
}
}, 20, 150);
- //change the schema ,the function should not run, resulting in no messages to consume
+ //change the schema, the function should not run, resulting in no messages to consume
schemaInput.put(sourceTopic, "{\"schemaType\":\"AVRO\",\"schemaProperties\":{\"__jsr310ConversionEnabled\":\"false\",\"__alwaysAllowNull\":\"false\"}}");
localRunner = LocalRunner.builder()
.functionConfig(functionConfig)
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
index 6d4b29b..b7a9dd9 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
@@ -289,7 +289,7 @@
* consumer.acknowledge(msg);
* } catch (Throwable t) {
* log.warn("Failed to process message");
- * consumer.reconsumeLater(msg, 1000 , TimeUnit.MILLISECONDS);
+ * consumer.reconsumeLater(msg, 1000, TimeUnit.MILLISECONDS);
* }
* }
* </code></pre>
@@ -322,7 +322,7 @@
* consumer.acknowledge(msg);
* } catch (Throwable t) {
* log.warn("Failed to process message");
- * consumer.reconsumeLater(msg, 1000 , TimeUnit.MILLISECONDS);
+ * consumer.reconsumeLater(msg, 1000, TimeUnit.MILLISECONDS);
* }
* }
* </code></pre>
diff --git a/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationSasl.java b/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationSasl.java
index 38c53c3..3a19ba8 100644
--- a/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationSasl.java
+++ b/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationSasl.java
@@ -143,7 +143,7 @@
if (!initializedJAAS) {
synchronized (this) {
if (jaasCredentialsContainer == null) {
- log.info("JAAS loginContext is: {}." , loginContextName);
+ log.info("JAAS loginContext is: {}.", loginContextName);
try {
jaasCredentialsContainer = new JAASCredentialsContainer(
loginContextName,
@@ -151,7 +151,7 @@
configuration);
initializedJAAS = true;
} catch (LoginException e) {
- log.error("JAAS login in client failed" , e);
+ log.error("JAAS login in client failed", e);
throw new PulsarClientException(e);
}
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchemaUtils.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchemaUtils.java
index d5b451f..b0b701a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchemaUtils.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchemaUtils.java
@@ -50,7 +50,7 @@
//extract root message path
String rootMessageTypeName = descriptor.getFullName();
String rootFileDescriptorName = descriptor.getFile().getFullName();
- //build FileDescriptorSet , this is equal to < protoc --include_imports --descriptor_set_out >
+ //build FileDescriptorSet, this is equal to < protoc --include_imports --descriptor_set_out >
byte[] fileDescriptorSet = FileDescriptorSet.newBuilder().addAllFile(fileDescriptorProtoCache.values()).build().toByteArray();
//serialize to bytes
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryUtil.java b/pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryUtil.java
index 084a583..717949d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryUtil.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryUtil.java
@@ -58,7 +58,7 @@
if (log.isDebugEnabled()) {
log.debug("execute with retry fail, will retry in {} ms", next, e);
}
- log.info("Because of {} , will retry in {} ms", e.getMessage(), next);
+ log.info("Because of {}, will retry in {} ms", e.getMessage(), next);
scheduledExecutorService.schedule(() ->
executeWithRetry(supplier, backoff, scheduledExecutorService, callback),
next, TimeUnit.MILLISECONDS);
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java
index 2d4e9df..b29d3cb 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java
@@ -87,7 +87,7 @@
// if lower and upper has different key/ledger then set ranges for lower-key only if
// a. bitSet already exist and given value is not the last value in the bitset.
// it will prevent setting up values which are not actually expected to set
- // eg: (2:10..4:10] in this case , don't set any value for 2:10 and set [4:0..4:10]
+ // eg: (2:10..4:10] in this case, don't set any value for 2:10 and set [4:0..4:10]
if (rangeBitSet != null && (rangeBitSet.previousSetBit(rangeBitSet.size()) > lowerValueOpen)) {
int lastValue = rangeBitSet.previousSetBit(rangeBitSet.size());
rangeBitSet.set((int) lowerValue, (int) Math.max(lastValue, lowerValue) + 1);
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/api/raw/RawMessageImplTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/api/raw/RawMessageImplTest.java
index 037a6ff..99a5f02 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/api/raw/RawMessageImplTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/api/raw/RawMessageImplTest.java
@@ -43,7 +43,7 @@
singleMessageMetadata.addProperty().setKey(HARD_CODE_KEY).setValue(KEY_VALUE_FIRST);
singleMessageMetadata.addProperty().setKey(HARD_CODE_KEY).setValue(KEY_VALUE_SECOND);
singleMessageMetadata.addProperty().setKey(HARD_CODE_KEY_ID).setValue(HARD_CODE_KEY_ID_VALUE);
- RawMessage msg = RawMessageImpl.get(refCntMsgMetadata, singleMessageMetadata, null , 0, 0, 0);
+ RawMessage msg = RawMessageImpl.get(refCntMsgMetadata, singleMessageMetadata, null, 0, 0, 0);
Map<String, String> properties = msg.getProperties();
assertEquals(properties.get(HARD_CODE_KEY), KEY_VALUE_SECOND);
assertEquals(properties.get(HARD_CODE_KEY_ID), HARD_CODE_KEY_ID_VALUE);
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java
index e04960f..3db2ec9 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java
@@ -133,7 +133,7 @@
_statTotalProcessedSuccessfully = statTotalProcessedSuccessfully.labels(metricsLabels);
statTotalSysExceptions = collectorRegistry.registerIfNotExist(
- PULSAR_FUNCTION_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL ,
+ PULSAR_FUNCTION_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL,
Counter.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL)
.help("Total number of system exceptions.")
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index 9b8d8d4..dfb0b9e 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -117,7 +117,7 @@
.setIsRegexPattern(false)
.build());
} catch (JsonProcessingException e) {
- throw new IllegalArgumentException(String.format("Incorrect custom schema inputs ,Topic %s ", topicName));
+ throw new IllegalArgumentException(String.format("Incorrect custom schema inputs,Topic %s ", topicName));
}
});
}
@@ -212,7 +212,7 @@
sinkSpecBuilder.putAllConsumerProperties(consumerConfig.getConsumerProperties());
}
} catch (JsonProcessingException e) {
- throw new IllegalArgumentException(String.format("Incorrect custom schema outputs ,Topic %s ", functionConfig.getOutput()));
+ throw new IllegalArgumentException(String.format("Incorrect custom schema outputs,Topic %s ", functionConfig.getOutput()));
}
}
if (typeArgs != null) {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java
index 26479ba..69f8b38 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java
@@ -183,7 +183,7 @@
@Override
public Response stopFunctionInstance(String tenant, String namespace, String functionName, String instanceId, URI
uri, String clientRole) {
- delegate.stopFunctionInstance(tenant, namespace, functionName, instanceId, uri, clientRole ,null);
+ delegate.stopFunctionInstance(tenant, namespace, functionName, instanceId, uri, clientRole, null);
return Response.ok().build();
}
diff --git a/pulsar-io/aws/src/main/java/org/apache/pulsar/io/aws/AbstractAwsConnector.java b/pulsar-io/aws/src/main/java/org/apache/pulsar/io/aws/AbstractAwsConnector.java
index 4a7a8fd..d5f6284 100644
--- a/pulsar-io/aws/src/main/java/org/apache/pulsar/io/aws/AbstractAwsConnector.java
+++ b/pulsar-io/aws/src/main/java/org/apache/pulsar/io/aws/AbstractAwsConnector.java
@@ -63,7 +63,7 @@
} catch (Exception e) {
log.error("Failed to initialize AwsCredentialProviderPlugin {}", pluginFQClassName, e);
throw new IllegalArgumentException(
- String.format("invalid authplugin name %s , failed to init %s", pluginFQClassName, e.getMessage()));
+ String.format("invalid authplugin name %s, failed to init %s", pluginFQClassName, e.getMessage()));
}
}
diff --git a/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/MessageUtils.java b/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/MessageUtils.java
index 9e85b43..77ee5bb 100644
--- a/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/MessageUtils.java
+++ b/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/MessageUtils.java
@@ -90,7 +90,7 @@
try {
rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
- throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:"
+ throw new RuntimeException("ERROR ## parser of eromanga-event has an error, data:"
+ entry.toString(), e);
}
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index ead243a..cdd208d 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -144,7 +144,7 @@
if (service.getProxyLogLevel() == 2) {
//Set a map between inbound and outbound,
//so can find inbound by outbound or find outbound by inbound
- inboundOutboundChannelMap.put(outboundChannel.id() , inboundChannel.id());
+ inboundOutboundChannelMap.put(outboundChannel.id(), inboundChannel.id());
}
if (!config.isHaProxyProtocolEnabled()) {
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java
index 40c05a3..fea1a40 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java
@@ -116,13 +116,13 @@
ParserProxyHandler.producerHashMap.put(cmd.getProducer().getProducerId() + "," + ctx.channel().id(),
cmd.getProducer().getTopic());
- logging(ctx.channel() , cmd.getType() , "{producer:" + cmd.getProducer().getProducerName()
+ logging(ctx.channel(), cmd.getType(), "{producer:" + cmd.getProducer().getProducerName()
+ ",topic:" + cmd.getProducer().getTopic() + "}", null);
break;
case SEND:
if (service.getProxyLogLevel() != 2) {
- logging(ctx.channel() , cmd.getType() , "", null);
+ logging(ctx.channel(), cmd.getType(), "", null);
break;
}
topicName = TopicName.get(ParserProxyHandler.producerHashMap.get(cmd.getSend().getProducerId() + ","
@@ -137,20 +137,20 @@
TopicStats topicStats = this.service.getTopicStats().computeIfAbsent(topicName.toString(),
topic -> new TopicStats());
topicStats.getMsgInRate().recordMultipleEvents(messages.size(), msgBytes.longValue());
- logging(ctx.channel() , cmd.getType() , "" , messages);
+ logging(ctx.channel(), cmd.getType(), "", messages);
break;
case SUBSCRIBE:
ParserProxyHandler.consumerHashMap.put(cmd.getSubscribe().getConsumerId() + ","
+ ctx.channel().id(), cmd.getSubscribe().getTopic());
- logging(ctx.channel() , cmd.getType() , "{consumer:" + cmd.getSubscribe().getConsumerName()
- + ",topic:" + cmd.getSubscribe().getTopic() + "}" , null);
+ logging(ctx.channel(), cmd.getType(), "{consumer:" + cmd.getSubscribe().getConsumerName()
+ + ",topic:" + cmd.getSubscribe().getTopic() + "}", null);
break;
case MESSAGE:
if (service.getProxyLogLevel() != 2) {
- logging(ctx.channel() , cmd.getType() , "" , null);
+ logging(ctx.channel(), cmd.getType(), "", null);
break;
}
topicName = TopicName.get(ParserProxyHandler.consumerHashMap.get(cmd.getMessage().getConsumerId()
@@ -165,17 +165,15 @@
topicStats = this.service.getTopicStats().computeIfAbsent(topicName.toString(),
topic -> new TopicStats());
topicStats.getMsgOutRate().recordMultipleEvents(messages.size(), msgBytes.longValue());
- logging(ctx.channel() , cmd.getType() , "" , messages);
+ logging(ctx.channel(), cmd.getType(), "", messages);
break;
default:
- logging(ctx.channel() , cmd.getType() , "" , null);
+ logging(ctx.channel(), cmd.getType(), "", null);
break;
}
} catch (Exception e){
-
- log.error("{},{},{}" , e.getMessage() , e.getStackTrace() , e.getCause());
-
+ log.error("channelRead error ", e);
} finally {
buffer.resetReaderIndex();
buffer.resetWriterIndex();
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java
index b0d53ad..953773c 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java
@@ -82,7 +82,7 @@
doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore();
Optional<Integer> proxyLogLevel = Optional.of(2);
- assertEquals( proxyLogLevel , proxyService.getConfiguration().getProxyLogLevel());
+ assertEquals( proxyLogLevel, proxyService.getConfiguration().getProxyLogLevel());
proxyService.start();
}
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
index 9a64c05..16a838f 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
@@ -178,7 +178,7 @@
}
} else {
log.info("No ledger offloader configured, using NULL instance");
- return NullLedgerOffloader.INSTANCE;
+ return NullLedgerOffloader.instance_;
}
} catch (Throwable t) {
throw new RuntimeException(t);
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
index d839e05..df46991 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
@@ -395,7 +395,7 @@
// if the available size is invalid and the entry queue size is 0, read one entry
outstandingReadsRequests.decrementAndGet();
cursor.asyncReadEntries(batchSize, entryQueueCacheSizeAllocator.getAvailableCacheSize(),
- this, System.nanoTime(), PositionImpl.latest);
+ this, System.nanoTime(), PositionImpl.LATEST);
}
// stats for successful read request
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
index 96a100e..ee64432 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
@@ -270,7 +270,7 @@
try {
readOnlyCursor = managedLedgerFactory.openReadOnlyCursor(
topicNamePersistenceEncoding,
- PositionImpl.earliest, managedLedgerConfig);
+ PositionImpl.EARLIEST, managedLedgerConfig);
long numEntries = readOnlyCursor.getNumberOfEntries();
if (numEntries <= 0) {
@@ -362,7 +362,7 @@
try {
readOnlyCursor = managedLedgerFactory.openReadOnlyCursor(
topicNamePersistenceEncoding,
- PositionImpl.earliest, managedLedgerConfig);
+ PositionImpl.EARLIEST, managedLedgerConfig);
if (tupleDomain.getDomains().isPresent()) {
Domain domain = tupleDomain.getDomains().get().get(PulsarInternalColumn.PUBLISH_TIME
diff --git a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/TestDefaultMessageFormatter.java b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/TestDefaultMessageFormatter.java
index f1410e6..255f266 100644
--- a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/TestDefaultMessageFormatter.java
+++ b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/TestDefaultMessageFormatter.java
@@ -31,7 +31,7 @@
public void testFormatMessage() {
String producerName = "producer-1";
long msgId = 3;
- byte[] message = "{ \"producer\": \"%p\", \"msgId\": %i, \"nanoTime\": %t, \"float1\": %5.2f, \"float2\": %-5.2f, \"long1\": %12l, \"long2\": %l, \"int1\": %d, \"int2\": %1d , \"long3\": %5l, \"str\": \"%5s\" }".getBytes();
+ byte[] message = "{ \"producer\": \"%p\", \"msgId\": %i, \"nanoTime\": %t, \"float1\": %5.2f, \"float2\": %-5.2f, \"long1\": %12l, \"long2\": %l, \"int1\": %d, \"int2\": %1d, \"long3\": %5l, \"str\": \"%5s\" }".getBytes();
byte[] formatted = new DefaultMessageFormatter().formatMessage(producerName, msgId, message);
String jsonString = new String(formatted, StandardCharsets.UTF_8);
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
index 8bf2ebf..a4c347a 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
@@ -123,7 +123,7 @@
private void readAsync(int numberOfEntriesToRead,
AsyncCallbacks.ReadEntriesCallback readEntriesCallback) {
- cursor.asyncReadEntries(numberOfEntriesToRead, readEntriesCallback, System.nanoTime(), PositionImpl.latest);
+ cursor.asyncReadEntries(numberOfEntriesToRead, readEntriesCallback, System.nanoTime(), PositionImpl.LATEST);
}
@Override
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
index 7f1d8a7..28f41ee 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
@@ -453,7 +453,7 @@
try {
builder.cryptoFailureAction(ConsumerCryptoFailureAction.valueOf(action));
} catch (Exception e) {
- log.warn("Failed to configure cryptoFailureAction {} , {}", action, e.getMessage());
+ log.warn("Failed to configure cryptoFailureAction {}, {}", action, e.getMessage());
}
}
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
index ef0279d..21cd10b 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
@@ -98,7 +98,7 @@
try {
builder.cryptoFailureAction(ConsumerCryptoFailureAction.valueOf(action));
} catch (Exception e) {
- log.warn("Failed to configure cryptoFailureAction {} , {}", action, e.getMessage());
+ log.warn("Failed to configure cryptoFailureAction {}, {}", action, e.getMessage());
}
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
index 62f59c3..dca6be4 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
@@ -241,7 +241,7 @@
.create();
for (int i = 0 ; i < NUM_OF_STOCKS; ++i) {
- final Stock stock = new Stock(i,"STOCK_" + i , 100.0 + i * 10);
+ final Stock stock = new Stock(i, "STOCK_" + i, 100.0 + i * 10);
producer.send(stock);
}
producer.flush();
@@ -277,8 +277,8 @@
for (int i = 0 ; i < NUM_OF_STOCKS; ++i) {
int j = 100 * i;
- final Stock stock1 = new Stock(j, "STOCK_" + j , 100.0 + j * 10);
- final Stock stock2 = new Stock(i, "STOCK_" + i , 100.0 + i * 10);
+ final Stock stock1 = new Stock(j, "STOCK_" + j, 100.0 + j * 10);
+ final Stock stock2 = new Stock(i, "STOCK_" + i, 100.0 + i * 10);
producer.send(new KeyValue<>(stock1, stock2));
}
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPrestoQueryTieredStorage.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPrestoQueryTieredStorage.java
index 4c129fa..2d3e96a 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPrestoQueryTieredStorage.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPrestoQueryTieredStorage.java
@@ -150,7 +150,7 @@
int sendMessageCnt = 0;
while (true) {
Stock stock = new Stock(
- sendMessageCnt,"STOCK_" + sendMessageCnt , 100.0 + sendMessageCnt * 10);
+ sendMessageCnt,"STOCK_" + sendMessageCnt, 100.0 + sendMessageCnt * 10);
MessageIdImpl messageId = (MessageIdImpl) producer.send(stock);
sendMessageCnt ++;
if (firstLedgerId == -1) {
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
index f9c86fd..817c6a7 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
@@ -94,7 +94,7 @@
final private long maxBufferLength;
final private ConcurrentLinkedQueue<Entry> offloadBuffer = new ConcurrentLinkedQueue<>();
private CompletableFuture<OffloadResult> offloadResult;
- private volatile PositionImpl lastOfferedPosition = PositionImpl.latest;
+ private volatile PositionImpl lastOfferedPosition = PositionImpl.LATEST;
private final Duration maxSegmentCloseTime;
private final long minSegmentCloseTimeMillis;
private final long segmentBeginTimeMillis;