blob: 1b10399f8f107facb35c6384490635cfb9cd83cb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.cluster.metadata;
import org.apache.iotdb.cluster.client.async.AsyncDataClient;
import org.apache.iotdb.cluster.client.sync.SyncClientAdaptor;
import org.apache.iotdb.cluster.client.sync.SyncDataClient;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.coordinator.Coordinator;
import org.apache.iotdb.cluster.exception.CheckConsistencyException;
import org.apache.iotdb.cluster.exception.UnsupportedPlanException;
import org.apache.iotdb.cluster.partition.PartitionGroup;
import org.apache.iotdb.cluster.query.manage.QueryCoordinator;
import org.apache.iotdb.cluster.rpc.thrift.GetAllPathsResult;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
import org.apache.iotdb.cluster.server.RaftServer;
import org.apache.iotdb.cluster.server.member.DataGroupMember;
import org.apache.iotdb.cluster.server.member.MetaGroupMember;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.metadata.MetaUtils;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.metadata.VectorPartialPath;
import org.apache.iotdb.db.metadata.mnode.IMNode;
import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
import org.apache.iotdb.db.metadata.mnode.InternalMNode;
import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
import org.apache.iotdb.db.metadata.template.Template;
import org.apache.iotdb.db.qp.constant.SQLConstant;
import org.apache.iotdb.db.qp.physical.BatchPlan;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.qp.physical.crud.SetSchemaTemplatePlan;
import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.dataset.ShowDevicesResult;
import org.apache.iotdb.db.query.dataset.ShowTimeSeriesResult;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.TypeInferenceUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.iotdb.tsfile.common.cache.LRUCache;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.lang.reflect.Array;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import static org.apache.iotdb.cluster.query.ClusterPlanExecutor.LOG_FAIL_CONNECT;
import static org.apache.iotdb.cluster.query.ClusterPlanExecutor.THREAD_POOL_SIZE;
import static org.apache.iotdb.cluster.query.ClusterPlanExecutor.waitForThreadPool;
import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding;
@SuppressWarnings("java:S1135") // ignore todos
public class CMManager extends MManager {
private static final Logger logger = LoggerFactory.getLogger(CMManager.class);
// lock taken by the methods of this class around their accesses to mRemoteMetaCache
private ReentrantReadWriteLock cacheLock = new ReentrantReadWriteLock();
// Only the series that are being written are cached. Series that are only read need not be
// cached: reading is slow anyway, so pulling the schema from a remote node costs little compared
// to the disk IO.
private RemoteMetaCache mRemoteMetaCache;
// used to pull measurement schemas that are found neither in the cache nor locally
private MetaPuller metaPuller;
// set through setMetaGroupMember(); null until then
private MetaGroupMember metaGroupMember;
// set through setCoordinator(); null until then
private Coordinator coordinator;
/**
 * Private constructor: initializes the parent MManager, fetches the MetaPuller instance and
 * creates the remote schema cache with the size taken from the configuration.
 */
private CMManager() {
  super();
  metaPuller = MetaPuller.getInstance();
  mRemoteMetaCache = new RemoteMetaCache(config.getmRemoteSchemaCacheSize());
}
/** Holder class whose static field carries the single CMManager instance. */
private static class MManagerHolder {

  private static final CMManager INSTANCE = new CMManager();

