blob: c91fa1de6f30f8f61d4648788240f51d777ce91b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.mpp.plan.analyze;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.consensus.PartitionRegionId;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
import org.apache.iotdb.commons.partition.SchemaNodeManagementPartition;
import org.apache.iotdb.commons.partition.SchemaPartition;
import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementReq;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionReq;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionTableResp;
import org.apache.iotdb.db.client.ConfigNodeClient;
import org.apache.iotdb.db.client.ConfigNodeInfo;
import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
import org.apache.iotdb.db.mpp.plan.analyze.cache.PartitionCache;
import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
 * Cluster implementation of {@link IPartitionFetcher}: resolves schema/data partitions by first
 * consulting the local {@link PartitionCache} and, on a miss, asking a config node via RPC, then
 * updating the cache with the response.
 */
public class ClusterPartitionFetcher implements IPartitionFetcher {

  private static final Logger logger = LoggerFactory.getLogger(ClusterPartitionFetcher.class);
  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();

  // Maps a device path to its series partition slot (hash-based, configured executor class).
  private final SeriesPartitionExecutor partitionExecutor;

  // Local cache of storage-group mappings, partition tables and region replica sets.
  private final PartitionCache partitionCache;

  private final IClientManager<PartitionRegionId, ConfigNodeClient> configNodeClientManager =
      new IClientManager.Factory<PartitionRegionId, ConfigNodeClient>()
          .createClientManager(new DataNodeClientPoolFactory.ConfigNodeClientPoolFactory());

  /** Initialization-on-demand holder for the singleton instance. */
  private static final class ClusterPartitionFetcherHolder {
    private static final ClusterPartitionFetcher INSTANCE = new ClusterPartitionFetcher();

    private ClusterPartitionFetcherHolder() {}
  }

  public static ClusterPartitionFetcher getInstance() {
    return ClusterPartitionFetcherHolder.INSTANCE;
  }

  private ClusterPartitionFetcher() {
    this.partitionExecutor =
        SeriesPartitionExecutor.getSeriesPartitionExecutor(
            config.getSeriesPartitionExecutorClass(), config.getSeriesPartitionSlotNum());
    this.partitionCache = new PartitionCache();
  }

  /**
   * Fetches the schema partition for the given pattern tree, cache-first. On a cache miss the
   * config node is queried and the cache updated.
   *
   * @throws StatementAnalyzeException on RPC/IO failure
   * @throws RuntimeException wrapping an {@link IoTDBException} when the config node returns a
   *     non-success status
   */
  @Override
  public SchemaPartition getSchemaPartition(PathPatternTree patternTree) {
    try (ConfigNodeClient client =
        configNodeClientManager.borrowClient(ConfigNodeInfo.partitionRegionId)) {
      patternTree.constructTree();
      List<String> devicePaths = patternTree.getAllDevicePatterns();
      Map<String, List<String>> storageGroupToDeviceMap =
          partitionCache.getStorageGroupToDevice(devicePaths, true, false);
      SchemaPartition schemaPartition = partitionCache.getSchemaPartition(storageGroupToDeviceMap);
      if (null == schemaPartition) {
        TSchemaPartitionTableResp schemaPartitionTableResp =
            client.getSchemaPartitionTable(constructSchemaPartitionReq(patternTree));
        if (schemaPartitionTableResp.getStatus().getCode()
            == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
          schemaPartition = parseSchemaPartitionTableResp(schemaPartitionTableResp);
          partitionCache.updateSchemaPartitionCache(
              schemaPartitionTableResp.getSchemaPartitionTable());
        } else {
          throw new RuntimeException(
              new IoTDBException(
                  schemaPartitionTableResp.getStatus().getMessage(),
                  schemaPartitionTableResp.getStatus().getCode()));
        }
      }
      return schemaPartition;
    } catch (TException | IOException e) {
      // Log before rethrowing: StatementAnalyzeException below carries only the message,
      // so without this the stack trace of the underlying failure would be lost.
      logger.error("Get Schema Partition error", e);
      throw new StatementAnalyzeException(
          "An error occurred when executing getSchemaPartition():" + e.getMessage());
    }
  }

