blob: 373841dbd1c0f82d9f38b90c014bbca715c269e7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.cluster.query;
import org.apache.iotdb.cluster.client.async.AsyncDataClient;
import org.apache.iotdb.cluster.client.sync.SyncClientAdaptor;
import org.apache.iotdb.cluster.client.sync.SyncDataClient;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.exception.CheckConsistencyException;
import org.apache.iotdb.cluster.metadata.CMManager;
import org.apache.iotdb.cluster.partition.PartitionGroup;
import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
import org.apache.iotdb.cluster.query.dataset.ClusterAlignByDeviceDataSet;
import org.apache.iotdb.cluster.query.filter.SlotSgFilter;
import org.apache.iotdb.cluster.query.manage.QueryCoordinator;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.server.RaftServer;
import org.apache.iotdb.cluster.server.member.DataGroupMember;
import org.apache.iotdb.cluster.server.member.MetaGroupMember;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
import org.apache.iotdb.db.qp.executor.PlanExecutor;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
import org.apache.iotdb.db.qp.physical.sys.LoadConfigurationPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.dataset.AlignByDeviceDataSet;
import org.apache.iotdb.db.query.executor.IQueryRouter;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * The {@link PlanExecutor} used in cluster mode.
 *
 * <p>Metadata queries whose answer is spread over several data groups (path counts, node lists,
 * next-level children) are split by {@link PartitionGroup}. Each group is queried locally when
 * this node is one of its members (after synchronizing with the group leader), or remotely through
 * a data client otherwise, and the partial results are merged.
 */
public class ClusterPlanExecutor extends PlanExecutor {

  private static final Logger logger = LoggerFactory.getLogger(ClusterPlanExecutor.class);

  private final MetaGroupMember metaGroupMember;

  /** Size of the thread pools used to query all global partition groups in parallel. */
  public static final int THREAD_POOL_SIZE = 6;

  public static final String LOG_FAIL_CONNECT = "Failed to connect to node: {}";

  public ClusterPlanExecutor(MetaGroupMember metaGroupMember) throws QueryProcessException {
    super();
    this.metaGroupMember = metaGroupMember;
    this.queryRouter = new ClusterQueryRouter(metaGroupMember);
  }

  /**
   * Execute a query plan.
   *
   * <p>Data queries are routed directly. Show and author queries first synchronize this node with
   * the meta group leader so that they read up-to-date metadata.
   *
   * @throws QueryProcessException if the plan type is not supported or the synchronization fails
   */
  @Override
  public QueryDataSet processQuery(PhysicalPlan queryPlan, QueryContext context)
      throws IOException, StorageEngineException, QueryFilterOptimizationException,
          QueryProcessException, MetadataException, InterruptedException {
    if (queryPlan instanceof QueryPlan) {
      logger.debug("Executing a query: {}", queryPlan);
      return processDataQuery((QueryPlan) queryPlan, context);
    } else if (queryPlan instanceof ShowPlan) {
      syncMetaLeaderForQuery();
      return processShowQuery((ShowPlan) queryPlan, context);
    } else if (queryPlan instanceof AuthorPlan) {
      syncMetaLeaderForQuery();
      return processAuthorQuery((AuthorPlan) queryPlan);
    } else {
      throw new QueryProcessException(String.format("Unrecognized query plan %s", queryPlan));
    }
  }

  /**
   * Synchronize this node with the meta group leader before serving a show/author query.
   *
   * @throws QueryProcessException carrying the message of the consistency failure
   */
  private void syncMetaLeaderForQuery() throws QueryProcessException {
    try {
      metaGroupMember.syncLeaderWithConsistencyCheck(false);
    } catch (CheckConsistencyException e) {
      throw new QueryProcessException(e.getMessage());
    }
  }

  @Override
  @TestOnly
  protected List<PartialPath> getPathsName(PartialPath path) throws MetadataException {
    return ((CMManager) IoTDB.metaManager).getMatchedPaths(path);
  }

