Enable CheckStyle Plugin in managed-ledger module (#13619)

Motivation
Enable CheckStyle Plugin in managed-ledger module.

Documentation
- [x] `no-need-doc` 
diff --git a/docker/pulsar/scripts/gen-yml-from-env.py b/docker/pulsar/scripts/gen-yml-from-env.py
index 8aee68b..f2e1031 100755
--- a/docker/pulsar/scripts/gen-yml-from-env.py
+++ b/docker/pulsar/scripts/gen-yml-from-env.py
@@ -104,6 +104,6 @@
         conf.pop('processContainerFactory', None)
 
     # Store back the updated config in the same file
-    f = open(conf_filename , 'w')
+    f = open(conf_filename, 'w')
     yaml.dump(conf, f, default_flow_style=False)
     f.close()
diff --git a/managed-ledger/pom.xml b/managed-ledger/pom.xml
index 663c73d..aa57e16 100644
--- a/managed-ledger/pom.xml
+++ b/managed-ledger/pom.xml
@@ -144,6 +144,19 @@
           </execution>
         </executions>
       </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>checkstyle</id>
+            <phase>verify</phase>
+            <goals>
+              <goal>check</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
     </plugins>
   </build>
 </project>
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java
index 7205b77..e5fd0dd 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java
@@ -18,11 +18,10 @@
  */
 package org.apache.bookkeeper.mledger;
 
+import io.netty.buffer.ByteBuf;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-
-import io.netty.buffer.ByteBuf;
 import org.apache.bookkeeper.common.annotation.InterfaceAudience;
 import org.apache.bookkeeper.common.annotation.InterfaceStability;
 
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
index ead9db7..194bfe4 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
@@ -77,12 +77,12 @@
         CompletableFuture<OffloadResult> getOffloadResultAsync();
 
         /**
-         * Manually close current offloading segment
+         * Manually close current offloading segment.
          * @return true if the segment is not already closed
          */
         boolean close();
 
-        default CompletableFuture<Boolean> AsyncClose() {
+        default CompletableFuture<Boolean> asyncClose() {
             return CompletableFuture.completedFuture(close());
         }
     }
@@ -158,7 +158,8 @@
      * ensuring that subsequent calls will not attempt to offload the same ledger
      * again.
      *
-     * @return an OffloaderHandle, which when `completeFuture()` completed, denotes that the offload has been successful.
+     * @return an OffloaderHandle, which when `completeFuture()` completed, denotes that the offload has been
+     * successful.
      */
     default CompletableFuture<OffloadHandle> streamingOffload(ManagedLedger ml, UUID uid, long beginLedger,
                                                               long beginEntry,
@@ -201,20 +202,19 @@
         throw new UnsupportedOperationException();
     }
 
-    default CompletableFuture<Void> deleteOffloaded(UUID uid,
-                                                    Map<String, String> offloadDriverMetadata) {
+    default CompletableFuture<Void> deleteOffloaded(UUID uid, Map<String, String> offloadDriverMetadata) {
         throw new UnsupportedOperationException();
     }
 
     /**
-     * Get offload policies of this LedgerOffloader
+     * Get offload policies of this LedgerOffloader.
      *
      * @return offload policies
      */
     OffloadPoliciesImpl getOffloadPolicies();
 
     /**
-     * Close the resources if necessary
+     * Close the resources if necessary.
      */
     void close();
 }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
index d1fb90a..6930db1 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
@@ -20,11 +20,9 @@
 
 import com.google.common.base.Predicate;
 import com.google.common.collect.Range;
-
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-
 import org.apache.bookkeeper.common.annotation.InterfaceAudience;
 import org.apache.bookkeeper.common.annotation.InterfaceStability;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback;
@@ -71,7 +69,7 @@
     long getLastActive();
 
     /**
-     * Update the last active time of the cursor
+     * Update the last active time of the cursor.
      *
      */
     void updateLastActive();
@@ -476,7 +474,7 @@
      *            opaque context
      */
     void asyncSkipEntries(int numEntriesToSkip, IndividualDeletedEntries deletedEntries,
-            final SkipEntriesCallback callback, Object ctx);
+             SkipEntriesCallback callback, Object ctx);
 
     /**
      * Find the newest entry that matches the given predicate.  Will only search among active entries
@@ -501,7 +499,8 @@
      * @throws InterruptedException
      * @throws ManagedLedgerException
      */
-    Position findNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition) throws InterruptedException, ManagedLedgerException;
+    Position findNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition)
+            throws InterruptedException, ManagedLedgerException;
 
     /**
      * Find the newest entry that matches the given predicate.
@@ -526,7 +525,7 @@
      * @throws InterruptedException
      * @throws ManagedLedgerException
      */
-    void resetCursor(final Position position) throws InterruptedException, ManagedLedgerException;
+    void resetCursor(Position position) throws InterruptedException, ManagedLedgerException;
 
     /**
      * reset the cursor to specified position to enable replay of messages.
@@ -536,7 +535,7 @@
      * @param callback
      *            callback object
      */
-    void asyncResetCursor(final Position position, AsyncCallbacks.ResetCursorCallback callback);
+    void asyncResetCursor(Position position, AsyncCallbacks.ResetCursorCallback callback);
 
     /**
      * Read the specified set of positions from ManagedLedger.
@@ -657,7 +656,7 @@
     int getNonContiguousDeletedMessagesRangeSerializedSize();
 
     /**
-     * Returns the estimated size of the unacknowledged backlog for this cursor
+     * Returns the estimated size of the unacknowledged backlog for this cursor.
      *
      * @return the estimated size from the mark delete position of the cursor
      */
@@ -677,20 +676,20 @@
     void setThrottleMarkDelete(double throttleMarkDelete);
 
     /**
-     * Get {@link ManagedLedger} attached with cursor
+     * Get {@link ManagedLedger} attached with cursor.
      *
      * @return ManagedLedger
      */
     ManagedLedger getManagedLedger();
 
     /**
-     * Get last individual deleted range
+     * Get last individual deleted range.
      * @return range
      */
     Range<PositionImpl> getLastIndividualDeletedRange();
 
     /**
-     * Trim delete entries for the given entries
+     * Trim delete entries for the given entries.
      */
     void trimDeletedEntries(List<Entry> entries);
 
@@ -715,5 +714,5 @@
      * Checks if the cursor is closed.
      * @return whether this cursor is closed.
      */
-    public boolean isClosed();
+    boolean isClosed();
 }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
index 74ab2ce..0e58d1e 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
@@ -129,7 +129,8 @@
      * @return the Position at which the entry has been inserted
      * @throws ManagedLedgerException
      */
-    Position addEntry(byte[] data, int numberOfMessages, int offset, int length) throws InterruptedException, ManagedLedgerException;
+    Position addEntry(byte[] data, int numberOfMessages, int offset, int length) throws InterruptedException,
+            ManagedLedgerException;
 
     /**
      * Append a new entry asynchronously.
@@ -165,7 +166,8 @@
      * @param ctx
      *            opaque context
      */
-    void asyncAddEntry(byte[] data, int numberOfMessages, int offset, int length, AddEntryCallback callback, Object ctx);
+    void asyncAddEntry(byte[] data, int numberOfMessages, int offset, int length, AddEntryCallback callback,
+                       Object ctx);
 
 
     /**
@@ -222,7 +224,8 @@
      * @return the ManagedCursor
      * @throws ManagedLedgerException
      */
-    ManagedCursor openCursor(String name, InitialPosition initialPosition) throws InterruptedException, ManagedLedgerException;
+    ManagedCursor openCursor(String name, InitialPosition initialPosition) throws InterruptedException,
+            ManagedLedgerException;
 
     /**
      * Open a ManagedCursor in this ManagedLedger.
@@ -240,7 +243,8 @@
      * @return the ManagedCursor
      * @throws ManagedLedgerException
      */
-    ManagedCursor openCursor(String name, InitialPosition initialPosition, Map<String, Long> properties) throws InterruptedException, ManagedLedgerException;
+    ManagedCursor openCursor(String name, InitialPosition initialPosition, Map<String, Long> properties)
+            throws InterruptedException, ManagedLedgerException;
 
     /**
      * Creates a new cursor whose metadata is not backed by durable storage. A caller can treat the non-durable cursor
@@ -331,10 +335,11 @@
      * @param ctx
      *            opaque context
      */
-    void asyncOpenCursor(String name, InitialPosition initialPosition, Map<String, Long> properties, OpenCursorCallback callback, Object ctx);
+    void asyncOpenCursor(String name, InitialPosition initialPosition, Map<String, Long> properties,
+                         OpenCursorCallback callback, Object ctx);
 
     /**
-     * Get a list of all the cursors reading from this ManagedLedger
+     * Get a list of all the cursors reading from this ManagedLedger.
      *
      * @return a list of cursors
      */
@@ -397,7 +402,7 @@
     CompletableFuture<Long> getEarliestMessagePublishTimeInBacklog();
 
     /**
-     * Return the size of all ledgers offloaded to 2nd tier storage
+     * Return the size of all ledgers offloaded to 2nd tier storage.
      */
     long getOffloadedSize();
 
@@ -535,7 +540,7 @@
     Position getLastConfirmedEntry();
 
     /**
-     * Signaling managed ledger that we can resume after BK write failure
+     * Signaling managed ledger that we can resume after BK write failure.
      */
     void readyToCreateNewLedger();
 
@@ -564,7 +569,7 @@
      * @param callback a callback which will be supplied with the newest properties in managedLedger.
      * @param ctx      a context object which will be passed to the callback on completion.
      **/
-    void asyncSetProperty(String key, String value, final AsyncCallbacks.UpdatePropertiesCallback callback, Object ctx);
+    void asyncSetProperty(String key, String value, AsyncCallbacks.UpdatePropertiesCallback callback, Object ctx);
 
     /**
      * Delete the property by key.
@@ -582,7 +587,7 @@
      * @param callback a callback which will be supplied with the newest properties in managedLedger.
      * @param ctx      a context object which will be passed to the callback on completion.
      */
-    void asyncDeleteProperty(String key, final AsyncCallbacks.UpdatePropertiesCallback callback, Object ctx);
+    void asyncDeleteProperty(String key, AsyncCallbacks.UpdatePropertiesCallback callback, Object ctx);
 
     /**
      * Update managed-ledger's properties.
@@ -600,17 +605,17 @@
      * @param callback   a callback which will be supplied with the newest properties in managedLedger.
      * @param ctx        a context object which will be passed to the callback on completion.
      */
-    void asyncSetProperties(Map<String, String> properties, final AsyncCallbacks.UpdatePropertiesCallback callback,
+    void asyncSetProperties(Map<String, String> properties, AsyncCallbacks.UpdatePropertiesCallback callback,
         Object ctx);
 
     /**
-     * Trim consumed ledgers in background
+     * Trim consumed ledgers in background.
      * @param promise
      */
     void trimConsumedLedgersInBackground(CompletableFuture<?> promise);
 
     /**
-     * Roll current ledger if it is full
+     * Roll current ledger if it is full.
      */
     @Deprecated
     void rollCurrentLedgerIfFull();
@@ -638,7 +643,7 @@
     CompletableFuture<Void> asyncTruncate();
 
     /**
-     * Get managed ledger internal stats
+     * Get managed ledger internal stats.
      *
      * @param includeLedgerMetadata the flag to control managed ledger internal stats include ledger metadata
      * @return the future of managed ledger internal stats
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
index 2f4e098..e18e337 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
@@ -19,20 +19,16 @@
 package org.apache.bookkeeper.mledger;
 
 import static com.google.common.base.Preconditions.checkArgument;
-
 import com.google.common.base.Charsets;
 import java.time.Clock;
 import java.util.Arrays;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
-
 import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
 import org.apache.bookkeeper.client.api.DigestType;
-
 import org.apache.bookkeeper.common.annotation.InterfaceAudience;
 import org.apache.bookkeeper.common.annotation.InterfaceStability;
 import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
-
 import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenLongPairRangeSet;
 
@@ -73,7 +69,7 @@
     private boolean unackedRangesOpenCacheSetEnabled = true;
     private Class<? extends EnsemblePlacementPolicy>  bookKeeperEnsemblePlacementPolicyClassName;
     private Map<String, Object> bookKeeperEnsemblePlacementPolicyProperties;
-    private LedgerOffloader ledgerOffloader = NullLedgerOffloader.INSTANCE;
+    private LedgerOffloader ledgerOffloader = NullLedgerOffloader.instance_;
     private int newEntriesCheckDelayInMillis = 10;
     private Clock clock = Clock.systemUTC();
     private ManagedLedgerInterceptor managedLedgerInterceptor;
@@ -391,7 +387,7 @@
      * <p>
      * A retention time of 0 (default) will make data to be deleted immediately.
      * <p>
-     * A retention time of -1 , means to have an unlimited retention time.
+     * A retention time of -1, means to have an unlimited retention time.
      *
      * @param retentionTime
      *            duration for which messages should be retained
@@ -421,7 +417,7 @@
      * <p>
      * A retention size of 0 (default) will make data to be deleted immediately.
      * <p>
-     * A retention size of -1 , means to have an unlimited retention size.
+     * A retention size of -1, means to have an unlimited retention size.
      *
      * @param retentionSizeInMB
      *            quota for message retention
@@ -510,7 +506,7 @@
     }
 
     /**
-     * Get clock to use to time operations
+     * Get clock to use to time operations.
      *
      * @return a clock
      */
@@ -519,7 +515,7 @@
     }
 
     /**
-     * Set clock to use for time operations
+     * Set clock to use for time operations.
      *
      * @param clock the clock to use
      */
@@ -530,7 +526,7 @@
 
     /**
      *
-     * Ledger-Op (Create/Delete) timeout
+     * Ledger-Op (Create/Delete) timeout.
      *
      * @return
      */
@@ -539,7 +535,7 @@
     }
 
     /**
-     * Ledger-Op (Create/Delete) timeout after which callback will be completed with failure
+     * Ledger-Op (Create/Delete) timeout after which callback will be completed with failure.
      *
      * @param metadataOperationsTimeoutSeconds
      */
@@ -549,7 +545,7 @@
     }
 
     /**
-     * Ledger read-entry timeout
+     * Ledger read-entry timeout.
      *
      * @return
      */
@@ -558,7 +554,7 @@
     }
 
     /**
-     * Ledger read entry timeout after which callback will be completed with failure. (disable timeout by setting
+     * Ledger read entry timeout after which callback will be completed with failure. (disable timeout by setting.
      * readTimeoutSeconds <= 0)
      *
      * @param readEntryTimeoutSeconds
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java
index 682ce90..1b6d80d 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java
@@ -91,7 +91,7 @@
             Supplier<Boolean> mlOwnershipChecker, Object ctx);
 
     /**
-     * Open a {@link ReadOnlyCursor} positioned to the earliest entry for the specified managed ledger
+     * Open a {@link ReadOnlyCursor} positioned to the earliest entry for the specified managed ledger.
      *
      * @param managedLedgerName
      * @param startPosition
@@ -103,7 +103,7 @@
             throws InterruptedException, ManagedLedgerException;
 
     /**
-     * Open a {@link ReadOnlyCursor} positioned to the earliest entry for the specified managed ledger
+     * Open a {@link ReadOnlyCursor} positioned to the earliest entry for the specified managed ledger.
      *
      * @param managedLedgerName
      * @param startPosition
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ReadOnlyCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ReadOnlyCursor.java
index 202ecda..28454d6 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ReadOnlyCursor.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ReadOnlyCursor.java
@@ -18,10 +18,9 @@
  */
 package org.apache.bookkeeper.mledger;
 
-import java.util.List;
-
 import com.google.common.base.Predicate;
 import com.google.common.collect.Range;
+import java.util.List;
 import org.apache.bookkeeper.common.annotation.InterfaceAudience;
 import org.apache.bookkeeper.common.annotation.InterfaceStability;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
@@ -106,7 +105,8 @@
      * @throws InterruptedException
      * @throws ManagedLedgerException
      */
-    Position findNewestMatching(ManagedCursor.FindPositionConstraint constraint, Predicate<Entry> condition) throws InterruptedException, ManagedLedgerException;
+    Position findNewestMatching(ManagedCursor.FindPositionConstraint constraint, Predicate<Entry> condition)
+            throws InterruptedException, ManagedLedgerException;
 
     /**
      * Return the number of messages that this cursor still has to read.
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheDefaultEvictionPolicy.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheDefaultEvictionPolicy.java
index df70188..4c26a93 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheDefaultEvictionPolicy.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheDefaultEvictionPolicy.java
@@ -20,7 +20,6 @@
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.Collections.reverseOrder;
-
 import com.google.common.collect.Lists;
 import java.util.List;
 import org.apache.commons.lang3.tuple.Pair;
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java
index 515cf2f..1a6986a 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java
@@ -21,17 +21,13 @@
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException;
-
 import com.google.common.collect.Lists;
 import com.google.common.primitives.Longs;
-
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
-
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
-
 import org.apache.bookkeeper.client.api.BKException;
 import org.apache.bookkeeper.client.api.LedgerEntry;
 import org.apache.bookkeeper.client.api.ReadHandle;
@@ -301,8 +297,7 @@
                         try {
                             // We got the entries, we need to transform them to a List<> type
                             long totalSize = 0;
-                            final List<EntryImpl> entriesToReturn
-                                = Lists.newArrayListWithExpectedSize(entriesToRead);
+                            final List<EntryImpl> entriesToReturn = Lists.newArrayListWithExpectedSize(entriesToRead);
                             for (LedgerEntry e : ledgerEntries) {
                                 EntryImpl entry = EntryCacheManager.create(e, interceptor);
 
@@ -319,7 +314,7 @@
                         }
                     }, ml.getExecutor().chooseThread(ml.getName())).exceptionally(exception -> {
                         if (exception instanceof BKException
-                                && ((BKException)exception).getCode() == BKException.Code.TooManyRequestsException) {
+                                && ((BKException) exception).getCode() == BKException.Code.TooManyRequestsException) {
                             callback.readEntriesFailed(createManagedLedgerException(exception), ctx);
                         } else {
                             ml.invalidateLedgerHandle(lh);
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java
index 9a1d631..ab71ea5 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java
@@ -20,7 +20,6 @@
 
 import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException;
 import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
-
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.primitives.Longs;
@@ -242,7 +241,8 @@
                                 ml.getMBean().addReadEntriesSample(1, returnEntry.getLength());
                                 callback.readEntryComplete(returnEntry, ctx);
                             } else {
-                                callback.readEntryFailed(new ManagedLedgerException("Could not read given position"), ctx);
+                                callback.readEntryFailed(new ManagedLedgerException("Could not read given position"),
+                                        ctx);
                             }
                         } finally {
                             ledgerEntries.close();
@@ -273,8 +273,8 @@
             processorHandle = interceptor
                     .processPayloadBeforeEntryCache(duplicateBuffer);
             if (processorHandle != null) {
-                ledgerEntry  = LedgerEntryImpl.create(ledgerEntry.getLedgerId(),ledgerEntry.getEntryId(),
-                        ledgerEntry.getLength(),processorHandle.getProcessedPayload());
+                ledgerEntry  = LedgerEntryImpl.create(ledgerEntry.getLedgerId(), ledgerEntry.getEntryId(),
+                        ledgerEntry.getLength(), processorHandle.getProcessedPayload());
             } else {
                 duplicateBuffer.release();
             }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java
index e25b52a..3050309 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java
@@ -20,18 +20,17 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ComparisonChain;
-
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.util.Recycler;
 import io.netty.util.Recycler.Handle;
 import io.netty.util.ReferenceCounted;
-
 import org.apache.bookkeeper.client.api.LedgerEntry;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.util.AbstractCASReferenceCounted;
 
-public final class EntryImpl extends AbstractCASReferenceCounted implements Entry, Comparable<EntryImpl>, ReferenceCounted {
+public final class EntryImpl extends AbstractCASReferenceCounted implements Entry, Comparable<EntryImpl>,
+        ReferenceCounted {
 
     private static final Recycler<EntryImpl> RECYCLER = new Recycler<EntryImpl>() {
         @Override
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerMetadataUtils.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerMetadataUtils.java
index 5f85b6e..8049220 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerMetadataUtils.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerMetadataUtils.java
@@ -19,29 +19,26 @@
 package org.apache.bookkeeper.mledger.impl;
 
 import com.google.common.collect.ImmutableMap;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
 import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
 import org.apache.bookkeeper.common.util.JsonUtil.ParseJsonException;
 import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
 
-import java.nio.charset.StandardCharsets;
-import java.util.Map;
-
 /**
  * Utilities for managing BookKeeper Ledgers custom metadata.
  */
 public final class LedgerMetadataUtils {
 
     private static final String METADATA_PROPERTY_APPLICATION = "application";
-    private static final byte[] METADATA_PROPERTY_APPLICATION_PULSAR
-            = "pulsar".getBytes(StandardCharsets.UTF_8);
+    private static final byte[] METADATA_PROPERTY_APPLICATION_PULSAR = "pulsar".getBytes(StandardCharsets.UTF_8);
 
     private static final String METADATA_PROPERTY_COMPONENT = "component";
-    private static final byte[] METADATA_PROPERTY_COMPONENT_MANAGED_LEDGER
-            = "managed-ledger".getBytes(StandardCharsets.UTF_8);
-    private static final byte[] METADATA_PROPERTY_COMPONENT_COMPACTED_LEDGER
-            = "compacted-ledger".getBytes(StandardCharsets.UTF_8);
-    private static final byte[] METADATA_PROPERTY_COMPONENT_SCHEMA
-            = "schema".getBytes(StandardCharsets.UTF_8);
+    private static final byte[] METADATA_PROPERTY_COMPONENT_MANAGED_LEDGER =
+            "managed-ledger".getBytes(StandardCharsets.UTF_8);
+    private static final byte[] METADATA_PROPERTY_COMPONENT_COMPACTED_LEDGER =
+            "compacted-ledger".getBytes(StandardCharsets.UTF_8);
+    private static final byte[] METADATA_PROPERTY_COMPONENT_SCHEMA = "schema".getBytes(StandardCharsets.UTF_8);
 
     private static final String METADATA_PROPERTY_MANAGED_LEDGER_NAME = "pulsar/managed-ledger";
     private static final String METADATA_PROPERTY_CURSOR_NAME = "pulsar/cursor";
@@ -80,7 +77,8 @@
      * @param compactedToMessageId last mesasgeId.
      * @return an immutable map which describes the compacted ledger
      */
-    public static Map<String, byte[]> buildMetadataForCompactedLedger(String compactedTopic, byte[] compactedToMessageId) {
+    public static Map<String, byte[]> buildMetadataForCompactedLedger(String compactedTopic,
+                                                                      byte[] compactedToMessageId) {
         return ImmutableMap.of(
                 METADATA_PROPERTY_APPLICATION, METADATA_PROPERTY_APPLICATION_PULSAR,
                 METADATA_PROPERTY_COMPONENT, METADATA_PROPERTY_COMPONENT_COMPACTED_LEDGER,
@@ -90,7 +88,7 @@
     }
 
     /**
-     * Build additional metadata for a Schema
+     * Build additional metadata for a Schema.
      *
      * @param schemaId id of the schema
      * @return an immutable map which describes the schema
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java
index c631cdc..65d2541 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java
@@ -199,7 +199,7 @@
     }
 
     /**
-     *  Check whether there are any cursors
+     *  Check whether there are any cursors.
      * @return true is there are no cursors and false if there are
      */
     public boolean isEmpty() {
@@ -219,7 +219,7 @@
     }
 
     /**
-     * Check whether that are any durable cursors
+     * Check whether that are any durable cursors.
      * @return true if there are durable cursors and false if there are not
      */
     public boolean hasDurableCursors() {
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 5c1455b..70f0efe 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -26,7 +26,6 @@
 import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException;
 import static org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException;
 import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
-
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Predicate;
@@ -38,9 +37,7 @@
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.RateLimiter;
 import com.google.protobuf.InvalidProtocolBufferException;
-
 import io.netty.util.concurrent.FastThreadLocal;
-
 import java.time.Clock;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -62,7 +59,6 @@
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-
 import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
 import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback;
 import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
@@ -80,13 +76,13 @@
 import org.apache.bookkeeper.mledger.AsyncCallbacks.SkipEntriesCallback;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedCursorMXBean;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedException;
 import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
 import org.apache.bookkeeper.mledger.ManagedLedgerException.NoMoreEntriesToReadException;
-import org.apache.bookkeeper.mledger.ManagedCursorMXBean;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.PositionBound;
 import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
@@ -125,8 +121,9 @@
     // keeps sample of last read-position for validation and monitoring if read-position is not moving forward.
     protected volatile PositionImpl statsLastReadPosition;
 
-    protected static final AtomicReferenceFieldUpdater<ManagedCursorImpl, MarkDeleteEntry> LAST_MARK_DELETE_ENTRY_UPDATER =
-            AtomicReferenceFieldUpdater.newUpdater(ManagedCursorImpl.class, MarkDeleteEntry.class, "lastMarkDeleteEntry");
+    protected static final AtomicReferenceFieldUpdater<ManagedCursorImpl, MarkDeleteEntry>
+            LAST_MARK_DELETE_ENTRY_UPDATER = AtomicReferenceFieldUpdater.newUpdater(ManagedCursorImpl.class,
+            MarkDeleteEntry.class, "lastMarkDeleteEntry");
     protected volatile MarkDeleteEntry lastMarkDeleteEntry;
 
     protected static final AtomicReferenceFieldUpdater<ManagedCursorImpl, OpReadEntry> WAITING_READ_OP_UPDATER =
@@ -183,7 +180,7 @@
 
     private boolean alwaysInactive = false;
 
-    /** used temporary variables to {@link #getNumIndividualDeletedEntriesToSkip(long)} **/
+    /** used temporary variables to {@link #getNumIndividualDeletedEntriesToSkip(long)}. **/
     private static final FastThreadLocal<Long> tempTotalEntriesToSkip = new FastThreadLocal<>();
     private static final FastThreadLocal<Long> tempDeletedMessages = new FastThreadLocal<>();
     private static final FastThreadLocal<PositionImpl> tempStartPosition = new FastThreadLocal<>();
@@ -499,7 +496,8 @@
         }
     }
 
-    private void recoverBatchDeletedIndexes (List<MLDataFormats.BatchedEntryDeletionIndexInfo> batchDeletedIndexInfoList) {
+    private void recoverBatchDeletedIndexes (
+            List<MLDataFormats.BatchedEntryDeletionIndexInfo> batchDeletedIndexInfoList) {
         lock.writeLock().lock();
         try {
             this.batchDeletedIndexes.clear();
@@ -594,7 +592,7 @@
                 counter.countDown();
             }
 
-        }, null, PositionImpl.latest);
+        }, null, PositionImpl.LATEST);
 
         counter.await();
 