  private MManagerHolder() {
    // intentionally empty: this class is never instantiated
  }
}
/**
 * Returns the shared CMManager. This function should not be used in other places, but only in
 * the IoTDB class.
 *
 * @return the instance held by {@code MManagerHolder}
 */
public static CMManager getInstance() {
  return MManagerHolder.INSTANCE;
}
/**
 * Synchronizes with the meta leader to get the newest partition table and storage groups.
 *
 * @throws MetadataException wrapping the CheckConsistencyException raised when the
 *     synchronization with the leader fails
 */
public void syncMetaLeader() throws MetadataException {
  try {
    metaGroupMember.syncLeaderWithConsistencyCheck(false);
  } catch (CheckConsistencyException consistencyFailure) {
    // callers of the metadata API only know MetadataException, so wrap the cause
    throw new MetadataException(consistencyFailure);
  }
}
/**
 * Evicts every cached entry whose full path starts with the given prefix from the remote cache
 * and then delegates the deletion to the parent MManager.
 *
 * @param prefixPath prefix of the timeseries to delete
 * @return whatever the parent implementation returns
 * @throws MetadataException if the parent implementation fails
 */
@Override
public String deleteTimeseries(PartialPath prefixPath) throws MetadataException {
  cacheLock.writeLock().lock();
  try {
    mRemoteMetaCache.removeItem(prefixPath);
  } finally {
    // always release the lock, even if the eviction throws
    cacheLock.writeLock().unlock();
  }
  return super.deleteTimeseries(prefixPath);
}
/**
 * Evicts the cached entries under each of the given storage groups from the remote cache and
 * then delegates the deletion to the parent MManager.
 *
 * @param storageGroups the storage groups to delete
 * @throws MetadataException if the parent implementation fails
 */
@Override
public void deleteStorageGroups(List<PartialPath> storageGroups) throws MetadataException {
  cacheLock.writeLock().lock();
  try {
    for (PartialPath storageGroup : storageGroups) {
      mRemoteMetaCache.removeItem(storageGroup);
    }
  } finally {
    // always release the lock, even if the eviction throws
    cacheLock.writeLock().unlock();
  }
  super.deleteStorageGroups(storageGroups);
}
/**
 * Resolves the data type of a series, looking in order at: the remote cache, the local MTree
 * and finally the schemas pulled through the MetaPuller (which are cached on success).
 *
 * <p>The time path is always INT64. A VectorPartialPath with a number of sub sensors other than
 * one is reported as VECTOR; with exactly one sub sensor, that sub sensor path is resolved
 * instead.
 *
 * @param path the series whose type is wanted
 * @return the data type of the series
 * @throws MetadataException if the series is unknown locally and no schema could be pulled, or
 *     if pulling itself fails
 */
@Override
public TSDataType getSeriesType(PartialPath path) throws MetadataException {
  if (path.equals(SQLConstant.TIME_PATH)) {
    return TSDataType.INT64;
  }

  if (path instanceof VectorPartialPath) {
    if (((VectorPartialPath) path).getSubSensorsPathList().size() != 1) {
      return TSDataType.VECTOR;
    }
    path = ((VectorPartialPath) path).getSubSensorsPathList().get(0);
  }

  // try remote cache first; acquire the lock before the try block so that the finally clause
  // never unlocks a lock that was not obtained
  cacheLock.readLock().lock();
  try {
    IMeasurementMNode cachedNode = mRemoteMetaCache.get(path);
    if (cachedNode != null) {
      return cachedNode.getDataType(path.getMeasurement());
    }
  } finally {
    cacheLock.readLock().unlock();
  }

  // try local MTree
  try {
    return super.getSeriesType(path);
  } catch (PathNotExistException e) {
    // pull from remote node
    List<IMeasurementSchema> schemas =
        metaPuller.pullMeasurementSchemas(Collections.singletonList(path));
    if (schemas.isEmpty()) {
      throw e;
    }

    IMeasurementSchema measurementSchema = schemas.get(0);
    IMeasurementMNode measurementMNode =
        new MeasurementMNode(
            null, measurementSchema.getMeasurementId(), measurementSchema, null);
    if (measurementSchema instanceof VectorMeasurementSchema) {
      // cache the same node under every sub sensor (without a full path) and under the
      // vector's own measurement id (with the full path)
      for (String subSensorId : measurementSchema.getValueMeasurementIdList()) {
        cacheMeta(new PartialPath(path.getDevice(), subSensorId), measurementMNode, false);
      }
      cacheMeta(
          new PartialPath(path.getDevice(), measurementSchema.getMeasurementId()),
          measurementMNode,
          true);
    } else {
      cacheMeta(path, measurementMNode, true);
    }
    return measurementMNode.getDataType(path.getMeasurement());
  }
}
/**
 * Resolves the measurement node of every given path (cache, local MTree, then remote pull) and
 * hands the collected node/path/index mappings to the parent's {@code getPair}.
 *
 * @param fullPaths the series paths, in the order their indexes are recorded
 * @return the pair built by {@code getPair} from the paths and the collected mappings
 * @throws MetadataException if a measurement node cannot be resolved
 */
@Override
public Pair<List<PartialPath>, Map<String, Integer>> getSeriesSchemas(List<PartialPath> fullPaths)
    throws MetadataException {
  Map<IMNode, PartialPath> nodeToPartialPath = new LinkedHashMap<>();
  Map<IMNode, List<Integer>> nodeToIndex = new LinkedHashMap<>();

  int position = 0;
  for (PartialPath fullPath : fullPaths) {
    IMeasurementMNode resolved = getMeasurementMNode(fullPath);
    super.getNodeToPartialPath(resolved, nodeToPartialPath, nodeToIndex, fullPath, position);
    position++;
  }

  return getPair(fullPaths, nodeToPartialPath, nodeToIndex);
}
/**
 * Returns the schema of a series, resolving its measurement node through the cache, the local
 * MTree or a remote pull before delegating to the parent.
 *
 * @param fullPath full path of the series
 * @return the schema computed by the parent from the path and the resolved node
 * @throws MetadataException if the measurement node cannot be resolved
 */
@Override
public IMeasurementSchema getSeriesSchema(PartialPath fullPath) throws MetadataException {
  IMeasurementMNode resolvedNode = getMeasurementMNode(fullPath);
  return super.getSeriesSchema(fullPath, resolvedNode);
}
/**
 * Transforms the PartialPath into a vector path if it is a sub sensor of a vector; otherwise the
 * path is returned unchanged.
 *
 * @param partialPath the path to inspect
 * @return {@code partialPath} itself when its node carries a plain MeasurementSchema, otherwise
 *     the result of {@code toVectorPath}
 * @throws MetadataException if the measurement node cannot be resolved
 */
@Override
public PartialPath transformPath(PartialPath partialPath) throws MetadataException {
  if (getMeasurementMNode(partialPath).getSchema() instanceof MeasurementSchema) {
    return partialPath;
  }
  return toVectorPath(partialPath);
}
/**
 * Resolves the measurement node of a series, looking in order at: the remote cache, the local
 * MTree and finally the schemas pulled through the MetaPuller. A pulled schema is wrapped in a
 * new parentless MeasurementMNode and cached under {@code fullPath}.
 *
 * @param fullPath full path of the series
 * @return the resolved node, never null
 * @throws MetadataException if the path does not exist locally and no schema could be pulled,
 *     or if the lookup or the pull fails for another reason
 */
private IMeasurementMNode getMeasurementMNode(PartialPath fullPath) throws MetadataException {
  // try remote cache first; acquire the lock before the try block so that the finally clause
  // never unlocks a lock that was not obtained
  cacheLock.readLock().lock();
  try {
    IMeasurementMNode cachedNode = mRemoteMetaCache.get(fullPath);
    if (cachedNode != null) {
      return cachedNode;
    }
  } finally {
    cacheLock.readLock().unlock();
  }

  // try local MTree
  try {
    return (IMeasurementMNode) super.getNodeByPath(fullPath);
  } catch (PathNotExistException e) {
    // pull from remote node
    List<IMeasurementSchema> schemas =
        metaPuller.pullMeasurementSchemas(Collections.singletonList(fullPath));
    if (schemas.isEmpty()) {
      throw e;
    }
    IMeasurementSchema measurementSchema = schemas.get(0);
    IMeasurementMNode pulledNode =
        new MeasurementMNode(
            null, measurementSchema.getMeasurementId(), measurementSchema, null);
    cacheMeta(fullPath, pulledNode, true);
    return pulledNode;
  }
}
/**
 * Returns the measurement nodes of the given measurements of a device. The
 * {@link org.apache.iotdb.db.writelog.recover.LogReplayer#replayLogs(Supplier)} calls this to
 * get schemas after a restart.
 *
 * <p>The parent implementation is tried first. If it fails, the remote cache is consulted; if
 * the cache is incomplete the schemas are pulled and the cache is consulted once more.
 *
 * @param deviceId the device id.
 * @param measurements the measurements.
 * @return one node per measurement, in the order of {@code measurements}
 * @throws MetadataException if a measurement is still missing after pulling, or if pulling fails
 */
@Override
public IMeasurementMNode[] getMNodes(PartialPath deviceId, String[] measurements)
    throws MetadataException {
  try {
    return super.getMNodes(deviceId, measurements);
  } catch (MetadataException localFailure) {
    // some measurements do not exist locally, look them up in the cache
    IMeasurementMNode[] resolved = new IMeasurementMNode[measurements.length];
    if (getMNodesLocally(deviceId, measurements, resolved) == -1) {
      return resolved;
    }

    // the cache is incomplete: pull the schemas and look again
    pullSeriesSchemas(deviceId, measurements);
    int firstMissing = getMNodesLocally(deviceId, measurements, resolved);
    if (firstMissing == -1) {
      return resolved;
    }

    throw new MetadataException(
        deviceId.getFullPath()
            + IoTDBConstant.PATH_SEPARATOR
            + measurements[firstMissing]
            + " is not found");
  }
}
/**
 * Looks up the measurements of a device in the remote cache (under the read lock) and stores the
 * hits into {@code measurementMNodes} at the matching positions. The lookup stops at the first
 * miss, so positions after it are left untouched.
 *
 * @param deviceId the device the measurements belong to
 * @param measurements the measurement names to look up
 * @param measurementMNodes output array, at least as long as {@code measurements}
 * @return -1 if all schemas are found, or the first index of the non-exist schema
 */
private int getMNodesLocally(
    PartialPath deviceId, String[] measurements, IMeasurementMNode[] measurementMNodes) {
  cacheLock.readLock().lock();
  try {
    for (int i = 0; i < measurements.length; i++) {
      IMeasurementMNode cachedNode = mRemoteMetaCache.get(deviceId.concatNode(measurements[i]));
      if (cachedNode == null) {
        return i;
      }
      measurementMNodes[i] = cachedNode;
    }
    return -1;
  } finally {
    cacheLock.readLock().unlock();
  }
}
/**
 * Pulls the schemas of the given measurements of a device through the MetaPuller and caches each
 * returned schema as a new parentless MeasurementMNode.
 *
 * @param deviceId the device the measurements belong to
 * @param measurementList the measurement names whose schemas are wanted
 * @throws MetadataException if pulling fails
 */
private void pullSeriesSchemas(PartialPath deviceId, String[] measurementList)
    throws MetadataException {
  List<PartialPath> wantedPaths =
      Arrays.stream(measurementList)
          .map(measurement -> deviceId.concatNode(measurement))
          .collect(Collectors.toList());

  List<IMeasurementSchema> pulledSchemas = metaPuller.pullMeasurementSchemas(wantedPaths);
  for (IMeasurementSchema pulled : pulledSchemas) {
    // TODO-Cluster: also pull alias?
    // take care, the measurement id of a pulled schema is only the series name, so the device
    // has to be prepended again to build the cache key
    String measurementId = pulled.getMeasurementId();
    cacheMeta(
        deviceId.concatNode(measurementId),
        new MeasurementMNode(null, measurementId, pulled, null),
        true);
  }
  logger.debug("Pulled {}/{} schemas from remote", pulledSchemas.size(), measurementList.length);
}
/**
 * Puts a measurement node into the remote cache under the given series path.
 *
 * @param seriesPath the cache key
 * @param measurementMNode the node to cache
 * @param needSetFullPath whether the node's full path should be set to {@code seriesPath} first;
 *     pass false for a sub sensor of a vector, whose full path must not be set
 */
@Override
public void cacheMeta(
    PartialPath seriesPath, IMeasurementMNode measurementMNode, boolean needSetFullPath) {
  if (needSetFullPath) {
    measurementMNode.setFullPath(seriesPath.getFullPath());
  }
  cacheLock.writeLock().lock();
  try {
    mRemoteMetaCache.put(seriesPath, measurementMNode);
  } finally {
    // always release the lock, even if the insertion throws
    cacheLock.writeLock().unlock();
  }
}
/**
 * Updates the cached last value of a series: first on the node held in the remote cache (if
 * any, under the write lock), then through the parent implementation.
 *
 * @param seriesPath the series whose last value changed
 * @param timeValuePair the candidate last value
 * @param highPriorityUpdate forwarded to the node and to the parent
 * @param latestFlushedTime forwarded to the node and to the parent
 * @param node forwarded to the parent only
 */
@Override
public void updateLastCache(
    PartialPath seriesPath,
    TimeValuePair timeValuePair,
    boolean highPriorityUpdate,
    Long latestFlushedTime,
    IMeasurementMNode node) {
  cacheLock.writeLock().lock();
  try {
    IMeasurementMNode cachedNode = mRemoteMetaCache.get(seriesPath);
    if (cachedNode != null) {
      cachedNode.updateCachedLast(timeValuePair, highPriorityUpdate, latestFlushedTime);
    }
  } finally {
    cacheLock.writeLock().unlock();
  }

  // the timeseries may exist locally as well, so the parent is always updated too
  super.updateLastCache(seriesPath, timeValuePair, highPriorityUpdate, latestFlushedTime, node);
}
/**
 * Returns the cached last value of a series, preferring the node held in the remote cache and
 * falling back to the parent implementation when the series is not cached.
 *
 * @param seriesPath the series whose last value is wanted
 * @return the last value stored on the cached node, or the parent's answer on a cache miss
 */
@Override
public TimeValuePair getLastCache(PartialPath seriesPath) {
  // NOTE(review): unlike the other lookups in this class, this one does not take cacheLock
  IMeasurementMNode cachedNode = mRemoteMetaCache.get(seriesPath);
  if (cachedNode == null) {
    return super.getLastCache(seriesPath);
  }
  return cachedNode.getCachedLast();
}
/**
 * Fills the measurement nodes of an insert plan. When every measurement is found in the remote
 * cache, the nodes are set on the plan and a new parentless InternalMNode named after the
 * device of the plan's prefix path is returned; otherwise the parent implementation is used.
 *
 * @param plan the insert plan to complete
 * @return a fresh InternalMNode on a full cache hit, otherwise the parent's result
 * @throws MetadataException if the parent implementation fails
 * @throws IOException if the parent implementation fails
 */
@Override
public IMNode getSeriesSchemasAndReadLockDevice(InsertPlan plan)
    throws MetadataException, IOException {
  IMeasurementMNode[] cachedNodes = new IMeasurementMNode[plan.getMeasurements().length];
  int firstMissing = getMNodesLocally(plan.getPrefixPath(), plan.getMeasurements(), cachedNodes);
  if (firstMissing != -1) {
    // auto-create schema in IoTDBConfig is always disabled in the cluster version, and we have
    // another config in ClusterConfig to do this
    return super.getSeriesSchemasAndReadLockDevice(plan);
  }

  plan.setMeasurementMNodes(cachedNodes);
  return new InternalMNode(null, plan.getPrefixPath().getDevice());
}
/**
 * Returns the schema of one measurement of a device. The lookup order is: parent (local)
 * implementation, remote cache, remote cache again after pulling the schema, and finally the
 * parent implementation once more.
 *
 * @param device the device the measurement belongs to
 * @param measurement the measurement name
 * @return the schema found by the first step that succeeds
 * @throws MetadataException if pulling fails or the final parent lookup fails
 */
@Override
public IMeasurementSchema getSeriesSchema(PartialPath device, String measurement)
    throws MetadataException {
  // step 1: local lookup; a missing path is not an error yet
  try {
    IMeasurementSchema localSchema = super.getSeriesSchema(device, measurement);
    if (localSchema != null) {
      return localSchema;
    }
  } catch (PathNotExistException notFoundLocally) {
    // not found in local
  }

  // step 2: remote cache
  cacheLock.readLock().lock();
  try {
    IMeasurementMNode cachedNode = mRemoteMetaCache.get(device.concatNode(measurement));
    if (cachedNode != null) {
      return cachedNode.getSchema();
    }
  } finally {
    cacheLock.readLock().unlock();
  }

  // step 3: pull from remote, which caches whatever was returned, then look again
  pullSeriesSchemas(device, new String[] {measurement});
  cacheLock.readLock().lock();
  try {
    IMeasurementMNode pulledNode = mRemoteMetaCache.get(device.concatNode(measurement));
    if (pulledNode != null) {
      return pulledNode.getSchema();
    }
  } finally {
    cacheLock.readLock().unlock();
  }

  // step 4: last resort, let the parent decide (and possibly throw)
  return super.getSeriesSchema(device, measurement);
}
/**
 * Checks whether the path exists, either according to the parent implementation or as a key of
 * the remote cache.
 *
 * @param path a full path or a prefix path
 * @return true if the parent reports the path as existing or the remote cache contains it as a
 *     key
 */
@Override
public boolean isPathExist(PartialPath path) {
  if (super.isPathExist(path)) {
    return true;
  }

  // not known locally, search the cache
  cacheLock.readLock().lock();
  try {
    return mRemoteMetaCache.containsKey(path);
  } finally {
    cacheLock.readLock().unlock();
  }
}
/**
 * Delegates to the four-argument overload, taking the auto-create switch from the cluster
 * configuration, passing {@code false} as the third argument and the default storage group
 * level from {@code config} as the fourth.
 *
 * @param path the device path
 * @return the result of the four-argument overload
 * @throws MetadataException if the overload fails
 * @throws IOException if the overload fails
 */
@Override
public Pair<IMNode, Template> getDeviceNodeWithAutoCreate(PartialPath path)
    throws MetadataException, IOException {
  boolean autoCreateSchema =
      ClusterDescriptor.getInstance().getConfig().isEnableAutoCreateSchema();
  int storageGroupLevel = config.getDefaultStorageGroupLevel();
  return getDeviceNodeWithAutoCreate(path, autoCreateSchema, false, storageGroupLevel);
}
/**
 * Cache from series path to measurement node, built on LRUCache. Its own loader always returns
 * null, and all public operations are synchronized on the cache instance.
 */
private static class RemoteMetaCache extends LRUCache<PartialPath, IMeasurementMNode> {