  @Override
  protected int getPathsNum(PartialPath path) throws MetadataException {
    // level -1 means "match the whole pattern", i.e. count timeseries
    return getNodesNumInGivenLevel(path, -1);
  }

  /**
   * Count the paths matching {@code path} at the given level over the whole cluster.
   *
   * @param level the max depth to match the pattern, -1 means matching the whole pattern
   * @throws PathNotExistException if the path does not belong to any known storage group
   */
  @Override
  protected int getNodesNumInGivenLevel(PartialPath path, int level) throws MetadataException {
    // make sure this node knows all storage groups
    try {
      metaGroupMember.syncLeaderWithConsistencyCheck(false);
    } catch (CheckConsistencyException e) {
      throw new MetadataException(e);
    }

    // get all storage groups this path may belong to
    // the key is the storage group name and the value is the path to be queried with storage group
    // added, e.g:
    // "root.*" will be translated into:
    // "root.group1" -> "root.group1.*", "root.group2" -> "root.group2.*" ...
    Map<String, String> sgPathMap = IoTDB.metaManager.determineStorageGroup(path);
    if (sgPathMap.isEmpty()) {
      throw new PathNotExistException(path.getFullPath());
    }
    logger.debug("The storage groups of path {} are {}", path, sgPathMap.keySet());

    int ret;
    try {
      ret = getPathCount(sgPathMap, level);
    } catch (CheckConsistencyException e) {
      throw new MetadataException(e);
    }
    logger.debug("The number of paths satisfying {}@{} is {}", path, level, ret);
    return ret;
  }

  /**
   * Split the paths by the data group they belong to and query them from the groups separately.
   *
   * <p>Groups this node belongs to are counted locally; the others are batched per group and
   * queried concurrently. A remote group that cannot be queried is logged and contributes 0.
   *
   * @param sgPathMap the key is the storage group name and the value is the path to be queried with
   *     storage group added
   * @param level the max depth to match the pattern, -1 means matching the whole pattern
   * @return the number of paths that match the pattern at given level
   */
  private int getPathCount(Map<String, String> sgPathMap, int level)
      throws MetadataException, CheckConsistencyException {
    // remote results are accumulated from several threads
    AtomicInteger result = new AtomicInteger();
    // split the paths by the data group they belong to
    Map<PartitionGroup, List<String>> groupPathMap = new HashMap<>();
    for (Entry<String, String> sgPathEntry : sgPathMap.entrySet()) {
      String storageGroupName = sgPathEntry.getKey();
      PartialPath pathUnderSG = new PartialPath(sgPathEntry.getValue());
      // find the data group that should hold the timeseries schemas of the storage group
      PartitionGroup partitionGroup =
          metaGroupMember.getPartitionTable().route(storageGroupName, 0);
      if (partitionGroup.contains(metaGroupMember.getThisNode())) {
        // this node is a member of the group, perform a local query after synchronizing with the
        // leader
        metaGroupMember
            .getLocalDataMember(partitionGroup.getHeader(), partitionGroup.getId())
            .syncLeaderWithConsistencyCheck(false);
        int localResult = getLocalPathCount(pathUnderSG, level);
        logger.debug(
            "{}: get path count of {} locally, result {}",
            metaGroupMember.getName(),
            partitionGroup,
            localResult);
        result.addAndGet(localResult);
      } else {
        // batch the queries of the same group to reduce communication
        groupPathMap
            .computeIfAbsent(partitionGroup, p -> new ArrayList<>())
            .add(pathUnderSG.getFullPath());
      }
    }
    if (groupPathMap.isEmpty()) {
      return result.get();
    }

    ExecutorService remoteQueryThreadPool = Executors.newFixedThreadPool(groupPathMap.size());
    List<Future<Void>> remoteFutures = new ArrayList<>();
    // query each data group separately
    for (Entry<PartitionGroup, List<String>> partitionGroupPathEntry : groupPathMap.entrySet()) {
      PartitionGroup partitionGroup = partitionGroupPathEntry.getKey();
      List<String> pathsToQuery = partitionGroupPathEntry.getValue();
      remoteFutures.add(
          remoteQueryThreadPool.submit(
              () -> {
                try {
                  result.addAndGet(getRemotePathCount(partitionGroup, pathsToQuery, level));
                } catch (MetadataException e) {
                  logger.warn(
                      "Cannot get remote path count of {} from {}",
                      pathsToQuery,
                      partitionGroup,
                      e);
                }
                return null;
              }));
    }
    waitForThreadPool(remoteFutures, remoteQueryThreadPool, "getPathCount()");

    return result.get();
  }