@@ -726,7 +724,7 @@
                 counter.countDown();
             }
 
-        }, null, PositionImpl.latest);
+        }, null, PositionImpl.LATEST);
 
         counter.await();
 
@@ -942,7 +940,8 @@
     }
 
     @Override
-    public Position findNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition) throws InterruptedException, ManagedLedgerException {
+    public Position findNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition)
+            throws InterruptedException, ManagedLedgerException {
         final CountDownLatch counter = new CountDownLatch(1);
         class Result {
             ManagedLedgerException exception = null;
@@ -1031,9 +1030,9 @@
     }
 
     protected void internalResetCursor(PositionImpl position, AsyncCallbacks.ResetCursorCallback resetCursorCallback) {
-        if (position.equals(PositionImpl.earliest)) {
+        if (position.equals(PositionImpl.EARLIEST)) {
             position = ledger.getFirstPosition();
-        } else if (position.equals(PositionImpl.latest)) {
+        } else if (position.equals(PositionImpl.LATEST)) {
             position = ledger.getLastPosition().getNext();
         }
 
@@ -1070,9 +1069,8 @@
                                 Range.closedOpen(markDeletePosition, newMarkDeletePosition)));
                     }
                     markDeletePosition = newMarkDeletePosition;
-                    lastMarkDeleteEntry = new MarkDeleteEntry(newMarkDeletePosition, isCompactionCursor() ?
-                            getProperties() : Collections.emptyMap(),
-                            null, null);
+                    lastMarkDeleteEntry = new MarkDeleteEntry(newMarkDeletePosition, isCompactionCursor()
+                            ? getProperties() : Collections.emptyMap(), null, null);
                     individualDeletedMessages.clear();
                     if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null) {
                         batchDeletedIndexes.values().forEach(BitSetRecyclable::recycle);
@@ -1121,7 +1119,8 @@
 
         };
 