  /**
   * Same as {@link #getSchemaPartition(PathPatternTree)} but auto-creates missing schema
   * partitions (and storage groups, via the cache lookup's auto-create flag) on the config node.
   */
  @Override
  public SchemaPartition getOrCreateSchemaPartition(PathPatternTree patternTree) {
    try (ConfigNodeClient client =
        configNodeClientManager.borrowClient(ConfigNodeInfo.partitionRegionId)) {
      patternTree.constructTree();
      List<String> devicePaths = patternTree.getAllDevicePatterns();
      Map<String, List<String>> storageGroupToDeviceMap =
          partitionCache.getStorageGroupToDevice(devicePaths, true, true);
      SchemaPartition schemaPartition = partitionCache.getSchemaPartition(storageGroupToDeviceMap);
      if (null == schemaPartition) {
        TSchemaPartitionTableResp schemaPartitionTableResp =
            client.getOrCreateSchemaPartitionTable(constructSchemaPartitionReq(patternTree));
        if (schemaPartitionTableResp.getStatus().getCode()
            == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
          schemaPartition = parseSchemaPartitionTableResp(schemaPartitionTableResp);
          partitionCache.updateSchemaPartitionCache(
              schemaPartitionTableResp.getSchemaPartitionTable());
        } else {
          throw new RuntimeException(
              new IoTDBException(
                  schemaPartitionTableResp.getStatus().getMessage(),
                  schemaPartitionTableResp.getStatus().getCode()));
        }
      }
      return schemaPartition;
    } catch (TException | IOException e) {
      // Preserve the underlying cause in the log; the rethrown exception has message only.
      logger.error("Get or create Schema Partition error", e);
      throw new StatementAnalyzeException(
          "An error occurred when executing getOrCreateSchemaPartition():" + e.getMessage());
    }
  }

  /**
   * Fetches schema-node-management partition info (used by node management statements such as
   * SHOW CHILD PATHS / COUNT NODES). Not cached.
   *
   * @param level node level filter; {@code null} means no level filter (sent as -1)
   */
  @Override
  public SchemaNodeManagementPartition getSchemaNodeManagementPartitionWithLevel(
      PathPatternTree patternTree, Integer level) {
    try (ConfigNodeClient client =
        configNodeClientManager.borrowClient(ConfigNodeInfo.partitionRegionId)) {
      patternTree.constructTree();
      TSchemaNodeManagementResp schemaNodeManagementResp =
          client.getSchemaNodeManagementPartition(
              constructSchemaNodeManagementPartitionReq(patternTree, level));
      return parseSchemaNodeManagementPartitionResp(schemaNodeManagementResp);
    } catch (TException | IOException e) {
      logger.error("Get Schema Node Management Partition error", e);
      throw new StatementAnalyzeException(
          "An error occurred when executing getSchemaNodeManagementPartition():" + e.getMessage());
    }
  }

  /** get data partition when query */
  @Override
  public DataPartition getDataPartition(
      Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
    try (ConfigNodeClient client =
        configNodeClientManager.borrowClient(ConfigNodeInfo.partitionRegionId)) {
      DataPartition dataPartition = partitionCache.getDataPartition(sgNameToQueryParamsMap);
      if (null == dataPartition) {
        TDataPartitionTableResp dataPartitionTableResp =
            client.getDataPartitionTable(constructDataPartitionReq(sgNameToQueryParamsMap));
        if (dataPartitionTableResp.getStatus().getCode()
            == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
          dataPartition = parseDataPartitionResp(dataPartitionTableResp);
          partitionCache.updateDataPartitionCache(dataPartitionTableResp.getDataPartitionTable());
        } else {
          // NOTE(review): the List-based overload throws RuntimeException(IoTDBException) for the
          // same failure; kept as-is here since callers may catch the specific type.
          throw new StatementAnalyzeException(
              "An error occurred when executing getDataPartition():"
                  + dataPartitionTableResp.getStatus().getMessage());
        }
      }
      return dataPartition;
    } catch (TException | IOException e) {
      logger.error("Get Data Partition error", e);
      throw new StatementAnalyzeException(
          "An error occurred when executing getDataPartition():" + e.getMessage());
    }
  }