  RemoteMetaCache(int cacheSize) {
    super(cacheSize);
  }

  /** Always returns null: this cache has no backing source to load from. */
  @Override
  protected IMeasurementMNode loadObjectByKey(PartialPath key) {
    return null;
  }

  /**
   * Removes every entry whose full path starts with the full path of {@code key}.
   *
   * <p>NOTE(review): the comparison is a plain string prefix test, so a key such as "root.sg1"
   * also evicts entries under "root.sg10" - confirm this over-eviction is acceptable.
   */
  @Override
  public synchronized void removeItem(PartialPath key) {
    Iterator<PartialPath> cachedPaths = cache.keySet().iterator();
    while (cachedPaths.hasNext()) {
      if (cachedPaths.next().getFullPath().startsWith(key.getFullPath())) {
        cachedPaths.remove();
      }
    }
  }

  /** Same as the parent lookup, but reports an IOException as a miss (null). */
  @Override
  public synchronized IMeasurementMNode get(PartialPath key) {
    try {
      return super.get(key);
    } catch (IOException unexpected) {
      // not happening
      return null;
    }
  }

  /** Tells whether an entry is stored under exactly this key. */
  public synchronized boolean containsKey(PartialPath key) {
    return cache.containsKey(key);
  }
}
/**
 * Creates the storage groups a plan needs and, for an InsertPlan, also the timeseries it writes.
 *
 * <p>The paths used to derive the storage groups depend on the plan type:
 *
 * <ul>
 *   <li>BatchPlan: the paths of the sub-plans selected by {@code getValidStorageGroups};
 *   <li>InsertRowPlan / InsertTabletPlan: the device (prefix path) only, because concatenating
 *       device and sensors would lead to the same storage group;
 *   <li>CreateTimeSeriesPlan: the timeseries path;
 *   <li>CreateAlignedTimeSeriesPlan and SetSchemaTemplatePlan: the prefix path;
 *   <li>any other plan (e.g. CreateMultiTimeSeriesPlan): all paths of the plan.
 * </ul>
 *
 * @param plan the plan whose schema should be prepared
 * @throws MetadataException if a storage group cannot be set or the timeseries of an InsertPlan
 *     cannot be created
 * @throws CheckConsistencyException if raised while creating the timeseries
 */
public void createSchema(PhysicalPlan plan) throws MetadataException, CheckConsistencyException {
  List<PartialPath> sourcePaths;
  if (plan instanceof BatchPlan) {
    sourcePaths = getValidStorageGroups(plan);
  } else if (plan instanceof InsertRowPlan || plan instanceof InsertTabletPlan) {
    sourcePaths = Collections.singletonList(((InsertPlan) plan).getPrefixPath());
  } else if (plan instanceof CreateTimeSeriesPlan) {
    sourcePaths = Collections.singletonList(((CreateTimeSeriesPlan) plan).getPath());
  } else if (plan instanceof CreateAlignedTimeSeriesPlan) {
    sourcePaths =
        Collections.singletonList(((CreateAlignedTimeSeriesPlan) plan).getPrefixPath());
  } else if (plan instanceof SetSchemaTemplatePlan) {
    sourcePaths =
        Collections.singletonList(
            new PartialPath(((SetSchemaTemplatePlan) plan).getPrefixPath()));
  } else {
    sourcePaths = plan.getPaths();
  }
  List<PartialPath> storageGroups = new ArrayList<>(getStorageGroups(sourcePaths));

  // create storage groups
  createStorageGroups(storageGroups);

  // wait for the created storage groups to show up (or for the timeout)
  verifyCreatedSgSuccess(storageGroups, plan);

  // try to create timeseries for insertPlan
  if (plan instanceof InsertPlan && !createTimeseries((InsertPlan) plan)) {
    throw new MetadataException("Failed to create timeseries from InsertPlan automatically.");
  }
}
/**
 * Collects the paths of a batch plan whose index has no entry in the plan's result map.
 *
 * <p>NOTE(review): presumably an entry in {@code getResults()} marks a sub-plan that was already
 * rejected (e.g. by a permission check), so only the remaining paths may create storage groups -
 * confirm against BatchPlan.
 *
 * @param plan a plan that is also a BatchPlan
 * @return the paths at the indexes missing from the result map, in their original order
 */
private List<PartialPath> getValidStorageGroups(PhysicalPlan plan) {
  List<PartialPath> candidates = plan.getPaths();
  List<PartialPath> accepted = new ArrayList<>();
  for (int index = 0; index < candidates.size(); index++) {
    boolean alreadyHasResult = ((BatchPlan) plan).getResults().containsKey(index);
    if (!alreadyHasResult) {
      accepted.add(candidates.get(index));
    }
  }
  return accepted;
}
/**
 * Returns the distinct storage group paths of the given device ids or timeseries, derived with
 * the default storage group level of the IoTDB configuration.
 *
 * @param paths device ids or timeseries paths
 * @return the storage group paths without duplicates
 * @throws MetadataException if a storage group path cannot be derived from a path
 */
private List<PartialPath> getStorageGroups(List<PartialPath> paths) throws MetadataException {
  int storageGroupLevel =
      IoTDBDescriptor.getInstance().getConfig().getDefaultStorageGroupLevel();
  Set<PartialPath> distinctGroups = new HashSet<>();
  for (PartialPath path : paths) {
    distinctGroups.add(MetaUtils.getStorageGroupPathByLevel(path, storageGroupLevel));
  }
  return new ArrayList<>(distinctGroups);
}
/**
 * Polls, once per millisecond, until every given storage group is reported by
 * {@code IoTDB.metaManager.isStorageGroup}, the cluster connection timeout elapses, or the
 * waiting thread is interrupted. The method returns silently in all three cases.
 *
 * @param storageGroups the storage groups expected to exist
 * @param physicalPlan the plan that triggered the creation, only used for logging
 */
@SuppressWarnings("squid:S3776")
private void verifyCreatedSgSuccess(List<PartialPath> storageGroups, PhysicalPlan physicalPlan) {
  long startTime = System.currentTimeMillis();
  // a new boolean array is all false: nothing has been confirmed yet
  boolean[] ready = new boolean[storageGroups.size()];
  while (true) {
    boolean allReady = true;
    for (int i = 0; i < storageGroups.size(); i++) {
      if (ready[i]) {
        // already confirmed in a previous round
        continue;
      }
      if (IoTDB.metaManager.isStorageGroup(storageGroups.get(i))) {
        ready[i] = true;
      } else {
        allReady = false;
      }
    }

    if (allReady
        || System.currentTimeMillis() - startTime
            > ClusterDescriptor.getInstance().getConfig().getConnectionTimeoutInMS()) {
      return;
    }

    try {
      Thread.sleep(1);
    } catch (InterruptedException e) {
      logger.debug("Failed to wait for creating sgs for plan {}", physicalPlan, e);
      Thread.currentThread().interrupt();
      // stop waiting: with the interrupt flag restored every further sleep would throw at once
      // and the loop would spin at full speed until the timeout
      return;
    }
  }
}
/**
 * Creates storage groups automatically by submitting one SetStorageGroupPlan per storage group
 * to the meta group member. A storage group that already exists is not an error.
 *
 * @param storageGroups the uncreated storage groups
 * @throws MetadataException if a plan ends with a status other than success or "path already
 *     exists"
 */
private void createStorageGroups(List<PartialPath> storageGroups) throws MetadataException {
  for (PartialPath storageGroup : storageGroups) {
    TSStatus status =
        metaGroupMember.processNonPartitionedMetaPlan(new SetStorageGroupPlan(storageGroup));
    boolean created = status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
    boolean existedBefore =
        status.getCode() == TSStatusCode.PATH_ALREADY_EXIST_ERROR.getStatusCode();
    if (created || existedBefore) {
      continue;
    }
    throw new MetadataException(
        String.format(
            "Status Code: %d, failed to set storage group %s", status.getCode(), storageGroup));
  }
}
/**
 * Creates the timeseries of every InsertTabletPlan contained in the given plan. All sub-plans
 * are attempted even if an earlier one fails; each failure is logged.
 *
 * @param insertMultiTabletPlan the InsertMultiTabletPlan
 * @return true if all InsertTabletPlan in InsertMultiTabletPlan create timeseries success,
 *     otherwise false
 */
public boolean createTimeseries(InsertMultiTabletPlan insertMultiTabletPlan)
    throws CheckConsistencyException, IllegalPathException {
  boolean everyPlanSucceeded = true;
  for (InsertTabletPlan tabletPlan : insertMultiTabletPlan.getInsertTabletPlanList()) {
    if (!createTimeseries(tabletPlan)) {
      everyPlanSucceeded = false;
      logger.error(
          "create timeseries for device={} failed, plan={}",
          tabletPlan.getPrefixPath(),
          tabletPlan);
    }
  }
  return everyPlanSucceeded;
}
/**
 * Creates the timeseries of every InsertRowPlan contained in the given plan. All sub-plans are
 * attempted even if an earlier one fails; each failure is logged.
 *
 * @param insertRowsPlan the InsertRowsPlan
 * @return true if the timeseries of all contained InsertRowPlans were created, otherwise false
 */
public boolean createTimeseries(InsertRowsPlan insertRowsPlan)
    throws CheckConsistencyException, IllegalPathException {
  boolean everyPlanSucceeded = true;
  for (InsertRowPlan rowPlan : insertRowsPlan.getInsertRowPlanList()) {
    if (!createTimeseries(rowPlan)) {
      everyPlanSucceeded = false;
      logger.error(
          "create timeseries for device={} failed, plan={}", rowPlan.getPrefixPath(), rowPlan);
    }
  }
  return everyPlanSucceeded;
}
/**
 * Create timeseries automatically for an InsertPlan.
 *
 * <p>InsertMultiTabletPlan and InsertRowsPlan are dispatched to their dedicated overloads. For
 * other plans, an aligned plan creates all its series as aligned timeseries; a non-aligned plan
 * first asks the partition group that holds the storage group which series are unregistered and
 * creates only those.
 *
 * @param insertPlan some of the timeseries in it are not created yet
 * @return true if all uncreated timeseries are created (or nothing had to be created), false if
 *     the storage group cannot be inferred from the device or the creation fails
 */
public boolean createTimeseries(InsertPlan insertPlan)
    throws IllegalPathException, CheckConsistencyException {
  if (insertPlan instanceof InsertMultiTabletPlan) {
    return createTimeseries((InsertMultiTabletPlan) insertPlan);
  }
  if (insertPlan instanceof InsertRowsPlan) {
    return createTimeseries((InsertRowsPlan) insertPlan);
  }

  PartialPath deviceId = insertPlan.getPrefixPath();
  PartialPath storageGroupName;
  try {
    storageGroupName =
        MetaUtils.getStorageGroupPathByLevel(
            deviceId, IoTDBDescriptor.getInstance().getConfig().getDefaultStorageGroupLevel());
  } catch (MetadataException e) {
    // keep the cause in the log so the reason for the failure is not lost
    logger.error("Failed to infer storage group from deviceId {}", deviceId, e);
    return false;
  }

  // full paths of all series written by the plan, in the order of its measurements
  List<String> seriesList = new ArrayList<>();
  for (String measurementId : insertPlan.getMeasurements()) {
    seriesList.add(deviceId.getFullPath() + TsFileConstant.PATH_SEPARATOR + measurementId);
  }

  if (insertPlan.isAligned()) {
    return createAlignedTimeseries(seriesList, insertPlan);
  }

  PartitionGroup partitionGroup =
      metaGroupMember.getPartitionTable().route(storageGroupName.getFullPath(), 0);
  List<String> unregisteredSeriesList = getUnregisteredSeriesList(seriesList, partitionGroup);
  if (unregisteredSeriesList.isEmpty()) {
    return true;
  }
  logger.debug("Unregistered series of {} are {}", seriesList, unregisteredSeriesList);

  return createTimeseries(unregisteredSeriesList, seriesList, insertPlan);
}
/**
 * Creates the series of an aligned insert plan with one CreateAlignedTimeSeriesPlan. Data types
 * come from the insert plan when provided, otherwise they are inferred from the first value of
 * the column (tablet) or from the value (row). Encodings are the defaults of the data types and
 * the compressor is the one configured in TSFileDescriptor.
 *
 * @param seriesList full paths of the series written by the plan
 * @param insertPlan the aligned insert plan
 * @return true if the plan ends with success, "path already exists" or "need redirection";
 *     false otherwise or if the plan is not supported
 * @throws IllegalPathException if a series path is illegal
 */
private boolean createAlignedTimeseries(List<String> seriesList, InsertPlan insertPlan)
    throws IllegalPathException {
  List<String> measurements = new ArrayList<>();
  for (String series : seriesList) {
    measurements.addAll(MetaUtils.getMeasurementsInPartialPath(new PartialPath(series)));
  }

  List<TSDataType> dataTypes = new ArrayList<>(measurements.size());
  List<TSEncoding> encodings = new ArrayList<>(measurements.size());
  for (int index = 0; index < measurements.size(); index++) {
    TSDataType dataType;
    boolean typeProvided =
        insertPlan.getDataTypes() != null && insertPlan.getDataTypes()[index] != null;
    if (typeProvided) {
      dataType = insertPlan.getDataTypes()[index];
    } else {
      // no declared type: infer it from a sample value of the column
      Object sampleValue;
      if (insertPlan instanceof InsertTabletPlan) {
        sampleValue = Array.get(((InsertTabletPlan) insertPlan).getColumns()[index], 0);
      } else {
        sampleValue = ((InsertRowPlan) insertPlan).getValues()[index];
      }
      dataType = TypeInferenceUtils.getPredictedDataType(sampleValue, true);
    }
    dataTypes.add(dataType);
    encodings.add(getDefaultEncoding(dataType));
  }

  CreateAlignedTimeSeriesPlan plan =
      new CreateAlignedTimeSeriesPlan(
          insertPlan.getPrefixPath(),
          measurements,
          dataTypes,
          encodings,
          TSFileDescriptor.getInstance().getConfig().getCompressor(),
          null);

  TSStatus result;
  try {
    result = coordinator.processPartitionedPlan(plan);
  } catch (UnsupportedPlanException e) {
    logger.error(
        "Failed to create timeseries {} automatically. Unsupported plan exception {} ",
        plan,
        e.getMessage());
    return false;
  }

  int code = result.getCode();
  boolean accepted =
      code == TSStatusCode.SUCCESS_STATUS.getStatusCode()
          || code == TSStatusCode.PATH_ALREADY_EXIST_ERROR.getStatusCode()
          || code == TSStatusCode.NEED_REDIRECTION.getStatusCode();
  if (accepted) {
    return true;
  }
  logger.error(
      "{} failed to execute create timeseries {}: {}",
      metaGroupMember.getThisNode(),
      plan,
      result);
  return false;
}
/**
 * Creates the timeseries listed in "unregisteredSeriesList" with one CreateMultiTimeSeriesPlan.
 * If data types are provided by the InsertPlan they are used, otherwise the types are inferred
 * from the values. The default encoding of each data type and the configured compressor are
 * used.
 *
 * @param unregisteredSeriesList full paths of the series to create
 * @param seriesList full paths of all series of the insert plan; the position of a series in
 *     this list selects its data type or value in the plan
 * @param insertPlan the plan that provides data types or values
 * @return true if the plan ends with success, "path already exists" or "need redirection";
 *     false otherwise or if the plan is not supported
 * @throws IllegalPathException if a series path is illegal
 */
private boolean createTimeseries(
    List<String> unregisteredSeriesList, List<String> seriesList, InsertPlan insertPlan)
    throws IllegalPathException {
  List<PartialPath> paths = new ArrayList<>();
  List<TSDataType> dataTypes = new ArrayList<>();
  List<TSEncoding> encodings = new ArrayList<>();
  List<CompressionType> compressionTypes = new ArrayList<>();

  for (String seriesPath : unregisteredSeriesList) {
    paths.add(new PartialPath(seriesPath));
    int index = seriesList.indexOf(seriesPath);

    // use data types in insertPlan if provided, otherwise infer them from the values
    TSDataType dataType;
    boolean typeProvided =
        insertPlan.getDataTypes() != null && insertPlan.getDataTypes()[index] != null;
    if (typeProvided) {
      dataType = insertPlan.getDataTypes()[index];
    } else {
      Object sampleValue;
      if (insertPlan instanceof InsertTabletPlan) {
        sampleValue = Array.get(((InsertTabletPlan) insertPlan).getColumns()[index], 0);
      } else {
        sampleValue = ((InsertRowPlan) insertPlan).getValues()[index];
      }
      dataType = TypeInferenceUtils.getPredictedDataType(sampleValue, true);
    }
    dataTypes.add(dataType);

    // use default encoding and compression from the config
    encodings.add(getDefaultEncoding(dataType));
    compressionTypes.add(TSFileDescriptor.getInstance().getConfig().getCompressor());
  }

  CreateMultiTimeSeriesPlan plan = new CreateMultiTimeSeriesPlan();
  plan.setPaths(paths);
  plan.setDataTypes(dataTypes);
  plan.setEncodings(encodings);
  plan.setCompressors(compressionTypes);

  TSStatus result;
  try {
    result = coordinator.processPartitionedPlan(plan);
  } catch (UnsupportedPlanException e) {
    logger.error(
        "Failed to create timeseries {} automatically. Unsupported plan exception {} ",
        paths,
        e.getMessage());
    return false;
  }

  int code = result.getCode();
  boolean accepted =
      code == TSStatusCode.SUCCESS_STATUS.getStatusCode()
          || code == TSStatusCode.PATH_ALREADY_EXIST_ERROR.getStatusCode()
          || code == TSStatusCode.NEED_REDIRECTION.getStatusCode();
  if (accepted) {
    return true;
  }
  logger.error(
      "{} failed to execute create timeseries {}: {}",
      metaGroupMember.getThisNode(),
      paths,
      result);
  return false;
}
/**
 * Sets the meta group member used by this manager, e.g. to synchronize with the meta leader, to
 * submit SetStorageGroupPlans and to route storage groups through the partition table.
 *
 * @param metaGroupMember the meta group member to use
 */
public void setMetaGroupMember(MetaGroupMember metaGroupMember) {
this.metaGroupMember = metaGroupMember;
}
/**
 * Sets the coordinator that executes the timeseries creation plans built by this manager.
 *
 * @param coordinator the coordinator to use
 */
public void setCoordinator(Coordinator coordinator) {
this.coordinator = coordinator;
}
/**
 * Checks which timeseries of the input list are unregistered on one node of "partitionGroup".
 * The check is done locally when this node belongs to the group, remotely otherwise.
 *
 * @param seriesList full paths of the timeseries to check
 * @param partitionGroup the group that should hold the schemas
 * @return the unregistered timeseries reported by the local or remote check
 * @throws CheckConsistencyException if raised by the local check
 */
private List<String> getUnregisteredSeriesList(
    List<String> seriesList, PartitionGroup partitionGroup) throws CheckConsistencyException {
  boolean thisNodeInGroup = partitionGroup.contains(metaGroupMember.getThisNode());
  if (!thisNodeInGroup) {
    return getUnregisteredSeriesListRemotely(seriesList, partitionGroup);
  }
  return getUnregisteredSeriesListLocally(seriesList, partitionGroup);
}
/**
 * Asks the local query executor of the data member selected by the header of "partitionGroup"
 * which of the given timeseries are unregistered.
 *
 * @param seriesList full paths of the timeseries to check
 * @param partitionGroup the group whose header selects the local data member
 * @return the unregistered timeseries reported by the local query executor
 * @throws CheckConsistencyException if raised by the local query executor
 */
private List<String> getUnregisteredSeriesListLocally(
    List<String> seriesList, PartitionGroup partitionGroup) throws CheckConsistencyException {
  return metaGroupMember
      .getDataClusterServer()
      .getDataMember(partitionGroup.getHeader(), null, null)
      .getLocalQueryExecutor()
      .getUnregisteredTimeseries(seriesList);
}
/**
 * Asks the nodes of "partitionGroup", one after another, which of the given timeseries are
 * unregistered and returns the first non-null answer. Depending on the cluster configuration an
 * asynchronous or a synchronous data client is used. A failure on one node is logged and the
 * next node is tried.
 *
 * <p>NOTE(review): when no node answers, an empty list is returned, which the caller cannot
 * tell apart from "every series is already registered" - confirm this is intended.
 *
 * @param seriesList full paths of the timeseries to check; the error logs read its first and
 *     last element
 * @param partitionGroup the nodes to ask; its header is sent along with every request
 * @return the first non-null list returned by a node, or an empty list if there is none
 */
private List<String> getUnregisteredSeriesListRemotely(
List<String> seriesList, PartitionGroup partitionGroup) {
for (Node node : partitionGroup) {
try {
List<String> result;
if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
// asynchronous server: the adaptor turns the async call into a blocking one
AsyncDataClient client =
metaGroupMember
.getClientProvider()
.getAsyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
result =
SyncClientAdaptor.getUnregisteredMeasurements(
client, partitionGroup.getHeader(), seriesList);
} else {
// synchronous server: the client is closed by the try-with-resources statement
try (SyncDataClient syncDataClient =
metaGroupMember
.getClientProvider()
.getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS())) {
try {
result =
syncDataClient.getUnregisteredTimeseries(partitionGroup.getHeader(), seriesList);
} catch (TException e) {
// the connection may be broken, close it to avoid it being reused
syncDataClient.getInputProtocol().getTransport().close();
throw e;
}
}
}
// a null result counts as "no answer": fall through and try the next node
if (result != null) {
return result;
}
} catch (TException | IOException e) {
logger.error(
"{}: cannot getting unregistered {} and other {} paths from {}",
metaGroupMember.getName(),
seriesList.get(0),
seriesList.get(seriesList.size() - 1),
node,
e);
} catch (InterruptedException e) {
// restore the interrupt flag; the loop itself still moves on to the next node
Thread.currentThread().interrupt();
logger.error(
"{}: getting unregistered series list {} ... {} is interrupted from {}",
metaGroupMember.getName(),
seriesList.get(0),
seriesList.get(seriesList.size() - 1),
node,
e);
}
}
// no node gave an answer
return Collections.emptyList();
}
/**
 * Get all devices after removing wildcards in the path.
 *
 * <p>The storage groups the path may belong to are determined first. The resulting map has the
 * storage group name as key and the path to be queried, with the storage group added, as value,
 * e.g. "root.*" is translated into "root.group1" -> "root.group1.*", "root.group2" ->
 * "root.group2.*" ...
 *
 * @param originPath a path potentially with wildcard
 * @return all paths after removing wildcards in the path
 * @throws MetadataException if the storage groups or the devices cannot be determined
 */
