temp branch just for qa test (#54108)

### What problem does this PR solve?

Issue Number: close #xxx

Related PR: #xxx

Problem Summary:

### Release note

None

### Check List (For Author)

- Test <!-- At least one of them must be included. -->
    - [ ] Regression test
    - [ ] Unit Test
    - [ ] Manual test (add detailed scripts or steps below)
    - [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
        - [ ] Previous test can cover this change.
        - [ ] No code files have been changed.
        - [ ] Other reason <!-- Add your reason?  -->

- Behavior changed:
    - [ ] No.
    - [ ] Yes. <!-- Explain the behavior change -->

- Does this need documentation?
    - [ ] No.
- [ ] Yes. <!-- Add document PR link here. eg:
https://github.com/apache/doris-website/pull/1214 -->

### Check List (For Reviewer who merge this PR)

- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR
should merge into -->
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
index 624361b..966a7d3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
@@ -912,8 +912,13 @@
         }
 
         // 2. check if rollup index already exists
-        if (olapTable.hasMaterializedIndex(rollupIndexName)) {
-            throw new DdlException("Rollup index[" + rollupIndexName + "] already exists");
+        olapTable.readLock();
+        try {
+            if (olapTable.hasMaterializedIndex(rollupIndexName)) {
+                throw new DdlException("Rollup index[" + rollupIndexName + "] already exists");
+            }
+        } finally {
+            olapTable.readUnlock();
         }
 
         // 3. check if rollup columns are valid
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/PartitionDesc.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/PartitionDesc.java
index ae1763f..4b9a64d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/PartitionDesc.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/PartitionDesc.java
@@ -282,7 +282,7 @@
     }
 
     public PartitionInfo toPartitionInfo(List<Column> schema, Map<String, Long> partitionNameToId, boolean isTemp)
-            throws DdlException, AnalysisException {
+            throws DdlException {
         throw new NotImplementedException("toPartitionInfo not implemented");
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index 6809bdc..3de29c3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -844,6 +844,7 @@
                 // Table does not exist or atomic restore
                 if (localTbl == null || isAtomicRestore) {
                     OlapTable remoteOlapTbl = (OlapTable) remoteTbl;
+                    remoteOlapTbl.writeLock();
                     // Retain only expected restore partitions in this table;
                     Set<String> allPartNames = remoteOlapTbl.getPartitionNames();
                     for (String partName : allPartNames) {
@@ -883,6 +884,7 @@
                         LOG.debug("put remote table {} to restoredTbls", remoteOlapTbl.getName());
                     }
                     stagingRestoreTables.add(remoteOlapTbl);
+                    remoteOlapTbl.writeUnlock();
                 }
             } // end of all restore olap tables
 
@@ -992,7 +994,9 @@
                         .getType() == TableType.VIEW) && isAtomicRestore) {
                     tableName = tableAliasWithAtomicRestore(tableName);
                 }
+                restoreTbl.writeLock();
                 restoreTbl.setName(tableName);
+                restoreTbl.writeUnlock();
                 restoredTbls.add(restoreTbl);
             }
 
@@ -1661,7 +1665,8 @@
             if (reserveReplica) {
                 restoreReplicaAlloc = remotePartitionInfo.getReplicaAllocation(remotePartId);
             }
-            localPartitionInfo.addPartition(restorePart.getId(), false, remotePartitionInfo.getItem(remotePartId),
+            localPartitionInfo.addPartition(restorePart.getId(), false,
+                    remotePartitionInfo.getItem(remotePartId),
                     remoteDataProperty, restoreReplicaAlloc,
                     remotePartitionInfo.getIsInMemory(remotePartId),
                     remotePartitionInfo.getIsMutable(remotePartId));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java
index 8b5ddd2..459d6d4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java
@@ -977,11 +977,7 @@
                     tableProperty.modifyTableProperties(analyzedDynamicPartition);
                     tableProperty.buildDynamicProperty();
                 }
-                for (ReplicaAllocation alloc : table.getPartitionInfo().getPartitionReplicaAllocations().values()) {
-                    Map<Tag, Short> allocMap = alloc.getAllocMap();
-                    allocMap.clear();
-                    allocMap.putAll(replicaAlloc.getAllocMap());
-                }
+                table.getPartitionInfo().modifyReplicaAlloc(replicaAlloc);
             } finally {
                 table.writeUnlock();
             }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 1b8894a..0e1eac5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -4812,11 +4812,6 @@
         catalogIf.dropTable(dbName, tableName, true, false, ifExists, false);
     }
 