  /** get data partition when write */
  @Override
  public DataPartition getDataPartition(List<DataPartitionQueryParam> dataPartitionQueryParams) {
    try (ConfigNodeClient client =
        configNodeClientManager.borrowClient(ConfigNodeInfo.partitionRegionId)) {
      // Group flat params by owning storage group before the cache/RPC lookup.
      Map<String, List<DataPartitionQueryParam>> splitDataPartitionQueryParams =
          splitDataPartitionQueryParam(dataPartitionQueryParams, false);
      DataPartition dataPartition = partitionCache.getDataPartition(splitDataPartitionQueryParams);
      if (null == dataPartition) {
        TDataPartitionTableResp dataPartitionTableResp =
            client.getDataPartitionTable(constructDataPartitionReq(splitDataPartitionQueryParams));
        if (dataPartitionTableResp.getStatus().getCode()
            == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
          dataPartition = parseDataPartitionResp(dataPartitionTableResp);
          partitionCache.updateDataPartitionCache(dataPartitionTableResp.getDataPartitionTable());
        } else {
          throw new RuntimeException(
              new IoTDBException(
                  dataPartitionTableResp.getStatus().getMessage(),
                  dataPartitionTableResp.getStatus().getCode()));
        }
      }
      return dataPartition;
    } catch (TException | IOException e) {
      logger.error("Get Data Partition error", e);
      throw new StatementAnalyzeException(
          "An error occurred when executing getDataPartition():" + e.getMessage());
    }
  }

  /** Gets (or asks the config node to create) the data partition, keyed by storage group. */
  @Override
  public DataPartition getOrCreateDataPartition(
      Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
    // Cache-first; only a miss triggers the get-or-create RPC.
    try (ConfigNodeClient client =
        configNodeClientManager.borrowClient(ConfigNodeInfo.partitionRegionId)) {
      DataPartition dataPartition = partitionCache.getDataPartition(sgNameToQueryParamsMap);
      if (null == dataPartition) {
        TDataPartitionTableResp dataPartitionTableResp =
            client.getOrCreateDataPartitionTable(constructDataPartitionReq(sgNameToQueryParamsMap));
        if (dataPartitionTableResp.getStatus().getCode()
            == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
          dataPartition = parseDataPartitionResp(dataPartitionTableResp);
          partitionCache.updateDataPartitionCache(dataPartitionTableResp.getDataPartitionTable());
        } else {
          throw new StatementAnalyzeException(
              "An error occurred when executing getOrCreateDataPartition():"
                  + dataPartitionTableResp.getStatus().getMessage());
        }
      }
      return dataPartition;
    } catch (TException | IOException e) {
      logger.error("Get or create Data Partition error", e);
      throw new StatementAnalyzeException(
          "An error occurred when executing getOrCreateDataPartition():" + e.getMessage());
    }
  }

  /** get data partition when write */
  @Override
  public DataPartition getOrCreateDataPartition(
      List<DataPartitionQueryParam> dataPartitionQueryParams) {
    try (ConfigNodeClient client =
        configNodeClientManager.borrowClient(ConfigNodeInfo.partitionRegionId)) {
      // Auto-create storage groups for unknown devices (write path).
      Map<String, List<DataPartitionQueryParam>> splitDataPartitionQueryParams =
          splitDataPartitionQueryParam(dataPartitionQueryParams, true);
      DataPartition dataPartition = partitionCache.getDataPartition(splitDataPartitionQueryParams);
      if (null == dataPartition) {
        TDataPartitionTableResp dataPartitionTableResp =
            client.getOrCreateDataPartitionTable(
                constructDataPartitionReq(splitDataPartitionQueryParams));
        if (dataPartitionTableResp.getStatus().getCode()
            == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
          dataPartition = parseDataPartitionResp(dataPartitionTableResp);
          partitionCache.updateDataPartitionCache(dataPartitionTableResp.getDataPartitionTable());
        } else {
          throw new RuntimeException(
              new IoTDBException(
                  dataPartitionTableResp.getStatus().getMessage(),
                  dataPartitionTableResp.getStatus().getCode()));
        }
      }
      return dataPartition;
    } catch (TException | IOException e) {
      logger.error("Get or create Data Partition error", e);
      throw new StatementAnalyzeException(
          "An error occurred when executing getOrCreateDataPartition():" + e.getMessage());
    }
  }