public Set<PartialPath> getMatchedDevices(PartialPath originPath) throws MetadataException {
  Set<PartialPath> matchedDevices = getMatchedDevices(determineStorageGroup(originPath));
  logger.debug("The devices of path {} are {}", originPath, matchedDevices);
  return matchedDevices;
}
/**
 * Splits the paths by the data group they belong to and queries each group separately.
 *
 * <p>Paths whose group contains this node are answered locally (after trying to synchronize with
 * the group leader); the remaining paths are batched per group and fetched remotely.
 *
 * @param sgPathMap the key is the storage group name and the value is the path to be queried with
 *     storage group added
 * @param withAlias whether the returned paths should carry measurement aliases
 * @return a collection of all queried paths
 * @throws MetadataException if a path is illegal or a local or remote query fails
 */
private List<PartialPath> getMatchedPaths(Map<String, String> sgPathMap, boolean withAlias)
    throws MetadataException {
  List<PartialPath> matched = new ArrayList<>();
  // paths that have to be fetched remotely, batched by the data group that holds them
  Map<PartitionGroup, List<String>> remoteQueries = new HashMap<>();
  for (Entry<String, String> entry : sgPathMap.entrySet()) {
    PartialPath pathUnderSG = new PartialPath(entry.getValue());
    // the data group that should hold the timeseries schemas of the storage group
    PartitionGroup group = metaGroupMember.getPartitionTable().route(entry.getKey(), 0);
    if (!group.contains(metaGroupMember.getThisNode())) {
      // batch the queries of the same group to reduce communication
      remoteQueries.computeIfAbsent(group, g -> new ArrayList<>()).add(pathUnderSG.getFullPath());
      continue;
    }

    // this node is a member of the group: synchronize with the leader, then query locally
    try {
      metaGroupMember.getLocalDataMember(group.getHeader(), group.getId()).syncLeader(null);
    } catch (CheckConsistencyException e) {
      logger.warn("Failed to check consistency.", e);
    }
    List<PartialPath> localPaths = getMatchedPathsLocally(pathUnderSG, withAlias);
    logger.debug(
        "{}: get matched paths of {} locally, result {}",
        metaGroupMember.getName(),
        group,
        localPaths);
    matched.addAll(localPaths);
  }

  // query each remote data group separately
  for (Entry<PartitionGroup, List<String>> remoteQuery : remoteQueries.entrySet()) {
    matched.addAll(getMatchedPaths(remoteQuery.getKey(), remoteQuery.getValue(), withAlias));
  }
  return matched;
}
/**
 * Queries the timeseries paths matching {@code partialPath}, with or without aliases, using the
 * non-distributed lookups.
 *
 * @param partialPath the path to match
 * @param withAlias if true, the alias-aware lookup of the superclass is used (without limit and
 *     offset); otherwise {@code getAllTimeseriesPath} is used
 * @return the matched paths
 */
