blob: fb8c77b18192f7ec3cc0ab50a26f3e58b1ed115c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.mpp.plan.plan.node.write;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.WritePlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertMultiTabletsNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.db.utils.TimePartitionUtils;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class WritePlanNodeSplitTest {
long prevTimePartitionInterval;
Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>>
dataPartitionMap;
private Map<String, Map<TSeriesPartitionSlot, TRegionReplicaSet>> schemaPartitionMap;
SeriesPartitionExecutor partitionExecutor;
String executorClassName;
int seriesSlotPartitionNum;
@Before
public void setUp() {
prevTimePartitionInterval =
IoTDBDescriptor.getInstance().getConfig().getTimePartitionIntervalForRouting();
IoTDBDescriptor.getInstance().getConfig().setTimePartitionIntervalForRouting(100);
TimePartitionUtils.setTimePartitionIntervalForRouting(100);
executorClassName = IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass();
seriesSlotPartitionNum = IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum();
partitionExecutor =
SeriesPartitionExecutor.getSeriesPartitionExecutor(
executorClassName, seriesSlotPartitionNum);
initDataPartitionMap();
initSchemaPartitionMap();
}
private void initDataPartitionMap() {
dataPartitionMap = new HashMap<>();
Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>
seriesPartitionSlotMap = new HashMap<>();
// sg1 has 5 data regions
for (int i = 0; i < seriesSlotPartitionNum; i++) {
Map<TTimePartitionSlot, List<TRegionReplicaSet>> timePartitionSlotMap = new HashMap<>();
for (int t = 0; t < 5; t++) {
timePartitionSlotMap.put(
new TTimePartitionSlot(
t * IoTDBDescriptor.getInstance().getConfig().getTimePartitionIntervalForRouting()),
Collections.singletonList(
new TRegionReplicaSet(
new TConsensusGroupId(TConsensusGroupType.DataRegion, t), null)));
}
seriesPartitionSlotMap.put(new TSeriesPartitionSlot(i), timePartitionSlotMap);
}
dataPartitionMap.put("root.sg1", seriesPartitionSlotMap);
// sg2 has 1 data region
seriesPartitionSlotMap = new HashMap<>();
for (int i = 0; i < seriesSlotPartitionNum; i++) {
Map<TTimePartitionSlot, List<TRegionReplicaSet>> timePartitionSlotMap = new HashMap<>();
for (int t = 0; t < 5; t++) {
timePartitionSlotMap.put(
new TTimePartitionSlot(
t * IoTDBDescriptor.getInstance().getConfig().getTimePartitionIntervalForRouting()),
Collections.singletonList(
new TRegionReplicaSet(
new TConsensusGroupId(TConsensusGroupType.DataRegion, 5), null)));
}
seriesPartitionSlotMap.put(new TSeriesPartitionSlot(i), timePartitionSlotMap);
}
dataPartitionMap.put("root.sg2", seriesPartitionSlotMap);
}
private void initSchemaPartitionMap() {
schemaPartitionMap = new HashMap<>();
Map<TSeriesPartitionSlot, TRegionReplicaSet> seriesPartitionSlotMap = new HashMap<>();
for (int i = 0; i < seriesSlotPartitionNum; i++) {
seriesPartitionSlotMap.put(
new TSeriesPartitionSlot(i),
new TRegionReplicaSet(
new TConsensusGroupId(TConsensusGroupType.DataRegion, i % 5), null));
}
schemaPartitionMap.put("root.sg1", seriesPartitionSlotMap);
}
protected DataPartition getDataPartition(
List<DataPartitionQueryParam> dataPartitionQueryParamList) {
Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>>
result = new HashMap<>();
for (DataPartitionQueryParam dataPartitionQueryParam : dataPartitionQueryParamList) {
String devicePath = dataPartitionQueryParam.getDevicePath();
Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>
seriesPartitionSlotMap = null;
Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>
seriesPartitionSlotMapResult = null;
if (devicePath.startsWith("root.sg1")) {
seriesPartitionSlotMap = dataPartitionMap.get("root.sg1");
seriesPartitionSlotMapResult = result.computeIfAbsent("root.sg1", k -> new HashMap<>());
} else if (devicePath.startsWith("root.sg2")) {
seriesPartitionSlotMap = dataPartitionMap.get("root.sg2");
seriesPartitionSlotMapResult = result.computeIfAbsent("root.sg2", k -> new HashMap<>());
}
TSeriesPartitionSlot seriesPartitionSlot =
partitionExecutor.getSeriesPartitionSlot(dataPartitionQueryParam.getDevicePath());
Map<TTimePartitionSlot, List<TRegionReplicaSet>> timePartitionSlotMap =
seriesPartitionSlotMap.get(seriesPartitionSlot);
Map<TTimePartitionSlot, List<TRegionReplicaSet>> timePartitionSlotMapResult =
seriesPartitionSlotMapResult.computeIfAbsent(seriesPartitionSlot, k -> new HashMap<>());
for (TTimePartitionSlot timePartitionSlot :
dataPartitionQueryParam.getTimePartitionSlotList()) {
timePartitionSlotMapResult.put(
timePartitionSlot, timePartitionSlotMap.get(timePartitionSlot));
}
}
return new DataPartition(result, executorClassName, seriesSlotPartitionNum);
}
@Test
public void testSplitInsertTablet() throws IllegalPathException {
InsertTabletNode insertTabletNode = new InsertTabletNode(new PlanNodeId("plan node 1"));
insertTabletNode.setDevicePath(new PartialPath("root.sg1.d1"));
insertTabletNode.setTimes(new long[] {1, 60, 120, 180, 270, 290, 360, 375, 440, 470});
insertTabletNode.setDataTypes(new TSDataType[] {TSDataType.INT32});
insertTabletNode.setColumns(new Object[] {new int[] {10, 20, 30, 40, 50, 60, 70, 80, 90, 100}});
DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
dataPartitionQueryParam.setDevicePath(insertTabletNode.getDevicePath().getFullPath());
dataPartitionQueryParam.setTimePartitionSlotList(insertTabletNode.getTimePartitionSlots());
DataPartition dataPartition =
getDataPartition(Collections.singletonList(dataPartitionQueryParam));
Analysis analysis = new Analysis();
analysis.setDataPartitionInfo(dataPartition);
List<WritePlanNode> insertTabletNodeList = insertTabletNode.splitByPartition(analysis);
Assert.assertEquals(5, insertTabletNodeList.size());
for (WritePlanNode insertNode : insertTabletNodeList) {
Assert.assertEquals(((InsertTabletNode) insertNode).getTimes().length, 2);
}
insertTabletNode = new InsertTabletNode(new PlanNodeId("plan node 2"));
insertTabletNode.setDevicePath(new PartialPath("root.sg2.d1"));
insertTabletNode.setTimes(new long[] {1, 60, 120, 180, 270, 290, 360, 375, 440, 470});
insertTabletNode.setDataTypes(new TSDataType[] {TSDataType.INT32});
insertTabletNode.setColumns(new Object[] {new int[] {10, 20, 30, 40, 50, 60, 70, 80, 90, 100}});
dataPartitionQueryParam = new DataPartitionQueryParam();
dataPartitionQueryParam.setDevicePath(insertTabletNode.getDevicePath().getFullPath());
dataPartitionQueryParam.setTimePartitionSlotList(insertTabletNode.getTimePartitionSlots());
dataPartition = getDataPartition(Collections.singletonList(dataPartitionQueryParam));
analysis = new Analysis();
analysis.setDataPartitionInfo(dataPartition);
insertTabletNodeList = insertTabletNode.splitByPartition(analysis);
Assert.assertEquals(5, insertTabletNodeList.size());
for (WritePlanNode insertNode : insertTabletNodeList) {
Assert.assertEquals(((InsertTabletNode) insertNode).getTimes().length, 2);
}
}
@Test
public void testInsertMultiTablets() throws IllegalPathException {
InsertMultiTabletsNode insertMultiTabletsNode =
new InsertMultiTabletsNode(new PlanNodeId("plan node 3"));
for (int i = 0; i < 5; i++) {
InsertTabletNode insertTabletNode = new InsertTabletNode(new PlanNodeId("plan node 3"));
insertTabletNode.setDevicePath(new PartialPath(String.format("root.sg1.d%d", i)));
insertTabletNode.setTimes(new long[] {1, 60, 120, 180, 270, 290, 360, 375, 440, 470});
insertTabletNode.setDataTypes(new TSDataType[] {TSDataType.INT32});
insertTabletNode.setColumns(
new Object[] {new int[] {10, 20, 30, 40, 50, 60, 70, 80, 90, 100}});
insertMultiTabletsNode.addInsertTabletNode(insertTabletNode, 2 * i);
insertTabletNode = new InsertTabletNode(new PlanNodeId("plan node 3"));
insertTabletNode.setDevicePath(new PartialPath(String.format("root.sg2.d%d", i)));
insertTabletNode.setTimes(new long[] {1, 60, 120, 180, 270, 290, 360, 375, 440, 470});
insertTabletNode.setDataTypes(new TSDataType[] {TSDataType.INT32});
insertTabletNode.setColumns(
new Object[] {new int[] {10, 20, 30, 40, 50, 60, 70, 80, 90, 100}});
insertMultiTabletsNode.addInsertTabletNode(insertTabletNode, 2 * i);
}
List<DataPartitionQueryParam> dataPartitionQueryParams = new ArrayList<>();
for (InsertTabletNode insertTabletNode : insertMultiTabletsNode.getInsertTabletNodeList()) {
DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
dataPartitionQueryParam.setDevicePath(insertTabletNode.getDevicePath().getFullPath());
dataPartitionQueryParam.setTimePartitionSlotList(insertTabletNode.getTimePartitionSlots());
dataPartitionQueryParams.add(dataPartitionQueryParam);
}
DataPartition dataPartition = getDataPartition(dataPartitionQueryParams);
Analysis analysis = new Analysis();
analysis.setDataPartitionInfo(dataPartition);
List<WritePlanNode> insertTabletNodeList = insertMultiTabletsNode.splitByPartition(analysis);
Assert.assertEquals(6, insertTabletNodeList.size());
}
@Test
public void testInsertRowsNode() throws IllegalPathException {
InsertRowsNode insertRowsNode = new InsertRowsNode(new PlanNodeId("plan node 3"));
for (int i = 0; i < 5; i++) {
InsertRowNode insertRowNode = new InsertRowNode(new PlanNodeId("plan node 3"));
insertRowNode.setDevicePath(new PartialPath(String.format("root.sg1.d%d", i)));
insertRowNode.setTime(
i * IoTDBDescriptor.getInstance().getConfig().getTimePartitionIntervalForRouting());
insertRowsNode.addOneInsertRowNode(insertRowNode, 2 * i);
insertRowNode = new InsertRowNode(new PlanNodeId("plan node 3"));
insertRowNode.setDevicePath(new PartialPath(String.format("root.sg2.d%d", i)));
insertRowNode.setTime(1);
insertRowsNode.addOneInsertRowNode(insertRowNode, 2 * i + 1);
}
List<DataPartitionQueryParam> dataPartitionQueryParams = new ArrayList<>();
for (InsertRowNode insertRowNode : insertRowsNode.getInsertRowNodeList()) {
DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
dataPartitionQueryParam.setDevicePath(insertRowNode.getDevicePath().getFullPath());
dataPartitionQueryParam.setTimePartitionSlotList(insertRowNode.getTimePartitionSlots());
dataPartitionQueryParams.add(dataPartitionQueryParam);
}
DataPartition dataPartition = getDataPartition(dataPartitionQueryParams);
Analysis analysis = new Analysis();
analysis.setDataPartitionInfo(dataPartition);
List<WritePlanNode> insertTabletNodeList = insertRowsNode.splitByPartition(analysis);
Assert.assertEquals(6, insertTabletNodeList.size());
}
@After
public void tearDown() {
TimePartitionUtils.setTimePartitionIntervalForRouting(prevTimePartitionInterval);
IoTDBDescriptor.getInstance()
.getConfig()
.setTimePartitionIntervalForRouting(prevTimePartitionInterval);
}
}