| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.carbondata.core.memory; |
| |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.Map; |
| import java.util.Set; |
| |
| import org.apache.carbondata.common.logging.LogServiceFactory; |
| import org.apache.carbondata.core.constants.CarbonCommonConstants; |
| import org.apache.carbondata.core.util.CarbonProperties; |
| |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.log4j.Logger; |
| |
| /** |
| * Memory manager to keep track of |
| * all memory for storing the sorted data |
| */ |
| public class UnsafeSortMemoryManager { |
| |
| /** |
| * logger |
| */ |
| private static final Logger LOGGER = |
| LogServiceFactory.getLogService(UnsafeSortMemoryManager.class.getName()); |
| |
| /** |
| * off-heap is enabled |
| */ |
| private static boolean offHeap = Boolean.parseBoolean(CarbonProperties.getInstance() |
| .getProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, |
| CarbonCommonConstants.ENABLE_OFFHEAP_SORT_DEFAULT)); |
| |
| /** |
| * map to keep task id to memory blocks |
| */ |
| private static Map<String, Set<MemoryBlock>> taskIdToMemoryBlockMap; |
| |
| /** |
| * singleton instance |
| */ |
| public static final UnsafeSortMemoryManager INSTANCE; |
| |
| /** |
| * total memory available for sort data storage |
| */ |
| private long totalMemory; |
| |
| /** |
| * current memory used |
| */ |
| private long memoryUsed; |
| |
| /** |
| * current memory allocator |
| */ |
| private MemoryAllocator allocator; |
| |
| static { |
| long size; |
| try { |
| size = Long.parseLong(CarbonProperties.getInstance().getProperty( |
| CarbonCommonConstants.CARBON_SORT_STORAGE_INMEMORY_IN_MB)); |
| } catch (Exception e) { |
| size = CarbonCommonConstants.CARBON_SORT_STORAGE_INMEMORY_IN_MB_DEFAULT; |
| LOGGER.info("Wrong memory size given, so setting default value to " + size); |
| } |
| if (size < CarbonCommonConstants.CARBON_SORT_STORAGE_INMEMORY_IN_MB_DEFAULT) { |
| size = CarbonCommonConstants.CARBON_SORT_STORAGE_INMEMORY_IN_MB_DEFAULT; |
| LOGGER.warn(String.format( |
| "It is not recommended to set unsafe sort memory size less than %dMB," |
| + " so setting default value to %d", |
| CarbonCommonConstants.CARBON_SORT_STORAGE_INMEMORY_IN_MB_DEFAULT, size)); |
| } |
| |
| long takenSize = size * 1024 * 1024; |
| MemoryAllocator allocator; |
| if (offHeap) { |
| allocator = MemoryAllocator.UNSAFE; |
| } else { |
| long maxMemory = Runtime.getRuntime().maxMemory() * 60 / 100; |
| if (takenSize > maxMemory) { |
| takenSize = maxMemory; |
| } |
| allocator = MemoryAllocator.HEAP; |
| } |
| INSTANCE = new UnsafeSortMemoryManager(takenSize, allocator); |
| taskIdToMemoryBlockMap = new HashMap<>(); |
| } |
| |
| private UnsafeSortMemoryManager(long totalMemory, MemoryAllocator allocator) { |
| this.totalMemory = totalMemory; |
| this.allocator = allocator; |
| LOGGER.info("Sort Memory manager is created with size " + totalMemory + " with " + allocator); |
| } |
| |
| /** |
| * total usable memory for sort memory manager |
| * @return size in bytes |
| */ |
| public long getUsableMemory() { |
| return totalMemory; |
| } |
| |
| public synchronized void freeMemory(String taskId, MemoryBlock memoryBlock) { |
| if (taskIdToMemoryBlockMap.containsKey(taskId)) { |
| taskIdToMemoryBlockMap.get(taskId).remove(memoryBlock); |
| } |
| if (!memoryBlock.isFreedStatus()) { |
| allocator.free(memoryBlock); |
| memoryUsed -= memoryBlock.size(); |
| memoryUsed = memoryUsed < 0 ? 0 : memoryUsed; |
| if (LOGGER.isDebugEnabled()) { |
| LOGGER.debug(String.format( |
| "Freeing sort memory block (%s) with size: %d, current available memory is: %d", |
| memoryBlock.toString(), memoryBlock.size(), totalMemory - memoryUsed)); |
| } |
| } |
| } |
| |
| /** |
| * Below method will be used to free all the |
| * memory occupied for a task, this will be useful |
| * when in case of task failure we need to clear all the memory occupied |
| * @param taskId |
| */ |
| public synchronized void freeMemoryAll(String taskId) { |
| Set<MemoryBlock> memoryBlockSet = null; |
| memoryBlockSet = taskIdToMemoryBlockMap.remove(taskId); |
| long occupiedMemory = 0; |
| if (null != memoryBlockSet) { |
| Iterator<MemoryBlock> iterator = memoryBlockSet.iterator(); |
| MemoryBlock memoryBlock = null; |
| while (iterator.hasNext()) { |
| memoryBlock = iterator.next(); |
| if (!memoryBlock.isFreedStatus()) { |
| occupiedMemory += memoryBlock.size(); |
| allocator.free(memoryBlock); |
| } |
| } |
| } |
| memoryUsed -= occupiedMemory; |
| memoryUsed = memoryUsed < 0 ? 0 : memoryUsed; |
| if (LOGGER.isDebugEnabled()) { |
| LOGGER.debug( |
| String.format("Freeing sort memory of size: %d, current available memory is: %d", |
| occupiedMemory, totalMemory - memoryUsed)); |
| } |
| LOGGER.info(String.format( |
| "Total sort memory used after task %s is %d. Current running tasks are: %s", |
| taskId, memoryUsed, StringUtils.join(taskIdToMemoryBlockMap.keySet(), ", "))); |
| } |
| |
| /** |
| * Below method will be used to check whether memory required is |
| * available or not |
| * |
| * @param required |
| * @return if memory available |
| */ |
| public synchronized boolean isMemoryAvailable(long required) { |
| return memoryUsed + required < totalMemory; |
| } |
| |
| public synchronized MemoryBlock allocateMemory(String taskId, long memoryRequested) { |
| if (memoryUsed + memoryRequested <= totalMemory) { |
| MemoryBlock allocate = allocator.allocate(memoryRequested); |
| memoryUsed += allocate.size(); |
| if (LOGGER.isDebugEnabled()) { |
| LOGGER.debug(String.format( |
| "Sort Memory block is created with size %d. Total memory used %d Bytes, left %d Bytes", |
| allocate.size(), memoryUsed, totalMemory - memoryUsed)); |
| } |
| Set<MemoryBlock> listOfMemoryBlock = taskIdToMemoryBlockMap.get(taskId); |
| if (null == listOfMemoryBlock) { |
| listOfMemoryBlock = new HashSet<>(); |
| taskIdToMemoryBlockMap.put(taskId, listOfMemoryBlock); |
| } |
| listOfMemoryBlock.add(allocate); |
| return allocate; |
| } |
| return null; |
| } |
| |
| public static boolean isOffHeap() { |
| return offHeap; |
| } |
| } |