| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.cassandra.utils.memory; |
| |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.concurrent.ConcurrentLinkedQueue; |
| import java.util.concurrent.Semaphore; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicReference; |
| |
| import org.apache.cassandra.db.DecoratedKey; |
| import org.apache.cassandra.db.NativeDecoratedKey; |
| import org.apache.cassandra.db.rows.Row; |
| import org.apache.cassandra.utils.concurrent.OpOrder; |
| |
| /** |
| * This NativeAllocator uses global slab allocation strategy |
| * with slab size that scales exponentially from 8kb to 1Mb to |
| * serve allocation of up to 128kb. |
| * <p> |
| * </p> |
| * The slab allocation reduces heap fragmentation from small |
| * long-lived objects. |
| * |
| */ |
| public class NativeAllocator extends MemtableAllocator |
| { |
| private final static int MAX_REGION_SIZE = 1 * 1024 * 1024; |
| private final static int MAX_CLONED_SIZE = 128 * 1024; // bigger than this don't go in the region |
| private final static int MIN_REGION_SIZE = 8 * 1024; |
| |
| // globally stash any Regions we allocate but are beaten to using, and use these up before allocating any more |
| private static final Map<Integer, RaceAllocated> RACE_ALLOCATED = new HashMap<>(); |
| |
| static |
| { |
| for(int i = MIN_REGION_SIZE ; i <= MAX_REGION_SIZE; i *= 2) |
| RACE_ALLOCATED.put(i, new RaceAllocated()); |
| } |
| |
| private final AtomicReference<Region> currentRegion = new AtomicReference<>(); |
| private final ConcurrentLinkedQueue<Region> regions = new ConcurrentLinkedQueue<>(); |
| |
| protected NativeAllocator(NativePool pool) |
| { |
| super(pool.onHeap.newAllocator(), pool.offHeap.newAllocator()); |
| } |
| |
| public Row.Builder rowBuilder(OpOrder.Group opGroup) |
| { |
| // TODO |
| throw new UnsupportedOperationException(); |
| } |
| |
| public DecoratedKey clone(DecoratedKey key, OpOrder.Group writeOp) |
| { |
| return new NativeDecoratedKey(key.getToken(), this, writeOp, key.getKey()); |
| } |
| |
| public long allocate(int size, OpOrder.Group opGroup) |
| { |
| assert size >= 0; |
| offHeap().allocate(size, opGroup); |
| // satisfy large allocations directly from JVM since they don't cause fragmentation |
| // as badly, and fill up our regions quickly |
| if (size > MAX_CLONED_SIZE) |
| return allocateOversize(size); |
| |
| while (true) |
| { |
| Region region = currentRegion.get(); |
| long peer; |
| if (region != null && (peer = region.allocate(size)) > 0) |
| return peer; |
| |
| trySwapRegion(region, size); |
| } |
| } |
| |
| private void trySwapRegion(Region current, int minSize) |
| { |
| // decide how big we want the new region to be: |
| // * if there is no prior region, we set it to min size |
| // * otherwise we double its size; if it's too small to fit the allocation, we round it up to 4-8x its size |
| int size; |
| if (current == null) size = MIN_REGION_SIZE; |
| else size = current.capacity * 2; |
| if (minSize > size) |
| size = Integer.highestOneBit(minSize) << 3; |
| size = Math.min(MAX_REGION_SIZE, size); |
| |
| // first we try and repurpose a previously allocated region |
| RaceAllocated raceAllocated = RACE_ALLOCATED.get(size); |
| Region next = raceAllocated.poll(); |
| |
| // if there are none, we allocate one |
| if (next == null) |
| next = new Region(MemoryUtil.allocate(size), size); |
| |
| // we try to swap in the region we've obtained; |
| // if we fail to swap the region, we try to stash it for repurposing later; if we're out of stash room, we free it |
| if (currentRegion.compareAndSet(current, next)) |
| regions.add(next); |
| else if (!raceAllocated.stash(next)) |
| MemoryUtil.free(next.peer); |
| } |
| |
| private long allocateOversize(int size) |
| { |
| // satisfy large allocations directly from JVM since they don't cause fragmentation |
| // as badly, and fill up our regions quickly |
| Region region = new Region(MemoryUtil.allocate(size), size); |
| regions.add(region); |
| |
| long peer; |
| if ((peer = region.allocate(size)) == -1) |
| throw new AssertionError(); |
| |
| return peer; |
| } |
| |
| public void setDiscarded() |
| { |
| for (Region region : regions) |
| MemoryUtil.free(region.peer); |
| super.setDiscarded(); |
| } |
| |
| // used to ensure we don't keep loads of race allocated regions around indefinitely. keeps the total bound on wasted memory low. |
| private static class RaceAllocated |
| { |
| final ConcurrentLinkedQueue<Region> stash = new ConcurrentLinkedQueue<>(); |
| final Semaphore permits = new Semaphore(8); |
| boolean stash(Region region) |
| { |
| if (!permits.tryAcquire()) |
| return false; |
| stash.add(region); |
| return true; |
| } |
| Region poll() |
| { |
| Region next = stash.poll(); |
| if (next != null) |
| permits.release(); |
| return next; |
| } |
| } |
| |
| /** |
| * A region of memory out of which allocations are sliced. |
| * |
| * This serves two purposes: |
| * - to provide a step between initialization and allocation, so that racing to CAS a |
| * new region in is harmless |
| * - encapsulates the allocation offset |
| */ |
| private static class Region |
| { |
| /** |
| * Actual underlying data |
| */ |
| private final long peer; |
| |
| private final int capacity; |
| |
| /** |
| * Offset for the next allocation, or the sentinel value -1 |
| * which implies that the region is still uninitialized. |
| */ |
| private AtomicInteger nextFreeOffset = new AtomicInteger(0); |
| |
| /** |
| * Create an uninitialized region. Note that memory is not allocated yet, so |
| * this is cheap. |
| * |
| * @param peer peer |
| */ |
| private Region(long peer, int capacity) |
| { |
| this.peer = peer; |
| this.capacity = capacity; |
| } |
| |
| /** |
| * Try to allocate <code>size</code> bytes from the region. |
| * |
| * @return the successful allocation, or -1 to indicate not-enough-space |
| */ |
| long allocate(int size) |
| { |
| int newOffset = nextFreeOffset.getAndAdd(size); |
| |
| if (newOffset + size > capacity) |
| // this region is full |
| return -1; |
| |
| return peer + newOffset; |
| } |
| |
| @Override |
| public String toString() |
| { |
| return "Region@" + System.identityHashCode(this) + |
| "waste=" + Math.max(0, capacity - nextFreeOffset.get()); |
| } |
| } |
| |
| } |