| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.iotdb.cluster.metadata; |
| |
| import org.apache.iotdb.cluster.client.async.AsyncDataClient; |
| import org.apache.iotdb.cluster.client.sync.SyncClientAdaptor; |
| import org.apache.iotdb.cluster.client.sync.SyncDataClient; |
| import org.apache.iotdb.cluster.config.ClusterDescriptor; |
| import org.apache.iotdb.cluster.coordinator.Coordinator; |
| import org.apache.iotdb.cluster.exception.CheckConsistencyException; |
| import org.apache.iotdb.cluster.exception.UnsupportedPlanException; |
| import org.apache.iotdb.cluster.partition.PartitionGroup; |
| import org.apache.iotdb.cluster.query.manage.QueryCoordinator; |
| import org.apache.iotdb.cluster.rpc.thrift.GetAllPathsResult; |
| import org.apache.iotdb.cluster.rpc.thrift.Node; |
| import org.apache.iotdb.cluster.rpc.thrift.RaftNode; |
| import org.apache.iotdb.cluster.server.RaftServer; |
| import org.apache.iotdb.cluster.server.member.DataGroupMember; |
| import org.apache.iotdb.cluster.server.member.MetaGroupMember; |
| import org.apache.iotdb.db.conf.IoTDBConstant; |
| import org.apache.iotdb.db.conf.IoTDBDescriptor; |
| import org.apache.iotdb.db.exception.metadata.IllegalPathException; |
| import org.apache.iotdb.db.exception.metadata.MetadataException; |
| import org.apache.iotdb.db.exception.metadata.PathNotExistException; |
| import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException; |
| import org.apache.iotdb.db.metadata.MManager; |
| import org.apache.iotdb.db.metadata.MetaUtils; |
| import org.apache.iotdb.db.metadata.PartialPath; |
| import org.apache.iotdb.db.metadata.VectorPartialPath; |
| import org.apache.iotdb.db.metadata.mnode.IMNode; |
| import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode; |
| import org.apache.iotdb.db.metadata.mnode.InternalMNode; |
| import org.apache.iotdb.db.metadata.mnode.MeasurementMNode; |
| import org.apache.iotdb.db.metadata.template.Template; |
| import org.apache.iotdb.db.qp.constant.SQLConstant; |
| import org.apache.iotdb.db.qp.physical.BatchPlan; |
| import org.apache.iotdb.db.qp.physical.PhysicalPlan; |
| import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan; |
| import org.apache.iotdb.db.qp.physical.crud.InsertPlan; |
| import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; |
| import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan; |
| import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan; |
| import org.apache.iotdb.db.qp.physical.crud.SetDeviceTemplatePlan; |
| import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan; |
| import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan; |
| import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan; |
| import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan; |
| import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan; |
| import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan; |
| import org.apache.iotdb.db.query.context.QueryContext; |
| import org.apache.iotdb.db.query.dataset.ShowDevicesResult; |
| import org.apache.iotdb.db.query.dataset.ShowTimeSeriesResult; |
| import org.apache.iotdb.db.service.IoTDB; |
| import org.apache.iotdb.db.utils.TypeInferenceUtils; |
| import org.apache.iotdb.rpc.TSStatusCode; |
| import org.apache.iotdb.service.rpc.thrift.TSStatus; |
| import org.apache.iotdb.tsfile.common.cache.LRUCache; |
| import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; |
| import org.apache.iotdb.tsfile.common.constant.TsFileConstant; |
| import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; |
| import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; |
| import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; |
| import org.apache.iotdb.tsfile.read.TimeValuePair; |
| import org.apache.iotdb.tsfile.utils.Pair; |
| import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; |
| import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; |
| import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema; |
| |
| import org.apache.thrift.TException; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.ByteArrayOutputStream; |
| import java.io.DataOutputStream; |
| import java.io.IOException; |
| import java.lang.reflect.Array; |
| import java.nio.ByteBuffer; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.LinkedHashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentSkipListSet; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.LinkedBlockingDeque; |
| import java.util.concurrent.ThreadPoolExecutor; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.locks.ReentrantReadWriteLock; |
| import java.util.function.Supplier; |
| import java.util.stream.Collectors; |
| |
| import static org.apache.iotdb.cluster.query.ClusterPlanExecutor.LOG_FAIL_CONNECT; |
| import static org.apache.iotdb.cluster.query.ClusterPlanExecutor.THREAD_POOL_SIZE; |
| import static org.apache.iotdb.cluster.query.ClusterPlanExecutor.waitForThreadPool; |
| import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding; |
| |
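| /** |
| * CMManager is the cluster-aware extension of {@link MManager}. Besides the local MTree, it |
| * keeps an LRU cache of measurement nodes pulled from remote data groups, so schema lookups |
| * fall back from the remote cache to the local MTree, and finally to pulling the schema from |
| * the data group that owns it. (Summary derived from the code below.) |
| */ |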
| @SuppressWarnings("java:S1135") // ignore todos |
| public class CMManager extends MManager { |
| |
| private static final Logger logger = LoggerFactory.getLogger(CMManager.class); |
| |
| private ReentrantReadWriteLock cacheLock = new ReentrantReadWriteLock(); |
| // only cache series that are being written; series that are only read need not be cached, |
| // because reads are slow anyway, so pulling from a remote node costs little compared with the disk IO |
| private RemoteMetaCache mRemoteMetaCache; |
| private MetaPuller metaPuller; |
| private MetaGroupMember metaGroupMember; |
| private Coordinator coordinator; |
| |
| private CMManager() { |
| super(); |
| metaPuller = MetaPuller.getInstance(); |
| int remoteCacheSize = config.getmRemoteSchemaCacheSize(); |
| mRemoteMetaCache = new RemoteMetaCache(remoteCacheSize); |
| } |
| |
| private static class MManagerHolder { |
| |
| private MManagerHolder() { |
| // allowed to do nothing |
| } |
| |
| private static final CMManager INSTANCE = new CMManager(); |
| } |
| |
| /** |
| * This method should only be used by the IoTDB class; do not call it anywhere else. |
| * |
| * @return the singleton CMManager instance |
| */ |
| public static CMManager getInstance() { |
| return CMManager.MManagerHolder.INSTANCE; |
| } |
| |
| /** |
| * Sync with the meta leader to get the newest partition table and storage groups. |
| * |
| * @throws MetadataException if the consistency check with the meta leader fails |
| */ |
| public void syncMetaLeader() throws MetadataException { |
| try { |
| metaGroupMember.syncLeaderWithConsistencyCheck(false); |
| } catch (CheckConsistencyException e) { |
| throw new MetadataException(e); |
| } |
| } |
| |
| @Override |
| public String deleteTimeseries(PartialPath prefixPath) throws MetadataException { |
| cacheLock.writeLock().lock(); |
| try { |
| mRemoteMetaCache.removeItem(prefixPath); |
| } finally { |
| cacheLock.writeLock().unlock(); |
| } |
| return super.deleteTimeseries(prefixPath); |
| } |
| |
| @Override |
| public void deleteStorageGroups(List<PartialPath> storageGroups) throws MetadataException { |
| cacheLock.writeLock().lock(); |
| try { |
| for (PartialPath storageGroup : storageGroups) { |
| mRemoteMetaCache.removeItem(storageGroup); |
| } |
| } finally { |
| cacheLock.writeLock().unlock(); |
| } |
| super.deleteStorageGroups(storageGroups); |
| } |
| |
| @Override |
| public TSDataType getSeriesType(PartialPath path) throws MetadataException { |
| |
| if (path.equals(SQLConstant.TIME_PATH)) { |
| return TSDataType.INT64; |
| } |
| |
| if (path instanceof VectorPartialPath) { |
| if (((VectorPartialPath) path).getSubSensorsPathList().size() != 1) { |
| return TSDataType.VECTOR; |
| } else { |
| path = ((VectorPartialPath) path).getSubSensorsPathList().get(0); |
| } |
| } |
| |
| // try remote cache first |
| cacheLock.readLock().lock(); |
| try { |
| IMeasurementMNode measurementMNode = mRemoteMetaCache.get(path); |
| if (measurementMNode != null) { |
| return measurementMNode.getDataType(path.getMeasurement()); |
| } |
| } finally { |
| cacheLock.readLock().unlock(); |
| } |
| |
| // try local MTree |
| TSDataType seriesType; |
| try { |
| seriesType = super.getSeriesType(path); |
| } catch (PathNotExistException e) { |
| // pull from remote node |
| List<IMeasurementSchema> schemas = |
| metaPuller.pullMeasurementSchemas(Collections.singletonList(path)); |
| if (!schemas.isEmpty()) { |
| IMeasurementSchema measurementSchema = schemas.get(0); |
| IMeasurementMNode measurementMNode = |
| new MeasurementMNode( |
| null, measurementSchema.getMeasurementId(), measurementSchema, null); |
| if (measurementSchema instanceof VectorMeasurementSchema) { |
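| // a vector schema covers several sub-sensors: cache the same node under every sub-sensor |
| // path and under the vector path itself, so a later lookup of any of them hits the cache |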
| for (String subSensorId : measurementSchema.getValueMeasurementIdList()) { |
| cacheMeta(new PartialPath(path.getDevice(), subSensorId), measurementMNode, false); |
| } |
| cacheMeta( |
| new PartialPath(path.getDevice(), measurementSchema.getMeasurementId()), |
| measurementMNode, |
| true); |
| } else { |
| cacheMeta(path, measurementMNode, true); |
| } |
| return measurementMNode.getDataType(path.getMeasurement()); |
| } else { |
| throw e; |
| } |
| } |
| return seriesType; |
| } |
| |
| @Override |
| public Pair<List<PartialPath>, Map<String, Integer>> getSeriesSchemas(List<PartialPath> fullPaths) |
| throws MetadataException { |
| Map<IMNode, PartialPath> nodeToPartialPath = new LinkedHashMap<>(); |
| Map<IMNode, List<Integer>> nodeToIndex = new LinkedHashMap<>(); |
| for (int i = 0; i < fullPaths.size(); i++) { |
| PartialPath path = fullPaths.get(i); |
| IMeasurementMNode node = getMeasurementMNode(path); |
| super.getNodeToPartialPath(node, nodeToPartialPath, nodeToIndex, path, i); |
| } |
| return getPair(fullPaths, nodeToPartialPath, nodeToIndex); |
| } |
| |
| @Override |
| public IMeasurementSchema getSeriesSchema(PartialPath fullPath) throws MetadataException { |
| return super.getSeriesSchema(fullPath, getMeasurementMNode(fullPath)); |
| } |
| |
| /** |
| * Transform a PartialPath into a VectorPartialPath if it is a sub-sensor of a vector; |
| * otherwise, return it unchanged. |
| */ |
| @Override |
| public PartialPath transformPath(PartialPath partialPath) throws MetadataException { |
| IMeasurementMNode node = getMeasurementMNode(partialPath); |
| if (node.getSchema() instanceof MeasurementSchema) { |
| return partialPath; |
| } else { |
| return toVectorPath(partialPath); |
| } |
| } |
| |
| private IMeasurementMNode getMeasurementMNode(PartialPath fullPath) throws MetadataException { |
| IMeasurementMNode node = null; |
| // try remote cache first |
| cacheLock.readLock().lock(); |
| try { |
| IMeasurementMNode measurementMNode = mRemoteMetaCache.get(fullPath); |
| if (measurementMNode != null) { |
| node = measurementMNode; |
| } |
| } finally { |
| cacheLock.readLock().unlock(); |
| } |
| |
| if (node == null) { |
| // try local MTree |
| try { |
| node = (IMeasurementMNode) super.getNodeByPath(fullPath); |
| } catch (PathNotExistException e) { |
| // pull from remote node |
| List<IMeasurementSchema> schemas = |
| metaPuller.pullMeasurementSchemas(Collections.singletonList(fullPath)); |
| if (!schemas.isEmpty()) { |
| IMeasurementSchema measurementSchema = schemas.get(0); |
| IMeasurementMNode measurementMNode = |
| new MeasurementMNode( |
| null, measurementSchema.getMeasurementId(), measurementSchema, null); |
| cacheMeta(fullPath, measurementMNode, true); |
| node = measurementMNode; |
| } else { |
| throw e; |
| } |
| } |
| } |
| return node; |
| } |
| |
| /** |
| * {@link org.apache.iotdb.db.writelog.recover.LogReplayer#replayLogs(Supplier)} calls this |
| * method to restore schemas after a restart, so we should retry until the schemas are obtained. |
| * |
| * @param deviceId the device id. |
| * @param measurements the measurements. |
| */ |
| @Override |
| public IMeasurementMNode[] getMNodes(PartialPath deviceId, String[] measurements) |
| throws MetadataException { |
| try { |
| return super.getMNodes(deviceId, measurements); |
| } catch (MetadataException e) { |
| // some measurements not exist in local |
| // try cache |
| IMeasurementMNode[] measurementMNodes = new IMeasurementMNode[measurements.length]; |
| int failedMeasurementIndex = getMNodesLocally(deviceId, measurements, measurementMNodes); |
| if (failedMeasurementIndex == -1) { |
| return measurementMNodes; |
| } |
| |
| // will retry until the schemas are obtained |
| pullSeriesSchemas(deviceId, measurements); |
| |
| // try again |
| failedMeasurementIndex = getMNodesLocally(deviceId, measurements, measurementMNodes); |
| if (failedMeasurementIndex != -1) { |
| throw new MetadataException( |
| deviceId.getFullPath() |
| + IoTDBConstant.PATH_SEPARATOR |
| + measurements[failedMeasurementIndex] |
| + " is not found"); |
| } |
| return measurementMNodes; |
| } |
| } |
| |
| /** @return -1 if all schemas are found, or the index of the first schema that does not exist */ |
| private int getMNodesLocally( |
| PartialPath deviceId, String[] measurements, IMeasurementMNode[] measurementMNodes) { |
| int failedMeasurementIndex = -1; |
| cacheLock.readLock().lock(); |
| try { |
| for (int i = 0; i < measurements.length && failedMeasurementIndex == -1; i++) { |
| IMeasurementMNode measurementMNode = |
| mRemoteMetaCache.get(deviceId.concatNode(measurements[i])); |
| if (measurementMNode == null) { |
| failedMeasurementIndex = i; |
| } else { |
| measurementMNodes[i] = measurementMNode; |
| } |
| } |
| } finally { |
| cacheLock.readLock().unlock(); |
| } |
| return failedMeasurementIndex; |
| } |
| |
| private void pullSeriesSchemas(PartialPath deviceId, String[] measurementList) |
| throws MetadataException { |
| List<PartialPath> schemasToPull = new ArrayList<>(); |
| for (String s : measurementList) { |
| schemasToPull.add(deviceId.concatNode(s)); |
| } |
| List<IMeasurementSchema> schemas = metaPuller.pullMeasurementSchemas(schemasToPull); |
| for (IMeasurementSchema schema : schemas) { |
| // TODO-Cluster: also pull alias? |
| // take care: the measurement id of a pulled schema is only the series name, so concat it with the device path before caching |
| IMeasurementMNode measurementMNode = |
| new MeasurementMNode(null, schema.getMeasurementId(), schema, null); |
| cacheMeta(deviceId.concatNode(schema.getMeasurementId()), measurementMNode, true); |
| } |
| logger.debug("Pulled {}/{} schemas from remote", schemas.size(), measurementList.length); |
| } |
| |
| /** Cache a measurement node; the full path is not set for vector sub-sensors. */ |
| @Override |
| public void cacheMeta( |
| PartialPath seriesPath, IMeasurementMNode measurementMNode, boolean needSetFullPath) { |
| if (needSetFullPath) { |
| measurementMNode.setFullPath(seriesPath.getFullPath()); |
| } |
| cacheLock.writeLock().lock(); |
| try { |
| mRemoteMetaCache.put(seriesPath, measurementMNode); |
| } finally { |
| cacheLock.writeLock().unlock(); |
| } |
| } |
| |
| @Override |
| public void updateLastCache( |
| PartialPath seriesPath, |
| TimeValuePair timeValuePair, |
| boolean highPriorityUpdate, |
| Long latestFlushedTime, |
| IMeasurementMNode node) { |
| cacheLock.writeLock().lock(); |
| try { |
| IMeasurementMNode measurementMNode = mRemoteMetaCache.get(seriesPath); |
| if (measurementMNode != null) { |
| measurementMNode.updateCachedLast(timeValuePair, highPriorityUpdate, latestFlushedTime); |
| } |
| } finally { |
| cacheLock.writeLock().unlock(); |
| } |
| // the timeseries may also exist locally, so update the local last cache as well |
| super.updateLastCache(seriesPath, timeValuePair, highPriorityUpdate, latestFlushedTime, node); |
| } |
| |
| @Override |
| public TimeValuePair getLastCache(PartialPath seriesPath) { |
| IMeasurementMNode measurementMNode = mRemoteMetaCache.get(seriesPath); |
| if (measurementMNode != null) { |
| return measurementMNode.getCachedLast(); |
| } |
| |
| return super.getLastCache(seriesPath); |
| } |
| |
| @Override |
| public IMNode getSeriesSchemasAndReadLockDevice(InsertPlan plan) |
| throws MetadataException, IOException { |
| IMeasurementMNode[] measurementMNodes = new IMeasurementMNode[plan.getMeasurements().length]; |
| int nonExistSchemaIndex = |
| getMNodesLocally(plan.getPrefixPath(), plan.getMeasurements(), measurementMNodes); |
| if (nonExistSchemaIndex == -1) { |
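| // all schemas were found in the remote cache, so return a stub device node instead of |
| // consulting the local MTree |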
| plan.setMeasurementMNodes(measurementMNodes); |
| return new InternalMNode(null, plan.getPrefixPath().getDevice()); |
| } |
| // auto-create schema in IoTDBConfig is always disabled in the cluster version; a separate |
| // config in ClusterConfig controls it instead |
| return super.getSeriesSchemasAndReadLockDevice(plan); |
| } |
| |
| @Override |
| public IMeasurementSchema getSeriesSchema(PartialPath device, String measurement) |
| throws MetadataException { |
| try { |
| IMeasurementSchema measurementSchema = super.getSeriesSchema(device, measurement); |
| if (measurementSchema != null) { |
| return measurementSchema; |
| } |
| } catch (PathNotExistException e) { |
| // not found in local |
| } |
| |
| // try cache |
| cacheLock.readLock().lock(); |
| try { |
| IMeasurementMNode measurementMNode = mRemoteMetaCache.get(device.concatNode(measurement)); |
| if (measurementMNode != null) { |
| return measurementMNode.getSchema(); |
| } |
| } finally { |
| cacheLock.readLock().unlock(); |
| } |
| |
| // pull from remote |
| pullSeriesSchemas(device, new String[] {measurement}); |
| |
| // try again |
| cacheLock.readLock().lock(); |
| try { |
| IMeasurementMNode measurementMeta = mRemoteMetaCache.get(device.concatNode(measurement)); |
| if (measurementMeta != null) { |
| return measurementMeta.getSchema(); |
| } |
| } finally { |
| cacheLock.readLock().unlock(); |
| } |
| return super.getSeriesSchema(device, measurement); |
| } |
| |
| /** |
| * Check whether the path exists. |
| * |
| * @param path a full path or a prefix path |
| */ |
| @Override |
| public boolean isPathExist(PartialPath path) { |
| boolean localExist = super.isPathExist(path); |
| if (localExist) { |
| return true; |
| } |
| |
| // search the cache |
| cacheLock.readLock().lock(); |
| try { |
| return mRemoteMetaCache.containsKey(path); |
| } finally { |
| cacheLock.readLock().unlock(); |
| } |
| } |
| |
| @Override |
| public Pair<IMNode, Template> getDeviceNodeWithAutoCreate(PartialPath path) |
| throws MetadataException, IOException { |
| return getDeviceNodeWithAutoCreate( |
| path, |
| ClusterDescriptor.getInstance().getConfig().isEnableAutoCreateSchema(), |
| false, |
| config.getDefaultStorageGroupLevel()); |
| } |
| |
| private static class RemoteMetaCache extends LRUCache<PartialPath, IMeasurementMNode> { |
| |
| RemoteMetaCache(int cacheSize) { |
| super(cacheSize); |
| } |
| |
| @Override |
| protected IMeasurementMNode loadObjectByKey(PartialPath key) { |
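| // the cache is populated explicitly via cacheMeta(), so a cache miss loads nothing |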
| return null; |
| } |
| |
| @Override |
| public synchronized void removeItem(PartialPath key) { |
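| // invalidate by prefix: removing a device or storage group must also evict every cached |
| // series under it |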
| cache.keySet().removeIf(s -> s.getFullPath().startsWith(key.getFullPath())); |
| } |
| |
| @Override |
| public synchronized IMeasurementMNode get(PartialPath key) { |
| try { |
| return super.get(key); |
| } catch (IOException e) { |
| // not happening |
| return null; |
| } |
| } |
| |
| public synchronized boolean containsKey(PartialPath key) { |
| return cache.containsKey(key); |
| } |
| } |
| |
| /** |
| * Create storage groups for a CreateTimeSeriesPlan, a CreateMultiTimeSeriesPlan or an |
| * InsertPlan, and also create timeseries for an InsertPlan. Only these three kinds of plans may |
| * use this method. |
| */ |
| public void createSchema(PhysicalPlan plan) throws MetadataException, CheckConsistencyException { |
| List<PartialPath> storageGroups = new ArrayList<>(); |
| // for an InsertPlan, just use the device ids to find the related storage groups; there is no |
| // need to call getPaths() to concat the device ids with the sensors, as both yield the same |
| // storage groups. |
| // for a CreateTimeSeriesPlan, use getPath() to find the related storage group. |
| // for a CreateMultiTimeSeriesPlan, use getPaths() to find all related storage groups. |
| if (plan instanceof BatchPlan) { |
| storageGroups.addAll(getStorageGroups(getValidStorageGroups(plan))); |
| } else if (plan instanceof InsertRowPlan || plan instanceof InsertTabletPlan) { |
| storageGroups.addAll( |
| getStorageGroups(Collections.singletonList(((InsertPlan) plan).getPrefixPath()))); |
| } else if (plan instanceof CreateTimeSeriesPlan) { |
| storageGroups.addAll( |
| getStorageGroups(Collections.singletonList(((CreateTimeSeriesPlan) plan).getPath()))); |
| } else if (plan instanceof CreateAlignedTimeSeriesPlan) { |
| storageGroups.addAll( |
| getStorageGroups( |
| Collections.singletonList(((CreateAlignedTimeSeriesPlan) plan).getPrefixPath()))); |
| } else if (plan instanceof SetDeviceTemplatePlan) { |
| storageGroups.addAll( |
| getStorageGroups( |
| Collections.singletonList( |
| new PartialPath(((SetDeviceTemplatePlan) plan).getPrefixPath())))); |
| } else { |
| storageGroups.addAll(getStorageGroups(plan.getPaths())); |
| } |
| |
| // create storage groups |
| createStorageGroups(storageGroups); |
| |
| // verify that the storage groups have been created |
| verifyCreatedSgSuccess(storageGroups, plan); |
| |
| // try to create timeseries for insertPlan |
| if (plan instanceof InsertPlan && !createTimeseries((InsertPlan) plan)) { |
| throw new MetadataException("Failed to create timeseries from InsertPlan automatically."); |
| } |
| } |
| |
| private List<PartialPath> getValidStorageGroups(PhysicalPlan plan) { |
| List<PartialPath> paths = new ArrayList<>(); |
| for (int i = 0; i < plan.getPaths().size(); i++) { |
| // only handle paths without a result yet, i.e. those that passed the permission check to create the sg |
| if (!((BatchPlan) plan).getResults().containsKey(i)) { |
| paths.add(plan.getPaths().get(i)); |
| } |
| } |
| return paths; |
| } |
| |
| /** Return the storage group paths for the given device ids or timeseries. */ |
| private List<PartialPath> getStorageGroups(List<PartialPath> paths) throws MetadataException { |
| Set<PartialPath> storageGroups = new HashSet<>(); |
| for (PartialPath path : paths) { |
| storageGroups.add( |
| MetaUtils.getStorageGroupPathByLevel( |
| path, IoTDBDescriptor.getInstance().getConfig().getDefaultStorageGroupLevel())); |
| } |
| return new ArrayList<>(storageGroups); |
| } |
| |
| @SuppressWarnings("squid:S3776") |
| private void verifyCreatedSgSuccess(List<PartialPath> storageGroups, PhysicalPlan physicalPlan) { |
| long startTime = System.currentTimeMillis(); |
| boolean[] ready = new boolean[storageGroups.size()]; |
| Arrays.fill(ready, false); |
| while (true) { |
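| // poll the local metadata until every storage group becomes visible or the connection |
| // timeout elapses |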
| boolean allReady = true; |
| for (int i = 0; i < storageGroups.size(); i++) { |
| if (ready[i]) { |
| continue; |
| } |
| if (IoTDB.metaManager.isStorageGroup(storageGroups.get(i))) { |
| ready[i] = true; |
| } else { |
| allReady = false; |
| } |
| } |
| |
| if (allReady |
| || System.currentTimeMillis() - startTime |
| > ClusterDescriptor.getInstance().getConfig().getConnectionTimeoutInMS()) { |
| break; |
| } else { |
| try { |
| Thread.sleep(1); |
| } catch (InterruptedException e) { |
| logger.debug("Interrupted while waiting for storage groups of plan {} to be created", physicalPlan, e); |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Create storage groups automatically for paths. |
| * |
| * @param storageGroups the uncreated storage groups |
| */ |
| private void createStorageGroups(List<PartialPath> storageGroups) throws MetadataException { |
| for (PartialPath storageGroup : storageGroups) { |
| SetStorageGroupPlan setStorageGroupPlan = new SetStorageGroupPlan(storageGroup); |
| TSStatus setStorageGroupResult = |
| metaGroupMember.processNonPartitionedMetaPlan(setStorageGroupPlan); |
| if (setStorageGroupResult.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() |
| && setStorageGroupResult.getCode() |
| != TSStatusCode.PATH_ALREADY_EXIST_ERROR.getStatusCode()) { |
| throw new MetadataException( |
| String.format( |
| "Status Code: %d, failed to set storage group %s", |
| setStorageGroupResult.getCode(), storageGroup)); |
| } |
| } |
| } |
| |
| /** |
| * @param insertMultiTabletPlan the InsertMultiTabletPlan |
| * @return true if timeseries are successfully created for every InsertTabletPlan in the |
| * InsertMultiTabletPlan, otherwise false |
| */ |
| public boolean createTimeseries(InsertMultiTabletPlan insertMultiTabletPlan) |
| throws CheckConsistencyException, IllegalPathException { |
| boolean allSuccess = true; |
| for (InsertTabletPlan insertTabletPlan : insertMultiTabletPlan.getInsertTabletPlanList()) { |
| boolean success = createTimeseries(insertTabletPlan); |
| allSuccess = allSuccess && success; |
| if (!success) { |
| logger.error( |
| "create timeseries for device={} failed, plan={}", |
| insertTabletPlan.getPrefixPath(), |
| insertTabletPlan); |
| } |
| } |
| return allSuccess; |
| } |
| |
| public boolean createTimeseries(InsertRowsPlan insertRowsPlan) |
| throws CheckConsistencyException, IllegalPathException { |
| boolean allSuccess = true; |
| for (InsertRowPlan insertRowPlan : insertRowsPlan.getInsertRowPlanList()) { |
| boolean success = createTimeseries(insertRowPlan); |
| allSuccess = allSuccess && success; |
| if (!success) { |
| logger.error( |
| "create timeseries for device={} failed, plan={}", |
| insertRowPlan.getPrefixPath(), |
| insertRowPlan); |
| } |
| } |
| return allSuccess; |
| } |
| |
| /** |
| * Create timeseries automatically for an InsertPlan. |
| * |
| * @param insertPlan some of the timeseries in it are not created yet |
| * @return true if all uncreated timeseries are created |
| */ |
| public boolean createTimeseries(InsertPlan insertPlan) |
| throws IllegalPathException, CheckConsistencyException { |
| if (insertPlan instanceof InsertMultiTabletPlan) { |
| return createTimeseries((InsertMultiTabletPlan) insertPlan); |
| } |
| |
| if (insertPlan instanceof InsertRowsPlan) { |
| return createTimeseries((InsertRowsPlan) insertPlan); |
| } |
| |
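| // a single InsertRowPlan/InsertTabletPlan: infer the storage group of the device, find the |
| // series that are not yet registered in its data group, and create only those |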
| List<String> seriesList = new ArrayList<>(); |
| PartialPath deviceId = insertPlan.getPrefixPath(); |
| PartialPath storageGroupName; |
| try { |
| storageGroupName = |
| MetaUtils.getStorageGroupPathByLevel( |
| deviceId, IoTDBDescriptor.getInstance().getConfig().getDefaultStorageGroupLevel()); |
| } catch (MetadataException e) { |
| logger.error("Failed to infer storage group from deviceId {}", deviceId, e); |
| return false; |
| } |
| for (String measurementId : insertPlan.getMeasurements()) { |
| seriesList.add(deviceId.getFullPath() + TsFileConstant.PATH_SEPARATOR + measurementId); |
| } |
| if (insertPlan.isAligned()) { |
| return createAlignedTimeseries(seriesList, insertPlan); |
| } |
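| // the timeseries schemas of a storage group are held by the data group that the storage |
| // group name routes to at time 0 |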
| PartitionGroup partitionGroup = |
| metaGroupMember.getPartitionTable().route(storageGroupName.getFullPath(), 0); |
| List<String> unregisteredSeriesList = getUnregisteredSeriesList(seriesList, partitionGroup); |
| if (unregisteredSeriesList.isEmpty()) { |
| return true; |
| } |
| logger.debug("Unregistered series of {} are {}", seriesList, unregisteredSeriesList); |
| |
| return createTimeseries(unregisteredSeriesList, seriesList, insertPlan); |
| } |
| |
| private boolean createAlignedTimeseries(List<String> seriesList, InsertPlan insertPlan) |
| throws IllegalPathException { |
| List<String> measurements = new ArrayList<>(); |
| for (String series : seriesList) { |
| measurements.addAll(MetaUtils.getMeasurementsInPartialPath(new PartialPath(series))); |
| } |
| |
| List<TSDataType> dataTypes = new ArrayList<>(measurements.size()); |
| List<TSEncoding> encodings = new ArrayList<>(measurements.size()); |
| for (int index = 0; index < measurements.size(); index++) { |
| TSDataType dataType; |
| if (insertPlan.getDataTypes() != null && insertPlan.getDataTypes()[index] != null) { |
| dataType = insertPlan.getDataTypes()[index]; |
| } else { |
| dataType = |
| TypeInferenceUtils.getPredictedDataType( |
| insertPlan instanceof InsertTabletPlan |
| ? Array.get(((InsertTabletPlan) insertPlan).getColumns()[index], 0) |
| : ((InsertRowPlan) insertPlan).getValues()[index], |
| true); |
| } |
| dataTypes.add(dataType); |
| encodings.add(getDefaultEncoding(dataType)); |
| } |
| |
| CreateAlignedTimeSeriesPlan plan = |
| new CreateAlignedTimeSeriesPlan( |
| insertPlan.getPrefixPath(), |
| measurements, |
| dataTypes, |
| encodings, |
| TSFileDescriptor.getInstance().getConfig().getCompressor(), |
| null); |
| TSStatus result; |
| try { |
| result = coordinator.processPartitionedPlan(plan); |
| } catch (UnsupportedPlanException e) { |
| logger.error( |
| "Failed to create timeseries {} automatically. Unsupported plan exception {} ", |
| plan, |
| e.getMessage()); |
| return false; |
| } |
| if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() |
| && result.getCode() != TSStatusCode.PATH_ALREADY_EXIST_ERROR.getStatusCode() |
| && result.getCode() != TSStatusCode.NEED_REDIRECTION.getStatusCode()) { |
| logger.error( |
| "{} failed to execute create timeseries {}: {}", |
| metaGroupMember.getThisNode(), |
| plan, |
| result); |
| return false; |
| } |
| return true; |
| } |
| |
| /** |
| * create timeseries from paths in "unregisteredSeriesList". If data types are provided by the |
| * InsertPlan, use them, otherwise infer the types from the values. Use default encodings and |
| * compressions of the corresponding data type. |
| */ |
| private boolean createTimeseries( |
| List<String> unregisteredSeriesList, List<String> seriesList, InsertPlan insertPlan) |
| throws IllegalPathException { |
| List<PartialPath> paths = new ArrayList<>(); |
| List<TSDataType> dataTypes = new ArrayList<>(); |
| List<TSEncoding> encodings = new ArrayList<>(); |
| List<CompressionType> compressionTypes = new ArrayList<>(); |
| for (String seriesPath : unregisteredSeriesList) { |
| paths.add(new PartialPath(seriesPath)); |
| int index = seriesList.indexOf(seriesPath); |
| TSDataType dataType; |
| // use data types in insertPlan if provided, otherwise infer them from the values |
| if (insertPlan.getDataTypes() != null && insertPlan.getDataTypes()[index] != null) { |
| dataType = insertPlan.getDataTypes()[index]; |
| } else { |
| dataType = |
| TypeInferenceUtils.getPredictedDataType( |
| insertPlan instanceof InsertTabletPlan |
| ? Array.get(((InsertTabletPlan) insertPlan).getColumns()[index], 0) |
| : ((InsertRowPlan) insertPlan).getValues()[index], |
| true); |
| } |
| dataTypes.add(dataType); |
| // use default encoding and compression from the config |
| encodings.add(getDefaultEncoding(dataType)); |
| compressionTypes.add(TSFileDescriptor.getInstance().getConfig().getCompressor()); |
| } |
| CreateMultiTimeSeriesPlan plan = new CreateMultiTimeSeriesPlan(); |
| plan.setPaths(paths); |
| plan.setDataTypes(dataTypes); |
| plan.setEncodings(encodings); |
| plan.setCompressors(compressionTypes); |
| |
| TSStatus result; |
| try { |
| result = coordinator.processPartitionedPlan(plan); |
| } catch (UnsupportedPlanException e) { |
| logger.error( |
| "Failed to create timeseries {} automatically. Unsupported plan exception {} ", |
| paths, |
| e.getMessage()); |
| return false; |
| } |
| if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() |
| && result.getCode() != TSStatusCode.PATH_ALREADY_EXIST_ERROR.getStatusCode() |
| && result.getCode() != TSStatusCode.NEED_REDIRECTION.getStatusCode()) { |
| logger.error( |
| "{} failed to execute create timeseries {}: {}", |
| metaGroupMember.getThisNode(), |
| paths, |
| result); |
| return false; |
| } |
| return true; |
| } |
| |
| public void setMetaGroupMember(MetaGroupMember metaGroupMember) { |
| this.metaGroupMember = metaGroupMember; |
| } |
| |
| public void setCoordinator(Coordinator coordinator) { |
| this.coordinator = coordinator; |
| } |
| |
| /** |
| * Check which timeseries in the input list are unregistered, by querying one node of |
| * "partitionGroup". |
| */ |
| private List<String> getUnregisteredSeriesList( |
| List<String> seriesList, PartitionGroup partitionGroup) throws CheckConsistencyException { |
| if (partitionGroup.contains(metaGroupMember.getThisNode())) { |
| return getUnregisteredSeriesListLocally(seriesList, partitionGroup); |
| } else { |
| return getUnregisteredSeriesListRemotely(seriesList, partitionGroup); |
| } |
| } |
| |
| private List<String> getUnregisteredSeriesListLocally( |
| List<String> seriesList, PartitionGroup partitionGroup) throws CheckConsistencyException { |
| DataGroupMember dataMember = |
| metaGroupMember |
| .getDataClusterServer() |
| .getDataMember(partitionGroup.getHeader(), null, null); |
| return dataMember.getLocalQueryExecutor().getUnregisteredTimeseries(seriesList); |
| } |
| |
| private List<String> getUnregisteredSeriesListRemotely( |
| List<String> seriesList, PartitionGroup partitionGroup) { |
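| // try each node of the group in turn; a null result means the node failed, so move on to the next one |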
| for (Node node : partitionGroup) { |
| try { |
| List<String> result; |
| if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) { |
| AsyncDataClient client = |
| metaGroupMember |
| .getClientProvider() |
| .getAsyncDataClient(node, RaftServer.getReadOperationTimeoutMS()); |
| result = |
| SyncClientAdaptor.getUnregisteredMeasurements( |
| client, partitionGroup.getHeader(), seriesList); |
| } else { |
| try (SyncDataClient syncDataClient = |
| metaGroupMember |
| .getClientProvider() |
| .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS())) { |
| try { |
| result = |
| syncDataClient.getUnregisteredTimeseries(partitionGroup.getHeader(), seriesList); |
| } catch (TException e) { |
| // the connection may be broken, close it to avoid it being reused |
| syncDataClient.getInputProtocol().getTransport().close(); |
| throw e; |
| } |
| } |
| } |
| if (result != null) { |
| return result; |
| } |
| } catch (TException | IOException e) { |
| logger.error( |
| "{}: cannot get the unregistered series list {} ... {} from {}", |
| metaGroupMember.getName(), |
| seriesList.get(0), |
| seriesList.get(seriesList.size() - 1), |
| node, |
| e); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| logger.error( |
| "{}: getting unregistered series list {} ... {} is interrupted from {}", |
| metaGroupMember.getName(), |
| seriesList.get(0), |
| seriesList.get(seriesList.size() - 1), |
| node, |
| e); |
| } |
| } |
| return Collections.emptyList(); |
| } |
| |
| /** |
| * Get all devices after removing wildcards in the path |
| * |
| * @param originPath a path potentially with wildcard |
| * @return all paths after removing wildcards in the path |
| */ |
| public Set<PartialPath> getMatchedDevices(PartialPath originPath) throws MetadataException { |
| // get all storage groups this path may belong to |
| // the key is the storage group name and the value is the path to be queried with storage group |
| // added, e.g: |
| // "root.*" will be translated into: |
| // "root.group1" -> "root.group1.*", "root.group2" -> "root.group2.*" ... |
| Map<String, String> sgPathMap = determineStorageGroup(originPath); |
| Set<PartialPath> ret = getMatchedDevices(sgPathMap); |
| logger.debug("The devices of path {} are {}", originPath, ret); |
| return ret; |
| } |
| |
| /** |
| * Split the paths by the data group they belong to and query them from the groups separately. |
| * |
| * @param sgPathMap the key is the storage group name and the value is the path to be queried with |
| * storage group added |
| * @return a collection of all queried paths |
| */ |
| private List<PartialPath> getMatchedPaths(Map<String, String> sgPathMap, boolean withAlias) |
| throws MetadataException { |
| List<PartialPath> result = new ArrayList<>(); |
| // split the paths by the data group they belong to |
| Map<PartitionGroup, List<String>> groupPathMap = new HashMap<>(); |
| for (Entry<String, String> sgPathEntry : sgPathMap.entrySet()) { |
| String storageGroupName = sgPathEntry.getKey(); |
| PartialPath pathUnderSG = new PartialPath(sgPathEntry.getValue()); |
| // find the data group that should hold the timeseries schemas of the storage group |
| PartitionGroup partitionGroup = |
| metaGroupMember.getPartitionTable().route(storageGroupName, 0); |
| if (partitionGroup.contains(metaGroupMember.getThisNode())) { |
| // this node is a member of the group, perform a local query after synchronizing with the |
| // leader |
| try { |
| metaGroupMember |
| .getLocalDataMember(partitionGroup.getHeader(), partitionGroup.getId()) |
| .syncLeader(null); |
| } catch (CheckConsistencyException e) { |
| logger.warn("Failed to check consistency.", e); |
| } |
| List<PartialPath> allTimeseriesName = getMatchedPathsLocally(pathUnderSG, withAlias); |
| logger.debug( |
| "{}: get matched paths of {} locally, result {}", |
| metaGroupMember.getName(), |
| partitionGroup, |
| allTimeseriesName); |
| result.addAll(allTimeseriesName); |
| } else { |
| // batch the queries of the same group to reduce communication |
| groupPathMap |
| .computeIfAbsent(partitionGroup, p -> new ArrayList<>()) |
| .add(pathUnderSG.getFullPath()); |
| } |
| } |
| |
| // query each data group separately |
| for (Entry<PartitionGroup, List<String>> partitionGroupPathEntry : groupPathMap.entrySet()) { |
| PartitionGroup partitionGroup = partitionGroupPathEntry.getKey(); |
| List<String> pathsToQuery = partitionGroupPathEntry.getValue(); |
| result.addAll(getMatchedPaths(partitionGroup, pathsToQuery, withAlias)); |
| } |
| |
| return result; |
| } |
| |
| private List<PartialPath> getMatchedPathsLocally(PartialPath partialPath, boolean withAlias) |
| throws MetadataException { |
| if (!withAlias) { |
| return getAllTimeseriesPath(partialPath); |
| } else { |
| return super.getAllTimeseriesPathWithAlias(partialPath, -1, -1).left; |
| } |
| } |
| |
| private List<PartialPath> getMatchedPaths( |
| PartitionGroup partitionGroup, List<String> pathsToQuery, boolean withAlias) |
| throws MetadataException { |
| // choose the node with lowest latency or highest throughput |
| List<Node> coordinatedNodes = QueryCoordinator.getINSTANCE().reorderNodes(partitionGroup); |
| for (Node node : coordinatedNodes) { |
| try { |
| List<PartialPath> paths = |
| getMatchedPaths(node, partitionGroup.getHeader(), pathsToQuery, withAlias); |
| if (logger.isDebugEnabled()) { |
| logger.debug( |
| "{}: get matched paths of {} and other {} paths from {} in {}, result {}", |
| metaGroupMember.getName(), |
| pathsToQuery.get(0), |
| pathsToQuery.size() - 1, |
| node, |
| partitionGroup.getHeader(), |
| paths); |
| } |
| if (paths != null) { |
| // a non-null result is correct even if it is empty, so return it and let the caller move on to the next group |
| return paths; |
| } |
| } catch (IOException | TException e) { |
| throw new MetadataException(e); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| throw new MetadataException(e); |
| } |
| } |
| logger.warn("Cannot get paths of {} from {}", pathsToQuery, partitionGroup); |
| return Collections.emptyList(); |
| } |
| |
| @SuppressWarnings("java:S1168") // null and empty list are different |
| private List<PartialPath> getMatchedPaths( |
| Node node, RaftNode header, List<String> pathsToQuery, boolean withAlias) |
| throws IOException, TException, InterruptedException { |
| GetAllPathsResult result; |
| if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) { |
| AsyncDataClient client = |
| metaGroupMember |
| .getClientProvider() |
| .getAsyncDataClient(node, RaftServer.getReadOperationTimeoutMS()); |
| result = SyncClientAdaptor.getAllPaths(client, header, pathsToQuery, withAlias); |
| } else { |
| try (SyncDataClient syncDataClient = |
| metaGroupMember |
| .getClientProvider() |
| .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS())) { |
| try { |
| result = syncDataClient.getAllPaths(header, pathsToQuery, withAlias); |
| } catch (TException e) { |
| // the connection may be broken, close it to avoid it being reused |
| syncDataClient.getInputProtocol().getTransport().close(); |
| throw e; |
| } |
| } |
| } |
| |
| if (result != null) { |
| // paths may be empty, implying that the group does not contain matched paths, so we do not |
| // need to query other nodes in the group |
| List<PartialPath> partialPaths = new ArrayList<>(); |
| for (int i = 0; i < result.paths.size(); i++) { |
| try { |
| PartialPath partialPath = new PartialPath(result.paths.get(i)); |
| if (withAlias) { |
| partialPath.setMeasurementAlias(result.aliasList.get(i)); |
| } |
| partialPaths.add(partialPath); |
| } catch (IllegalPathException e) { |
| // ignore |
| } |
| } |
| return partialPaths; |
| } else { |
| // a null implies a network failure, so we have to query other nodes in the group |
| return null; |
| } |
| } |
| |
| /** |
| * Split the paths by the data group they belong to and query them from the groups separately. |
| * |
| * @param sgPathMap the key is the storage group name and the value is the path to be queried with |
| * storage group added |
| * @return a collection of all queried devices |
| */ |
| private Set<PartialPath> getMatchedDevices(Map<String, String> sgPathMap) |
| throws MetadataException { |
| Set<PartialPath> result = new HashSet<>(); |
| // split the paths by the data group they belong to |
| Map<PartitionGroup, List<String>> groupPathMap = new HashMap<>(); |
| for (Entry<String, String> sgPathEntry : sgPathMap.entrySet()) { |
| String storageGroupName = sgPathEntry.getKey(); |
| PartialPath pathUnderSG = new PartialPath(sgPathEntry.getValue()); |
| // find the data group that should hold the timeseries schemas of the storage group |
| PartitionGroup partitionGroup = |
| metaGroupMember.getPartitionTable().route(storageGroupName, 0); |
| if (partitionGroup.contains(metaGroupMember.getThisNode())) { |
| // this node is a member of the group, perform a local query after synchronizing with the |
| // leader |
| try { |
| metaGroupMember |
| .getLocalDataMember(partitionGroup.getHeader(), partitionGroup.getId()) |
| .syncLeader(null); |
| } catch (CheckConsistencyException e) { |
| logger.warn("Failed to check consistency.", e); |
| } |
| Set<PartialPath> allDevices = getDevices(pathUnderSG); |
| logger.debug( |
| "{}: get matched paths of {} locally, result {}", |
| metaGroupMember.getName(), |
| partitionGroup, |
| allDevices); |
| result.addAll(allDevices); |
| } else { |
| // batch the queries of the same group to reduce communication |
| groupPathMap |
| .computeIfAbsent(partitionGroup, p -> new ArrayList<>()) |
| .add(pathUnderSG.getFullPath()); |
| } |
| } |
| |
| // query each data group separately |
| for (Entry<PartitionGroup, List<String>> partitionGroupPathEntry : groupPathMap.entrySet()) { |
| PartitionGroup partitionGroup = partitionGroupPathEntry.getKey(); |
| List<String> pathsToQuery = partitionGroupPathEntry.getValue(); |
| |
| result.addAll(getMatchedDevices(partitionGroup, pathsToQuery)); |
| } |
| |
| return result; |
| } |
| |
| private Set<PartialPath> getMatchedDevices( |
| PartitionGroup partitionGroup, List<String> pathsToQuery) throws MetadataException { |
| // choose the node with lowest latency or highest throughput |
| List<Node> coordinatedNodes = QueryCoordinator.getINSTANCE().reorderNodes(partitionGroup); |
| for (Node node : coordinatedNodes) { |
| try { |
| Set<String> paths = getMatchedDevices(node, partitionGroup.getHeader(), pathsToQuery); |
| logger.debug( |
| "{}: get matched paths of {} from {}, result {} for {}", |
| metaGroupMember.getName(), |
| partitionGroup, |
| node, |
| paths, |
| pathsToQuery); |
| if (paths != null) { |
| // a non-null result is correct even if it is empty; return it and move on to the next group |
| Set<PartialPath> partialPaths = new HashSet<>(); |
| for (String path : paths) { |
| partialPaths.add(new PartialPath(path)); |
| } |
| return partialPaths; |
| } |
| } catch (IOException | TException e) { |
| throw new MetadataException(e); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| throw new MetadataException(e); |
| } |
| } |
| logger.warn("Cannot get paths of {} from {}", pathsToQuery, partitionGroup); |
| return Collections.emptySet(); |
| } |
| |
| private Set<String> getMatchedDevices(Node node, RaftNode header, List<String> pathsToQuery) |
| throws IOException, TException, InterruptedException { |
| Set<String> paths; |
| if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) { |
| AsyncDataClient client = |
| metaGroupMember |
| .getClientProvider() |
| .getAsyncDataClient(node, RaftServer.getReadOperationTimeoutMS()); |
| paths = SyncClientAdaptor.getAllDevices(client, header, pathsToQuery); |
| } else { |
| try (SyncDataClient syncDataClient = |
| metaGroupMember |
| .getClientProvider() |
| .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS())) { |
| try { |
| paths = syncDataClient.getAllDevices(header, pathsToQuery); |
| } catch (TException e) { |
| // the connection may be broken, close it to avoid it being reused |
| syncDataClient.getInputProtocol().getTransport().close(); |
| throw e; |
| } |
| } |
| } |
| return paths; |
| } |
| |
| /** Similar to getAllTimeseriesPath(), but returns the paths with their measurement aliases. */ |
| @Override |
| public Pair<List<PartialPath>, Integer> getAllTimeseriesPathWithAlias( |
| PartialPath prefixPath, int limit, int offset) throws MetadataException { |
| |
| // get all storage groups this path may belong to |
| // the key is the storage group name and the value is the path to be queried with storage group |
| // added, e.g: |
| // "root.*" will be translated into: |
| // "root.group1" -> "root.group1.*", "root.group2" -> "root.group2.*" ... |
| Map<String, String> sgPathMap = determineStorageGroup(prefixPath); |
| List<PartialPath> result = getMatchedPaths(sgPathMap, true); |
| |
| int skippedOffset = 0; |
| // apply offset and limit |
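| // e.g. with 10 results, offset=3 and limit=5 return results 3..7 and a skippedOffset of 3 |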
| if (offset > 0 && result.size() > offset) { |
| skippedOffset = offset; |
| result = result.subList(offset, result.size()); |
| } else if (offset > 0) { |
| skippedOffset = result.size(); |
| result = Collections.emptyList(); |
| } |
| if (limit > 0 && result.size() > limit) { |
| result = result.subList(0, limit); |
| } |
| logger.debug("The paths of path {} are {}", prefixPath, result); |
| |
| return new Pair<>(result, skippedOffset); |
| } |
| |
| /** |
| * Get all paths after removing wildcards in the path |
| * |
| * @param originPath a path potentially with wildcard |
| * @return all paths after removing wildcards in the path |
| */ |
| public List<PartialPath> getMatchedPaths(PartialPath originPath) throws MetadataException { |
| // get all storage groups this path may belong to |
| // the key is the storage group name and the value is the path to be queried with storage group |
| // added, e.g: |
| // "root.*" will be translated into: |
| // "root.group1" -> "root.group1.*", "root.group2" -> "root.group2.*" ... |
| Map<String, String> sgPathMap = determineStorageGroup(originPath); |
| List<PartialPath> ret = getMatchedPaths(sgPathMap, false); |
| logger.debug("The paths of path {} are {}", originPath, ret); |
| return ret; |
| } |
| |
| /** |
| * Get all paths after removing wildcards in the path |
| * |
| * @param originalPaths a list of paths, potentially with wildcard |
| * @return a pair of path lists, the first are the existing full paths, the second are invalid |
| * original paths |
| */ |
| public Pair<List<PartialPath>, List<PartialPath>> getMatchedPaths( |
| List<PartialPath> originalPaths) { |
| ConcurrentSkipListSet<PartialPath> fullPaths = new ConcurrentSkipListSet<>(); |
| ConcurrentSkipListSet<PartialPath> nonExistPaths = new ConcurrentSkipListSet<>(); |
| ExecutorService getAllPathsService = |
| Executors.newFixedThreadPool(metaGroupMember.getPartitionTable().getGlobalGroups().size()); |
| for (PartialPath pathStr : originalPaths) { |
| getAllPathsService.submit( |
| () -> { |
| try { |
| List<PartialPath> fullPathStrs = getMatchedPaths(pathStr); |
| if (fullPathStrs.isEmpty()) { |
| nonExistPaths.add(pathStr); |
| logger.debug("Path {} is not found.", pathStr); |
| } else { |
| fullPaths.addAll(fullPathStrs); |
| } |
| } catch (MetadataException e) { |
| logger.error("Failed to get full paths of the prefix path: {}", pathStr, e); |
| } |
| }); |
| } |
| getAllPathsService.shutdown(); |
| try { |
| getAllPathsService.awaitTermination( |
| RaftServer.getReadOperationTimeoutMS(), TimeUnit.MILLISECONDS); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| logger.error("Unexpected interruption when waiting for get all paths services to stop", e); |
| } |
| return new Pair<>(new ArrayList<>(fullPaths), new ArrayList<>(nonExistPaths)); |
| } |
| |
| /** |
| * Get the local paths that match any path in "paths". The result is not deduplicated. |
| * |
| * @param paths paths potentially contain wildcards |
| */ |
| public List<String> getAllPaths(List<String> paths) throws MetadataException { |
| List<String> ret = new ArrayList<>(); |
| for (String path : paths) { |
| getAllTimeseriesPath(new PartialPath(path)).stream() |
| .map(PartialPath::getFullPath) |
| .forEach(ret::add); |
| } |
| return ret; |
| } |
| |
| /** |
| * Get the local devices that match any path in "paths". The result is deduplicated. |
| * |
| * @param paths paths potentially contain wildcards |
| */ |
| public Set<String> getAllDevices(List<String> paths) throws MetadataException { |
| Set<String> results = new HashSet<>(); |
| for (String path : paths) { |
| getDevices(new PartialPath(path)).stream() |
| .map(PartialPath::getFullPath) |
| .forEach(results::add); |
| } |
| return results; |
| } |
| |
| /** |
| * Get the nodes of a prefix "path" at "nodeLevel". The method currently requires strong |
| * consistency. |
| * |
| * @param path a prefix path |
| * @param nodeLevel the level of the nodes to fetch |
| */ |
| public List<String> getNodeList(String path, int nodeLevel) throws MetadataException { |
| return getNodesList(new PartialPath(path), nodeLevel).stream() |
| .map(PartialPath::getFullPath) |
| .collect(Collectors.toList()); |
| } |
| |
| public Set<String> getChildNodeInNextLevel(String path) throws MetadataException { |
| return getChildNodeInNextLevel(new PartialPath(path)); |
| } |
| |
| public Set<String> getChildNodePathInNextLevel(String path) throws MetadataException { |
| return getChildNodePathInNextLevel(new PartialPath(path)); |
| } |
| |
| /** |
| * Replace partial paths (paths not containing measurements) and abstract paths (paths |
| * containing wildcards) with full paths. |
| */ |
| public void convertToFullPaths(PhysicalPlan plan) |
| throws PathNotExistException, CheckConsistencyException { |
| // make sure this node knows all storage groups |
| metaGroupMember.syncLeaderWithConsistencyCheck(false); |
| |
| Pair<List<PartialPath>, List<PartialPath>> getMatchedPathsRet = |
| getMatchedPaths(plan.getPaths()); |
| List<PartialPath> fullPaths = getMatchedPathsRet.left; |
| List<PartialPath> nonExistPath = getMatchedPathsRet.right; |
| plan.setPaths(fullPaths); |
| if (!nonExistPath.isEmpty()) { |
| throw new PathNotExistException( |
| nonExistPath.stream().map(PartialPath::getFullPath).collect(Collectors.toList())); |
| } |
| } |
| |
| @Override |
| public IMNode getMNode(IMNode deviceMNode, String measurementName) { |
| IMNode child = deviceMNode.getChild(measurementName); |
| if (child == null) { |
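| // the measurement is not a child of the local device node, fall back to the remote cache |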
| child = mRemoteMetaCache.get(deviceMNode.getPartialPath().concatNode(measurementName)); |
| } |
| return child; |
| } |
| |
| public List<ShowTimeSeriesResult> showLocalTimeseries( |
| ShowTimeSeriesPlan plan, QueryContext context) throws MetadataException { |
| return super.showTimeseries(plan, context); |
| } |
| |
| public List<ShowDevicesResult> getLocalDevices(ShowDevicesPlan plan) throws MetadataException { |
| return super.getDevices(plan); |
| } |
| |
| @Override |
| public List<ShowDevicesResult> getDevices(ShowDevicesPlan plan) throws MetadataException { |
| ConcurrentSkipListSet<ShowDevicesResult> resultSet = new ConcurrentSkipListSet<>(); |
| ExecutorService pool = |
| new ThreadPoolExecutor( |
| THREAD_POOL_SIZE, THREAD_POOL_SIZE, 0, TimeUnit.SECONDS, new LinkedBlockingDeque<>()); |
| List<PartitionGroup> globalGroups = metaGroupMember.getPartitionTable().getGlobalGroups(); |
| |
| int limit = plan.getLimit() == 0 ? Integer.MAX_VALUE : plan.getLimit(); |
| int offset = plan.getOffset(); |
| // do not use limit and offset in sub-queries unless offset is 0, otherwise the results are |
| // not combinable |
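| // e.g. limit=10, offset=5 is sent to sub-queries as limit=15, offset=0; the original limit |
| // and offset are re-applied after the per-group results are merged |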
| if (offset != 0) { |
| if (limit > Integer.MAX_VALUE - offset) { |
| plan.setLimit(0); |
| } else { |
| plan.setLimit(limit + offset); |
| } |
| plan.setOffset(0); |
| } |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug( |
| "Fetch devices schemas of {} from {} groups", plan.getPath(), globalGroups.size()); |
| } |
| |
| List<Future<Void>> futureList = new ArrayList<>(); |
| for (PartitionGroup group : globalGroups) { |
| futureList.add( |
| pool.submit( |
| () -> { |
| try { |
| getDevices(group, plan, resultSet); |
| } catch (CheckConsistencyException e) { |
| logger.error("Cannot get show devices result of {} from {}", plan, group, e); |
| } |
| return null; |
| })); |
| } |
| |
| waitForThreadPool(futureList, pool, "getDevices()"); |
| List<ShowDevicesResult> showDevicesResults = |
| applyShowDevicesLimitOffset(resultSet, limit, offset); |
| logger.debug("show devices {} has {} results", plan.getPath(), showDevicesResults.size()); |
| return showDevicesResults; |
| } |
| |
| @Override |
| public List<ShowTimeSeriesResult> showTimeseries(ShowTimeSeriesPlan plan, QueryContext context) |
| throws MetadataException { |
| ConcurrentSkipListSet<ShowTimeSeriesResult> resultSet = new ConcurrentSkipListSet<>(); |
| ExecutorService pool = |
| new ThreadPoolExecutor( |
| THREAD_POOL_SIZE, THREAD_POOL_SIZE, 0, TimeUnit.SECONDS, new LinkedBlockingDeque<>()); |
| |
| List<PartitionGroup> globalGroups = new ArrayList<>(); |
| try { |
| PartitionGroup partitionGroup = |
| metaGroupMember.getPartitionTable().partitionByPathTime(plan.getPath(), 0); |
| globalGroups.add(partitionGroup); |
| } catch (MetadataException e) { |
| // if the storage group of the path cannot be located, query all groups |
| globalGroups = metaGroupMember.getPartitionTable().getGlobalGroups(); |
| } |
| |
| int limit = plan.getLimit() == 0 ? Integer.MAX_VALUE : plan.getLimit(); |
| int offset = plan.getOffset(); |
| // do not use limit and offset in sub-queries unless offset is 0, otherwise the results are |
| // not combinable |
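| // e.g. limit=10, offset=5 is sent to sub-queries as limit=15, offset=0; the original limit |
| // and offset are re-applied after the per-group results are merged |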
| if (offset != 0) { |
| if (limit > Integer.MAX_VALUE - offset) { |
| plan.setLimit(0); |
| } else { |
| plan.setLimit(limit + offset); |
| } |
| plan.setOffset(0); |
| } |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug( |
| "Fetch timeseries schemas of {} from {} groups", plan.getPath(), globalGroups.size()); |
| } |
| |
| List<Future<Void>> futureList = new ArrayList<>(); |
| for (PartitionGroup group : globalGroups) { |
| futureList.add( |
| pool.submit( |
| () -> { |
| try { |
| showTimeseries(group, plan, resultSet, context); |
| } catch (CheckConsistencyException | MetadataException e) { |
| logger.error("Cannot get show timeseries result of {} from {}", plan, group, e); |
| } |
| return null; |
| })); |
| } |
| |
| waitForThreadPool(futureList, pool, "showTimeseries()"); |
| List<ShowTimeSeriesResult> showTimeSeriesResults = |
| applyShowTimeseriesLimitOffset(resultSet, limit, offset); |
| logger.debug("Show {} has {} results", plan.getPath(), showTimeSeriesResults.size()); |
| return showTimeSeriesResults; |
| } |
| |
| private List<ShowTimeSeriesResult> applyShowTimeseriesLimitOffset( |
| ConcurrentSkipListSet<ShowTimeSeriesResult> resultSet, int limit, int offset) { |
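| // the results of all groups are merged into a sorted set, so the global offset and limit can |
| // be applied while iterating |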
| List<ShowTimeSeriesResult> showTimeSeriesResults = new ArrayList<>(); |
| Iterator<ShowTimeSeriesResult> iterator = resultSet.iterator(); |
| while (iterator.hasNext() && limit > 0) { |
| if (offset > 0) { |
| offset--; |
| iterator.next(); |
| } else { |
| limit--; |
| showTimeSeriesResults.add(iterator.next()); |
| } |
| } |
| |
| return showTimeSeriesResults; |
| } |
| |
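| // devices counterpart of applyShowTimeseriesLimitOffset(): the same windowing |
| // over the merged, ordered device results |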
| private List<ShowDevicesResult> applyShowDevicesLimitOffset( |
| Set<ShowDevicesResult> resultSet, int limit, int offset) { |
| List<ShowDevicesResult> showDevicesResults = new ArrayList<>(); |
| Iterator<ShowDevicesResult> iterator = resultSet.iterator(); |
| while (iterator.hasNext() && limit > 0) { |
| if (offset > 0) { |
| offset--; |
| iterator.next(); |
| } else { |
| limit--; |
| showDevicesResults.add(iterator.next()); |
| } |
| } |
| return showDevicesResults; |
| } |
| |
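| // The two dispatchers below serve a sub-query locally when this node belongs |
| // to the target group, and forward it to a remote member of the group |
| // otherwise. |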
| private void showTimeseries( |
| PartitionGroup group, |
| ShowTimeSeriesPlan plan, |
| Set<ShowTimeSeriesResult> resultSet, |
| QueryContext context) |
| throws CheckConsistencyException, MetadataException { |
| if (group.contains(metaGroupMember.getThisNode())) { |
| showLocalTimeseries(group, plan, resultSet, context); |
| } else { |
| showRemoteTimeseries(group, plan, resultSet); |
| } |
| } |
| |
| private void getDevices( |
| PartitionGroup group, ShowDevicesPlan plan, Set<ShowDevicesResult> resultSet) |
| throws CheckConsistencyException, MetadataException { |
| if (group.contains(metaGroupMember.getThisNode())) { |
| getLocalDevices(group, plan, resultSet); |
| } else { |
| getRemoteDevices(group, plan, resultSet); |
| } |
| } |
| |
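| // A local read still syncs with the data group leader first (via |
| // syncLeaderWithConsistencyCheck) so the local schema view is up to date |
| // before the local metadata is queried. |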
| private void getLocalDevices( |
| PartitionGroup group, ShowDevicesPlan plan, Set<ShowDevicesResult> resultSet) |
| throws CheckConsistencyException, MetadataException { |
| DataGroupMember localDataMember = |
| metaGroupMember.getLocalDataMember(group.getHeader(), group.getId()); |
| localDataMember.syncLeaderWithConsistencyCheck(false); |
| try { |
| List<ShowDevicesResult> localResult = super.getDevices(plan); |
| resultSet.addAll(localResult); |
| logger.debug("Fetched {} devices of {} from {}", localResult.size(), plan.getPath(), group); |
| } catch (MetadataException e) { |
| logger.error("Cannot execute show devices plan {} from {} locally.", plan, group); |
| throw e; |
| } |
| } |
| |
| private void showLocalTimeseries( |
| PartitionGroup group, |
| ShowTimeSeriesPlan plan, |
| Set<ShowTimeSeriesResult> resultSet, |
| QueryContext context) |
| throws CheckConsistencyException, MetadataException { |
| DataGroupMember localDataMember = |
| metaGroupMember.getLocalDataMember(group.getHeader(), group.getId()); |
| localDataMember.syncLeaderWithConsistencyCheck(false); |
| try { |
| List<ShowTimeSeriesResult> localResult = super.showTimeseries(plan, context); |
| resultSet.addAll(localResult); |
| logger.debug( |
| "Fetched local timeseries {} schemas of {} from {}", |
| localResult.size(), |
| plan.getPath(), |
| group); |
| } catch (MetadataException e) { |
| logger.error("Cannot execute show timeseries plan {} from {} locally.", plan, group); |
| throw e; |
| } |
| } |
| |
| private void showRemoteTimeseries( |
| PartitionGroup group, ShowTimeSeriesPlan plan, Set<ShowTimeSeriesResult> resultSet) { |
| ByteBuffer resultBinary = null; |
| for (Node node : group) { |
| try { |
| resultBinary = showRemoteTimeseries(node, group, plan); |
| if (resultBinary != null) { |
| break; |
| } |
| } catch (IOException e) { |
| logger.error(LOG_FAIL_CONNECT, node, e); |
| } catch (TException e) { |
| logger.error("Error occurs when getting timeseries schemas in node {}.", node, e); |
| } catch (InterruptedException e) { |
| logger.error("Interrupted when getting timeseries schemas in node {}.", node, e); |
| Thread.currentThread().interrupt(); |
| } |
| } |
| |
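| // a non-null reply encodes an int count followed by that many serialized |
| // ShowTimeSeriesResult records |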
| if (resultBinary != null) { |
| int size = resultBinary.getInt(); |
| logger.debug( |
| "Fetched {} remote timeseries schemas of {} from {}", size, plan.getPath(), group); |
| for (int i = 0; i < size; i++) { |
| resultSet.add(ShowTimeSeriesResult.deserialize(resultBinary)); |
| } |
| } else { |
| logger.error("Failed to execute show timeseries {} in group: {}.", plan, group); |
| } |
| } |
| |
| private void getRemoteDevices( |
| PartitionGroup group, ShowDevicesPlan plan, Set<ShowDevicesResult> resultSet) { |
| ByteBuffer resultBinary = null; |
| for (Node node : group) { |
| try { |
| resultBinary = getRemoteDevices(node, group, plan); |
| if (resultBinary != null) { |
| break; |
| } |
| } catch (IOException e) { |
| logger.error(LOG_FAIL_CONNECT, node, e); |
| } catch (TException e) { |
| logger.error("Error occurs when getting devices schemas in node {}.", node, e); |
| } catch (InterruptedException e) { |
| logger.error("Interrupted when getting devices schemas in node {}.", node, e); |
| Thread.currentThread().interrupt(); |
| } |
| } |
| |
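| // same layout as the timeseries reply: an int count, then the serialized |
| // ShowDevicesResult records |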
| if (resultBinary != null) { |
| int size = resultBinary.getInt(); |
| logger.debug("Fetched remote devices {} schemas of {} from {}", size, plan.getPath(), group); |
| for (int i = 0; i < size; i++) { |
| resultSet.add(ShowDevicesResult.deserialize(resultBinary)); |
| } |
| } else { |
| logger.error("Failed to execute show devices {} in group: {}.", plan, group); |
| } |
| } |
| |
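| // Fetches timeseries schemas from a single remote node: in async mode the |
| // callback-based client is bridged through SyncClientAdaptor, while in sync |
| // mode the plan is serialized into a ByteBuffer and sent over the blocking |
| // thrift client. |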
| private ByteBuffer showRemoteTimeseries(Node node, PartitionGroup group, ShowTimeSeriesPlan plan) |
| throws IOException, TException, InterruptedException { |
| ByteBuffer resultBinary; |
| |
| if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) { |
| AsyncDataClient client = |
| metaGroupMember |
| .getClientProvider() |
| .getAsyncDataClient(node, RaftServer.getReadOperationTimeoutMS()); |
| resultBinary = SyncClientAdaptor.getAllMeasurementSchema(client, group.getHeader(), plan); |
| } else { |
| try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); |
| DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream); |
| SyncDataClient syncDataClient = |
| metaGroupMember |
| .getClientProvider() |
| .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS())) { |
| try { |
| plan.serialize(dataOutputStream); |
| resultBinary = |
| syncDataClient.getAllMeasurementSchema( |
| group.getHeader(), ByteBuffer.wrap(byteArrayOutputStream.toByteArray())); |
| } catch (TException e) { |
| // the connection may be broken, close it to avoid it being reused |
| syncDataClient.getInputProtocol().getTransport().close(); |
| throw e; |
| } |
| } |
| } |
| return resultBinary; |
| } |
| |
| private ByteBuffer getRemoteDevices(Node node, PartitionGroup group, ShowDevicesPlan plan) |
| throws IOException, TException, InterruptedException { |
| ByteBuffer resultBinary; |
| if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) { |
| AsyncDataClient client = |
| metaGroupMember |
| .getClientProvider() |
| .getAsyncDataClient(node, RaftServer.getReadOperationTimeoutMS()); |
| resultBinary = SyncClientAdaptor.getDevices(client, group.getHeader(), plan); |
| } else { |
| try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); |
| DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream); |
| SyncDataClient syncDataClient = |
| metaGroupMember |
| .getClientProvider() |
| .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS())) { |
| try { |
| plan.serialize(dataOutputStream); |
| resultBinary = |
| syncDataClient.getDevices( |
| group.getHeader(), ByteBuffer.wrap(byteArrayOutputStream.toByteArray())); |
| } catch (TException e) { |
| // the connection may be broken, close it to avoid it being reused |
| syncDataClient.getInputProtocol().getTransport().close(); |
| throw e; |
| } |
| } |
| } |
| return resultBinary; |
| } |
| |
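| // Expands the given path patterns into concrete timeseries paths; when |
| // withAlias is set, the measurement alias of each matched path is collected |
| // in the same order as the returned paths. |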
| public GetAllPathsResult getAllPaths(List<String> paths, boolean withAlias) |
| throws MetadataException { |
| List<String> retPaths = new ArrayList<>(); |
| List<String> alias = null; |
| |
| if (withAlias) { |
| alias = new ArrayList<>(); |
| for (String path : paths) { |
| List<PartialPath> allTimeseriesPathWithAlias = |
| super.getAllTimeseriesPathWithAlias(new PartialPath(path), -1, -1).left; |
| for (PartialPath timeseriesPathWithAlias : allTimeseriesPathWithAlias) { |
| retPaths.add(timeseriesPathWithAlias.getFullPath()); |
| alias.add(timeseriesPathWithAlias.getMeasurementAlias()); |
| } |
| } |
| } else { |
| retPaths = getAllPaths(paths); |
| } |
| |
| GetAllPathsResult getAllPathsResult = new GetAllPathsResult(); |
| getAllPathsResult.setPaths(retPaths); |
| getAllPathsResult.setAliasList(alias); |
| return getAllPathsResult; |
| } |
| |
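| // If the storage group is unknown locally, the local metadata may simply be |
| // stale: sync with the meta group leader once and retry before giving up. |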
| @Override |
| public PartialPath getStorageGroupPath(PartialPath path) throws StorageGroupNotSetException { |
| try { |
| return super.getStorageGroupPath(path); |
| } catch (StorageGroupNotSetException e) { |
| try { |
| metaGroupMember.syncLeader(null); |
| } catch (CheckConsistencyException ex) { |
| logger.warn("Failed to check consistency.", e); |
| } |
| return super.getStorageGroupPath(path); |
| } |
| } |
| } |