Merge branch 'cassandra-3.11' into trunk
diff --git a/CHANGES.txt b/CHANGES.txt
index 76cd271..d382650 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -56,6 +56,7 @@
  * Upgrade Jackson to 2.9.10 (CASSANDRA-15867)
  * Fix CQL formatting of read command restrictions for slow query log (CASSANDRA-15503)
 Merged from 3.0:
+ * Avoid thread starvation, and improve compare-and-swap performance, in the slab allocators (CASSANDRA-15922)
  * Add token to tombstone warning and error messages (CASSANDRA-15890)
  * Fixed range read concurrency factor computation and capped as 10 times tpc cores (CASSANDRA-15752)
  * Catch exception on bootstrap resume and init native transport (CASSANDRA-15863)
diff --git a/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java b/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java
index 61e8407..af8b750 100644
--- a/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java
+++ b/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java
@@ -218,11 +218,6 @@
         private final AtomicInteger nextFreeOffset = new AtomicInteger(0);
 
         /**
-         * Total number of allocations satisfied from this buffer
-         */
-        private final AtomicInteger allocCount = new AtomicInteger();
-
-        /**
          * Create an uninitialized region. Note that memory is not allocated yet, so
          * this is cheap.
          *
@@ -237,34 +232,24 @@
         /**
          * Try to allocate <code>size</code> bytes from the region.
          *
-         * @return the successful allocation, or null to indicate not-enough-space
+         * @return the successful allocation, or -1 to indicate not-enough-space
          */
         long allocate(int size)
         {
-            while (true)
-            {
-                int oldOffset = nextFreeOffset.get();
+            int newOffset = nextFreeOffset.getAndAdd(size);
 
-                if (oldOffset + size > capacity) // capacity == remaining
-                    return -1;
+            if (newOffset + size > capacity)
+                // this region is full
+                return -1;
 
-                // Try to atomically claim this region
-                if (nextFreeOffset.compareAndSet(oldOffset, oldOffset + size))
-                {
-                    // we got the alloc
-                    allocCount.incrementAndGet();
-                    return peer + oldOffset;
-                }
-                // we raced and lost alloc, try again
-            }
+            return peer + newOffset;
         }
 
         @Override
         public String toString()
         {
             return "Region@" + System.identityHashCode(this) +
-                    " allocs=" + allocCount.get() + "waste=" +
-                    (capacity - nextFreeOffset.get());
+                    "waste=" + Math.max(0, capacity - nextFreeOffset.get());
         }
     }
 
diff --git a/src/java/org/apache/cassandra/utils/memory/SlabAllocator.java b/src/java/org/apache/cassandra/utils/memory/SlabAllocator.java
index 16cb45d..538cd3f 100644
--- a/src/java/org/apache/cassandra/utils/memory/SlabAllocator.java
+++ b/src/java/org/apache/cassandra/utils/memory/SlabAllocator.java
@@ -179,11 +179,6 @@
         private final AtomicInteger nextFreeOffset = new AtomicInteger(0);
 
         /**
-         * Total number of allocations satisfied from this buffer
-         */
-        private final AtomicInteger allocCount = new AtomicInteger();
-
-        /**
          * Create an uninitialized region. Note that memory is not allocated yet, so
          * this is cheap.
          *
@@ -201,30 +196,20 @@
          */
         public ByteBuffer allocate(int size)
         {
-            while (true)
-            {
-                int oldOffset = nextFreeOffset.get();
+            int newOffset = nextFreeOffset.getAndAdd(size);
 
-                if (oldOffset + size > data.capacity()) // capacity == remaining
-                    return null;
+            if (newOffset + size > data.capacity())
+                // this region is full
+                return null;
 
-                // Try to atomically claim this region
-                if (nextFreeOffset.compareAndSet(oldOffset, oldOffset + size))
-                {
-                    // we got the alloc
-                    allocCount.incrementAndGet();
-                    return (ByteBuffer) data.duplicate().position(oldOffset).limit(oldOffset + size);
-                }
-                // we raced and lost alloc, try again
-            }
+            return (ByteBuffer) data.duplicate().position((newOffset)).limit(newOffset + size);
         }
 
         @Override
         public String toString()
         {
             return "Region@" + System.identityHashCode(this) +
-                   " allocs=" + allocCount.get() + "waste=" +
-                   (data.capacity() - nextFreeOffset.get());
+                   "waste=" + Math.max(0, data.capacity() - nextFreeOffset.get());
         }
     }
 }