| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.iotdb.db.metadata.cache; |
| |
| import org.apache.iotdb.commons.exception.IllegalPathException; |
| import org.apache.iotdb.commons.path.MeasurementPath; |
| import org.apache.iotdb.commons.path.PartialPath; |
| import org.apache.iotdb.db.queryengine.common.schematree.ClusterSchemaTree; |
| import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree; |
| import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeSchemaCache; |
| import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.SchemaCacheEntry; |
| import org.apache.iotdb.db.schemaengine.template.ClusterTemplateManager; |
| import org.apache.iotdb.db.schemaengine.template.Template; |
| import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; |
| import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; |
| import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; |
| import org.apache.iotdb.tsfile.read.TimeValuePair; |
| import org.apache.iotdb.tsfile.utils.TsPrimitiveType; |
| import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; |
| |
| import org.junit.After; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Test; |
| |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.stream.Collectors; |
| |
| import static org.apache.iotdb.commons.schema.SchemaConstant.ALL_MATCH_PATTERN; |
| |
| public class DataNodeSchemaCacheTest { |
| DataNodeSchemaCache dataNodeSchemaCache; |
| private Map<String, String> s1TagMap; |
| |
| @Before |
| public void setUp() throws Exception { |
| dataNodeSchemaCache = DataNodeSchemaCache.getInstance(); |
| s1TagMap = new HashMap<>(); |
| s1TagMap.put("k1", "v1"); |
| } |
| |
| @After |
| public void tearDown() throws Exception { |
| dataNodeSchemaCache.cleanUp(); |
| ClusterTemplateManager.getInstance().clear(); |
| } |
| |
| @Test |
| public void testGetSchemaEntity() throws IllegalPathException { |
| PartialPath device1 = new PartialPath("root.sg1.d1"); |
| String[] measurements = new String[3]; |
| measurements[0] = "s1"; |
| measurements[1] = "s2"; |
| measurements[2] = "s3"; |
| |
| dataNodeSchemaCache.put((ClusterSchemaTree) generateSchemaTree1()); |
| |
| Map<PartialPath, SchemaCacheEntry> schemaCacheEntryMap = |
| dataNodeSchemaCache.get(device1, measurements).getAllDevices().stream() |
| .flatMap(deviceSchemaInfo -> deviceSchemaInfo.getMeasurementSchemaPathList().stream()) |
| .collect( |
| Collectors.toMap( |
| o -> new PartialPath(o.getNodes()), |
| o -> |
| new SchemaCacheEntry( |
| "root.sg1", |
| o.getMeasurementSchema(), |
| o.getTagMap(), |
| o.isUnderAlignedEntity()))); |
| Assert.assertEquals( |
| TSDataType.INT32, |
| schemaCacheEntryMap.get(new PartialPath("root.sg1.d1.s1")).getTsDataType()); |
| Assert.assertEquals( |
| s1TagMap, schemaCacheEntryMap.get(new PartialPath("root.sg1.d1.s1")).getTagMap()); |
| Assert.assertEquals( |
| TSDataType.FLOAT, |
| schemaCacheEntryMap.get(new PartialPath("root.sg1.d1.s2")).getTsDataType()); |
| Assert.assertNull(schemaCacheEntryMap.get(new PartialPath("root.sg1.d1.s2")).getTagMap()); |
| Assert.assertEquals( |
| TSDataType.BOOLEAN, |
| schemaCacheEntryMap.get(new PartialPath("root.sg1.d1.s3")).getTsDataType()); |
| Assert.assertNull(schemaCacheEntryMap.get(new PartialPath("root.sg1.d1.s3")).getTagMap()); |
| |
| String[] otherMeasurements = new String[3]; |
| otherMeasurements[0] = "s3"; |
| otherMeasurements[1] = "s4"; |
| otherMeasurements[2] = "s5"; |
| |
| dataNodeSchemaCache.put((ClusterSchemaTree) generateSchemaTree2()); |
| |
| schemaCacheEntryMap = |
| dataNodeSchemaCache.get(device1, otherMeasurements).getAllDevices().stream() |
| .flatMap(deviceSchemaInfo -> deviceSchemaInfo.getMeasurementSchemaPathList().stream()) |
| .collect( |
| Collectors.toMap( |
| o -> new PartialPath(o.getNodes()), |
| o -> |
| new SchemaCacheEntry( |
| "root.sg1", |
| o.getMeasurementSchema(), |
| o.getTagMap(), |
| o.isUnderAlignedEntity()))); |
| Assert.assertEquals( |
| TSDataType.BOOLEAN, |
| schemaCacheEntryMap.get(new PartialPath("root.sg1.d1.s3")).getTsDataType()); |
| Assert.assertNull(schemaCacheEntryMap.get(new PartialPath("root.sg1.d1.s3")).getTagMap()); |
| Assert.assertEquals( |
| TSDataType.TEXT, |
| schemaCacheEntryMap.get(new PartialPath("root.sg1.d1.s4")).getTsDataType()); |
| Assert.assertNull(schemaCacheEntryMap.get(new PartialPath("root.sg1.d1.s4")).getTagMap()); |
| Assert.assertEquals( |
| TSDataType.INT64, |
| schemaCacheEntryMap.get(new PartialPath("root.sg1.d1.s5")).getTsDataType()); |
| Assert.assertNull(schemaCacheEntryMap.get(new PartialPath("root.sg1.d1.s4")).getTagMap()); |
| } |
| |
| @Test |
| public void testLastCache() throws IllegalPathException { |
| // test no cache |
| PartialPath devicePath = new PartialPath("root.sg1.d1"); |
| PartialPath seriesPath1 = new PartialPath("root.sg1.d1.s1"); |
| PartialPath seriesPath2 = new PartialPath("root.sg1.d1.s2"); |
| PartialPath seriesPath3 = new PartialPath("root.sg1.d1.s3"); |
| Assert.assertNull(dataNodeSchemaCache.getLastCache(seriesPath1)); |
| Assert.assertNull(dataNodeSchemaCache.getLastCache(seriesPath2)); |
| Assert.assertNull(dataNodeSchemaCache.getLastCache(seriesPath3)); |
| // test no last cache |
| dataNodeSchemaCache.put((ClusterSchemaTree) generateSchemaTree1()); |
| Assert.assertNull(dataNodeSchemaCache.getLastCache(seriesPath1)); |
| Assert.assertNull(dataNodeSchemaCache.getLastCache(seriesPath2)); |
| Assert.assertNull(dataNodeSchemaCache.getLastCache(seriesPath3)); |
| // put cache |
| long timestamp = 100; |
| long timestamp2 = 101; |
| TsPrimitiveType value = TsPrimitiveType.getByType(TSDataType.INT32, 101); |
| TsPrimitiveType value2 = TsPrimitiveType.getByType(TSDataType.INT32, 100); |
| TsPrimitiveType value3 = TsPrimitiveType.getByType(TSDataType.INT32, 99); |
| |
| // put into last cache when cache not exist |
| TimeValuePair timeValuePair = new TimeValuePair(timestamp, value); |
| dataNodeSchemaCache.updateLastCache(devicePath, "s1", timeValuePair, false, 99L); |
| TimeValuePair cachedTimeValuePair = dataNodeSchemaCache.getLastCache(seriesPath1); |
| Assert.assertNotNull(cachedTimeValuePair); |
| Assert.assertEquals(timestamp, cachedTimeValuePair.getTimestamp()); |
| Assert.assertEquals(value, cachedTimeValuePair.getValue()); |
| Assert.assertNull(dataNodeSchemaCache.getLastCache(seriesPath2)); |
| Assert.assertNull(dataNodeSchemaCache.getLastCache(seriesPath3)); |
| |
| // same time but low priority |
| TimeValuePair timeValuePair2 = new TimeValuePair(timestamp, value2); |
| dataNodeSchemaCache.updateLastCache(devicePath, "s1", timeValuePair2, false, 100L); |
| TimeValuePair cachedTimeValuePair2 = dataNodeSchemaCache.getLastCache(seriesPath1); |
| Assert.assertNotNull(cachedTimeValuePair2); |
| Assert.assertEquals(timestamp, cachedTimeValuePair2.getTimestamp()); |
| Assert.assertEquals(value, cachedTimeValuePair2.getValue()); |
| Assert.assertNull(dataNodeSchemaCache.getLastCache(seriesPath2)); |
| Assert.assertNull(dataNodeSchemaCache.getLastCache(seriesPath3)); |
| |
| // same time but high priority |
| dataNodeSchemaCache.updateLastCache(devicePath, "s1", timeValuePair2, true, 100L); |
| cachedTimeValuePair2 = dataNodeSchemaCache.getLastCache(seriesPath1); |
| Assert.assertNotNull(cachedTimeValuePair2); |
| Assert.assertEquals(timestamp, cachedTimeValuePair2.getTimestamp()); |
| Assert.assertEquals(value2, cachedTimeValuePair2.getValue()); |
| Assert.assertNull(dataNodeSchemaCache.getLastCache(seriesPath2)); |
| Assert.assertNull(dataNodeSchemaCache.getLastCache(seriesPath3)); |
| |
| // put into last cache when cache already exist |
| TimeValuePair timeValuePair3 = new TimeValuePair(timestamp2, value3); |
| dataNodeSchemaCache.updateLastCache(devicePath, "s1", timeValuePair3, false, 100L); |
| TimeValuePair cachedTimeValuePair3 = dataNodeSchemaCache.getLastCache(seriesPath1); |
| Assert.assertNotNull(cachedTimeValuePair3); |
| Assert.assertEquals(timestamp2, cachedTimeValuePair3.getTimestamp()); |
| Assert.assertEquals(value3, cachedTimeValuePair3.getValue()); |
| Assert.assertNull(dataNodeSchemaCache.getLastCache(seriesPath2)); |
| Assert.assertNull(dataNodeSchemaCache.getLastCache(seriesPath3)); |
| } |
| |
| private ISchemaTree generateSchemaTree1() throws IllegalPathException { |
| ClusterSchemaTree schemaTree = new ClusterSchemaTree(); |
| Map<String, String> s1TagMap = new HashMap<>(); |
| s1TagMap.put("k1", "v1"); |
| schemaTree.appendSingleMeasurement( |
| new PartialPath("root.sg1.d1.s1"), |
| new MeasurementSchema("s1", TSDataType.INT32), |
| s1TagMap, |
| null, |
| false); |
| schemaTree.appendSingleMeasurement( |
| new PartialPath("root.sg1.d1.s2"), |
| new MeasurementSchema("s2", TSDataType.FLOAT), |
| null, |
| null, |
| false); |
| schemaTree.appendSingleMeasurement( |
| new PartialPath("root.sg1.d1.s3"), |
| new MeasurementSchema("s3", TSDataType.BOOLEAN), |
| null, |
| null, |
| false); |
| schemaTree.setDatabases(Collections.singleton("root.sg1")); |
| return schemaTree; |
| } |
| |
| private ISchemaTree generateSchemaTree2() throws IllegalPathException { |
| ClusterSchemaTree schemaTree = new ClusterSchemaTree(); |
| |
| schemaTree.appendSingleMeasurement( |
| new PartialPath("root.sg1.d1.s3"), |
| new MeasurementSchema("s3", TSDataType.BOOLEAN), |
| null, |
| null, |
| false); |
| schemaTree.appendSingleMeasurement( |
| new PartialPath("root.sg1.d1.s4"), |
| new MeasurementSchema("s4", TSDataType.TEXT), |
| null, |
| null, |
| false); |
| schemaTree.appendSingleMeasurement( |
| new PartialPath("root.sg1.d1.s5"), |
| new MeasurementSchema("s5", TSDataType.INT64), |
| null, |
| null, |
| false); |
| schemaTree.setDatabases(Collections.singleton("root.sg1")); |
| return schemaTree; |
| } |
| |
| @Test |
| public void testUpdateLastCache() throws IllegalPathException { |
| String database = "root.db"; |
| PartialPath device = new PartialPath("root.db.d"); |
| |
| String[] measurements = new String[] {"s1", "s2", "s3"}; |
| MeasurementSchema[] measurementSchemas = |
| new MeasurementSchema[] { |
| new MeasurementSchema("s1", TSDataType.INT32), |
| new MeasurementSchema("s2", TSDataType.INT32), |
| new MeasurementSchema("s3", TSDataType.INT32) |
| }; |
| |
| dataNodeSchemaCache.updateLastCache( |
| database, |
| device, |
| measurements, |
| measurementSchemas, |
| true, |
| index -> new TimeValuePair(1, new TsPrimitiveType.TsInt(1)), |
| index -> index != 1, |
| true, |
| 1L); |
| |
| Assert.assertNotNull(dataNodeSchemaCache.getLastCache(new PartialPath("root.db.d.s1"))); |
| Assert.assertNull(dataNodeSchemaCache.getLastCache(new PartialPath("root.db.d.s2"))); |
| Assert.assertNotNull(dataNodeSchemaCache.getLastCache(new PartialPath("root.db.d.s3"))); |
| |
| dataNodeSchemaCache.updateLastCache( |
| database, |
| device, |
| measurements, |
| measurementSchemas, |
| true, |
| index -> new TimeValuePair(2, new TsPrimitiveType.TsInt(2)), |
| index -> true, |
| true, |
| 1L); |
| |
| Assert.assertEquals( |
| new TimeValuePair(2, new TsPrimitiveType.TsInt(2)), |
| dataNodeSchemaCache.getLastCache(new PartialPath("root.db.d.s1"))); |
| Assert.assertEquals( |
| new TimeValuePair(2, new TsPrimitiveType.TsInt(2)), |
| dataNodeSchemaCache.getLastCache(new PartialPath("root.db.d.s2"))); |
| Assert.assertEquals( |
| new TimeValuePair(2, new TsPrimitiveType.TsInt(2)), |
| dataNodeSchemaCache.getLastCache(new PartialPath("root.db.d.s3"))); |
| } |
| |
| @Test |
| public void testPut() throws Exception { |
| ClusterSchemaTree clusterSchemaTree = new ClusterSchemaTree(); |
| Template template1 = |
| new Template( |
| "t1", |
| Arrays.asList("s1", "s2"), |
| Arrays.asList(TSDataType.DOUBLE, TSDataType.INT32), |
| Arrays.asList(TSEncoding.RLE, TSEncoding.RLE), |
| Arrays.asList(CompressionType.SNAPPY, CompressionType.SNAPPY)); |
| template1.setId(1); |
| Template template2 = |
| new Template( |
| "t2", |
| Arrays.asList("t1", "t2", "t3"), |
| Arrays.asList(TSDataType.DOUBLE, TSDataType.INT32, TSDataType.INT64), |
| Arrays.asList(TSEncoding.RLE, TSEncoding.RLE, TSEncoding.RLBE), |
| Arrays.asList(CompressionType.SNAPPY, CompressionType.SNAPPY, CompressionType.SNAPPY)); |
| template2.setId(2); |
| ClusterTemplateManager.getInstance().putTemplate(template1); |
| ClusterTemplateManager.getInstance().putTemplate(template2); |
| clusterSchemaTree.appendTemplateDevice(new PartialPath("root.sg1.d1"), false, 1, template1); |
| clusterSchemaTree.appendTemplateDevice(new PartialPath("root.sg1.d2"), false, 2, template2); |
| clusterSchemaTree.setDatabases(Collections.singleton("root.sg1")); |
| clusterSchemaTree.appendSingleMeasurementPath( |
| new MeasurementPath("root.sg1.d3.s1", TSDataType.FLOAT)); |
| dataNodeSchemaCache.put(clusterSchemaTree); |
| ClusterSchemaTree d1Tree = |
| dataNodeSchemaCache.getMatchedSchemaWithTemplate(new PartialPath("root.sg1.d1")); |
| ClusterSchemaTree d2Tree = |
| dataNodeSchemaCache.getMatchedSchemaWithTemplate(new PartialPath("root.sg1.d2")); |
| ClusterSchemaTree d3Tree = |
| dataNodeSchemaCache.getMatchedSchemaWithoutTemplate(new PartialPath("root.sg1.d3.s1")); |
| List<MeasurementPath> measurementPaths = d1Tree.searchMeasurementPaths(ALL_MATCH_PATTERN).left; |
| Assert.assertEquals(2, measurementPaths.size()); |
| for (MeasurementPath measurementPath : measurementPaths) { |
| Assert.assertEquals( |
| template1.getSchema(measurementPath.getMeasurement()), |
| measurementPath.getMeasurementSchema()); |
| } |
| measurementPaths = d2Tree.searchMeasurementPaths(ALL_MATCH_PATTERN).left; |
| Assert.assertEquals(3, measurementPaths.size()); |
| for (MeasurementPath measurementPath : measurementPaths) { |
| Assert.assertEquals( |
| template2.getSchema(measurementPath.getMeasurement()), |
| measurementPath.getMeasurementSchema()); |
| } |
| measurementPaths = d3Tree.searchMeasurementPaths(ALL_MATCH_PATTERN).left; |
| Assert.assertEquals(1, measurementPaths.size()); |
| Assert.assertEquals(TSDataType.FLOAT, measurementPaths.get(0).getMeasurementSchema().getType()); |
| Assert.assertEquals("root.sg1.d3.s1", measurementPaths.get(0).getFullPath()); |
| } |
| } |