private List<PartialPath> getMatchedPathsLocally(PartialPath partialPath, boolean withAlias)
    throws MetadataException {
  if (withAlias) {
    return super.getAllTimeseriesPathWithAlias(partialPath, -1, -1).left;
  }
  return getAllTimeseriesPath(partialPath);
}
/**
 * Fetches the paths matching {@code pathsToQuery} from one member of {@code partitionGroup}.
 *
 * <p>The members are tried in the order returned by {@code QueryCoordinator.reorderNodes}. A null
 * answer moves on to the next member, whereas an I/O error, a Thrift error or an interruption
 * aborts the query with a {@link MetadataException}.
 *
 * @param partitionGroup the group to query
 * @param pathsToQuery the paths, potentially with wildcards, to resolve in that group
 * @param withAlias whether aliases are requested along with the paths
 * @return the first non-null answer, or an empty list if every member answered null
 * @throws MetadataException if a request fails or is interrupted
 */
private List<PartialPath> getMatchedPaths(
    PartitionGroup partitionGroup, List<String> pathsToQuery, boolean withAlias)
    throws MetadataException {
  // choose the node with lowest latency or highest throughput
  for (Node node : QueryCoordinator.getINSTANCE().reorderNodes(partitionGroup)) {
    try {
      List<PartialPath> answer =
          getMatchedPaths(node, partitionGroup.getHeader(), pathsToQuery, withAlias);
      if (logger.isDebugEnabled()) {
        logger.debug(
            "{}: get matched paths of {} and other {} paths from {} in {}, result {}",
            metaGroupMember.getName(),
            pathsToQuery.get(0),
            pathsToQuery.size() - 1,
            node,
            partitionGroup.getHeader(),
            answer);
      }
      if (answer == null) {
        // no usable answer from this node, try the next one
        continue;
      }
      // a non-null result is correct even if it is empty, so the group is done
      return answer;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new MetadataException(e);
    } catch (IOException | TException e) {
      throw new MetadataException(e);
    }
  }
  logger.warn("Cannot get paths of {} from {}", pathsToQuery, partitionGroup);
  return Collections.emptyList();
}
/**
 * Asks a single node for the paths matching {@code pathsToQuery} and converts the answer into
 * {@link PartialPath}s.
 *
 * <p>Returned path strings that are not legal paths are skipped (and logged) instead of failing
 * the whole query.
 *
 * @param node the node to query
 * @param header the header of the data group the query is addressed to
 * @param pathsToQuery the paths, potentially with wildcards, to resolve
 * @param withAlias whether each returned path should get the alias at the same index of the
 *     answer's alias list
 * @return the converted paths (possibly empty), or null if the remote call produced no result, in
 *     which case the caller is expected to try another node
 */
@SuppressWarnings("java:S1168") // null and empty list are different
private List<PartialPath> getMatchedPaths(
    Node node, RaftNode header, List<String> pathsToQuery, boolean withAlias)
    throws IOException, TException, InterruptedException {
  GetAllPathsResult result;
  if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
    AsyncDataClient client =
        metaGroupMember
            .getClientProvider()
            .getAsyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
    result = SyncClientAdaptor.getAllPaths(client, header, pathsToQuery, withAlias);
  } else {
    try (SyncDataClient syncDataClient =
        metaGroupMember
            .getClientProvider()
            .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS())) {
      try {
        result = syncDataClient.getAllPaths(header, pathsToQuery, withAlias);
      } catch (TException e) {
        // the connection may be broken, close it to avoid it being reused
        syncDataClient.getInputProtocol().getTransport().close();
        throw e;
      }
    }
  }

  if (result == null) {
    // a null implies a network failure, so we have to query other nodes in the group
    return null;
  }

  // paths may be empty, implying that the group does not contain matched paths, so we do not
  // need to query other nodes in the group
  List<PartialPath> partialPaths = new ArrayList<>();
  for (int i = 0; i < result.paths.size(); i++) {
    try {
      PartialPath partialPath = new PartialPath(result.paths.get(i));
      if (withAlias) {
        partialPath.setMeasurementAlias(result.aliasList.get(i));
      }
      partialPaths.add(partialPath);
    } catch (IllegalPathException e) {
      // keep the query best-effort, but leave a trace of the path that was dropped
      logger.warn("Ignore illegal path {} returned by {}", result.paths.get(i), node, e);
    }
  }
  return partialPaths;
}
/**
 * Splits the paths by the data group they belong to and queries the devices of each group
 * separately.
 *
 * <p>Paths whose group contains this node are answered locally (after trying to synchronize with
 * the group leader); the remaining paths are batched per group and fetched remotely.
 *
 * @param sgPathMap the key is the storage group name and the value is the path to be queried with
 *     storage group added
 * @return a collection of all queried devices
 * @throws MetadataException if a path is illegal or a local or remote query fails
 */
