[To rel/0.12][IOTDB-1600] Support InsertRowsOfOneDevicePlan in cluster (#3922)

diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md
index d74e01b..1d7fdf4 100644
--- a/RELEASE_NOTES.md
+++ b/RELEASE_NOTES.md
@@ -79,7 +79,8 @@
 * [IOTDB-1580] Error result of order by time desc when enable time partition
 * [IOTDB-1584] Doesn't support order by time desc in cluster mode
 * [IOTDB-1588] Bug fix: MAX_TIME is incorrect in cluster mode
-* [IOTDB-1594] Fix show timeseries returns incorrect tag value 
+* [IOTDB-1594] Fix show timeseries returns incorrect tag value
+* [IOTDB-1600] Fix InsertRowsOfOneDevicePlan being not supported in cluster mode
 * [IOTDB-1610] Fix TsFileRewriteTool writing incorrect data file
 * [ISSUE-3116] Bug when using natural month unit in time interval in group by query
 * [ISSUE-3316] Query result with the same time range is inconsistent in group by query
@@ -90,7 +91,7 @@
 * [ISSUE-3690] Memory leaks on the server when cpp client invokes checkTimeseriesExists
 * [ISSUE-3805] OOM caused by Chunk cache
 * [ISSUE-3865] Meaningless connection reset issues caused by low default value for SOMAXCONN
-*  Fix DataMigrationExample OOM if migrate too many timeseries 
+* Fix DataMigrationExample OOM if migrate too many timeseries 
 * Handle false positive cases which may cause NPE of tsfile bloom filter 
 * Fix Windows shell error on JDK11 & fix iotdb-env.bat not working
 * Fix cluster auto create schema bug when retry locally 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java
index acb0b77..e175f9e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java
@@ -32,6 +32,7 @@
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.qp.physical.sys.AlterTimeSeriesPlan;
@@ -124,6 +125,8 @@
       return splitAndRoutePlan((CreateTimeSeriesPlan) plan);
     } else if (plan instanceof InsertRowPlan) {
       return splitAndRoutePlan((InsertRowPlan) plan);
+    } else if (plan instanceof InsertRowsOfOneDevicePlan) {
+      return splitAndRoutePlan((InsertRowsOfOneDevicePlan) plan);
     } else if (plan instanceof AlterTimeSeriesPlan) {
       return splitAndRoutePlan((AlterTimeSeriesPlan) plan);
     } else if (plan instanceof CreateMultiTimeSeriesPlan) {
@@ -464,4 +467,37 @@
     subPlan.setIndexes(new ArrayList<>());
     return subPlan;
   }