  @Override
  public boolean updateRegionCache(TRegionRouteReq req) {
    return partitionCache.updateGroupIdToReplicaSetMap(req.getTimestamp(), req.getRegionRouteMap());
  }

  @Override
  public void invalidAllCache() {
    partitionCache.invalidAllCache();
  }

  /**
   * Splits data partition query params by storage group. Params whose device path cannot be
   * resolved to a storage group are silently dropped from the result.
   *
   * @param isAutoCreate whether unresolved storage groups may be auto-created during lookup
   */
  private Map<String, List<DataPartitionQueryParam>> splitDataPartitionQueryParam(
      List<DataPartitionQueryParam> dataPartitionQueryParams, boolean isAutoCreate) {
    List<String> devicePaths = new ArrayList<>(dataPartitionQueryParams.size());
    for (DataPartitionQueryParam dataPartitionQueryParam : dataPartitionQueryParams) {
      devicePaths.add(dataPartitionQueryParam.getDevicePath());
    }
    Map<String, String> deviceToStorageGroupMap =
        partitionCache.getDeviceToStorageGroup(devicePaths, true, isAutoCreate);
    Map<String, List<DataPartitionQueryParam>> result = new HashMap<>();
    for (DataPartitionQueryParam dataPartitionQueryParam : dataPartitionQueryParams) {
      String devicePath = dataPartitionQueryParam.getDevicePath();
      if (deviceToStorageGroupMap.containsKey(devicePath)) {
        result
            .computeIfAbsent(deviceToStorageGroupMap.get(devicePath), k -> new ArrayList<>())
            .add(dataPartitionQueryParam);
      }
    }
    return result;
  }

  /**
   * Serializes the pattern tree into a read-ready (flipped) ByteBuffer for thrift requests.
   *
   * @throws StatementAnalyzeException if serialization fails
   */
  private ByteBuffer serializePatternTree(PathPatternTree patternTree) {
    PublicBAOS baos = new PublicBAOS();
    try {
      patternTree.serialize(baos);
      ByteBuffer serializedPatternTree = ByteBuffer.allocate(baos.size());
      serializedPatternTree.put(baos.getBuf(), 0, baos.size());
      serializedPatternTree.flip();
      return serializedPatternTree;
    } catch (IOException e) {
      throw new StatementAnalyzeException("An error occurred when serializing pattern tree");
    }
  }

  private TSchemaPartitionReq constructSchemaPartitionReq(PathPatternTree patternTree) {
    return new TSchemaPartitionReq(serializePatternTree(patternTree));
  }

  private TSchemaNodeManagementReq constructSchemaNodeManagementPartitionReq(
      PathPatternTree patternTree, Integer level) {
    TSchemaNodeManagementReq schemaNodeManagementReq =
        new TSchemaNodeManagementReq(serializePatternTree(patternTree));
    if (null == level) {
      // -1 signals "no level filter" to the config node.
      schemaNodeManagementReq.setLevel(-1);
    } else {
      schemaNodeManagementReq.setLevel(level);
    }
    return schemaNodeManagementReq;
  }

