| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.internal.offheap; |
| |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Set; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| |
| import org.apache.logging.log4j.Logger; |
| |
| import org.apache.geode.annotations.internal.MakeNotStatic; |
| import org.apache.geode.cache.CacheClosedException; |
| import org.apache.geode.cache.Region; |
| import org.apache.geode.distributed.internal.DistributionConfig; |
| import org.apache.geode.internal.cache.BucketRegion; |
| import org.apache.geode.internal.cache.InternalCache; |
| import org.apache.geode.internal.cache.InternalRegion; |
| import org.apache.geode.internal.cache.PartitionedRegion; |
| import org.apache.geode.internal.cache.PartitionedRegionDataStore; |
| import org.apache.geode.internal.cache.RegionEntry; |
| import org.apache.geode.internal.offheap.annotations.OffHeapIdentifier; |
| import org.apache.geode.internal.offheap.annotations.Unretained; |
| import org.apache.geode.logging.internal.log4j.api.LogService; |
| |
| /** |
| * This allocator is somewhat like an Arena allocator. We start out with an array of multiple large |
| * chunks of memory. We also keep lists of any chunk that have been allocated and freed. An |
| * allocation will always try to find a chunk in a free list that is a close fit to the requested |
| * size. If no close fits exist then it allocates the next slice from the front of one the original |
| * large chunks. If we can not find enough free memory then all the existing free memory is |
| * defragmented. If we still do not have enough to make the allocation an exception is thrown. |
| * |
| * @since Geode 1.0 |
| */ |
| public class MemoryAllocatorImpl implements MemoryAllocator { |
| |
| static final Logger logger = LogService.getLogger(); |
| |
| public static final String FREE_OFF_HEAP_MEMORY_PROPERTY = |
| DistributionConfig.GEMFIRE_PREFIX + "free-off-heap-memory"; |
| |
| private volatile OffHeapMemoryStats stats; |
| |
| private volatile OutOfOffHeapMemoryListener ooohml; |
| |
| OutOfOffHeapMemoryListener getOutOfOffHeapMemoryListener() { |
| return this.ooohml; |
| } |
| |
| public final FreeListManager freeList; |
| |
| private MemoryInspector memoryInspector; |
| |
| private volatile MemoryUsageListener[] memoryUsageListeners = new MemoryUsageListener[0]; |
| |
| @MakeNotStatic |
| private static MemoryAllocatorImpl singleton = null; |
| |
| public static MemoryAllocatorImpl getAllocator() { |
| MemoryAllocatorImpl result = singleton; |
| if (result == null) { |
| throw new CacheClosedException("Off Heap memory allocator does not exist."); |
| } |
| return result; |
| } |
| |
| private static final boolean DO_EXPENSIVE_VALIDATION = |
| Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "OFF_HEAP_DO_EXPENSIVE_VALIDATION"); |
| |
| public static MemoryAllocator create(OutOfOffHeapMemoryListener ooohml, OffHeapMemoryStats stats, |
| int slabCount, long offHeapMemorySize, long maxSlabSize) { |
| return create(ooohml, stats, slabCount, offHeapMemorySize, maxSlabSize, null, |
| new SlabFactory() { |
| @Override |
| public Slab create(int size) { |
| return new SlabImpl(size); |
| } |
| }); |
| } |
| |
| private static MemoryAllocatorImpl create(OutOfOffHeapMemoryListener ooohml, |
| OffHeapMemoryStats stats, int slabCount, long offHeapMemorySize, long maxSlabSize, |
| Slab[] slabs, SlabFactory slabFactory) { |
| MemoryAllocatorImpl result = singleton; |
| boolean created = false; |
| try { |
| if (result != null) { |
| result.reuse(ooohml, stats, offHeapMemorySize, slabs); |
| logger.info( |
| "Reusing {} bytes of off-heap memory. The maximum size of a single off-heap object is {} bytes.", |
| result.getTotalMemory(), result.freeList.getLargestSlabSize()); |
| created = true; |
| LifecycleListener.invokeAfterReuse(result); |
| } else { |
| if (slabs == null) { |
| // allocate memory chunks |
| logger.info( |
| "Allocating {} bytes of off-heap memory. The maximum size of a single off-heap object is {} bytes.", |
| offHeapMemorySize, maxSlabSize); |
| slabs = new SlabImpl[slabCount]; |
| long uncreatedMemory = offHeapMemorySize; |
| for (int i = 0; i < slabCount; i++) { |
| try { |
| if (uncreatedMemory >= maxSlabSize) { |
| slabs[i] = slabFactory.create((int) maxSlabSize); |
| uncreatedMemory -= maxSlabSize; |
| } else { |
| // the last slab can be smaller then maxSlabSize |
| slabs[i] = slabFactory.create((int) uncreatedMemory); |
| } |
| } catch (OutOfMemoryError err) { |
| if (i > 0) { |
| logger.error( |
| "Off-heap memory creation failed after successfully allocating {} bytes of off-heap memory.", |
| (i * maxSlabSize)); |
| } |
| for (int j = 0; j < i; j++) { |
| if (slabs[j] != null) { |
| slabs[j].free(); |
| } |
| } |
| throw err; |
| } |
| } |
| } |
| |
| result = new MemoryAllocatorImpl(ooohml, stats, slabs); |
| singleton = result; |
| LifecycleListener.invokeAfterCreate(result); |
| created = true; |
| } |
| } finally { |
| if (!created) { |
| if (stats != null) { |
| stats.close(); |
| } |
| if (ooohml != null) { |
| ooohml.close(); |
| } |
| } |
| } |
| return result; |
| } |
| |
| static MemoryAllocatorImpl createForUnitTest(OutOfOffHeapMemoryListener ooohml, |
| OffHeapMemoryStats stats, int slabCount, long offHeapMemorySize, long maxSlabSize, |
| SlabFactory memChunkFactory) { |
| return create(ooohml, stats, slabCount, offHeapMemorySize, maxSlabSize, null, memChunkFactory); |
| } |
| |
| public static MemoryAllocatorImpl createForUnitTest(OutOfOffHeapMemoryListener oooml, |
| OffHeapMemoryStats stats, Slab[] slabs) { |
| int slabCount = 0; |
| long offHeapMemorySize = 0; |
| long maxSlabSize = 0; |
| if (slabs != null) { |
| slabCount = slabs.length; |
| for (int i = 0; i < slabCount; i++) { |
| int slabSize = slabs[i].getSize(); |
| offHeapMemorySize += slabSize; |
| if (slabSize > maxSlabSize) { |
| maxSlabSize = slabSize; |
| } |
| } |
| } |
| return create(oooml, stats, slabCount, offHeapMemorySize, maxSlabSize, slabs, null); |
| } |
| |
| |
| private void reuse(OutOfOffHeapMemoryListener oooml, OffHeapMemoryStats newStats, |
| long offHeapMemorySize, Slab[] slabs) { |
| if (isClosed()) { |
| throw new IllegalStateException("Can not reuse a closed off-heap memory manager."); |
| } |
| if (oooml == null) { |
| throw new IllegalArgumentException("OutOfOffHeapMemoryListener is null"); |
| } |
| if (getTotalMemory() != offHeapMemorySize) { |
| logger.warn("Using {} bytes of existing off-heap memory instead of the requested {}.", |
| getTotalMemory(), offHeapMemorySize); |
| } |
| if (!this.freeList.okToReuse(slabs)) { |
| throw new IllegalStateException( |
| "attempted to reuse existing off-heap memory even though new off-heap memory was allocated"); |
| } |
| this.ooohml = oooml; |
| newStats.initialize(this.stats); |
| this.stats = newStats; |
| } |
| |
| private MemoryAllocatorImpl(final OutOfOffHeapMemoryListener oooml, |
| final OffHeapMemoryStats stats, final Slab[] slabs) { |
| if (oooml == null) { |
| throw new IllegalArgumentException("OutOfOffHeapMemoryListener is null"); |
| } |
| |
| this.ooohml = oooml; |
| this.stats = stats; |
| |
| this.stats.setFragments(slabs.length); |
| this.stats.setLargestFragment(slabs[0].getSize()); |
| |
| this.freeList = new FreeListManager(this, slabs); |
| this.memoryInspector = new MemoryInspectorImpl(this.freeList); |
| |
| this.stats.incMaxMemory(this.freeList.getTotalMemory()); |
| this.stats.incFreeMemory(this.freeList.getTotalMemory()); |
| } |
| |
| public List<OffHeapStoredObject> getLostChunks(InternalCache cache) { |
| List<OffHeapStoredObject> liveChunks = this.freeList.getLiveChunks(); |
| List<OffHeapStoredObject> regionChunks = getRegionLiveChunks(cache); |
| Set<OffHeapStoredObject> liveChunksSet = new HashSet<>(liveChunks); |
| Set<OffHeapStoredObject> regionChunksSet = new HashSet<>(regionChunks); |
| liveChunksSet.removeAll(regionChunksSet); |
| return new ArrayList<OffHeapStoredObject>(liveChunksSet); |
| } |
| |
| /** |
| * Returns a possibly empty list that contains all the Chunks used by regions. |
| */ |
| private List<OffHeapStoredObject> getRegionLiveChunks(InternalCache cache) { |
| ArrayList<OffHeapStoredObject> result = new ArrayList<OffHeapStoredObject>(); |
| if (cache != null) { |
| Iterator<Region<?, ?>> rootIt = cache.rootRegions().iterator(); |
| while (rootIt.hasNext()) { |
| Region<?, ?> rr = rootIt.next(); |
| getRegionLiveChunks(rr, result); |
| Iterator<Region<?, ?>> srIt = rr.subregions(true).iterator(); |
| while (srIt.hasNext()) { |
| getRegionLiveChunks(srIt.next(), result); |
| } |
| } |
| } |
| return result; |
| } |
| |
| private void getRegionLiveChunks(Region<?, ?> r, List<OffHeapStoredObject> result) { |
| if (r.getAttributes().getOffHeap()) { |
| |
| if (r instanceof PartitionedRegion) { |
| PartitionedRegionDataStore prs = ((PartitionedRegion) r).getDataStore(); |
| if (prs != null) { |
| Set<BucketRegion> brs = prs.getAllLocalBucketRegions(); |
| if (brs != null) { |
| for (BucketRegion br : brs) { |
| if (br != null && !br.isDestroyed()) { |
| this.basicGetRegionLiveChunks(br, result); |
| } |
| |
| } |
| } |
| } |
| } else { |
| this.basicGetRegionLiveChunks((InternalRegion) r, result); |
| } |
| |
| } |
| |
| } |
| |
| private void basicGetRegionLiveChunks(InternalRegion r, List<OffHeapStoredObject> result) { |
| for (Object key : r.keySet()) { |
| RegionEntry re = r.getRegionEntry(key); |
| if (re != null) { |
| /** |
| * value could be GATEWAY_SENDER_EVENT_IMPL_VALUE or region entry value. |
| */ |
| @Unretained(OffHeapIdentifier.GATEWAY_SENDER_EVENT_IMPL_VALUE) |
| Object value = re.getValue(); |
| if (value instanceof OffHeapStoredObject) { |
| result.add((OffHeapStoredObject) value); |
| } |
| } |
| } |
| } |
| |
| private OffHeapStoredObject allocateOffHeapStoredObject(int size) { |
| OffHeapStoredObject result = this.freeList.allocate(size); |
| int resultSize = result.getSize(); |
| stats.incObjects(1); |
| stats.incUsedMemory(resultSize); |
| stats.incFreeMemory(-resultSize); |
| notifyListeners(); |
| if (ReferenceCountHelper.trackReferenceCounts()) { |
| ReferenceCountHelper.refCountChanged(result.getAddress(), false, 1); |
| } |
| return result; |
| } |
| |
| @Override |
| public StoredObject allocate(int size) { |
| // System.out.println("allocating " + size); |
| OffHeapStoredObject result = allocateOffHeapStoredObject(size); |
| // ("allocated off heap object of size " + size + " @" + |
| // Long.toHexString(result.getMemoryAddress()), true); |
| return result; |
| } |
| |
| public static void debugLog(String msg, boolean logStack) { |
| if (logStack) { |
| logger.info(msg, new RuntimeException(msg)); |
| } else { |
| logger.info(msg); |
| } |
| } |
| |
| @Override |
| public StoredObject allocateAndInitialize(byte[] v, boolean isSerialized, boolean isCompressed) { |
| return allocateAndInitialize(v, isSerialized, isCompressed, null); |
| } |
| |
| @Override |
| public StoredObject allocateAndInitialize(byte[] v, boolean isSerialized, boolean isCompressed, |
| byte[] originalHeapData) { |
| long addr = OffHeapRegionEntryHelper.encodeDataAsAddress(v, isSerialized, isCompressed); |
| if (addr != 0L) { |
| return new TinyStoredObject(addr); |
| } |
| OffHeapStoredObject result = allocateOffHeapStoredObject(v.length); |
| // debugLog("allocated off heap object of size " + v.length + " @" + |
| // Long.toHexString(result.getMemoryAddress()), true); |
| // debugLog("allocated off heap object of size " + v.length + " @" + |
| // Long.toHexString(result.getMemoryAddress()) + "chunkSize=" + result.getSize() + " |
| // isSerialized=" + isSerialized + " v=" + Arrays.toString(v), true); |
| result.setSerializedValue(v); |
| result.setSerialized(isSerialized); |
| result.setCompressed(isCompressed); |
| if (originalHeapData != null) { |
| result = new OffHeapStoredObjectWithHeapForm(result, originalHeapData); |
| } |
| return result; |
| } |
| |
| @Override |
| public long getFreeMemory() { |
| return this.freeList.getFreeMemory(); |
| } |
| |
| @Override |
| public long getUsedMemory() { |
| return this.freeList.getUsedMemory(); |
| } |
| |
| @Override |
| public long getTotalMemory() { |
| return this.freeList.getTotalMemory(); |
| } |
| |
| @Override |
| public void close() { |
| try { |
| LifecycleListener.invokeBeforeClose(this); |
| } finally { |
| this.ooohml.close(); |
| if (Boolean.getBoolean(FREE_OFF_HEAP_MEMORY_PROPERTY)) { |
| realClose(); |
| } |
| } |
| } |
| |
| public static void freeOffHeapMemory() { |
| MemoryAllocatorImpl ma = singleton; |
| if (ma != null) { |
| ma.realClose(); |
| } |
| } |
| |
| private void realClose() { |
| // Removing this memory immediately can lead to a SEGV. See 47885. |
| if (setClosed()) { |
| this.freeList.freeSlabs(); |
| this.stats.close(); |
| singleton = null; |
| } |
| } |
| |
| private final AtomicBoolean closed = new AtomicBoolean(); |
| |
| private boolean isClosed() { |
| return this.closed.get(); |
| } |
| |
| /** |
| * Returns true if caller is the one who should close; false if some other thread is already |
| * closing. |
| */ |
| private boolean setClosed() { |
| return this.closed.compareAndSet(false, true); |
| } |
| |
| |
| FreeListManager getFreeListManager() { |
| return this.freeList; |
| } |
| |
| /** |
| * Return the slabId of the slab that contains the given addr. |
| */ |
| int findSlab(long addr) { |
| return this.freeList.findSlab(addr); |
| } |
| |
| @Override |
| public OffHeapMemoryStats getStats() { |
| return this.stats; |
| } |
| |
| @Override |
| public void addMemoryUsageListener(final MemoryUsageListener listener) { |
| synchronized (this.memoryUsageListeners) { |
| final MemoryUsageListener[] newMemoryUsageListeners = |
| Arrays.copyOf(this.memoryUsageListeners, this.memoryUsageListeners.length + 1); |
| newMemoryUsageListeners[this.memoryUsageListeners.length] = listener; |
| this.memoryUsageListeners = newMemoryUsageListeners; |
| } |
| } |
| |
| @Override |
| public void removeMemoryUsageListener(final MemoryUsageListener listener) { |
| synchronized (this.memoryUsageListeners) { |
| int listenerIndex = -1; |
| for (int i = 0; i < this.memoryUsageListeners.length; i++) { |
| if (this.memoryUsageListeners[i] == listener) { |
| listenerIndex = i; |
| break; |
| } |
| } |
| |
| if (listenerIndex != -1) { |
| final MemoryUsageListener[] newMemoryUsageListeners = |
| new MemoryUsageListener[this.memoryUsageListeners.length - 1]; |
| System.arraycopy(this.memoryUsageListeners, 0, newMemoryUsageListeners, 0, listenerIndex); |
| System.arraycopy(this.memoryUsageListeners, listenerIndex + 1, newMemoryUsageListeners, |
| listenerIndex, this.memoryUsageListeners.length - listenerIndex - 1); |
| this.memoryUsageListeners = newMemoryUsageListeners; |
| } |
| } |
| } |
| |
| void notifyListeners() { |
| final MemoryUsageListener[] savedListeners = this.memoryUsageListeners; |
| |
| if (savedListeners.length == 0) { |
| return; |
| } |
| |
| final long bytesUsed = getUsedMemory(); |
| for (int i = 0; i < savedListeners.length; i++) { |
| savedListeners[i].updateMemoryUsed(bytesUsed); |
| } |
| } |
| |
| static void validateAddress(long addr) { |
| validateAddressAndSize(addr, -1); |
| } |
| |
| static void validateAddressAndSize(long addr, int size) { |
| // if the caller does not have a "size" to provide then use -1 |
| if ((addr & 7) != 0) { |
| StringBuilder sb = new StringBuilder(); |
| sb.append("address was not 8 byte aligned: 0x").append(Long.toString(addr, 16)); |
| MemoryAllocatorImpl ma = MemoryAllocatorImpl.singleton; |
| if (ma != null) { |
| sb.append(". Valid addresses must be in one of the following ranges: "); |
| ma.freeList.getSlabDescriptions(sb); |
| } |
| throw new IllegalStateException(sb.toString()); |
| } |
| if (addr >= 0 && addr < 1024) { |
| throw new IllegalStateException("addr was smaller than expected 0x" + addr); |
| } |
| validateAddressAndSizeWithinSlab(addr, size, DO_EXPENSIVE_VALIDATION); |
| } |
| |
| static void validateAddressAndSizeWithinSlab(long addr, int size, boolean doExpensiveValidation) { |
| if (doExpensiveValidation) { |
| MemoryAllocatorImpl ma = MemoryAllocatorImpl.singleton; |
| if (ma != null) { |
| if (!ma.freeList.validateAddressAndSizeWithinSlab(addr, size)) { |
| throw new IllegalStateException(" address 0x" + Long.toString(addr, 16) |
| + " does not address the original slab memory"); |
| } |
| } |
| } |
| } |
| |
| public synchronized List<MemoryBlock> getOrphans(InternalCache cache) { |
| List<OffHeapStoredObject> liveChunks = this.freeList.getLiveChunks(); |
| List<OffHeapStoredObject> regionChunks = getRegionLiveChunks(cache); |
| liveChunks.removeAll(regionChunks); |
| List<MemoryBlock> orphans = new ArrayList<MemoryBlock>(); |
| for (OffHeapStoredObject chunk : liveChunks) { |
| orphans.add(new MemoryBlockNode(this, chunk)); |
| } |
| Collections.sort(orphans, new Comparator<MemoryBlock>() { |
| @Override |
| public int compare(MemoryBlock o1, MemoryBlock o2) { |
| return Long.compare(o1.getAddress(), o2.getAddress()); |
| } |
| }); |
| // this.memoryBlocks = new WeakReference<List<MemoryBlock>>(orphans); |
| return orphans; |
| } |
| |
| @Override |
| public MemoryInspector getMemoryInspector() { |
| return this.memoryInspector; |
| } |
| |
| } |