+
+  /**
+   * @param plan InsertRowsOfOneDevicePlan
+   * @return key is InsertRowsOfOneDevicePlan, value is the partition group the plan belongs to. All
+   *     InsertRowPlans in InsertRowsOfOneDevicePlan belong to one same storage group.
+   */
+  private Map<PhysicalPlan, PartitionGroup> splitAndRoutePlan(InsertRowsOfOneDevicePlan plan)
+      throws MetadataException {
+    Map<PhysicalPlan, PartitionGroup> result = new HashMap<>();
+    Map<PartitionGroup, List<InsertRowPlan>> groupPlanMap = new HashMap<>();
+    Map<PartitionGroup, List<Integer>> groupPlanIndexMap = new HashMap<>();
+    PartialPath storageGroup = getMManager().getStorageGroupPath(plan.getDeviceId());
+    for (int i = 0; i < plan.getRowPlans().length; i++) {
+      InsertRowPlan p = plan.getRowPlans()[i];
+      PartitionGroup group = partitionTable.route(storageGroup.getFullPath(), p.getTime());
+      List<InsertRowPlan> groupedPlans =
+          groupPlanMap.computeIfAbsent(group, k -> new ArrayList<>());
+      List<Integer> groupedPlanIndex =
+          groupPlanIndexMap.computeIfAbsent(group, k -> new ArrayList<>());
+      groupedPlans.add(p);
+      groupedPlanIndex.add(plan.getRowPlanIndexList()[i]);
+    }
+
+    for (Entry<PartitionGroup, List<InsertRowPlan>> entry : groupPlanMap.entrySet()) {
+      PhysicalPlan reducedPlan =
+          new InsertRowsOfOneDevicePlan(
+              plan.getDeviceId(),
+              entry.getValue().toArray(new InsertRowPlan[0]),
+              groupPlanIndexMap.get(entry.getKey()).stream().mapToInt(i -> i).toArray());
+      result.put(reducedPlan, entry.getKey());
+    }
+    return result;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
index a73f5d3..2ce5b7b 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
@@ -27,6 +27,7 @@
 import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.qp.physical.crud.SetDeviceTemplatePlan;
@@ -339,6 +340,9 @@
         case BATCH_INSERT_ROWS:
           plan = new InsertRowsPlan();
           break;
+        case BATCH_INSERT_ONE_DEVICE:
+          plan = new InsertRowsOfOneDevicePlan();
+          break;
         case CREATE_TEMPLATE:
           plan = new CreateTemplatePlan();
           break;
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java
index 4838360..3f3177e 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java
@@ -29,7 +29,8 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Collections;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -40,15 +41,35 @@
   boolean[] isExecuted;
   private InsertRowPlan[] rowPlans;
 
+  /**
+   * Suppose there is an InsertRowsOfOneDevicePlan, which contains 5 InsertRowPlans,
+   * rowPlans={InsertRowPlan_0, InsertRowPlan_1, InsertRowPlan_2, InsertRowPlan_3, InsertRowPlan_4},
+   * then the rowPlanIndexList={0, 1, 2, 3, 4} respectively. But when the InsertRowsOfOneDevicePlan
+   * is split into two InsertRowsOfOneDevicePlans according to the time partition in cluster
+   * version, suppose that the InsertRowsOfOneDevicePlan_1's rowPlanIndexList = {InsertRowPlan_0,
+   * InsertRowPlan_3, InsertRowPlan_4}, then InsertRowsOfOneDevicePlan_1's rowPlanIndexList = {0, 3,
+   * 4}; InsertRowsOfOneDevicePlan_2's rowPlanIndexList = {InsertRowPlan_1, InsertRowPlan_2} then
+   * InsertRowsOfOneDevicePlan_2's rowPlanIndexList = {1, 2} respectively;
+   */
+  private int[] rowPlanIndexList;
+
+  /** record the result of insert rows */
+  private Map<Integer, TSStatus> results = new HashMap<>();
+
+  public InsertRowsOfOneDevicePlan() {
+    super(OperatorType.BATCH_INSERT_ONE_DEVICE);
+  }
+
   public InsertRowsOfOneDevicePlan(
       PartialPath deviceId,
       Long[] insertTimes,
       List<List<String>> measurements,
       ByteBuffer[] insertValues)
       throws QueryProcessException {
-    super(OperatorType.BATCH_INSERT_ONE_DEVICE);
+    this();
     this.deviceId = deviceId;
     rowPlans = new InsertRowPlan[insertTimes.length];
+    rowPlanIndexList = new int[insertTimes.length];
     for (int i = 0; i < insertTimes.length; i++) {
       rowPlans[i] =
           new InsertRowPlan(
@@ -67,9 +88,22 @@
                 + ", time:"
                 + insertTimes[i]);
       }
+      rowPlanIndexList[i] = i;
     }
   }
 
+  /**
+   * This constructor is used for splitting parent InsertRowsOfOneDevicePlan into sub ones. So
+   * there's no need to validate rowPlans.
+   */
+  public InsertRowsOfOneDevicePlan(
+      PartialPath deviceId, InsertRowPlan[] rowPlans, int[] rowPlanIndexList) {
+    this();
+    this.deviceId = deviceId;
+    this.rowPlans = rowPlans;
+    this.rowPlanIndexList = rowPlanIndexList;
+  }
+
   @Override
   public void checkIntegrity() {}
 
@@ -106,6 +140,9 @@
       stream.writeLong(plan.getTime());
       plan.serializeMeasurementsAndValues(stream);
     }
+    for (Integer index : rowPlanIndexList) {
+      stream.writeInt(index);
+    }
   }
 
   @Override
@@ -119,6 +156,9 @@
       buffer.putLong(plan.getTime());
       plan.serializeMeasurementsAndValues(buffer);
     }
+    for (Integer index : rowPlanIndexList) {
+      buffer.putInt(index);
+    }
   }
 
   @Override
