blob: 032eb0ff3d8a4599562528604f6df89c518724e7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.core.datastore.block;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema;
import org.apache.carbondata.core.indexstore.schema.SchemaGenerator;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.log4j.Logger;
/**
* Singleton class which will help in creating the segment properties
*/
public class SegmentPropertiesAndSchemaHolder {
/**
 * Logger for this class.
 */
private static final Logger LOGGER =
LogServiceFactory.getLogService(SegmentPropertiesAndSchemaHolder.class.getName());
/**
 * The single shared SegmentPropertiesAndSchemaHolder instance returned by getInstance().
 */
private static final SegmentPropertiesAndSchemaHolder INSTANCE =
new SegmentPropertiesAndSchemaHolder();
/**
 * Class level (static) lock that guards the creation of the per-table lock objects in
 * getOrCreateTableLock.
 */
private static final Object lock = new Object();
/**
 * Counter that hands out the index of each newly created segmentProperties entry. Within this
 * class it is only ever incremented, so an index is not handed out twice.
 */
private static final AtomicInteger segmentPropertiesIndexCounter = new AtomicInteger(0);
/**
 * Maps a segmentPropertiesWrapper to the holder of the segment IDs that use it and of its
 * segmentProperties index. Used while invalidating a segment and on drop table.
 */
private Map<SegmentPropertiesWrapper, SegmentIdAndSegmentPropertiesIndexWrapper>
segmentPropWrapperToSegmentSetMap = new ConcurrentHashMap<>();
/**
 * Reverse mapping from segmentProperties index to segmentPropertiesWrapper.
 * NOTE(review): this map is static while the two sibling maps are instance fields; with the
 * single INSTANCE the effect is the same, but confirm the asymmetry is intended.
 */
private static Map<Integer, SegmentPropertiesWrapper> indexToSegmentPropertiesWrapperMapping =
new ConcurrentHashMap<>();
/**
 * Lock objects keyed by table unique name, used for table level locking while populating
 * segmentProperties. Despite the field name the values are plain lock objects, not bytes.
 */
private Map<String, Object> absoluteTableIdentifierByteMap = new ConcurrentHashMap<>();
/**
 * Private constructor so that the only instance is the one held in INSTANCE and obtained
 * through getInstance().
 */
private SegmentPropertiesAndSchemaHolder() {
// nothing to initialise: all state lives in field initialisers
}
/**
 * Returns the shared holder instance.
 *
 * @return the singleton SegmentPropertiesAndSchemaHolder
 */
public static SegmentPropertiesAndSchemaHolder getInstance() {
return INSTANCE;
}
/**
 * Registers the given segment against the segment properties matching the table and column
 * schemas, and constructs new segment properties only when no equal SegmentPropertiesWrapper
 * has been registered yet (i.e. until the schema is modified).
 *
 * The lookup and the create-or-register step both run under the table level lock. This way a
 * segment that arrives while another thread is creating the same entry is still added to the
 * entry's segment id set, and an entry is never updated after it was read outside the lock.
 *
 * @param carbonTable    table the segment belongs to; supplies the table identifier used for
 *                       the lookup key and lock, and the min/max cache columns
 * @param columnsInTable column schemas of the segment; part of the lookup key
 * @param segmentId      id of the segment that will use the returned segment properties
 * @return the wrapper stored in the index map for the resolved segment properties index
 */
public SegmentPropertiesWrapper addSegmentProperties(CarbonTable carbonTable,
    List<ColumnSchema> columnsInTable, String segmentId) {
  SegmentPropertiesAndSchemaHolder.SegmentPropertiesWrapper segmentPropertiesWrapper =
      new SegmentPropertiesAndSchemaHolder.SegmentPropertiesWrapper(carbonTable,
          columnsInTable);
  SegmentIdAndSegmentPropertiesIndexWrapper segmentIdSetAndIndexWrapper;
  synchronized (getOrCreateTableLock(carbonTable.getAbsoluteTableIdentifier())) {
    segmentIdSetAndIndexWrapper =
        this.segmentPropWrapperToSegmentSetMap.get(segmentPropertiesWrapper);
    if (null == segmentIdSetAndIndexWrapper) {
      // create new segmentProperties
      segmentPropertiesWrapper.initSegmentProperties();
      segmentPropertiesWrapper.addMinMaxColumns(carbonTable);
      int segmentPropertiesIndex = segmentPropertiesIndexCounter.incrementAndGet();
      indexToSegmentPropertiesWrapperMapping
          .put(segmentPropertiesIndex, segmentPropertiesWrapper);
      LOGGER.info("Constructing new SegmentProperties for table: " + carbonTable
          .getCarbonTableIdentifier().getTableUniqueName()
          + ". Current size of segment properties" + " holder list is: "
          + indexToSegmentPropertiesWrapperMapping.size());
      // populate the SegmentIdAndSegmentPropertiesIndexWrapper to maintain the set of segments
      // sharing the same SegmentPropertiesWrapper instance; the set decides when the entry can
      // be cleaned up for invalid segments
      segmentIdSetAndIndexWrapper =
          new SegmentIdAndSegmentPropertiesIndexWrapper(segmentId, segmentPropertiesIndex);
      segmentPropWrapperToSegmentSetMap
          .put(segmentPropertiesWrapper, segmentIdSetAndIndexWrapper);
    } else {
      // an equal wrapper is already registered (possibly by a thread that held the lock just
      // before us): record this segment against it and make sure its min/max columns are set
      segmentIdSetAndIndexWrapper.addSegmentId(segmentId);
      indexToSegmentPropertiesWrapperMapping
          .get(segmentIdSetAndIndexWrapper.getSegmentPropertiesIndex())
          .addMinMaxColumns(carbonTable);
    }
  }
  return getSegmentPropertiesWrapper(segmentIdSetAndIndexWrapper.getSegmentPropertiesIndex());
}
/**
 * Returns the lock object used for table level synchronization, creating and caching one the
 * first time a table is seen. Locks are keyed by the table unique name.
 *
 * @param absoluteTableIdentifier identifier of the table whose lock is requested
 * @return the lock object cached for that table
 */
private Object getOrCreateTableLock(AbsoluteTableIdentifier absoluteTableIdentifier) {
  final String tableUniqueName =
      absoluteTableIdentifier.getCarbonTableIdentifier().getTableUniqueName();
  Object cachedLock = absoluteTableIdentifierByteMap.get(tableUniqueName);
  if (null != cachedLock) {
    return cachedLock;
  }
  // not cached yet: create it under the class level lock, re-checking first so that only one
  // lock object is ever published per table name
  synchronized (lock) {
    Object tableLock = absoluteTableIdentifierByteMap.get(tableUniqueName);
    if (null == tableLock) {
      tableLock = new Object();
      absoluteTableIdentifierByteMap.put(tableUniqueName, tableLock);
    }
    return tableLock;
  }
}
/**
 * Looks up the segment properties registered under the given index.
 *
 * @param segmentPropertiesIndex index handed out when the segment properties were registered
 * @return the segment properties, or null when no wrapper is registered under the index
 */
public SegmentProperties getSegmentProperties(int segmentPropertiesIndex) {
  SegmentPropertiesWrapper wrapper = getSegmentPropertiesWrapper(segmentPropertiesIndex);
  return null == wrapper ? null : wrapper.getSegmentProperties();
}
/**
 * Method to get the segment properties wrapper registered under the given index.
 *
 * @param segmentPropertiesWrapperIndex index handed out when the wrapper was registered
 * @return the wrapper, or null when the index map holds no entry for the index
 */
public SegmentPropertiesWrapper getSegmentPropertiesWrapper(int segmentPropertiesWrapperIndex) {
return indexToSegmentPropertiesWrapperMapping.get(segmentPropertiesWrapperIndex);
}
/**
 * Removes every segment properties entry of the given table from both maps and drops the
 * table's lock object. Intended for drop table.
 *
 * @param absoluteTableIdentifier identifier of the table being dropped
 */
public void invalidate(AbsoluteTableIdentifier absoluteTableIdentifier) {
  final String droppedTableName =
      absoluteTableIdentifier.getCarbonTableIdentifier().getTableUniqueName();
  List<SegmentPropertiesWrapper> staleWrappers = new ArrayList<>();
  // first pass: clear the reverse (index -> wrapper) mapping and remember the matching keys
  for (Map.Entry<SegmentPropertiesWrapper, SegmentIdAndSegmentPropertiesIndexWrapper> entry :
      segmentPropWrapperToSegmentSetMap.entrySet()) {
    SegmentPropertiesWrapper candidate = entry.getKey();
    String candidateTableName =
        candidate.getTableIdentifier().getCarbonTableIdentifier().getTableUniqueName();
    if (!candidateTableName.equals(droppedTableName)) {
      continue;
    }
    indexToSegmentPropertiesWrapperMapping.remove(entry.getValue().getSegmentPropertiesIndex());
    staleWrappers.add(candidate);
  }
  // second pass: remove the remembered keys from the wrapper -> segment set map
  for (SegmentPropertiesWrapper staleWrapper : staleWrappers) {
    segmentPropWrapperToSegmentSetMap.remove(staleWrapper);
  }
  // finally drop the table lock
  absoluteTableIdentifierByteMap.remove(droppedTableName);
}
/**
 * Removes the given segment ID from the entry registered for the given wrapper. When that
 * leaves the entry without any segment, the entry is either removed from both maps or only
 * has its cached min/max columns and schemas cleared, depending on the flag.
 *
 * @param segmentId                  id of the segment that no longer uses the wrapper
 * @param segmentPropertiesWrapper   wrapper (map key) the segment was registered against
 * @param clearSegmentWrapperFromMap flag to specify whether to clear segmentPropertiesWrapper
 *                                   from Map if all the segments using it have become stale
 */
public void invalidate(String segmentId, SegmentPropertiesWrapper segmentPropertiesWrapper,
    boolean clearSegmentWrapperFromMap) {
  SegmentIdAndSegmentPropertiesIndexWrapper segmentsAndIndex =
      segmentPropWrapperToSegmentSetMap.get(segmentPropertiesWrapper);
  if (null == segmentsAndIndex) {
    return;
  }
  synchronized (getOrCreateTableLock(segmentPropertiesWrapper.getTableIdentifier())) {
    segmentsAndIndex.removeSegmentId(segmentId);
    if (!segmentsAndIndex.segmentIdSet.isEmpty()) {
      // other segments still use this wrapper: nothing more to do
      return;
    }
    if (clearSegmentWrapperFromMap) {
      // the wrapper is not used by any segment any more, so drop it from all the holders
      indexToSegmentPropertiesWrapperMapping.remove(segmentsAndIndex.getSegmentPropertiesIndex());
      segmentPropWrapperToSegmentSetMap.remove(segmentPropertiesWrapper);
    } else {
      // min max columns can vary when the cache is modified, so even though the entry is kept
      // in the map its column cache is cleared so that it can be filled again
      segmentPropertiesWrapper.clear();
      LOGGER.info("cleared min max for segmentProperties at index: "
          + segmentsAndIndex.getSegmentPropertiesIndex());
    }
  }
}
/**
 * Adds segmentId to the entry registered at the given segmentPropertyIndex. Does nothing when
 * no wrapper is registered under the index, or when the wrapper no longer has (or does not
 * yet have) a segment set entry.
 * Note: This method is getting used in extension with other features. Please do not remove
 *
 * @param segmentPropertiesIndex index of the segment properties the segment uses
 * @param segmentId              id of the segment to register
 */
public void addSegmentId(int segmentPropertiesIndex, String segmentId) {
  SegmentPropertiesWrapper segmentPropertiesWrapper =
      indexToSegmentPropertiesWrapperMapping.get(segmentPropertiesIndex);
  if (null == segmentPropertiesWrapper) {
    return;
  }
  synchronized (getOrCreateTableLock(segmentPropertiesWrapper.getTableIdentifier())) {
    // look the entry up under the table lock: the two maps are not updated atomically, so an
    // index can be present while the segment set entry is missing, and dereferencing the
    // lookup result unchecked would throw a NullPointerException
    SegmentIdAndSegmentPropertiesIndexWrapper segmentIdAndSegmentPropertiesIndexWrapper =
        segmentPropWrapperToSegmentSetMap.get(segmentPropertiesWrapper);
    if (null != segmentIdAndSegmentPropertiesIndexWrapper) {
      segmentIdAndSegmentPropertiesIndexWrapper.addSegmentId(segmentId);
    }
  }
}
/**
* This class wraps the carbonTable and columnsInTable and serves as the key (compared by
* absolute table identifier and column schemas) that determines whether an existing
* SegmentProperties object can be reused.
*/
public static class SegmentPropertiesWrapper {
// Locks are static, so they are shared by every SegmentPropertiesWrapper instance.
private static final Object taskSchemaLock = new Object();
private static final Object fileFooterSchemaLock = new Object();
private static final Object minMaxLock = new Object();
private List<ColumnSchema> columnsInTable;
private SegmentProperties segmentProperties;
// volatile: getMinMaxCacheColumns() tests this for null before taking minMaxLock and clear()
// resets it, so the reference must be safely published between threads
private volatile List<CarbonColumn> minMaxCacheColumns;
private CarbonTable carbonTable;
// in case of hybrid store we can have block as well as blocklet schema
// Scenario: When there is a hybrid store in which few loads are from legacy store which do
// not contain the blocklet information and hence they will be, by default have cache_level as
// BLOCK and few loads with latest store which contain the BLOCKLET information and have
// cache_level BLOCKLET. For these type of scenarios we need to have separate task and footer
// schemas. For all loads with/without blocklet info there will not be any additional cost
// of maintaining 2 variables
//
// The schema caches are volatile because their getters test them for null outside the lock and
// only assign them inside it; without volatile a reader could observe a non-null reference
// without a happens-before edge to the construction of the array it points to.
private volatile CarbonRowSchema[] taskSummarySchemaForBlock;
private volatile CarbonRowSchema[] taskSummarySchemaForBlocklet;
private volatile CarbonRowSchema[] taskSummarySchemaForBlockWithOutFilePath;
private volatile CarbonRowSchema[] taskSummarySchemaForBlockletWithOutFilePath;
private volatile CarbonRowSchema[] fileFooterEntrySchemaForBlock;
private volatile CarbonRowSchema[] fileFooterEntrySchemaForBlocklet;
/**
 * Creates a wrapper for the given table and column schemas. The segment properties themselves
 * are not built here; see initSegmentProperties().
 *
 * @param carbonTable    table the wrapper belongs to
 * @param columnsInTable column schemas used to build and to compare segment properties
 */
public SegmentPropertiesWrapper(CarbonTable carbonTable, List<ColumnSchema> columnsInTable) {
this.carbonTable = carbonTable;
this.columnsInTable = columnsInTable;
}
/**
 * @return the table this wrapper was created for
 */
public CarbonTable getCarbonTable() {
return this.carbonTable;
}
/**
 * Builds the SegmentProperties from the column schemas given to the constructor, replacing
 * any previously built instance.
 */
public void initSegmentProperties() {
segmentProperties = new SegmentProperties(columnsInTable);
}
/**
 * Resolves the min/max cache columns from the given table, unless a non-empty list has
 * already been resolved.
 *
 * @param carbonTable table asked for the min/max cache columns of the segment properties
 */
public void addMinMaxColumns(CarbonTable carbonTable) {
  boolean alreadyResolved = null != minMaxCacheColumns && !minMaxCacheColumns.isEmpty();
  if (alreadyResolved) {
    return;
  }
  minMaxCacheColumns = carbonTable.getMinMaxCacheColumns(segmentProperties);
}
/**
 * Drops the cached min/max columns and every cached task summary / file footer schema, so
 * that the getters build them again on their next call.
 */
public void clear() {
  // assigning null unconditionally is equivalent to the guarded reset
  minMaxCacheColumns = null;
  // task summary schemas
  taskSummarySchemaForBlock = null;
  taskSummarySchemaForBlocklet = null;
  taskSummarySchemaForBlockWithOutFilePath = null;
  taskSummarySchemaForBlockletWithOutFilePath = null;
  // file footer entry schemas
  fileFooterEntrySchemaForBlock = null;
  fileFooterEntrySchemaForBlocklet = null;
}
/**
 * Two wrappers are equal when they belong to the same absolute table identifier and their
 * column schemas match pairwise, in order, under the strict check.
 */
@Override
public boolean equals(Object obj) {
  if (obj instanceof SegmentPropertiesAndSchemaHolder.SegmentPropertiesWrapper) {
    SegmentPropertiesAndSchemaHolder.SegmentPropertiesWrapper that =
        (SegmentPropertiesAndSchemaHolder.SegmentPropertiesWrapper) obj;
    boolean sameTable = carbonTable.getAbsoluteTableIdentifier()
        .equals(that.carbonTable.getAbsoluteTableIdentifier());
    return sameTable && checkColumnSchemaEquality(columnsInTable, that.columnsInTable);
  }
  return false;
}
/**
 * Compares two column schema lists position by position with equalsWithStrictCheck.
 *
 * @return true only when both lists are non-null, of equal size and every pair matches
 */
private boolean checkColumnSchemaEquality(List<ColumnSchema> obj1, List<ColumnSchema> obj2) {
  if (obj1 == null || obj2 == null) {
    return false;
  }
  final int columnCount = obj1.size();
  if (columnCount != obj2.size()) {
    return false;
  }
  for (int position = 0; position < columnCount; position++) {
    if (!obj1.get(position).equalsWithStrictCheck(obj2.get(position))) {
      // first mismatch decides the result
      return false;
    }
  }
  return true;
}
/**
 * Sorts the given column schemas in place by their column unique id.
 * Note: no caller of this private helper exists in this file.
 */
private void sortList(List<ColumnSchema> columnSchemas) {
  Comparator<ColumnSchema> byColumnUniqueId = new Comparator<ColumnSchema>() {
    @Override
    public int compare(ColumnSchema left, ColumnSchema right) {
      return left.getColumnUniqueId().compareTo(right.getColumnUniqueId());
    }
  };
  Collections.sort(columnSchemas, byColumnUniqueId);
}
/**
 * Hash built from the absolute table identifier, the sum of the strict hash codes of all
 * columns and the hash of the comma separated column unique ids; the last term makes the
 * result sensitive to column order.
 */
@Override
public int hashCode() {
  int strictHashSum = 0;
  StringBuilder orderedColumnIds = new StringBuilder();
  for (ColumnSchema column : columnsInTable) {
    strictHashSum += column.strictHashCode();
    orderedColumnIds.append(column.getColumnUniqueId()).append(",");
  }
  int tableHash = carbonTable.getAbsoluteTableIdentifier().hashCode();
  int columnOrderHash = orderedColumnIds.toString().hashCode();
  return tableHash + strictHashSum + columnOrderHash;
}
/**
 * @return the absolute table identifier of the wrapped table
 */
public AbsoluteTableIdentifier getTableIdentifier() {
return carbonTable.getAbsoluteTableIdentifier();
}
/**
 * @return the segment properties built by initSegmentProperties(), or null when that method
 *         has not been called on this wrapper
 */
public SegmentProperties getSegmentProperties() {
return segmentProperties;
}
/**
 * @return the column schema list given to the constructor (the same list, not a copy)
 */
public List<ColumnSchema> getColumnsInTable() {
return columnsInTable;
}
/**
 * Returns the block level task summary schema, building and caching it on first use. A
 * separate schema is cached for each value of filePathToBeStored.
 *
 * @param storeBlockletCount passed to the schema generator; only takes effect on the call
 *                           that actually builds the schema
 * @param filePathToBeStored selects the variant with or without the file path
 * @return the cached schema of the requested variant
 */
public CarbonRowSchema[] getTaskSummarySchemaForBlock(boolean storeBlockletCount,
    boolean filePathToBeStored) {
  if (filePathToBeStored) {
    if (null == taskSummarySchemaForBlock) {
      synchronized (taskSchemaLock) {
        // re-check under the lock so the schema is generated once
        if (null == taskSummarySchemaForBlock) {
          taskSummarySchemaForBlock = SchemaGenerator
              .createTaskSummarySchema(segmentProperties, getMinMaxCacheColumns(),
                  storeBlockletCount, filePathToBeStored);
        }
      }
    }
    return taskSummarySchemaForBlock;
  }
  if (null == taskSummarySchemaForBlockWithOutFilePath) {
    synchronized (taskSchemaLock) {
      if (null == taskSummarySchemaForBlockWithOutFilePath) {
        taskSummarySchemaForBlockWithOutFilePath = SchemaGenerator
            .createTaskSummarySchema(segmentProperties, getMinMaxCacheColumns(),
                storeBlockletCount, filePathToBeStored);
      }
    }
  }
  return taskSummarySchemaForBlockWithOutFilePath;
}
/**
 * Returns the blocklet level task summary schema, building and caching it on first use. A
 * separate schema is cached for each value of filePathToBeStored.
 *
 * @param storeBlockletCount passed to the schema generator; only takes effect on the call
 *                           that actually builds the schema
 * @param filePathToBeStored selects the variant with or without the file path
 * @return the cached schema of the requested variant
 */
public CarbonRowSchema[] getTaskSummarySchemaForBlocklet(boolean storeBlockletCount,
    boolean filePathToBeStored) {
  if (filePathToBeStored) {
    if (null == taskSummarySchemaForBlocklet) {
      synchronized (taskSchemaLock) {
        // re-check under the lock so the schema is generated once
        if (null == taskSummarySchemaForBlocklet) {
          taskSummarySchemaForBlocklet = SchemaGenerator
              .createTaskSummarySchema(segmentProperties, getMinMaxCacheColumns(),
                  storeBlockletCount, filePathToBeStored);
        }
      }
    }
    return taskSummarySchemaForBlocklet;
  }
  if (null == taskSummarySchemaForBlockletWithOutFilePath) {
    synchronized (taskSchemaLock) {
      if (null == taskSummarySchemaForBlockletWithOutFilePath) {
        taskSummarySchemaForBlockletWithOutFilePath = SchemaGenerator
            .createTaskSummarySchema(segmentProperties, getMinMaxCacheColumns(),
                storeBlockletCount, filePathToBeStored);
      }
    }
  }
  return taskSummarySchemaForBlockletWithOutFilePath;
}
/**
 * Returns the file footer entry schema for block level caching, building and caching it on
 * first use.
 *
 * @return the cached block schema
 */
public CarbonRowSchema[] getBlockFileFooterEntrySchema() {
  if (null != fileFooterEntrySchemaForBlock) {
    return fileFooterEntrySchemaForBlock;
  }
  synchronized (fileFooterSchemaLock) {
    // re-check under the lock so the schema is generated once
    if (null == fileFooterEntrySchemaForBlock) {
      fileFooterEntrySchemaForBlock =
          SchemaGenerator.createBlockSchema(segmentProperties, getMinMaxCacheColumns());
    }
  }
  return fileFooterEntrySchemaForBlock;
}
/**
 * Returns the file footer entry schema for blocklet level caching, building and caching it
 * on first use.
 *
 * @return the cached blocklet schema
 */
public CarbonRowSchema[] getBlockletFileFooterEntrySchema() {
  if (null != fileFooterEntrySchemaForBlocklet) {
    return fileFooterEntrySchemaForBlocklet;
  }
  synchronized (fileFooterSchemaLock) {
    // re-check under the lock so the schema is generated once
    if (null == fileFooterEntrySchemaForBlocklet) {
      fileFooterEntrySchemaForBlocklet =
          SchemaGenerator.createBlockletSchema(segmentProperties, getMinMaxCacheColumns());
    }
  }
  return fileFooterEntrySchemaForBlocklet;
}
/**
 * Returns the min/max cache columns, resolving them from the wrapped table when they have
 * not been set (or were reset by clear()).
 *
 * @return the min/max cache columns
 */
public List<CarbonColumn> getMinMaxCacheColumns() {
  if (null != minMaxCacheColumns) {
    return minMaxCacheColumns;
  }
  synchronized (minMaxLock) {
    // re-check under the lock before resolving the columns
    if (null == minMaxCacheColumns) {
      addMinMaxColumns(carbonTable);
    }
  }
  return minMaxCacheColumns;
}
}
/**
 * Holder pairing the ids of all segments that share one segmentProperties with the index
 * under which that segmentProperties is registered.
 */
public static class SegmentIdAndSegmentPropertiesIndexWrapper {

  /**
   * unique ids of the segments using the same segmentProperties
   */
  private Set<String> segmentIdSet =
      new HashSet<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);

  /**
   * index of the segmentPropertiesWrapper from which the segmentProperties can be retrieved
   */
  private int segmentPropertiesIndex;

  /**
   * @param segmentId              first segment registered against the segmentProperties
   * @param segmentPropertiesIndex index of the segmentPropertiesWrapper
   */
  public SegmentIdAndSegmentPropertiesIndexWrapper(String segmentId, int segmentPropertiesIndex) {
    addSegmentId(segmentId);
    this.segmentPropertiesIndex = segmentPropertiesIndex;
  }

  /**
   * Records that the given segment uses the segmentProperties; adding an id twice has no
   * further effect because ids are kept in a set.
   */
  public void addSegmentId(String segmentId) {
    this.segmentIdSet.add(segmentId);
  }

  /**
   * Forgets the given segment id; unknown ids are ignored.
   */
  public void removeSegmentId(String segmentId) {
    this.segmentIdSet.remove(segmentId);
  }

  /**
   * @return index of the segmentPropertiesWrapper this holder belongs to
   */
  public int getSegmentPropertiesIndex() {
    return this.segmentPropertiesIndex;
  }
}
}