/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.cluster.metadata;
import org.apache.iotdb.cluster.client.async.AsyncDataClient;
import org.apache.iotdb.cluster.client.sync.SyncClientAdaptor;
import org.apache.iotdb.cluster.client.sync.SyncDataClient;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.coordinator.Coordinator;
import org.apache.iotdb.cluster.exception.CheckConsistencyException;
import org.apache.iotdb.cluster.exception.UnsupportedPlanException;
import org.apache.iotdb.cluster.partition.PartitionGroup;
import org.apache.iotdb.cluster.query.manage.QueryCoordinator;
import org.apache.iotdb.cluster.rpc.thrift.GetAllPathsResult;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.PullSchemaRequest;
import org.apache.iotdb.cluster.rpc.thrift.PullSchemaResp;
import org.apache.iotdb.cluster.server.RaftServer;
import org.apache.iotdb.cluster.server.member.DataGroupMember;
import org.apache.iotdb.cluster.server.member.MetaGroupMember;
import org.apache.iotdb.cluster.utils.ClusterUtils;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.metadata.MetaUtils;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.metadata.mnode.MNode;
import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
import org.apache.iotdb.db.qp.constant.SQLConstant;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.dataset.ShowDevicesResult;
import org.apache.iotdb.db.query.dataset.ShowTimeSeriesResult;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.SchemaUtils;
import org.apache.iotdb.db.utils.TypeInferenceUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.iotdb.tsfile.common.cache.LRUCache;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.lang.reflect.Array;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import static org.apache.iotdb.cluster.query.ClusterPlanExecutor.LOG_FAIL_CONNECT;
import static org.apache.iotdb.cluster.query.ClusterPlanExecutor.THREAD_POOL_SIZE;
import static org.apache.iotdb.cluster.query.ClusterPlanExecutor.waitForThreadPool;
import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding;
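/**
* CMManager is the cluster-aware extension of {@link MManager}: schema queries are served from
* the local MTree first, then from the remote-schema cache, and finally by pulling schemas from
* the data group that owns them.
*/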
@SuppressWarnings("java:S1135") // ignore todos
public class CMManager extends MManager {
private static final Logger logger = LoggerFactory.getLogger(CMManager.class);
private ReentrantReadWriteLock cacheLock = new ReentrantReadWriteLock();
// only cache series that are being written; there is no need to cache series that are being
// read, because reads are slow anyway, so pulling from a remote node costs little compared to
// the disk I/O
private RemoteMetaCache mRemoteMetaCache;
private MetaPuller metaPuller;
private MetaGroupMember metaGroupMember;
private Coordinator coordinator;
private CMManager() {
super();
metaPuller = MetaPuller.getInstance();
int remoteCacheSize = config.getmRemoteSchemaCacheSize();
mRemoteMetaCache = new RemoteMetaCache(remoteCacheSize);
}
private static class MManagerHolder {
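// initialization-on-demand holder idiom: the singleton is created lazily and thread-safely
// when the holder class is first loaded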
private MManagerHolder() {
// allowed to do nothing
}
private static final CMManager INSTANCE = new CMManager();
}
/**
* this function should only be used in the IoTDB class and nowhere else
*
* @return the singleton instance of CMManager
*/
public static CMManager getInstance() {
return CMManager.MManagerHolder.INSTANCE;
}
/**
* sync with the meta leader to get the newest partition table and storage groups.
*
* @throws MetadataException if the consistency check with the meta leader fails
*/
public void syncMetaLeader() throws MetadataException {
try {
metaGroupMember.syncLeaderWithConsistencyCheck(false);
} catch (CheckConsistencyException e) {
throw new MetadataException(e);
}
}
@Override
public String deleteTimeseries(PartialPath prefixPath) throws MetadataException {
cacheLock.writeLock().lock();
mRemoteMetaCache.removeItem(prefixPath);
cacheLock.writeLock().unlock();
return super.deleteTimeseries(prefixPath);
}
@Override
public void deleteStorageGroups(List<PartialPath> storageGroups) throws MetadataException {
cacheLock.writeLock().lock();
for (PartialPath storageGroup : storageGroups) {
mRemoteMetaCache.removeItem(storageGroup);
}
cacheLock.writeLock().unlock();
super.deleteStorageGroups(storageGroups);
}
@Override
public TSDataType getSeriesType(PartialPath path) throws MetadataException {
// try remote cache first
try {
cacheLock.readLock().lock();
MeasurementMNode measurementMNode = mRemoteMetaCache.get(path);
if (measurementMNode != null) {
return measurementMNode.getSchema().getType();
}
} finally {
cacheLock.readLock().unlock();
}
// try local MTree
TSDataType seriesType;
try {
seriesType = super.getSeriesType(path);
} catch (PathNotExistException e) {
// pull from remote node
List<MeasurementSchema> schemas =
metaPuller.pullMeasurementSchemas(Collections.singletonList(path));
if (!schemas.isEmpty()) {
MeasurementSchema measurementSchema = schemas.get(0);
MeasurementMNode measurementMNode =
new MeasurementMNode(
null, measurementSchema.getMeasurementId(), measurementSchema, null);
cacheMeta(path, measurementMNode);
return schemas.get(0).getType();
} else {
throw e;
}
}
return seriesType;
}
/**
* {@link org.apache.iotdb.db.writelog.recover.LogReplayer#replayLogs(Supplier)} calls this
* method to get schemas after a restart, so we should retry until the schemas are obtained.
*
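* <p>A minimal usage sketch with hypothetical paths (checked exceptions omitted):
*
* <pre>{@code
* PartialPath device = new PartialPath("root.sg1.d1");
* MeasurementMNode[] nodes =
*     CMManager.getInstance().getMNodes(device, new String[] {"s1", "s2"});
* }</pre>
*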
* @param deviceId the device id.
* @param measurements the measurements.
*/
@Override
public MeasurementMNode[] getMNodes(PartialPath deviceId, String[] measurements)
throws MetadataException {
try {
return super.getMNodes(deviceId, measurements);
} catch (MetadataException e) {
// some measurements do not exist locally
// try the remote cache
MeasurementMNode[] measurementMNodes = new MeasurementMNode[measurements.length];
int failedMeasurementIndex = getMNodesLocally(deviceId, measurements, measurementMNodes);
if (failedMeasurementIndex == -1) {
return measurementMNodes;
}
// will retry until the schemas are obtained
pullSeriesSchemas(deviceId, measurements);
// try again
failedMeasurementIndex = getMNodesLocally(deviceId, measurements, measurementMNodes);
if (failedMeasurementIndex != -1) {
throw new MetadataException(
deviceId.getFullPath()
+ IoTDBConstant.PATH_SEPARATOR
+ measurements[failedMeasurementIndex]
+ " is not found");
}
return measurementMNodes;
}
}
/** @return -1 if all schemas are found, or the index of the first schema that is not found */
private int getMNodesLocally(
PartialPath deviceId, String[] measurements, MeasurementMNode[] measurementMNodes) {
int failedMeasurementIndex = -1;
cacheLock.readLock().lock();
try {
for (int i = 0; i < measurements.length && failedMeasurementIndex == -1; i++) {
MeasurementMNode measurementMNode =
mRemoteMetaCache.get(deviceId.concatNode(measurements[i]));
if (measurementMNode == null) {
failedMeasurementIndex = i;
} else {
measurementMNodes[i] = measurementMNode;
}
}
} finally {
cacheLock.readLock().unlock();
}
return failedMeasurementIndex;
}
private void pullSeriesSchemas(PartialPath deviceId, String[] measurementList)
throws MetadataException {
List<PartialPath> schemasToPull = new ArrayList<>();
for (String s : measurementList) {
schemasToPull.add(deviceId.concatNode(s));
}
List<MeasurementSchema> schemas = metaPuller.pullMeasurementSchemas(schemasToPull);
for (MeasurementSchema schema : schemas) {
// TODO-Cluster: also pull alias?
// take care: the measurementId of a pulled schema is only the series name
MeasurementMNode measurementMNode =
new MeasurementMNode(null, schema.getMeasurementId(), schema, null);
cacheMeta(deviceId.concatNode(schema.getMeasurementId()), measurementMNode);
}
logger.debug("Pulled {}/{} schemas from remote", schemas.size(), measurementList.length);
}
@Override
public void cacheMeta(PartialPath seriesPath, MeasurementMNode measurementMNode) {
cacheLock.writeLock().lock();
mRemoteMetaCache.put(seriesPath, measurementMNode);
cacheLock.writeLock().unlock();
}
@Override
public void updateLastCache(
PartialPath seriesPath,
TimeValuePair timeValuePair,
boolean highPriorityUpdate,
Long latestFlushedTime,
MeasurementMNode node) {
cacheLock.writeLock().lock();
try {
MeasurementMNode measurementMNode = mRemoteMetaCache.get(seriesPath);
if (measurementMNode != null) {
measurementMNode.updateCachedLast(timeValuePair, highPriorityUpdate, latestFlushedTime);
}
} finally {
cacheLock.writeLock().unlock();
}
// the timeseries may also exist locally
super.updateLastCache(seriesPath, timeValuePair, highPriorityUpdate, latestFlushedTime, node);
}
@Override
public TimeValuePair getLastCache(PartialPath seriesPath) {
MeasurementMNode measurementMNode = mRemoteMetaCache.get(seriesPath);
if (measurementMNode != null) {
return measurementMNode.getCachedLast();
}
return super.getLastCache(seriesPath);
}
@Override
public MNode getSeriesSchemasAndReadLockDevice(InsertPlan plan) throws MetadataException {
MeasurementMNode[] measurementMNodes = new MeasurementMNode[plan.getMeasurements().length];
int nonExistSchemaIndex =
getMNodesLocally(plan.getDeviceId(), plan.getMeasurements(), measurementMNodes);
if (nonExistSchemaIndex == -1) {
plan.setMeasurementMNodes(measurementMNodes);
return new MNode(null, plan.getDeviceId().getDevice());
}
// auto-create schema in IoTDBConfig is always disabled in the cluster version; ClusterConfig
// has a separate option that controls it
return super.getSeriesSchemasAndReadLockDevice(plan);
}
@Override
public MeasurementSchema getSeriesSchema(PartialPath device, String measurement)
throws MetadataException {
try {
MeasurementSchema measurementSchema = super.getSeriesSchema(device, measurement);
if (measurementSchema != null) {
return measurementSchema;
}
} catch (PathNotExistException e) {
// not found locally
}
// try cache
cacheLock.readLock().lock();
try {
MeasurementMNode measurementMNode = mRemoteMetaCache.get(device.concatNode(measurement));
if (measurementMNode != null) {
return measurementMNode.getSchema();
}
} finally {
cacheLock.readLock().unlock();
}
// pull from remote
pullSeriesSchemas(device, new String[] {measurement});
// try again
cacheLock.readLock().lock();
try {
MeasurementMNode measurementMeta = mRemoteMetaCache.get(device.concatNode(measurement));
if (measurementMeta != null) {
return measurementMeta.getSchema();
}
} finally {
cacheLock.readLock().unlock();
}
return super.getSeriesSchema(device, measurement);
}
/**
* Check whether the path exists.
*
* @param path a full path or a prefix path
*/
@Override
public boolean isPathExist(PartialPath path) {
boolean localExist = super.isPathExist(path);
if (localExist) {
return true;
}
// search the cache
cacheLock.readLock().lock();
try {
return mRemoteMetaCache.containsKey(path);
} finally {
cacheLock.readLock().unlock();
}
}
private static class RemoteMetaCache extends LRUCache<PartialPath, MeasurementMNode> {
RemoteMetaCache(int cacheSize) {
super(cacheSize);
}
@Override
protected MeasurementMNode loadObjectByKey(PartialPath key) {
return null;
}
@Override
public synchronized void removeItem(PartialPath key) {
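// prefix match: removing a device or a storage group also invalidates every cached series
// under it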
cache.keySet().removeIf(s -> s.getFullPath().startsWith(key.getFullPath()));
}
@Override
public synchronized MeasurementMNode get(PartialPath key) {
try {
return super.get(key);
} catch (IOException e) {
// cannot happen: loadObjectByKey performs no I/O
return null;
}
}
public synchronized boolean containsKey(PartialPath key) {
return cache.containsKey(key);
}
}
/**
* create storage groups for CreateTimeseriesPlan, CreateMultiTimeseriesPlan and InsertPlan, and
* also create timeseries for InsertPlan. Only these three kinds of plans may use this method.
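*
* <p>A minimal sketch, assuming a hypothetical InsertRowPlan whose device is not registered yet
* (constructor shape and paths are for illustration only; exceptions omitted):
*
* <pre>{@code
* InsertRowPlan plan =
*     new InsertRowPlan(
*         new PartialPath("root.sg1.d1"), 1L, new String[] {"s1"}, new String[] {"1.0"});
* CMManager.getInstance().createSchema(plan); // creates the storage group and timeseries
* }</pre>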
*/
public void createSchema(PhysicalPlan plan) throws MetadataException, CheckConsistencyException {
List<PartialPath> storageGroups = new ArrayList<>();
// for an InsertPlan, use only the deviceIds to find the related storage groups; there is no
// need to call getPaths() to concatenate deviceId and sensor, as that would yield the same
// result. For a CreateTimeSeriesPlan, use getPath() to get the timeseries and its related
// storage group; for a CreateMultiTimeSeriesPlan, use getPaths() to get all timeseries and
// their related storage groups.
if (plan instanceof InsertRowPlan
|| plan instanceof InsertRowsOfOneDevicePlan
|| plan instanceof InsertTabletPlan) {
storageGroups.addAll(
getStorageGroups(Collections.singletonList(((InsertPlan) plan).getDeviceId())));
} else if (plan instanceof InsertRowsPlan) {
storageGroups.addAll(
getStorageGroups(
((InsertRowsPlan) plan)
.getInsertRowPlanList().stream()
.map(InsertPlan::getDeviceId)
.collect(Collectors.toList())));
} else if (plan instanceof InsertMultiTabletPlan) {
storageGroups.addAll(
getStorageGroups(
((InsertMultiTabletPlan) plan)
.getInsertTabletPlanList().stream()
.map(InsertPlan::getDeviceId)
.collect(Collectors.toList())));
} else if (plan instanceof CreateTimeSeriesPlan) {
storageGroups.addAll(
getStorageGroups(Collections.singletonList(((CreateTimeSeriesPlan) plan).getPath())));
} else {
storageGroups.addAll(getStorageGroups(plan.getPaths()));
}
// create storage groups
createStorageGroups(storageGroups);
// verify that the storage groups have been created
verifyCreatedSgSuccess(storageGroups, plan);
// try to create timeseries for insertPlan
if (plan instanceof InsertPlan && !createTimeseries((InsertPlan) plan)) {
throw new MetadataException("Failed to create timeseries from InsertPlan automatically.");
}
}
/** return the storage group paths for the given deviceIds or timeseries. */
private List<PartialPath> getStorageGroups(List<PartialPath> paths) throws MetadataException {
Set<PartialPath> storageGroups = new HashSet<>();
for (PartialPath path : paths) {
storageGroups.add(
MetaUtils.getStorageGroupPathByLevel(
path, IoTDBDescriptor.getInstance().getConfig().getDefaultStorageGroupLevel()));
}
return new ArrayList<>(storageGroups);
}
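// Wait until every given storage group is observable in the local schema, or give up after the
// connection timeout elapses; group creation propagates asynchronously, so a newly created
// group may not be visible immediately.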
@SuppressWarnings("squid:S3776")
private void verifyCreatedSgSuccess(List<PartialPath> storageGroups, PhysicalPlan physicalPlan) {
long startTime = System.currentTimeMillis();
boolean[] ready = new boolean[storageGroups.size()];
Arrays.fill(ready, false);
while (true) {
boolean allReady = true;
for (int i = 0; i < storageGroups.size(); i++) {
if (ready[i]) {
continue;
}
if (IoTDB.metaManager.isStorageGroup(storageGroups.get(i))) {
ready[i] = true;
} else {
allReady = false;
}
}
if (allReady
|| System.currentTimeMillis() - startTime
> ClusterDescriptor.getInstance().getConfig().getConnectionTimeoutInMS()) {
break;
} else {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
logger.debug("Failed to wait for creating sgs for plan {}", physicalPlan, e);
Thread.currentThread().interrupt();
}
}
}
}
/**
* Create storage groups automatically for paths.
*
* @param storageGroups the uncreated storage groups
*/
private void createStorageGroups(List<PartialPath> storageGroups) throws MetadataException {
for (PartialPath storageGroup : storageGroups) {
SetStorageGroupPlan setStorageGroupPlan = new SetStorageGroupPlan(storageGroup);
TSStatus setStorageGroupResult =
metaGroupMember.processNonPartitionedMetaPlan(setStorageGroupPlan);
if (setStorageGroupResult.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& setStorageGroupResult.getCode()
!= TSStatusCode.PATH_ALREADY_EXIST_ERROR.getStatusCode()) {
throw new MetadataException(
String.format(
"Status Code: %d, failed to set storage group %s",
setStorageGroupResult.getCode(), storageGroup));
}
}
}
/**
* @param insertMultiTabletPlan the InsertMultiTabletPlan
* @return true if timeseries creation succeeds for every InsertTabletPlan in the
*     InsertMultiTabletPlan, otherwise false
*/
public boolean createTimeseries(InsertMultiTabletPlan insertMultiTabletPlan)
throws CheckConsistencyException, IllegalPathException {
boolean allSuccess = true;
for (InsertTabletPlan insertTabletPlan : insertMultiTabletPlan.getInsertTabletPlanList()) {
boolean success = createTimeseries(insertTabletPlan);
allSuccess = allSuccess && success;
if (!success) {
logger.error(
"create timeseries for device={} failed, plan={}",
insertTabletPlan.getDeviceId(),
insertTabletPlan);
}
}
return allSuccess;
}
public boolean createTimeseries(InsertRowsPlan insertRowsPlan)
throws CheckConsistencyException, IllegalPathException {
boolean allSuccess = true;
for (InsertRowPlan insertRowPlan : insertRowsPlan.getInsertRowPlanList()) {
boolean success = createTimeseries(insertRowPlan);
allSuccess = allSuccess && success;
if (!success) {
logger.error(
"create timeseries for device={} failed, plan={}",
insertRowPlan.getDeviceId(),
insertRowPlan);
}
}
return allSuccess;
}
/**
* Create timeseries automatically for an InsertPlan.
*
* @param insertPlan some of the timeseries in it are not created yet
* @return true if all uncreated timeseries are created successfully
*/
public boolean createTimeseries(InsertPlan insertPlan)
throws IllegalPathException, CheckConsistencyException {
if (insertPlan instanceof InsertMultiTabletPlan) {
return createTimeseries((InsertMultiTabletPlan) insertPlan);
}
if (insertPlan instanceof InsertRowsPlan) {
return createTimeseries((InsertRowsPlan) insertPlan);
}
List<String> seriesList = new ArrayList<>();
PartialPath deviceId = insertPlan.getDeviceId();
PartialPath storageGroupName;
try {
storageGroupName =
MetaUtils.getStorageGroupPathByLevel(
deviceId, IoTDBDescriptor.getInstance().getConfig().getDefaultStorageGroupLevel());
} catch (MetadataException e) {
logger.error("Failed to infer storage group from deviceId {}", deviceId);
return false;
}
for (String measurementId : insertPlan.getMeasurements()) {
seriesList.add(deviceId.getFullPath() + TsFileConstant.PATH_SEPARATOR + measurementId);
}
PartitionGroup partitionGroup =
metaGroupMember.getPartitionTable().route(storageGroupName.getFullPath(), 0);
List<String> unregisteredSeriesList = getUnregisteredSeriesList(seriesList, partitionGroup);
if (unregisteredSeriesList.isEmpty()) {
return true;
}
logger.debug("Unregisterd series of {} are {}", seriesList, unregisteredSeriesList);
return createTimeseries(unregisteredSeriesList, seriesList, insertPlan);
}
/**
* create timeseries from paths in "unregisteredSeriesList". If data types are provided by the
* InsertPlan, use them, otherwise infer the types from the values. Use default encodings and
* compressions of the corresponding data type.
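*
* <p>An illustrative sketch of the inference step (the exact rules live in {@link
* TypeInferenceUtils} and depend on the configuration):
*
* <pre>{@code
* // "1.5" is typically inferred as a floating-point type, "true" as BOOLEAN
* TSDataType inferred = TypeInferenceUtils.getPredictedDataType("1.5", true);
* }</pre>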
*/
private boolean createTimeseries(
List<String> unregisteredSeriesList, List<String> seriesList, InsertPlan insertPlan)
throws IllegalPathException {
List<PartialPath> paths = new ArrayList<>();
List<TSDataType> dataTypes = new ArrayList<>();
List<TSEncoding> encodings = new ArrayList<>();
List<CompressionType> compressionTypes = new ArrayList<>();
for (String seriesPath : unregisteredSeriesList) {
paths.add(new PartialPath(seriesPath));
int index = seriesList.indexOf(seriesPath);
TSDataType dataType;
// use data types in insertPlan if provided, otherwise infer them from the values
if (insertPlan.getDataTypes() != null && insertPlan.getDataTypes()[index] != null) {
dataType = insertPlan.getDataTypes()[index];
} else {
dataType =
TypeInferenceUtils.getPredictedDataType(
insertPlan instanceof InsertTabletPlan
? Array.get(((InsertTabletPlan) insertPlan).getColumns()[index], 0)
: ((InsertRowPlan) insertPlan).getValues()[index],
true);
}
dataTypes.add(dataType);
// use default encoding and compression from the config
encodings.add(getDefaultEncoding(dataType));
compressionTypes.add(TSFileDescriptor.getInstance().getConfig().getCompressor());
}
CreateMultiTimeSeriesPlan plan = new CreateMultiTimeSeriesPlan();
plan.setPaths(paths);
plan.setDataTypes(dataTypes);
plan.setEncodings(encodings);
plan.setCompressors(compressionTypes);
TSStatus result;
try {
result = coordinator.processPartitionedPlan(plan);
} catch (UnsupportedPlanException e) {
logger.error(
"Failed to create timeseries {} automatically. Unsupported plan exception {} ",
paths,
e.getMessage());
return false;
}
if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& result.getCode() != TSStatusCode.PATH_ALREADY_EXIST_ERROR.getStatusCode()
&& result.getCode() != TSStatusCode.NEED_REDIRECTION.getStatusCode()) {
logger.error(
"{} failed to execute create timeseries {}: {}",
metaGroupMember.getThisNode(),
paths,
result);
return false;
}
return true;
}
public void setMetaGroupMember(MetaGroupMember metaGroupMember) {
this.metaGroupMember = metaGroupMember;
}
public void setCoordinator(Coordinator coordinator) {
this.coordinator = coordinator;
}
/**
* Check which timeseries in the input list are unregistered, by asking one node in
* "partitionGroup".
*/
private List<String> getUnregisteredSeriesList(
List<String> seriesList, PartitionGroup partitionGroup) throws CheckConsistencyException {
if (partitionGroup.contains(metaGroupMember.getThisNode())) {
return getUnregisteredSeriesListLocally(seriesList, partitionGroup);
} else {
return getUnregisteredSeriesListRemotely(seriesList, partitionGroup);
}
}
private List<String> getUnregisteredSeriesListLocally(
List<String> seriesList, PartitionGroup partitionGroup) throws CheckConsistencyException {
DataGroupMember dataMember =
metaGroupMember
.getDataClusterServer()
.getDataMember(partitionGroup.getHeader(), null, null);
return dataMember.getLocalQueryExecutor().getUnregisteredTimeseries(seriesList);
}
private List<String> getUnregisteredSeriesListRemotely(
List<String> seriesList, PartitionGroup partitionGroup) {
for (Node node : partitionGroup) {
try {
List<String> result;
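// two RPC modes are supported: an async client adapted to a blocking call through
// SyncClientAdaptor, or a plain blocking client used in try-with-resources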
if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
AsyncDataClient client =
metaGroupMember
.getClientProvider()
.getAsyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
result =
SyncClientAdaptor.getUnregisteredMeasurements(
client, partitionGroup.getHeader(), seriesList);
} else {
try (SyncDataClient syncDataClient =
metaGroupMember
.getClientProvider()
.getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS())) {
result =
syncDataClient.getUnregisteredTimeseries(partitionGroup.getHeader(), seriesList);
}
}
if (result != null) {
return result;
}
} catch (TException | IOException e) {
logger.error(
"{}: cannot getting unregistered {} and other {} paths from {}",
metaGroupMember.getName(),
seriesList.get(0),
seriesList.get(seriesList.size() - 1),
node,
e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error(
"{}: getting unregistered series list {} ... {} is interrupted from {}",
metaGroupMember.getName(),
seriesList.get(0),
seriesList.get(seriesList.size() - 1),
node,
e);
}
}
return Collections.emptyList();
}
/**
* Pull all timeseries schemas of the given prefixPaths from remote nodes. Every prefixPath must
* contain a storage group. The pulled schemas will be cached in CMManager.
*
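* <p>A minimal sketch (hypothetical path; pass {@code null} when no group needs to be ignored;
* exceptions omitted):
*
* <pre>{@code
* CMManager.getInstance()
*     .pullTimeSeriesSchemas(
*         Collections.singletonList(new PartialPath("root.sg1.d1.s1")), null);
* }</pre>
*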
* @param ignoredGroup do not pull schemas from this group, to avoid a backward dependency. If a
*     user sends an insert request before registering schemas, this method may pull schemas from
*     the same group. If this method is called by an applier, it holds the lock of LogManager,
*     while the pulling thread may want this lock too, resulting in a deadlock.
*/
public void pullTimeSeriesSchemas(List<PartialPath> prefixPaths, Node ignoredGroup)
throws MetadataException {
logger.debug(
"{}: Pulling timeseries schemas of {}, ignored group {}",
metaGroupMember.getName(),
prefixPaths,
ignoredGroup);
// split the paths by the data groups that should hold them
Map<PartitionGroup, List<String>> partitionGroupPathMap = new HashMap<>();
for (PartialPath prefixPath : prefixPaths) {
if (SQLConstant.RESERVED_TIME.equalsIgnoreCase(prefixPath.getFullPath())) {
continue;
}
PartitionGroup partitionGroup =
ClusterUtils.partitionByPathTimeWithSync(prefixPath, metaGroupMember);
if (!partitionGroup.getHeader().equals(ignoredGroup)) {
partitionGroupPathMap
.computeIfAbsent(partitionGroup, g -> new ArrayList<>())
.add(prefixPath.getFullPath());
}
}
// pull timeseries schema from every group involved
if (logger.isDebugEnabled()) {
logger.debug(
"{}: pulling schemas of {} and other {} paths from {} groups",
metaGroupMember.getName(),
prefixPaths.get(0),
prefixPaths.size() - 1,
partitionGroupPathMap.size());
}
for (Entry<PartitionGroup, List<String>> partitionGroupListEntry :
partitionGroupPathMap.entrySet()) {
PartitionGroup partitionGroup = partitionGroupListEntry.getKey();
List<String> paths = partitionGroupListEntry.getValue();
pullTimeSeriesSchemas(partitionGroup, paths);
}
}
/**
* Pull timeseries schemas of "prefixPaths" from "partitionGroup". If this node is a member of
* "partitionGroup", synchronize with the group leader and collect local schemas. Otherwise pull
* schemas from one node in the group. The pulled schemas will be cached in CMManager.
*/
private void pullTimeSeriesSchemas(PartitionGroup partitionGroup, List<String> prefixPaths) {
if (partitionGroup.contains(metaGroupMember.getThisNode())) {
// this node is in the target group; synchronizing with the leader should be enough
try {
metaGroupMember
.getLocalDataMember(partitionGroup.getHeader(), "Pull timeseries of " + prefixPaths)
.syncLeader(null);
} catch (CheckConsistencyException e) {
logger.warn("Failed to check consistency.", e);
}
return;
}
// pull schemas from a remote node
PullSchemaRequest pullSchemaRequest = new PullSchemaRequest();
pullSchemaRequest.setHeader(partitionGroup.getHeader());
pullSchemaRequest.setPrefixPaths(prefixPaths);
// decide the node access order with the help of QueryCoordinator
List<Node> nodes = QueryCoordinator.getINSTANCE().reorderNodes(partitionGroup);
for (Node node : nodes) {
if (tryPullTimeSeriesSchemas(node, pullSchemaRequest)) {
break;
}
}
}
/**
* send the PullSchemaRequest to "node" and cache the results in CMManager if they are
* successfully returned.
*
* @return true if the pull succeeded, false otherwise
*/
private boolean tryPullTimeSeriesSchemas(Node node, PullSchemaRequest request) {
if (logger.isDebugEnabled()) {
logger.debug(
"{}: Pulling timeseries schemas of {} and other {} paths from {}",
metaGroupMember.getName(),
request.getPrefixPaths().get(0),
request.getPrefixPaths().size() - 1,
node);
}
List<TimeseriesSchema> schemas = null;
try {
schemas = pullTimeSeriesSchemas(node, request);
} catch (IOException | TException e) {
logger.error(
"{}: Cannot pull timeseries schemas of {} and other {} paths from {}",
metaGroupMember.getName(),
request.getPrefixPaths().get(0),
request.getPrefixPaths().size() - 1,
node,
e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error(
"{}: Cannot pull timeseries schemas of {} and other {} paths from {}",
metaGroupMember.getName(),
request.getPrefixPaths().get(0),
request.getPrefixPaths().size() - 1,
node,
e);
}
if (schemas != null) {
if (logger.isDebugEnabled()) {
logger.debug(
"{}: Pulled {} timeseries schemas of {} and other {} paths from {} of {}",
metaGroupMember.getName(),
schemas.size(),
request.getPrefixPaths().get(0),
request.getPrefixPaths().size() - 1,
node,
request.getHeader());
}
for (TimeseriesSchema schema : schemas) {
SchemaUtils.cacheTimeseriesSchema(schema);
}
return true;
}
return false;
}
/**
* send a PullSchemaRequest to a node to pull TimeseriesSchemas, and return the pulled schema or
* null if there was a timeout.
*/
private List<TimeseriesSchema> pullTimeSeriesSchemas(Node node, PullSchemaRequest request)
throws TException, InterruptedException, IOException {
List<TimeseriesSchema> schemas;
if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
AsyncDataClient client =
metaGroupMember
.getClientProvider()
.getAsyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
schemas = SyncClientAdaptor.pullTimeseriesSchema(client, request);
} else {
try (SyncDataClient syncDataClient =
metaGroupMember
.getClientProvider()
.getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS())) {
PullSchemaResp pullSchemaResp = syncDataClient.pullTimeSeriesSchema(request);
ByteBuffer buffer = pullSchemaResp.schemaBytes;
int size = buffer.getInt();
schemas = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
schemas.add(TimeseriesSchema.deserializeFrom(buffer));
}
}
}
return schemas;
}
/**
* Get the data types of "paths". If "aggregation" is not null, every path will use the
* aggregation. First get types locally and if some paths does not exists, pull them from other
* nodes.
*
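* <p>A minimal sketch (hypothetical series; "max_value" as the aggregation; exceptions
* omitted):
*
* <pre>{@code
* Pair<List<TSDataType>, List<TSDataType>> types =
*     CMManager.getInstance()
*         .getSeriesTypesByPaths(
*             Collections.singletonList(new PartialPath("root.sg1.d1.s1")), "max_value");
* // types.left: column types after aggregation; types.right: raw measurement types
* }</pre>
*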
* @return the left one of the pair is the column types (considering aggregation), the right one
* of the pair is the measurement types (not considering aggregation)
*/
public Pair<List<TSDataType>, List<TSDataType>> getSeriesTypesByPaths(
List<PartialPath> pathStrs, String aggregation) throws MetadataException {
try {
return getSeriesTypesByPathsLocally(pathStrs, aggregation);
} catch (PathNotExistException e) {
// pull schemas remotely and cache them
pullTimeSeriesSchemas(pathStrs, null);
return getSeriesTypesByPathsLocally(pathStrs, aggregation);
}
}
private Pair<List<TSDataType>, List<TSDataType>> getSeriesTypesByPathsLocally(
List<PartialPath> pathStrs, String aggregation) throws MetadataException {
List<TSDataType> measurementDataTypes =
SchemaUtils.getSeriesTypesByPaths(pathStrs, (String) null);
// if the aggregation function is null, the type of column in result set
// is equal to the real type of the measurement
if (aggregation == null) {
return new Pair<>(measurementDataTypes, measurementDataTypes);
} else {
// if the aggregation function is not null,
// we should recalculate the type of column in result set
List<TSDataType> columnDataTypes =
SchemaUtils.getAggregatedDataTypes(measurementDataTypes, aggregation);
return new Pair<>(columnDataTypes, measurementDataTypes);
}
}
/**
* Get the data types of "paths". If "aggregations" is not null, each one of it correspond to one
* in "paths". First get types locally and if some paths does not exists, pull them from other
* nodes.
*
* @param aggregations nullable, when not null, correspond to "paths" one-to-one.
* @return the left one of the pair is the column types (considering aggregation), the right one
* of the pair is the measurement types (not considering aggregation)
*/
public Pair<List<TSDataType>, List<TSDataType>> getSeriesTypesByPath(
List<PartialPath> paths, List<String> aggregations) throws MetadataException {
try {
return getSeriesTypesByPathLocally(paths, aggregations);
} catch (PathNotExistException e) {
return getSeriesTypesByPathRemotely(paths, aggregations);
}
}
/**
* get data types of the given paths from CMManager, considering the aggregations.
*
* @param aggregations nullable, when not null, correspond to "paths" one-to-one.
* @return the left one of the pair is the column types (considering aggregation), the right one
* of the pair is the measurement types (not considering aggregation)
*/
private Pair<List<TSDataType>, List<TSDataType>> getSeriesTypesByPathLocally(
List<PartialPath> paths, List<String> aggregations) throws MetadataException {
List<TSDataType> measurementDataTypes = SchemaUtils.getSeriesTypesByPaths(paths);
// if the aggregation function is null, the type of column in result set
// is equal to the real type of the measurement
if (aggregations == null) {
return new Pair<>(measurementDataTypes, measurementDataTypes);
} else {
// if the aggregation function is not null,
// we should recalculate the type of column in result set
List<TSDataType> columnDataTypes = SchemaUtils.getSeriesTypesByPaths(paths, aggregations);
return new Pair<>(columnDataTypes, measurementDataTypes);
}
}
/**
* pull schemas from remote nodes and cache them, then get data types of the given paths from
* CMManager, considering the aggregations.
*
* @param aggregations nullable, when not null, correspond to "paths" one-to-one.
* @return the left one of the pair is the column types (considering aggregation), the right one
* of the pair is the measurement types (not considering aggregation)
*/
private Pair<List<TSDataType>, List<TSDataType>> getSeriesTypesByPathRemotely(
List<PartialPath> paths, List<String> aggregations) throws MetadataException {
// pull schemas remotely and cache them
pullTimeSeriesSchemas(paths, null);
return getSeriesTypesByPathLocally(paths, aggregations);
}
/**
* Get all devices after removing wildcards in the path
*
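* <p>For example, {@code getMatchedDevices(new PartialPath("root.*"))} may return devices such
* as {@code root.group1.d1} and {@code root.group2.d2} (illustrative names).
*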
* @param originPath a path potentially with wildcard
* @return all paths after removing wildcards in the path
*/
public Set<PartialPath> getMatchedDevices(PartialPath originPath) throws MetadataException {
// get all storage groups this path may belong to
// the key is the storage group name and the value is the path to be queried with storage group
// added, e.g:
// "root.*" will be translated into:
// "root.group1" -> "root.group1.*", "root.group2" -> "root.group2.*" ...
Map<String, String> sgPathMap = determineStorageGroup(originPath);
Set<PartialPath> ret = getMatchedDevices(sgPathMap);
logger.debug("The devices of path {} are {}", originPath, ret);
return ret;
}
/**
* Split the paths by the data group they belong to and query them from the groups separately.
*
* @param sgPathMap the key is the storage group name and the value is the path to be queried with
* storage group added
* @return a collection of all queried paths
*/
private List<PartialPath> getMatchedPaths(Map<String, String> sgPathMap, boolean withAlias)
throws MetadataException {
List<PartialPath> result = new ArrayList<>();
// split the paths by the data group they belong to
Map<PartitionGroup, List<String>> groupPathMap = new HashMap<>();
for (Entry<String, String> sgPathEntry : sgPathMap.entrySet()) {
String storageGroupName = sgPathEntry.getKey();
PartialPath pathUnderSG = new PartialPath(sgPathEntry.getValue());
// find the data group that should hold the timeseries schemas of the storage group
PartitionGroup partitionGroup =
metaGroupMember.getPartitionTable().route(storageGroupName, 0);
if (partitionGroup.contains(metaGroupMember.getThisNode())) {
// this node is a member of the group, perform a local query after synchronizing with the
// leader
try {
metaGroupMember.getLocalDataMember(partitionGroup.getHeader()).syncLeader(null);
} catch (CheckConsistencyException e) {
logger.warn("Failed to check consistency.", e);
}
List<PartialPath> allTimeseriesName = getMatchedPathsLocally(pathUnderSG, withAlias);
logger.debug(
"{}: get matched paths of {} locally, result {}",
metaGroupMember.getName(),
partitionGroup,
allTimeseriesName);
result.addAll(allTimeseriesName);
} else {
// batch the queries of the same group to reduce communication
groupPathMap
.computeIfAbsent(partitionGroup, p -> new ArrayList<>())
.add(pathUnderSG.getFullPath());
}
}
// query each data group separately
for (Entry<PartitionGroup, List<String>> partitionGroupPathEntry : groupPathMap.entrySet()) {
PartitionGroup partitionGroup = partitionGroupPathEntry.getKey();
List<String> pathsToQuery = partitionGroupPathEntry.getValue();
result.addAll(getMatchedPaths(partitionGroup, pathsToQuery, withAlias));
}
return result;
}
private List<PartialPath> getMatchedPathsLocally(PartialPath partialPath, boolean withAlias)
throws MetadataException {
if (!withAlias) {
return getAllTimeseriesPath(partialPath);
} else {
return super.getAllTimeseriesPathWithAlias(partialPath, -1, -1).left;
}
}
private List<PartialPath> getMatchedPaths(
PartitionGroup partitionGroup, List<String> pathsToQuery, boolean withAlias)
throws MetadataException {
// choose the node with lowest latency or highest throughput
List<Node> coordinatedNodes = QueryCoordinator.getINSTANCE().reorderNodes(partitionGroup);
for (Node node : coordinatedNodes) {
try {
List<PartialPath> paths =
getMatchedPaths(node, partitionGroup.getHeader(), pathsToQuery, withAlias);
if (logger.isDebugEnabled()) {
logger.debug(
"{}: get matched paths of {} and other {} paths from {} in {}, result {}",
metaGroupMember.getName(),
pathsToQuery.get(0),
pathsToQuery.size() - 1,
node,
partitionGroup.getHeader(),
paths);
}
if (paths != null) {
// a non-null result is correct even if it is empty, so there is no need to try other nodes
return paths;
}
} catch (IOException | TException e) {
throw new MetadataException(e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new MetadataException(e);
}
}
logger.warn("Cannot get paths of {} from {}", pathsToQuery, partitionGroup);
return Collections.emptyList();
}
@SuppressWarnings("java:S1168") // null and empty list are different
private List<PartialPath> getMatchedPaths(
Node node, Node header, List<String> pathsToQuery, boolean withAlias)
throws IOException, TException, InterruptedException {
GetAllPathsResult result;
if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
AsyncDataClient client =
metaGroupMember
.getClientProvider()
.getAsyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
result = SyncClientAdaptor.getAllPaths(client, header, pathsToQuery, withAlias);
} else {
try (SyncDataClient syncDataClient =
metaGroupMember
.getClientProvider()
.getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS())) {
result = syncDataClient.getAllPaths(header, pathsToQuery, withAlias);
}
}
if (result != null) {
// paths may be empty, implying that the group does not contain matched paths, so we do not
// need to query other nodes in the group
List<PartialPath> partialPaths = new ArrayList<>();
for (int i = 0; i < result.paths.size(); i++) {
try {
PartialPath partialPath = new PartialPath(result.paths.get(i));
if (withAlias) {
partialPath.setMeasurementAlias(result.aliasList.get(i));
}
partialPaths.add(partialPath);
} catch (IllegalPathException e) {
// ignore
}
}
return partialPaths;
} else {
// a null implies a network failure, so we have to query other nodes in the group
return null;
}
}
/**
* Split the paths by the data group they belong to and query them from the groups separately.
*
* @param sgPathMap the key is the storage group name and the value is the path to be queried with
* storage group added
* @return a collection of all queried devices
*/
private Set<PartialPath> getMatchedDevices(Map<String, String> sgPathMap)
throws MetadataException {
Set<PartialPath> result = new HashSet<>();
// split the paths by the data group they belong to
Map<PartitionGroup, List<String>> groupPathMap = new HashMap<>();
for (Entry<String, String> sgPathEntry : sgPathMap.entrySet()) {
String storageGroupName = sgPathEntry.getKey();
PartialPath pathUnderSG = new PartialPath(sgPathEntry.getValue());
// find the data group that should hold the timeseries schemas of the storage group
PartitionGroup partitionGroup =
metaGroupMember.getPartitionTable().route(storageGroupName, 0);
if (partitionGroup.contains(metaGroupMember.getThisNode())) {
// this node is a member of the group, perform a local query after synchronizing with the
// leader
try {
metaGroupMember.getLocalDataMember(partitionGroup.getHeader()).syncLeader(null);
} catch (CheckConsistencyException e) {
logger.warn("Failed to check consistency.", e);
}
Set<PartialPath> allDevices = getDevices(pathUnderSG);
logger.debug(
"{}: get matched paths of {} locally, result {}",
metaGroupMember.getName(),
partitionGroup,
allDevices);
result.addAll(allDevices);
} else {
// batch the queries of the same group to reduce communication
groupPathMap
.computeIfAbsent(partitionGroup, p -> new ArrayList<>())
.add(pathUnderSG.getFullPath());
}
}
// query each data group separately
for (Entry<PartitionGroup, List<String>> partitionGroupPathEntry : groupPathMap.entrySet()) {
PartitionGroup partitionGroup = partitionGroupPathEntry.getKey();
List<String> pathsToQuery = partitionGroupPathEntry.getValue();
result.addAll(getMatchedDevices(partitionGroup, pathsToQuery));
}
return result;
}
private Set<PartialPath> getMatchedDevices(
PartitionGroup partitionGroup, List<String> pathsToQuery) throws MetadataException {
// choose the node with lowest latency or highest throughput
List<Node> coordinatedNodes = QueryCoordinator.getINSTANCE().reorderNodes(partitionGroup);
for (Node node : coordinatedNodes) {
try {
Set<String> paths = getMatchedDevices(node, partitionGroup.getHeader(), pathsToQuery);
logger.debug(
"{}: get matched paths of {} from {}, result {} for {}",
metaGroupMember.getName(),
partitionGroup,
node,
paths,
pathsToQuery);
if (paths != null) {
// a non-null result is correct even if it is empty, so there is no need to try other nodes
Set<PartialPath> partialPaths = new HashSet<>();
for (String path : paths) {
partialPaths.add(new PartialPath(path));
}
return partialPaths;
}
} catch (IOException | TException e) {
throw new MetadataException(e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new MetadataException(e);
}
}
logger.warn("Cannot get paths of {} from {}", pathsToQuery, partitionGroup);
return Collections.emptySet();
}
private Set<String> getMatchedDevices(Node node, Node header, List<String> pathsToQuery)
throws IOException, TException, InterruptedException {
Set<String> paths;
if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
AsyncDataClient client =
metaGroupMember
.getClientProvider()
.getAsyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
paths = SyncClientAdaptor.getAllDevices(client, header, pathsToQuery);
} else {
try (SyncDataClient syncDataClient =
metaGroupMember
.getClientProvider()
.getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS())) {
paths = syncDataClient.getAllDevices(header, pathsToQuery);
}
}
return paths;
}
/** Similar to method getAllTimeseriesPath(), but returns Paths with their measurement aliases. */
@Override
public Pair<List<PartialPath>, Integer> getAllTimeseriesPathWithAlias(
PartialPath prefixPath, int limit, int offset) throws MetadataException {
// get all storage groups this path may belong to
// the key is the storage group name and the value is the path to be queried with storage group
// added, e.g:
// "root.*" will be translated into:
// "root.group1" -> "root.group1.*", "root.group2" -> "root.group2.*" ...
Map<String, String> sgPathMap = determineStorageGroup(prefixPath);
List<PartialPath> result = getMatchedPaths(sgPathMap, true);
int skippedOffset = 0;
// apply offset and limit
if (offset > 0 && result.size() > offset) {
skippedOffset = offset;
result = result.subList(offset, result.size());
} else if (offset > 0) {
skippedOffset = result.size();
result = Collections.emptyList();
}
if (limit > 0 && result.size() > limit) {
result = result.subList(0, limit);
}
logger.debug("The paths of path {} are {}", prefixPath, result);
return new Pair<>(result, skippedOffset);
}
/**
* Get all paths after removing wildcards in the path
*
* @param originPath a path potentially with wildcard
* @return all paths after removing wildcards in the path
*/
public List<PartialPath> getMatchedPaths(PartialPath originPath) throws MetadataException {
// get all storage groups this path may belong to
// the key is the storage group name and the value is the path to be queried with storage group
// added, e.g:
// "root.*" will be translated into:
// "root.group1" -> "root.group1.*", "root.group2" -> "root.group2.*" ...
Map<String, String> sgPathMap = determineStorageGroup(originPath);
List<PartialPath> ret = getMatchedPaths(sgPathMap, false);
logger.debug("The paths of path {} are {}", originPath, ret);
return ret;
}
/**
* Get all paths after removing wildcards in the path
*
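* <p>A minimal sketch (hypothetical prefix; exceptions omitted):
*
* <pre>{@code
* Pair<List<PartialPath>, List<PartialPath>> matched =
*     CMManager.getInstance().getMatchedPaths(Arrays.asList(new PartialPath("root.sg1.*")));
* List<PartialPath> existing = matched.left;
* List<PartialPath> invalid = matched.right;
* }</pre>
*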
* @param originalPaths a list of paths, potentially with wildcard
* @return a pair of path lists, the first are the existing full paths, the second are invalid
* original paths
*/
public Pair<List<PartialPath>, List<PartialPath>> getMatchedPaths(
List<PartialPath> originalPaths) {
ConcurrentSkipListSet<PartialPath> fullPaths = new ConcurrentSkipListSet<>();
ConcurrentSkipListSet<PartialPath> nonExistPaths = new ConcurrentSkipListSet<>();
ExecutorService getAllPathsService =
Executors.newFixedThreadPool(metaGroupMember.getPartitionTable().getGlobalGroups().size());
for (PartialPath pathStr : originalPaths) {
getAllPathsService.submit(
() -> {
try {
List<PartialPath> fullPathStrs = getMatchedPaths(pathStr);
if (fullPathStrs.isEmpty()) {
nonExistPaths.add(pathStr);
logger.debug("Path {} is not found.", pathStr);
} else {
fullPaths.addAll(fullPathStrs);
}
} catch (MetadataException e) {
logger.error("Failed to get full paths of the prefix path: {} because", pathStr, e);
}
});
}
getAllPathsService.shutdown();
try {
getAllPathsService.awaitTermination(
RaftServer.getReadOperationTimeoutMS(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("Unexpected interruption when waiting for get all paths services to stop", e);
}
return new Pair<>(new ArrayList<>(fullPaths), new ArrayList<>(nonExistPaths));
}
/**
* Get the local paths that match any path in "paths". The result is not deduplicated.
*
* @param paths paths potentially contain wildcards
*/
public List<String> getAllPaths(List<String> paths) throws MetadataException {
List<String> ret = new ArrayList<>();
for (String path : paths) {
getAllTimeseriesPath(new PartialPath(path)).stream()
.map(PartialPath::getFullPath)
.forEach(ret::add);
}
return ret;
}
/**
* Get the local devices that match any path in "paths". The result is deduplicated.
*
* @param paths paths potentially contain wildcards
*/
public Set<String> getAllDevices(List<String> paths) throws MetadataException {
Set<String> results = new HashSet<>();
for (String path : paths) {
getDevices(new PartialPath(path)).stream()
.map(PartialPath::getFullPath)
.forEach(results::add);
}
return results;
}
/**
* Get the nodes of a prefix "path" at "nodeLevel". The method currently requires strong
* consistency.
*
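* <p>For example, {@code getNodeList("root.sg1", 2)} could return nodes such as
* {@code root.sg1.d1} and {@code root.sg1.d2}, counting root as level 0 (illustrative names).
*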
* @param path the prefix path
* @param nodeLevel the level of the nodes to fetch
*/
public List<String> getNodeList(String path, int nodeLevel) throws MetadataException {
return getNodesList(new PartialPath(path), nodeLevel).stream()
.map(PartialPath::getFullPath)
.collect(Collectors.toList());
}
public Set<String> getChildNodeInNextLevel(String path) throws MetadataException {
return getChildNodeInNextLevel(new PartialPath(path));
}
public Set<String> getChildNodePathInNextLevel(String path) throws MetadataException {
return getChildNodePathInNextLevel(new PartialPath(path));
}
/**
* Replace partial paths (paths not containing measurements) and abstract paths (paths containing
* wildcards) with full paths.
*/
public void convertToFullPaths(PhysicalPlan plan)
throws PathNotExistException, CheckConsistencyException {
// make sure this node knows all storage groups
metaGroupMember.syncLeaderWithConsistencyCheck(false);
Pair<List<PartialPath>, List<PartialPath>> getMatchedPathsRet =
getMatchedPaths(plan.getPaths());
List<PartialPath> fullPaths = getMatchedPathsRet.left;
List<PartialPath> nonExistPath = getMatchedPathsRet.right;
plan.setPaths(fullPaths);
if (!nonExistPath.isEmpty()) {
throw new PathNotExistException(
nonExistPath.stream().map(PartialPath::getFullPath).collect(Collectors.toList()));
}
}
@Override
public MNode getMNode(MNode deviceMNode, String measurementName) {
MNode child = deviceMNode.getChild(measurementName);
if (child == null) {
child = mRemoteMetaCache.get(deviceMNode.getPartialPath().concatNode(measurementName));
}
return child;
}
public List<ShowTimeSeriesResult> showLocalTimeseries(
ShowTimeSeriesPlan plan, QueryContext context) throws MetadataException {
return super.showTimeseries(plan, context);
}
public List<ShowDevicesResult> getLocalDevices(ShowDevicesPlan plan) throws MetadataException {
return super.getDevices(plan);
}
@Override
public List<ShowDevicesResult> getDevices(ShowDevicesPlan plan) throws MetadataException {
ConcurrentSkipListSet<ShowDevicesResult> resultSet = new ConcurrentSkipListSet<>();
ExecutorService pool =
new ThreadPoolExecutor(
THREAD_POOL_SIZE, THREAD_POOL_SIZE, 0, TimeUnit.SECONDS, new LinkedBlockingDeque<>());
List<PartitionGroup> globalGroups = metaGroupMember.getPartitionTable().getGlobalGroups();
int limit = plan.getLimit() == 0 ? Integer.MAX_VALUE : plan.getLimit();
int offset = plan.getOffset();
// do not use limit and offset in sub-queries unless offset is 0, otherwise the results are
// not combinable
if (offset != 0) {
plan.setLimit(0);
plan.setOffset(0);
}
if (logger.isDebugEnabled()) {
logger.debug(
"Fetch devices schemas of {} from {} groups", plan.getPath(), globalGroups.size());
}
List<Future<Void>> futureList = new ArrayList<>();
for (PartitionGroup group : globalGroups) {
futureList.add(
pool.submit(
() -> {
try {
getDevices(group, plan, resultSet);
} catch (CheckConsistencyException e) {
logger.error("Cannot get show devices result of {} from {}", plan, group);
}
return null;
}));
}
waitForThreadPool(futureList, pool, "getDevices()");
List<ShowDevicesResult> showDevicesResults =
applyShowDevicesLimitOffset(resultSet, limit, offset);
logger.debug("show devices {} has {} results", plan.getPath(), showDevicesResults.size());
return showDevicesResults;
}
@Override
public List<ShowTimeSeriesResult> showTimeseries(ShowTimeSeriesPlan plan, QueryContext context)
throws MetadataException {
ConcurrentSkipListSet<ShowTimeSeriesResult> resultSet = new ConcurrentSkipListSet<>();
ExecutorService pool =
new ThreadPoolExecutor(
THREAD_POOL_SIZE, THREAD_POOL_SIZE, 0, TimeUnit.SECONDS, new LinkedBlockingDeque<>());
List<PartitionGroup> globalGroups = metaGroupMember.getPartitionTable().getGlobalGroups();
int limit = plan.getLimit() == 0 ? Integer.MAX_VALUE : plan.getLimit();
int offset = plan.getOffset();
// do not use limit and offset in sub-queries unless offset is 0, otherwise the results are
// not combinable
if (offset != 0) {
plan.setLimit(0);
plan.setOffset(0);
}
if (logger.isDebugEnabled()) {
logger.debug(
"Fetch timeseries schemas of {} from {} groups", plan.getPath(), globalGroups.size());
}
List<Future<Void>> futureList = new ArrayList<>();
for (PartitionGroup group : globalGroups) {
futureList.add(
pool.submit(
() -> {
try {
showTimeseries(group, plan, resultSet, context);
} catch (CheckConsistencyException e) {
logger.error("Cannot get show timeseries result of {} from {}", plan, group);
}
return null;
}));
}
waitForThreadPool(futureList, pool, "showTimeseries()");
List<ShowTimeSeriesResult> showTimeSeriesResults =
applyShowTimeseriesLimitOffset(resultSet, limit, offset);
logger.debug("Show {} has {} results", plan.getPath(), showTimeSeriesResults.size());
return showTimeSeriesResults;
}
private List<ShowTimeSeriesResult> applyShowTimeseriesLimitOffset(
ConcurrentSkipListSet<ShowTimeSeriesResult> resultSet, int limit, int offset) {
List<ShowTimeSeriesResult> showTimeSeriesResults = new ArrayList<>();
Iterator<ShowTimeSeriesResult> iterator = resultSet.iterator();
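// the merged result set is sorted, so limit/offset can be applied after merging the per-group
// results: skip the first "offset" entries, then take up to "limit" entries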
while (iterator.hasNext() && limit > 0) {
if (offset > 0) {
offset--;
iterator.next();
} else {
limit--;
showTimeSeriesResults.add(iterator.next());
}
}
return showTimeSeriesResults;
}
private List<ShowDevicesResult> applyShowDevicesLimitOffset(
Set<ShowDevicesResult> resultSet, int limit, int offset) {
List<ShowDevicesResult> showDevicesResults = new ArrayList<>();
Iterator<ShowDevicesResult> iterator = resultSet.iterator();
while (iterator.hasNext() && limit > 0) {
if (offset > 0) {
offset--;
iterator.next();
} else {
limit--;
showDevicesResults.add(iterator.next());
}
}
return showDevicesResults;
}
private void showTimeseries(
PartitionGroup group,
ShowTimeSeriesPlan plan,
Set<ShowTimeSeriesResult> resultSet,
QueryContext context)
throws CheckConsistencyException, MetadataException {
if (group.contains(metaGroupMember.getThisNode())) {
showLocalTimeseries(group, plan, resultSet, context);
} else {
showRemoteTimeseries(group, plan, resultSet);
}
}
private void getDevices(
PartitionGroup group, ShowDevicesPlan plan, Set<ShowDevicesResult> resultSet)
throws CheckConsistencyException, MetadataException {
if (group.contains(metaGroupMember.getThisNode())) {
getLocalDevices(group, plan, resultSet);
} else {
getRemoteDevices(group, plan, resultSet);
}
}
private void getLocalDevices(
PartitionGroup group, ShowDevicesPlan plan, Set<ShowDevicesResult> resultSet)
throws CheckConsistencyException, MetadataException {
Node header = group.getHeader();
DataGroupMember localDataMember = metaGroupMember.getLocalDataMember(header);
localDataMember.syncLeaderWithConsistencyCheck(false);
try {
List<ShowDevicesResult> localResult = super.getDevices(plan);
resultSet.addAll(localResult);
logger.debug("Fetched {} devices of {} from {}", localResult.size(), plan.getPath(), group);
} catch (MetadataException e) {
logger.error("Cannot execute show devices plan {} from {} locally.", plan, group);
throw e;
}
}
private void showLocalTimeseries(
PartitionGroup group,
ShowTimeSeriesPlan plan,
Set<ShowTimeSeriesResult> resultSet,
QueryContext context)
throws CheckConsistencyException, MetadataException {
Node header = group.getHeader();
DataGroupMember localDataMember = metaGroupMember.getLocalDataMember(header);
localDataMember.syncLeaderWithConsistencyCheck(false);
try {
List<ShowTimeSeriesResult> localResult = super.showTimeseries(plan, context);
resultSet.addAll(localResult);
logger.debug(
"Fetched local timeseries {} schemas of {} from {}",
localResult.size(),
plan.getPath(),
group);
} catch (MetadataException e) {
logger.error("Cannot execute show timeseries plan {} from {} locally.", plan, group);
throw e;
}
}
private void showRemoteTimeseries(
PartitionGroup group, ShowTimeSeriesPlan plan, Set<ShowTimeSeriesResult> resultSet) {
ByteBuffer resultBinary = null;
for (Node node : group) {
try {
resultBinary = showRemoteTimeseries(node, group, plan);
if (resultBinary != null) {
break;
}
} catch (IOException e) {
logger.error(LOG_FAIL_CONNECT, node, e);
} catch (TException e) {
logger.error("Error occurs when getting timeseries schemas in node {}.", node, e);
} catch (InterruptedException e) {
logger.error("Interrupted when getting timeseries schemas in node {}.", node, e);
Thread.currentThread().interrupt();
}
}
if (resultBinary != null) {
int size = resultBinary.getInt();
logger.debug(
"Fetched remote timeseries {} schemas of {} from {}", size, plan.getPath(), group);
for (int i = 0; i < size; i++) {
resultSet.add(ShowTimeSeriesResult.deserialize(resultBinary));
}
} else {
logger.error("Failed to execute show timeseries {} in group: {}.", plan, group);
}
}
private void getRemoteDevices(
PartitionGroup group, ShowDevicesPlan plan, Set<ShowDevicesResult> resultSet) {
ByteBuffer resultBinary = null;
for (Node node : group) {
try {
resultBinary = getRemoteDevices(node, group, plan);
if (resultBinary != null) {
break;
}
} catch (IOException e) {
logger.error(LOG_FAIL_CONNECT, node, e);
} catch (TException e) {
logger.error("Error occurs when getting devices schemas in node {}.", node, e);
} catch (InterruptedException e) {
logger.error("Interrupted when getting devices schemas in node {}.", node, e);
Thread.currentThread().interrupt();
}
}
if (resultBinary != null) {
int size = resultBinary.getInt();
logger.debug("Fetched remote devices {} schemas of {} from {}", size, plan.getPath(), group);
for (int i = 0; i < size; i++) {
resultSet.add(ShowDevicesResult.deserialize(resultBinary));
}
} else {
logger.error("Failed to execute show devices {} in group: {}.", plan, group);
}
}
private ByteBuffer showRemoteTimeseries(Node node, PartitionGroup group, ShowTimeSeriesPlan plan)
throws IOException, TException, InterruptedException {
ByteBuffer resultBinary;
if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
AsyncDataClient client =
metaGroupMember
.getClientProvider()
.getAsyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
resultBinary = SyncClientAdaptor.getAllMeasurementSchema(client, group.getHeader(), plan);
} else {
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
SyncDataClient syncDataClient =
metaGroupMember
.getClientProvider()
.getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS())) {
plan.serialize(dataOutputStream);
resultBinary =
syncDataClient.getAllMeasurementSchema(
group.getHeader(), ByteBuffer.wrap(byteArrayOutputStream.toByteArray()));
}
}
return resultBinary;
}
private ByteBuffer getRemoteDevices(Node node, PartitionGroup group, ShowDevicesPlan plan)
throws IOException, TException, InterruptedException {
ByteBuffer resultBinary;
if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
AsyncDataClient client =
metaGroupMember
.getClientProvider()
.getAsyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
resultBinary = SyncClientAdaptor.getDevices(client, group.getHeader(), plan);
} else {
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
SyncDataClient syncDataClient =
metaGroupMember
.getClientProvider()
.getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS())) {
plan.serialize(dataOutputStream);
resultBinary =
syncDataClient.getDevices(
group.getHeader(), ByteBuffer.wrap(byteArrayOutputStream.toByteArray()));
}
}
return resultBinary;
}
public GetAllPathsResult getAllPaths(List<String> paths, boolean withAlias)
throws MetadataException {
List<String> retPaths = new ArrayList<>();
List<String> alias = null;
if (withAlias) {
alias = new ArrayList<>();
}
if (withAlias) {
for (String path : paths) {
List<PartialPath> allTimeseriesPathWithAlias =
super.getAllTimeseriesPathWithAlias(new PartialPath(path), -1, -1).left;
for (PartialPath timeseriesPathWithAlias : allTimeseriesPathWithAlias) {
retPaths.add(timeseriesPathWithAlias.getFullPath());
alias.add(timeseriesPathWithAlias.getMeasurementAlias());
}
}
} else {
retPaths = getAllPaths(paths);
}
GetAllPathsResult getAllPathsResult = new GetAllPathsResult();
getAllPathsResult.setPaths(retPaths);
getAllPathsResult.setAliasList(alias);
return getAllPathsResult;
}
@Override
public PartialPath getStorageGroupPath(PartialPath path) throws StorageGroupNotSetException {
try {
return super.getStorageGroupPath(path);
} catch (StorageGroupNotSetException e) {
try {
metaGroupMember.syncLeader(null);
} catch (CheckConsistencyException ex) {
logger.warn("Failed to check consistency.", e);
}
return super.getStorageGroupPath(path);
}
}
}