@@ -127,10 +167,14 @@
     this.rowPlans = new InsertRowPlan[buffer.getInt()];
     for (int i = 0; i < rowPlans.length; i++) {
       rowPlans[i] = new InsertRowPlan();
-      rowPlans[i].setDeviceId(deviceId);
+      rowPlans[i].setDeviceId(this.deviceId);
       rowPlans[i].setTime(buffer.getLong());
       rowPlans[i].deserializeMeasurementsAndValues(buffer);
     }
+    this.rowPlanIndexList = new int[rowPlans.length];
+    for (int i = 0; i < rowPlans.length; i++) {
+      rowPlanIndexList[i] = buffer.getInt();
+    }
   }
 
   @Override
@@ -183,7 +227,7 @@
   }
 
   public Map<Integer, TSStatus> getResults() {
-    return Collections.emptyMap();
+    return results;
   }
 
   @Override
@@ -191,11 +235,37 @@
     return rowPlans.length;
   }
 
+  public int[] getRowPlanIndexList() {
+    return rowPlanIndexList;
+  }
+
   @Override
   public void unsetIsExecuted(int i) {
     if (isExecuted == null) {
       isExecuted = new boolean[getBatchSize()];
     }
     isExecuted[i] = false;
+    if (rowPlanIndexList != null && rowPlanIndexList.length > 0) {
+      results.remove(rowPlanIndexList[i]);
+    } else {
+      results.remove(i);
+    }
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    return o instanceof InsertRowsOfOneDevicePlan
+        && Arrays.equals(((InsertRowsOfOneDevicePlan) o).rowPlanIndexList, this.rowPlanIndexList)
+        && Arrays.equals(((InsertRowsOfOneDevicePlan) o).rowPlans, this.rowPlans)
+        && ((InsertRowsOfOneDevicePlan) o).results.equals(this.results)
+        && ((InsertRowsOfOneDevicePlan) o).getDeviceId().equals(this.getDeviceId());
+  }
+
+  @Override
+  public int hashCode() {
+    int result = rowPlans != null ? Arrays.hashCode(rowPlans) : 0;
+    result = 31 * result + (rowPlanIndexList != null ? Arrays.hashCode(rowPlanIndexList) : 0);
+    result = 31 * result + (results != null ? results.hashCode() : 0);
+    return result;
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertRowsOfOneDevicePlanTest.java b/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertRowsOfOneDevicePlanTest.java
new file mode 100644
index 0000000..6abb0b7
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertRowsOfOneDevicePlanTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.qp.physical;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class InsertRowsOfOneDevicePlanTest {
+
+  @Test
+  public void testSerializable() throws IllegalPathException, IOException {
+
+    PartialPath device = new PartialPath("root.sg.d");
+    InsertRowPlan[] rowPlans =
+        new InsertRowPlan[] {
+          new InsertRowPlan(
+              device,
+              1000L,
+              new String[] {"s1", "s2", "s3"},
+              new TSDataType[] {TSDataType.DOUBLE, TSDataType.FLOAT, TSDataType.INT64},
+              new String[] {"1.0", "2", "300"}),
+          new InsertRowPlan(
+              device,
+              2000L,
+              new String[] {"s1", "s4"},
+              new TSDataType[] {TSDataType.DOUBLE, TSDataType.TEXT},
+              new String[] {"2.0", "abc"}),
+        };
+
+    InsertRowsOfOneDevicePlan p = new InsertRowsOfOneDevicePlan(device, rowPlans, new int[] {0, 1});
+
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutputStream w = new DataOutputStream(baos);
+    p.serialize(w);
+    w.flush();
+    byte[] res = baos.toByteArray();
+    ByteBuffer buf = ByteBuffer.wrap(res);
+    InsertRowsOfOneDevicePlan p2 = (InsertRowsOfOneDevicePlan) PhysicalPlan.Factory.create(buf);
+    Assert.assertEquals(p, p2);
+    res = new byte[1024];
+    p.serialize(ByteBuffer.wrap(res));
+    buf = ByteBuffer.wrap(res);
+    p2 = (InsertRowsOfOneDevicePlan) PhysicalPlan.Factory.create(buf);
+    Assert.assertEquals(p, p2);
+  }
+}