blob: c01a1ad77cd0157f11f730c23c03dfb7097885c8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.core.datamap;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.exceptions.MetadataProcessException;
import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
import org.apache.carbondata.common.exceptions.sql.NoSuchDataMapException;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datamap.dev.DataMapFactory;
import org.apache.carbondata.core.datastore.block.SegmentPropertiesAndSchemaHolder;
import org.apache.carbondata.core.indexstore.BlockletDetailsFetcher;
import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher;
import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.CarbonMetadata;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
import org.apache.carbondata.core.metadata.schema.table.DataMapSchemaStorageProvider;
import org.apache.carbondata.core.metadata.schema.table.DiskBasedDMSchemaStorageProvider;
import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier;
import org.apache.carbondata.core.mutate.SegmentUpdateDetails;
import org.apache.carbondata.core.mutate.UpdateVO;
import org.apache.carbondata.core.statusmanager.SegmentRefreshInfo;
import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonSessionInfo;
import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
import static org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider.MV;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;
/**
* It maintains all the DataMaps in it.
*/
@InterfaceAudience.Internal
public final class DataMapStoreManager {
private static DataMapStoreManager instance = new DataMapStoreManager();
/**
 * Returns the live map of cached datamap lists held by this manager.
 * The returned map is the internal instance, not a copy.
 */
public Map<String, List<TableDataMap>> getAllDataMaps() {
  return this.allDataMaps;
}
/**
 * Contains the list of datamaps for each table, keyed by table id
 * (entries are stored with table.getTableId() when a datamap is registered).
 */
private Map<String, List<TableDataMap>> allDataMaps = new ConcurrentHashMap<>();
/**
 * Contains the table id to table path mapping. It is used to find the cache key of a table
 * through its path when a lookup by table id finds nothing.
 */
private Map<String, String> tablePathMap = new ConcurrentHashMap<>();
/**
 * Contains the datamap catalog for each datamap provider, keyed by the lower-cased provider
 * name. It stays null until initializeDataMapCatalogs creates it on first use.
 */
private Map<String, DataMapCatalog> dataMapCatalogs = null;
/**
 * Contains the segment refresher of each table, keyed by table id.
 */
private Map<String, TableSegmentRefresher> segmentRefreshMap = new ConcurrentHashMap<>();
/**
 * Storage provider through which datamap schemas are read, saved and dropped. It is a
 * disk based provider created with the configured system folder location.
 */
private DataMapSchemaStorageProvider provider = new DiskBasedDMSchemaStorageProvider(
CarbonProperties.getInstance().getSystemFolderLocation());
private static final Logger LOGGER =
LogServiceFactory.getLogService(DataMapStoreManager.class.getName());
/**
 * Monitor object on which registerDataMapCatalog synchronizes.
 */
private final Object lockObject = new Object();
/**
 * Not instantiable from outside; the shared instance is handed out by getInstance().
 */
private DataMapStoreManager() { }
/**
 * Returns the datamaps of the table that are visible in the current session.
 * A datamap is dropped from the result when the session property
 * CARBON_DATAMAP_VISIBLE + "db.table.datamap" is set to anything other than "true"
 * (case-insensitive, default "true"). Without a session every datamap is kept.
 */
List<TableDataMap> getAllVisibleDataMap(CarbonTable carbonTable) throws IOException {
  CarbonSessionInfo session = ThreadLocalSessionInfo.getCarbonSessionInfo();
  List<TableDataMap> candidates = getAllDataMap(carbonTable);
  for (Iterator<TableDataMap> cursor = candidates.iterator(); cursor.hasNext(); ) {
    TableDataMap candidate = cursor.next();
    String database = carbonTable.getDatabaseName();
    String tableName = carbonTable.getTableName();
    String dataMapName = candidate.getDataMapSchema().getDataMapName();
    // TODO: need support get the visible status of datamap without sessionInfo in the future
    if (session == null) {
      String message = "Carbon session info is null";
      LOGGER.info(message);
      continue;
    }
    String visibilityKey = String.format("%s%s.%s.%s",
        CarbonCommonConstants.CARBON_DATAMAP_VISIBLE, database, tableName, dataMapName);
    String visibility = session.getSessionParams().getProperty(visibilityKey, "true");
    if ("true".equalsIgnoreCase(visibility.trim())) {
      continue;
    }
    LOGGER.warn(String.format("Ignore invisible datamap %s on table %s.%s",
        dataMapName, database, tableName));
    cursor.remove();
  }
  return candidates;
}
/**
 * Returns the index datamaps of the given table.
 * The schemas stored for the table are filtered to those that are index datamaps and whose
 * first parent table has the same table id as the given table; each remaining schema is
 * resolved through getDataMap.
 *
 * @param carbonTable table whose datamaps are requested
 * @return list of matching datamaps, empty when there is none
 */
public List<TableDataMap> getAllDataMap(CarbonTable carbonTable) throws IOException {
  List<DataMapSchema> storedSchemas = getDataMapSchemasOfTable(carbonTable);
  List<TableDataMap> result = new ArrayList<>();
  if (storedSchemas == null) {
    return result;
  }
  for (DataMapSchema storedSchema : storedSchemas) {
    RelationIdentifier firstParent = storedSchema.getParentTables().get(0);
    if (!storedSchema.isIndexDataMap()) {
      continue;
    }
    if (firstParent.getTableId().equals(carbonTable.getTableId())) {
      result.add(getDataMap(carbonTable, storedSchema));
    }
  }
  return result;
}
/**
 * Reads the datamap schemas of the given table from the schema storage provider.
 *
 * @param carbonTable table whose schemas are requested
 * @return whatever the storage provider returns for this table
 */
public List<DataMapSchema> getDataMapSchemasOfTable(CarbonTable carbonTable) throws IOException {
  List<DataMapSchema> schemasOfTable = this.provider.retrieveSchemas(carbonTable);
  return schemasOfTable;
}
/**
 * Reads every datamap schema known to the schema storage provider.
 */
public List<DataMapSchema> getAllDataMapSchemas() throws IOException {
  List<DataMapSchema> everySchema = this.provider.retrieveAllSchemas();
  return everySchema;
}
/**
 * Reads the schema of the datamap with the given name from the schema storage provider.
 *
 * @param dataMapName name of the datamap
 */
public DataMapSchema getDataMapSchema(String dataMapName)
    throws NoSuchDataMapException, IOException {
  DataMapSchema storedSchema = this.provider.retrieveSchema(dataMapName);
  return storedSchema;
}
/**
 * Hands the datamap schema to the schema storage provider for saving.
 *
 * @param dataMapSchema schema to save
 */
public void saveDataMapSchema(DataMapSchema dataMapSchema) throws IOException {
  this.provider.saveSchema(dataMapSchema);
}
/**
 * Asks the schema storage provider to drop the schema of the named datamap.
 *
 * @param dataMapName name of the datamap whose schema is dropped
 */
public void dropDataMapSchema(String dataMapName) throws IOException {
  this.provider.dropSchema(dataMapName);
}
/**
 * Rewrites the given datamap schemas after a table rename.
 * This should be invoked after changing the table name. For every schema the parent table
 * identifiers are rebuilt with the new table name; the relation identifier is rebuilt as well
 * unless the provider is MV. Each old schema is dropped from storage while iterating, and
 * all updated schemas are saved afterwards.
 *
 * @param dataMapSchemaList schemas to update (they are modified in place)
 * @param newTableName table name to put into the identifiers
 */
public void updateDataMapSchema(List<DataMapSchema> dataMapSchemaList,
    String newTableName) throws IOException {
  List<DataMapSchema> renamedSchemas = new ArrayList<>();
  for (DataMapSchema schema : dataMapSchemaList) {
    RelationIdentifier oldRelation = schema.getRelationIdentifier();
    String databaseName = oldRelation.getDatabaseName();
    String tableId = oldRelation.getTableId();
    boolean isMvDataMap = schema.getProviderName().equalsIgnoreCase(MV.toString());
    // the relation identifier of an MV datamap is left untouched
    if (!isMvDataMap) {
      schema.setRelationIdentifier(new RelationIdentifier(databaseName, newTableName, tableId));
    }
    List<RelationIdentifier> renamedParents = new ArrayList<>();
    for (RelationIdentifier parent : schema.getParentTables()) {
      renamedParents.add(
          new RelationIdentifier(parent.getDatabaseName(), newTableName, parent.getTableId()));
    }
    schema.setParentTables(renamedParents);
    renamedSchemas.add(schema);
    // first drop the old schema
    dropDataMapSchema(schema.getDataMapName());
  }
  // then save the updated schemas to storage
  for (DataMapSchema renamedSchema : renamedSchemas) {
    saveDataMapSchema(renamedSchema);
  }
}
/**
 * Registers the datamap schema with the catalog of its provider.
 * The catalog is looked up by the lower-cased provider name. When none is cached yet, the
 * provider is asked to create one; a provider that returns null has no catalog and nothing
 * is registered for it.
 *
 * @param dataMapProvider provider used to initialize the catalogs and to create a catalog
 * @param dataMapSchema schema to register
 */
public void registerDataMapCatalog(DataMapProvider dataMapProvider,
    DataMapSchema dataMapSchema) throws IOException {
  synchronized (lockObject) {
    initializeDataMapCatalogs(dataMapProvider);
    String catalogKey = dataMapSchema.getProviderName().toLowerCase();
    DataMapCatalog catalog = dataMapCatalogs.get(catalogKey);
    if (catalog == null) {
      // If MVDataMapProvider, then createDataMapCatalog will return summaryDatasetCatalog
      // instance, which needs to be added to dataMapCatalogs.
      // For other datamaps, createDataMapCatalog will return null, so no need to register
      catalog = dataMapProvider.createDataMapCatalog();
      if (catalog == null) {
        return;
      }
      dataMapCatalogs.put(catalogKey, catalog);
    }
    catalog.registerSchema(dataMapSchema);
  }
}
/**
 * Unregisters the datamap schema from the catalog of its provider.
 * Nothing happens when the catalogs were never initialized or the provider has no catalog.
 *
 * @param dataMapSchema schema whose datamap name is unregistered
 */
public synchronized void unRegisterDataMapCatalog(DataMapSchema dataMapSchema) {
  if (dataMapCatalogs != null) {
    String catalogKey = dataMapSchema.getProviderName().toLowerCase();
    DataMapCatalog catalog = dataMapCatalogs.get(catalogKey);
    if (catalog != null) {
      catalog.unregisterSchema(dataMapSchema.getDataMapName());
    }
  }
}
/**
 * Returns the datamap catalog cached for the provider name, initializing the catalogs first
 * when that has not happened yet.
 *
 * @param dataMapProvider provider used for the initialization
 * @param providerName provider name; it is lower-cased for the lookup
 * @return the cached catalog, or null when none is cached under that name
 */
public synchronized DataMapCatalog getDataMapCatalog(DataMapProvider dataMapProvider,
    String providerName) throws IOException {
  initializeDataMapCatalogs(dataMapProvider);
  String catalogKey = providerName.toLowerCase();
  return dataMapCatalogs.get(catalogKey);
}
/**
 * Initializes the catalog map on first use by reading all datamap schemas from the store and
 * registering those that belong to the given provider.
 * <p>
 * The method is synchronized on this manager because its callers do not share one lock
 * (registerDataMapCatalog holds lockObject, getDataMapCatalog holds this); without a common
 * monitor two threads could both see the map as null and initialize it twice, the later one
 * discarding the catalogs of the earlier one.
 * <p>
 * If reading the schemas fails, the map is reset to null so that the next call retries the
 * initialization instead of working with an empty map forever.
 *
 * @param dataMapProvider provider whose schemas are registered and which creates the catalog
 * @throws IOException if the schemas cannot be read from the store
 */
private synchronized void initializeDataMapCatalogs(DataMapProvider dataMapProvider)
    throws IOException {
  if (dataMapCatalogs != null) {
    return;
  }
  // The map is assigned before the schemas are loaded, exactly as before: a nested call made
  // on this thread while schemas are being registered then finds it already set and returns.
  dataMapCatalogs = new ConcurrentHashMap<>();
  List<DataMapSchema> dataMapSchemas;
  try {
    dataMapSchemas = getAllDataMapSchemas();
  } catch (IOException e) {
    // allow a later call to retry the initialization
    dataMapCatalogs = null;
    throw e;
  }
  for (DataMapSchema schema : dataMapSchemas) {
    if (schema.getProviderName()
        .equalsIgnoreCase(dataMapProvider.getDataMapSchema().getProviderName())) {
      DataMapCatalog dataMapCatalog =
          dataMapCatalogs.get(schema.getProviderName().toLowerCase());
      if (dataMapCatalog == null) {
        dataMapCatalog = dataMapProvider.createDataMapCatalog();
        if (null == dataMapCatalog) {
          throw new RuntimeException("Internal Error.");
        }
        dataMapCatalogs.put(schema.getProviderName().toLowerCase(), dataMapCatalog);
      }
      try {
        dataMapCatalog.registerSchema(schema);
      } catch (Exception e) {
        // Ignore the schema
        LOGGER.error("Error while registering schema", e);
      }
    }
  }
}
/**
 * Returns the default datamap of the table, i.e. the datamap resolved for
 * BlockletDataMapFactory.DATA_MAP_SCHEMA.
 *
 * @param table table whose default datamap is requested
 * @return the default datamap of the table
 */
public TableDataMap getDefaultDataMap(CarbonTable table) {
  DataMapSchema defaultSchema = BlockletDataMapFactory.DATA_MAP_SCHEMA;
  return getDataMap(table, defaultSchema);
}
/**
 * Get the datamap for reading data.
 * The datamap is looked up by name in the list cached for the table; when it is not cached it
 * is created and registered through createAndRegisterDataMap.
 *
 * @param table table the datamap belongs to; it is also set on the datamap factory of the
 *              returned datamap
 * @param dataMapSchema schema whose datamap name identifies the datamap
 * @return the cached or newly created datamap, never null
 * @throws RuntimeException wrapping any exception raised while creating the datamap, or with
 *                          message "Datamap does not exist" when no datamap could be obtained
 */
public TableDataMap getDataMap(CarbonTable table, DataMapSchema dataMapSchema) {
String tableId =
table.getAbsoluteTableIdentifier().getCarbonTableIdentifier().getTableId();
List<TableDataMap> tableIndices = allDataMaps.get(table.getTableId());
// for a non transactional table nothing may be cached under its table id; in that case try
// the key under which the same table path was registered and use that key from here on
if (tableIndices == null && !table.isTransactionalTable()) {
String keyUsingTablePath = getKeyUsingTablePath(table.getTablePath());
if (keyUsingTablePath != null) {
tableId = keyUsingTablePath;
tableIndices = allDataMaps.get(tableId);
}
}
// in case of fileformat or sdk, when table is dropped or schema is changed the datamaps are
// not cleared, they need to be cleared by using API, so compare the columns, if not same, clear
// the datamaps on that table
if (allDataMaps.size() > 0 && !CollectionUtils.isEmpty(allDataMaps.get(tableId))
&& !allDataMaps.get(tableId).get(0).getTable().getTableInfo().getFactTable()
.getListOfColumns().equals(table.getTableInfo().getFactTable().getListOfColumns())) {
clearDataMaps(tableId);
tableIndices = null;
}
TableDataMap dataMap = null;
if (tableIndices != null) {
dataMap = getTableDataMap(dataMapSchema.getDataMapName(), tableIndices);
}
if (dataMap == null) {
// lock on the interned key string, then look the datamap up once more before creating it
synchronized (tableId.intern()) {
tableIndices = allDataMaps.get(tableId);
if (tableIndices != null) {
dataMap = getTableDataMap(dataMapSchema.getDataMapName(), tableIndices);
}
if (dataMap == null) {
try {
dataMap = createAndRegisterDataMap(table, dataMapSchema);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
}
if (dataMap == null) {
throw new RuntimeException("Datamap does not exist");
}
// This is done to handle the scenario of stale cache because of which schema mismatch
// exception can be thrown. Scenario: In case of carbondata used through FileFormat API,
// once a table is dropped and recreated with the same name again then because the dataMap
// contains the stale carbon table schema mismatch exception is thrown. To avoid such scenarios
// it is always better to update the carbon table object retrieved
dataMap.getDataMapFactory().setCarbonTable(table);
return dataMap;
}
/**
 * Finds the key under which the given table path is registered in tablePathMap.
 * Paths are compared as hadoop Path objects rather than as plain strings.
 *
 * @param tablePath path to look for, may be null
 * @return the key of the first entry with an equal path, or null when the path is null or
 *         no entry matches
 */
private String getKeyUsingTablePath(String tablePath) {
  if (tablePath == null) {
    return null;
  }
  for (Map.Entry<String, String> registered : tablePathMap.entrySet()) {
    Path registeredPath = new Path(registered.getValue());
    Path requestedPath = new Path(tablePath);
    if (registeredPath.equals(requestedPath)) {
      return registered.getKey();
    }
  }
  return null;
}
/**
 * Creates the datamap factory named by the provider name of the schema.
 * The provider name is first treated as a class name: the class is loaded and its first
 * public constructor is invoked with the table and the schema. When no such class exists the
 * provider name is treated as a short name and resolved through DataMapRegistry. Any other
 * failure is wrapped in a MetadataProcessException.
 */
public DataMapFactory getDataMapFactoryClass(CarbonTable table, DataMapSchema dataMapSchema)
    throws MalformedDataMapCommandException {
  try {
    // instantiate by reflection to test whether it is a valid DataMapFactory class
    Class<?> factoryClass = Class.forName(dataMapSchema.getProviderName());
    Object factory = factoryClass.getConstructors()[0].newInstance(table, dataMapSchema);
    return (DataMapFactory) factory;
  } catch (ClassNotFoundException e) {
    // not a class name, so take the provider name as a short name
    return DataMapRegistry.getDataMapFactoryByShortName(table, dataMapSchema);
  } catch (Throwable e) {
    String providerName = dataMapSchema.getProviderName();
    throw new MetadataProcessException(
        "failed to get DataMap factory for'" + providerName + "'", e);
  }
}
/**
 * Creates the datamap factory for the schema and registers a datamap built from it in this
 * store manager.
 *
 * @return the registered datamap
 */
// TODO: make it private
public TableDataMap createAndRegisterDataMap(CarbonTable table,
    DataMapSchema dataMapSchema) throws MalformedDataMapCommandException {
  return registerDataMap(table, dataMapSchema, getDataMapFactoryClass(table, dataMapSchema));
}
/**
 * Builds a TableDataMap from the given schema and factory, appends it to the list cached for
 * the table and records the table path.
 * Both the datamap list and the table path are stored under table.getTableId().
 *
 * @param table table the datamap belongs to
 * @param dataMapSchema schema of the datamap
 * @param dataMapFactory factory of the datamap; when it is a BlockletDetailsFetcher it is
 *                       used as such, otherwise the fetcher of the default datamap is used
 * @return the newly registered datamap
 */
public TableDataMap registerDataMap(CarbonTable table,
DataMapSchema dataMapSchema, DataMapFactory dataMapFactory) {
String tableUniqueName = table.getCarbonTableIdentifier().getTableUniqueName();
// Just update the segmentRefreshMap with the table if not added.
getTableSegmentRefresher(table);
List<TableDataMap> tableIndices = allDataMaps.get(table.getTableId());
// NOTE(review): the lookup inside this branch uses table.getTableId() again, which has just
// returned null, and tableUniqueName is never read afterwards, so the branch does not change
// the outcome. Presumably the lookup was meant to use keyUsingTablePath - confirm before
// changing, since entries are stored under table.getTableId() below.
if (tableIndices == null) {
String keyUsingTablePath = getKeyUsingTablePath(table.getTablePath());
if (keyUsingTablePath != null) {
tableUniqueName = keyUsingTablePath;
tableIndices = allDataMaps.get(table.getTableId());
}
}
if (tableIndices == null) {
tableIndices = new ArrayList<>();
}
BlockletDetailsFetcher blockletDetailsFetcher;
SegmentPropertiesFetcher segmentPropertiesFetcher = null;
if (dataMapFactory instanceof BlockletDetailsFetcher) {
blockletDetailsFetcher = (BlockletDetailsFetcher) dataMapFactory;
} else {
blockletDetailsFetcher = getBlockletDetailsFetcher(table);
}
// the blocklet details fetcher is cast unconditionally, so it must also be a
// SegmentPropertiesFetcher
segmentPropertiesFetcher = (SegmentPropertiesFetcher) blockletDetailsFetcher;
TableDataMap dataMap = new TableDataMap(table,
dataMapSchema, dataMapFactory, blockletDetailsFetcher, segmentPropertiesFetcher);
tableIndices.add(dataMap);
allDataMaps.put(table.getTableId(), tableIndices);
tablePathMap.put(table.getTableId(), table.getTablePath());
return dataMap;
}
/**
 * Returns the first datamap in the list whose schema carries exactly the given datamap name
 * (case-sensitive), or null when there is none.
 */
private TableDataMap getTableDataMap(String dataMapName, List<TableDataMap> tableIndices) {
  for (TableDataMap candidate : tableIndices) {
    String candidateName = candidate.getDataMapSchema().getDataMapName();
    if (candidateName.equals(dataMapName)) {
      return candidate;
    }
  }
  return null;
}
/**
 * Clears the given segments from the default datamap of the table and then from every
 * datamap returned by getAllDataMap for it.
 *
 * @param carbonTable table for which the operation has to be performed.
 * @param segments segments which have to be cleared from cache.
 */
public void clearInvalidSegments(CarbonTable carbonTable, List<String> segments)
    throws IOException {
  TableDataMap defaultDataMap = getDefaultDataMap(carbonTable);
  defaultDataMap.clear(segments);
  for (TableDataMap indexDataMap : getAllDataMap(carbonTable)) {
    indexDataMap.clear(segments);
  }
}
/**
 * Collects the segment numbers of those segments for which the segment refresher of the
 * table reports that a refresh is needed.
 *
 * @param carbonTable table the segments belong to
 * @param updateStatusManager supplies the invalid timestamp range of each segment
 * @param filteredSegmentToAccess segments to check
 * @return segment numbers that need a refresh, in the order of the input
 */
public List<String> getSegmentsToBeRefreshed(CarbonTable carbonTable,
    SegmentUpdateStatusManager updateStatusManager, List<Segment> filteredSegmentToAccess)
    throws IOException {
  List<String> staleSegmentNos = new ArrayList<>();
  for (Segment segment : filteredSegmentToAccess) {
    TableSegmentRefresher refresher = getTableSegmentRefresher(carbonTable);
    UpdateVO invalidRange = updateStatusManager.getInvalidTimestampRange(segment.getSegmentNo());
    if (refresher.isRefreshNeeded(segment, invalidRange)) {
      staleSegmentNos.add(segment.getSegmentNo());
    }
  }
  return staleSegmentNos;
}
/**
 * Determines which of the given segments need a refresh and, when there is at least one,
 * clears them from the datamaps of the table.
 */
public void refreshSegmentCacheIfRequired(CarbonTable carbonTable,
    SegmentUpdateStatusManager updateStatusManager, List<Segment> filteredSegmentToAccess)
    throws IOException {
  List<String> staleSegmentNos =
      getSegmentsToBeRefreshed(carbonTable, updateStatusManager, filteredSegmentToAccess);
  if (staleSegmentNos.isEmpty()) {
    return;
  }
  clearInvalidSegments(carbonTable, staleSegmentNos);
}
/**
 * Clears the datamaps of a table from memory, with the clear job being launched.
 *
 * @param identifier Table identifier
 */
public void clearDataMaps(AbsoluteTableIdentifier identifier) {
  final boolean launchJob = true;
  clearDataMaps(identifier, launchJob);
}
/**
 * Clears the datamaps of a table from memory.
 * With launchJob set, the table is looked up and a clear-datamap job is executed for it
 * (distributed or embedded, depending on the pruning configuration); a failing job is logged
 * and ignored. Without it, the table is removed from the metadata cache instead. In both
 * cases the cached datamaps, the segment refresher and the table path entry are removed.
 *
 * @param identifier Table identifier
 * @param launchJob whether the clear-datamap job has to be executed
 */
public void clearDataMaps(AbsoluteTableIdentifier identifier, boolean launchJob) {
  String cacheKey = identifier.getCarbonTableIdentifier().getTableId();
  if (!launchJob) {
    // remove carbon table from meta cache if launchJob is false as this would be called in
    // executor side.
    CarbonMetadata.getInstance()
        .removeTable(identifier.getDatabaseName(), identifier.getTableName());
  } else {
    // carbon table need to lookup only if launch job is set.
    CarbonTable carbonTable = getCarbonTable(identifier);
    if (null != carbonTable) {
      boolean distributedPruning = CarbonProperties.getInstance()
          .isDistributedPruningEnabled(identifier.getDatabaseName(), identifier.getTableName());
      String jobClassName = distributedPruning
          ? DataMapUtil.DISTRIBUTED_JOB_NAME
          : DataMapUtil.EMBEDDED_JOB_NAME;
      try {
        DataMapUtil.executeClearDataMapJob(carbonTable, jobClassName);
      } catch (IOException e) {
        // the failure is only logged; clearing of the local cache continues
        LOGGER.error("clear dataMap job failed", e);
      }
    }
  }
  // when nothing is cached under the table id, fall back to the key registered for the path
  if (allDataMaps.get(identifier.getCarbonTableIdentifier().getTableId()) == null) {
    String keyUsingTablePath = getKeyUsingTablePath(identifier.getTablePath());
    if (keyUsingTablePath != null) {
      cacheKey = keyUsingTablePath;
    }
  }
  segmentRefreshMap.remove(cacheKey);
  clearDataMaps(cacheKey);
  allDataMaps.remove(cacheKey);
  tablePathMap.remove(cacheKey);
}
/**
 * Resolves the carbon table of the identifier.
 * The metadata cache is consulted first; on a miss the table is built from the table path.
 *
 * @param identifier identifier of the table
 * @return the table, or null when it is not cached and building it from the path failed
 *         with an IOException (which is logged and ignored)
 */
public CarbonTable getCarbonTable(AbsoluteTableIdentifier identifier) {
  CarbonTable cachedTable = CarbonMetadata.getInstance()
      .getCarbonTable(identifier.getDatabaseName(), identifier.getTableName());
  if (cachedTable != null) {
    return cachedTable;
  }
  try {
    return CarbonTable.buildFromTablePath(identifier.getTableName(),
        identifier.getDatabaseName(), identifier.getTablePath(),
        identifier.getCarbonTableIdentifier().getTableId());
  } catch (IOException e) {
    // the exception is ignored on purpose, the caller gets null
    LOGGER.warn("failed to get carbon table from table Path" + e.getMessage(), e);
    return null;
  }
}
/**
 * Clears every datamap cached under the given key and removes the key from the datamap
 * cache and from the table path map.
 *
 * @param tableId key of the cached datamap list
 */
public void clearDataMaps(String tableId) {
  List<TableDataMap> cachedDataMaps = allDataMaps.get(tableId);
  if (cachedDataMaps != null) {
    for (TableDataMap cachedDataMap : cachedDataMaps) {
      if (cachedDataMap == null) {
        continue;
      }
      // clear the segmentMap in BlockletDetailsFetcher,else the Segment will remain in executor
      // and the query fails as we will check whether the blocklet contains in the index or not
      cachedDataMap.getBlockletDetailsFetcher().clear();
      cachedDataMap.clear();
    }
  }
  allDataMaps.remove(tableId);
  tablePathMap.remove(tableId);
}
/**
 * Clears the named datamap of a table from memory and deletes its data.
 * With distributed pruning enabled only the distributed clear job is executed. Otherwise the
 * first cached datamap with that name (case-insensitive) is cleared through the embedded
 * job, its data is deleted and it is removed from the cached list. Job failures are logged
 * and ignored.
 *
 * @param identifier Table identifier
 * @param dataMapName name of the datamap to delete
 */
public void deleteDataMap(AbsoluteTableIdentifier identifier, String dataMapName) {
  CarbonTable carbonTable = getCarbonTable(identifier);
  if (carbonTable == null) {
    // If carbon table is null then it means table is already deleted, therefore return without
    // doing any further changes.
    return;
  }
  String tableId = identifier.getCarbonTableIdentifier().getTableId();
  boolean distributedPruning = CarbonProperties.getInstance()
      .isDistributedPruningEnabled(identifier.getDatabaseName(), identifier.getTableName());
  if (distributedPruning) {
    try {
      DataMapUtil
          .executeClearDataMapJob(carbonTable, DataMapUtil.DISTRIBUTED_JOB_NAME, dataMapName);
    } catch (IOException e) {
      // ignoring the exception
      LOGGER.error("clear dataMap job failed", e);
    }
    return;
  }
  List<TableDataMap> cachedDataMaps = allDataMaps.get(tableId);
  if (cachedDataMaps == null) {
    return;
  }
  for (int position = 0; position < cachedDataMaps.size(); position++) {
    TableDataMap candidate = cachedDataMaps.get(position);
    if (candidate == null
        || !dataMapName.equalsIgnoreCase(candidate.getDataMapSchema().getDataMapName())) {
      continue;
    }
    try {
      DataMapUtil
          .executeClearDataMapJob(carbonTable, DataMapUtil.EMBEDDED_JOB_NAME, dataMapName);
      candidate.clear();
    } catch (IOException e) {
      // ignoring the exception
      LOGGER.error("clear dataMap job failed", e);
    }
    candidate.deleteDatamapData();
    cachedDataMaps.remove(position);
    break;
  }
  allDataMaps.put(tableId, cachedDataMaps);
}
/**
 * Tells whether a datamap with the given name (case-insensitive) is cached for the table id.
 *
 * @return true if exist, else return false
 */
public boolean isDataMapExist(String tableId, String dmName) {
  List<TableDataMap> cachedDataMaps = allDataMaps.get(tableId);
  if (cachedDataMaps == null) {
    return false;
  }
  for (TableDataMap cachedDataMap : cachedDataMaps) {
    if (cachedDataMap == null) {
      continue;
    }
    if (dmName.equalsIgnoreCase(cachedDataMap.getDataMapSchema().getDataMapName())) {
      return true;
    }
  }
  return false;
}
/**
 * Returns the factory of the table's default (blocklet) datamap as the fetcher of blocklet
 * detail information.
 *
 * @param table table whose blocklet details fetcher is requested
 * @return the datamap factory of the default datamap, cast to BlockletDetailsFetcher
 */
private BlockletDetailsFetcher getBlockletDetailsFetcher(CarbonTable table) {
  TableDataMap defaultDataMap = getDefaultDataMap(table);
  return (BlockletDetailsFetcher) defaultDataMap.getDataMapFactory();
}
/**
 * Returns the single shared instance of this manager.
 *
 * @return the instance held in the static field
 */
public static DataMapStoreManager getInstance() {
  return DataMapStoreManager.instance;
}
/**
 * Get the TableSegmentRefresher for the table. If not existed then add one and return.
 * <p>
 * The refresher is kept in a local variable and returned from there. The map is not read
 * again after the insert, because another thread may remove the entry in between (clearing
 * the datamaps of a table removes it) and a second lookup would then hand null to the caller.
 *
 * @param table table whose refresher is requested
 * @return the refresher of the table, never null
 */
public TableSegmentRefresher getTableSegmentRefresher(CarbonTable table) {
  String tableId = table.getAbsoluteTableIdentifier().getCarbonTableIdentifier().getTableId();
  TableSegmentRefresher refresher = segmentRefreshMap.get(tableId);
  if (refresher == null) {
    refresher = new TableSegmentRefresher(table);
    segmentRefreshMap.put(tableId, refresher);
  }
  return refresher;
}
/**
 * Keep track of the segment refresh time.
 */
public static class TableSegmentRefresher {
  // This map stores the latest segment refresh time.So in case of update/delete we check the
  // time against this map.
  private final Map<String, SegmentRefreshInfo> segmentRefreshTime = new HashMap<>();
  // This map keeps the manual refresh entries from users. It is mainly used for partition
  // altering.
  private final Map<String, Boolean> manualSegmentRefresh = new HashMap<>();

  /**
   * Seeds the refresh times from the update status details of the table.
   * A segment with a known latest update timestamp starts with its created-or-updated
   * timestamp, every other segment starts with 0.
   */
  TableSegmentRefresher(CarbonTable table) {
    SegmentUpdateStatusManager statusManager = new SegmentUpdateStatusManager(table);
    SegmentUpdateDetails[] updateStatusDetails = statusManager.getUpdateStatusDetails();
    for (SegmentUpdateDetails updateDetails : updateStatusDetails) {
      UpdateVO updateVO = statusManager.getInvalidTimestampRange(updateDetails.getSegmentName());
      SegmentRefreshInfo segmentRefreshInfo;
      if (updateVO != null && updateVO.getLatestUpdateTimestamp() != null) {
        segmentRefreshInfo = new SegmentRefreshInfo(updateVO.getCreatedOrUpdatedTimeStamp(), 0);
      } else {
        segmentRefreshInfo = new SegmentRefreshInfo(0L, 0);
      }
      // The branch above allows updateVO to be null, so it must not be dereferenced blindly.
      // Without a range, fall back to the segment name the range was requested for.
      String segmentId =
          updateVO != null ? updateVO.getSegmentId() : updateDetails.getSegmentName();
      segmentRefreshTime.put(segmentId, segmentRefreshInfo);
    }
  }

  /**
   * Decides whether the cache of the segment has to be refreshed.
   *
   * @param seg segment to check
   * @param updateVo update information handed to the segment to obtain its refresh info
   * @return false when the segment has no updated timestamp; true when the segment is seen
   *         for the first time with a non-zero updated timestamp, when a manual refresh is
   *         pending for it, or when its refresh info compares as changed against the
   *         recorded one
   */
  public boolean isRefreshNeeded(Segment seg, UpdateVO updateVo) throws IOException {
    SegmentRefreshInfo segmentRefreshInfo =
        seg.getSegmentRefreshInfo(updateVo);
    String segmentId = seg.getSegmentNo();
    if (segmentRefreshInfo.getSegmentUpdatedTimestamp() == null) {
      return false;
    }
    if (segmentRefreshTime.get(segmentId) == null
        && segmentRefreshInfo.getSegmentUpdatedTimestamp() != 0) {
      segmentRefreshTime.put(segmentId, segmentRefreshInfo);
      return true;
    }
    // a pending manual refresh is consumed here, exactly as in isRefreshNeeded(String)
    if (isRefreshNeeded(segmentId)) {
      return true;
    }
    boolean isRefresh = segmentRefreshInfo.compare(segmentRefreshTime.get(segmentId));
    if (isRefresh) {
      // forget the recorded time so that the next check records the new one
      segmentRefreshTime.remove(segmentId);
    }
    return isRefresh;
  }

  /**
   * Marks the given segments for a manual refresh.
   */
  public void refreshSegments(List<String> segmentIds) {
    for (String segmentId : segmentIds) {
      manualSegmentRefresh.put(segmentId, true);
    }
  }

  /**
   * Tells whether a manual refresh is pending for the segment and resets the mark.
   *
   * @return true exactly once after the segment was marked through refreshSegments
   */
  public boolean isRefreshNeeded(String segmentId) {
    if (Boolean.TRUE.equals(manualSegmentRefresh.get(segmentId))) {
      manualSegmentRefresh.put(segmentId, false);
      return true;
    }
    return false;
  }
}
/**
 * Clears datamaps of the table.
 * Without a datamap name, everything cached under the table id is cleared and the segment
 * properties cache of the table is invalidated. With a name, the segment data of each
 * datamap with that name (case-insensitive) is deleted for the given segments and the
 * datamap is cleared; the remaining datamaps replace the list cached for the table.
 *
 * @param carbonTable table whose datamaps are cleared
 * @param segmentNos segments whose datamap data is deleted
 * @param dataMapToClear name of the datamap to clear, may be null or empty
 */
public synchronized void clearInvalidDataMaps(CarbonTable carbonTable, List<String> segmentNos,
    String dataMapToClear) throws IOException {
  List<TableDataMap> indexDataMaps = getAllDataMap(carbonTable);
  List<TableDataMap> keptDataMaps = new ArrayList<>();
  if (!StringUtils.isNotEmpty(dataMapToClear)) {
    clearDataMaps(carbonTable.getTableId());
    // clear the segment properties cache from executor
    SegmentPropertiesAndSchemaHolder.getInstance()
        .invalidate(carbonTable.getAbsoluteTableIdentifier());
    return;
  }
  for (TableDataMap indexDataMap : indexDataMaps) {
    String currentName = indexDataMap.getDataMapSchema().getDataMapName();
    if (!dataMapToClear.equalsIgnoreCase(currentName)) {
      keptDataMaps.add(indexDataMap);
      continue;
    }
    for (String segmentNo : segmentNos) {
      indexDataMap.deleteSegmentDatamapData(segmentNo);
    }
    indexDataMap.clear();
  }
  allDataMaps.put(carbonTable.getTableId(), keptDataMaps);
}
}