-    public boolean unprotectDropTable(Database db, Table table, boolean isForceDrop, boolean isReplay,
-                                      Long recycleTime) {
-        return getInternalCatalog().unprotectDropTable(db, table, isForceDrop, isReplay, recycleTime);
-    }
-
     public void replayDropTable(Database db, long tableId, boolean isForceDrop,
                                 Long recycleTime) throws MetaNotFoundException {
         getInternalCatalog().replayDropTable(db, tableId, isForceDrop, recycleTime);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionInfo.java
index f82af44..f4305bb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionInfo.java
@@ -217,7 +217,7 @@
             String partitionName = partition.getName();
 
             List<PartitionKey> partitionKeys = entry.getValue().getItems();
-            List<List<PartitionValue>> inValues = partitionKeys.stream().map(PartitionInfo::toPartitionValue)
+            List<List<PartitionValue>> inValues = partitionKeys.stream().map(PartitionKey::toPartitionValue)
                     .collect(Collectors.toList());
             PartitionKeyDesc partitionKeyDesc = PartitionKeyDesc.createIn(inValues);
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionItem.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionItem.java
index cccd1bc..d5bf4ca 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionItem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionItem.java
@@ -82,14 +82,14 @@
 
     @Override
     public PartitionKeyDesc toPartitionKeyDesc() {
-        List<List<PartitionValue>> inValues = partitionKeys.stream().map(PartitionInfo::toPartitionValue)
+        List<List<PartitionValue>> inValues = partitionKeys.stream().map(PartitionKey::toPartitionValue)
                 .collect(Collectors.toList());
         return PartitionKeyDesc.createIn(inValues);
     }
 
     @Override
     public PartitionKeyDesc toPartitionKeyDesc(int pos) throws AnalysisException {
-        List<List<PartitionValue>> inValues = partitionKeys.stream().map(PartitionInfo::toPartitionValue)
+        List<List<PartitionValue>> inValues = partitionKeys.stream().map(PartitionKey::toPartitionValue)
                 .collect(Collectors.toList());
         Set<List<PartitionValue>> res = Sets.newHashSet();
         for (List<PartitionValue> values : inValues) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index ea3c800..9afcac7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -17,13 +17,11 @@
 
 package org.apache.doris.catalog;
 
-import org.apache.doris.alter.MaterializedViewHandler;
 import org.apache.doris.analysis.AggregateInfo;
 import org.apache.doris.analysis.ColumnDef;
 import org.apache.doris.analysis.CreateMaterializedViewStmt;
 import org.apache.doris.analysis.DataSortInfo;
 import org.apache.doris.analysis.Expr;
-import org.apache.doris.analysis.IndexDef;
 import org.apache.doris.analysis.SlotDescriptor;
 import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.backup.Status;
@@ -52,14 +50,11 @@
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.util.PropertyAnalyzer;
 import org.apache.doris.common.util.Util;
-import org.apache.doris.datasource.InternalCatalog;
 import org.apache.doris.datasource.mvcc.MvccSnapshot;
 import org.apache.doris.mtmv.MTMVRefreshContext;
 import org.apache.doris.mtmv.MTMVRelatedTableIf;
 import org.apache.doris.mtmv.MTMVSnapshotIf;
 import org.apache.doris.mtmv.MTMVVersionSnapshot;
-import org.apache.doris.nereids.hint.Hint;
-import org.apache.doris.nereids.hint.UseMvHint;
 import org.apache.doris.nereids.trees.plans.algebra.CatalogRelation;
 import org.apache.doris.persist.ColocatePersistInfo;
 import org.apache.doris.persist.gson.GsonPostProcessable;
@@ -136,17 +131,32 @@
 
     @Override
     public Map<Long, PartitionItem> getOriginPartitions(CatalogRelation scan) {
-        return getPartitionInfo().getIdToItem(false);
+        readLock();
+        try {
+            return getPartitionInfo().getIdToItem(false);
+        } finally {
+            readUnlock();
+        }
     }
 
     @Override
     public Object getPartitionMetaVersion(CatalogRelation scan) throws RpcException {
-        return getVisibleVersion();
+        readLock();
+        try {
+            return getVisibleVersion();
+        } finally {
+            readUnlock();
+        }
     }
 
     @Override
     public long getPartitionMetaLoadTimeMillis(CatalogRelation scan) {
-        return getVisibleVersionTime();
+        readLock();
+        try {
+            return getVisibleVersionTime();
+        } finally {
+            readUnlock();
+        }
     }
 
     public enum OlapTableState {
@@ -279,121 +289,177 @@
     }
 
     public BinlogConfig getBinlogConfig() {
-        return getOrCreatTableProperty().getBinlogConfig();
+        readLock();
+        try {
+            return getOrCreatTableProperty().getBinlogConfig();
+        } finally {
+            readUnlock();
+        }
     }
 
     public void setBinlogConfig(BinlogConfig binlogConfig) {
+        writeLock();
         getOrCreatTableProperty().setBinlogConfig(binlogConfig);
+        writeUnlock();
     }
 
     public void setIsBeingSynced(boolean isBeingSynced) {
+        writeLock();
         getOrCreatTableProperty().modifyTableProperties(PropertyAnalyzer.PROPERTIES_IS_BEING_SYNCED,
                 String.valueOf(isBeingSynced));
+        writeUnlock();
     }
 
     public String getStorageVaultName() {
-        if (Strings.isNullOrEmpty(getStorageVaultId())) {
-            return "";
+        readLock();
+        try {
+            if (Strings.isNullOrEmpty(getStorageVaultId())) {
+                return "";
+            }
+            return Env.getCurrentEnv().getStorageVaultMgr().getVaultNameById(getStorageVaultId());
+        } finally {
+            readUnlock();
         }
-        return Env.getCurrentEnv().getStorageVaultMgr().getVaultNameById(getStorageVaultId());
     }
 
     public void setStorageVaultId(String storageVaultId) throws DdlException {
         if (Strings.isNullOrEmpty(storageVaultId)) {
             throw new DdlException("Invalid storage vault id, please set an available storage vault");
         }
+        writeLock();
         getOrCreatTableProperty().setStorageVaultId(storageVaultId);
+        writeUnlock();
     }
 
     public String getStorageVaultId() {
-        return getOrCreatTableProperty().getStorageVaultId();
+        readLock();
+        try {
+            return getOrCreatTableProperty().getStorageVaultId();
+        } finally {
+            readUnlock();
+        }
     }
 
     public boolean isBeingSynced() {
-        return getOrCreatTableProperty().isBeingSynced();
+        readLock();
+        try {
+            return getOrCreatTableProperty().isBeingSynced();
+        } finally {
+            readUnlock();
+        }
     }
 
     public boolean isTemporaryPartition(long partitionId) {
-        return tempPartitions.hasPartition(partitionId);
+        readLock();
+        try {
+            return tempPartitions.hasPartition(partitionId);
+        } finally {
+            readUnlock();
+        }
     }
 
     public void setTableProperty(TableProperty tableProperty) {
+        writeLock();
         this.tableProperty = tableProperty;
+        writeUnlock();
     }
 
     public TableProperty getTableProperty() {
-        return this.tableProperty;
+        readLock();
+        try {
+            return this.tableProperty;
+        } finally {
+            readUnlock();
+        }
     }
 
     public boolean dynamicPartitionExists() {
-        return tableProperty != null
+        readLock();
+        try {
+            return tableProperty != null
                 && tableProperty.getDynamicPartitionProperty() != null
                 && tableProperty.getDynamicPartitionProperty().isExist();
+        } finally {
+            readUnlock();
+        }
     }
 
     public boolean isZOrderSort() {
-        return tableProperty != null
+        readLock();
+        try {
+            return tableProperty != null
                 && tableProperty.getDataSortInfo() != null
                 && tableProperty.getDataSortInfo().getSortType() == TSortType.ZORDER;
+        } finally {
+            readUnlock();
+        }
     }
 
     public void setBaseIndexId(long baseIndexId) {
+        writeLock();
         this.baseIndexId = baseIndexId;
+        writeUnlock();
     }
 
     public long getBaseIndexId() {
-        return baseIndexId;
+        readLock();
+        try {
+            return baseIndexId;
+        } finally {
+            readUnlock();
+        }
     }
 
     public void setState(OlapTableState state) {
+        writeLock();
         this.state = state;
+        writeUnlock();
     }
 
     public OlapTableState getState() {
-        return state;
+        readLock();
+        try {
+            return state;
+        } finally {
+            readUnlock();
+        }
     }
 
     public List<Index> getIndexes() {
-        if (indexes == null) {
-            return Lists.newArrayList();
+        readLock();
+        try {
+            if (indexes == null) {
+                return Lists.newArrayList();
+            }
+            return indexes.getIndexes();
+        } finally {
+            readUnlock();
         }
-        return indexes.getIndexes();
     }
 
     public List<Long> getIndexIds() {
-        if (indexes == null) {
-            return Lists.newArrayList();
+        readLock();
+        try {
+            if (indexes == null) {
+                return Lists.newArrayList();
+            }
+            return indexes.getIndexIds();
+        } finally {
+            readUnlock();
         }
-        return indexes.getIndexIds();
-    }
-
-    /**
-     * Checks if the table contains at least one index of the specified type.
-     * @param indexType The index type to check for
-     * @return true if the table has at least one index of the specified type, false otherwise
-     */
-    public boolean hasIndexOfType(IndexDef.IndexType indexType) {
-        if (indexes == null) {
-            return false;
-        }
-        return indexes.getIndexes().stream()
-                .anyMatch(index -> index.getIndexType() == indexType);
     }
 
     @Override
     public TableIndexes getTableIndexes() {
-        return indexes;
-    }
-
-    public Map<String, Index> getIndexesMap() {
-        Map<String, Index> indexMap = new HashMap<>();
-        if (indexes != null) {
-            Optional.ofNullable(indexes.getIndexes()).orElse(Collections.emptyList()).forEach(
-                    i -> indexMap.put(i.getIndexName(), i));
+        readLock();
+        try {
+            return indexes;
+        } finally {
+            readUnlock();
         }
-        return indexMap;
     }
 
+    // outer locked
     public void checkAndSetName(String newName, boolean onlyCheck) throws DdlException {
         // check if rollup has same name
         for (String idxName : getIndexNameToId().keySet()) {
@@ -406,6 +472,7 @@
         }
     }
 
+    // outer locked
     public void setName(String newName) {
         // change name in indexNameToId
         long baseIndexId = indexNameToId.remove(this.name);
@@ -428,9 +495,15 @@
     }
 
     public boolean hasMaterializedIndex(String indexName) {
-        return indexNameToId.containsKey(indexName);
+        readLock();
+        try {
+            return indexNameToId.containsKey(indexName);
+        } finally {
+            readUnlock();
+        }
     }
 
+    // outer locked
     public void setIndexMeta(long indexId, String indexName, List<Column> schema, int schemaVersion, int schemaHash,
             short shortKeyColumnCount, TStorageType storageType, KeysType keysType) {
         setIndexMeta(indexId, indexName, schema, schemaVersion, schemaHash, shortKeyColumnCount, storageType,
@@ -438,6 +511,7 @@
                 null, null); // indexes is null by default
     }
 
+    // outer locked
     public void setIndexMeta(long indexId, String indexName, List<Column> schema, int schemaVersion, int schemaHash,
             short shortKeyColumnCount, TStorageType storageType, KeysType keysType, List<Index> indexes) {
         setIndexMeta(indexId, indexName, schema, schemaVersion, schemaHash, shortKeyColumnCount, storageType,
@@ -445,13 +519,7 @@
                 null, indexes);
     }
 
-    public void setIndexMeta(long indexId, String indexName, List<Column> schema, int schemaVersion,
-            int schemaHash,
-            short shortKeyColumnCount, TStorageType storageType, KeysType keysType, OriginStatement origStmt) {
-        setIndexMeta(indexId, indexName, schema, schemaVersion, schemaHash, shortKeyColumnCount, storageType,
-                keysType, origStmt, null); // indexes is null by default
-    }
-
+    // outer locked
     public void setIndexMeta(long indexId, String indexName, List<Column> schema, int schemaVersion,
             int schemaHash,
             short shortKeyColumnCount, TStorageType storageType, KeysType keysType, OriginStatement origStmt,
@@ -492,6 +560,7 @@
     // rebuild the full schema of table
     // the order of columns in fullSchema is meaningless
     public void rebuildFullSchema() {
+        writeLock();
         fullSchema.clear();
         nameToColumn.clear();
         for (Column baseColumn : indexIdToMeta.get(baseIndexId).getSchema()) {
@@ -508,11 +577,13 @@
             // Column maybe renamed, rebuild the column name map
             indexMeta.initColumnNameMap();
         }
+        writeUnlock();
         if (LOG.isDebugEnabled()) {
             LOG.debug("after rebuild full schema. table {}, schema size: {}", id, fullSchema.size());
         }
     }
 
+    // outer locked
     public void rebuildDistributionInfo() {
         if (!Objects.equals(defaultDistributionInfo.getType(), DistributionInfoType.HASH)) {
             return;
@@ -531,6 +602,7 @@
                 .forEach(info -> ((HashDistributionInfo) info).setDistributionColumns(newDistributionColumns));
     }
 
+    // outer locked
     public boolean deleteIndexInfo(String indexName) {
         if (!indexNameToId.containsKey(indexName)) {
             return false;
@@ -550,25 +622,35 @@
     }
 
     public Map<String, Long> getIndexNameToId() {
-        return indexNameToId;
+        readLock();
+        try {
+            return indexNameToId;
+        } finally {
+            readUnlock();
+        }
     }
 
     public Long getIndexIdByName(String indexName) {
-        return indexNameToId.get(indexName);
-    }
-
-    public Long getSegmentV2FormatIndexId() {
-        String v2RollupIndexName = MaterializedViewHandler.NEW_STORAGE_FORMAT_INDEX_NAME_PREFIX + getName();
-        return indexNameToId.get(v2RollupIndexName);
+        readLock();
+        try {
+            return indexNameToId.get(indexName);
+        } finally {
+            readUnlock();
+        }
     }
 
     public String getIndexNameById(long indexId) {
-        for (Map.Entry<String, Long> entry : indexNameToId.entrySet()) {
-            if (entry.getValue() == indexId) {
-                return entry.getKey();
+        readLock();
+        try {
+            for (Map.Entry<String, Long> entry : indexNameToId.entrySet()) {
+                if (entry.getValue() == indexId) {
+                    return entry.getKey();
+                }
             }
+            return null;
+        } finally {
+            readUnlock();
         }
-        return null;
     }
 
     public List<Long> getAllTabletIds() {
@@ -591,107 +673,59 @@
     }
 
     public Map<Long, MaterializedIndexMeta> getVisibleIndexIdToMeta() {
-        Map<Long, MaterializedIndexMeta> visibleMVs = Maps.newHashMap();
-        List<MaterializedIndex> mvs = getVisibleIndex();
-        for (MaterializedIndex mv : mvs) {
-            visibleMVs.put(mv.getId(), indexIdToMeta.get(mv.getId()));
-        }
-        return visibleMVs;
-    }
-
-    public Long getBestMvIdWithHint(List<Long> orderedMvs) {
-        Optional<UseMvHint> useMvHint = ConnectContext.get().getStatementContext().getUseMvHint("USE_MV");
-        Optional<UseMvHint> noUseMvHint = ConnectContext.get().getStatementContext().getUseMvHint("NO_USE_MV");
-        List<String> names = new ArrayList<>();
-        InternalCatalog catalog = Env.getCurrentEnv().getInternalCatalog();
-        names.add(catalog.getName());
-        names.add(getDBName());
-        names.add(this.name);
-        if (useMvHint.isPresent() && noUseMvHint.isPresent()) {
-            return getMvIdWithUseMvHint(useMvHint.get(), names, orderedMvs);
-        } else if (useMvHint.isPresent()) {
-            return getMvIdWithUseMvHint(useMvHint.get(), names, orderedMvs);
-        } else if (noUseMvHint.isPresent()) {
-            return getMvIdWithNoUseMvHint(noUseMvHint.get(), names, orderedMvs);
-        }
-        return orderedMvs.get(0);
-    }
-
-    private Long getMvIdWithUseMvHint(UseMvHint useMvHint, List<String> names, List<Long> orderedMvs) {
-        if (useMvHint.isAllMv()) {
-            useMvHint.setStatus(Hint.HintStatus.SYNTAX_ERROR);
-            useMvHint.setErrorMessage("use_mv hint should only have one mv in one table: "
-                    + this.name);
-            return orderedMvs.get(0);
-        } else {
-            for (Map.Entry<String, Long> entry : indexNameToId.entrySet()) {
-                String mvName = entry.getKey();
-                names.add(mvName);
-                if (useMvHint.getUseMvTableColumnMap().containsKey(names)) {
-                    useMvHint.getUseMvTableColumnMap().put(names, true);
-                    Long choosedIndexId = indexNameToId.get(mvName);
-                    if (orderedMvs.contains(choosedIndexId)) {
-                        useMvHint.setStatus(Hint.HintStatus.SUCCESS);
-                        return choosedIndexId;
-                    } else {
-                        useMvHint.setStatus(Hint.HintStatus.SYNTAX_ERROR);
-                        useMvHint.setErrorMessage("do not have mv: " + mvName + " in table: " + this.name);
-                    }
-                }
+        readLock();
+        try {
+            Map<Long, MaterializedIndexMeta> visibleMVs = Maps.newHashMap();
+            List<MaterializedIndex> mvs = getVisibleIndex();
+            for (MaterializedIndex mv : mvs) {
+                visibleMVs.put(mv.getId(), indexIdToMeta.get(mv.getId()));
             }
+            return visibleMVs;
+        } finally {
+            readUnlock();
         }
-        return orderedMvs.get(0);
-    }
-
-    private Long getMvIdWithNoUseMvHint(UseMvHint noUseMvHint, List<String> names, List<Long> orderedMvs) {
-        if (noUseMvHint.isAllMv()) {
-            noUseMvHint.setStatus(Hint.HintStatus.SUCCESS);
-            return getBaseIndex().getId();
-        } else {
-            Set<Long> forbiddenIndexIds = Sets.newHashSet();
-            for (Map.Entry<String, Long> entry : indexNameToId.entrySet()) {
-                String mvName = entry.getKey();
-                names.add(mvName);
-                if (noUseMvHint.getNoUseMvTableColumnMap().containsKey(names)) {
-                    noUseMvHint.getNoUseMvTableColumnMap().put(names, true);
-                    Long forbiddenIndexId = indexNameToId.get(mvName);
-                    forbiddenIndexIds.add(forbiddenIndexId);
-                }
-            }
-            for (int i = 0; i < orderedMvs.size(); i++) {
-                if (!forbiddenIndexIds.contains(orderedMvs.get(i))) {
-                    return orderedMvs.get(i);
-                }
-            }
-        }
-        return orderedMvs.get(0);
     }
 
     public List<MaterializedIndex> getVisibleIndex() {
-        Optional<Partition> partition = idToPartition.values().stream().findFirst();
-        if (!partition.isPresent()) {
-            partition = tempPartitions.getAllPartitions().stream().findFirst();
-        }
-        return partition.isPresent() ? partition.get().getMaterializedIndices(IndexExtState.VISIBLE)
+        readLock();
+        try {
+            Optional<Partition> partition = idToPartition.values().stream().findFirst();
+            if (!partition.isPresent()) {
+                partition = tempPartitions.getAllPartitions().stream().findFirst();
+            }
+            return partition.isPresent() ? partition.get().getMaterializedIndices(IndexExtState.VISIBLE)
                 : Collections.emptyList();
+        } finally {
+            readUnlock();
+        }
     }
 
     public MaterializedIndex getBaseIndex() {
-        Optional<Partition> partition = idToPartition.values().stream().findFirst();
-        if (!partition.isPresent()) {
-            partition = tempPartitions.getAllPartitions().stream().findFirst();
+        readLock();
+        try {
+            Optional<Partition> partition = idToPartition.values().stream().findFirst();
+            if (!partition.isPresent()) {
+                partition = tempPartitions.getAllPartitions().stream().findFirst();
+            }
+            return partition.isPresent() ? partition.get().getBaseIndex() : null;
+        } finally {
+            readUnlock();
         }
-        return partition.isPresent() ? partition.get().getBaseIndex() : null;
     }
 
     public Column getVisibleColumn(String columnName) {
-        for (MaterializedIndexMeta meta : getVisibleIndexIdToMeta().values()) {
-            Column target = meta.getColumnByDefineName(columnName);
-            if (target != null) {
-                return target;
+        readLock();
+        try {
+            for (MaterializedIndexMeta meta : getVisibleIndexIdToMeta().values()) {
+                Column target = meta.getColumnByDefineName(columnName);
+                if (target != null) {
+                    return target;
+                }
             }
+            return null;
+        } finally {
+            readUnlock();
         }
-        return null;
     }
 
     /**
@@ -713,21 +747,28 @@
 
     @Override
     public long getUpdateTime() {
-        long updateTime = tempPartitions.getUpdateTime();
-        for (Partition p : idToPartition.values()) {
-            if (p.getVisibleVersionTime() > updateTime) {
-                updateTime = p.getVisibleVersionTime();
+        readLock();
+        try {
+            long updateTime = tempPartitions.getUpdateTime();
+            for (Partition p : idToPartition.values()) {
+                if (p.getVisibleVersionTime() > updateTime) {
+                    updateTime = p.getVisibleVersionTime();
+                }
             }
+            return updateTime;
+        } finally {
+            readUnlock();
         }
-        return updateTime;
     }
 
     // this is only for schema change.
+    // outer locked
     public void renameIndexForSchemaChange(String name, String newName) {
         long idxId = indexNameToId.remove(name);
         indexNameToId.put(newName, idxId);
     }
 
+    // outer locked
     public void renameColumnNamePrefix(long idxId) {
         List<Column> columns = indexIdToMeta.get(idxId).getSchema();
         for (Column column : columns) {
@@ -769,6 +810,7 @@
         }
     }
 
+    // outer locked
     public Status resetIdsForRestore(Env env, Database db, ReplicaAllocation restoreReplicaAlloc,
             boolean reserveReplica, boolean reserveColocate, List<ColocatePersistInfo> colocatePersistInfos,
             String srcDbName) {
@@ -995,41 +1037,76 @@
     }
 
     public int getIndexNumber() {
-        return indexIdToMeta.size();
+        readLock();
+        try {
+            return indexIdToMeta.size();
+        } finally {
+            readUnlock();
+        }
     }
 
     public Map<Long, MaterializedIndexMeta> getIndexIdToMeta() {
-        return indexIdToMeta;
+        readLock();
+        try {
+            return new HashMap<>(indexIdToMeta);
+        } finally {
+            readUnlock();
+        }
     }
 
     public Map<Long, MaterializedIndexMeta> getCopyOfIndexIdToMeta() {
-        return new HashMap<>(indexIdToMeta);
+        readLock();
+        try {
+            return new HashMap<>(indexIdToMeta);
+        } finally {
+            readUnlock();
+        }
     }
 
     public Map<Long, MaterializedIndexMeta> getCopiedIndexIdToMeta() {
-        return new HashMap<>(indexIdToMeta);
+        readLock();
+        try {
+            return new HashMap<>(indexIdToMeta);
+        } finally {
+            readUnlock();
+        }
     }
 
     public MaterializedIndexMeta getIndexMetaByIndexId(long indexId) {
-        return indexIdToMeta.get(indexId);
+        readLock();
+        try {
+            return indexIdToMeta.get(indexId);
+        } finally {
+            readUnlock();
+        }
     }
 
     public List<Long> getIndexIdListExceptBaseIndex() {
-        List<Long> result = Lists.newArrayList();
-        for (Long indexId : indexIdToMeta.keySet()) {
-            if (indexId != baseIndexId) {
-                result.add(indexId);
+        readLock();
+        try {
+            List<Long> result = Lists.newArrayList();
+            for (Long indexId : indexIdToMeta.keySet()) {
+                if (indexId != baseIndexId) {
+                    result.add(indexId);
+                }
             }
+            return result;
+        } finally {
+            readUnlock();
         }
-        return result;
     }
 
     public List<Long> getIndexIdList() {
-        List<Long> result = Lists.newArrayList();
-        for (Long indexId : indexIdToMeta.keySet()) {
-            result.add(indexId);
+        readLock();
+        try {
+            List<Long> result = Lists.newArrayList();
+            for (Long indexId : indexIdToMeta.keySet()) {
+                result.add(indexId);
+            }
+            return result;
+        } finally {
+            readUnlock();
         }
-        return result;
     }
 
     // schema
@@ -1039,20 +1116,30 @@
 
     // schema
     public Map<Long, List<Column>> getIndexIdToSchema(boolean full) {
-        Map<Long, List<Column>> result = Maps.newHashMap();
-        for (Map.Entry<Long, MaterializedIndexMeta> entry : indexIdToMeta.entrySet()) {
-            result.put(entry.getKey(), entry.getValue().getSchema(full));
+        readLock();
+        try {
+            Map<Long, List<Column>> result = Maps.newHashMap();
+            for (Map.Entry<Long, MaterializedIndexMeta> entry : indexIdToMeta.entrySet()) {
+                result.put(entry.getKey(), entry.getValue().getSchema(full));
+            }
+            return result;
+        } finally {
+            readUnlock();
         }
-        return result;
     }
 
     // get schemas with a copied column list
     public Map<Long, List<Column>> getCopiedIndexIdToSchema(boolean full) {
-        Map<Long, List<Column>> result = Maps.newHashMap();
-        for (Map.Entry<Long, MaterializedIndexMeta> entry : indexIdToMeta.entrySet()) {
-            result.put(entry.getKey(), new ArrayList<>(entry.getValue().getSchema(full)));
+        readLock();
+        try {
+            Map<Long, List<Column>> result = Maps.newHashMap();
+            for (Map.Entry<Long, MaterializedIndexMeta> entry : indexIdToMeta.entrySet()) {
+                result.put(entry.getKey(), new ArrayList<>(entry.getValue().getSchema(full)));
+            }
+            return result;
+        } finally {
+            readUnlock();
         }
-        return result;
     }
 
     public List<Column> getSchemaByIndexId(Long indexId) {
@@ -1060,11 +1147,16 @@
     }
 
     public List<Column> getSchemaByIndexId(Long indexId, boolean full) {
-        if (full) {
-            return indexIdToMeta.get(indexId).getSchema();
-        } else {
-            return indexIdToMeta.get(indexId).getSchema().stream().filter(Column::isVisible)
+        readLock();
+        try {
+            if (full) {
+                return indexIdToMeta.get(indexId).getSchema();
+            } else {
+                return indexIdToMeta.get(indexId).getSchema().stream().filter(Column::isVisible)
                     .collect(Collectors.toList());
+            }
+        } finally {
+            readUnlock();
         }
     }
 
@@ -1108,27 +1200,42 @@
 
     // schemaHash
     public Map<Long, Integer> getIndexIdToSchemaHash() {
-        Map<Long, Integer> result = Maps.newHashMap();
-        for (Map.Entry<Long, MaterializedIndexMeta> entry : indexIdToMeta.entrySet()) {
-            result.put(entry.getKey(), entry.getValue().getSchemaHash());
+        readLock();
+        try {
+            Map<Long, Integer> result = Maps.newHashMap();
+            for (Map.Entry<Long, MaterializedIndexMeta> entry : indexIdToMeta.entrySet()) {
+                result.put(entry.getKey(), entry.getValue().getSchemaHash());
+            }
+            return result;
+        } finally {
+            readUnlock();
         }
-        return result;
     }
 
     public int getSchemaHashByIndexId(Long indexId) {
-        MaterializedIndexMeta indexMeta = indexIdToMeta.get(indexId);
-        if (indexMeta == null) {
-            return -1;
+        readLock();
+        try {
+            MaterializedIndexMeta indexMeta = indexIdToMeta.get(indexId);
+            if (indexMeta == null) {
+                return -1;
+            }
+            return indexMeta.getSchemaHash();
+        } finally {
+            readUnlock();
         }
-        return indexMeta.getSchemaHash();
     }
 
     public TStorageType getStorageTypeByIndexId(Long indexId) {
-        MaterializedIndexMeta indexMeta = indexIdToMeta.get(indexId);
-        if (indexMeta == null) {
-            return TStorageType.COLUMN;
+        readLock();
+        try {
+            MaterializedIndexMeta indexMeta = indexIdToMeta.get(indexId);
+            if (indexMeta == null) {
+                return TStorageType.COLUMN;
+            }
+            return indexMeta.getStorageType();
+        } finally {
+            readUnlock();
         }
-        return indexMeta.getStorageType();
     }
 
     public KeysType getKeysType() {
@@ -1136,9 +1243,14 @@
     }
 
     public KeysType getKeysTypeByIndexId(long indexId) {
-        MaterializedIndexMeta indexMeta = indexIdToMeta.get(indexId);
-        Preconditions.checkNotNull(indexMeta, "index id:" + indexId + " meta is null");
-        return indexMeta.getKeysType();
+        readLock();
+        try {
+            MaterializedIndexMeta indexMeta = indexIdToMeta.get(indexId);
+            Preconditions.checkNotNull(indexMeta, "index id:" + indexId + " meta is null");
+            return indexMeta.getKeysType();
+        } finally {
+            readUnlock();
+        }
     }
 
     public PartitionInfo getPartitionInfo() {
@@ -1193,6 +1305,7 @@
         return defaultDistributionInfo instanceof RandomDistributionInfo;
     }
 
+    // outer locked
     public void renamePartition(String partitionName, String newPartitionName) {
         if (partitionInfo.getType() == PartitionType.UNPARTITIONED) {
             // bug fix
@@ -1211,12 +1324,15 @@
     }
 
     public void addPartition(Partition partition) {
+        writeLock();
         idToPartition.put(partition.getId(), partition);
         nameToPartition.put(partition.getName(), partition);
+        writeUnlock();
     }
 
     // This is a private method.
     // Call public "dropPartitionAndReserveTablet" and "dropPartition"
+    // outer locked
     private Partition dropPartition(long dbId, String partitionName, boolean isForceDrop, boolean reserveTablets) {
         // 1. If "isForceDrop" is false, the partition will be added to the Catalog Recyle bin, and all tablets of this
         //    partition will not be deleted.
@@ -1235,14 +1351,17 @@
         return partition;
     }
 
+    // outer locked
     public Partition dropPartitionAndReserveTablet(String partitionName) {
         return dropPartition(-1, partitionName, true, true);
     }
 
+    // outer locked
     public Partition dropPartition(long dbId, String partitionName, boolean isForceDrop) {
         return dropPartition(dbId, partitionName, isForceDrop, !isForceDrop);
     }
 
+    // outer locked
     private void dropPartitionCommon(long dbId, boolean isForceDrop,
                                         RecyclePartitionParam recyclePartitionParam,
                                         Partition partition,
@@ -1305,6 +1424,7 @@
         partitionInfo.dropPartition(partition.getId());
     }
 
+    // outer locked
     public Partition dropPartitionForTruncate(long dbId, boolean isForceDrop,
                                             RecyclePartitionParam recyclePartitionParam) {
         // 1. If "isForceDrop" is false, the partition will be added to the Catalog Recyle bin, and all tablets of this
@@ -1391,42 +1511,62 @@
 
     // get partition by name
     public Partition getPartition(String partitionName, boolean isTempPartition) {
-        if (isTempPartition) {
-            return tempPartitions.getPartition(partitionName);
-        } else {
-            return nameToPartition.get(partitionName);
+        readLock();
+        try {
+            if (isTempPartition) {
+                return tempPartitions.getPartition(partitionName);
+            } else {
+                return nameToPartition.get(partitionName);
+            }
+        } finally {
+            readUnlock();
         }
     }
 
     // Priority is given to querying from the partition. If not found, query from the tempPartition
     public Partition getPartition(long partitionId) {
-        Partition partition = idToPartition.get(partitionId);
-        if (partition == null) {
-            partition = tempPartitions.getPartition(partitionId);
+        readLock();
+        try {
+            Partition partition = idToPartition.get(partitionId);
+            if (partition == null) {
+                partition = tempPartitions.getPartition(partitionId);
+            }
+            return partition;
+        } finally {
+            readUnlock();
         }
-        return partition;
     }
 
     public PartitionItem getPartitionItemOrAnalysisException(String partitionName) throws AnalysisException {
-        Partition partition = nameToPartition.get(partitionName);
-        if (partition == null) {
-            partition = tempPartitions.getPartition(partitionName);
+        readLock();
+        try {
+            Partition partition = nameToPartition.get(partitionName);
+            if (partition == null) {
+                partition = tempPartitions.getPartition(partitionName);
+            }
+            if (partition == null) {
+                throw new AnalysisException("partition not found: " + partitionName);
+            }
+            return partitionInfo.getItem(partition.getId());
+        } finally {
+            readUnlock();
         }
-        if (partition == null) {
-            throw new AnalysisException("partition not found: " + partitionName);
-        }
-        return partitionInfo.getItem(partition.getId());
     }
 
     public Partition getPartitionOrAnalysisException(long partitionId) throws AnalysisException {
-        Partition partition = idToPartition.get(partitionId);
-        if (partition == null) {
-            partition = tempPartitions.getPartition(partitionId);
+        readLock();
+        try {
+            Partition partition = idToPartition.get(partitionId);
+            if (partition == null) {
+                partition = tempPartitions.getPartition(partitionId);
+            }
+            if (partition == null) {
+                throw new AnalysisException("partition not found: " + partitionId);
+            }
+            return partition;
+        } finally {
+            readUnlock();
         }
-        if (partition == null) {
-            throw new AnalysisException("partition not found: " + partitionId);
-        }
-        return partition;
     }
 
     public void getVersionInBatchForCloudMode(Collection<Long> partitionIds) throws RpcException {
@@ -1467,29 +1607,51 @@
     }
 
     public int getPartitionNum() {
-        return idToPartition.size();
+        readLock();
+        try {
+            return idToPartition.size();
+        } finally {
+            readUnlock();
+        }
     }
 
     // get all partitions except temp partitions
     public Collection<Partition> getPartitions() {
-        return idToPartition.values();
+        readLock();
+        try {
+            return idToPartition.values();
+        } finally {
+            readUnlock();
+        }
     }
 
     // get only temp partitions
     public List<Partition> getAllTempPartitions() {
-        return tempPartitions.getAllPartitions();
+        readLock();
+        try {
+            return tempPartitions.getAllPartitions();
+        } finally {
+            readUnlock();
+        }
     }
 
     // get all partitions including temp partitions
     public List<Partition> getAllPartitions() {
+        readLock();
         List<Partition> partitions = Lists.newArrayList(idToPartition.values());
         partitions.addAll(tempPartitions.getAllPartitions());
+        readUnlock();
         return partitions;
     }
 
     // get all partitions' name except the temp partitions
     public Set<String> getPartitionNames() {
-        return Sets.newHashSet(nameToPartition.keySet());
+        readLock();
+        try {
+            return Sets.newHashSet(nameToPartition.keySet());
+        } finally {
+            readUnlock();
+        }
     }
 
     // for those elements equal in partiton ids, get their names. if tables partition changed(drop or something) make
@@ -1636,10 +1798,6 @@
         return getSequenceCol() != null;
     }
 
-    public boolean hasHiddenColumn() {
-        return getBaseSchema().stream().anyMatch(column -> !column.isVisible());
-    }
-
     public Type getSequenceType() {
         if (getSequenceCol() == null) {
             return null;
@@ -1710,6 +1868,7 @@
         Set<Pair<String, String>> ret = Sets.newHashSet();
         // Check the schema of all indexes for each given column name,
         // If the column name exists in the index, add the <IndexName, ColumnName> pair to return list.
+        readLock();
         for (String column : columns) {
             for (MaterializedIndexMeta meta : indexIdToMeta.values()) {
                 Column col = meta.getColumnByName(column);
@@ -1719,6 +1878,7 @@
                 ret.add(Pair.of(getIndexNameById(meta.getIndexId()), column));
             }
         }
+        readUnlock();
         return ret;
     }
 
@@ -1811,69 +1971,74 @@
     // bloom filter, partition type and columns, distribution type and columns, buckets number,
     // indexes and columns.
     public String getSignature(int signatureVersion, List<String> partNames) {
-        StringBuilder sb = new StringBuilder(signatureVersion);
-        sb.append(name);
-        sb.append(type);
-        Set<String> indexNames = Sets.newTreeSet(indexNameToId.keySet());
-        for (String indexName : indexNames) {
-            long indexId = indexNameToId.get(indexName);
-            MaterializedIndexMeta indexMeta = indexIdToMeta.get(indexId);
-            sb.append(indexName);
-            sb.append(Util.getSchemaSignatureString(indexMeta.getSchema()));
-            sb.append(indexMeta.getShortKeyColumnCount());
-            sb.append(indexMeta.getStorageType());
-        }
-
-        // bloom filter
-        if (bfColumns != null && !bfColumns.isEmpty()) {
-            for (String bfCol : bfColumns) {
-                sb.append(bfCol);
+        readLock();
+        try {
+            StringBuilder sb = new StringBuilder(signatureVersion);
+            sb.append(name);
+            sb.append(type);
+            Set<String> indexNames = Sets.newTreeSet(indexNameToId.keySet());
+            for (String indexName : indexNames) {
+                long indexId = indexNameToId.get(indexName);
+                MaterializedIndexMeta indexMeta = indexIdToMeta.get(indexId);
+                sb.append(indexName);
+                sb.append(Util.getSchemaSignatureString(indexMeta.getSchema()));
+                sb.append(indexMeta.getShortKeyColumnCount());
+                sb.append(indexMeta.getStorageType());
             }
-            sb.append(bfFpp);
-        }
 
-        // partition type
-        sb.append(partitionInfo.getType());
-        if (partitionInfo.getType() == PartitionType.RANGE) {
-            RangePartitionInfo rangePartitionInfo = (RangePartitionInfo) partitionInfo;
-            List<Column> partitionColumns = rangePartitionInfo.getPartitionColumns();
-            sb.append(Util.getSchemaSignatureString(partitionColumns));
-        }
-
-        // partition and distribution
-        Collections.sort(partNames, String.CASE_INSENSITIVE_ORDER);
-        for (String partName : partNames) {
-            Partition partition = getPartition(partName);
-            Preconditions.checkNotNull(partition, partName);
-            DistributionInfo distributionInfo = partition.getDistributionInfo();
-            sb.append(partName);
-            sb.append(distributionInfo.getType());
-            if (distributionInfo.getType() == DistributionInfoType.HASH) {
-                HashDistributionInfo hashDistributionInfo = (HashDistributionInfo) distributionInfo;
-                sb.append(Util.getSchemaSignatureString(hashDistributionInfo.getDistributionColumns()));
-                sb.append(hashDistributionInfo.getBucketNum());
+            // bloom filter
+            if (bfColumns != null && !bfColumns.isEmpty()) {
+                for (String bfCol : bfColumns) {
+                    sb.append(bfCol);
+                }
+                sb.append(bfFpp);
             }
-        }
 
-        // indexes
-        if (this.indexes != null) {
-            Map<String, Index> indexes = Maps.newTreeMap();
-            for (Index idx : this.indexes.getIndexes()) {
-                indexes.put(idx.getIndexName(), idx);
+            // partition type
+            sb.append(partitionInfo.getType());
+            if (partitionInfo.getType() == PartitionType.RANGE) {
+                RangePartitionInfo rangePartitionInfo = (RangePartitionInfo) partitionInfo;
+                List<Column> partitionColumns = rangePartitionInfo.getPartitionColumns();
+                sb.append(Util.getSchemaSignatureString(partitionColumns));
             }
-            for (Map.Entry<String, Index> entry : indexes.entrySet()) {
-                Index idx = entry.getValue();
-                sb.append(entry.getKey());
-                sb.append(idx.getIndexType());
-                sb.append(Joiner.on(",").join(idx.getColumns()));
-            }
-        }
 
-        String signature = sb.toString();
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("get signature of table {}. signature string: {}", name, sb.toString());
+            // partition and distribution
+            Collections.sort(partNames, String.CASE_INSENSITIVE_ORDER);
+            for (String partName : partNames) {
+                Partition partition = getPartition(partName);
+                Preconditions.checkNotNull(partition, partName);
+                DistributionInfo distributionInfo = partition.getDistributionInfo();
+                sb.append(partName);
+                sb.append(distributionInfo.getType());
+                if (distributionInfo.getType() == DistributionInfoType.HASH) {
+                    HashDistributionInfo hashDistributionInfo = (HashDistributionInfo) distributionInfo;
+                    sb.append(Util.getSchemaSignatureString(hashDistributionInfo.getDistributionColumns()));
+                    sb.append(hashDistributionInfo.getBucketNum());
+                }
+            }
+
+            // indexes
+            if (this.indexes != null) {
+                Map<String, Index> indexes = Maps.newTreeMap();
+                for (Index idx : this.indexes.getIndexes()) {
+                    indexes.put(idx.getIndexName(), idx);
+                }
+                for (Map.Entry<String, Index> entry : indexes.entrySet()) {
+                    Index idx = entry.getValue();
+                    sb.append(entry.getKey());
+                    sb.append(idx.getIndexType());
+                    sb.append(Joiner.on(",").join(idx.getColumns()));
+                }
+            }
+
+            String signature = sb.toString();
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("get signature of table {}. signature string: {}", name, sb.toString());
+            }
+            return signature;
+        } finally {
+            readUnlock();
         }
-        return signature;
     }
 
     // get intersect partition names with the given table "anotherTbl". not including temp partitions
@@ -1972,6 +2137,7 @@
         rebuildFullSchema();
     }
 
+    // outer locked
     public OlapTable selectiveCopy(Collection<String> reservedPartitions, IndexExtState extState, boolean isForBackup) {
         OlapTable copied = DeepCopy.copy(this, OlapTable.class, FeConstants.meta_version);
         if (copied == null) {
@@ -2064,6 +2230,7 @@
      *
      * return the old partition.
      */
+    // outer locked
     public Partition replacePartition(Partition newPartition,
                                         RecyclePartitionParam recyclePartitionParam) {
         Partition oldPartition = nameToPartition.remove(newPartition.getName());
@@ -2097,6 +2264,7 @@
         return oldPartition;
     }
 
+    // outer locked
     public void checkNormalStateForAlter() throws DdlException {
         if (state != OlapTableState.NORMAL) {
             throw new DdlException("Table[" + name + "]'s state(" + state.toString()
@@ -2108,6 +2276,7 @@
         }
     }
 
+    // outer lock
     public boolean isStable(SystemInfoService infoService, TabletScheduler tabletScheduler) {
         List<Long> aliveBeIds = infoService.getAllBackendIds(true);
         for (Partition partition : idToPartition.values()) {
@@ -2136,48 +2305,53 @@
 
     // arbitrarily choose a partition, and get the buckets backends sequence from base index.
     public Map<Tag, List<List<Long>>> getArbitraryTabletBucketsSeq() throws DdlException {
-        SystemInfoService infoService = Env.getCurrentSystemInfo();
-        Map<Tag, List<List<Long>>> backendsPerBucketSeq = Maps.newHashMap();
-        for (Partition partition : idToPartition.values()) {
-            ReplicaAllocation replicaAlloc = partitionInfo.getReplicaAllocation(partition.getId());
-            short totalReplicaNum = replicaAlloc.getTotalReplicaNum();
-            MaterializedIndex baseIdx = partition.getBaseIndex();
-            for (Long tabletId : baseIdx.getTabletIdsInOrder()) {
-                Tablet tablet = baseIdx.getTablet(tabletId);
-                List<Long> replicaBackendIds = tablet.getNormalReplicaBackendIds();
-                if (replicaBackendIds.size() != totalReplicaNum) {
-                    // this should not happen, but in case, throw an exception to terminate this process
-                    throw new DdlException("Normal replica number of tablet " + tabletId + " is: "
+        readLock();
+        try {
+            SystemInfoService infoService = Env.getCurrentSystemInfo();
+            Map<Tag, List<List<Long>>> backendsPerBucketSeq = Maps.newHashMap();
+            for (Partition partition : idToPartition.values()) {
+                ReplicaAllocation replicaAlloc = partitionInfo.getReplicaAllocation(partition.getId());
+                short totalReplicaNum = replicaAlloc.getTotalReplicaNum();
+                MaterializedIndex baseIdx = partition.getBaseIndex();
+                for (Long tabletId : baseIdx.getTabletIdsInOrder()) {
+                    Tablet tablet = baseIdx.getTablet(tabletId);
+                    List<Long> replicaBackendIds = tablet.getNormalReplicaBackendIds();
+                    if (replicaBackendIds.size() != totalReplicaNum) {
+                        // this should not happen, but in case, throw an exception to terminate this process
+                        throw new DdlException("Normal replica number of tablet " + tabletId + " is: "
                             + replicaBackendIds.size() + ", but expected: " + totalReplicaNum);
-                }
-
-                // check tag
-                Map<Tag, Short> currentReplicaAlloc = Maps.newHashMap();
-                Map<Tag, List<Long>> tag2beIds = Maps.newHashMap();
-                for (long beId : replicaBackendIds) {
-                    Backend be = infoService.getBackend(beId);
-                    if (be == null || !be.isMixNode()) {
-                        continue;
                     }
-                    short num = currentReplicaAlloc.getOrDefault(be.getLocationTag(), (short) 0);
-                    currentReplicaAlloc.put(be.getLocationTag(), (short) (num + 1));
-                    List<Long> beIds = tag2beIds.getOrDefault(be.getLocationTag(), Lists.newArrayList());
-                    beIds.add(beId);
-                    tag2beIds.put(be.getLocationTag(), beIds);
-                }
-                if (!currentReplicaAlloc.equals(replicaAlloc.getAllocMap())) {
-                    throw new DdlException("The relica allocation is " + currentReplicaAlloc.toString()
-                            + ", but expected: " + replicaAlloc.toCreateStmt());
-                }
 
-                for (Map.Entry<Tag, List<Long>> entry : tag2beIds.entrySet()) {
-                    backendsPerBucketSeq.putIfAbsent(entry.getKey(), Lists.newArrayList());
-                    backendsPerBucketSeq.get(entry.getKey()).add(entry.getValue());
+                    // check tag
+                    Map<Tag, Short> currentReplicaAlloc = Maps.newHashMap();
+                    Map<Tag, List<Long>> tag2beIds = Maps.newHashMap();
+                    for (long beId : replicaBackendIds) {
+                        Backend be = infoService.getBackend(beId);
+                        if (be == null || !be.isMixNode()) {
+                            continue;
+                        }
+                        short num = currentReplicaAlloc.getOrDefault(be.getLocationTag(), (short) 0);
+                        currentReplicaAlloc.put(be.getLocationTag(), (short) (num + 1));
+                        List<Long> beIds = tag2beIds.getOrDefault(be.getLocationTag(), Lists.newArrayList());
+                        beIds.add(beId);
+                        tag2beIds.put(be.getLocationTag(), beIds);
+                    }
+                    if (!currentReplicaAlloc.equals(replicaAlloc.getAllocMap())) {
+                        throw new DdlException("The relica allocation is " + currentReplicaAlloc.toString()
+                            + ", but expected: " + replicaAlloc.toCreateStmt());
+                    }
+
+                    for (Map.Entry<Tag, List<Long>> entry : tag2beIds.entrySet()) {
+                        backendsPerBucketSeq.putIfAbsent(entry.getKey(), Lists.newArrayList());
+                        backendsPerBucketSeq.get(entry.getKey()).add(entry.getValue());
+                    }
                 }
+                break;
             }
-            break;
+            return backendsPerBucketSeq;
+        } finally {
+            readUnlock();
         }
-        return backendsPerBucketSeq;
     }
 
     /**
@@ -2186,50 +2360,68 @@
      * @return proximate row count
      */
     public long proximateRowCount() {
-        long totalCount = 0;
-        for (Partition partition : getPartitions()) {
-            long version = partition.getVisibleVersion();
-            for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
-                for (Tablet tablet : index.getTablets()) {
-                    long tabletRowCount = 0L;
-                    for (Replica replica : tablet.getReplicas()) {
-                        if (replica.checkVersionCatchUp(version, false)
-                                && replica.getRowCount() > tabletRowCount) {
-                            tabletRowCount = replica.getRowCount();
+        readLock();
+        try {
+            long totalCount = 0;
+            for (Partition partition : getPartitions()) {
+                long version = partition.getVisibleVersion();
+                for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
+                    for (Tablet tablet : index.getTablets()) {
+                        long tabletRowCount = 0L;
+                        for (Replica replica : tablet.getReplicas()) {
+                            if (replica.checkVersionCatchUp(version, false)
+                                    && replica.getRowCount() > tabletRowCount) {
+                                tabletRowCount = replica.getRowCount();
+                            }
                         }
+                        totalCount += tabletRowCount;
                     }
-                    totalCount += tabletRowCount;
                 }
             }
+            return totalCount;
+        } finally {
+            readUnlock();
         }
-        return totalCount;
     }
 
     @Override
     public List<Column> getBaseSchema() {
-        return getSchemaByIndexId(baseIndexId);
+        readLock();
+        try {
+            return getSchemaByIndexId(baseIndexId);
+        } finally {
+            readUnlock();
+        }
     }
 
     @Override
     public List<Column> getBaseSchema(boolean full) {
-        return getSchemaByIndexId(baseIndexId, full);
+        readLock();
+        try {
+            return getSchemaByIndexId(baseIndexId, full);
+        } finally {
+            readUnlock();
+        }
     }
 
     @Override
     public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-        OlapTable other = (OlapTable) o;
+        readLock();
+        try {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            OlapTable other = (OlapTable) o;
 
-        if (!Objects.equals(defaultDistributionInfo, other.defaultDistributionInfo)) {
-            return false;
-        }
+            if (!Objects.equals(defaultDistributionInfo, other.defaultDistributionInfo)) {
+                return false;
+            }
 
-        return Double.compare(other.bfFpp, bfFpp) == 0 && hasSequenceCol == other.hasSequenceCol
+
+            return Double.compare(other.bfFpp, bfFpp) == 0 && hasSequenceCol == other.hasSequenceCol
                 && baseIndexId == other.baseIndexId && state == other.state && Objects.equals(indexIdToMeta,
                 other.indexIdToMeta) && Objects.equals(indexNameToId, other.indexNameToId) && keysType == other.keysType
                 && Objects.equals(partitionInfo, other.partitionInfo) && Objects.equals(
@@ -2239,6 +2431,9 @@
                 other.colocateGroup) && Objects.equals(sequenceType, other.sequenceType)
                 && Objects.equals(indexes, other.indexes) && Objects.equals(tableProperty,
                 other.tableProperty);
+        } finally {
+            readUnlock();
+        }
     }
 
     @Override
@@ -2255,15 +2450,6 @@
         return null;
     }
 
-    public Column getBaseColumn(int colUniqueId) {
-        for (Column column : getBaseSchema()) {
-            if (column.getUniqueId() == colUniqueId) {
-                return column;
-            }
-        }
-        return null;
-    }
-
     public int getKeysNum() {
         int keysNum = 0;
         for (Column column : getBaseSchema()) {
@@ -2371,7 +2557,12 @@
     }
 
     public boolean containsPartition(String partitionName) {
-        return nameToPartition.containsKey(partitionName);
+        readLock();
+        try {
+            return nameToPartition.containsKey(partitionName);
+        } finally {
+            readUnlock();
+        }
     }
 
     public void setInAtomicRestore() {
@@ -2515,8 +2706,13 @@
     }
 
     public int getBaseSchemaVersion() {
-        MaterializedIndexMeta baseIndexMeta = indexIdToMeta.get(baseIndexId);
-        return baseIndexMeta.getSchemaVersion();
+        readLock();
+        try {
+            MaterializedIndexMeta baseIndexMeta = indexIdToMeta.get(baseIndexId);
+            return baseIndexMeta.getSchemaVersion();
+        } finally {
+            readUnlock();
+        }
     }
 
     public void setEnableSingleReplicaCompaction(boolean enableSingleReplicaCompaction) {
@@ -2648,8 +2844,13 @@
     }
 
     public int getIndexSchemaVersion(long indexId) {
-        MaterializedIndexMeta indexMeta = indexIdToMeta.get(indexId);
-        return indexMeta.getSchemaVersion();
+        readLock();
+        try {
+            MaterializedIndexMeta indexMeta = indexIdToMeta.get(indexId);
+            return indexMeta.getSchemaVersion();
+        } finally {
+            readUnlock();
+        }
     }
 
     public void setDataSortInfo(DataSortInfo dataSortInfo) {
@@ -2661,25 +2862,36 @@
     // return true if partition with given name already exist, both in partitions and temp partitions.
     // return false otherwise
     public boolean checkPartitionNameExist(String partitionName) {
-        if (nameToPartition.containsKey(partitionName)) {
-            return true;
+        readLock();
+        try {
+            if (nameToPartition.containsKey(partitionName)) {
+                return true;
+            }
+            return tempPartitions.hasPartition(partitionName);
+        } finally {
+            readUnlock();
         }
-        return tempPartitions.hasPartition(partitionName);
     }
 
     // if includeTempPartition is true, check if temp partition with given name exist,
     // if includeTempPartition is false, check if normal partition with given name exist.
     // return true if exist, otherwise, return false;
     public boolean checkPartitionNameExist(String partitionName, boolean isTempPartition) {
-        if (isTempPartition) {
-            return tempPartitions.hasPartition(partitionName);
-        } else {
-            return nameToPartition.containsKey(partitionName);
+        readLock();
+        try {
+            if (isTempPartition) {
+                return tempPartitions.hasPartition(partitionName);
+            } else {
+                return nameToPartition.containsKey(partitionName);
+            }
+        } finally {
+            readUnlock();
         }
     }
 
     // drop temp partition. if needDropTablet is true, tablets of this temp partition
     // will be dropped from tablet inverted index.
+    // outer locked
     public Partition dropTempPartition(String partitionName, boolean needDropTablet) {
         Partition partition = getPartition(partitionName, true);
         if (partition != null) {
@@ -2705,6 +2917,7 @@
      *         names are still p1 and p2.
      *
      */
+    // outer locked
     public List<Long> replaceTempPartitions(long dbId, List<String> partitionNames, List<String> tempPartitionNames,
             boolean strictRange, boolean useTempPartitionName, boolean isForceDropOld) throws DdlException {
         List<Long> replacedPartitionIds = Lists.newArrayList();
@@ -2738,6 +2951,7 @@
         return replacedPartitionIds;
     }
 
+    // outer locked
     private void checkPartition(List<String> partitionNames, List<String> tempPartitionNames,
             boolean strictRange) throws DdlException {
         if (strictRange) {
@@ -2777,18 +2991,18 @@
     }
 
     public void addTempPartition(Partition partition) {
+        writeLock();
         tempPartitions.addPartition(partition);
-    }
-
-    public void dropAllTempPartitions() {
-        for (Partition partition : tempPartitions.getAllPartitions()) {
-            partitionInfo.dropPartition(partition.getId());
-        }
-        tempPartitions.dropAll();
+        writeUnlock();
     }
 
     public boolean existTempPartitions() {
-        return !tempPartitions.isEmpty();
+        readLock();
+        try {
+            return !tempPartitions.isEmpty();
+        } finally {
+            readUnlock();
+        }
     }
 
     public void setCompressionType(TCompressionType compressionType) {
@@ -2980,12 +3194,17 @@
 
     // for light schema change
     public void initSchemaColumnUniqueId() {
-        if (!getEnableLightSchemaChange()) {
-            return;
-        }
+        writeLock();
+        try {
+            if (!getEnableLightSchemaChange()) {
+                return;
+            }
 
-        for (MaterializedIndexMeta indexMeta : indexIdToMeta.values()) {
-            indexMeta.initSchemaColumnUniqueId();
+            for (MaterializedIndexMeta indexMeta : indexIdToMeta.values()) {
+                indexMeta.initSchemaColumnUniqueId();
+            }
+        } finally {
+            writeUnlock();
         }
     }
 
@@ -3101,14 +3320,19 @@
 
     @Override
     public void analyze(String dbName) {
-        for (MaterializedIndexMeta meta : indexIdToMeta.values()) {
-            try {
-                ConnectContext connectContext = new ConnectContext();
-                connectContext.setDatabase(dbName);
-                meta.parseStmt();
-            } catch (IOException e) {
-                LOG.info(e);
+        readLock();
+        try {
+            for (MaterializedIndexMeta meta : indexIdToMeta.values()) {
+                try {
+                    ConnectContext connectContext = new ConnectContext();
+                    connectContext.setDatabase(dbName);
+                    meta.parseStmt();
+                } catch (IOException e) {
+                    LOG.info(e);
+                }
             }
+        } finally {
+            readUnlock();
         }
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java
index 6b00bd3..55c4848 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java
@@ -17,15 +17,12 @@
 
 package org.apache.doris.catalog;
 
-import org.apache.doris.analysis.DateLiteral;
 import org.apache.doris.analysis.Expr;
-import org.apache.doris.analysis.MaxLiteral;
-import org.apache.doris.analysis.NullLiteral;
 import org.apache.doris.analysis.PartitionDesc;
-import org.apache.doris.analysis.PartitionValue;
 import org.apache.doris.analysis.SinglePartitionDesc;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.DdlException;
+import org.apache.doris.resource.Tag;
 import org.apache.doris.thrift.TStorageMedium;
 import org.apache.doris.thrift.TTabletType;
 
@@ -43,14 +40,14 @@
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
-import java.util.stream.Collectors;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /*
  * Repository of a partition's related infos
  */
 public class PartitionInfo {
     private static final Logger LOG = LogManager.getLogger(PartitionInfo.class);
-
+    protected ReentrantReadWriteLock rwLock;
     @SerializedName("Type")
     protected PartitionType type;
     // partition columns for list and range partitions
@@ -98,6 +95,7 @@
         this.idToTabletType = new HashMap<>();
         this.idToStoragePolicy = new HashMap<>();
         this.partitionExprs = new ArrayList<>();
+        this.rwLock = new ReentrantReadWriteLock();
     }
 
     public PartitionInfo(PartitionType type) {
@@ -108,14 +106,34 @@
         this.idToTabletType = new HashMap<>();
         this.idToStoragePolicy = new HashMap<>();
         this.partitionExprs = new ArrayList<>();
+        this.rwLock = new ReentrantReadWriteLock();
     }
 
     public PartitionInfo(PartitionType type, List<Column> partitionColumns) {
         this(type);
         this.partitionColumns = partitionColumns;
         this.isMultiColumnPartition = partitionColumns.size() > 1;
+        this.rwLock = new ReentrantReadWriteLock();
     }
 
+
+    private void readLock() {
+        rwLock.readLock().lock();
+    }
+
+    private void readUnlock() {
+        rwLock.readLock().unlock();
+    }
+
+    private void writeLock() {
+        rwLock.writeLock().lock();
+    }
+
+    private void writeUnlock() {
+        rwLock.writeLock().unlock();
+    }
+
+
     public PartitionType getType() {
         return type;
     }
@@ -127,6 +145,7 @@
     public String getDisplayPartitionColumns() {
         StringBuilder sb = new StringBuilder();
         int index = 0;
+        readLock();
         for (Column c : partitionColumns) {
             if (index  != 0) {
                 sb.append(", ");
@@ -134,15 +153,23 @@
             sb.append(c.getDisplayName());
             index++;
         }
+        readUnlock();
         return sb.toString();
     }
 
     public Map<Long, PartitionItem> getIdToItem(boolean isTemp) {
-        if (isTemp) {
-            return idToTempItem;
-        } else {
-            return idToItem;
+        HashMap all = new HashMap<>();
+        readLock();
+        try {
+            if (isTemp) {
+                all.putAll(idToTempItem);
+            } else {
+                all.putAll(idToItem);
+            }
+        } finally {
+            readUnlock();
         }
+        return all;
     }
 
     /**
@@ -150,15 +177,26 @@
      */
     public Map<Long, PartitionItem> getAllPartitions() {
         HashMap all = new HashMap<>();
-        all.putAll(idToTempItem);
-        all.putAll(idToItem);
+        readLock();
+        try {
+            all.putAll(idToTempItem);
+            all.putAll(idToItem);
+        } finally {
+            readUnlock();
+        }
         return all;
     }
 
     public PartitionItem getItem(long partitionId) {
-        PartitionItem item = idToItem.get(partitionId);
-        if (item == null) {
-            item = idToTempItem.get(partitionId);
+        PartitionItem item = null;
+        readLock();
+        try {
+            item = idToItem.get(partitionId);
+            if (item == null) {
+                item = idToTempItem.get(partitionId);
+            }
+        } finally {
+            readUnlock();
         }
         return item;
     }
@@ -173,22 +211,20 @@
         return partitionRange;
     }
 
-    public PartitionItem getItemOrAnalysisException(long partitionId) throws AnalysisException {
-        PartitionItem item = idToItem.get(partitionId);
-        if (item == null) {
-            item = idToTempItem.get(partitionId);
-        }
-        if (item == null) {
-            throw new AnalysisException("PartitionItem not found: " + partitionId);
-        }
-        return item;
-    }
-
     public void setItem(long partitionId, boolean isTemp, PartitionItem item) {
-        setItemInternal(partitionId, isTemp, item);
+        writeLock();
+        try {
+            if (isTemp) {
+                idToTempItem.put(partitionId, item);
+            } else {
+                idToItem.put(partitionId, item);
+            }
+        } finally {
+            writeUnlock();
+        }
     }
 
-    private void setItemInternal(long partitionId, boolean isTemp, PartitionItem item) {
+    private void setItemInternalWithOutLock(long partitionId, boolean isTemp, PartitionItem item) {
         if (isTemp) {
             idToTempItem.put(partitionId, item);
         } else {
@@ -200,12 +236,16 @@
                                                       long partitionId, boolean isTemp) throws DdlException {
         Preconditions.checkArgument(desc.isAnalyzed());
         PartitionItem partitionItem = createAndCheckPartitionItem(desc, isTemp);
-        setItemInternal(partitionId, isTemp, partitionItem);
-
-        idToDataProperty.put(partitionId, desc.getPartitionDataProperty());
-        idToReplicaAllocation.put(partitionId, desc.getReplicaAlloc());
-        idToInMemory.put(partitionId, desc.isInMemory());
-        idToStoragePolicy.put(partitionId, desc.getStoragePolicy());
+        writeLock();
+        try {
+            setItemInternalWithOutLock(partitionId, isTemp, partitionItem);
+            idToDataProperty.put(partitionId, desc.getPartitionDataProperty());
+            idToReplicaAllocation.put(partitionId, desc.getReplicaAlloc());
+            idToInMemory.put(partitionId, desc.isInMemory());
+            idToStoragePolicy.put(partitionId, desc.getStoragePolicy());
+        } finally {
+            writeUnlock();
+        }
 
         return partitionItem;
     }
@@ -217,57 +257,75 @@
     public void unprotectHandleNewSinglePartitionDesc(long partitionId, boolean isTemp, PartitionItem partitionItem,
                                                       DataProperty dataProperty, ReplicaAllocation replicaAlloc,
                                                       boolean isInMemory, boolean isMutable) {
-        setItemInternal(partitionId, isTemp, partitionItem);
-        idToDataProperty.put(partitionId, dataProperty);
-        idToReplicaAllocation.put(partitionId, replicaAlloc);
-        idToInMemory.put(partitionId, isInMemory);
-        idToStoragePolicy.put(partitionId, "");
-        //TODO
-        //idToMutable.put(partitionId, isMutable);
+        writeLock();
+        try {
+            setItemInternalWithOutLock(partitionId, isTemp, partitionItem);
+            idToDataProperty.put(partitionId, dataProperty);
+            idToReplicaAllocation.put(partitionId, replicaAlloc);
+            idToInMemory.put(partitionId, isInMemory);
+            idToStoragePolicy.put(partitionId, "");
+        } finally {
+            writeUnlock();
+        }
     }
 
     public List<Map.Entry<Long, PartitionItem>> getPartitionItemEntryList(boolean isTemp, boolean isSorted) {
-        Map<Long, PartitionItem> tmpMap = idToItem;
-        if (isTemp) {
-            tmpMap = idToTempItem;
+        readLock();
+        try {
+            Map<Long, PartitionItem> tmpMap = idToItem;
+            if (isTemp) {
+                tmpMap = idToTempItem;
+            }
+            List<Map.Entry<Long, PartitionItem>> itemEntryList = Lists.newArrayList(tmpMap.entrySet());
+            if (isSorted) {
+                Collections.sort(itemEntryList, PartitionItem.ITEM_MAP_ENTRY_COMPARATOR);
+            }
+            return itemEntryList;
+        } finally {
+            readUnlock();
         }
-        List<Map.Entry<Long, PartitionItem>> itemEntryList = Lists.newArrayList(tmpMap.entrySet());
-        if (isSorted) {
-            Collections.sort(itemEntryList, PartitionItem.ITEM_MAP_ENTRY_COMPARATOR);
-        }
-        return itemEntryList;
     }
 
     // get sorted item list, exclude partitions which ids are in 'excludePartitionIds'
     public List<PartitionItem> getItemList(Set<Long> excludePartitionIds, boolean isTemp) {
-        Map<Long, PartitionItem> tempMap = idToItem;
-        if (isTemp) {
-            tempMap = idToTempItem;
-        }
-        List<PartitionItem> resultList = Lists.newArrayList();
-        for (Map.Entry<Long, PartitionItem> entry : tempMap.entrySet()) {
-            if (!excludePartitionIds.contains(entry.getKey())) {
-                resultList.add(entry.getValue());
+        readLock();
+        try {
+            Map<Long, PartitionItem> tempMap = idToItem;
+            if (isTemp) {
+                tempMap = idToTempItem;
             }
+            List<PartitionItem> resultList = Lists.newArrayList();
+            for (Map.Entry<Long, PartitionItem> entry : tempMap.entrySet()) {
+                if (!excludePartitionIds.contains(entry.getKey())) {
+                    resultList.add(entry.getValue());
+                }
+            }
+            return resultList;
+        } finally {
+            readUnlock();
         }
-        return resultList;
     }
 
     // return any item intersect with the newItem.
     // return null if no item intersect.
     public PartitionItem getAnyIntersectItem(PartitionItem newItem, boolean isTemp) {
-        Map<Long, PartitionItem> tmpMap = idToItem;
-        if (isTemp) {
-            tmpMap = idToTempItem;
-        }
-        PartitionItem retItem;
-        for (PartitionItem item : tmpMap.values()) {
-            retItem = item.getIntersect(newItem);
-            if (null != retItem) {
-                return retItem;
+        readLock();
+        try {
+            Map<Long, PartitionItem> tmpMap = idToItem;
+            if (isTemp) {
+                tmpMap = idToTempItem;
             }
+            PartitionItem retItem;
+            for (PartitionItem item : tmpMap.values()) {
+                retItem = item.getIntersect(newItem);
+                if (null != retItem) {
+                    return retItem;
+                }
+            }
+            return null;
+        } finally {
+            readUnlock();
         }
-        return null;
     }
 
     public boolean enableAutomaticPartition() {
@@ -287,94 +345,162 @@
     }
 
     public DataProperty getDataProperty(long partitionId) {
-        return idToDataProperty.get(partitionId);
+        readLock();
+        try {
+            return idToDataProperty.get(partitionId);
+        } finally {
+            readUnlock();
+        }
     }
 
     public void setDataProperty(long partitionId, DataProperty newDataProperty) {
-        idToDataProperty.put(partitionId, newDataProperty);
+        writeLock();
+        try {
+            idToDataProperty.put(partitionId, newDataProperty);
+        } finally {
+            writeUnlock();
+        }
     }
 
     public void refreshTableStoragePolicy(String storagePolicy) {
-        idToStoragePolicy.replaceAll((k, v) -> storagePolicy);
-        idToDataProperty.entrySet().forEach(entry -> {
-            entry.getValue().setStoragePolicy(storagePolicy);
-        });
+        writeLock();
+        try {
+            idToStoragePolicy.replaceAll((k, v) -> storagePolicy);
+            idToDataProperty.entrySet().forEach(entry -> {
+                entry.getValue().setStoragePolicy(storagePolicy);
+            });
+        } finally {
+            writeUnlock();
+        }
     }
 
     public String getStoragePolicy(long partitionId) {
-        return idToStoragePolicy.getOrDefault(partitionId, "");
+        readLock();
+        try {
+            return idToStoragePolicy.getOrDefault(partitionId, "");
+        } finally {
+            readUnlock();
+        }
     }
 
     public void setStoragePolicy(long partitionId, String storagePolicy) {
-        idToStoragePolicy.put(partitionId, storagePolicy);
+        writeLock();
+        try {
+            idToStoragePolicy.put(partitionId, storagePolicy);
+        } finally {
+            writeUnlock();
+        }
     }
 
-    public Map<Long, ReplicaAllocation> getPartitionReplicaAllocations() {
-        return idToReplicaAllocation;
+    public void modifyReplicaAlloc(ReplicaAllocation replicaAlloc) {
+        writeLock();
+        for (ReplicaAllocation alloc : idToReplicaAllocation.values()) {
+            Map<Tag, Short> allocMap = alloc.getAllocMap();
+            allocMap.clear();
+            allocMap.putAll(replicaAlloc.getAllocMap());
+        }
+        writeUnlock();
     }
 
     public ReplicaAllocation getReplicaAllocation(long partitionId) {
-        if (!idToReplicaAllocation.containsKey(partitionId)) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("failed to get replica allocation for partition: {}", partitionId);
+        readLock();
+        try {
+            if (!idToReplicaAllocation.containsKey(partitionId)) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("failed to get replica allocation for partition: {}", partitionId);
+                }
+                return ReplicaAllocation.DEFAULT_ALLOCATION;
             }
-            return ReplicaAllocation.DEFAULT_ALLOCATION;
+            return idToReplicaAllocation.get(partitionId);
+        } finally {
+            readUnlock();
         }
-        return idToReplicaAllocation.get(partitionId);
     }
 
     public void setReplicaAllocation(long partitionId, ReplicaAllocation replicaAlloc) {
+        writeLock();
         this.idToReplicaAllocation.put(partitionId, replicaAlloc);
+        writeUnlock();
     }
 
     public boolean getIsInMemory(long partitionId) {
-        return idToInMemory.get(partitionId);
+        readLock();
+        try {
+            return idToInMemory.get(partitionId);
+        } finally {
+            readUnlock();
+        }
     }
 
     public boolean getIsMutable(long partitionId) {
-        return idToDataProperty.get(partitionId).isMutable();
+        readLock();
+        try {
+            return idToDataProperty.get(partitionId).isMutable();
+        } finally {
+            readUnlock();
+        }
     }
 
     public void setIsMutable(long partitionId, boolean isMutable) {
+        writeLock();
         idToDataProperty.get(partitionId).setMutable(isMutable);
+        writeUnlock();
     }
 
     public void setIsInMemory(long partitionId, boolean isInMemory) {
+        writeLock();
         idToInMemory.put(partitionId, isInMemory);
+        writeUnlock();
     }
 
     public TTabletType getTabletType(long partitionId) {
-        if (!idToTabletType.containsKey(partitionId)) {
-            return TTabletType.TABLET_TYPE_DISK;
+        readLock();
+        try {
+            if (!idToTabletType.containsKey(partitionId)) {
+                return TTabletType.TABLET_TYPE_DISK;
+            }
+            return idToTabletType.get(partitionId);
+        } finally {
+            readUnlock();
         }
-        return idToTabletType.get(partitionId);
     }
 
     public void setTabletType(long partitionId, TTabletType tabletType) {
+        writeLock();
         idToTabletType.put(partitionId, tabletType);
+        writeUnlock();
     }
 
     public void dropPartition(long partitionId) {
+        writeLock();
         idToDataProperty.remove(partitionId);
         idToReplicaAllocation.remove(partitionId);
         idToInMemory.remove(partitionId);
         idToItem.remove(partitionId);
         idToTempItem.remove(partitionId);
+        writeUnlock();
     }
 
     public void addPartition(long partitionId, boolean isTemp, PartitionItem item, DataProperty dataProperty,
                              ReplicaAllocation replicaAlloc, boolean isInMemory, boolean isMutable) {
-        addPartition(partitionId, dataProperty, replicaAlloc, isInMemory, isMutable);
-        setItemInternal(partitionId, isTemp, item);
+        writeLock();
+        dataProperty.setMutable(isMutable);
+        idToDataProperty.put(partitionId, dataProperty);
+        idToReplicaAllocation.put(partitionId, replicaAlloc);
+        idToInMemory.put(partitionId, isInMemory);
+        setItemInternalWithOutLock(partitionId, isTemp, item);
+        writeUnlock();
     }
 
     public void addPartition(long partitionId, DataProperty dataProperty,
                              ReplicaAllocation replicaAlloc,
                              boolean isInMemory, boolean isMutable) {
+        writeLock();
         dataProperty.setMutable(isMutable);
         idToDataProperty.put(partitionId, dataProperty);
         idToReplicaAllocation.put(partitionId, replicaAlloc);
         idToInMemory.put(partitionId, isInMemory);
+        writeUnlock();
     }
 
     public boolean isMultiColumnPartition() {
@@ -389,30 +515,19 @@
         throw new RuntimeException("Should implement it in derived classes.");
     }
 
-    public static List<PartitionValue> toPartitionValue(PartitionKey partitionKey) {
-        return partitionKey.getKeys().stream().map(expr -> {
-            if (expr == MaxLiteral.MAX_VALUE) {
-                return PartitionValue.MAX_VALUE;
-            } else if (expr instanceof DateLiteral) {
-                return new PartitionValue(expr.getStringValue());
-            } else if (expr instanceof NullLiteral) {
-                return new PartitionValue("NULL", true);
-            } else {
-                return new PartitionValue(expr.getRealValue().toString());
-            }
-        }).collect(Collectors.toList());
-    }
-
     public void moveFromTempToFormal(long tempPartitionId) {
+        writeLock();
         PartitionItem item = idToTempItem.remove(tempPartitionId);
         if (item != null) {
             idToItem.put(tempPartitionId, item);
         }
+        writeUnlock();
     }
 
     public void resetPartitionIdForRestore(
             Map<Long, Long> partitionIdMap,
             ReplicaAllocation restoreReplicaAlloc, boolean isSinglePartitioned) {
+        writeLock();
         Map<Long, DataProperty> origIdToDataProperty = idToDataProperty;
         Map<Long, ReplicaAllocation> origIdToReplicaAllocation = idToReplicaAllocation;
         Map<Long, PartitionItem> origIdToItem = idToItem;
@@ -435,6 +550,7 @@
             idToInMemory.put(entry.getKey(), origIdToInMemory.get(entry.getValue()));
             idToStoragePolicy.put(entry.getKey(), origIdToStoragePolicy.getOrDefault(entry.getValue(), ""));
         }
+        writeUnlock();
     }
 
     @Override
@@ -442,6 +558,7 @@
         StringBuilder buff = new StringBuilder();
         buff.append("type: ").append(type.typeString).append("; ");
 
+        readLock();
         for (Map.Entry<Long, DataProperty> entry : idToDataProperty.entrySet()) {
             buff.append(entry.getKey()).append(" is HDD: ");
             if (entry.getValue().equals(new DataProperty(TStorageMedium.HDD))) {
@@ -455,6 +572,7 @@
             buff.append("in memory: ").append(idToInMemory.get(entry.getKey()));
             buff.append("is mutable: ").append(idToDataProperty.get(entry.getKey()).isMutable());
         }
+        readUnlock();
 
         return buff.toString();
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java
index 091f9ed..7e476e7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java
@@ -592,4 +592,19 @@
     public List<String> getOriginHiveKeys() {
         return originHiveKeys;
     }
+
+
+    public static List<PartitionValue> toPartitionValue(PartitionKey partitionKey) {
+        return partitionKey.getKeys().stream().map(expr -> {
+            if (expr == MaxLiteral.MAX_VALUE) {
+                return PartitionValue.MAX_VALUE;
+            } else if (expr instanceof DateLiteral) {
+                return new PartitionValue(expr.getStringValue());
+            } else if (expr instanceof NullLiteral) {
+                return new PartitionValue("NULL", true);
+            } else {
+                return new PartitionValue(expr.getRealValue().toString());
+            }
+        }).collect(Collectors.toList());
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionInfo.java
index a59c3ca..7ccc7b6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionInfo.java
@@ -289,8 +289,8 @@
 
             Range<PartitionKey> range = entry.getValue().getItems();
             PartitionKeyDesc partitionKeyDesc = PartitionKeyDesc.createFixed(
-                    PartitionInfo.toPartitionValue(range.lowerEndpoint()),
-                    PartitionInfo.toPartitionValue(range.upperEndpoint()));
+                    PartitionKey.toPartitionValue(range.lowerEndpoint()),
+                    PartitionKey.toPartitionValue(range.upperEndpoint()));
 
             Map<String, String> properties = Maps.newHashMap();
             Optional.ofNullable(this.idToStoragePolicy.get(entry.getKey())).ifPresent(p -> {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java
index cad6ca3..8f29da1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java
@@ -66,8 +66,8 @@
     @Override
     public PartitionKeyDesc toPartitionKeyDesc() {
         return PartitionKeyDesc.createFixed(
-                PartitionInfo.toPartitionValue(partitionKeyRange.lowerEndpoint()),
-                PartitionInfo.toPartitionValue(partitionKeyRange.upperEndpoint()));
+            PartitionKey.toPartitionValue(partitionKeyRange.lowerEndpoint()),
+            PartitionKey.toPartitionValue(partitionKeyRange.upperEndpoint()));
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIndexes.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIndexes.java
index 345362e..b47f0b5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIndexes.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIndexes.java
@@ -44,11 +44,6 @@
         this.properties = Maps.newHashMap();
     }
 
-    public TableIndexes(List<Index> indexes, Map<String, String> properties) {
-        this.indexes = indexes;
-        this.properties = properties;
-    }
-
     public List<Index> getIndexes() {
         if (indexes == null) {
             indexes = Lists.newArrayList();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index 575024e..3098347 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -3823,8 +3823,10 @@
             throw new DdlException(e.getMessage());
         }
         int schemaHash = Util.generateSchemaHash();
+        olapTable.writeLock();
         olapTable.setIndexMeta(baseIndexId, tableName, baseSchema, schemaVersion, schemaHash, shortKeyColumnCount,
                 baseIndexStorageType, keysType, olapTable.getIndexes());
+        olapTable.writeUnlock();
 
         for (AlterClause alterClause : stmt.getRollupAlterClauseList()) {
             if (olapTable.isDuplicateWithoutKey()) {
@@ -3849,8 +3851,10 @@
                     true/*isKeysRequired*/);
             int rollupSchemaHash = Util.generateSchemaHash();
             long rollupIndexId = idGeneratorBuffer.getNextId();
+            olapTable.writeLock();
             olapTable.setIndexMeta(rollupIndexId, addRollupClause.getRollupName(), rollupColumns, schemaVersion,
                     rollupSchemaHash, rollupShortKeyColumnCount, rollupIndexStorageType, keysType);
+            olapTable.writeUnlock();
         }
 
         // analyse sequence map column
@@ -3902,7 +3906,9 @@
 
         olapTable.initSchemaColumnUniqueId();
         olapTable.initAutoIncrementGenerator(db.getId());
+        olapTable.writeLock();
         olapTable.rebuildFullSchema();
+        olapTable.writeUnlock();
 
         // analyze version info
         Long versionInfo = null;