  /**
   * Count matching paths in the local schema.
   *
   * @param level -1 counts timeseries, otherwise counts nodes at that level
   */
  private int getLocalPathCount(PartialPath path, int level) throws MetadataException {
    int localResult;
    if (level == -1) {
      localResult = IoTDB.metaManager.getAllTimeseriesCount(path);
    } else {
      localResult = IoTDB.metaManager.getNodesCountInGivenLevel(path, level);
    }
    return localResult;
  }

  /**
   * Ask the replicas of a remote group for the number of paths matching {@code pathsToQuery}.
   *
   * <p>Replicas are tried in the order suggested by the {@link QueryCoordinator}; a replica that
   * fails with a connection/RPC error is skipped in favor of the next one.
   *
   * @return the count reported by the first replica that answers, or 0 if every replica answered
   *     with no result
   * @throws MetadataException if every replica failed with an error (the last error is the cause),
   *     or if the calling thread was interrupted
   */
  private int getRemotePathCount(
      PartitionGroup partitionGroup, List<String> pathsToQuery, int level)
      throws MetadataException {
    // choose the node with lowest latency or highest throughput
    List<Node> coordinatedNodes = QueryCoordinator.getINSTANCE().reorderNodes(partitionGroup);
    Exception lastError = null;
    for (Node node : coordinatedNodes) {
      try {
        Integer count;
        if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
          AsyncDataClient client =
              metaGroupMember
                  .getClientProvider()
                  .getAsyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
          client.setTimeout(RaftServer.getReadOperationTimeoutMS());
          count =
              SyncClientAdaptor.getPathCount(
                  client, partitionGroup.getHeader(), pathsToQuery, level);
        } else {
          try (SyncDataClient syncDataClient =
              metaGroupMember
                  .getClientProvider()
                  .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS())) {
            try {
              syncDataClient.setTimeout(RaftServer.getReadOperationTimeoutMS());
              count = syncDataClient.getPathCount(partitionGroup.getHeader(), pathsToQuery, level);
            } catch (TException e) {
              // the connection may be broken, close it to avoid it being reused
              syncDataClient.getInputProtocol().getTransport().close();
              throw e;
            }
          }
        }
        logger.debug(
            "{}: get path count of {} from {}, result {}",
            metaGroupMember.getName(),
            partitionGroup,
            node,
            count);
        if (count != null) {
          return count;
        }
      } catch (IOException | TException e) {
        // this replica is unreachable or failed, fall back to the next one
        logger.warn(
            "{}: cannot get path count of {} from {}, trying the next node",
            metaGroupMember.getName(),
            partitionGroup,
            node,
            e);
        lastError = e;
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new MetadataException(e);
      }
    }
    if (lastError != null) {
      throw new MetadataException(lastError);
    }
    logger.warn("Cannot get paths of {} from {}", pathsToQuery, partitionGroup);
    return 0;
  }

  @Override
  protected Set<PartialPath> getDevices(PartialPath path) throws MetadataException {
    // make sure this node knows all storage groups
    ((CMManager) IoTDB.metaManager).syncMetaLeader();
    return ((CMManager) IoTDB.metaManager).getMatchedDevices(path);
  }

  /**
   * Collect the nodes matching {@code schemaPattern} at {@code level} from every global partition
   * group in parallel. Groups that cannot be queried are logged and skipped.
   */
  @Override
  protected List<PartialPath> getNodesList(PartialPath schemaPattern, int level)
      throws MetadataException {
    // filled concurrently by the pool threads
    ConcurrentSkipListSet<PartialPath> nodeSet = new ConcurrentSkipListSet<>();
    ExecutorService pool = Executors.newFixedThreadPool(THREAD_POOL_SIZE);

    List<Future<Void>> futureList = new ArrayList<>();
    for (PartitionGroup group : metaGroupMember.getPartitionTable().getGlobalGroups()) {
      futureList.add(
          pool.submit(
              () -> {
                List<PartialPath> paths;
                paths = getNodesList(group, schemaPattern, level);
                if (paths != null) {
                  nodeSet.addAll(paths);
                } else {
                  logger.error(
                      "Fail to get node list of {}@{} from {}", schemaPattern, level, group);
                }
                return null;
              }));
    }
    waitForThreadPool(futureList, pool, "getNodesList()");
    return new ArrayList<>(nodeSet);
  }

  /** Query the node list of one group, locally if this node is a member, remotely otherwise. */
  private List<PartialPath> getNodesList(PartitionGroup group, PartialPath schemaPattern, int level)
      throws CheckConsistencyException, MetadataException {
    if (group.contains(metaGroupMember.getThisNode())) {
      return getLocalNodesList(group, schemaPattern, level);
    } else {
      return getRemoteNodesList(group, schemaPattern, level);
    }
  }

  /**
   * Query the node list from the local schema after synchronizing with the group leader, keeping
   * only the storage groups whose slots are held by {@code group}.
   */
  private List<PartialPath> getLocalNodesList(
      PartitionGroup group, PartialPath schemaPattern, int level)
      throws CheckConsistencyException, MetadataException {
    // look the member up by header AND raft id, like the other local queries in this class
    DataGroupMember localDataMember =
        metaGroupMember.getLocalDataMember(group.getHeader(), group.getId());
    localDataMember.syncLeaderWithConsistencyCheck(false);
    try {
      return IoTDB.metaManager.getNodesList(
          schemaPattern,
          level,
          new SlotSgFilter(
              ((SlotPartitionTable) metaGroupMember.getPartitionTable())
                  .getNodeSlots(group.getHeader())));
    } catch (MetadataException e) {
      logger.error("Cannot get node list of {}@{} from {} locally", schemaPattern, level, group);
      throw e;
    }
  }

  /**
   * Query the node list from the members of a remote group, trying each member until one answers.
   *
   * @return the node list, or null if no member could answer
   */
  private List<PartialPath> getRemoteNodesList(
      PartitionGroup group, PartialPath schemaPattern, int level) {
    List<String> paths = null;
    for (Node node : group) {
      try {
        if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
          AsyncDataClient client =
              metaGroupMember
                  .getClientProvider()
                  .getAsyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
          paths =
              SyncClientAdaptor.getNodeList(
                  client, group.getHeader(), schemaPattern.getFullPath(), level);
        } else {
          try (SyncDataClient syncDataClient =
              metaGroupMember
                  .getClientProvider()
                  .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS())) {
            try {
              paths =
                  syncDataClient.getNodeList(group.getHeader(), schemaPattern.getFullPath(), level);
            } catch (TException e) {
              // the connection may be broken, close it to avoid it being reused
              syncDataClient.getInputProtocol().getTransport().close();
              throw e;
            }
          }
        }
        if (paths != null) {
          break;
        }
      } catch (IOException e) {
        logger.error(LOG_FAIL_CONNECT, node, e);
      } catch (TException e) {
        logger.error("Error occurs when getting node lists in node {}.", node, e);
      } catch (InterruptedException e) {
        logger.error("Interrupted when getting node lists in node {}.", node, e);
        Thread.currentThread().interrupt();
      }
    }
    if (paths == null) {
      // report the failure to the caller instead of converting it into a (possibly empty) list
      return null;
    }
    return PartialPath.fromStringList(paths);
  }

  /**
   * Collect the next-level child node names of {@code path} from every global partition group in
   * parallel. Groups that cannot be queried are logged and skipped.
   */
  @Override
  protected Set<String> getNodeNextChildren(PartialPath path) throws MetadataException {
    // filled concurrently by the pool threads
    ConcurrentSkipListSet<String> resultSet = new ConcurrentSkipListSet<>();
    List<PartitionGroup> globalGroups = metaGroupMember.getPartitionTable().getGlobalGroups();
    ExecutorService pool = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
    List<Future<Void>> futureList = new ArrayList<>();
    for (PartitionGroup group : globalGroups) {
      futureList.add(
          pool.submit(
              () -> {
                Set<String> nextChildrenNodes = null;
                try {
                  nextChildrenNodes = getChildNodeInNextLevel(group, path);
                } catch (CheckConsistencyException e) {
                  logger.error("Fail to get next children nodes of {} from {}", path, group, e);
                }
                if (nextChildrenNodes != null) {
                  resultSet.addAll(nextChildrenNodes);
                } else {
                  logger.error("Fail to get next children nodes of {} from {}", path, group);
                }
                return null;
              }));
    }
    waitForThreadPool(futureList, pool, "getChildNodeInNextLevel()");
    return resultSet;
  }

  /** Query next-level child nodes of one group, locally if this node is a member. */
  private Set<String> getChildNodeInNextLevel(PartitionGroup group, PartialPath path)
      throws CheckConsistencyException {
    if (group.contains(metaGroupMember.getThisNode())) {
      return getLocalChildNodeInNextLevel(group, path);
    } else {
      return getRemoteChildNodeInNextLevel(group, path);
    }
  }

  /**
   * Query next-level child nodes from the local schema after synchronizing with the group leader.
   *
   * @return the child nodes, or an empty set if the local lookup failed
   */
  private Set<String> getLocalChildNodeInNextLevel(PartitionGroup group, PartialPath path)
      throws CheckConsistencyException {
    DataGroupMember localDataMember =
        metaGroupMember.getLocalDataMember(group.getHeader(), group.getId());
    localDataMember.syncLeaderWithConsistencyCheck(false);
    try {
      return IoTDB.metaManager.getChildNodeInNextLevel(path);
    } catch (MetadataException e) {
      logger.error("Cannot get next children nodes of {} from {} locally", path, group, e);
      return Collections.emptySet();
    }
  }

  /**
   * Query next-level child nodes from the members of a remote group, trying each member until one
   * answers.
   *
   * @return the child nodes, or null if no member could answer
   */
  private Set<String> getRemoteChildNodeInNextLevel(PartitionGroup group, PartialPath path) {
    Set<String> nextChildrenNodes = null;
    for (Node node : group) {
      try {
        if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
          AsyncDataClient client =
              metaGroupMember
                  .getClientProvider()
                  .getAsyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
          nextChildrenNodes =
              SyncClientAdaptor.getChildNodeInNextLevel(
                  client, group.getHeader(), path.getFullPath());
        } else {
          try (SyncDataClient syncDataClient =
              metaGroupMember
                  .getClientProvider()
                  .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS())) {
            try {
              nextChildrenNodes =
                  syncDataClient.getChildNodeInNextLevel(group.getHeader(), path.getFullPath());
            } catch (TException e) {
              // the connection may be broken, close it to avoid it being reused
              syncDataClient.getInputProtocol().getTransport().close();
              throw e;
            }
          }
        }
        if (nextChildrenNodes != null) {
          break;
        }
      } catch (IOException e) {
        logger.error(LOG_FAIL_CONNECT, node, e);
      } catch (TException e) {
        logger.error("Error occurs when getting node lists in node {}.", node, e);
      } catch (InterruptedException e) {
        logger.error("Interrupted when getting node lists in node {}.", node, e);
        Thread.currentThread().interrupt();
      }
    }
    return nextChildrenNodes;
  }

  /**
   * Collect the next-level child paths of {@code path} from every global partition group in
   * parallel. Groups that cannot be queried are logged and skipped.
   */
  @Override
  protected Set<String> getPathNextChildren(PartialPath path) throws MetadataException {
    // filled concurrently by the pool threads
    ConcurrentSkipListSet<String> resultSet = new ConcurrentSkipListSet<>();
    ExecutorService pool = Executors.newFixedThreadPool(THREAD_POOL_SIZE);

    List<Future<Void>> futureList = new ArrayList<>();

    for (PartitionGroup group : metaGroupMember.getPartitionTable().getGlobalGroups()) {
      futureList.add(
          pool.submit(
              () -> {
                Set<String> nextChildren = null;
                try {
                  nextChildren = getNextChildren(group, path);
                } catch (CheckConsistencyException e) {
                  logger.error("Fail to get next children of {} from {}", path, group, e);
                }
                if (nextChildren != null) {
                  resultSet.addAll(nextChildren);
                } else {
                  logger.error("Fail to get next children of {} from {}", path, group);
                }
                return null;
              }));
    }
    waitForThreadPool(futureList, pool, "getPathNextChildren()");
    return resultSet;
  }

  /**
   * Wait for all {@code futures} and then release {@code pool}.
   *
   * <p>The pool is always shut down, including when a task fails or the wait is interrupted, so
   * that its threads do not leak. When the results are abandoned (failure or interruption), the
   * remaining tasks are cancelled with {@link ExecutorService#shutdownNow()}. On interruption the
   * interrupt flag is restored and the method returns without throwing, so callers see whatever
   * partial results were already collected.
   *
   * @param methodName used in log messages only
   * @throws MetadataException wrapping the failure of the first task that failed
   */
  public static void waitForThreadPool(
      List<Future<Void>> futures, ExecutorService pool, String methodName)
      throws MetadataException {
    boolean allCompleted = false;
    try {
      for (Future<Void> future : futures) {
        future.get();
      }
      allCompleted = true;
    } catch (InterruptedException e) {
      logger.error("Unexpected interruption when waiting for {}", methodName, e);
      Thread.currentThread().interrupt();
      // the pool is released in the finally block; waiting longer would fail immediately anyway
      return;
    } catch (RuntimeException | ExecutionException e) {
      throw new MetadataException(e);
    } finally {
      if (!allCompleted) {
        // results are abandoned: cancel queued/running tasks and free the threads
        pool.shutdownNow();
      }
    }

    pool.shutdown();
    try {
      pool.awaitTermination(RaftServer.getReadOperationTimeoutMS(), TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      logger.error("Unexpected interruption when waiting for {}", methodName, e);
    }
  }

  /** Query next-level child paths of one group, locally if this node is a member. */
  private Set<String> getNextChildren(PartitionGroup group, PartialPath path)
      throws CheckConsistencyException {
    if (group.contains(metaGroupMember.getThisNode())) {
      return getLocalNextChildren(group, path);
    } else {
      return getRemoteNextChildren(group, path);
    }
  }

  /**
   * Query next-level child paths from the local schema after synchronizing with the group leader.
   *
   * @return the child paths, or an empty set if the local lookup failed
   */
  private Set<String> getLocalNextChildren(PartitionGroup group, PartialPath path)
      throws CheckConsistencyException {
    // look the member up by header AND raft id, like the other local queries in this class
    DataGroupMember localDataMember =
        metaGroupMember.getLocalDataMember(group.getHeader(), group.getId());
    localDataMember.syncLeaderWithConsistencyCheck(false);
    try {
      return IoTDB.metaManager.getChildNodePathInNextLevel(path);
    } catch (MetadataException e) {
      logger.error("Cannot get next children of {} from {} locally", path, group, e);
      return Collections.emptySet();
    }
  }

  /**
   * Query next-level child paths from the members of a remote group, trying each member until one
   * answers.
   *
   * @return the child paths, or null if no member could answer
   */
  private Set<String> getRemoteNextChildren(PartitionGroup group, PartialPath path) {
    Set<String> nextChildren = null;
    for (Node node : group) {
      try {
        if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
          AsyncDataClient client =
              metaGroupMember
                  .getClientProvider()
                  .getAsyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
          nextChildren =
              SyncClientAdaptor.getNextChildren(client, group.getHeader(), path.getFullPath());
        } else {
          try (SyncDataClient syncDataClient =
              metaGroupMember
                  .getClientProvider()
                  .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS())) {
            try {
              nextChildren =
                  syncDataClient.getChildNodePathInNextLevel(group.getHeader(), path.getFullPath());
            } catch (TException e) {
              // the connection may be broken, close it to avoid it being reused
              syncDataClient.getInputProtocol().getTransport().close();
              throw e;
            }
          }
        }
        if (nextChildren != null) {
          break;
        }
      } catch (IOException e) {
        logger.error(LOG_FAIL_CONNECT, node, e);
      } catch (TException e) {
        logger.error("Error occurs when getting node lists in node {}.", node, e);
      } catch (InterruptedException e) {
        logger.error("Interrupted when getting node lists in node {}.", node, e);
        Thread.currentThread().interrupt();
      }
    }
    return nextChildren;
  }

  /**
   * Return all storage group nodes, after a best-effort synchronization with the meta leader (a
   * failed synchronization is logged and the local view is returned).
   */
  @Override
  protected List<IStorageGroupMNode> getAllStorageGroupNodes() {
    try {
      metaGroupMember.syncLeader(null);
    } catch (CheckConsistencyException e) {
      logger.warn("Failed to check consistency.", e);
    }
    return IoTDB.metaManager.getAllStorageGroupNodes();
  }

  @Override
  protected AlignByDeviceDataSet getAlignByDeviceDataSet(
      AlignByDevicePlan plan, QueryContext context, IQueryRouter router) {
    return new ClusterAlignByDeviceDataSet(plan, context, router);
  }

  /**
   * Hot-reload both the IoTDB and the cluster configuration, either from the properties carried by
   * the plan (GLOBAL) or from the local configuration files (LOCAL).
   */
  @Override
  protected void loadConfiguration(LoadConfigurationPlan plan) throws QueryProcessException {
    switch (plan.getLoadConfigurationPlanType()) {
      case GLOBAL:
        IoTDBDescriptor.getInstance().loadHotModifiedProps(plan.getIoTDBProperties());
        ClusterDescriptor.getInstance().loadHotModifiedProps(plan.getClusterProperties());
        break;
      case LOCAL:
        IoTDBDescriptor.getInstance().loadHotModifiedProps();
        ClusterDescriptor.getInstance().loadHotModifiedProps();
        break;
      default:
        throw new QueryProcessException(
            String.format(
                "Unrecognized load configuration plan type: %s",
                plan.getLoadConfigurationPlanType()));
    }
  }

  /** Delete the data of every path in the plan within the plan's time range. */
  @Override
  public void delete(DeletePlan deletePlan) throws QueryProcessException {
    if (deletePlan.getPaths().isEmpty()) {
      logger.info("TimeSeries list to be deleted is empty.");
      return;
    }
    for (PartialPath path : deletePlan.getPaths()) {
      delete(
          path,
          deletePlan.getDeleteStartTime(),
          deletePlan.getDeleteEndTime(),
          deletePlan.getIndex());
    }
  }

  /** Delete the data of one path through the local {@link StorageEngine}. */
  @Override
  public void delete(PartialPath path, long startTime, long endTime, long planIndex)
      throws QueryProcessException {
    try {
      StorageEngine.getInstance().delete(path, startTime, endTime, planIndex);
    } catch (StorageEngineException e) {
      throw new QueryProcessException(e);
    }
  }
}