private Set<PartialPath> getMatchedDevices(Map<String, String> sgPathMap)
    throws MetadataException {
  Set<PartialPath> devices = new HashSet<>();
  // paths that have to be fetched remotely, batched by the data group that holds them
  Map<PartitionGroup, List<String>> remoteQueries = new HashMap<>();
  for (Entry<String, String> entry : sgPathMap.entrySet()) {
    PartialPath pathUnderSG = new PartialPath(entry.getValue());
    // the data group that should hold the timeseries schemas of the storage group
    PartitionGroup group = metaGroupMember.getPartitionTable().route(entry.getKey(), 0);
    if (!group.contains(metaGroupMember.getThisNode())) {
      // batch the queries of the same group to reduce communication
      remoteQueries.computeIfAbsent(group, g -> new ArrayList<>()).add(pathUnderSG.getFullPath());
      continue;
    }

    // this node is a member of the group: synchronize with the leader, then query locally
    try {
      metaGroupMember.getLocalDataMember(group.getHeader(), group.getId()).syncLeader(null);
    } catch (CheckConsistencyException e) {
      logger.warn("Failed to check consistency.", e);
    }
    Set<PartialPath> localDevices = getDevices(pathUnderSG);
    logger.debug(
        "{}: get matched paths of {} locally, result {}",
        metaGroupMember.getName(),
        group,
        localDevices);
    devices.addAll(localDevices);
  }

  // query each remote data group separately
  for (Entry<PartitionGroup, List<String>> remoteQuery : remoteQueries.entrySet()) {
    devices.addAll(getMatchedDevices(remoteQuery.getKey(), remoteQuery.getValue()));
  }
  return devices;
}
/**
 * Fetches the devices matching {@code pathsToQuery} from one member of {@code partitionGroup}.
 *
 * <p>The members are tried in the order returned by {@code QueryCoordinator.reorderNodes}. A null
 * answer moves on to the next member, whereas an I/O error, a Thrift error or an interruption
 * aborts the query with a {@link MetadataException}.
 *
 * @param partitionGroup the group to query
 * @param pathsToQuery the paths, potentially with wildcards, to resolve in that group
 * @return the devices of the first non-null answer, or an empty set if every member answered null
 * @throws MetadataException if a request fails, is interrupted, or a returned path is illegal
 */
private Set<PartialPath> getMatchedDevices(
    PartitionGroup partitionGroup, List<String> pathsToQuery) throws MetadataException {
  // choose the node with lowest latency or highest throughput
  for (Node node : QueryCoordinator.getINSTANCE().reorderNodes(partitionGroup)) {
    try {
      Set<String> devicePaths =
          getMatchedDevices(node, partitionGroup.getHeader(), pathsToQuery);
      logger.debug(
          "{}: get matched paths of {} from {}, result {} for {}",
          metaGroupMember.getName(),
          partitionGroup,
          node,
          devicePaths,
          pathsToQuery);
      if (devicePaths == null) {
        // no usable answer from this node, try the next one
        continue;
      }
      // this group is answered; convert the strings and let the caller query the next group
      Set<PartialPath> devices = new HashSet<>();
      for (String devicePath : devicePaths) {
        devices.add(new PartialPath(devicePath));
      }
      return devices;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new MetadataException(e);
    } catch (IOException | TException e) {
      throw new MetadataException(e);
    }
  }
  logger.warn("Cannot get paths of {} from {}", pathsToQuery, partitionGroup);
  return Collections.emptySet();
}
/**
 * Asks a single node for the devices matching {@code pathsToQuery}, through the asynchronous or
 * the synchronous client depending on the cluster configuration.
 *
 * @param node the node to query
 * @param header the header of the data group the query is addressed to
 * @param pathsToQuery the paths, potentially with wildcards, to resolve
 * @return whatever the remote call returned; callers treat null as "no answer"
 */
private Set<String> getMatchedDevices(Node node, RaftNode header, List<String> pathsToQuery)
    throws IOException, TException, InterruptedException {
  if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
    AsyncDataClient asyncClient =
        metaGroupMember
            .getClientProvider()
            .getAsyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
    return SyncClientAdaptor.getAllDevices(asyncClient, header, pathsToQuery);
  }

  try (SyncDataClient syncDataClient =
      metaGroupMember
          .getClientProvider()
          .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS())) {
    try {
      return syncDataClient.getAllDevices(header, pathsToQuery);
    } catch (TException e) {
      // the connection may be broken, close it to avoid it being reused
      syncDataClient.getInputProtocol().getTransport().close();
      throw e;
    }
  }
}
/**
 * Similar to method getAllTimeseriesPath(), but returns paths carrying their alias.
 *
 * @param prefixPath a path potentially with wildcard
 * @param limit the maximum number of paths to return; values not greater than 0 mean no limit
 * @param offset the number of leading paths to skip; values not greater than 0 skip nothing
 * @return the selected paths together with the number of paths that were actually skipped
 */
@Override
public Pair<List<PartialPath>, Integer> getAllTimeseriesPathWithAlias(
    PartialPath prefixPath, int limit, int offset) throws MetadataException {
  // determineStorageGroup yields every storage group the path may belong to: the key is the
  // storage group name and the value is the path to query with the storage group added, e.g.
  // "root.*" becomes "root.group1" -> "root.group1.*", "root.group2" -> "root.group2.*" ...
  List<PartialPath> page = getMatchedPaths(determineStorageGroup(prefixPath), true);

  // apply offset: skip at most as many paths as there are
  int skippedOffset = 0;
  if (offset > 0) {
    skippedOffset = Math.min(offset, page.size());
    if (skippedOffset < page.size()) {
      page = page.subList(skippedOffset, page.size());
    } else {
      page = Collections.emptyList();
    }
  }

  // apply limit
  if (limit > 0 && page.size() > limit) {
    page = page.subList(0, limit);
  }
  logger.debug("The paths of path {} are {}", prefixPath, page);
  return new Pair<>(page, skippedOffset);
}
/**
 * Resolves a path that may contain wildcards into the full paths it matches (without aliases).
 *
 * @param originPath a path potentially with wildcard
 * @return all paths after removing wildcards in the path
 * @throws MetadataException if determining the storage groups or querying the paths fails
 */
public List<PartialPath> getMatchedPaths(PartialPath originPath) throws MetadataException {
  // determineStorageGroup yields every storage group the path may belong to: the key is the
  // storage group name and the value is the path to query with the storage group added, e.g.
  // "root.*" becomes "root.group1" -> "root.group1.*", "root.group2" -> "root.group2.*" ...
  List<PartialPath> matchedPaths = getMatchedPaths(determineStorageGroup(originPath), false);
  logger.debug("The paths of path {} are {}", originPath, matchedPaths);
  return matchedPaths;
}
/**
 * Get all paths after removing wildcards in the path. Each original path is resolved by its own
 * task on a temporary thread pool, and the method waits up to the read operation timeout for the
 * tasks to finish.
 *
 * <p>A path whose resolution throws a {@link MetadataException} is logged and appears in neither
 * list. If the wait times out or is interrupted, the lists contain only what was collected so
 * far.
 *
 * @param originalPaths a list of paths, potentially with wildcard
 * @return a pair of path lists, the first are the existing full paths, the second are invalid
 *     original paths
 */
public Pair<List<PartialPath>, List<PartialPath>> getMatchedPaths(
    List<PartialPath> originalPaths) {
  ConcurrentSkipListSet<PartialPath> fullPaths = new ConcurrentSkipListSet<>();
  ConcurrentSkipListSet<PartialPath> nonExistPaths = new ConcurrentSkipListSet<>();
  // newFixedThreadPool rejects a size of 0, so always keep at least one worker
  int poolSize = Math.max(1, metaGroupMember.getPartitionTable().getGlobalGroups().size());
  ExecutorService getAllPathsService = Executors.newFixedThreadPool(poolSize);
  for (PartialPath pathStr : originalPaths) {
    getAllPathsService.submit(
        () -> {
          try {
            List<PartialPath> fullPathStrs = getMatchedPaths(pathStr);
            if (fullPathStrs.isEmpty()) {
              nonExistPaths.add(pathStr);
              logger.debug("Path {} is not found.", pathStr);
            } else {
              fullPaths.addAll(fullPathStrs);
            }
          } catch (MetadataException e) {
            logger.error("Failed to get full paths of the prefix path: {} because", pathStr, e);
          }
        });
  }
  getAllPathsService.shutdown();
  try {
    boolean finished =
        getAllPathsService.awaitTermination(
            RaftServer.getReadOperationTimeoutMS(), TimeUnit.MILLISECONDS);
    if (!finished) {
      // make a partial result visible instead of returning it silently
      logger.warn(
          "Getting matched paths of {} did not finish in time, the result may be incomplete",
          originalPaths);
    }
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    logger.error("Unexpected interruption when waiting for get all paths services to stop", e);
  }
  return new Pair<>(new ArrayList<>(fullPaths), new ArrayList<>(nonExistPaths));
}
/**
 * Get the local paths that match any path in "paths". The result is not deduplicated, and the
 * matches are appended in the order of the queried paths.
 *
 * @param paths paths potentially contain wildcards
 * @return the full paths of all matched timeseries
 * @throws MetadataException if a path is illegal or the lookup fails
 */
public List<String> getAllPaths(List<String> paths) throws MetadataException {
  List<String> fullPaths = new ArrayList<>();
  for (String path : paths) {
    for (PartialPath matched : getAllTimeseriesPath(new PartialPath(path))) {
      fullPaths.add(matched.getFullPath());
    }
  }
  return fullPaths;
}
/**
 * Get the local devices that match any path in "paths". The result is deduplicated because it is
 * collected into a set.
 *
 * @param paths paths potentially contain wildcards
 * @return the full paths of all matched devices
 * @throws MetadataException if a path is illegal or the lookup fails
 */
public Set<String> getAllDevices(List<String> paths) throws MetadataException {
  Set<String> deviceNames = new HashSet<>();
  for (String path : paths) {
    for (PartialPath device : getDevices(new PartialPath(path))) {
      deviceNames.add(device.getFullPath());
    }
  }
  return deviceNames;
}
/**
 * Get the nodes of a prefix "path" at "nodeLevel". The method currently requires strong
 * consistency.
 *
 * @param path the prefix path, given as a string
 * @param nodeLevel the level passed on to {@code getNodesList}
 * @return the full paths of the nodes returned by {@code getNodesList}
 * @throws MetadataException if the path is illegal or the lookup fails
 */
public List<String> getNodeList(String path, int nodeLevel) throws MetadataException {
  List<String> nodeNames = new ArrayList<>();
  for (PartialPath node : getNodesList(new PartialPath(path), nodeLevel)) {
    nodeNames.add(node.getFullPath());
  }
  return nodeNames;
}
/**
 * String-based convenience overload: parses {@code path} and delegates to the {@link PartialPath}
 * variant of this method.
 *
 * @param path the parent path as a string
 * @throws MetadataException if the path is illegal or the lookup fails
 */
public Set<String> getChildNodeInNextLevel(String path) throws MetadataException {
  PartialPath parentPath = new PartialPath(path);
  return getChildNodeInNextLevel(parentPath);
}
/**
 * String-based convenience overload: parses {@code path} and delegates to the {@link PartialPath}
 * variant of this method.
 *
 * @param path the parent path as a string
 * @throws MetadataException if the path is illegal or the lookup fails
 */