-        internalAsyncMarkDelete(newPosition, isCompactionCursor() ? getProperties() : Collections.emptyMap(), new MarkDeleteCallback() {
+        internalAsyncMarkDelete(newPosition, isCompactionCursor() ? getProperties() : Collections.emptyMap(),
+                new MarkDeleteCallback() {
             @Override
             public void markDeleteComplete(Object ctx) {
                 finalCallback.operationComplete();
@@ -1143,15 +1142,15 @@
         ledger.getExecutor().executeOrdered(ledger.getName(), safeRun(() -> {
             PositionImpl actualPosition = newPosition;
 
-            if (!ledger.isValidPosition(actualPosition) &&
-                !actualPosition.equals(PositionImpl.earliest) &&
-                !actualPosition.equals(PositionImpl.latest)) {
+            if (!ledger.isValidPosition(actualPosition)
+                    && !actualPosition.equals(PositionImpl.EARLIEST)
+                    && !actualPosition.equals(PositionImpl.LATEST)) {
                 actualPosition = ledger.getNextValidPosition(actualPosition);
 
                 if (actualPosition == null) {
                     // next valid position would only return null when newPos
                     // is larger than all available positions, then it's latest in effect.
-                    actualPosition = PositionImpl.latest;
+                    actualPosition = PositionImpl.LATEST;
                 }
             }
 
@@ -1631,8 +1630,8 @@
                 // read position forward
                 PositionImpl newReadPosition = ledger.getNextValidPosition(markDeletePosition);
                 if (log.isDebugEnabled()) {
-                    log.debug("[{}] Moved read position from: {} to: {}, and new mark-delete position {}", ledger.getName(),
-                            currentReadPosition, newReadPosition, markDeletePosition);
+                    log.debug("[{}] Moved read position from: {} to: {}, and new mark-delete position {}",
+                            ledger.getName(), currentReadPosition, newReadPosition, markDeletePosition);
                 }
                 return newReadPosition;
             } else {
@@ -1683,7 +1682,7 @@
                 batchDeletedIndexes.put(newPosition, BitSetRecyclable.create().resetWords(newPosition.ackSet));
                 newPosition = ledger.getPreviousPosition(newPosition);
             }
-            Map<PositionImpl, BitSetRecyclable> subMap = batchDeletedIndexes.subMap(PositionImpl.earliest, newPosition);
+            Map<PositionImpl, BitSetRecyclable> subMap = batchDeletedIndexes.subMap(PositionImpl.EARLIEST, newPosition);
             subMap.values().forEach(BitSetRecyclable::recycle);
             subMap.clear();
         } else if (newPosition.ackSet != null) {
@@ -1707,9 +1706,8 @@
                         ledger.getName(), markDeletePosition, newPosition);
             } else {
                 if (log.isDebugEnabled()) {
-                    log.debug(
-                            "[{}] Failed mark delete due to invalid markDelete {} is ahead of last-confirmed-entry {} for cursor [{}]",
-                            ledger.getName(), position, ledger.getLastConfirmedEntry(), name);
+                    log.debug("[{}] Failed mark delete due to invalid markDelete {} is ahead of last-confirmed-entry {}"
+                             + " for cursor [{}]", ledger.getName(), position, ledger.getLastConfirmedEntry(), name);
                 }
                 callback.markDeleteFailed(new ManagedLedgerException("Invalid mark deleted position"), ctx);
                 return;
@@ -1800,7 +1798,9 @@
                     individualDeletedMessages.removeAtMost(mdEntry.newPosition.getLedgerId(),
                             mdEntry.newPosition.getEntryId());
                     if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null) {
-                        Map<PositionImpl, BitSetRecyclable> subMap = batchDeletedIndexes.subMap(PositionImpl.earliest, false, PositionImpl.get(mdEntry.newPosition.getLedgerId(), mdEntry.newPosition.getEntryId()), true);
+                        Map<PositionImpl, BitSetRecyclable> subMap = batchDeletedIndexes.subMap(PositionImpl.EARLIEST,
+                                false, PositionImpl.get(mdEntry.newPosition.getLedgerId(),
+                                mdEntry.newPosition.getEntryId()), true);
                         subMap.values().forEach(BitSetRecyclable::recycle);
                         subMap.clear();
                     }
@@ -1935,8 +1935,8 @@
                 if (((PositionImpl) ledger.getLastConfirmedEntry()).compareTo(position) < 0) {
                     if (log.isDebugEnabled()) {
                         log.debug(
-                            "[{}] Failed mark delete due to invalid markDelete {} is ahead of last-confirmed-entry {} for cursor [{}]",
-                            ledger.getName(), position, ledger.getLastConfirmedEntry(), name);
+                            "[{}] Failed mark delete due to invalid markDelete {} is ahead of last-confirmed-entry {} "
+                            + "for cursor [{}]", ledger.getName(), position, ledger.getLastConfirmedEntry(), name);
                     }
                     callback.deleteFailed(new ManagedLedgerException("Invalid mark deleted position"), ctx);
                     return;
@@ -1962,11 +1962,11 @@
                             bitSetRecyclable.recycle();
                         }
                     }
-                    // Add a range (prev, pos] to the set. Adding the previous entry as an open limit to the range will make
-                    // the RangeSet recognize the "continuity" between adjacent Positions
+                    // Add a range (prev, pos] to the set. Adding the previous entry as an open limit to the range will
+                    // make the RangeSet recognize the "continuity" between adjacent Positions.
                     PositionImpl previousPosition = ledger.getPreviousPosition(position);
-                    individualDeletedMessages.addOpenClosed(previousPosition.getLedgerId(), previousPosition.getEntryId(),
-                        position.getLedgerId(), position.getEntryId());
+                    individualDeletedMessages.addOpenClosed(previousPosition.getLedgerId(),
+                        previousPosition.getEntryId(), position.getLedgerId(), position.getEntryId());
                     MSG_CONSUMED_COUNTER_UPDATER.incrementAndGet(this);
 
                     if (log.isDebugEnabled()) {
@@ -1974,13 +1974,15 @@
                             individualDeletedMessages);
                     }
                 } else if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null) {
-                    BitSetRecyclable bitSet = batchDeletedIndexes.computeIfAbsent(position, (v) -> BitSetRecyclable.create().resetWords(position.ackSet));
+                    BitSetRecyclable bitSet = batchDeletedIndexes.computeIfAbsent(position, (v) ->
+                        BitSetRecyclable.create().resetWords(position.ackSet));
                     BitSetRecyclable givenBitSet = BitSetRecyclable.create().resetWords(position.ackSet);
                     bitSet.and(givenBitSet);
                     givenBitSet.recycle();
                     if (bitSet.isEmpty()) {
                         PositionImpl previousPosition = ledger.getPreviousPosition(position);
-                        individualDeletedMessages.addOpenClosed(previousPosition.getLedgerId(), previousPosition.getEntryId(),
+                        individualDeletedMessages.addOpenClosed(previousPosition.getLedgerId(),
+                            previousPosition.getEntryId(),
                             position.getLedgerId(), position.getEntryId());
                         MSG_CONSUMED_COUNTER_UPDATER.incrementAndGet(this);
                         BitSetRecyclable bitSetRecyclable = batchDeletedIndexes.remove(position);
@@ -2082,8 +2084,8 @@
                 log.debug("[{}] [{}] Filtering entries {} - alreadyDeleted: {}", ledger.getName(), name, entriesRange,
                         individualDeletedMessages);
             }
-            if (individualDeletedMessages.isEmpty() || individualDeletedMessages.span() == null ||
-                    !entriesRange.isConnected(individualDeletedMessages.span())) {
+            if (individualDeletedMessages.isEmpty() || individualDeletedMessages.span() == null
+                    || !entriesRange.isConnected(individualDeletedMessages.span())) {
                 // There are no individually deleted messages in this entry list, no need to perform filtering
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] [{}] No filtering needed for entries {}", ledger.getName(), name, entriesRange);
@@ -2330,8 +2332,8 @@
                                             public void operationFailed(MetaStoreException e) {
                                                 if (log.isDebugEnabled()) {
                                                     log.debug(
-                                                            "[{}] Failed to refresh cursor metadata-version for {} due to {}",
-                                                            ledger.name, name, e.getMessage());
+                                                            "[{}] Failed to refresh cursor metadata-version for {} due "
+                                                            + "to {}", ledger.name, name, e.getMessage());
                                                 }
                                             }
                                         });
@@ -2548,13 +2550,14 @@
     private List<MLDataFormats.BatchedEntryDeletionIndexInfo> buildBatchEntryDeletionIndexInfoList() {
         lock.readLock().lock();
         try {
-            if (!config.isDeletionAtBatchIndexLevelEnabled() || batchDeletedIndexes == null || batchDeletedIndexes.isEmpty()) {
+            if (!config.isDeletionAtBatchIndexLevelEnabled() || batchDeletedIndexes == null
+                    || batchDeletedIndexes.isEmpty()) {
                 return Collections.emptyList();
             }
             MLDataFormats.NestedPositionInfo.Builder nestedPositionBuilder = MLDataFormats.NestedPositionInfo
                     .newBuilder();
-            MLDataFormats.BatchedEntryDeletionIndexInfo.Builder batchDeletedIndexInfoBuilder = MLDataFormats.BatchedEntryDeletionIndexInfo
-                    .newBuilder();
+            MLDataFormats.BatchedEntryDeletionIndexInfo.Builder batchDeletedIndexInfoBuilder = MLDataFormats
+                    .BatchedEntryDeletionIndexInfo.newBuilder();
             List<MLDataFormats.BatchedEntryDeletionIndexInfo> result = Lists.newArrayList();
             Iterator<Map.Entry<PositionImpl, BitSetRecyclable>> iterator = batchDeletedIndexes.entrySet().iterator();
             while (iterator.hasNext() && result.size() < config.getMaxBatchDeletedIndexToPersist()) {
@@ -2624,8 +2627,8 @@
                     public void operationComplete(Void result, Stat stat) {
                         if (log.isDebugEnabled()) {
                             log.debug(
-                                    "[{}][{}] Updated cursor in meta store after previous failure in ledger at position {}",
-                                    ledger.getName(), name, position);
+                                    "[{}][{}] Updated cursor in meta store after previous failure in ledger at position"
+                                    + " {}", ledger.getName(), name, position);
                         }
                         mbean.persistToZookeeper(true);
                         callback.operationComplete();
@@ -2645,8 +2648,8 @@
 
     boolean shouldCloseLedger(LedgerHandle lh) {
         long now = clock.millis();
-        if (ledger.factory.isMetadataServiceAvailable() &&
-                (lh.getLastAddConfirmed() >= config.getMetadataMaxEntriesPerLedger()
+        if (ledger.factory.isMetadataServiceAvailable()
+                && (lh.getLastAddConfirmed() >= config.getMetadataMaxEntriesPerLedger()
                 || lastLedgerSwitchTimestamp < (now - config.getLedgerRolloverTimeout() * 1000))
                 && (STATE_UPDATER.get(this) != State.Closed && STATE_UPDATER.get(this) != State.Closing)) {
             // It's safe to modify the timestamp since this method will be only called from a callback, implying that
@@ -2896,7 +2899,8 @@
     public boolean isMessageDeleted(Position position) {
         checkArgument(position instanceof PositionImpl);
         return individualDeletedMessages.contains(((PositionImpl) position).getLedgerId(),
-                ((PositionImpl) position).getEntryId()) || ((PositionImpl) position).compareTo(markDeletePosition) <= 0 ;
+                ((PositionImpl) position).getEntryId())
+                || ((PositionImpl) position).compareTo(markDeletePosition) <= 0;
     }
 
     //this method will return a copy of the position's ack set
@@ -2925,7 +2929,8 @@
      * @return next available position
      */
     public PositionImpl getNextAvailablePosition(PositionImpl position) {
-        Range<PositionImpl> range = individualDeletedMessages.rangeContaining(position.getLedgerId(), position.getEntryId());
+        Range<PositionImpl> range = individualDeletedMessages.rangeContaining(position.getLedgerId(),
+                position.getEntryId());
         if (range != null) {
             PositionImpl nextPosition = range.upperEndpoint().getNext();
             return (nextPosition != null && nextPosition.compareTo(position) > 0) ? nextPosition : position.getNext();
@@ -3052,7 +3057,7 @@
             return 1;
         }
 
-        int maxEntriesBasedOnSize = (int)(maxSizeBytes / avgEntrySize);
+        int maxEntriesBasedOnSize = (int) (maxSizeBytes / avgEntrySize);
         if (maxEntriesBasedOnSize < 1) {
             // We need to read at least one entry
             return 1;
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index e176181..49728ec 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -111,7 +111,7 @@
     private volatile boolean closed;
 
     /**
-     * Keep a flag to indicate whether we're currently connected to the metadata service
+     * Keep a flag to indicate whether we're currently connected to the metadata service.
      */
     @Getter
     private boolean metadataServiceAvailable;
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index fc42fe2..04bfd71 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -197,15 +197,15 @@
     private ScheduledFuture<?> checkLedgerRollTask;
 
     /**
-     * This lock is held while the ledgers list or propertiesMap is updated asynchronously on the metadata store. Since we use the store
-     * version, we cannot have multiple concurrent updates.
+     * This lock is held while the ledgers list or propertiesMap is updated asynchronously on the metadata store.
+     * Since we use the store version, we cannot have multiple concurrent updates.
      */
     private final CallbackMutex metadataMutex = new CallbackMutex();
     private final CallbackMutex trimmerMutex = new CallbackMutex();
 
     private final CallbackMutex offloadMutex = new CallbackMutex();
     private static final CompletableFuture<PositionImpl> NULL_OFFLOAD_PROMISE = CompletableFuture
-            .completedFuture(PositionImpl.latest);
+            .completedFuture(PositionImpl.LATEST);
     private volatile LedgerHandle currentLedger;
     private long currentLedgerEntries = 0;
     private long currentLedgerSize = 0;
@@ -251,8 +251,8 @@
         startIncluded, startExcluded
     }
 
-    private static final AtomicReferenceFieldUpdater<ManagedLedgerImpl, State> STATE_UPDATER = AtomicReferenceFieldUpdater
-            .newUpdater(ManagedLedgerImpl.class, State.class, "state");
+    private static final AtomicReferenceFieldUpdater<ManagedLedgerImpl, State> STATE_UPDATER =
+            AtomicReferenceFieldUpdater.newUpdater(ManagedLedgerImpl.class, State.class, "state");
     protected volatile State state = null;
 
     private final OrderedScheduler scheduledExecutor;
@@ -270,17 +270,18 @@
 
     // last read-operation's callback to check read-timeout on it.
     private volatile ReadEntryCallbackWrapper lastReadCallback = null;
-    private static final AtomicReferenceFieldUpdater<ManagedLedgerImpl, ReadEntryCallbackWrapper> LAST_READ_CALLBACK_UPDATER = AtomicReferenceFieldUpdater
+    private static final AtomicReferenceFieldUpdater<ManagedLedgerImpl, ReadEntryCallbackWrapper>
+            LAST_READ_CALLBACK_UPDATER = AtomicReferenceFieldUpdater
             .newUpdater(ManagedLedgerImpl.class, ReadEntryCallbackWrapper.class, "lastReadCallback");
 
     /**
-     * Queue of pending entries to be added to the managed ledger. Typically entries are queued when a new ledger is
+     * Queue of pending entries to be added to the managed ledger. Typically entries are queued when a new ledger is.
      * created asynchronously and hence there is no ready ledger to write into.
      */
     final ConcurrentLinkedQueue<OpAddEntry> pendingAddEntries = new ConcurrentLinkedQueue<>();
 
     /**
-     * This variable is used for testing the tests
+     * This variable is used for testing the tests.
      * {@link ManagedLedgerTest#testManagedLedgerWithPlacementPolicyInCustomMetadata()}
      */
     @VisibleForTesting
@@ -551,7 +552,7 @@
                     // Lazily recover cursors by put them to uninitializedCursors map.
                     for (final String cursorName : consumers) {
                         if (log.isDebugEnabled()) {
-                            log.debug("[{}] Recovering cursor {} lazily" , name, cursorName);
+                            log.debug("[{}] Recovering cursor {} lazily", name, cursorName);
                         }
                         final ManagedCursorImpl cursor;
                         cursor = new ManagedCursorImpl(bookKeeper, config, ManagedLedgerImpl.this, cursorName);
@@ -561,8 +562,8 @@
                         cursor.recover(new VoidCallback() {
                             @Override
                             public void operationComplete() {
-                                log.info("[{}] Lazy recovery for cursor {} completed. pos={} -- todo={}", name, cursorName,
-                                        cursor.getMarkDeletedPosition(), cursorCount.get() - 1);
+                                log.info("[{}] Lazy recovery for cursor {} completed. pos={} -- todo={}", name,
+                                        cursorName, cursor.getMarkDeletedPosition(), cursorCount.get() - 1);
                                 cursor.setActive();
                                 synchronized (ManagedLedgerImpl.this) {
                                     cursors.add(cursor);
@@ -643,7 +644,8 @@
     }
 
     @Override
-    public Position addEntry(byte[] data, int numberOfMessages, int offset, int length) throws InterruptedException, ManagedLedgerException {
+    public Position addEntry(byte[] data, int numberOfMessages, int offset, int length) throws InterruptedException,
+            ManagedLedgerException {
         final CountDownLatch counter = new CountDownLatch(1);
         // Result list will contain the status exception and the resulting
         // position
@@ -690,8 +692,8 @@
     }
 
     @Override
-    public void asyncAddEntry(final byte[] data, int numberOfMessages, int offset, int length, final AddEntryCallback callback,
-                              final Object ctx) {
+    public void asyncAddEntry(final byte[] data, int numberOfMessages, int offset, int length,
+                              final AddEntryCallback callback, final Object ctx) {
         ByteBuf buffer = Unpooled.wrappedBuffer(data, offset, length);
         asyncAddEntry(buffer, numberOfMessages, callback, ctx);
     }
@@ -757,9 +759,8 @@
             if (State.CreatingLedger == state) {
                 long elapsedMs = System.currentTimeMillis() - this.lastLedgerCreationInitiationTimestamp;
                 if (elapsedMs > TimeUnit.SECONDS.toMillis(2 * config.getMetadataOperationsTimeoutSeconds())) {
-                    log.info("[{}] Ledger creation was initiated {} ms ago but it never completed" +
-                        " and creation timeout task didn't kick in as well. Force to fail the create ledger operation.",
-                            name, elapsedMs);
+                    log.info("[{}] Ledger creation was initiated {} ms ago but it never completed and creation timeout"
+                        + " task didn't kick in as well. Force to fail the create ledger operation.", name, elapsedMs);
                     this.createComplete(Code.TimeoutException, null, null);
                 }
             }
@@ -894,7 +895,8 @@
         }
 
         if (uninitializedCursors.containsKey(cursorName)) {
-            uninitializedCursors.get(cursorName).thenAccept(cursor -> callback.openCursorComplete(cursor, ctx)).exceptionally(ex -> {
+            uninitializedCursors.get(cursorName).thenAccept(cursor -> callback.openCursorComplete(cursor, ctx))
+                    .exceptionally(ex -> {
                 callback.openCursorFailed((ManagedLedgerException) ex, ctx);
                 return null;
             });
@@ -1032,13 +1034,14 @@
     }
 
     @Override
-    public ManagedCursor newNonDurableCursor(Position startPosition, String subscriptionName) throws ManagedLedgerException {
+    public ManagedCursor newNonDurableCursor(Position startPosition, String subscriptionName)
+            throws ManagedLedgerException {
         return newNonDurableCursor(startPosition, subscriptionName, InitialPosition.Latest, false);
     }
 
     @Override
-    public ManagedCursor newNonDurableCursor(Position startCursorPosition, String cursorName, InitialPosition initialPosition,
-                                             boolean isReadCompacted)
+    public ManagedCursor newNonDurableCursor(Position startCursorPosition, String cursorName,
+                                             InitialPosition initialPosition, boolean isReadCompacted)
             throws ManagedLedgerException {
         Objects.requireNonNull(cursorName, "cursor name can't be null");
         checkManagedLedgerIsOpen();
@@ -1469,8 +1472,8 @@
                     metadataMutex.unlock();
                     updateLedgersIdsComplete(stat);
                     synchronized (ManagedLedgerImpl.this) {
-                        mbean.addLedgerSwitchLatencySample(System.currentTimeMillis() - lastLedgerCreationInitiationTimestamp,
-                                TimeUnit.MILLISECONDS);
+                        mbean.addLedgerSwitchLatencySample(System.currentTimeMillis()
+                                - lastLedgerCreationInitiationTimestamp, TimeUnit.MILLISECONDS);
                     }
 
                     // May need to update the cursor position
@@ -1501,7 +1504,8 @@
                             // Return ManagedLedgerFencedException to addFailed callback
                             // to indicate that the ledger is now fenced and topic needs to be closed
                             clearPendingAddEntries(new ManagedLedgerFencedException(e));
-                            // Do not need to unlock ledgersListMutex here because we are going to close to topic anyways
+                            // Do not need to unlock ledgersListMutex here because we are going to close to topic
+                            // anyways
                             return;
                         }
                     }
@@ -1550,7 +1554,8 @@
                 // If op is used by another ledger handle, we need to close it and create a new one
                 if (existsOp.ledger != null) {
                     existsOp.close();
-                    existsOp = OpAddEntry.createNoRetainBuffer(existsOp.ml, existsOp.data, existsOp.getNumberOfMessages(), existsOp.callback, existsOp.ctx);
+                    existsOp = OpAddEntry.createNoRetainBuffer(existsOp.ml, existsOp.data,
+                            existsOp.getNumberOfMessages(), existsOp.callback, existsOp.ctx);
                 }
                 existsOp.setLedger(currentLedger);
                 pendingAddEntries.add(existsOp);
@@ -1628,7 +1633,7 @@
     }
 
     synchronized void createLedgerAfterClosed() {
-        if(isNeededCreateNewLedgerAfterCloseLedger()) {
+        if (isNeededCreateNewLedgerAfterCloseLedger()) {
             log.info("[{}] Creating a new ledger", name);
             STATE_UPDATER.set(this, State.CreatingLedger);
             this.lastLedgerCreationInitiationTimestamp = System.currentTimeMillis();
@@ -1654,9 +1659,8 @@
             currentLedger.asyncClose(new AsyncCallback.CloseCallback() {
                 @Override
                 public void closeComplete(int rc, LedgerHandle lh, Object o) {
-                    checkArgument(currentLedger.getId() == lh.getId(), "ledgerId %s doesn't match with acked ledgerId %s",
-                            currentLedger.getId(),
-                            lh.getId());
+                    checkArgument(currentLedger.getId() == lh.getId(), "ledgerId %s doesn't match with "
+                                  + "acked ledgerId %s", currentLedger.getId(), lh.getId());
 
                     if (rc == BKException.Code.OK) {
                         log.debug("Successfully closed ledger {}", lh.getId());
@@ -1692,7 +1696,8 @@
                         future.complete(null);
                         return;
                     }
-                    log.info("[{}] Unable to find position for predicate {}. Use the first position {} instead.", name, predicate, startPosition);
+                    log.info("[{}] Unable to find position for predicate {}. Use the first position {} instead.", name,
+                            predicate, startPosition);
                 } else {
                     finalPosition = getNextValidPosition((PositionImpl) position);
                 }
@@ -1700,7 +1705,8 @@
             }
 
             @Override
-            public void findEntryFailed(ManagedLedgerException exception, Optional<Position> failedReadPosition, Object ctx) {
+            public void findEntryFailed(ManagedLedgerException exception, Optional<Position> failedReadPosition,
+                                        Object ctx) {
                 log.warn("[{}] Unable to find position for predicate {}.", name, predicate);
                 future.complete(null);
             }
@@ -1750,7 +1756,8 @@
             }
 
             // Get a ledger handle to read from
-            getLedgerHandle(ledgerId).thenAccept(ledger -> internalReadFromLedger(ledger, opReadEntry)).exceptionally(ex -> {
+            getLedgerHandle(ledgerId).thenAccept(ledger -> internalReadFromLedger(ledger, opReadEntry)).exceptionally(ex
+                    -> {
                 log.error("[{}] Error opening ledger for reading at position {} - {}", name, opReadEntry.readPosition,
                         ex.getMessage());
                 opReadEntry.readEntriesFailed(ManagedLedgerException.getManagedLedgerException(ex.getCause()),
@@ -1869,7 +1876,8 @@
         if (position.getLedgerId() == currentLedger.getId()) {
             asyncReadEntry(currentLedger, position, callback, ctx);
         } else if (ledgers.containsKey(position.getLedgerId())) {
-            getLedgerHandle(position.getLedgerId()).thenAccept(ledger -> asyncReadEntry(ledger, position, callback, ctx)).exceptionally(ex -> {
+            getLedgerHandle(position.getLedgerId()).thenAccept(ledger -> asyncReadEntry(ledger, position, callback,
+                    ctx)).exceptionally(ex -> {
                 log.error("[{}] Error opening ledger for reading at position {} - {}", name, position, ex.getMessage());
                 callback.readEntryFailed(ManagedLedgerException.getManagedLedgerException(ex.getCause()), ctx);
                 return null;
@@ -1978,8 +1986,8 @@
         long ledgerId;
         long entryId;
         volatile long readOpCount = -1;
-        private static final AtomicLongFieldUpdater<ReadEntryCallbackWrapper> READ_OP_COUNT_UPDATER = AtomicLongFieldUpdater
-                .newUpdater(ReadEntryCallbackWrapper.class, "readOpCount");
+        private static final AtomicLongFieldUpdater<ReadEntryCallbackWrapper> READ_OP_COUNT_UPDATER =
+                AtomicLongFieldUpdater.newUpdater(ReadEntryCallbackWrapper.class, "readOpCount");
         volatile long createdTime = -1;
         volatile Object cntx;
 
@@ -2190,8 +2198,9 @@
     PositionImpl startReadOperationOnLedger(PositionImpl position, OpReadEntry opReadEntry) {
         Long ledgerId = ledgers.ceilingKey(position.getLedgerId());
         if (null == ledgerId) {
-            opReadEntry.readEntriesFailed(new ManagedLedgerException.NoMoreEntriesToReadException("The ceilingKey(K key) method is used to return the " +
-                    "least key greater than or equal to the given key, or null if there is no such key"), null);
+            opReadEntry.readEntriesFailed(new ManagedLedgerException.NoMoreEntriesToReadException("The ceilingKey(K key"
+                    + ") method is used to return the least key greater than or equal to the given key, "
+                    + "or null if there is no such key"), null);
         }
 
         if (ledgerId != position.getLedgerId()) {
@@ -2238,8 +2247,8 @@
 
             if (currPointedLedger != null) {
                 if (nextPointedLedger != null) {
-                    if (lastAckedPosition.getEntryId() != -1 &&
-                            lastAckedPosition.getEntryId() + 1 >= currPointedLedger.getEntries()) {
+                    if (lastAckedPosition.getEntryId() != -1
+                            && lastAckedPosition.getEntryId() + 1 >= currPointedLedger.getEntries()) {
                         lastAckedPosition = new PositionImpl(nextPointedLedger.getLedgerId(), -1);
                     }
                 } else {
@@ -2263,7 +2272,7 @@
     }
 
     private void trimConsumedLedgersInBackground() {
-        trimConsumedLedgersInBackground(Futures.NULL_PROMISE);
+        trimConsumedLedgersInBackground(Futures.nullPromise_);
     }
 
     @Override
@@ -2276,12 +2285,13 @@
     }
 
     private void scheduleDeferredTrimming(boolean isTruncate, CompletableFuture<?> promise) {
-        scheduledExecutor.schedule(safeRun(() -> trimConsumedLedgersInBackground(isTruncate, promise)), 100, TimeUnit.MILLISECONDS);
+        scheduledExecutor.schedule(safeRun(() -> trimConsumedLedgersInBackground(isTruncate, promise)), 100,
+                TimeUnit.MILLISECONDS);
     }
 
     private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
         if (config.getLedgerOffloader() != null
-                && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
+                && config.getLedgerOffloader() != NullLedgerOffloader.instance_
                 && config.getLedgerOffloader().getOffloadPolicies() != null
                 && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() != null
                 && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() >= 0) {
@@ -2305,10 +2315,12 @@
                 });
 
             if (config.getLedgerOffloader() != null
-                    && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
+                    && config.getLedgerOffloader() != NullLedgerOffloader.instance_
                     && config.getLedgerOffloader().getOffloadPolicies() != null
-                    && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() != null) {
-                long threshold = config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes();
+                    && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
+                    != null) {
+                long threshold = config.getLedgerOffloader().getOffloadPolicies()
+                        .getManagedLedgerOffloadThresholdInBytes();
 
                 long sizeSummed = 0;
                 long alreadyOffloadedSize = 0;
@@ -2341,7 +2353,7 @@
                             name, sizeSummed, alreadyOffloadedSize, threshold);
                 }
 
-                offloadLoop(unlockingPromise, toOffload, PositionImpl.latest, Optional.empty());
+                offloadLoop(unlockingPromise, toOffload, PositionImpl.LATEST, Optional.empty());
             }
         }
     }
@@ -2389,7 +2401,7 @@
         List<LedgerInfo> ledgersToDelete = Lists.newArrayList();
         List<LedgerInfo> offloadedLedgersToDelete = Lists.newArrayList();
         Optional<OffloadPolicies> optionalOffloadPolicies = Optional.ofNullable(config.getLedgerOffloader() != null
-                && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
+                && config.getLedgerOffloader() != NullLedgerOffloader.instance_
                 ? config.getLedgerOffloader().getOffloadPolicies()
                 : null);
         synchronized (this) {
@@ -2502,8 +2514,8 @@
             for (LedgerInfo ls : ledgersToDelete) {
                 if (currentLastConfirmedEntry != null && ls.getLedgerId() == currentLastConfirmedEntry.getLedgerId()) {
                     // this info is relevant because the lastMessageId won't be available anymore
-                    log.info("[{}] Ledger {} contains the current last confirmed entry {}, and it is going to be deleted", name,
-                            ls.getLedgerId(), currentLastConfirmedEntry);
+                    log.info("[{}] Ledger {} contains the current last confirmed entry {}, and it is going to be "
+                             + "deleted", name, ls.getLedgerId(), currentLastConfirmedEntry);
                 }
 
                 invalidateReadHandle(ls.getLedgerId());
@@ -2564,11 +2576,12 @@
 
     /**
      * Non-durable cursors have to be moved forward when data is trimmed since they are not retain that data.
-     * This method also addresses a corner case for durable cursors in which the cursor is caught up, i.e. the mark delete position
-     * happens to be the last entry in a ledger.  If the ledger is deleted, then subsequent calculations for backlog
-     * size may not be accurate since the method getNumberOfEntries we use in backlog calculation will not be able to fetch
-     * the ledger info of a deleted ledger.  Thus, we need to update the mark delete position to the "-1" entry of the first ledger
-     * that is not marked for deletion.
+     * This method also addresses a corner case for durable cursors in which the cursor is caught up, i.e. the mark
+     * delete position happens to be the last entry in a ledger.  If the ledger is deleted, then subsequent
+     * calculations for backlog
+     * size may not be accurate since the method getNumberOfEntries we use in backlog calculation will not be able to
+     * fetch the ledger info of a deleted ledger. Thus, we need to update the mark delete position to the "-1" entry
+     * of the first ledger that is not marked for deletion.
      * This is to make sure that the `consumedEntries` counter is correctly updated with the number of skipped
      * entries and the stats are reported correctly.
      */
@@ -2578,16 +2591,19 @@
         }
 
         // need to move mark delete for non-durable cursors to the first ledger NOT marked for deletion
-        // calling getNumberOfEntries latter for a ledger that is already deleted will be problematic and return incorrect results
+        // calling getNumberOfEntries latter for a ledger that is already deleted will be problematic and return
+        // incorrect results
         long firstNonDeletedLedger = ledgers.higherKey(ledgersToDelete.get(ledgersToDelete.size() - 1).getLedgerId());
         PositionImpl highestPositionToDelete = new PositionImpl(firstNonDeletedLedger, -1);
 
         cursors.forEach(cursor -> {
             // move the mark delete position to the highestPositionToDelete only if it is smaller than the add confirmed
-            // to prevent the edge case where the cursor is caught up to the latest and highestPositionToDelete may be larger than the last add confirmed
+            // to prevent the edge case where the cursor is caught up to the latest and highestPositionToDelete may be
+            // larger than the last add confirmed
             if (highestPositionToDelete.compareTo((PositionImpl) cursor.getMarkDeletedPosition()) > 0
-                    && highestPositionToDelete.compareTo((PositionImpl) cursor.getManagedLedger().getLastConfirmedEntry()) <= 0
-                    && !(!cursor.isDurable() && cursor instanceof NonDurableCursorImpl && ((NonDurableCursorImpl) cursor).isReadCompacted())) {
+                    && highestPositionToDelete.compareTo((PositionImpl) cursor.getManagedLedger()
+                    .getLastConfirmedEntry()) <= 0 && !(!cursor.isDurable() && cursor instanceof NonDurableCursorImpl
+                    && ((NonDurableCursorImpl) cursor).isReadCompacted())) {
                 cursor.asyncMarkDelete(highestPositionToDelete, cursor.getProperties(), new MarkDeleteCallback() {
                     @Override
                     public void markDeleteComplete(Object ctx) {
@@ -2718,7 +2734,8 @@
                 log.warn("[{}] Ledger was already deleted {}", name, ledgerId);
             } else if (rc != BKException.Code.OK) {
                 log.error("[{}] Error deleting ledger {} : {}", name, ledgerId, BKException.getMessage(rc));
-                scheduledExecutor.schedule(safeRun(() -> asyncDeleteLedger(ledgerId, retry - 1)), DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC, TimeUnit.SECONDS);
+                scheduledExecutor.schedule(safeRun(() -> asyncDeleteLedger(ledgerId, retry - 1)),
+                        DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC, TimeUnit.SECONDS);
             } else {
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] Deleted ledger {}", name, ledgerId);
@@ -2816,14 +2833,15 @@
     @Override
     public void asyncOffloadPrefix(Position pos, OffloadCallback callback, Object ctx) {
         PositionImpl requestOffloadTo = (PositionImpl) pos;
-        if (!isValidPosition(requestOffloadTo) &&
+        if (!isValidPosition(requestOffloadTo)
                 // Also consider the case where the last ledger is currently
                 // empty. In this the passed position is not technically
                 // "valid", per the above check, given that it's not written
                 // yes, but it will be valid for the logic here
-                !(requestOffloadTo.getLedgerId() == currentLedger.getId()
+                && !(requestOffloadTo.getLedgerId() == currentLedger.getId()
                         && requestOffloadTo.getEntryId() == 0)) {
-            log.warn("[{}] Cannot start offload at position {} - LastConfirmedEntry: {}", name, pos, lastConfirmedEntry);
+            log.warn("[{}] Cannot start offload at position {} - LastConfirmedEntry: {}", name, pos,
+                    lastConfirmedEntry);
             callback.offloadFailed(new InvalidCursorPositionException("Invalid position for offload: " + pos), ctx);
             return;
         }
@@ -2919,7 +2937,7 @@
                 .thenCompose((ignore) -> {
                         return Retries.run(Backoff.exponentialJittered(TimeUnit.SECONDS.toMillis(1),
                                                                        TimeUnit.SECONDS.toHours(1)).limit(10),
-                                           FAIL_ON_CONFLICT,
+                                            failOnConflict_,
                                            () -> completeLedgerInfoForOffloaded(ledgerId, uuid),
                                            scheduledExecutor, name)
                             .whenComplete((ignore2, exception) -> {
@@ -2936,7 +2954,8 @@
                 .whenComplete((ignore, exception) -> {
                         if (exception != null) {
                             lastOffloadFailureTimestamp = System.currentTimeMillis();
-                            log.warn("[{}] Exception occurred for ledgerId {} timestamp {} during offload", name, ledgerId, lastOffloadFailureTimestamp, exception);
+                            log.warn("[{}] Exception occurred for ledgerId {} timestamp {} during offload", name,
+                                    ledgerId, lastOffloadFailureTimestamp, exception);
                             PositionImpl newFirstUnoffloaded = PositionImpl.get(ledgerId, 0);
                             if (newFirstUnoffloaded.compareTo(firstUnoffloaded) > 0) {
                                 newFirstUnoffloaded = firstUnoffloaded;
@@ -2954,7 +2973,8 @@
                                         errorToReport);
                         } else {
                             lastOffloadSuccessTimestamp = System.currentTimeMillis();
-                            log.info("[{}] offload for ledgerId {} timestamp {} succeed", name, ledgerId, lastOffloadSuccessTimestamp);
+                            log.info("[{}] offload for ledgerId {} timestamp {} succeed", name, ledgerId,
+                                    lastOffloadSuccessTimestamp);
                             lastOffloadLedgerId = ledgerId;
                             invalidateReadHandle(ledgerId);
                             offloadLoop(promise, ledgersToOffload, firstUnoffloaded, firstError);
@@ -2967,7 +2987,7 @@
         LedgerInfo transform(LedgerInfo oldInfo) throws ManagedLedgerException;
     }
 
-    static Predicate<Throwable> FAIL_ON_CONFLICT = (throwable) -> {
+    static Predicate<Throwable> failOnConflict_ = (throwable) -> {
         return !(throwable instanceof OffloadConflict) && Retries.NonFatalPredicate.test(throwable);
     };
 
@@ -3343,7 +3363,8 @@
         } catch (NullPointerException e) {
             next = lastConfirmedEntry.getNext();
             if (log.isDebugEnabled()) {
-                log.debug("[{}] Can't find next valid position : {}, fall back to the next position of the last position : {}.", position, name, next, e);
+                log.debug("[{}] Can't find next valid position : {}, fall back to the next position of the last "
+                        + "position : {}.", position, name, next, e);
             }
         }
         return next;
@@ -3405,7 +3426,7 @@
     }
 
     /**
-     * Get the first position written in the managed ledger, alongside with the associated counter
+     * Get the first position written in the managed ledger, alongside with the associated counter.
      */
     Pair<PositionImpl, Long> getFirstPositionAndCounter() {
         PositionImpl pos;
@@ -3690,8 +3711,8 @@
         log.info("[{}] Creating ledger, metadata: {} - metadata ops timeout : {} seconds",
             name, finalMetadata, config.getMetadataOperationsTimeoutSeconds());
         try {
-            bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(), config.getAckQuorumSize(),
-                digestType, config.getPassword(), cb, ledgerCreated, finalMetadata);
+            bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(),
+                    config.getAckQuorumSize(), digestType, config.getPassword(), cb, ledgerCreated, finalMetadata);
         } catch (Throwable cause) {
             log.error("[{}] Encountered unexpected error when creating ledger",
                 name, cause);
@@ -3778,7 +3799,8 @@
         }
         ReadEntryCallbackWrapper callback = this.lastReadCallback;
         long readOpCount = callback != null ? callback.readOpCount : 0;
-        boolean timeout = callback != null && (TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - callback.createdTime) >= timeoutSec);
+        boolean timeout = callback != null && (TimeUnit.NANOSECONDS
+                .toSeconds(System.nanoTime() - callback.createdTime) >= timeoutSec);
         if (readOpCount > 0 && timeout) {
             log.warn("[{}]-{}-{} read entry timeout after {} sec", this.name, this.lastReadCallback.ledgerId,
                     this.lastReadCallback.entryId, timeoutSec);
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java
index 9f1563b..aca8e4e 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java
@@ -20,7 +20,6 @@
 
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
-
 import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
@@ -52,7 +51,8 @@
      *            whether the managed ledger metadata should be created if it doesn't exist already
      * @throws MetaStoreException
      */
-    void getManagedLedgerInfo(String ledgerName, boolean createIfMissing, MetaStoreCallback<ManagedLedgerInfo> callback);
+    void getManagedLedgerInfo(String ledgerName, boolean createIfMissing,
+                              MetaStoreCallback<ManagedLedgerInfo> callback);
 
     /**
      *
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
index 5ad62b2..ac2e746 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
@@ -19,19 +19,16 @@
 package org.apache.bookkeeper.mledger.impl;
 
 import com.google.protobuf.InvalidProtocolBufferException;
-
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.buffer.Unpooled;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.CompositeByteBuf;
-import io.netty.buffer.Unpooled;
 import java.util.concurrent.RejectedExecutionException;
 import lombok.extern.slf4j.Slf4j;
-
 import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
@@ -140,9 +137,11 @@
 
         String path = PREFIX + ledgerName;
         store.put(path, compressLedgerInfo(mlInfo), Optional.of(stat.getVersion()))
-                .thenAcceptAsync(newVersion -> callback.operationComplete(null, newVersion), executor.chooseThread(ledgerName))
+                .thenAcceptAsync(newVersion -> callback.operationComplete(null, newVersion),
+                        executor.chooseThread(ledgerName))
                 .exceptionally(ex -> {
-                    executor.executeOrdered(ledgerName, SafeRunnable.safeRun(() -> callback.operationFailed(getException(ex))));
+                    executor.executeOrdered(ledgerName, SafeRunnable.safeRun(() -> callback
+                            .operationFailed(getException(ex))));
                     return null;
                 });
     }
@@ -155,9 +154,11 @@
 
         String path = PREFIX + ledgerName;
         store.getChildren(path)
-                .thenAcceptAsync(cursors -> callback.operationComplete(cursors, null), executor.chooseThread(ledgerName))
+                .thenAcceptAsync(cursors -> callback.operationComplete(cursors, null), executor
+                        .chooseThread(ledgerName))
                 .exceptionally(ex -> {
-                    executor.executeOrdered(ledgerName, SafeRunnable.safeRun(() -> callback.operationFailed(getException(ex))));
+                    executor.executeOrdered(ledgerName, SafeRunnable.safeRun(() -> callback
+                            .operationFailed(getException(ex))));
                     return null;
                 });
     }
@@ -184,7 +185,8 @@
                     }
                 }, executor.chooseThread(ledgerName))
                 .exceptionally(ex -> {
-                    executor.executeOrdered(ledgerName, SafeRunnable.safeRun(() -> callback.operationFailed(getException(ex))));
+                    executor.executeOrdered(ledgerName, SafeRunnable.safeRun(() -> callback
+                            .operationFailed(getException(ex))));
                     return null;
                 });
     }
@@ -213,9 +215,11 @@
         }
 
         store.put(path, content, Optional.of(expectedVersion))
-                .thenAcceptAsync(optStat -> callback.operationComplete(null, optStat), executor.chooseThread(ledgerName))
+                .thenAcceptAsync(optStat -> callback.operationComplete(null, optStat), executor
+                        .chooseThread(ledgerName))
                 .exceptionally(ex -> {
-                    executor.executeOrdered(ledgerName, SafeRunnable.safeRun(() -> callback.operationFailed(getException(ex))));
+                    executor.executeOrdered(ledgerName, SafeRunnable.safeRun(() -> callback
+                            .operationFailed(getException(ex))));
                     return null;
                 });
     }
@@ -233,7 +237,8 @@
                     callback.operationComplete(null, null);
                 }, executor.chooseThread(ledgerName))
                 .exceptionally(ex -> {
-                    executor.executeOrdered(ledgerName, SafeRunnable.safeRun(() -> callback.operationFailed(getException(ex))));
+                    executor.executeOrdered(ledgerName, SafeRunnable.safeRun(() -> callback
+                            .operationFailed(getException(ex))));
                     return null;
                 });
     }
@@ -251,7 +256,8 @@
                     callback.operationComplete(null, null);
                 }, executor.chooseThread(ledgerName))
                 .exceptionally(ex -> {
-                    executor.executeOrdered(ledgerName, SafeRunnable.safeRun(() -> callback.operationFailed(getException(ex))));
+                    executor.executeOrdered(ledgerName, SafeRunnable.safeRun(() -> callback
+                            .operationFailed(getException(ex))));
                     return null;
                 });
     }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
index 1d545bd..436d397 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
@@ -36,7 +36,8 @@
     private final boolean readCompacted;
 
     NonDurableCursorImpl(BookKeeper bookkeeper, ManagedLedgerConfig config, ManagedLedgerImpl ledger, String cursorName,
-                         PositionImpl startCursorPosition, CommandSubscribe.InitialPosition initialPosition, boolean isReadCompacted) {
+                         PositionImpl startCursorPosition, CommandSubscribe.InitialPosition initialPosition,
+                         boolean isReadCompacted) {
         super(bookkeeper, config, ledger, cursorName);
         this.readCompacted = isReadCompacted;
 
@@ -53,7 +54,7 @@
                     initializeCursorPosition(ledger.getFirstPositionAndCounter());
                     break;
             }
-        } else if (startCursorPosition.getLedgerId() == PositionImpl.earliest.getLedgerId()) {
+        } else if (startCursorPosition.getLedgerId() == PositionImpl.EARLIEST.getLedgerId()) {
             // Start from invalid ledger to read from first available entry
             recoverCursor(ledger.getPreviousPosition(ledger.getFirstPosition()));
         } else {
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NullLedgerOffloader.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NullLedgerOffloader.java
index b9a4df5..d9da6f5 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NullLedgerOffloader.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NullLedgerOffloader.java
@@ -21,7 +21,6 @@
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-
 import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.mledger.LedgerOffloader;
 import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
@@ -30,7 +29,7 @@
  * Null implementation that throws an error on any invokation.
  */
 public class NullLedgerOffloader implements LedgerOffloader {
-    public static NullLedgerOffloader INSTANCE = new NullLedgerOffloader();
+    public static NullLedgerOffloader instance_ = new NullLedgerOffloader();
 
     @Override
     public String getOffloadDriverName() {
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OffloadSegmentInfoImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OffloadSegmentInfoImpl.java
index cca1676..8716d5a 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OffloadSegmentInfoImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OffloadSegmentInfoImpl.java
@@ -40,8 +40,8 @@
     public final long beginLedgerId;
     public final long beginEntryId;
     public final String driverName;
-    volatile private long endLedgerId;
-    volatile private long endEntryId;
+    private volatile long endLedgerId;
+    private volatile long endEntryId;
     volatile boolean closed = false;
     public final Map<String, String> driverMetadata;
 
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
index e390621..226ad04 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
@@ -18,10 +18,14 @@
  */
 package org.apache.bookkeeper.mledger.impl;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import io.netty.buffer.ByteBuf;
 import io.netty.util.Recycler;
 import io.netty.util.Recycler.Handle;
 import io.netty.util.ReferenceCountUtil;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
@@ -33,11 +37,6 @@
 import org.apache.bookkeeper.mledger.util.SafeRun;
 import org.apache.bookkeeper.util.SafeRunnable;
 
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLongFieldUpdater;
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-
-import static com.google.common.base.Preconditions.checkArgument;
 
 /**
  * Handles the life-cycle of an addEntry() operation.
@@ -66,8 +65,8 @@
     private int dataLength;
     private ManagedLedgerInterceptor.PayloadProcessorHandle payloadProcessorHandle = null;
 
-    private static final AtomicReferenceFieldUpdater<OpAddEntry, OpAddEntry.State> STATE_UPDATER = AtomicReferenceFieldUpdater
-            .newUpdater(OpAddEntry.class, OpAddEntry.State.class, "state");
+    private static final AtomicReferenceFieldUpdater<OpAddEntry, OpAddEntry.State> STATE_UPDATER =
+            AtomicReferenceFieldUpdater.newUpdater(OpAddEntry.class, OpAddEntry.State.class, "state");
     volatile State state;
 
     enum State {
@@ -77,7 +76,8 @@
         CLOSED
     }
 
-    public static OpAddEntry createNoRetainBuffer(ManagedLedgerImpl ml, ByteBuf data, AddEntryCallback callback, Object ctx) {
+    public static OpAddEntry createNoRetainBuffer(ManagedLedgerImpl ml, ByteBuf data, AddEntryCallback callback,
+                                                  Object ctx) {
         OpAddEntry op = createOpAddEntryNoRetainBuffer(ml, data, callback, ctx);
         if (log.isDebugEnabled()) {
             log.debug("Created new OpAddEntry {}", op);
@@ -85,7 +85,8 @@
         return op;
     }
 
-    public static OpAddEntry createNoRetainBuffer(ManagedLedgerImpl ml, ByteBuf data, int numberOfMessages, AddEntryCallback callback, Object ctx) {
+    public static OpAddEntry createNoRetainBuffer(ManagedLedgerImpl ml, ByteBuf data, int numberOfMessages,
+                                                  AddEntryCallback callback, Object ctx) {
         OpAddEntry op = createOpAddEntryNoRetainBuffer(ml, data, callback, ctx);
         op.numberOfMessages = numberOfMessages;
         if (log.isDebugEnabled()) {
@@ -94,7 +95,8 @@
         return op;
     }
 
-    private static OpAddEntry createOpAddEntryNoRetainBuffer(ManagedLedgerImpl ml, ByteBuf data, AddEntryCallback callback, Object ctx) {
+    private static OpAddEntry createOpAddEntryNoRetainBuffer(ManagedLedgerImpl ml, ByteBuf data,
+                                                             AddEntryCallback callback, Object ctx) {
         OpAddEntry op = RECYCLER.get();
         op.ml = ml;
         op.ledger = null;
@@ -129,7 +131,8 @@
             addOpCount = ManagedLedgerImpl.ADD_OP_COUNT_UPDATER.incrementAndGet(ml);
             lastInitTime = System.nanoTime();
             if (ml.getManagedLedgerInterceptor() != null) {
-                payloadProcessorHandle = ml.getManagedLedgerInterceptor().processPayloadBeforeLedgerWrite(this, duplicateBuffer);
+                payloadProcessorHandle = ml.getManagedLedgerInterceptor().processPayloadBeforeLedgerWrite(this,
+                        duplicateBuffer);
                 if (payloadProcessorHandle != null) {
                     duplicateBuffer = payloadProcessorHandle.getProcessedPayload();
                 }
@@ -156,7 +159,8 @@
     public void addComplete(int rc, final LedgerHandle lh, long entryId, Object ctx) {
 
         if (!STATE_UPDATER.compareAndSet(OpAddEntry.this, State.INITIATED, State.COMPLETED)) {
-            log.warn("[{}] The add op is terminal legacy callback for entry {}-{} adding.", ml.getName(), lh.getId(), entryId);
+            log.warn("[{}] The add op is terminal legacy callback for entry {}-{} adding.", ml.getName(), lh.getId(),
+                    entryId);
             OpAddEntry.this.recycle();
             return;
         }
@@ -268,7 +272,7 @@
     }
 
     /**
-     * Checks if add-operation is completed
+     * Checks if add-operation is completed.
      *
      * @return true if task is not already completed else returns false.
      */
@@ -370,12 +374,12 @@
     public String toString() {
         ManagedLedgerImpl ml = this.ml;
         LedgerHandle ledger = this.ledger;
-        return "OpAddEntry{" +
-                "mlName=" + ml != null ? ml.getName() : "null" +
-                ", ledgerId=" + ledger != null ? String.valueOf(ledger.getId()) : "null" +
-                ", entryId=" + entryId +
-                ", startTime=" + startTime +
-                ", dataLength=" + dataLength +
-                '}';
+        return "OpAddEntry{"
+                + "mlName=" + ml != null ? ml.getName() : "null"
+                + ", ledgerId=" + ledger != null ? String.valueOf(ledger.getId()) : "null"
+                + ", entryId=" + entryId
+                + ", startTime=" + startTime
+                + ", dataLength=" + dataLength
+                + '}';
     }
 }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java
index 91567fc..812c535 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java
@@ -19,12 +19,10 @@
 package org.apache.bookkeeper.mledger.impl;
 
 import com.google.common.base.Predicate;
-import org.apache.bookkeeper.mledger.AsyncCallbacks.FindEntryCallback;
-import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
-
 import java.util.Optional;
 import lombok.extern.slf4j.Slf4j;
-
+import org.apache.bookkeeper.mledger.AsyncCallbacks.FindEntryCallback;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.Position;
@@ -100,7 +98,8 @@
                 searchPosition = ledger.getPositionAfterN(searchPosition, max, PositionBound.startExcluded);
                 if (lastPosition.compareTo(searchPosition) < 0) {
                     if (log.isDebugEnabled()) {
-                        log.debug("first position {} matches, last should be {}, but moving to lastPos {}", position, searchPosition, lastPosition);
+                        log.debug("first position {} matches, last should be {}, but moving to lastPos {}", position,
+                                searchPosition, lastPosition);
                     }
                     searchPosition = lastPosition;
                 }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
index 91a6e26..210ad31 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
@@ -54,7 +54,7 @@
         op.callback = callback;
         op.entries = Lists.newArrayList();
         if (maxPosition == null) {
-            maxPosition = PositionImpl.latest;
+            maxPosition = PositionImpl.LATEST;
         }
         op.maxPosition = maxPosition;
         op.ctx = ctx;
@@ -134,8 +134,8 @@
     }
 
     void checkReadCompletion() {
-        if (entries.size() < count && cursor.hasMoreEntries() &&
-                ((PositionImpl) cursor.getReadPosition()).compareTo(maxPosition) < 0) {
+        if (entries.size() < count && cursor.hasMoreEntries()
+                && ((PositionImpl) cursor.getReadPosition()).compareTo(maxPosition) < 0) {
             // We still have more entries to read from the next ledger, schedule a new async operation
             if (nextReadPosition.getLedgerId() != readPosition.getLedgerId()) {
                 cursor.ledger.startReadOperationOnLedger(nextReadPosition, OpReadEntry.this);
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java
index 92baf42..2672c23 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java
@@ -31,8 +31,8 @@
     protected long entryId;
     protected long[] ackSet;
 
-    public static final PositionImpl earliest = new PositionImpl(-1, -1);
-    public static final PositionImpl latest = new PositionImpl(Long.MAX_VALUE, Long.MAX_VALUE);
+    public static final PositionImpl EARLIEST = new PositionImpl(-1, -1);
+    public static final PositionImpl LATEST = new PositionImpl(Long.MAX_VALUE, Long.MAX_VALUE);
 
     public PositionImpl(PositionInfo pi) {
         this.ledgerId = pi.getLedgerId();
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImplRecyclable.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImplRecyclable.java
index ad95520..d2114ac 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImplRecyclable.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImplRecyclable.java
@@ -18,10 +18,9 @@
  */
 package org.apache.bookkeeper.mledger.impl;
 
-import org.apache.bookkeeper.mledger.Position;
-
 import io.netty.util.Recycler;
 import io.netty.util.Recycler.Handle;
+import org.apache.bookkeeper.mledger.Position;
 
 public class PositionImplRecyclable extends PositionImpl implements Position {
 
@@ -35,7 +34,7 @@
     };
 
     private PositionImplRecyclable(Handle<PositionImplRecyclable> recyclerHandle) {
-        super(PositionImpl.earliest);
+        super(PositionImpl.EARLIEST);
         this.recyclerHandle = recyclerHandle;
     }
 
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java
index 7a0445e..8c20e0d 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java
@@ -19,9 +19,7 @@
 package org.apache.bookkeeper.mledger.impl;
 
 import com.google.common.collect.Range;
-
 import lombok.extern.slf4j.Slf4j;
-
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
@@ -36,7 +34,7 @@
                               PositionImpl startPosition, String cursorName) {
         super(bookkeeper, config, ledger, cursorName);
 
-        if (startPosition.equals(PositionImpl.earliest)) {
+        if (startPosition.equals(PositionImpl.EARLIEST)) {
             readPosition = ledger.getFirstPosition().getNext();
         } else {
             readPosition = startPosition;
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java
index 13cff9f..214c3af 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java
@@ -18,10 +18,9 @@
  */
 package org.apache.bookkeeper.mledger.impl;
 
+import com.google.common.collect.Range;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
-
-import com.google.common.collect.Range;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
@@ -121,7 +120,7 @@
 
     private ReadOnlyCursor createReadOnlyCursor(PositionImpl startPosition) {
         if (ledgers.isEmpty()) {
-            lastConfirmedEntry = PositionImpl.earliest;
+            lastConfirmedEntry = PositionImpl.EARLIEST;
         } else if (ledgers.lastEntry().getValue().getEntries() > 0) {
             // Last ledger has some of the entries
             lastConfirmedEntry = new PositionImpl(ledgers.lastKey(), ledgers.lastEntry().getValue().getEntries() - 1);
@@ -132,7 +131,7 @@
                 LedgerInfo li = ledgers.headMap(lastLedgerId, false).lastEntry().getValue();
                 lastConfirmedEntry = new PositionImpl(li.getLedgerId(), li.getEntries() - 1);
             } else {
-                lastConfirmedEntry = PositionImpl.earliest;
+                lastConfirmedEntry = PositionImpl.EARLIEST;
             }
         }
 
@@ -144,7 +143,8 @@
             this.getLedgerHandle(position.getLedgerId())
                     .thenAccept((ledger) -> asyncReadEntry(ledger, position, callback, ctx))
                     .exceptionally((ex) -> {
-                        log.error("[{}] Error opening ledger for reading at position {} - {}", this.name, position, ex.getMessage());
+                        log.error("[{}] Error opening ledger for reading at position {} - {}", this.name, position,
+                                ex.getMessage());
                         callback.readEntryFailed(ManagedLedgerException.getManagedLedgerException(ex.getCause()), ctx);
                         return null;
                     });
@@ -152,7 +152,7 @@
 
     @Override
     public long getNumberOfEntries() {
-        return getNumberOfEntries(Range.openClosed(PositionImpl.earliest, getLastPosition()));
+        return getNumberOfEntries(Range.openClosed(PositionImpl.EARLIEST, getLastPosition()));
     }
 
     @Override
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/intercept/ManagedLedgerInterceptor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/intercept/ManagedLedgerInterceptor.java
index 13fc530..cb5ba0c 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/intercept/ManagedLedgerInterceptor.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/intercept/ManagedLedgerInterceptor.java
@@ -19,14 +19,13 @@
 package org.apache.bookkeeper.mledger.intercept;
 
 import io.netty.buffer.ByteBuf;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.common.annotation.InterfaceAudience;
 import org.apache.bookkeeper.common.annotation.InterfaceStability;
 import org.apache.bookkeeper.mledger.impl.OpAddEntry;
 
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-
 /**
  * Interceptor for ManagedLedger.
  * */
@@ -61,17 +60,17 @@
     void onUpdateManagedLedgerInfo(Map<String, String> propertiesMap);
 
     /**
-     * A reference handle to the payload processor
+     * A reference handle to the payload processor.
      */
     interface PayloadProcessorHandle {
         /**
-         * To obtain the processed data
+         * To obtain the processed data.
          * @return processed data
          */
         ByteBuf getProcessedPayload();
 
         /**
-         * To release resources used in processor, if any
+         * To release resources used in processor, if any.
          */
         void release();
     }
@@ -85,12 +84,13 @@
     }
 
     /**
-     * Intercept before payload gets written to ledger
+     * Intercept before payload gets written to ledger.
      * @param ledgerWriteOp OpAddEntry used to trigger ledger write.
      * @param dataToBeStoredInLedger data to be stored in ledger
      * @return handle to the processor
      */
-    default PayloadProcessorHandle processPayloadBeforeLedgerWrite(OpAddEntry ledgerWriteOp, ByteBuf dataToBeStoredInLedger){
+    default PayloadProcessorHandle processPayloadBeforeLedgerWrite(OpAddEntry ledgerWriteOp,
+                                                                   ByteBuf dataToBeStoredInLedger){
         return null;
     }
 }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/intercept/package-info.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/intercept/package-info.java
new file mode 100644
index 0000000..58ef67b
--- /dev/null
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/intercept/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.intercept;
\ No newline at end of file
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadUtils.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadUtils.java
index f4064e7..4e019bd 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadUtils.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadUtils.java
@@ -19,14 +19,12 @@
 package org.apache.bookkeeper.mledger.offload;
 
 import com.google.common.collect.Maps;
-
+import com.google.protobuf.ByteString;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
-
-import com.google.protobuf.ByteString;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerMetadataBuilder;
@@ -109,7 +107,8 @@
                 .setAckQuorumSize(metadata.getAckQuorumSize())
                 .setEnsembleSize(metadata.getEnsembleSize())
                 .setLength(metadata.getLength())
-                .setState(metadata.isClosed() ? DataFormats.LedgerMetadataFormat.State.CLOSED : DataFormats.LedgerMetadataFormat.State.OPEN)
+                .setState(metadata.isClosed() ? DataFormats.LedgerMetadataFormat.State.CLOSED :
+                        DataFormats.LedgerMetadataFormat.State.OPEN)
                 .setLastEntryId(metadata.getLastEntryId())
                 .setCtime(metadata.getCtime())
                 .setDigestType(BookKeeper.DigestType.toProtoDigestType(
@@ -130,7 +129,8 @@
     }
 
     public static LedgerMetadata parseLedgerMetadata(long id, byte[] bytes) throws IOException {
-        DataFormats.LedgerMetadataFormat ledgerMetadataFormat = DataFormats.LedgerMetadataFormat.newBuilder().mergeFrom(bytes).build();
+        DataFormats.LedgerMetadataFormat ledgerMetadataFormat = DataFormats.LedgerMetadataFormat.newBuilder()
+                .mergeFrom(bytes).build();
         LedgerMetadataBuilder builder = LedgerMetadataBuilder.create()
                 .withLastEntryId(ledgerMetadataFormat.getLastEntryId())
                 .withPassword(ledgerMetadataFormat.getPassword().toByteArray())
@@ -175,7 +175,8 @@
                 builder.withDigestType(DigestType.DUMMY);
                 break;
             default:
-                throw new IllegalArgumentException("Unable to convert digest type " + ledgerMetadataFormat.getDigestType());
+                throw new IllegalArgumentException("Unable to convert digest type "
+                        + ledgerMetadataFormat.getDigestType());
         }
 
         return builder.build();
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloaderUtils.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloaderUtils.java
index 8eabccd..f229603 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloaderUtils.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloaderUtils.java
@@ -34,7 +34,7 @@
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 
 /**
- * Utils to load offloaders
+ * Utils to load offloaders.
  */
 @Slf4j
 public class OffloaderUtils {
@@ -48,7 +48,9 @@
      * @return the offloader class name
      * @throws IOException when fail to retrieve the pulsar offloader class
      */
-    static Pair<NarClassLoader, LedgerOffloaderFactory> getOffloaderFactory(String narPath, String narExtractionDirectory) throws IOException {
+    static Pair<NarClassLoader, LedgerOffloaderFactory> getOffloaderFactory(String narPath,
+                                                                            String narExtractionDirectory)
+            throws IOException {
         // need to load offloader NAR to the classloader that also loaded LedgerOffloaderFactory in case
         // LedgerOffloaderFactory is loaded by a classloader that is not the default classloader
         // as is the case for the pulsar presto plugin
@@ -73,8 +75,8 @@
                 try {
                     Object offloader = factoryClass.getDeclaredConstructor().newInstance();
                     if (!(offloader instanceof LedgerOffloaderFactory)) {
-                        throw new IOException("Class " + conf.getOffloaderFactoryClass() + " does not implement interface "
-                            + LedgerOffloaderFactory.class.getName());
+                        throw new IOException("Class " + conf.getOffloaderFactoryClass() + " does not implement "
+                                + "interface " + LedgerOffloaderFactory.class.getName());
                     }
                     loadFuture.complete((LedgerOffloaderFactory) offloader);
                 } catch (Throwable t) {
@@ -106,15 +108,18 @@
         }
     }
 
-    public static OffloaderDefinition getOffloaderDefinition(String narPath, String narExtractionDirectory) throws IOException {
-        try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet(), narExtractionDirectory)) {
+    public static OffloaderDefinition getOffloaderDefinition(String narPath, String narExtractionDirectory)
+            throws IOException {
+        try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet(),
+                narExtractionDirectory)) {
             String configStr = ncl.getServiceDefinition(PULSAR_OFFLOADER_SERVICE_NAME);
 
             return ObjectMapperFactory.getThreadLocalYaml().readValue(configStr, OffloaderDefinition.class);
         }
     }
 
-    public static Offloaders searchForOffloaders(String offloadersPath, String narExtractionDirectory) throws IOException {
+    public static Offloaders searchForOffloaders(String offloadersPath, String narExtractionDirectory)
+            throws IOException {
         Path path = Paths.get(offloadersPath).toAbsolutePath();
         log.info("Searching for offloaders in {}", path);
 
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/Offloaders.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/Offloaders.java
index dfe4c31..caf8d5e 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/Offloaders.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/Offloaders.java
@@ -39,8 +39,8 @@
                 return factory.getRight();
             }
         }
-        throw new IOException("No offloader found for driver '" + driverName + "'." +
-            " Please make sure you dropped the offloader nar packages under `${PULSAR_HOME}/offloaders`.");
+        throw new IOException("No offloader found for driver '" + driverName + "'."
+                + " Please make sure you dropped the offloader nar packages under `${PULSAR_HOME}/offloaders`.");
     }
 
     @Override
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadersCache.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadersCache.java
index e80c75b..307b7a5 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadersCache.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadersCache.java
@@ -18,11 +18,10 @@
  */
 package org.apache.bookkeeper.mledger.offload;
 
-import lombok.extern.slf4j.Slf4j;
-
 import java.io.IOException;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import lombok.extern.slf4j.Slf4j;
 
 /**
  * Implementation of an Offloaders. The main purpose of this class is to
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/package-info.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/package-info.java
new file mode 100644
index 0000000..1be0b86
--- /dev/null
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload;
\ No newline at end of file
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/AbstractCASReferenceCounted.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/AbstractCASReferenceCounted.java
index 6adc2ee..8b0e25f 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/AbstractCASReferenceCounted.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/AbstractCASReferenceCounted.java
@@ -23,16 +23,14 @@
  * The semantic was changed in https://github.com/netty/netty/commit/83a19d565064ee36998eb94f946e5a4264001065#diff-b9443e2689a46b3647fe6a8de0fdf3b2
  */
 
-import static io.netty.util.internal.ObjectUtil.checkPositive;
-
 import io.netty.util.IllegalReferenceCountException;
 import io.netty.util.ReferenceCounted;
-
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 /**
  * Abstract base class for classes wants to implement {@link ReferenceCounted}.
  */
+
 public abstract class AbstractCASReferenceCounted implements ReferenceCounted {
     private static final AtomicIntegerFieldUpdater<AbstractCASReferenceCounted> refCntUpdater =
             AtomicIntegerFieldUpdater.newUpdater(AbstractCASReferenceCounted.class, "refCnt");
@@ -45,7 +43,7 @@
     }
 
     /**
-     * An unsafe operation intended for use by a subclass that sets the reference count of the buffer directly
+     * An unsafe operation intended for use by a subclass that sets the reference count of the buffer directly.
      */
     protected final void setRefCnt(int refCnt) {
         refCntUpdater.set(this, refCnt);
@@ -58,7 +56,7 @@
 
     @Override
     public ReferenceCounted retain(int increment) {
-        return retain0(checkPositive(increment, "increment"));
+        return retain0(io.netty.util.internal.ObjectUtil.checkPositive(increment, "increment"));
     }
 
     private ReferenceCounted retain0(int increment) {
@@ -89,7 +87,7 @@
 
     @Override
     public boolean release(int decrement) {
-        return release0(checkPositive(decrement, "decrement"));
+        return release0(io.netty.util.internal.ObjectUtil.checkPositive(decrement, "decrement"));
     }
 
     private boolean release0(int decrement) {
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Futures.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Futures.java
index 9fd4a07..de0a5f5 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Futures.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Futures.java
@@ -29,7 +29,7 @@
  */
 public class Futures {
 
-    public static CompletableFuture<Void> NULL_PROMISE = CompletableFuture.completedFuture(null);
+    public static CompletableFuture<Void> nullPromise_ = CompletableFuture.completedFuture(null);
 
     /**
      * Adapts a {@link CloseCallback} to a {@link CompletableFuture}.
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/PositionAckSetUtil.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/PositionAckSetUtil.java
index 958f706..47d4bc2 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/PositionAckSetUtil.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/PositionAckSetUtil.java
@@ -57,10 +57,10 @@
     //This method is compare two position which position is bigger than another one.
     //When the ledgerId and entryId in this position is same to another one and two position all have ack set, it will
     //compare the ack set next bit index is bigger than another one.
-    public static int compareToWithAckSet(PositionImpl currentPosition,PositionImpl otherPosition) {
-        if (currentPosition == null || otherPosition ==null) {
-            throw new IllegalArgumentException("Two positions can't be null! " +
-                    "current position : [" + currentPosition + "] other position : [" + otherPosition + "]");
+    public static int compareToWithAckSet(PositionImpl currentPosition, PositionImpl otherPosition) {
+        if (currentPosition == null || otherPosition == null) {
+            throw new IllegalArgumentException("Two positions can't be null! "
+                    + "current position : [" + currentPosition + "] other position : [" + otherPosition + "]");
         }
         int result = ComparisonChain.start().compare(currentPosition.getLedgerId(),
                 otherPosition.getLedgerId()).compare(currentPosition.getEntryId(), otherPosition.getEntryId())
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java
index a5786ad..a5b3133 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java
@@ -19,7 +19,6 @@
 package org.apache.bookkeeper.mledger.util;
 
 import static com.google.common.base.Preconditions.checkArgument;
-
 import com.google.common.collect.Lists;
 import io.netty.util.ReferenceCounted;
 import java.util.Collection;
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/StatsBuckets.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/StatsBuckets.java
index dd77988..02021c5 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/StatsBuckets.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/StatsBuckets.java
@@ -19,7 +19,6 @@
 package org.apache.bookkeeper.mledger.util;
 
 import static com.google.common.base.Preconditions.checkArgument;
-
 import java.util.Arrays;
 import java.util.concurrent.atomic.LongAdder;
 
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java
index 315eafa..364b2fb 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java
@@ -369,7 +369,7 @@
         EntryCache entryCache = cacheManager.getEntryCache(ml1);
 
         final CountDownLatch counter = new CountDownLatch(1);
-        entryCache.asyncReadEntry(lh, new PositionImpl(1L ,1L), new AsyncCallbacks.ReadEntryCallback() {
+        entryCache.asyncReadEntry(lh, new PositionImpl(1L,1L), new AsyncCallbacks.ReadEntryCallback() {
             public void readEntryComplete(Entry entry, Object ctx) {
                 Assert.assertNotEquals(entry, null);
                 entry.release();
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
index af30c9c..bbcfd9e 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
@@ -414,13 +414,13 @@
         container.add(cursor2);
         assertEquals(container.getSlowestReadPositionForActiveCursors(), new PositionImpl(1, 1));
 
-        // Move forward cursor, cursor1 = 5:5 , cursor2 = 5:6, slowest is 5:5
+        // Move forward cursor, cursor1 = 5:5, cursor2 = 5:6, slowest is 5:5
         position = PositionImpl.get(5,6);
         container.cursorUpdated(cursor2, position);
         doReturn(position).when(cursor2).getReadPosition();
         assertEquals(container.getSlowestReadPositionForActiveCursors(), new PositionImpl(5, 5));
 
-        // Move forward cursor, cursor1 = 5:8 , cursor2 = 5:6, slowest is 5:6
+        // Move forward cursor, cursor1 = 5:8, cursor2 = 5:6, slowest is 5:6
         position = PositionImpl.get(5,8);
         doReturn(position).when(cursor1).getReadPosition();
         container.cursorUpdated(cursor1, position);
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 2f80bc8..9a4d117 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -388,7 +388,7 @@
                 fail(exception.getMessage());
             }
 
-        }, null, PositionImpl.latest);
+        }, null, PositionImpl.LATEST);
 
         counter.await();
     }
@@ -416,7 +416,7 @@
                 fail("async-call should not have failed");
             }
 
-        }, null, PositionImpl.latest);
+        }, null, PositionImpl.LATEST);
 
         counter.await();
 
@@ -438,7 +438,7 @@
                 counter2.countDown();
             }
 
-        }, null, PositionImpl.latest);
+        }, null, PositionImpl.LATEST);
 
         counter2.await();
     }
@@ -465,7 +465,7 @@
                 counter.countDown();
             }
 
-        }, null, PositionImpl.latest);
+        }, null, PositionImpl.LATEST);
 
         counter.await();
     }
@@ -621,7 +621,7 @@
         final AtomicBoolean moveStatus = new AtomicBoolean(false);
 
         // reset to earliest
-        PositionImpl earliest = PositionImpl.earliest;
+        PositionImpl earliest = PositionImpl.EARLIEST;
         try {
             cursor.resetCursor(earliest);
             moveStatus.set(true);
@@ -659,7 +659,7 @@
         moveStatus.set(false);
 
         // reset to latest should point to the first non-exist entry in the last ledger
-        PositionImpl anotherLast = PositionImpl.latest;
+        PositionImpl anotherLast = PositionImpl.LATEST;
         try {
             cursor.resetCursor(anotherLast);
             moveStatus.set(true);
@@ -1757,7 +1757,7 @@
                 public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
                     log.error("Error reading", exception);
                 }
-            }, null, PositionImpl.latest);
+            }, null, PositionImpl.LATEST);
         }
 
         ledger.addEntry("test".getBytes());
@@ -2439,17 +2439,17 @@
         for(int i = 0; i < 10; i++) {
             ledger.addEntry(("entry" + i).getBytes(Encoding));
         }
-        PositionImpl p1 = PositionImpl.get(markDeletedPosition.getLedgerId() , markDeletedPosition.getEntryId() + 1);
-        PositionImpl p2 = PositionImpl.get(markDeletedPosition.getLedgerId() , markDeletedPosition.getEntryId() + 2);
-        PositionImpl p3 = PositionImpl.get(markDeletedPosition.getLedgerId() , markDeletedPosition.getEntryId() + 5);
-        PositionImpl p4 = PositionImpl.get(markDeletedPosition.getLedgerId() , markDeletedPosition.getEntryId() + 6);
+        PositionImpl p1 = PositionImpl.get(markDeletedPosition.getLedgerId(), markDeletedPosition.getEntryId() + 1);
+        PositionImpl p2 = PositionImpl.get(markDeletedPosition.getLedgerId(), markDeletedPosition.getEntryId() + 2);
+        PositionImpl p3 = PositionImpl.get(markDeletedPosition.getLedgerId(), markDeletedPosition.getEntryId() + 5);
+        PositionImpl p4 = PositionImpl.get(markDeletedPosition.getLedgerId(), markDeletedPosition.getEntryId() + 6);
 
         c1.delete(Lists.newArrayList(p1, p2, p3, p4));
 
         assertEquals(c1.getLastIndividualDeletedRange(), Range.openClosed(PositionImpl.get(p3.getLedgerId(),
                 p3.getEntryId() - 1), p4));
 
-        PositionImpl p5 = PositionImpl.get(markDeletedPosition.getLedgerId() , markDeletedPosition.getEntryId() + 8);
+        PositionImpl p5 = PositionImpl.get(markDeletedPosition.getLedgerId(), markDeletedPosition.getEntryId() + 8);
         c1.delete(p5);
 
         assertEquals(c1.getLastIndividualDeletedRange(), Range.openClosed(PositionImpl.get(p5.getLedgerId(),
@@ -2466,10 +2466,10 @@
         for(int i = 0; i < 10; i++) {
             ledger.addEntry(("entry" + i).getBytes(Encoding));
         }
-        PositionImpl p1 = PositionImpl.get(markDeletedPosition.getLedgerId() , markDeletedPosition.getEntryId() + 1);
-        PositionImpl p2 = PositionImpl.get(markDeletedPosition.getLedgerId() , markDeletedPosition.getEntryId() + 2);
-        PositionImpl p3 = PositionImpl.get(markDeletedPosition.getLedgerId() , markDeletedPosition.getEntryId() + 5);
-        PositionImpl p4 = PositionImpl.get(markDeletedPosition.getLedgerId() , markDeletedPosition.getEntryId() + 6);
+        PositionImpl p1 = PositionImpl.get(markDeletedPosition.getLedgerId(), markDeletedPosition.getEntryId() + 1);
+        PositionImpl p2 = PositionImpl.get(markDeletedPosition.getLedgerId(), markDeletedPosition.getEntryId() + 2);
+        PositionImpl p3 = PositionImpl.get(markDeletedPosition.getLedgerId(), markDeletedPosition.getEntryId() + 5);
+        PositionImpl p4 = PositionImpl.get(markDeletedPosition.getLedgerId(), markDeletedPosition.getEntryId() + 6);
 
         c1.delete(Lists.newArrayList(p1, p2, p3, p4));
 
@@ -2477,12 +2477,12 @@
         EntryImpl entry2 = EntryImpl.create(p2, ByteBufAllocator.DEFAULT.buffer(0));
         EntryImpl entry3 = EntryImpl.create(p3, ByteBufAllocator.DEFAULT.buffer(0));
         EntryImpl entry4 = EntryImpl.create(p4, ByteBufAllocator.DEFAULT.buffer(0));
-        EntryImpl entry5 = EntryImpl.create(markDeletedPosition.getLedgerId() , markDeletedPosition.getEntryId() + 7,
+        EntryImpl entry5 = EntryImpl.create(markDeletedPosition.getLedgerId(), markDeletedPosition.getEntryId() + 7,
                 ByteBufAllocator.DEFAULT.buffer(0));
         List<Entry> entries = Lists.newArrayList(entry1, entry2, entry3, entry4, entry5);
         c1.trimDeletedEntries(entries);
         assertEquals(entries.size(), 1);
-        assertEquals(entries.get(0).getPosition(), PositionImpl.get(markDeletedPosition.getLedgerId() ,
+        assertEquals(entries.get(0).getPosition(), PositionImpl.get(markDeletedPosition.getLedgerId(),
                 markDeletedPosition.getEntryId() + 7));
     }
 
@@ -2634,7 +2634,7 @@
             public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
                 counter.countDown();
             }
-        }, null, PositionImpl.latest);
+        }, null, PositionImpl.LATEST);
 
         assertTrue(c1.cancelPendingReadRequest());
 
@@ -2650,7 +2650,7 @@
             public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
                 counter2.countDown();
             }
-        }, null, PositionImpl.latest);
+        }, null, PositionImpl.LATEST);
 
         ledger.addEntry("entry-1".getBytes(Encoding));
 
@@ -3131,7 +3131,7 @@
     @Test
     void testNonDurableCursorActive() throws Exception {
         ManagedLedger ml = factory.open("testInactive");
-        ManagedCursor cursor = ml.newNonDurableCursor(PositionImpl.latest, "c1");
+        ManagedCursor cursor = ml.newNonDurableCursor(PositionImpl.LATEST, "c1");
 
         assertTrue(cursor.isActive());
 
@@ -3353,8 +3353,8 @@
         int sendNumber = 20;
         ManagedLedger ledger = factory.open("testReadEntriesOrWaitWithMaxPosition");
         ManagedCursor c = ledger.openCursor("c");
-        Position position = PositionImpl.earliest;
-        Position maxCanReadPosition = PositionImpl.earliest;
+        Position position = PositionImpl.EARLIEST;
+        Position maxCanReadPosition = PositionImpl.EARLIEST;
         for (int i = 0; i < sendNumber; i++) {
             if (i == readMaxNumber - 1) {
                 position = ledger.addEntry(new byte[1024]);
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryShutdownTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryShutdownTest.java
index 8d1d4ea..9dc88ef 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryShutdownTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryShutdownTest.java
@@ -147,7 +147,7 @@
             }
         }, null);
 
-        factory.asyncOpenReadOnlyCursor(ledgerName, PositionImpl.earliest, new ManagedLedgerConfig(),
+        factory.asyncOpenReadOnlyCursor(ledgerName, PositionImpl.EARLIEST, new ManagedLedgerConfig(),
                 new AsyncCallbacks.OpenReadOnlyCursorCallback() {
                     @Override
                     public void openReadOnlyCursorComplete(ReadOnlyCursor cursor, Object ctx) {
@@ -174,6 +174,6 @@
         Assert.assertThrows(ManagedLedgerException.ManagedLedgerFactoryClosedException.class,
                 () -> factory.open(ledgerName));
         Assert.assertThrows(ManagedLedgerException.ManagedLedgerFactoryClosedException.class,
-                () -> factory.openReadOnlyCursor(ledgerName, PositionImpl.earliest, new ManagedLedgerConfig()));
+                () -> factory.openReadOnlyCursor(ledgerName, PositionImpl.EARLIEST, new ManagedLedgerConfig()));
     }
 }
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTerminationTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTerminationTest.java
index 2fb4964..85755db 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTerminationTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTerminationTest.java
@@ -141,7 +141,7 @@
         assertTrue(ledger.isTerminated());
         assertEquals(lastPosition, p1);
 
-        ManagedCursor c1 = ledger.newNonDurableCursor(PositionImpl.earliest);
+        ManagedCursor c1 = ledger.newNonDurableCursor(PositionImpl.EARLIEST);
 
         List<Entry> entries = c1.readEntries(10);
         assertEquals(entries.size(), 2);
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index d837651..79eab77 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -343,7 +343,7 @@
                                     public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
                                         fail(exception.getMessage());
                                     }
-                                }, cursor, PositionImpl.latest);
+                                }, cursor, PositionImpl.LATEST);
                             }
 
                             @Override
@@ -2706,11 +2706,11 @@
         String ctxStr = "timeoutCtx";
         CompletableFuture<LedgerEntries> entriesFuture = new CompletableFuture<>();
         ReadHandle ledgerHandle = mock(ReadHandle.class);
-        doReturn(entriesFuture).when(ledgerHandle).readAsync(PositionImpl.earliest.getLedgerId(),
-                PositionImpl.earliest.getEntryId());
+        doReturn(entriesFuture).when(ledgerHandle).readAsync(PositionImpl.EARLIEST.getLedgerId(),
+                PositionImpl.EARLIEST.getEntryId());
 
         // (1) test read-timeout for: ManagedLedger.asyncReadEntry(..)
-        ledger.asyncReadEntry(ledgerHandle, PositionImpl.earliest, new ReadEntryCallback() {
+        ledger.asyncReadEntry(ledgerHandle, PositionImpl.EARLIEST, new ReadEntryCallback() {
             @Override
             public void readEntryComplete(Entry entry, Object ctx) {
                 responseException1.set(null);
@@ -2729,7 +2729,7 @@
 
         // (2) test read-timeout for: ManagedLedger.asyncReadEntry(..)
         AtomicReference<ManagedLedgerException> responseException2 = new AtomicReference<>();
-        PositionImpl readPositionRef = PositionImpl.earliest;
+        PositionImpl readPositionRef = PositionImpl.EARLIEST;
         ManagedCursorImpl cursor = new ManagedCursorImpl(bk, config, ledger, "cursor1");
         OpReadEntry opReadEntry = OpReadEntry.create(cursor, readPositionRef, 1, new ReadEntriesCallback() {
 
@@ -2743,8 +2743,8 @@
                 responseException2.set(exception);
             }
 
-        }, null, PositionImpl.latest);
-        ledger.asyncReadEntry(ledgerHandle, PositionImpl.earliest.getEntryId(), PositionImpl.earliest.getEntryId(),
+        }, null, PositionImpl.LATEST);
+        ledger.asyncReadEntry(ledgerHandle, PositionImpl.EARLIEST.getEntryId(), PositionImpl.EARLIEST.getEntryId(),
                 false, opReadEntry, ctxStr);
         retryStrategically((test) -> {
             return responseException2.get() != null;
@@ -3128,7 +3128,7 @@
         entries.forEach(Entry::release);
         // Now we update the cursors that are still subscribing to ledgers that has been consumed completely
         managedLedger.maybeUpdateCursorBeforeTrimmingConsumedLedger();
-        managedLedger.internalTrimConsumedLedgers(Futures.NULL_PROMISE);
+        managedLedger.internalTrimConsumedLedgers(Futures.nullPromise_);
         ManagedLedgerImpl finalManagedLedger = managedLedger;
         Awaitility.await().untilAsserted(() -> {
             // We only have one empty ledger at last [{entries=0}]
@@ -3224,7 +3224,7 @@
         assertEquals(ledger.ledgerCache.size(), 2);
         cursor.clearBacklog();
         cursor2.clearBacklog();
-        ledger.trimConsumedLedgersInBackground(Futures.NULL_PROMISE);
+        ledger.trimConsumedLedgersInBackground(Futures.nullPromise_);
         Awaitility.await().untilAsserted(() -> {
             assertEquals(ledger.ledgers.size(), 1);
             assertEquals(ledger.ledgerCache.size(), 0);
@@ -3256,7 +3256,7 @@
         assertEquals(ledger.ledgerCache.size(), 2);
         cursor.clearBacklog();
         cursor2.clearBacklog();
-        ledger.trimConsumedLedgersInBackground(Futures.NULL_PROMISE);
+        ledger.trimConsumedLedgersInBackground(Futures.nullPromise_);
         Awaitility.await().untilAsserted(() -> {
             assertEquals(ledger.ledgers.size(), 3);
             assertEquals(ledger.ledgerCache.size(), 0);
@@ -3268,7 +3268,7 @@
         assertEquals(entryList.size(), 3);
         assertEquals(ledger.ledgerCache.size(), 2);
         cursor3.clearBacklog();
-        ledger.trimConsumedLedgersInBackground(Futures.NULL_PROMISE);
+        ledger.trimConsumedLedgersInBackground(Futures.nullPromise_);
         Awaitility.await().untilAsserted(() -> {
             assertEquals(ledger.ledgers.size(), 3);
             assertEquals(ledger.ledgerCache.size(), 0);
@@ -3307,7 +3307,7 @@
         ledgerOffloader = mock(NullLedgerOffloader.class);
         config.setLedgerOffloader(ledgerOffloader);
 
-        ledger.internalTrimConsumedLedgers(Futures.NULL_PROMISE);
+        ledger.internalTrimConsumedLedgers(Futures.nullPromise_);
         verify(ledgerOffloader, times(1)).getOffloadPolicies();
     }
 
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
index 4c2944f..392f777 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
@@ -64,7 +64,7 @@
     void readFromEmptyLedger() throws Exception {
         ManagedLedger ledger = factory.open("my_test_ledger");
 
-        ManagedCursor c1 = ledger.newNonDurableCursor(PositionImpl.earliest);
+        ManagedCursor c1 = ledger.newNonDurableCursor(PositionImpl.EARLIEST);
         List<Entry> entries = c1.readEntries(10);
         assertEquals(entries.size(), 0);
         entries.forEach(e -> e.release());
@@ -107,7 +107,7 @@
     void testZNodeBypassed() throws Exception {
         ManagedLedger ledger = factory.open("my_test_ledger");
 
-        ManagedCursor c1 = ledger.newNonDurableCursor(PositionImpl.earliest);
+        ManagedCursor c1 = ledger.newNonDurableCursor(PositionImpl.EARLIEST);
         assertFalse(Iterables.isEmpty(ledger.getCursors()));
 
         c1.close();
@@ -123,8 +123,8 @@
         ManagedLedger ledger = factory.open("my_test_ledger",
                 new ManagedLedgerConfig().setRetentionTime(1, TimeUnit.HOURS).setRetentionSizeInMB(1));
 
-        ManagedCursor c1 = ledger.newNonDurableCursor(PositionImpl.latest);
-        ManagedCursor c2 = ledger.newNonDurableCursor(PositionImpl.latest);
+        ManagedCursor c1 = ledger.newNonDurableCursor(PositionImpl.LATEST);
+        ManagedCursor c2 = ledger.newNonDurableCursor(PositionImpl.LATEST);
 
         ledger.addEntry("entry-1".getBytes(Encoding));
         ledger.addEntry("entry-2".getBytes(Encoding));
@@ -156,8 +156,8 @@
         ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(1)
                 .setRetentionTime(1, TimeUnit.HOURS).setRetentionSizeInMB(1));
 
-        ManagedCursor c1 = ledger.newNonDurableCursor(PositionImpl.latest);
-        ManagedCursor c2 = ledger.newNonDurableCursor(PositionImpl.latest);
+        ManagedCursor c1 = ledger.newNonDurableCursor(PositionImpl.LATEST);
+        ManagedCursor c2 = ledger.newNonDurableCursor(PositionImpl.LATEST);
 
         ledger.addEntry("entry-1".getBytes(Encoding));
         ledger.addEntry("entry-2".getBytes(Encoding));
@@ -186,7 +186,7 @@
         ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(1)
                 .setRetentionTime(1, TimeUnit.HOURS).setRetentionSizeInMB(1));
 
-        ManagedCursor c1 = ledger.newNonDurableCursor(PositionImpl.latest);
+        ManagedCursor c1 = ledger.newNonDurableCursor(PositionImpl.LATEST);
 
         ledger.close();
 
@@ -203,15 +203,15 @@
         ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(2)
                 .setRetentionTime(1, TimeUnit.HOURS).setRetentionSizeInMB(1));
 
-        ManagedCursor c1 = ledger.newNonDurableCursor(PositionImpl.latest);
+        ManagedCursor c1 = ledger.newNonDurableCursor(PositionImpl.LATEST);
         ledger.addEntry("dummy-entry-1".getBytes(Encoding));
-        ManagedCursor c2 = ledger.newNonDurableCursor(PositionImpl.latest);
+        ManagedCursor c2 = ledger.newNonDurableCursor(PositionImpl.LATEST);
         ledger.addEntry("dummy-entry-2".getBytes(Encoding));
-        ManagedCursor c3 = ledger.newNonDurableCursor(PositionImpl.latest);
+        ManagedCursor c3 = ledger.newNonDurableCursor(PositionImpl.LATEST);
         ledger.addEntry("dummy-entry-3".getBytes(Encoding));
-        ManagedCursor c4 = ledger.newNonDurableCursor(PositionImpl.latest);
+        ManagedCursor c4 = ledger.newNonDurableCursor(PositionImpl.LATEST);
         ledger.addEntry("dummy-entry-4".getBytes(Encoding));
-        ManagedCursor c5 = ledger.newNonDurableCursor(PositionImpl.latest);
+        ManagedCursor c5 = ledger.newNonDurableCursor(PositionImpl.LATEST);
 
         assertEquals(c1.getNumberOfEntries(), 4);
         assertTrue(c1.hasMoreEntries());
@@ -240,15 +240,15 @@
         ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(2)
                 .setRetentionTime(1, TimeUnit.HOURS).setRetentionSizeInMB(1));
 
-        ManagedCursor c1 = ledger.newNonDurableCursor(PositionImpl.latest);
+        ManagedCursor c1 = ledger.newNonDurableCursor(PositionImpl.LATEST);
         Position p1 = ledger.addEntry("dummy-entry-1".getBytes(Encoding));
-        ManagedCursor c2 = ledger.newNonDurableCursor(PositionImpl.latest);
+        ManagedCursor c2 = ledger.newNonDurableCursor(PositionImpl.LATEST);
         ledger.addEntry("dummy-entry-2".getBytes(Encoding));
-        ManagedCursor c3 = ledger.newNonDurableCursor(PositionImpl.latest);
+        ManagedCursor c3 = ledger.newNonDurableCursor(PositionImpl.LATEST);
         Position p3 = ledger.addEntry("dummy-entry-3".getBytes(Encoding));
-        ManagedCursor c4 = ledger.newNonDurableCursor(PositionImpl.latest);
+        ManagedCursor c4 = ledger.newNonDurableCursor(PositionImpl.LATEST);
         Position p4 = ledger.addEntry("dummy-entry-4".getBytes(Encoding));
-        ManagedCursor c5 = ledger.newNonDurableCursor(PositionImpl.latest);
+        ManagedCursor c5 = ledger.newNonDurableCursor(PositionImpl.LATEST);
 
         assertEquals(c1.getNumberOfEntriesInBacklog(false), 4);
         assertEquals(c2.getNumberOfEntriesInBacklog(false), 3);
@@ -338,7 +338,7 @@
     void testResetCursor() throws Exception {
         ManagedLedger ledger = factory.open("my_test_move_cursor_ledger",
                 new ManagedLedgerConfig().setMaxEntriesPerLedger(10));
-        ManagedCursor cursor = ledger.newNonDurableCursor(PositionImpl.latest);
+        ManagedCursor cursor = ledger.newNonDurableCursor(PositionImpl.LATEST);
         ledger.addEntry("dummy-entry-1".getBytes(Encoding));
         ledger.addEntry("dummy-entry-2".getBytes(Encoding));
         ledger.addEntry("dummy-entry-3".getBytes(Encoding));
@@ -362,7 +362,7 @@
     void testasyncResetCursor() throws Exception {
         ManagedLedger ledger = factory.open("my_test_move_cursor_ledger",
                 new ManagedLedgerConfig().setMaxEntriesPerLedger(10));
-        ManagedCursor cursor = ledger.newNonDurableCursor(PositionImpl.latest);
+        ManagedCursor cursor = ledger.newNonDurableCursor(PositionImpl.LATEST);
         ledger.addEntry("dummy-entry-1".getBytes(Encoding));
         ledger.addEntry("dummy-entry-2".getBytes(Encoding));
         ledger.addEntry("dummy-entry-3".getBytes(Encoding));
@@ -395,7 +395,7 @@
     void rewind() throws Exception {
         ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(2)
                 .setRetentionTime(1, TimeUnit.HOURS).setRetentionSizeInMB(1));
-        ManagedCursor c1 = ledger.newNonDurableCursor(PositionImpl.earliest);
+        ManagedCursor c1 = ledger.newNonDurableCursor(PositionImpl.EARLIEST);
         Position p1 = ledger.addEntry("dummy-entry-1".getBytes(Encoding));
         Position p2 = ledger.addEntry("dummy-entry-2".getBytes(Encoding));
         Position p3 = ledger.addEntry("dummy-entry-3".getBytes(Encoding));
@@ -446,7 +446,7 @@
     @Test(timeOut = 20000)
     void markDeleteSkippingMessage() throws Exception {
         ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(10));
-        ManagedCursor cursor = ledger.newNonDurableCursor(PositionImpl.earliest);
+        ManagedCursor cursor = ledger.newNonDurableCursor(PositionImpl.EARLIEST);
         Position p1 = ledger.addEntry("dummy-entry-1".getBytes(Encoding));
         Position p2 = ledger.addEntry("dummy-entry-2".getBytes(Encoding));
         ledger.addEntry("dummy-entry-3".getBytes(Encoding));
@@ -542,7 +542,7 @@
     void testSingleDelete() throws Exception {
         ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(3)
                 .setRetentionTime(1, TimeUnit.HOURS).setRetentionSizeInMB(1));
-        ManagedCursor cursor = ledger.newNonDurableCursor(PositionImpl.latest);
+        ManagedCursor cursor = ledger.newNonDurableCursor(PositionImpl.LATEST);
 
         Position p1 = ledger.addEntry("entry1".getBytes());
         Position p2 = ledger.addEntry("entry2".getBytes());
@@ -588,7 +588,7 @@
         Position p3 = ledger.addEntry("entry-3".getBytes());
 
         Thread.sleep(300);
-        ManagedCursor c1 = ledger.newNonDurableCursor(PositionImpl.earliest);
+        ManagedCursor c1 = ledger.newNonDurableCursor(PositionImpl.EARLIEST);
         assertEquals(c1.getReadPosition(), p3);
         assertEquals(c1.getMarkDeletedPosition(), new PositionImpl(5, -1));
     }
@@ -605,7 +605,7 @@
         /* Position p5 = */ ledger.addEntry("entry-5".getBytes());
         /* Position p6 = */ ledger.addEntry("entry-6".getBytes());
 
-        ManagedCursor c1 = ledger.newNonDurableCursor(PositionImpl.earliest);
+        ManagedCursor c1 = ledger.newNonDurableCursor(PositionImpl.EARLIEST);
         assertEquals(c1.getReadPosition(), p1);
         assertEquals(c1.getMarkDeletedPosition(), new PositionImpl(3, -1));
         assertEquals(c1.getNumberOfEntries(), 6);
@@ -698,7 +698,7 @@
         ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testBacklogStatsWhenDroppingData",
                 new ManagedLedgerConfig().setMaxEntriesPerLedger(1));
         ManagedCursor c1 = ledger.openCursor("c1");
-        ManagedCursor nonDurableCursor = ledger.newNonDurableCursor(PositionImpl.earliest);
+        ManagedCursor nonDurableCursor = ledger.newNonDurableCursor(PositionImpl.EARLIEST);
 
         assertEquals(nonDurableCursor.getNumberOfEntries(), 0);
         assertEquals(nonDurableCursor.getNumberOfEntriesInBacklog(true), 0);
@@ -756,7 +756,7 @@
     void deleteNonDurableCursorWithName() throws Exception {
         ManagedLedger ledger = factory.open("deleteManagedLedgerWithNonDurableCursor");
 
-        ManagedCursor c = ledger.newNonDurableCursor(PositionImpl.earliest, "custom-name");
+        ManagedCursor c = ledger.newNonDurableCursor(PositionImpl.EARLIEST, "custom-name");
         assertEquals(Iterables.size(ledger.getCursors()), 1);
 
         ledger.deleteCursor(c.getName());
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
index e761b04..bbaef69 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
@@ -93,7 +93,7 @@
         UUID secondLedgerUUID = new UUID(ledger.getLedgersInfoAsList().get(1).getOffloadContext().getUidMsb(),
                 ledger.getLedgersInfoAsList().get(1).getOffloadContext().getUidLsb());
 
-        ManagedCursor cursor = ledger.newNonDurableCursor(PositionImpl.earliest);
+        ManagedCursor cursor = ledger.newNonDurableCursor(PositionImpl.EARLIEST);
         int i = 0;
         for (Entry e : cursor.readEntries(10)) {
             assertEquals(new String(e.getData()), "entry-" + i++);
@@ -163,7 +163,7 @@
         UUID secondLedgerUUID = new UUID(secondLedger.getOffloadContext().getUidMsb(),
                 secondLedger.getOffloadContext().getUidLsb());
 
-        ManagedCursor cursor = ledger.newNonDurableCursor(PositionImpl.earliest);
+        ManagedCursor cursor = ledger.newNonDurableCursor(PositionImpl.EARLIEST);
         int i = 0;
         for (Entry e : cursor.readEntries(10)) {
             Assert.assertEquals(new String(e.getData()), "entry-" + i++);
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
index a7092e4..d0da8e6 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
@@ -147,13 +147,13 @@
         assertEquals(ledger.getLedgersInfoAsList().size(), 3);
 
         try {
-            ledger.offloadPrefix(PositionImpl.earliest);
+            ledger.offloadPrefix(PositionImpl.EARLIEST);
             fail("Should have thrown an exception");
         } catch (ManagedLedgerException.InvalidCursorPositionException e) {
             // expected
         }
         try {
-            ledger.offloadPrefix(PositionImpl.latest);
+            ledger.offloadPrefix(PositionImpl.LATEST);
             fail("Should have thrown an exception");
         } catch (ManagedLedgerException.InvalidCursorPositionException e) {
             // expected
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorTest.java
index 05ddd1d..340acfe 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorTest.java
@@ -40,7 +40,7 @@
     @Test
     void notFound() throws Exception {
         try {
-            factory.openReadOnlyCursor("notFound", PositionImpl.earliest, new ManagedLedgerConfig());
+            factory.openReadOnlyCursor("notFound", PositionImpl.EARLIEST, new ManagedLedgerConfig());
             fail("Should have failed");
         } catch (ManagedLedgerNotFoundException e) {
             // Expected
@@ -59,7 +59,7 @@
             ledger.addEntry(("entry-" + i).getBytes());
         }
 
-        ReadOnlyCursor cursor = factory.openReadOnlyCursor("simple", PositionImpl.earliest, new ManagedLedgerConfig());
+        ReadOnlyCursor cursor = factory.openReadOnlyCursor("simple", PositionImpl.EARLIEST, new ManagedLedgerConfig());
 
         assertEquals(cursor.getNumberOfEntries(), N);
         assertTrue(cursor.hasMoreEntries());
@@ -78,7 +78,7 @@
         }
 
         // Open a new cursor
-        cursor = factory.openReadOnlyCursor("simple", PositionImpl.earliest, new ManagedLedgerConfig());
+        cursor = factory.openReadOnlyCursor("simple", PositionImpl.EARLIEST, new ManagedLedgerConfig());
 
         assertEquals(cursor.getNumberOfEntries(), 2 * N);
         assertTrue(cursor.hasMoreEntries());
@@ -114,7 +114,7 @@
             ledger.addEntry(("entry-" + i).getBytes());
         }
 
-        ReadOnlyCursor cursor = factory.openReadOnlyCursor("skip", PositionImpl.earliest, new ManagedLedgerConfig());
+        ReadOnlyCursor cursor = factory.openReadOnlyCursor("skip", PositionImpl.EARLIEST, new ManagedLedgerConfig());
 
         assertEquals(cursor.getNumberOfEntries(), N);
         assertTrue(cursor.hasMoreEntries());
@@ -138,7 +138,7 @@
             ledger.addEntry(("entry-" + i).getBytes());
         }
 
-        ReadOnlyCursor cursor = factory.openReadOnlyCursor("skip-all", PositionImpl.earliest,
+        ReadOnlyCursor cursor = factory.openReadOnlyCursor("skip-all", PositionImpl.EARLIEST,
                 new ManagedLedgerConfig());
 
         assertEquals(cursor.getNumberOfEntries(), N);
@@ -166,7 +166,7 @@
             ledger.addEntry(("entry-" + i).getBytes());
         }
 
-        ReadOnlyCursor cursor = factory.openReadOnlyCursor("skip", PositionImpl.earliest, new ManagedLedgerConfig());
+        ReadOnlyCursor cursor = factory.openReadOnlyCursor("skip", PositionImpl.EARLIEST, new ManagedLedgerConfig());
 
         assertEquals(cursor.getNumberOfEntries(), N);
         assertTrue(cursor.hasMoreEntries());
@@ -188,7 +188,7 @@
     void empty() throws Exception {
         factory.open("empty", new ManagedLedgerConfig().setRetentionTime(1, TimeUnit.HOURS));
 
-        ReadOnlyCursor cursor = factory.openReadOnlyCursor("empty", PositionImpl.earliest, new ManagedLedgerConfig());
+        ReadOnlyCursor cursor = factory.openReadOnlyCursor("empty", PositionImpl.EARLIEST, new ManagedLedgerConfig());
 
         assertEquals(cursor.getNumberOfEntries(), 0);
         assertFalse(cursor.hasMoreEntries());
@@ -206,7 +206,7 @@
             ledger.addEntry(("entry-" + i).getBytes());
         }
 
-        ReadOnlyCursor cursor = factory.openReadOnlyCursor("simple", PositionImpl.earliest, new ManagedLedgerConfig());
+        ReadOnlyCursor cursor = factory.openReadOnlyCursor("simple", PositionImpl.EARLIEST, new ManagedLedgerConfig());
 
         assertEquals(cursor.getNumberOfEntries(), N);
         assertTrue(cursor.hasMoreEntries());
diff --git a/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderSasl.java b/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderSasl.java
index f1c0a4a..fd2b4ca 100644
--- a/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderSasl.java
+++ b/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderSasl.java
@@ -90,14 +90,14 @@
 
         loginContextName = config.getSaslJaasServerSectionName();
         if (jaasCredentialsContainer == null) {
-            log.info("JAAS loginContext is: {}." , loginContextName);
+            log.info("JAAS loginContext is: {}.", loginContextName);
             try {
                 jaasCredentialsContainer = new JAASCredentialsContainer(
                     loginContextName,
                     new PulsarSaslServer.SaslServerCallbackHandler(allowedIdsPattern),
                     configuration);
             } catch (LoginException e) {
-                log.error("JAAS login in broker failed" , e);
+                log.error("JAAS login in broker failed", e);
                 throw new IOException(e);
             }
         }
@@ -122,7 +122,7 @@
             PulsarSaslServer server = new PulsarSaslServer(jaasCredentialsContainer.getSubject(), allowedIdsPattern);
             return new SaslAuthenticationState(server);
         } catch (Throwable t) {
-            log.error("Failed create sasl auth state" , t);
+            log.error("Failed create sasl auth state", t);
             throw new AuthenticationException(t.getMessage());
         }
     }
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
index 7cf1a8b..3d0f995 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
@@ -247,7 +247,7 @@
                 // if primary-isolated-bookies are not enough then add consider secondary isolated bookie group as well.
                 if (totalAvailableBookiesInPrimaryGroup < ensembleSize) {
                     log.info(
-                        "Not found enough available-bookies from primary isolation group [{}] , checking secondary group [{}]",
+                        "Not found enough available-bookies from primary isolation group [{}], checking secondary group [{}]",
                         primaryIsolationGroup, secondaryIsolationGroup);
                     for (String group : secondaryIsolationGroup) {
                         Map<String, BookieInfo> bookieGroup = allGroupsBookieMapping.get(group);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 76d2d6c..eed476c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -263,7 +263,7 @@
     private final ReentrantLock mutex = new ReentrantLock();
     private final Condition isClosedCondition = mutex.newCondition();
     private volatile CompletableFuture<Void> closeFuture;
-    // key is listener name , value is pulsar address and pulsar ssl address
+    // key is listener name, value is pulsar address and pulsar ssl address
     private Map<String, AdvertisedListener> advertisedListeners;
     private NamespaceName heartbeatNamespaceV2;
 
@@ -1218,7 +1218,7 @@
                 }
             } else {
                 LOG.info("No ledger offloader configured, using NULL instance");
-                return NullLedgerOffloader.INSTANCE;
+                return NullLedgerOffloader.instance_;
             }
         } catch (Throwable t) {
             throw new PulsarServerException(t);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index df0f380..e54bb59 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2546,7 +2546,7 @@
         } catch (Exception exception) {
             exception.printStackTrace();
             log.error("[{}] Failed to examine message at position {} from {} due to {}", clientAppId(), messagePosition,
-                    topicName , exception);
+                    topicName, exception);
             throw new RestException(exception);
         }
     }
@@ -4239,7 +4239,7 @@
                 }));
     }
 
-    protected CompletableFuture<Void> internalSetCompactionThreshold(Long compactionThreshold , boolean isGlobal) {
+    protected CompletableFuture<Void> internalSetCompactionThreshold(Long compactionThreshold, boolean isGlobal) {
         if (compactionThreshold != null && compactionThreshold < 0) {
             throw new RestException(Status.PRECONDITION_FAILED, "Invalid value for compactionThreshold");
         }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 9d26e30..211707d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -894,7 +894,7 @@
         //both namespace-level and topic-level policy are not set, try to use broker-level policy
         ServiceConfiguration serviceConfiguration = brokerService.pulsar().getConfiguration();
         if (publishRate != null) {
-            //publishRate is not null , use namespace-level policy
+            //publishRate is not null, use namespace-level policy
             updatePublishDispatcher(publishRate);
         } else {
             PublishRate brokerPublishRate = new PublishRate(serviceConfiguration.getMaxPublishRatePerTopicInMessages()
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 0451571..c2a27ad 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1683,7 +1683,7 @@
             if (t instanceof PersistentTopic) {
                 Optional.ofNullable(((PersistentTopic) t).getManagedLedger()).ifPresent(
                         managedLedger -> {
-                            managedLedger.trimConsumedLedgersInBackground(Futures.NULL_PROMISE);
+                            managedLedger.trimConsumedLedgersInBackground(Futures.nullPromise_);
                         }
                 );
             }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 468946d..dc4019d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -235,7 +235,7 @@
         }
 
         // Note
-        // Must ensure that the message is written to the pendingAcks before sent is first , because this consumer
+        // Must ensure that the message is written to the pendingAcks before sent is first, because this consumer
         // is possible to disconnect at this time.
         if (pendingAcks != null) {
             for (int i = 0; i < entries.size(); i++) {
@@ -347,7 +347,7 @@
                 log.warn("[{}] [{}] Received cumulative ack on shared subscription, ignoring",
                         subscription, consumerId);
             }
-            PositionImpl position = PositionImpl.earliest;
+            PositionImpl position = PositionImpl.EARLIEST;
             if (ack.getMessageIdsCount() == 1) {
                 MessageIdData msgId = ack.getMessageIdAt(0);
                 if (msgId.getAckSetsCount() > 0) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 5b4d1f5..f4c8335 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -2195,7 +2195,7 @@
                                         txnID.getLeastSigBits(), txnID.getMostSigBits()));
                             }
                         }).exceptionally(e -> {
-                    log.error("handleEndTxnOnPartition fail ! topic {} , "
+                    log.error("handleEndTxnOnPartition fail ! topic {}, "
                                     + "txnId: [{}], txnAction: [{}]", topic, txnID,
                             TxnAction.valueOf(txnAction), e.getCause());
                     ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(
@@ -2205,7 +2205,7 @@
                 });
             }
         }).exceptionally(e -> {
-            log.error("handleEndTxnOnPartition fail ! topic {} , "
+            log.error("handleEndTxnOnPartition fail ! topic {}, "
                             + "txnId: [{}], txnAction: [{}]", topic, txnID,
                     TxnAction.valueOf(txnAction), e.getCause());
             ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(
@@ -2248,7 +2248,7 @@
                         subscription.endTxn(txnidMostBits, txnidLeastBits, txnAction, lowWaterMark);
                 completableFuture.whenComplete((ignored, e) -> {
                     if (e != null) {
-                        log.error("handleEndTxnOnSubscription fail ! topic: {} , subscription: {}"
+                        log.error("handleEndTxnOnSubscription fail ! topic: {}, subscription: {}"
                                         + "txnId: [{}], txnAction: [{}]", topic, subName,
                                 txnID, TxnAction.valueOf(txnAction), e.getCause());
                         ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
@@ -2266,7 +2266,7 @@
                         .thenAccept((b) -> {
                             if (b) {
                                 log.error("handleEndTxnOnSubscription fail! The topic {} does not exist in broker, "
-                                                + "subscription: {} ,txnId: [{}], txnAction: [{}]", topic, subName,
+                                                + "subscription: {}, txnId: [{}], txnAction: [{}]", topic, subName,
                                         new TxnID(txnidMostBits, txnidLeastBits), TxnAction.valueOf(txnAction));
                                 ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
                                         requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(),
@@ -2280,7 +2280,7 @@
                                         txnID.getLeastSigBits(), txnID.getMostSigBits()));
                             }
                         }).exceptionally(e -> {
-                    log.error("handleEndTxnOnSubscription fail ! topic {} , subscription: {}"
+                    log.error("handleEndTxnOnSubscription fail ! topic {}, subscription: {}"
                                     + "txnId: [{}], txnAction: [{}]", topic, subName,
                             txnID, TxnAction.valueOf(txnAction), e.getCause());
                     ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
@@ -2290,7 +2290,7 @@
                 });
             }
         }).exceptionally(e -> {
-            log.error("handleEndTxnOnSubscription fail ! topic: {} , subscription: {}"
+            log.error("handleEndTxnOnSubscription fail ! topic: {}, subscription: {}"
                             + "txnId: [{}], txnAction: [{}]", topic, subName,
                     txnID, TxnAction.valueOf(txnAction), e.getCause());
             ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index 555da88..af9907d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -187,7 +187,7 @@
             public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
                 future.completeExceptionally(exception);
             }
-        }, null, PositionImpl.latest);
+        }, null, PositionImpl.LATEST);
     }
 
     public Status getStatus() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index 4d79c9a..f7aebc4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -242,7 +242,7 @@
                             messagesToRead);
                 }
                 cursor.asyncReadEntriesOrWait(messagesToRead, readMaxSizeBytes, this,
-                        null, PositionImpl.latest);
+                        null, PositionImpl.LATEST);
             } else {
                 if (log.isDebugEnabled()) {
                     log.debug("[{}][{} -> {}] Not scheduling read due to pending read. Messages To Read {}", topicName,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
index d8e48f9..88a9419 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
@@ -162,7 +162,7 @@
         try {
             locatorEntry = getLocator(key).get();
         } catch (Exception e) {
-            log.warn("Failed to get list of schema-storage ledger for {} , the exception as follow: \n {}", key,
+            log.warn("Failed to get list of schema-storage ledger for {}, the exception as follow: \n {}", key,
                     (e instanceof ExecutionException ? e.getCause() : e));
             throw new IOException("Failed to get schema ledger for" + key);
         }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/ProtobufNativeSchemaDataValidator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/ProtobufNativeSchemaDataValidator.java
index cdd4081..b0ab159 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/ProtobufNativeSchemaDataValidator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/ProtobufNativeSchemaDataValidator.java
@@ -35,7 +35,7 @@
         }
         if (descriptor == null) {
             throw new InvalidSchemaDataException(
-                    "protobuf root message descriptor is null ,"
+                    "protobuf root message descriptor is null,"
                             + " please recheck rootMessageTypeName or rootFileDescriptorName conf. ");
         }
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
index 98d572c..261b059 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
@@ -371,7 +371,7 @@
 
     @Override
     public PositionImpl getMaxReadPosition() {
-        return PositionImpl.latest;
+        return PositionImpl.LATEST;
     }
 
     @Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index 9978f6f..d6b053c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -472,7 +472,7 @@
         if (checkIfReady() || checkIfNoSnapshot()) {
             return this.maxReadPosition;
         } else {
-            return PositionImpl.earliest;
+            return PositionImpl.EARLIEST;
         }
     }
 
@@ -510,7 +510,7 @@
 
         private final TopicTransactionBufferRecoverCallBack callBack;
 
-        private Position startReadCursorPosition = PositionImpl.earliest;
+        private Position startReadCursorPosition = PositionImpl.EARLIEST;
 
         private final SpscArrayQueue<Entry> entryQueue;
 
@@ -653,7 +653,7 @@
             if (entryQueue.size() < entryQueue.capacity() && outstandingReadsRequests.get() == 0) {
                 if (cursor.hasMoreEntries()) {
                     outstandingReadsRequests.incrementAndGet();
-                    cursor.asyncReadEntries(100, this, System.nanoTime(), PositionImpl.latest);
+                    cursor.asyncReadEntries(100, this, System.nanoTime(), PositionImpl.LATEST);
                 }
             }
             return isReadable;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
index f7e147d..5ccec5a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
@@ -90,7 +90,7 @@
 
     @Override
     public PositionImpl getMaxReadPosition() {
-        return PositionImpl.latest;
+        return PositionImpl.LATEST;
     }
 
     @Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
index 1592318..1b63c11 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
@@ -110,7 +110,7 @@
     //TODO can control the number of entry to read
     private void readAsync(int numberOfEntriesToRead,
                            AsyncCallbacks.ReadEntriesCallback readEntriesCallback) {
-        cursor.asyncReadEntries(numberOfEntriesToRead, readEntriesCallback, System.nanoTime(), PositionImpl.latest);
+        cursor.asyncReadEntries(numberOfEntriesToRead, readEntriesCallback, System.nanoTime(), PositionImpl.LATEST);
     }
 
     @Override
@@ -285,7 +285,7 @@
                 buf.release();
                 completableFuture.completeExceptionally(new PersistenceException(exception));
             }
-        } , null);
+        }, null);
         return completableFuture;
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index aac213f..f379500 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -90,13 +90,13 @@
         synchronized (this) {
             PositionImpl cursorPosition;
             if (isFirstRead && MessageId.earliest.equals(consumer.getStartMessageId())){
-                cursorPosition = PositionImpl.earliest;
+                cursorPosition = PositionImpl.EARLIEST;
             } else {
                 cursorPosition = (PositionImpl) cursor.getReadPosition();
             }
             if (compactionHorizon == null
                 || compactionHorizon.compareTo(cursorPosition) < 0) {
-                cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, callback, consumer, PositionImpl.latest);
+                cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, callback, consumer, PositionImpl.LATEST);
             } else {
                 compactedTopicContext.thenCompose(
                     (context) -> findStartPoint(cursorPosition, context.ledger.getLastAddConfirmed(), context.cache)
@@ -110,7 +110,7 @@
                             }
                             if (startPoint == NEWER_THAN_COMPACTED && compactionHorizon.compareTo(cursorPosition) < 0) {
                                 cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, callback, consumer,
-                                        PositionImpl.latest);
+                                        PositionImpl.LATEST);
                                 return CompletableFuture.completedFuture(null);
                             } else {
                                 long endPoint = Math.min(context.ledger.getLastAddConfirmed(),
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index 6cd362e..4870aa6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -1210,7 +1210,7 @@
         Awaitility.await()
                 .untilAsserted(() -> Assert.assertNull(admin.topicPolicies().getDispatchRate(topic)));
 
-        //2 Remove level policy ,DispatchRateLimiter should us ns level policy
+        //2 Remove level policy, DispatchRateLimiter should us ns level policy
         Awaitility.await()
                 .untilAsserted(() -> {
                     DispatchRateLimiter limiter = pulsar.getBrokerService().getTopicIfExists(topic).get().get().getDispatchRateLimiter().get();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
index 1e44ea0..2acb24b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
@@ -434,7 +434,7 @@
 
         admin.topics().removeInactiveTopicPolicies(topic);
         //Only the broker-level policies is set, so after removing the topic-level policies
-        // , the topic will use the broker-level policies
+        //, the topic will use the broker-level policies
         Awaitility.await().untilAsserted(()
                 -> assertEquals(((PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).get().get()).getInactiveTopicPolicies()
                 , defaultPolicy));
@@ -446,7 +446,7 @@
         assertEquals(policies, admin.topics().getInactiveTopicPolicies(topic2));
         inactiveTopicPolicies.setMaxInactiveDurationSeconds(999);
         //Both broker level and namespace level policies are set, so after removing the topic level policies
-        // , the topic will use the namespace level policies
+        //, the topic will use the namespace level policies
         admin.namespaces().setInactiveTopicPolicies(namespace, inactiveTopicPolicies);
         //wait for zk
         Awaitility.await().until(() -> {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
index 786d7a2..570c6cd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
@@ -1547,9 +1547,9 @@
         assertNotNull(map);
         assertEquals((long) map.get("brk_connection_created_total_count"), 2);
         assertEquals((long) map.get("brk_active_connections"), 0);
-        assertEquals((long) map.get("brk_connection_closed_total_count") , 2);
+        assertEquals((long) map.get("brk_connection_closed_total_count"), 2);
         assertEquals((long) map.get("brk_connection_create_success_count"), 1);
-        assertEquals((long) map.get("brk_connection_create_fail_count") , 1);
+        assertEquals((long) map.get("brk_connection_create_fail_count"), 1);
     }
 
     @Test
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index cd3f3cc..95e9741 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -312,7 +312,7 @@
             final ByteBuf payload = (ByteBuf) invocationOnMock.getArguments()[0];
             final AddEntryCallback callback = (AddEntryCallback) invocationOnMock.getArguments()[1];
             final Topic.PublishContext ctx = (Topic.PublishContext) invocationOnMock.getArguments()[2];
-            callback.addComplete(PositionImpl.latest, payload, ctx);
+            callback.addComplete(PositionImpl.LATEST, payload, ctx);
             return null;
         }).when(ledgerMock).asyncAddEntry(any(ByteBuf.class), any(AddEntryCallback.class), any());
 
@@ -329,8 +329,8 @@
         final Topic.PublishContext publishContext = new Topic.PublishContext() {
             @Override
             public void completed(Exception e, long ledgerId, long entryId) {
-                assertEquals(ledgerId, PositionImpl.latest.getLedgerId());
-                assertEquals(entryId, PositionImpl.latest.getEntryId());
+                assertEquals(ledgerId, PositionImpl.LATEST.getLedgerId());
+                assertEquals(entryId, PositionImpl.LATEST.getEntryId());
                 latch.countDown();
             }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index c88a464..0c6e7e7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -1120,7 +1120,7 @@
 
     @DataProvider(name = "topicPrefix")
     public static Object[][] topicPrefix() {
-        return new Object[][] { { "persistent://" , "/persistent" }, { "non-persistent://" , "/non-persistent" } };
+        return new Object[][] { { "persistent://", "/persistent" }, { "non-persistent://", "/non-persistent" } };
     }
 
     @Test(dataProvider = "topicPrefix")
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
index 8c5e969..6c27e2c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
@@ -339,7 +339,7 @@
         received = consumer.receive();
         assertEquals(received.getMessageId(), messageIds.get(messageIds.size() - 1));
 
-        admin.topics().resetCursor(topicName, subscriptionName, new BatchMessageIdImpl(-1, -1, -1 ,10), true);
+        admin.topics().resetCursor(topicName, subscriptionName, new BatchMessageIdImpl(-1, -1, -1,10), true);
         // Wait consumer reconnect
         Awaitility.await().until(consumer::isConnected);
         received = consumer.receive();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java
index c8f09fa..7b63ea5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java
@@ -160,7 +160,7 @@
         Awaitility.await().untilAsserted(() -> assertFalse(messageDeduplication.isEnabled()));
         producer.newMessage().value("msg").sequenceId(1).send();
         checkDeduplicationDisabled(producerName, messageDeduplication);
-        //remove namespace-level , use broker-level
+        //remove namespace-level, use broker-level
         admin.namespaces().removeDeduplicationStatus(myNamespace);
         Awaitility.await().untilAsserted(() -> assertNull(admin.namespaces().getDeduplicationStatus(myNamespace)));
         Awaitility.await().untilAsserted(() -> assertTrue(messageDeduplication.isEnabled()));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
index 0c629f1..0d1bbda 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
@@ -209,7 +209,7 @@
             }
             return getPulsarServiceList().get(0).getManagedLedgerFactory().openReadOnlyCursor(
                     TopicName.get(topic).getPersistenceNamingEncoding(),
-                    PositionImpl.earliest, new ManagedLedgerConfig());
+                    PositionImpl.EARLIEST, new ManagedLedgerConfig());
         } catch (Exception e) {
             log.error("Failed to get origin topic readonly cursor.", e);
             Assert.fail("Failed to get origin topic readonly cursor.");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
index 3cc44ba..97907fc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
@@ -133,7 +133,7 @@
         }
 
         txn.abort().get();
-        // commit this txn , normalAckMessageIds are in pending ack state
+        // commit this txn, normalAckMessageIds are in pending ack state
         commitTxn.commit().get();
         // abort this txn, pendingAckMessageIds are delete from pending ack state
         abortTxn.abort().get();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
index dbcaae1..cca5b77 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
@@ -92,7 +92,7 @@
         do {
             Message<byte[]> message = consumer.receive();
             log.info("consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
-            consumer.reconsumeLater(message, 1 , TimeUnit.SECONDS);
+            consumer.reconsumeLater(message, 1, TimeUnit.SECONDS);
             totalReceived++;
         } while (totalReceived < sendMessages * (maxRedeliveryCount + 1));
 
@@ -275,7 +275,7 @@
         do {
             Message<byte[]> message = consumer.receive();
             log.info("consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
-            consumer.reconsumeLater(message, 1 , TimeUnit.SECONDS);
+            consumer.reconsumeLater(message, 1, TimeUnit.SECONDS);
             totalReceived++;
         } while (totalReceived < sendMessages * (maxRedeliveryCount + 1));
 
@@ -429,7 +429,7 @@
         do {
             Message<byte[]> message = consumer.receive();
             log.info("consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
-            consumer.reconsumeLater(message, 1 , TimeUnit.SECONDS);
+            consumer.reconsumeLater(message, 1, TimeUnit.SECONDS);
             totalReceived++;
         } while (totalReceived < sendMessages * (maxRedeliveryCount + 1));
         int totalInDeadLetter = 0;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 63dbb8f..6728fc5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -3168,7 +3168,7 @@
         Producer<byte[]> producer = pulsarClient.newProducer()
                 .topic(topicName)
                 .enableBatching(false)
-                .autoUpdatePartitionsInterval(2 ,TimeUnit.SECONDS)
+                .autoUpdatePartitionsInterval(2, TimeUnit.SECONDS)
                 .create();
 
         // 1. produce 5 messages
@@ -3181,7 +3181,7 @@
         Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
                 .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                 .receiverQueueSize(receiverQueueSize)
-                .autoUpdatePartitionsInterval(2 ,TimeUnit.SECONDS)
+                .autoUpdatePartitionsInterval(2, TimeUnit.SECONDS)
                 .subscriptionName("test-multi-topic-consumer").subscribe();
 
         int counter = 0;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageParserTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageParserTest.java
index 641b9a0..95cfdd9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageParserTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageParserTest.java
@@ -99,7 +99,7 @@
                 .create();
 
         ManagedCursor cursor = ((PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get())
-                .getManagedLedger().newNonDurableCursor(PositionImpl.earliest);
+                .getManagedLedger().newNonDurableCursor(PositionImpl.EARLIEST);
 
         if (batchEnabled) {
             for (int i = 0; i < n - 1; i++) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
index b7215f4..5eaeea1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
@@ -752,7 +752,7 @@
         doReturn(CompletableFuture.completedFuture(topicNames)).when(nss)
                 .getListOfPersistentTopics(NamespaceName.get("my-property/my-ns"));
 
-        // 7. call recheckTopics to unsubscribe topic 1,3 , verify topics number: 2=6-1-3
+        // 7. call recheckTopics to unsubscribe topic 1,3, verify topics number: 2=6-1-3
         log.debug("recheck topics change");
         PatternMultiTopicsConsumerImpl<byte[]> consumer1 = ((PatternMultiTopicsConsumerImpl<byte[]>) consumer);
         consumer1.run(consumer1.getRecheckPatternTimeout());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
index 69886a6..14db9a4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
@@ -660,7 +660,7 @@
             }
         }, 20, 150);
 
-        //change the schema ,the function should not run, resulting in no messages to consume
+        //change the schema, the function should not run, resulting in no messages to consume
         schemaInput.put(sourceTopic, "{\"schemaType\":\"AVRO\",\"schemaProperties\":{\"__jsr310ConversionEnabled\":\"false\",\"__alwaysAllowNull\":\"false\"}}");
         localRunner = LocalRunner.builder()
                 .functionConfig(functionConfig)
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
index 6d4b29b..b7a9dd9 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
@@ -289,7 +289,7 @@
      *          consumer.acknowledge(msg);
      *     } catch (Throwable t) {
      *          log.warn("Failed to process message");
-     *          consumer.reconsumeLater(msg, 1000 , TimeUnit.MILLISECONDS);
+     *          consumer.reconsumeLater(msg, 1000, TimeUnit.MILLISECONDS);
      *     }
      * }
      * </code></pre>
@@ -322,7 +322,7 @@
      *          consumer.acknowledge(msg);
      *     } catch (Throwable t) {
      *          log.warn("Failed to process message");
-     *          consumer.reconsumeLater(msg, 1000 , TimeUnit.MILLISECONDS);
+     *          consumer.reconsumeLater(msg, 1000, TimeUnit.MILLISECONDS);
      *     }
      * }
      * </code></pre>
diff --git a/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationSasl.java b/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationSasl.java
index 38c53c3..3a19ba8 100644
--- a/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationSasl.java
+++ b/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationSasl.java
@@ -143,7 +143,7 @@
         if (!initializedJAAS) {
             synchronized (this) {
                 if (jaasCredentialsContainer == null) {
-                    log.info("JAAS loginContext is: {}." , loginContextName);
+                    log.info("JAAS loginContext is: {}.", loginContextName);
                     try {
                         jaasCredentialsContainer = new JAASCredentialsContainer(
                             loginContextName,
@@ -151,7 +151,7 @@
                             configuration);
                         initializedJAAS = true;
                     } catch (LoginException e) {
-                        log.error("JAAS login in client failed" , e);
+                        log.error("JAAS login in client failed", e);
                         throw new PulsarClientException(e);
                     }
                 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchemaUtils.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchemaUtils.java
index d5b451f..b0b701a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchemaUtils.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchemaUtils.java
@@ -50,7 +50,7 @@
             //extract root message path
             String rootMessageTypeName = descriptor.getFullName();
             String rootFileDescriptorName = descriptor.getFile().getFullName();
-            //build FileDescriptorSet , this is equal to < protoc --include_imports --descriptor_set_out >
+            //build FileDescriptorSet, this is equal to < protoc --include_imports --descriptor_set_out >
             byte[] fileDescriptorSet = FileDescriptorSet.newBuilder().addAllFile(fileDescriptorProtoCache.values()).build().toByteArray();
 
             //serialize to bytes
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryUtil.java b/pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryUtil.java
index 084a583..717949d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryUtil.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryUtil.java
@@ -58,7 +58,7 @@
                 if (log.isDebugEnabled()) {
                     log.debug("execute with retry fail, will retry in {} ms", next, e);
                 }
-                log.info("Because of {} , will retry in {} ms", e.getMessage(), next);
+                log.info("Because of {}, will retry in {} ms", e.getMessage(), next);
                 scheduledExecutorService.schedule(() ->
                                 executeWithRetry(supplier, backoff, scheduledExecutorService, callback),
                         next, TimeUnit.MILLISECONDS);
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java
index 2d4e9df..b29d3cb 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java
@@ -87,7 +87,7 @@
                 // if lower and upper has different key/ledger then set ranges for lower-key only if
                 // a. bitSet already exist and given value is not the last value in the bitset.
                 // it will prevent setting up values which are not actually expected to set
-                // eg: (2:10..4:10] in this case , don't set any value for 2:10 and set [4:0..4:10]
+                // eg: (2:10..4:10] in this case, don't set any value for 2:10 and set [4:0..4:10]
                 if (rangeBitSet != null && (rangeBitSet.previousSetBit(rangeBitSet.size()) > lowerValueOpen)) {
                     int lastValue = rangeBitSet.previousSetBit(rangeBitSet.size());
                     rangeBitSet.set((int) lowerValue, (int) Math.max(lastValue, lowerValue) + 1);
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/api/raw/RawMessageImplTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/api/raw/RawMessageImplTest.java
index 037a6ff..99a5f02 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/api/raw/RawMessageImplTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/api/raw/RawMessageImplTest.java
@@ -43,7 +43,7 @@
         singleMessageMetadata.addProperty().setKey(HARD_CODE_KEY).setValue(KEY_VALUE_FIRST);
         singleMessageMetadata.addProperty().setKey(HARD_CODE_KEY).setValue(KEY_VALUE_SECOND);
         singleMessageMetadata.addProperty().setKey(HARD_CODE_KEY_ID).setValue(HARD_CODE_KEY_ID_VALUE);
-        RawMessage msg = RawMessageImpl.get(refCntMsgMetadata, singleMessageMetadata, null , 0, 0, 0);
+        RawMessage msg = RawMessageImpl.get(refCntMsgMetadata, singleMessageMetadata, null, 0, 0, 0);
         Map<String, String> properties = msg.getProperties();
         assertEquals(properties.get(HARD_CODE_KEY), KEY_VALUE_SECOND);
         assertEquals(properties.get(HARD_CODE_KEY_ID), HARD_CODE_KEY_ID_VALUE);
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java
index e04960f..3db2ec9 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java
@@ -133,7 +133,7 @@
         _statTotalProcessedSuccessfully = statTotalProcessedSuccessfully.labels(metricsLabels);
 
         statTotalSysExceptions = collectorRegistry.registerIfNotExist(
-                PULSAR_FUNCTION_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL ,
+                PULSAR_FUNCTION_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL,
                 Counter.build()
                 .name(PULSAR_FUNCTION_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL)
                 .help("Total number of system exceptions.")
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index 9b8d8d4..dfb0b9e 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -117,7 +117,7 @@
                                     .setIsRegexPattern(false)
                                     .build());
                 } catch (JsonProcessingException e) {
-                    throw new IllegalArgumentException(String.format("Incorrect custom schema inputs ,Topic %s ", topicName));
+                    throw new IllegalArgumentException(String.format("Incorrect custom schema inputs,Topic %s ", topicName));
                 }
             });
         }
@@ -212,7 +212,7 @@
                     sinkSpecBuilder.putAllConsumerProperties(consumerConfig.getConsumerProperties());
                 }
             } catch (JsonProcessingException e) {
-                throw new IllegalArgumentException(String.format("Incorrect custom schema outputs ,Topic %s ", functionConfig.getOutput()));
+                throw new IllegalArgumentException(String.format("Incorrect custom schema outputs,Topic %s ", functionConfig.getOutput()));
             }
         }
         if (typeArgs != null) {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java
index 26479ba..69f8b38 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java
@@ -183,7 +183,7 @@
     @Override
     public Response stopFunctionInstance(String tenant, String namespace, String functionName, String instanceId, URI
             uri, String clientRole) {
-        delegate.stopFunctionInstance(tenant, namespace, functionName, instanceId, uri, clientRole ,null);
+        delegate.stopFunctionInstance(tenant, namespace, functionName, instanceId, uri, clientRole, null);
         return Response.ok().build();
     }
 
diff --git a/pulsar-io/aws/src/main/java/org/apache/pulsar/io/aws/AbstractAwsConnector.java b/pulsar-io/aws/src/main/java/org/apache/pulsar/io/aws/AbstractAwsConnector.java
index 4a7a8fd..d5f6284 100644
--- a/pulsar-io/aws/src/main/java/org/apache/pulsar/io/aws/AbstractAwsConnector.java
+++ b/pulsar-io/aws/src/main/java/org/apache/pulsar/io/aws/AbstractAwsConnector.java
@@ -63,7 +63,7 @@
         } catch (Exception e) {
             log.error("Failed to initialize AwsCredentialProviderPlugin {}", pluginFQClassName, e);
             throw new IllegalArgumentException(
-                    String.format("invalid authplugin name %s , failed to init %s", pluginFQClassName, e.getMessage()));
+                    String.format("invalid authplugin name %s, failed to init %s", pluginFQClassName, e.getMessage()));
         }
     }
 
diff --git a/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/MessageUtils.java b/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/MessageUtils.java
index 9e85b43..77ee5bb 100644
--- a/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/MessageUtils.java
+++ b/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/MessageUtils.java
@@ -90,7 +90,7 @@
                 try {
                     rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                 } catch (Exception e) {
-                    throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:"
+                    throw new RuntimeException("ERROR ## parser of eromanga-event has an error, data:"
                             + entry.toString(), e);
                 }
 
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index ead243a..cdd208d 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -144,7 +144,7 @@
             if (service.getProxyLogLevel() == 2) {
                 //Set a map between inbound and outbound,
                 //so can find inbound by outbound or find outbound by inbound
-                inboundOutboundChannelMap.put(outboundChannel.id() , inboundChannel.id());
+                inboundOutboundChannelMap.put(outboundChannel.id(), inboundChannel.id());
             }
 
             if (!config.isHaProxyProtocolEnabled()) {
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java
index 40c05a3..fea1a40 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java
@@ -116,13 +116,13 @@
                     ParserProxyHandler.producerHashMap.put(cmd.getProducer().getProducerId() + "," + ctx.channel().id(),
                             cmd.getProducer().getTopic());
 
-                    logging(ctx.channel() , cmd.getType() , "{producer:" + cmd.getProducer().getProducerName()
+                    logging(ctx.channel(), cmd.getType(), "{producer:" + cmd.getProducer().getProducerName()
                             + ",topic:" + cmd.getProducer().getTopic() + "}", null);
                     break;
 
                 case SEND:
                     if (service.getProxyLogLevel() != 2) {
-                        logging(ctx.channel() , cmd.getType() , "", null);
+                        logging(ctx.channel(), cmd.getType(), "", null);
                         break;
                     }
                     topicName = TopicName.get(ParserProxyHandler.producerHashMap.get(cmd.getSend().getProducerId() + ","
@@ -137,20 +137,20 @@
                     TopicStats topicStats = this.service.getTopicStats().computeIfAbsent(topicName.toString(),
                         topic -> new TopicStats());
                     topicStats.getMsgInRate().recordMultipleEvents(messages.size(), msgBytes.longValue());
-                    logging(ctx.channel() , cmd.getType() , "" , messages);
+                    logging(ctx.channel(), cmd.getType(), "", messages);
                     break;
 
                 case SUBSCRIBE:
                     ParserProxyHandler.consumerHashMap.put(cmd.getSubscribe().getConsumerId() + ","
                                     + ctx.channel().id(), cmd.getSubscribe().getTopic());
 
-                    logging(ctx.channel() , cmd.getType() , "{consumer:" + cmd.getSubscribe().getConsumerName()
-                            + ",topic:" + cmd.getSubscribe().getTopic() + "}" , null);
+                    logging(ctx.channel(), cmd.getType(), "{consumer:" + cmd.getSubscribe().getConsumerName()
+                            + ",topic:" + cmd.getSubscribe().getTopic() + "}", null);
                     break;
 
                 case MESSAGE:
                     if (service.getProxyLogLevel() != 2) {
-                        logging(ctx.channel() , cmd.getType() , "" , null);
+                        logging(ctx.channel(), cmd.getType(), "", null);
                         break;
                     }
                     topicName = TopicName.get(ParserProxyHandler.consumerHashMap.get(cmd.getMessage().getConsumerId()
@@ -165,17 +165,15 @@
                     topicStats = this.service.getTopicStats().computeIfAbsent(topicName.toString(),
                             topic -> new TopicStats());
                     topicStats.getMsgOutRate().recordMultipleEvents(messages.size(), msgBytes.longValue());
-                    logging(ctx.channel() , cmd.getType() , "" , messages);
+                    logging(ctx.channel(), cmd.getType(), "", messages);
                     break;
 
                  default:
-                    logging(ctx.channel() , cmd.getType() , "" , null);
+                    logging(ctx.channel(), cmd.getType(), "", null);
                     break;
             }
         } catch (Exception e){
-
-            log.error("{},{},{}" , e.getMessage() , e.getStackTrace() ,  e.getCause());
-
+            log.error("channelRead error ", e);
         } finally {
             buffer.resetReaderIndex();
             buffer.resetWriterIndex();
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java
index b0d53ad..953773c 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java
@@ -82,7 +82,7 @@
         doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore();
 
         Optional<Integer> proxyLogLevel = Optional.of(2);
-        assertEquals( proxyLogLevel , proxyService.getConfiguration().getProxyLogLevel());
+        assertEquals( proxyLogLevel, proxyService.getConfiguration().getProxyLogLevel());
         proxyService.start();
     }
 
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
index 9a64c05..16a838f 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
@@ -178,7 +178,7 @@
                 }
             } else {
                 log.info("No ledger offloader configured, using NULL instance");
-                return NullLedgerOffloader.INSTANCE;
+                return NullLedgerOffloader.instance_;
             }
         } catch (Throwable t) {
             throw new RuntimeException(t);
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
index d839e05..df46991 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
@@ -395,7 +395,7 @@
                             // if the available size is invalid and the entry queue size is 0, read one entry
                             outstandingReadsRequests.decrementAndGet();
                             cursor.asyncReadEntries(batchSize, entryQueueCacheSizeAllocator.getAvailableCacheSize(),
-                                    this, System.nanoTime(), PositionImpl.latest);
+                                    this, System.nanoTime(), PositionImpl.LATEST);
                         }
 
                         // stats for successful read request
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
index 96a100e..ee64432 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
@@ -270,7 +270,7 @@
         try {
             readOnlyCursor = managedLedgerFactory.openReadOnlyCursor(
                     topicNamePersistenceEncoding,
-                    PositionImpl.earliest, managedLedgerConfig);
+                    PositionImpl.EARLIEST, managedLedgerConfig);
 
             long numEntries = readOnlyCursor.getNumberOfEntries();
             if (numEntries <= 0) {
@@ -362,7 +362,7 @@
             try {
                 readOnlyCursor = managedLedgerFactory.openReadOnlyCursor(
                         topicNamePersistenceEncoding,
-                        PositionImpl.earliest, managedLedgerConfig);
+                        PositionImpl.EARLIEST, managedLedgerConfig);
 
                 if (tupleDomain.getDomains().isPresent()) {
                     Domain domain = tupleDomain.getDomains().get().get(PulsarInternalColumn.PUBLISH_TIME
diff --git a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/TestDefaultMessageFormatter.java b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/TestDefaultMessageFormatter.java
index f1410e6..255f266 100644
--- a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/TestDefaultMessageFormatter.java
+++ b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/TestDefaultMessageFormatter.java
@@ -31,7 +31,7 @@
     public void testFormatMessage() {
         String producerName = "producer-1";
         long msgId = 3;
-        byte[] message = "{ \"producer\": \"%p\", \"msgId\": %i, \"nanoTime\": %t, \"float1\": %5.2f, \"float2\": %-5.2f, \"long1\": %12l, \"long2\": %l, \"int1\": %d, \"int2\": %1d , \"long3\": %5l,  \"str\": \"%5s\" }".getBytes();
+        byte[] message = "{ \"producer\": \"%p\", \"msgId\": %i, \"nanoTime\": %t, \"float1\": %5.2f, \"float2\": %-5.2f, \"long1\": %12l, \"long2\": %l, \"int1\": %d, \"int2\": %1d, \"long3\": %5l,  \"str\": \"%5s\" }".getBytes();
         byte[] formatted = new DefaultMessageFormatter().formatMessage(producerName, msgId, message);
         String jsonString = new String(formatted, StandardCharsets.UTF_8);
 
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
index 8bf2ebf..a4c347a 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
@@ -123,7 +123,7 @@
 
     private void readAsync(int numberOfEntriesToRead,
                            AsyncCallbacks.ReadEntriesCallback readEntriesCallback) {
-        cursor.asyncReadEntries(numberOfEntriesToRead, readEntriesCallback, System.nanoTime(), PositionImpl.latest);
+        cursor.asyncReadEntries(numberOfEntriesToRead, readEntriesCallback, System.nanoTime(), PositionImpl.LATEST);
     }
 
     @Override
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
index 7f1d8a7..28f41ee 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
@@ -453,7 +453,7 @@
             try {
                 builder.cryptoFailureAction(ConsumerCryptoFailureAction.valueOf(action));
             } catch (Exception e) {
-                log.warn("Failed to configure cryptoFailureAction {} , {}", action, e.getMessage());
+                log.warn("Failed to configure cryptoFailureAction {}, {}", action, e.getMessage());
             }
         }
 
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
index ef0279d..21cd10b 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
@@ -98,7 +98,7 @@
                 try {
                     builder.cryptoFailureAction(ConsumerCryptoFailureAction.valueOf(action));
                 } catch (Exception e) {
-                    log.warn("Failed to configure cryptoFailureAction {} , {}", action, e.getMessage());
+                    log.warn("Failed to configure cryptoFailureAction {}, {}", action, e.getMessage());
                 }
             }
 
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
index 62f59c3..dca6be4 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
@@ -241,7 +241,7 @@
                 .create();
 
         for (int i = 0 ; i < NUM_OF_STOCKS; ++i) {
-            final Stock stock = new Stock(i,"STOCK_" + i , 100.0 + i * 10);
+            final Stock stock = new Stock(i, "STOCK_" + i, 100.0 + i * 10);
             producer.send(stock);
         }
         producer.flush();
@@ -277,8 +277,8 @@
 
         for (int i = 0 ; i < NUM_OF_STOCKS; ++i) {
             int j = 100 * i;
-            final Stock stock1 = new Stock(j, "STOCK_" + j , 100.0 + j * 10);
-            final Stock stock2 = new Stock(i, "STOCK_" + i , 100.0 + i * 10);
+            final Stock stock1 = new Stock(j, "STOCK_" + j, 100.0 + j * 10);
+            final Stock stock2 = new Stock(i, "STOCK_" + i, 100.0 + i * 10);
             producer.send(new KeyValue<>(stock1, stock2));
         }
     }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPrestoQueryTieredStorage.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPrestoQueryTieredStorage.java
index 4c129fa..2d3e96a 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPrestoQueryTieredStorage.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPrestoQueryTieredStorage.java
@@ -150,7 +150,7 @@
         int sendMessageCnt = 0;
         while (true) {
             Stock stock = new Stock(
-                    sendMessageCnt,"STOCK_" + sendMessageCnt , 100.0 + sendMessageCnt * 10);
+                    sendMessageCnt,"STOCK_" + sendMessageCnt, 100.0 + sendMessageCnt * 10);
             MessageIdImpl messageId = (MessageIdImpl) producer.send(stock);
             sendMessageCnt ++;
             if (firstLedgerId == -1) {
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
index f9c86fd..817c6a7 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
@@ -94,7 +94,7 @@
     final private long maxBufferLength;
     final private ConcurrentLinkedQueue<Entry> offloadBuffer = new ConcurrentLinkedQueue<>();
     private CompletableFuture<OffloadResult> offloadResult;
-    private volatile PositionImpl lastOfferedPosition = PositionImpl.latest;
+    private volatile PositionImpl lastOfferedPosition = PositionImpl.LATEST;
     private final Duration maxSegmentCloseTime;
     private final long minSegmentCloseTimeMillis;
     private final long segmentBeginTimeMillis;