| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.carbondata.core.datastore.block; |
| |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
| import org.apache.carbondata.common.logging.LogServiceFactory; |
| import org.apache.carbondata.core.constants.CarbonCommonConstants; |
| import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema; |
| import org.apache.carbondata.core.indexstore.schema.SchemaGenerator; |
| import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; |
| import org.apache.carbondata.core.metadata.schema.table.CarbonTable; |
| import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; |
| import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; |
| |
| import org.apache.log4j.Logger; |
| |
| /** |
| * Singleton class which will help in creating the segment properties |
| */ |
| public class SegmentPropertiesAndSchemaHolder { |
| |
| /** |
| * Logger |
| */ |
| private static final Logger LOGGER = |
| LogServiceFactory.getLogService(SegmentPropertiesAndSchemaHolder.class.getName()); |
| /** |
| * SegmentPropertiesAndSchemaHolder instance |
| */ |
| private static final SegmentPropertiesAndSchemaHolder INSTANCE = |
| new SegmentPropertiesAndSchemaHolder(); |
| /** |
| * object level lock |
| */ |
| private static final Object lock = new Object(); |
| /** |
| * counter for maintaining the index of segmentProperties |
| */ |
| private static final AtomicInteger segmentPropertiesIndexCounter = new AtomicInteger(0); |
| /** |
| * holds segmentPropertiesWrapper to segment ID and segmentProperties Index wrapper mapping. |
| * Will be used while invaliding a segment and drop table |
| */ |
| private Map<SegmentPropertiesWrapper, SegmentIdAndSegmentPropertiesIndexWrapper> |
| segmentPropWrapperToSegmentSetMap = new ConcurrentHashMap<>(); |
| /** |
| * reverse mapping for segmentProperties index to segmentPropertiesWrapper |
| */ |
| private static Map<Integer, SegmentPropertiesWrapper> indexToSegmentPropertiesWrapperMapping = |
| new ConcurrentHashMap<>(); |
| /** |
| * Map to be used for table level locking while populating segmentProperties |
| */ |
| private Map<String, Object> absoluteTableIdentifierByteMap = new ConcurrentHashMap<>(); |
| |
| /** |
| * private constructor for singleton instance |
| */ |
| private SegmentPropertiesAndSchemaHolder() { |
| |
| } |
| |
| public static SegmentPropertiesAndSchemaHolder getInstance() { |
| return INSTANCE; |
| } |
| |
| /** |
| * Method to add the segment properties and avoid construction of new segment properties until |
| * the schema is not modified |
| * |
| * @param carbonTable |
| * @param columnsInTable |
| * @param segmentId |
| */ |
| public SegmentPropertiesWrapper addSegmentProperties(CarbonTable carbonTable, |
| List<ColumnSchema> columnsInTable, String segmentId) { |
| SegmentPropertiesAndSchemaHolder.SegmentPropertiesWrapper segmentPropertiesWrapper = |
| new SegmentPropertiesAndSchemaHolder.SegmentPropertiesWrapper(carbonTable, |
| columnsInTable); |
| SegmentIdAndSegmentPropertiesIndexWrapper segmentIdSetAndIndexWrapper = |
| this.segmentPropWrapperToSegmentSetMap.get(segmentPropertiesWrapper); |
| if (null == segmentIdSetAndIndexWrapper) { |
| synchronized (getOrCreateTableLock(carbonTable.getAbsoluteTableIdentifier())) { |
| segmentIdSetAndIndexWrapper = |
| this.segmentPropWrapperToSegmentSetMap.get(segmentPropertiesWrapper); |
| if (null == segmentIdSetAndIndexWrapper) { |
| // create new segmentProperties |
| segmentPropertiesWrapper.initSegmentProperties(); |
| segmentPropertiesWrapper.addMinMaxColumns(carbonTable); |
| int segmentPropertiesIndex = segmentPropertiesIndexCounter.incrementAndGet(); |
| indexToSegmentPropertiesWrapperMapping |
| .put(segmentPropertiesIndex, segmentPropertiesWrapper); |
| LOGGER.info("Constructing new SegmentProperties for table: " + carbonTable |
| .getCarbonTableIdentifier().getTableUniqueName() |
| + ". Current size of segment properties" + " holder list is: " |
| + indexToSegmentPropertiesWrapperMapping.size()); |
| // populate the SegmentIdAndSegmentPropertiesIndexWrapper to maintain the set of segments |
| // having same SegmentPropertiesWrapper instance this will used to decide during the |
| // tblSegmentsProperties map clean-up for the invalid segments |
| segmentIdSetAndIndexWrapper = |
| new SegmentIdAndSegmentPropertiesIndexWrapper(segmentId, segmentPropertiesIndex); |
| segmentPropWrapperToSegmentSetMap |
| .put(segmentPropertiesWrapper, segmentIdSetAndIndexWrapper); |
| } |
| } |
| } else { |
| synchronized (getOrCreateTableLock(carbonTable.getAbsoluteTableIdentifier())) { |
| segmentIdSetAndIndexWrapper.addSegmentId(segmentId); |
| indexToSegmentPropertiesWrapperMapping |
| .get(segmentIdSetAndIndexWrapper.getSegmentPropertiesIndex()) |
| .addMinMaxColumns(carbonTable); |
| } |
| } |
| return getSegmentPropertiesWrapper(segmentIdSetAndIndexWrapper.getSegmentPropertiesIndex()); |
| } |
| |
| /** |
| * Method to create table Level lock |
| * |
| * @param absoluteTableIdentifier |
| * @return |
| */ |
| private Object getOrCreateTableLock(AbsoluteTableIdentifier absoluteTableIdentifier) { |
| Object tableLock = absoluteTableIdentifierByteMap |
| .get(absoluteTableIdentifier.getCarbonTableIdentifier().getTableUniqueName()); |
| if (null == tableLock) { |
| synchronized (lock) { |
| tableLock = absoluteTableIdentifierByteMap |
| .get(absoluteTableIdentifier.getCarbonTableIdentifier().getTableUniqueName()); |
| if (null == tableLock) { |
| tableLock = new Object(); |
| absoluteTableIdentifierByteMap |
| .put(absoluteTableIdentifier.getCarbonTableIdentifier().getTableUniqueName(), |
| tableLock); |
| } |
| } |
| } |
| return tableLock; |
| } |
| |
| /** |
| * Method to get the segment properties from given index |
| * |
| * @param segmentPropertiesIndex |
| * @return |
| */ |
| public SegmentProperties getSegmentProperties(int segmentPropertiesIndex) { |
| SegmentPropertiesWrapper segmentPropertiesWrapper = |
| getSegmentPropertiesWrapper(segmentPropertiesIndex); |
| if (null != segmentPropertiesWrapper) { |
| return segmentPropertiesWrapper.getSegmentProperties(); |
| } |
| return null; |
| } |
| |
| /** |
| * Method to get the segment properties from given index |
| * |
| * @param segmentPropertiesWrapperIndex |
| * @return |
| */ |
| public SegmentPropertiesWrapper getSegmentPropertiesWrapper(int segmentPropertiesWrapperIndex) { |
| return indexToSegmentPropertiesWrapperMapping.get(segmentPropertiesWrapperIndex); |
| } |
| |
| /** |
| * This method will remove the segment properties from the map on drop table |
| * |
| * @param absoluteTableIdentifier |
| */ |
| public void invalidate(AbsoluteTableIdentifier absoluteTableIdentifier) { |
| List<SegmentPropertiesWrapper> segmentPropertiesWrappersToBeRemoved = new ArrayList<>(); |
| // remove segmentProperties wrapper entries from the copyOnWriteArrayList |
| for (Map.Entry<SegmentPropertiesWrapper, SegmentIdAndSegmentPropertiesIndexWrapper> entry : |
| segmentPropWrapperToSegmentSetMap.entrySet()) { |
| SegmentPropertiesWrapper segmentPropertiesWrapper = entry.getKey(); |
| if (segmentPropertiesWrapper.getTableIdentifier().getCarbonTableIdentifier() |
| .getTableUniqueName() |
| .equals(absoluteTableIdentifier.getCarbonTableIdentifier().getTableUniqueName())) { |
| SegmentIdAndSegmentPropertiesIndexWrapper value = entry.getValue(); |
| // remove from the reverse mapping map |
| indexToSegmentPropertiesWrapperMapping.remove(value.getSegmentPropertiesIndex()); |
| segmentPropertiesWrappersToBeRemoved.add(segmentPropertiesWrapper); |
| } |
| } |
| // remove all the segmentPropertiesWrapper entries from map |
| for (SegmentPropertiesWrapper segmentPropertiesWrapper : segmentPropertiesWrappersToBeRemoved) { |
| segmentPropWrapperToSegmentSetMap.remove(segmentPropertiesWrapper); |
| } |
| // remove the table lock |
| absoluteTableIdentifierByteMap |
| .remove(absoluteTableIdentifier.getCarbonTableIdentifier().getTableUniqueName()); |
| } |
| |
| /** |
| * Method to remove the given segment ID |
| * |
| * @param segmentId |
| * @param clearSegmentWrapperFromMap flag to specify whether to clear segmentPropertiesWrapper |
| * from Map if all the segment's using it have become stale |
| */ |
| public void invalidate(String segmentId, SegmentPropertiesWrapper segmentPropertiesWrapper, |
| boolean clearSegmentWrapperFromMap) { |
| SegmentIdAndSegmentPropertiesIndexWrapper segmentIdAndSegmentPropertiesIndexWrapper = |
| segmentPropWrapperToSegmentSetMap.get(segmentPropertiesWrapper); |
| if (segmentIdAndSegmentPropertiesIndexWrapper != null) { |
| synchronized (getOrCreateTableLock(segmentPropertiesWrapper.getTableIdentifier())) { |
| segmentIdAndSegmentPropertiesIndexWrapper.removeSegmentId(segmentId); |
| // if after removal of given SegmentId, the segmentIdSet becomes empty that means this |
| // segmentPropertiesWrapper is not getting used at all. In that case this object can be |
| // removed from all the holders |
| if (clearSegmentWrapperFromMap && segmentIdAndSegmentPropertiesIndexWrapper.segmentIdSet |
| .isEmpty()) { |
| indexToSegmentPropertiesWrapperMapping |
| .remove(segmentIdAndSegmentPropertiesIndexWrapper.getSegmentPropertiesIndex()); |
| segmentPropWrapperToSegmentSetMap.remove(segmentPropertiesWrapper); |
| } else if (!clearSegmentWrapperFromMap |
| && segmentIdAndSegmentPropertiesIndexWrapper.segmentIdSet.isEmpty()) { |
| // min max columns can very when cache is modified. So even though entry is not required |
| // to be deleted from map clear the column cache so that it can filled again |
| segmentPropertiesWrapper.clear(); |
| LOGGER.info("cleared min max for segmentProperties at index: " |
| + segmentIdAndSegmentPropertiesIndexWrapper.getSegmentPropertiesIndex()); |
| } |
| } |
| } |
| } |
| |
| /** |
| * add segmentId at given segmentPropertyIndex |
| * Note: This method is getting used in extension with other features. Please do not remove |
| * |
| * @param segmentPropertiesIndex |
| * @param segmentId |
| */ |
| public void addSegmentId(int segmentPropertiesIndex, String segmentId) { |
| SegmentPropertiesWrapper segmentPropertiesWrapper = |
| indexToSegmentPropertiesWrapperMapping.get(segmentPropertiesIndex); |
| if (null != segmentPropertiesWrapper) { |
| SegmentIdAndSegmentPropertiesIndexWrapper segmentIdAndSegmentPropertiesIndexWrapper = |
| segmentPropWrapperToSegmentSetMap.get(segmentPropertiesWrapper); |
| synchronized (getOrCreateTableLock(segmentPropertiesWrapper.getTableIdentifier())) { |
| segmentIdAndSegmentPropertiesIndexWrapper.addSegmentId(segmentId); |
| } |
| } |
| } |
| |
| /** |
| * This class wraps tableIdentifier, columnsInTable and columnCardinality as a key to determine |
| * whether the SegmentProperties object can be reused. |
| */ |
| public static class SegmentPropertiesWrapper { |
| |
| private static final Object taskSchemaLock = new Object(); |
| private static final Object fileFooterSchemaLock = new Object(); |
| private static final Object minMaxLock = new Object(); |
| |
| private List<ColumnSchema> columnsInTable; |
| private SegmentProperties segmentProperties; |
| private List<CarbonColumn> minMaxCacheColumns; |
| private CarbonTable carbonTable; |
| // in case of hybrid store we can have block as well as blocklet schema |
| // Scenario: When there is a hybrid store in which few loads are from legacy store which do |
| // not contain the blocklet information and hence they will be, by default have cache_level as |
| // BLOCK and few loads with latest store which contain the BLOCKLET information and have |
| // cache_level BLOCKLET. For these type of scenarios we need to have separate task and footer |
| // schemas. For all loads with/without blocklet info there will not be any additional cost |
| // of maintaining 2 variables |
| private CarbonRowSchema[] taskSummarySchemaForBlock; |
| private CarbonRowSchema[] taskSummarySchemaForBlocklet; |
| private CarbonRowSchema[] taskSummarySchemaForBlockWithOutFilePath; |
| private CarbonRowSchema[] taskSummarySchemaForBlockletWithOutFilePath; |
| private CarbonRowSchema[] fileFooterEntrySchemaForBlock; |
| private CarbonRowSchema[] fileFooterEntrySchemaForBlocklet; |
| |
| public SegmentPropertiesWrapper(CarbonTable carbonTable, List<ColumnSchema> columnsInTable) { |
| this.carbonTable = carbonTable; |
| this.columnsInTable = columnsInTable; |
| } |
| |
| public CarbonTable getCarbonTable() { |
| return this.carbonTable; |
| } |
| |
| public void initSegmentProperties() { |
| segmentProperties = new SegmentProperties(columnsInTable); |
| } |
| |
| public void addMinMaxColumns(CarbonTable carbonTable) { |
| if (null == minMaxCacheColumns || minMaxCacheColumns.isEmpty()) { |
| minMaxCacheColumns = carbonTable.getMinMaxCacheColumns(segmentProperties); |
| } |
| } |
| |
| /** |
| * clear required fields |
| */ |
| public void clear() { |
| if (null != minMaxCacheColumns) { |
| minMaxCacheColumns = null; |
| } |
| |
| taskSummarySchemaForBlock = null; |
| taskSummarySchemaForBlocklet = null; |
| taskSummarySchemaForBlockWithOutFilePath = null; |
| taskSummarySchemaForBlockletWithOutFilePath = null; |
| fileFooterEntrySchemaForBlock = null; |
| fileFooterEntrySchemaForBlocklet = null; |
| } |
| |
| @Override |
| public boolean equals(Object obj) { |
| if (!(obj instanceof SegmentPropertiesAndSchemaHolder.SegmentPropertiesWrapper)) { |
| return false; |
| } |
| SegmentPropertiesAndSchemaHolder.SegmentPropertiesWrapper other = |
| (SegmentPropertiesAndSchemaHolder.SegmentPropertiesWrapper) obj; |
| return carbonTable.getAbsoluteTableIdentifier() |
| .equals(other.carbonTable.getAbsoluteTableIdentifier()) && checkColumnSchemaEquality( |
| columnsInTable, other.columnsInTable); |
| } |
| |
| private boolean checkColumnSchemaEquality(List<ColumnSchema> obj1, List<ColumnSchema> obj2) { |
| if (obj1 == null || obj2 == null || (obj1.size() != obj2.size())) { |
| return false; |
| } |
| boolean exists = true; |
| for (int i = 0; i < obj1.size(); i++) { |
| if (!obj1.get(i).equalsWithStrictCheck(obj2.get(i))) { |
| exists = false; |
| break; |
| } |
| } |
| return exists; |
| } |
| |
| private void sortList(List<ColumnSchema> columnSchemas) { |
| Collections.sort(columnSchemas, new Comparator<ColumnSchema>() { |
| @Override |
| public int compare(ColumnSchema o1, ColumnSchema o2) { |
| return o1.getColumnUniqueId().compareTo(o2.getColumnUniqueId()); |
| } |
| }); |
| } |
| |
| @Override |
| public int hashCode() { |
| int allColumnsHashCode = 0; |
| // check column order |
| StringBuilder builder = new StringBuilder(); |
| for (ColumnSchema columnSchema: columnsInTable) { |
| allColumnsHashCode = allColumnsHashCode + columnSchema.strictHashCode(); |
| builder.append(columnSchema.getColumnUniqueId()).append(","); |
| } |
| return carbonTable.getAbsoluteTableIdentifier().hashCode() + allColumnsHashCode + |
| builder.toString().hashCode(); |
| } |
| |
| public AbsoluteTableIdentifier getTableIdentifier() { |
| return carbonTable.getAbsoluteTableIdentifier(); |
| } |
| |
| public SegmentProperties getSegmentProperties() { |
| return segmentProperties; |
| } |
| |
| public List<ColumnSchema> getColumnsInTable() { |
| return columnsInTable; |
| } |
| |
| public CarbonRowSchema[] getTaskSummarySchemaForBlock(boolean storeBlockletCount, |
| boolean filePathToBeStored) { |
| if (null == taskSummarySchemaForBlock && filePathToBeStored) { |
| synchronized (taskSchemaLock) { |
| if (null == taskSummarySchemaForBlock) { |
| taskSummarySchemaForBlock = SchemaGenerator |
| .createTaskSummarySchema(segmentProperties, getMinMaxCacheColumns(), |
| storeBlockletCount, filePathToBeStored); |
| } |
| } |
| } else if (null == taskSummarySchemaForBlockWithOutFilePath && !filePathToBeStored) { |
| synchronized (taskSchemaLock) { |
| if (null == taskSummarySchemaForBlockWithOutFilePath) { |
| taskSummarySchemaForBlockWithOutFilePath = SchemaGenerator |
| .createTaskSummarySchema(segmentProperties, getMinMaxCacheColumns(), |
| storeBlockletCount, filePathToBeStored); |
| } |
| } |
| } |
| if (filePathToBeStored) { |
| return taskSummarySchemaForBlock; |
| } else { |
| return taskSummarySchemaForBlockWithOutFilePath; |
| } |
| |
| } |
| |
| public CarbonRowSchema[] getTaskSummarySchemaForBlocklet(boolean storeBlockletCount, |
| boolean filePathToBeStored) { |
| if (null == taskSummarySchemaForBlocklet && filePathToBeStored) { |
| synchronized (taskSchemaLock) { |
| if (null == taskSummarySchemaForBlocklet) { |
| taskSummarySchemaForBlocklet = SchemaGenerator |
| .createTaskSummarySchema(segmentProperties, getMinMaxCacheColumns(), |
| storeBlockletCount, filePathToBeStored); |
| } |
| } |
| } else if (null == taskSummarySchemaForBlockletWithOutFilePath && !filePathToBeStored) { |
| synchronized (taskSchemaLock) { |
| if (null == taskSummarySchemaForBlockletWithOutFilePath) { |
| taskSummarySchemaForBlockletWithOutFilePath = SchemaGenerator |
| .createTaskSummarySchema(segmentProperties, getMinMaxCacheColumns(), |
| storeBlockletCount, filePathToBeStored); |
| } |
| } |
| } |
| if (filePathToBeStored) { |
| return taskSummarySchemaForBlocklet; |
| } else { |
| return taskSummarySchemaForBlockletWithOutFilePath; |
| } |
| } |
| |
| public CarbonRowSchema[] getBlockFileFooterEntrySchema() { |
| if (null == fileFooterEntrySchemaForBlock) { |
| synchronized (fileFooterSchemaLock) { |
| if (null == fileFooterEntrySchemaForBlock) { |
| fileFooterEntrySchemaForBlock = |
| SchemaGenerator.createBlockSchema(segmentProperties, getMinMaxCacheColumns()); |
| } |
| } |
| } |
| return fileFooterEntrySchemaForBlock; |
| } |
| |
| public CarbonRowSchema[] getBlockletFileFooterEntrySchema() { |
| if (null == fileFooterEntrySchemaForBlocklet) { |
| synchronized (fileFooterSchemaLock) { |
| if (null == fileFooterEntrySchemaForBlocklet) { |
| fileFooterEntrySchemaForBlocklet = |
| SchemaGenerator.createBlockletSchema(segmentProperties, getMinMaxCacheColumns()); |
| } |
| } |
| } |
| return fileFooterEntrySchemaForBlocklet; |
| } |
| |
| public List<CarbonColumn> getMinMaxCacheColumns() { |
| if (null == minMaxCacheColumns) { |
| synchronized (minMaxLock) { |
| if (null == minMaxCacheColumns) { |
| addMinMaxColumns(carbonTable); |
| } |
| } |
| } |
| return minMaxCacheColumns; |
| } |
| |
| } |
| |
| /** |
| * holder for segmentId and segmentPropertiesIndex |
| */ |
| public static class SegmentIdAndSegmentPropertiesIndexWrapper { |
| |
| /** |
| * set holding all unique segment Id's using the same segmentProperties |
| */ |
| private Set<String> segmentIdSet; |
| /** |
| * index which maps to segmentPropertiesWrapper Index from where segmentProperties |
| * can be retrieved |
| */ |
| private int segmentPropertiesIndex; |
| |
| public SegmentIdAndSegmentPropertiesIndexWrapper(String segmentId, int segmentPropertiesIndex) { |
| segmentIdSet = new HashSet<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); |
| addSegmentId(segmentId); |
| this.segmentPropertiesIndex = segmentPropertiesIndex; |
| } |
| |
| public void addSegmentId(String segmentId) { |
| segmentIdSet.add(segmentId); |
| } |
| |
| public void removeSegmentId(String segmentId) { |
| segmentIdSet.remove(segmentId); |
| } |
| |
| public int getSegmentPropertiesIndex() { |
| return segmentPropertiesIndex; |
| } |
| } |
| } |