[To rel/0.12][IOTDB-1600] Support InsertRowsOfOneDevicePlan in cluster (#3922)
diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md
index d74e01b..1d7fdf4 100644
--- a/RELEASE_NOTES.md
+++ b/RELEASE_NOTES.md
@@ -79,7 +79,8 @@
* [IOTDB-1580] Error result of order by time desc when enable time partition
* [IOTDB-1584] Doesn't support order by time desc in cluster mode
* [IOTDB-1588] Bug fix: MAX_TIME is incorrect in cluster mode
-* [IOTDB-1594] Fix show timeseries returns incorrect tag value
+* [IOTDB-1594] Fix show timeseries returns incorrect tag value
+* [IOTDB-1600] Fix InsertRowsOfOneDevicePlan being not supported in cluster mode
* [IOTDB-1610] Fix TsFileRewriteTool writing incorrect data file
* [ISSUE-3116] Bug when using natural month unit in time interval in group by query
* [ISSUE-3316] Query result with the same time range is inconsistent in group by query
@@ -90,7 +91,7 @@
* [ISSUE-3690] Memory leaks on the server when cpp client invokes checkTimeseriesExists
* [ISSUE-3805] OOM caused by Chunk cache
* [ISSUE-3865] Meaningless connection reset issues caused by low default value for SOMAXCONN
-* Fix DataMigrationExample OOM if migrate too many timeseries
+* Fix DataMigrationExample OOM if migrate too many timeseries
* Handle false positive cases which may cause NPE of tsfile bloom filter
* Fix Windows shell error on JDK11 & fix iotdb-env.bat not working
* Fix cluster auto create schema bug when retry locally
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java
index acb0b77..e175f9e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java
@@ -32,6 +32,7 @@
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.qp.physical.sys.AlterTimeSeriesPlan;
@@ -124,6 +125,8 @@
return splitAndRoutePlan((CreateTimeSeriesPlan) plan);
} else if (plan instanceof InsertRowPlan) {
return splitAndRoutePlan((InsertRowPlan) plan);
+ } else if (plan instanceof InsertRowsOfOneDevicePlan) {
+ return splitAndRoutePlan((InsertRowsOfOneDevicePlan) plan);
} else if (plan instanceof AlterTimeSeriesPlan) {
return splitAndRoutePlan((AlterTimeSeriesPlan) plan);
} else if (plan instanceof CreateMultiTimeSeriesPlan) {
@@ -464,4 +467,37 @@
subPlan.setIndexes(new ArrayList<>());
return subPlan;
}
+
+ /**
+ * @param plan InsertRowsOfOneDevicePlan
+ * @return key is InsertRowsOfOneDevicePlan, value is the partition group the plan belongs to. All
+ * InsertRowPlans in InsertRowsOfOneDevicePlan belong to one same storage group.
+ */
+ private Map<PhysicalPlan, PartitionGroup> splitAndRoutePlan(InsertRowsOfOneDevicePlan plan)
+ throws MetadataException {
+ Map<PhysicalPlan, PartitionGroup> result = new HashMap<>();
+ Map<PartitionGroup, List<InsertRowPlan>> groupPlanMap = new HashMap<>();
+ Map<PartitionGroup, List<Integer>> groupPlanIndexMap = new HashMap<>();
+ PartialPath storageGroup = getMManager().getStorageGroupPath(plan.getDeviceId());
+ for (int i = 0; i < plan.getRowPlans().length; i++) {
+ InsertRowPlan p = plan.getRowPlans()[i];
+ PartitionGroup group = partitionTable.route(storageGroup.getFullPath(), p.getTime());
+ List<InsertRowPlan> groupedPlans =
+ groupPlanMap.computeIfAbsent(group, k -> new ArrayList<>());
+ List<Integer> groupedPlanIndex =
+ groupPlanIndexMap.computeIfAbsent(group, k -> new ArrayList<>());
+ groupedPlans.add(p);
+ groupedPlanIndex.add(plan.getRowPlanIndexList()[i]);
+ }
+
+ for (Entry<PartitionGroup, List<InsertRowPlan>> entry : groupPlanMap.entrySet()) {
+ PhysicalPlan reducedPlan =
+ new InsertRowsOfOneDevicePlan(
+ plan.getDeviceId(),
+ entry.getValue().toArray(new InsertRowPlan[0]),
+ groupPlanIndexMap.get(entry.getKey()).stream().mapToInt(i -> i).toArray());
+ result.put(reducedPlan, entry.getKey());
+ }
+ return result;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
index a73f5d3..2ce5b7b 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
@@ -27,6 +27,7 @@
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.qp.physical.crud.SetDeviceTemplatePlan;
@@ -339,6 +340,9 @@
case BATCH_INSERT_ROWS:
plan = new InsertRowsPlan();
break;
+ case BATCH_INSERT_ONE_DEVICE:
+ plan = new InsertRowsOfOneDevicePlan();
+ break;
case CREATE_TEMPLATE:
plan = new CreateTemplatePlan();
break;
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java
index 4838360..3f3177e 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java
@@ -29,7 +29,8 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Collections;
+import java.util.Arrays;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -40,15 +41,35 @@
boolean[] isExecuted;
private InsertRowPlan[] rowPlans;
+ /**
+ * Suppose there is an InsertRowsOfOneDevicePlan, which contains 5 InsertRowPlans,
+ * rowPlans={InsertRowPlan_0, InsertRowPlan_1, InsertRowPlan_2, InsertRowPlan_3, InsertRowPlan_4},
+ * then the rowPlanIndexList={0, 1, 2, 3, 4} respectively. But when the InsertRowsOfOneDevicePlan
+ * is split into two InsertRowsOfOneDevicePlans according to the time partition in cluster
+ * version, suppose that the InsertRowsOfOneDevicePlan_1's rowPlanIndexList = {InsertRowPlan_0,
+ * InsertRowPlan_3, InsertRowPlan_4}, then InsertRowsOfOneDevicePlan_1's rowPlanIndexList = {0, 3,
+ * 4}; InsertRowsOfOneDevicePlan_2's rowPlanIndexList = {InsertRowPlan_1, InsertRowPlan_2} then
+ * InsertRowsOfOneDevicePlan_2's rowPlanIndexList = {1, 2} respectively;
+ */
+ private int[] rowPlanIndexList;
+
+ /** record the result of insert rows */
+ private Map<Integer, TSStatus> results = new HashMap<>();
+
+ public InsertRowsOfOneDevicePlan() {
+ super(OperatorType.BATCH_INSERT_ONE_DEVICE);
+ }
+
public InsertRowsOfOneDevicePlan(
PartialPath deviceId,
Long[] insertTimes,
List<List<String>> measurements,
ByteBuffer[] insertValues)
throws QueryProcessException {
- super(OperatorType.BATCH_INSERT_ONE_DEVICE);
+ this();
this.deviceId = deviceId;
rowPlans = new InsertRowPlan[insertTimes.length];
+ rowPlanIndexList = new int[insertTimes.length];
for (int i = 0; i < insertTimes.length; i++) {
rowPlans[i] =
new InsertRowPlan(
@@ -67,9 +88,22 @@
+ ", time:"
+ insertTimes[i]);
}
+ rowPlanIndexList[i] = i;
}
}
+ /**
+ * This constructor is used for splitting parent InsertRowsOfOneDevicePlan into sub ones. So
+ * there's no need to validate rowPlans.
+ */
+ public InsertRowsOfOneDevicePlan(
+ PartialPath deviceId, InsertRowPlan[] rowPlans, int[] rowPlanIndexList) {
+ this();
+ this.deviceId = deviceId;
+ this.rowPlans = rowPlans;
+ this.rowPlanIndexList = rowPlanIndexList;
+ }
+
@Override
public void checkIntegrity() {}
@@ -106,6 +140,9 @@
stream.writeLong(plan.getTime());
plan.serializeMeasurementsAndValues(stream);
}
+ for (Integer index : rowPlanIndexList) {
+ stream.writeInt(index);
+ }
}
@Override
@@ -119,6 +156,9 @@
buffer.putLong(plan.getTime());
plan.serializeMeasurementsAndValues(buffer);
}
+ for (Integer index : rowPlanIndexList) {
+ buffer.putInt(index);
+ }
}
@Override
@@ -127,10 +167,14 @@
this.rowPlans = new InsertRowPlan[buffer.getInt()];
for (int i = 0; i < rowPlans.length; i++) {
rowPlans[i] = new InsertRowPlan();
- rowPlans[i].setDeviceId(deviceId);
+ rowPlans[i].setDeviceId(this.deviceId);
rowPlans[i].setTime(buffer.getLong());
rowPlans[i].deserializeMeasurementsAndValues(buffer);
}
+ this.rowPlanIndexList = new int[rowPlans.length];
+ for (int i = 0; i < rowPlans.length; i++) {
+ rowPlanIndexList[i] = buffer.getInt();
+ }
}
@Override
@@ -183,7 +227,7 @@
}
public Map<Integer, TSStatus> getResults() {
- return Collections.emptyMap();
+ return results;
}
@Override
@@ -191,11 +235,37 @@
return rowPlans.length;
}
+ public int[] getRowPlanIndexList() {
+ return rowPlanIndexList;
+ }
+
@Override
public void unsetIsExecuted(int i) {
if (isExecuted == null) {
isExecuted = new boolean[getBatchSize()];
}
isExecuted[i] = false;
+ if (rowPlanIndexList != null && rowPlanIndexList.length > 0) {
+ results.remove(rowPlanIndexList[i]);
+ } else {
+ results.remove(i);
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ return o instanceof InsertRowsOfOneDevicePlan
+ && Arrays.equals(((InsertRowsOfOneDevicePlan) o).rowPlanIndexList, this.rowPlanIndexList)
+ && Arrays.equals(((InsertRowsOfOneDevicePlan) o).rowPlans, this.rowPlans)
+ && ((InsertRowsOfOneDevicePlan) o).results.equals(this.results)
+ && ((InsertRowsOfOneDevicePlan) o).getDeviceId().equals(this.getDeviceId());
+ }
+
+ @Override
+ public int hashCode() {
+ int result = rowPlans != null ? Arrays.hashCode(rowPlans) : 0;
+ result = 31 * result + (rowPlanIndexList != null ? Arrays.hashCode(rowPlanIndexList) : 0);
+ result = 31 * result + (results != null ? results.hashCode() : 0);
+ return result;
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertRowsOfOneDevicePlanTest.java b/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertRowsOfOneDevicePlanTest.java
new file mode 100644
index 0000000..6abb0b7
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertRowsOfOneDevicePlanTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.qp.physical;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class InsertRowsOfOneDevicePlanTest {
+
+ @Test
+ public void testSerializable() throws IllegalPathException, IOException {
+
+ PartialPath device = new PartialPath("root.sg.d");
+ InsertRowPlan[] rowPlans =
+ new InsertRowPlan[] {
+ new InsertRowPlan(
+ device,
+ 1000L,
+ new String[] {"s1", "s2", "s3"},
+ new TSDataType[] {TSDataType.DOUBLE, TSDataType.FLOAT, TSDataType.INT64},
+ new String[] {"1.0", "2", "300"}),
+ new InsertRowPlan(
+ device,
+ 2000L,
+ new String[] {"s1", "s4"},
+ new TSDataType[] {TSDataType.DOUBLE, TSDataType.TEXT},
+ new String[] {"2.0", "abc"}),
+ };
+
+ InsertRowsOfOneDevicePlan p = new InsertRowsOfOneDevicePlan(device, rowPlans, new int[] {0, 1});
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream w = new DataOutputStream(baos);
+ p.serialize(w);
+ w.flush();
+ byte[] res = baos.toByteArray();
+ ByteBuffer buf = ByteBuffer.wrap(res);
+ InsertRowsOfOneDevicePlan p2 = (InsertRowsOfOneDevicePlan) PhysicalPlan.Factory.create(buf);
+ Assert.assertEquals(p, p2);
+ res = new byte[1024];
+ p.serialize(ByteBuffer.wrap(res));
+ buf = ByteBuffer.wrap(res);
+ p2 = (InsertRowsOfOneDevicePlan) PhysicalPlan.Factory.create(buf);
+ Assert.assertEquals(p, p2);
+ }
+}