  /** Builds the thrift request: storage group -> series slot -> requested time partition slots. */
  private TDataPartitionReq constructDataPartitionReq(
      Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
    Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>> partitionSlotsMap =
        new HashMap<>();
    for (Map.Entry<String, List<DataPartitionQueryParam>> entry :
        sgNameToQueryParamsMap.entrySet()) {
      // for each sg
      Map<TSeriesPartitionSlot, List<TTimePartitionSlot>> deviceToTimePartitionMap =
          new HashMap<>();
      for (DataPartitionQueryParam queryParam : entry.getValue()) {
        deviceToTimePartitionMap.put(
            partitionExecutor.getSeriesPartitionSlot(queryParam.getDevicePath()),
            queryParam.getTimePartitionSlotList().stream()
                .map(timePartitionSlot -> new TTimePartitionSlot(timePartitionSlot.getStartTime()))
                .collect(Collectors.toList()));
      }
      partitionSlotsMap.put(entry.getKey(), deviceToTimePartitionMap);
    }
    return new TDataPartitionReq(partitionSlotsMap);
  }

  /**
   * Converts the thrift schema partition table (consensus group ids) into a SchemaPartition,
   * resolving each group id to its region replica set via the cache.
   */
  private SchemaPartition parseSchemaPartitionTableResp(
      TSchemaPartitionTableResp schemaPartitionTableResp) {
    Map<String, Map<TSeriesPartitionSlot, TRegionReplicaSet>> regionReplicaMap = new HashMap<>();
    for (Map.Entry<String, Map<TSeriesPartitionSlot, TConsensusGroupId>> entry1 :
        schemaPartitionTableResp.getSchemaPartitionTable().entrySet()) {
      Map<TSeriesPartitionSlot, TRegionReplicaSet> result1 =
          regionReplicaMap.computeIfAbsent(entry1.getKey(), k -> new HashMap<>());
      for (Map.Entry<TSeriesPartitionSlot, TConsensusGroupId> entry2 :
          entry1.getValue().entrySet()) {
        TSeriesPartitionSlot seriesPartitionSlot = entry2.getKey();
        TConsensusGroupId consensusGroupId = entry2.getValue();
        result1.put(seriesPartitionSlot, partitionCache.getRegionReplicaSet(consensusGroupId));
      }
    }
    return new SchemaPartition(
        regionReplicaMap,
        config.getSeriesPartitionExecutorClass(),
        config.getSeriesPartitionSlotNum());
  }

  private SchemaNodeManagementPartition parseSchemaNodeManagementPartitionResp(
      TSchemaNodeManagementResp schemaNodeManagementResp) {
    return new SchemaNodeManagementPartition(
        schemaNodeManagementResp.getSchemaRegionMap(),
        config.getSeriesPartitionExecutorClass(),
        config.getSeriesPartitionSlotNum(),
        schemaNodeManagementResp.getMatchedNode());
  }

  /**
   * Converts the thrift data partition table (consensus group ids) into a DataPartition,
   * resolving each group id to its region replica set via the cache.
   */
  private DataPartition parseDataPartitionResp(TDataPartitionTableResp dataPartitionTableResp) {
    Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>>
        regionReplicaSet = new HashMap<>();
    for (Map.Entry<
            String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TConsensusGroupId>>>>
        entry1 : dataPartitionTableResp.getDataPartitionTable().entrySet()) {
      Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>> result1 =
          regionReplicaSet.computeIfAbsent(entry1.getKey(), k -> new HashMap<>());
      for (Map.Entry<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TConsensusGroupId>>>
          entry2 : entry1.getValue().entrySet()) {
        Map<TTimePartitionSlot, List<TRegionReplicaSet>> result2 =
            result1.computeIfAbsent(entry2.getKey(), k -> new HashMap<>());
        for (Map.Entry<TTimePartitionSlot, List<TConsensusGroupId>> entry3 :
            entry2.getValue().entrySet()) {
          List<TRegionReplicaSet> regionReplicaSets = new LinkedList<>();
          for (TConsensusGroupId consensusGroupId : entry3.getValue()) {
            regionReplicaSets.add(partitionCache.getRegionReplicaSet(consensusGroupId));
          }
          result2.put(entry3.getKey(), regionReplicaSets);
        }
      }
    }
    return new DataPartition(
        regionReplicaSet,
        config.getSeriesPartitionExecutorClass(),
        config.getSeriesPartitionSlotNum());
  }
}