public Set<String> getChildNodePathInNextLevel(String path) throws MetadataException {
  PartialPath parentPath = new PartialPath(path);
  return getChildNodePathInNextLevel(parentPath);
}
/**
 * Replace partial paths (paths not containing measurements), and abstract paths (paths containing
 * wildcards) with full paths.
 *
 * <p>The plan's paths are replaced with the resolved full paths before the check for missing
 * paths, so the plan is updated even when this method throws {@link PathNotExistException}.
 *
 * @param plan the plan whose paths are resolved in place
 * @throws PathNotExistException if at least one original path matched nothing
 * @throws CheckConsistencyException if synchronizing with the meta leader fails
 */
public void convertToFullPaths(PhysicalPlan plan)
    throws PathNotExistException, CheckConsistencyException {
  // make sure this node knows all storage groups
  metaGroupMember.syncLeaderWithConsistencyCheck(false);

  Pair<List<PartialPath>, List<PartialPath>> matchResult = getMatchedPaths(plan.getPaths());
  plan.setPaths(matchResult.left);

  List<PartialPath> missingPaths = matchResult.right;
  if (missingPaths.isEmpty()) {
    return;
  }
  List<String> missingNames = new ArrayList<>();
  for (PartialPath missingPath : missingPaths) {
    missingNames.add(missingPath.getFullPath());
  }
  throw new PathNotExistException(missingNames);
}
/**
 * Looks up the child of {@code deviceMNode} named {@code measurementName}, falling back to the
 * remote meta cache when the device node has no such child.
 *
 * @param deviceMNode the parent node
 * @param measurementName the name of the child to find
 * @return the child of the device node, or else whatever the remote meta cache holds for the
 *     concatenated path
 */
@Override
public IMNode getMNode(IMNode deviceMNode, String measurementName) {
  IMNode localChild = deviceMNode.getChild(measurementName);
  if (localChild != null) {
    return localChild;
  }
  return mRemoteMetaCache.get(deviceMNode.getPartialPath().concatNode(measurementName));
}
/**
 * Executes the show-timeseries plan with the superclass implementation, bypassing the
 * group-fan-out override of {@code showTimeseries} in this class.
 *
 * @param plan the plan to execute
 * @param context the query context handed to the superclass
 * @return the result of the superclass query
 */
public List<ShowTimeSeriesResult> showLocalTimeseries(
    ShowTimeSeriesPlan plan, QueryContext context) throws MetadataException {
  List<ShowTimeSeriesResult> localResults = super.showTimeseries(plan, context);
  return localResults;
}
/**
 * Executes the show-devices plan with the superclass implementation, bypassing the group-fan-out
 * override of {@code getDevices} in this class.
 *
 * @param plan the plan to execute
 * @return the result of the superclass query
 */
public List<ShowDevicesResult> getLocalDevices(ShowDevicesPlan plan) throws MetadataException {
  List<ShowDevicesResult> localResults = super.getDevices(plan);
  return localResults;
}
/**
 * Collects the devices matching {@code plan} from all global partition groups in parallel, merges
 * them into a sorted set and applies the plan's limit and offset to the merged result.
 *
 * <p>Note that a plan with a non-zero offset is modified in place: its offset is reset to 0 and
 * its limit is widened so that every group returns enough rows for the merge.
 *
 * @param plan the show-devices plan; a limit of 0 is treated as "no limit"
 * @return the merged devices after limit and offset
 */
@Override
public List<ShowDevicesResult> getDevices(ShowDevicesPlan plan) throws MetadataException {
  ConcurrentSkipListSet<ShowDevicesResult> resultSet = new ConcurrentSkipListSet<>();
  ExecutorService pool =
      new ThreadPoolExecutor(
          THREAD_POOL_SIZE, THREAD_POOL_SIZE, 0, TimeUnit.SECONDS, new LinkedBlockingDeque<>());
  List<PartitionGroup> globalGroups = metaGroupMember.getPartitionTable().getGlobalGroups();
  int limit = plan.getLimit() == 0 ? Integer.MAX_VALUE : plan.getLimit();
  int offset = plan.getOffset();
  // do not use limit and offset in sub-queries unless offset is 0, otherwise the results are
  // not combinable
  if (offset != 0) {
    if (limit > Integer.MAX_VALUE - offset) {
      // limit + offset would overflow, so ask the groups for everything
      plan.setLimit(0);
    } else {
      plan.setLimit(limit + offset);
    }
    plan.setOffset(0);
  }
  if (logger.isDebugEnabled()) {
    logger.debug(
        "Fetch devices schemas of {} from {} groups", plan.getPath(), globalGroups.size());
  }
  List<Future<Void>> futureList = new ArrayList<>();
  for (PartitionGroup group : globalGroups) {
    futureList.add(
        pool.submit(
            () -> {
              try {
                getDevices(group, plan, resultSet);
              } catch (CheckConsistencyException e) {
                // keep the cause in the log; the group is skipped and the others still count
                logger.error("Cannot get show devices result of {} from {}", plan, group, e);
              }
              return null;
            }));
  }
  waitForThreadPool(futureList, pool, "getDevices()");
  List<ShowDevicesResult> showDevicesResults =
      applyShowDevicesLimitOffset(resultSet, limit, offset);
  logger.debug("show devices {} has {} results", plan.getPath(), showDevicesResults.size());
  return showDevicesResults;
}
/**
 * Collects the timeseries matching {@code plan} from the responsible partition group(s) in
 * parallel, merges them into a sorted set and applies the plan's limit and offset to the merged
 * result.
 *
 * <p>If the partition table can locate the plan's path, only that group is queried; otherwise
 * all global groups are. Note that a plan with a non-zero offset is modified in place: its offset
 * is reset to 0 and its limit is widened so that every group returns enough rows for the merge.
 *
 * @param plan the show-timeseries plan; a limit of 0 is treated as "no limit"
 * @param context the query context handed to the per-group queries
 * @return the merged timeseries after limit and offset
 */
@Override
public List<ShowTimeSeriesResult> showTimeseries(ShowTimeSeriesPlan plan, QueryContext context)
    throws MetadataException {
  ConcurrentSkipListSet<ShowTimeSeriesResult> resultSet = new ConcurrentSkipListSet<>();
  ExecutorService pool =
      new ThreadPoolExecutor(
          THREAD_POOL_SIZE, THREAD_POOL_SIZE, 0, TimeUnit.SECONDS, new LinkedBlockingDeque<>());
  List<PartitionGroup> globalGroups = new ArrayList<>();
  try {
    PartitionGroup partitionGroup =
        metaGroupMember.getPartitionTable().partitionByPathTime(plan.getPath(), 0);
    globalGroups.add(partitionGroup);
  } catch (MetadataException e) {
    // if the path location is not found, obtain the path location from all groups.
    globalGroups = metaGroupMember.getPartitionTable().getGlobalGroups();
  }
  int limit = plan.getLimit() == 0 ? Integer.MAX_VALUE : plan.getLimit();
  int offset = plan.getOffset();
  // do not use limit and offset in sub-queries unless offset is 0, otherwise the results are
  // not combinable
  if (offset != 0) {
    if (limit > Integer.MAX_VALUE - offset) {
      // limit + offset would overflow, so ask the groups for everything
      plan.setLimit(0);
    } else {
      plan.setLimit(limit + offset);
    }
    plan.setOffset(0);
  }
  if (logger.isDebugEnabled()) {
    logger.debug(
        "Fetch timeseries schemas of {} from {} groups", plan.getPath(), globalGroups.size());
  }
  List<Future<Void>> futureList = new ArrayList<>();
  for (PartitionGroup group : globalGroups) {
    futureList.add(
        pool.submit(
            () -> {
              try {
                showTimeseries(group, plan, resultSet, context);
              } catch (CheckConsistencyException | MetadataException e) {
                // keep the cause in the log; the group is skipped and the others still count
                logger.error("Cannot get show timeseries result of {} from {}", plan, group, e);
              }
              return null;
            }));
  }
  waitForThreadPool(futureList, pool, "showTimeseries()");
  List<ShowTimeSeriesResult> showTimeSeriesResults =
      applyShowTimeseriesLimitOffset(resultSet, limit, offset);
  logger.debug("Show {} has {} results", plan.getPath(), showTimeSeriesResults.size());
  return showTimeSeriesResults;
}
/**
 * Selects a window of the merged results in the iteration order of {@code resultSet}.
 *
 * @param resultSet the merged results
 * @param limit the maximum number of results to keep; nothing is kept if it is not positive
 * @param offset the number of leading results to skip
 * @return the results after skipping {@code offset} entries and keeping at most {@code limit}
 */
private List<ShowTimeSeriesResult> applyShowTimeseriesLimitOffset(
    ConcurrentSkipListSet<ShowTimeSeriesResult> resultSet, int limit, int offset) {
  List<ShowTimeSeriesResult> window = new ArrayList<>();
  int toSkip = offset;
  int toTake = limit;
  for (ShowTimeSeriesResult candidate : resultSet) {
    if (toTake <= 0) {
      break;
    }
    if (toSkip > 0) {
      toSkip--;
      continue;
    }
    toTake--;
    window.add(candidate);
  }
  return window;
}
/**
 * Selects a window of the merged results in the iteration order of {@code resultSet}.
 *
 * @param resultSet the merged results
 * @param limit the maximum number of results to keep; nothing is kept if it is not positive
 * @param offset the number of leading results to skip
 * @return the results after skipping {@code offset} entries and keeping at most {@code limit}
 */
private List<ShowDevicesResult> applyShowDevicesLimitOffset(
    Set<ShowDevicesResult> resultSet, int limit, int offset) {
  List<ShowDevicesResult> window = new ArrayList<>();
  int toSkip = offset;
  int toTake = limit;
  for (ShowDevicesResult candidate : resultSet) {
    if (toTake <= 0) {
      break;
    }
    if (toSkip > 0) {
      toSkip--;
      continue;
    }
    toTake--;
    window.add(candidate);
  }
  return window;
}
/**
 * Executes the plan for one partition group, locally if this node belongs to the group and
 * remotely otherwise, adding the results to {@code resultSet}.
 *
 * @param group the group to query
 * @param plan the show-timeseries plan
 * @param resultSet the set the results are added to
 * @param context the query context, only used by the local query
 */
private void showTimeseries(
    PartitionGroup group,
    ShowTimeSeriesPlan plan,
    Set<ShowTimeSeriesResult> resultSet,
    QueryContext context)
    throws CheckConsistencyException, MetadataException {
  boolean thisNodeInGroup = group.contains(metaGroupMember.getThisNode());
  if (!thisNodeInGroup) {
    showRemoteTimeseries(group, plan, resultSet);
    return;
  }
  showLocalTimeseries(group, plan, resultSet, context);
}
/**
 * Executes the plan for one partition group, locally if this node belongs to the group and
 * remotely otherwise, adding the results to {@code resultSet}.
 *
 * @param group the group to query
 * @param plan the show-devices plan
 * @param resultSet the set the results are added to
 */
private void getDevices(
    PartitionGroup group, ShowDevicesPlan plan, Set<ShowDevicesResult> resultSet)
    throws CheckConsistencyException, MetadataException {
  boolean thisNodeInGroup = group.contains(metaGroupMember.getThisNode());
  if (!thisNodeInGroup) {
    getRemoteDevices(group, plan, resultSet);
    return;
  }
  getLocalDevices(group, plan, resultSet);
}
/**
 * Synchronizes the local member of {@code group} with its leader and then runs the show-devices
 * plan with the superclass implementation, adding the results to {@code resultSet}.
 *
 * @param group the group this node is a member of
 * @param plan the show-devices plan
 * @param resultSet the set the results are added to
 * @throws CheckConsistencyException if the leader synchronization fails its consistency check
 * @throws MetadataException if the local query fails; the failure is logged before rethrowing
 */
private void getLocalDevices(
    PartitionGroup group, ShowDevicesPlan plan, Set<ShowDevicesResult> resultSet)
    throws CheckConsistencyException, MetadataException {
  metaGroupMember
      .getLocalDataMember(group.getHeader(), group.getId())
      .syncLeaderWithConsistencyCheck(false);
  try {
    List<ShowDevicesResult> devices = super.getDevices(plan);
    resultSet.addAll(devices);
    logger.debug("Fetched {} devices of {} from {}", devices.size(), plan.getPath(), group);
  } catch (MetadataException queryFailure) {
    logger.error("Cannot execute show devices plan {} from {} locally.", plan, group);
    throw queryFailure;
  }
}
/**
 * Synchronizes the local member of {@code group} with its leader and then runs the
 * show-timeseries plan with the superclass implementation, adding the results to {@code
 * resultSet}.
 *
 * @param group the group this node is a member of
 * @param plan the show-timeseries plan
 * @param resultSet the set the results are added to
 * @param context the query context handed to the superclass
 * @throws CheckConsistencyException if the leader synchronization fails its consistency check
 * @throws MetadataException if the local query fails; the failure is logged before rethrowing
 */
private void showLocalTimeseries(
    PartitionGroup group,
    ShowTimeSeriesPlan plan,
    Set<ShowTimeSeriesResult> resultSet,
    QueryContext context)
    throws CheckConsistencyException, MetadataException {
  metaGroupMember
      .getLocalDataMember(group.getHeader(), group.getId())
      .syncLeaderWithConsistencyCheck(false);
  try {
    List<ShowTimeSeriesResult> schemas = super.showTimeseries(plan, context);
    resultSet.addAll(schemas);
    logger.debug(
        "Fetched local timeseries {} schemas of {} from {}",
        schemas.size(),
        plan.getPath(),
        group);
  } catch (MetadataException queryFailure) {
    logger.error("Cannot execute show timeseries plan {} from {} locally.", plan, group);
    throw queryFailure;
  }
}
/**
 * Runs the show-timeseries plan on the first member of {@code group} that returns a non-null
 * answer and adds the deserialized results to {@code resultSet}.
 *
 * <p>Failures of individual nodes are logged and the next node is tried; if no node answers, an
 * error is logged and {@code resultSet} is left untouched.
 *
 * @param group the remote group to query
 * @param plan the show-timeseries plan
 * @param resultSet the set the results are added to
 */
private void showRemoteTimeseries(
    PartitionGroup group, ShowTimeSeriesPlan plan, Set<ShowTimeSeriesResult> resultSet) {
  ByteBuffer response = null;
  for (Node node : group) {
    try {
      response = showRemoteTimeseries(node, group, plan);
    } catch (IOException e) {
      logger.error(LOG_FAIL_CONNECT, node, e);
    } catch (TException e) {
      logger.error("Error occurs when getting timeseries schemas in node {}.", node, e);
    } catch (InterruptedException e) {
      logger.error("Interrupted when getting timeseries schemas in node {}.", node, e);
      Thread.currentThread().interrupt();
    }
    if (response != null) {
      break;
    }
  }

  if (response == null) {
    logger.error("Failed to execute show timeseries {} in group: {}.", plan, group);
    return;
  }

  // the buffer starts with the number of serialized results that follow
  int count = response.getInt();
  logger.debug(
      "Fetched remote timeseries {} schemas of {} from {}", count, plan.getPath(), group);
  for (int remaining = count; remaining > 0; remaining--) {
    resultSet.add(ShowTimeSeriesResult.deserialize(response));
  }
}
/**
 * Runs the show-devices plan on the first member of {@code group} that returns a non-null answer
 * and adds the deserialized results to {@code resultSet}.
 *
 * <p>Failures of individual nodes are logged and the next node is tried; if no node answers, an
 * error is logged and {@code resultSet} is left untouched.
 *
 * @param group the remote group to query
 * @param plan the show-devices plan
 * @param resultSet the set the results are added to
 */
private void getRemoteDevices(
    PartitionGroup group, ShowDevicesPlan plan, Set<ShowDevicesResult> resultSet) {
  ByteBuffer response = null;
  for (Node node : group) {
    try {
      response = getRemoteDevices(node, group, plan);
    } catch (IOException e) {
      logger.error(LOG_FAIL_CONNECT, node, e);
    } catch (TException e) {
      logger.error("Error occurs when getting devices schemas in node {}.", node, e);
    } catch (InterruptedException e) {
      logger.error("Interrupted when getting devices schemas in node {}.", node, e);
      Thread.currentThread().interrupt();
    }
    if (response != null) {
      break;
    }
  }

  if (response == null) {
    logger.error("Failed to execute show devices {} in group: {}.", plan, group);
    return;
  }

  // the buffer starts with the number of serialized results that follow
  int count = response.getInt();
  logger.debug("Fetched remote devices {} schemas of {} from {}", count, plan.getPath(), group);
  for (int remaining = count; remaining > 0; remaining--) {
    resultSet.add(ShowDevicesResult.deserialize(response));
  }
}
/**
 * Sends the show-timeseries plan to a single node, through the asynchronous or the synchronous
 * client depending on the cluster configuration.
 *
 * @param node the node to query
 * @param group the group the query is addressed to; its header is sent with the request
 * @param plan the plan; for the synchronous client it is serialized into the request buffer
 * @return the binary answer of the remote call as returned by the client
 */
private ByteBuffer showRemoteTimeseries(Node node, PartitionGroup group, ShowTimeSeriesPlan plan)
    throws IOException, TException, InterruptedException {
  if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
    AsyncDataClient asyncClient =
        metaGroupMember
            .getClientProvider()
            .getAsyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
    return SyncClientAdaptor.getAllMeasurementSchema(asyncClient, group.getHeader(), plan);
  }

  try (ByteArrayOutputStream planBytes = new ByteArrayOutputStream();
      DataOutputStream planStream = new DataOutputStream(planBytes);
      SyncDataClient syncDataClient =
          metaGroupMember
              .getClientProvider()
              .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS())) {
    try {
      plan.serialize(planStream);
      ByteBuffer request = ByteBuffer.wrap(planBytes.toByteArray());
      return syncDataClient.getAllMeasurementSchema(group.getHeader(), request);
    } catch (TException e) {
      // the connection may be broken, close it to avoid it being reused
      syncDataClient.getInputProtocol().getTransport().close();
      throw e;
    }
  }
}
/**
 * Sends the show-devices plan to a single node, through the asynchronous or the synchronous
 * client depending on the cluster configuration.
 *
 * @param node the node to query
 * @param group the group the query is addressed to; its header is sent with the request
 * @param plan the plan; for the synchronous client it is serialized into the request buffer
 * @return the binary answer of the remote call as returned by the client
 */
private ByteBuffer getRemoteDevices(Node node, PartitionGroup group, ShowDevicesPlan plan)
    throws IOException, TException, InterruptedException {
  if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
    AsyncDataClient asyncClient =
        metaGroupMember
            .getClientProvider()
            .getAsyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
    return SyncClientAdaptor.getDevices(asyncClient, group.getHeader(), plan);
  }

  try (ByteArrayOutputStream planBytes = new ByteArrayOutputStream();
      DataOutputStream planStream = new DataOutputStream(planBytes);
      SyncDataClient syncDataClient =
          metaGroupMember
              .getClientProvider()
              .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS())) {
    try {
      plan.serialize(planStream);
      ByteBuffer request = ByteBuffer.wrap(planBytes.toByteArray());
      return syncDataClient.getDevices(group.getHeader(), request);
    } catch (TException e) {
      // the connection may be broken, close it to avoid it being reused
      syncDataClient.getInputProtocol().getTransport().close();
      throw e;
    }
  }
}
/**
 * Collects the paths matching any of {@code paths} into a {@link GetAllPathsResult}, optionally
 * together with their aliases.
 *
 * @param paths paths potentially containing wildcards
 * @param withAlias if true, the alias list of the result is filled in parallel to the path list;
 *     if false, the alias list of the result is set to null
 * @return the result object holding the matched full paths and the alias list
 * @throws MetadataException if a path is illegal or the lookup fails
 */
public GetAllPathsResult getAllPaths(List<String> paths, boolean withAlias)
    throws MetadataException {
  List<String> matchedPaths;
  List<String> aliases;
  if (withAlias) {
    matchedPaths = new ArrayList<>();
    aliases = new ArrayList<>();
    for (String path : paths) {
      // -1, -1: no limit and no offset
      for (PartialPath aliased :
          super.getAllTimeseriesPathWithAlias(new PartialPath(path), -1, -1).left) {
        matchedPaths.add(aliased.getFullPath());
        aliases.add(aliased.getMeasurementAlias());
      }
    }
  } else {
    matchedPaths = getAllPaths(paths);
    aliases = null;
  }

  GetAllPathsResult pathsResult = new GetAllPathsResult();
  pathsResult.setPaths(matchedPaths);
  pathsResult.setAliasList(aliases);
  return pathsResult;
}
/**
 * Finds the storage group of {@code path}. If the first lookup fails because the storage group is
 * not set, the meta group is synchronized with its leader and the lookup is retried once.
 *
 * @param path the path whose storage group is wanted
 * @return the storage group path reported by the superclass
 * @throws StorageGroupNotSetException if the storage group is still unknown after the retry
 */
@Override
public PartialPath getStorageGroupPath(PartialPath path) throws StorageGroupNotSetException {
  try {
    return super.getStorageGroupPath(path);
  } catch (StorageGroupNotSetException e) {
    // the storage group may have been created elsewhere; catch up with the leader and retry
    try {
      metaGroupMember.syncLeader(null);
    } catch (CheckConsistencyException ex) {
      // log the consistency failure itself, not the StorageGroupNotSetException that led here
      logger.warn("Failed to check consistency.", ex);
    }
    return super.getStorageGroupPath(path);
  }
}
}