[IOTDB-5821] Pipe: PipeCollector Stage (#9789)

* Implemention of DefaultCollector, default collector of IoTDB
* Implemention of PipeCollectorStage

---------

Co-authored-by: Steve Yurong Su <rong@apache.org>
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/DataRegionId.java b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/DataRegionId.java
index 655de94..03ec5e4 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/DataRegionId.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/DataRegionId.java
@@ -27,10 +27,6 @@
     this.id = id;
   }
 
-  public DataRegionId(String id) {
-    this.id = Integer.parseInt(id);
-  }
-
   @Override
   public TConsensusGroupType getType() {
     return TConsensusGroupType.DataRegion;
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
index d697264..6e59cab 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
@@ -19,11 +19,15 @@
 
 package org.apache.iotdb.commons.pipe.plugin.builtin;
 
+import org.apache.iotdb.commons.pipe.plugin.builtin.collector.DefaultCollector;
 import org.apache.iotdb.commons.pipe.plugin.builtin.connector.DoNothingConnector;
 import org.apache.iotdb.commons.pipe.plugin.builtin.processor.DoNothingProcessor;
 
 public enum BuiltinPipePlugin {
 
+  // collectors
+  DEFAULT_COLLECTOR("default_collector", DefaultCollector.class),
+
   // processors
   DO_NOTHING_PROCESSOR("do_nothing_processor", DoNothingProcessor.class),
 
@@ -35,8 +39,8 @@
   private final Class<?> pipePluginClass;
   private final String className;
 
-  BuiltinPipePlugin(String functionName, Class<?> pipePluginClass) {
-    this.pipePluginName = functionName;
+  BuiltinPipePlugin(String pipePluginName, Class<?> pipePluginClass) {
+    this.pipePluginName = pipePluginName;
     this.pipePluginClass = pipePluginClass;
     this.className = pipePluginClass.getName();
   }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/collector/DefaultCollector.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/collector/DefaultCollector.java
new file mode 100644
index 0000000..fed08ae
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/collector/DefaultCollector.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.plugin.builtin.collector;
+
+import org.apache.iotdb.pipe.api.PipeCollector;
+import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.PipeParameters;
+import org.apache.iotdb.pipe.api.customizer.collector.PipeCollectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.event.Event;
+
+/**
+ * This class is a placeholder and should not be initialized. It represents the default collector
+ * when no collector is specified. There is a real implementation in the server module but cannot be
+ * imported here. The pipe agent in the server module will replace this class with the real
+ * implementation when initializing the collector.
+ */
+public class DefaultCollector implements PipeCollector {
+  @Override
+  public void validate(PipeParameterValidator validator) throws Exception {
+    throw new UnsupportedOperationException("This class is a placeholder and should not be used.");
+  }
+
+  @Override
+  public void customize(PipeParameters parameters, PipeCollectorRuntimeConfiguration configuration)
+      throws Exception {
+    throw new UnsupportedOperationException("This class is a placeholder and should not be used.");
+  }
+
+  @Override
+  public void start() throws Exception {
+    throw new UnsupportedOperationException("This class is a placeholder and should not be used.");
+  }
+
+  @Override
+  public Event supply() throws Exception {
+    throw new UnsupportedOperationException("This class is a placeholder and should not be used.");
+  }
+
+  @Override
+  public void close() throws Exception {
+    throw new UnsupportedOperationException("This class is a placeholder and should not be used.");
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
index e65495a..d0b42f6 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
@@ -24,6 +24,8 @@
 import org.apache.iotdb.commons.pipe.plugin.service.PipePluginClassLoader;
 import org.apache.iotdb.commons.pipe.plugin.service.PipePluginClassLoaderManager;
 import org.apache.iotdb.commons.pipe.plugin.service.PipePluginExecutableManager;
+import org.apache.iotdb.db.pipe.core.collector.IoTDBDataRegionCollector;
+import org.apache.iotdb.pipe.api.PipeCollector;
 import org.apache.iotdb.pipe.api.PipeConnector;
 import org.apache.iotdb.pipe.api.PipePlugin;
 import org.apache.iotdb.pipe.api.PipeProcessor;
@@ -188,6 +190,11 @@
     }
   }
 
+  public PipeCollector reflectCollector(PipeParameters collectorParameters) {
+    return new IoTDBDataRegionCollector(); // TODO: reflect plugin, use PipeIoTDBCollector as
+    // default collector
+  }
+
   public PipeProcessor reflectProcessor(PipeParameters processorParameters) {
     throw new UnsupportedOperationException("Not supported yet.");
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeCollectorConstant.java b/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeCollectorConstant.java
new file mode 100644
index 0000000..b75d216
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeCollectorConstant.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.config;
+
+public class PipeCollectorConstant {
+
+  public static final String PATTERN_PATTERN_KEY = "collector.pattern";
+  public static final String PATTERN_DATA_REGION_KEY = "collector.data-region";
+
+  private PipeCollectorConstant() {
+    throw new IllegalStateException("Utility class");
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java
new file mode 100644
index 0000000..0fc9fdf
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.collector;
+
+import org.apache.iotdb.db.pipe.core.collector.historical.PipeHistoricalDataRegionTsFileCollector;
+import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionCollector;
+import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionHybridCollector;
+import org.apache.iotdb.pipe.api.PipeCollector;
+import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.PipeParameters;
+import org.apache.iotdb.pipe.api.customizer.collector.PipeCollectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.event.Event;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class IoTDBDataRegionCollector implements PipeCollector {
+
+  private final AtomicBoolean hasBeenStarted;
+
+  private final PipeRealtimeDataRegionCollector realtimeCollector;
+  // TODO: support pattern in historical collector
+  private final PipeHistoricalDataRegionTsFileCollector historicalCollector;
+
+  public IoTDBDataRegionCollector() {
+    hasBeenStarted = new AtomicBoolean(false);
+    realtimeCollector = new PipeRealtimeDataRegionHybridCollector();
+    historicalCollector = new PipeHistoricalDataRegionTsFileCollector();
+  }
+
+  @Override
+  public void validate(PipeParameterValidator validator) throws Exception {
+    // TODO: require more attributes
+    realtimeCollector.validate(validator);
+    historicalCollector.validate(validator);
+  }
+
+  @Override
+  public void customize(
+      PipeParameters parameters, PipeCollectorRuntimeConfiguration configuration) {
+    realtimeCollector.customize(parameters, configuration);
+    historicalCollector.customize(parameters, configuration);
+  }
+
+  @Override
+  public void start() throws Exception {
+    if (hasBeenStarted.get()) {
+      return;
+    }
+    hasBeenStarted.set(true);
+
+    realtimeCollector.start();
+    historicalCollector.start();
+  }
+
+  @Override
+  public Event supply() throws Exception {
+    return historicalCollector.hasConsumedAll()
+        ? realtimeCollector.supply()
+        : historicalCollector.supply();
+  }
+
+  @Override
+  public void close() throws Exception {
+    historicalCollector.close();
+    realtimeCollector.close();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalTsFileCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
similarity index 76%
rename from server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalTsFileCollector.java
rename to server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
index d35245f..9ba2f3f 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalTsFileCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
@@ -23,6 +23,7 @@
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.storagegroup.DataRegion;
 import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
+import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
 import org.apache.iotdb.db.pipe.core.event.impl.PipeTsFileInsertionEvent;
 import org.apache.iotdb.pipe.api.PipeCollector;
 import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
@@ -30,48 +31,36 @@
 import org.apache.iotdb.pipe.api.customizer.collector.PipeCollectorRuntimeConfiguration;
 import org.apache.iotdb.pipe.api.event.Event;
 
-import org.apache.commons.lang.NotImplementedException;
-
 import java.util.ArrayDeque;
 import java.util.Queue;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
-public class PipeHistoricalTsFileCollector implements PipeCollector {
-  private final AtomicBoolean hasBeenStarted;
-  private final String dataRegionId;
-  private Queue<PipeTsFileInsertionEvent> pendingQueue;
+public class PipeHistoricalDataRegionTsFileCollector implements PipeCollector {
 
-  public PipeHistoricalTsFileCollector(String dataRegionId) {
-    this.hasBeenStarted = new AtomicBoolean(false);
-    this.dataRegionId = dataRegionId;
-  }
+  private int dataRegionId;
+
+  private Queue<PipeTsFileInsertionEvent> pendingQueue;
 
   @Override
   public void validate(PipeParameterValidator validator) throws Exception {
-    throw new NotImplementedException("Not implement for validate.");
+    validator.validateRequiredAttribute(PipeCollectorConstant.PATTERN_DATA_REGION_KEY);
   }
 
   @Override
-  public void customize(PipeParameters parameters, PipeCollectorRuntimeConfiguration configuration)
-      throws Exception {
-    throw new NotImplementedException("Not implement for customize.");
+  public void customize(
+      PipeParameters parameters, PipeCollectorRuntimeConfiguration configuration) {
+    dataRegionId = parameters.getInt(PipeCollectorConstant.PATTERN_DATA_REGION_KEY);
   }
 
   @Override
-  public void start() {
-    if (hasBeenStarted.get()) {
-      return;
-    }
-    hasBeenStarted.set(true);
-
-    DataRegion dataRegion =
+  public synchronized void start() {
+    final DataRegion dataRegion =
         StorageEngine.getInstance().getDataRegion(new DataRegionId(dataRegionId));
     dataRegion.writeLock("Pipe: collect historical TsFile");
     try {
       dataRegion.syncCloseAllWorkingTsFileProcessors();
-      TsFileManager tsFileManager = dataRegion.getTsFileManager();
 
+      final TsFileManager tsFileManager = dataRegion.getTsFileManager();
       tsFileManager.readLock();
       try {
         pendingQueue = new ArrayDeque<>(tsFileManager.size(true) + tsFileManager.size(false));
@@ -100,6 +89,10 @@
     return pendingQueue.poll();
   }
 
+  public synchronized boolean hasConsumedAll() {
+    return pendingQueue != null && pendingQueue.isEmpty();
+  }
+
   @Override
   public void close() {
     if (pendingQueue != null) {
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionCollector.java
index 452831c6..0825b14 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionCollector.java
@@ -19,44 +19,39 @@
 
 package org.apache.iotdb.db.pipe.core.collector.realtime;
 
+import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
 import org.apache.iotdb.db.pipe.core.collector.realtime.listener.PipeInsertionDataNodeListener;
 import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
 import org.apache.iotdb.pipe.api.PipeCollector;
 import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
 import org.apache.iotdb.pipe.api.customizer.collector.PipeCollectorRuntimeConfiguration;
-import org.apache.iotdb.pipe.api.exception.PipeException;
 
 public abstract class PipeRealtimeDataRegionCollector implements PipeCollector {
 
-  protected final String pattern;
-  protected final String dataRegionId;
-
-  public PipeRealtimeDataRegionCollector(String pattern, String dataRegionId) {
-    this.pattern = pattern;
-    this.dataRegionId = dataRegionId;
-  }
+  protected String pattern;
+  protected String dataRegionId;
 
   @Override
-  public final void validate(PipeParameterValidator validator) throws PipeException {
-    // TODO: complete this method
+  public void validate(PipeParameterValidator validator) throws Exception {
+    validator.validateRequiredAttribute(PipeCollectorConstant.PATTERN_PATTERN_KEY);
+    validator.validateRequiredAttribute(PipeCollectorConstant.PATTERN_DATA_REGION_KEY);
   }
 
   @Override
   public void customize(
       PipeParameters parameters, PipeCollectorRuntimeConfiguration configuration) {
-    // TODO: complete this method
+    pattern = parameters.getString(PipeCollectorConstant.PATTERN_PATTERN_KEY);
+    dataRegionId = parameters.getString(PipeCollectorConstant.PATTERN_DATA_REGION_KEY);
   }
 
   @Override
   public void start() {
-    // TODO: if the collector is not started, start it. if the collector is started, do nothing.
     PipeInsertionDataNodeListener.getInstance().startListenAndAssign(dataRegionId, this);
   }
 
   @Override
   public void close() {
-    // TODO: if the collector is not closed, close it. if the collector is closed, do nothing.
     PipeInsertionDataNodeListener.getInstance().stopListenAndAssign(dataRegionId, this);
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeHybridDataRegionCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
similarity index 95%
rename from server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeHybridDataRegionCollector.java
rename to server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
index 76aeebc..a48654d 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeHybridDataRegionCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
@@ -30,18 +30,17 @@
 import java.util.concurrent.ArrayBlockingQueue;
 
 // TODO: make this collector as a builtin pipe plugin. register it in BuiltinPipePlugin.
-public class PipeRealtimeHybridDataRegionCollector extends PipeRealtimeDataRegionCollector {
+public class PipeRealtimeDataRegionHybridCollector extends PipeRealtimeDataRegionCollector {
 
   private static final Logger LOGGER =
-      LoggerFactory.getLogger(PipeRealtimeHybridDataRegionCollector.class);
+      LoggerFactory.getLogger(PipeRealtimeDataRegionHybridCollector.class);
 
   // TODO: memory control
   // This queue is used to store pending events collected by the method collect(). The method
   // supply() will poll events from this queue and send them to the next pipe plugin.
   private final ArrayBlockingQueue<PipeRealtimeCollectEvent> pendingQueue;
 
-  public PipeRealtimeHybridDataRegionCollector(String pattern, String dataRegionId) {
-    super(pattern, dataRegionId);
+  public PipeRealtimeDataRegionHybridCollector() {
     this.pendingQueue =
         new ArrayBlockingQueue<>(
             PipeConfig.getInstance().getRealtimeCollectorPendingQueueCapacity());
@@ -168,4 +167,10 @@
     // if the state is USING_TABLET, discard the event and poll the next one.
     return null;
   }
+
+  @Override
+  public void close() {
+    super.close();
+    pendingQueue.clear();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
index a496376..51a344d 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
@@ -19,44 +19,52 @@
 
 package org.apache.iotdb.db.pipe.task.stage;
 
-import org.apache.iotdb.db.pipe.execution.executor.PipeAssignerSubtaskExecutor;
-import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutor;
-import org.apache.iotdb.db.pipe.task.subtask.PipeAssignerSubtask;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.task.subtask.PipeSubtask;
+import org.apache.iotdb.pipe.api.PipeCollector;
+import org.apache.iotdb.pipe.api.customizer.PipeParameters;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
 public class PipeTaskCollectorStage implements PipeTaskStage {
 
-  private final PipeSubtaskExecutor executor;
-  private final PipeSubtask subtask;
+  private final PipeParameters collectorParameters;
 
-  PipeTaskCollectorStage(PipeAssignerSubtaskExecutor executor, PipeAssignerSubtask subtask) {
-    this.executor = executor;
-    this.subtask = subtask;
+  private PipeCollector pipeCollector;
+
+  PipeTaskCollectorStage(PipeParameters collectorParameters) {
+    this.collectorParameters = collectorParameters;
   }
 
   @Override
   public void create() throws PipeException {
-    executor.register(subtask);
+    this.pipeCollector = PipeAgent.plugin().reflectCollector(collectorParameters);
   }
 
   @Override
   public void start() throws PipeException {
-    executor.start(subtask.getTaskID());
+    try {
+      pipeCollector.start();
+    } catch (Exception e) {
+      throw new PipeException(e.getMessage(), e);
+    }
   }
 
   @Override
   public void stop() throws PipeException {
-    executor.stop(subtask.getTaskID());
+    // collector continuously collects data, so do nothing in stop
   }
 
   @Override
   public void drop() throws PipeException {
-    executor.deregister(subtask.getTaskID());
+    try {
+      pipeCollector.close();
+    } catch (Exception e) {
+      throw new PipeException(e.getMessage(), e);
+    }
   }
 
   @Override
   public PipeSubtask getSubtask() {
-    return subtask;
+    throw new UnsupportedOperationException("Collector stage does not have subtask.");
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java
index 4a9abbe..c453ee6 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java
@@ -19,9 +19,11 @@
 
 package org.apache.iotdb.db.pipe.core.collector;
 
+import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
 import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionCollector;
 import org.apache.iotdb.db.pipe.core.collector.realtime.matcher.CachedSchemaPatternMatcher;
 import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
+import org.apache.iotdb.pipe.api.customizer.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
 
@@ -32,6 +34,7 @@
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
@@ -61,19 +64,46 @@
 
   @Test
   public void testCachedMatcher() throws ExecutionException, InterruptedException {
-    PipeRealtimeDataRegionCollector databaseCollector =
-        new PipeRealtimeFakeDataRegionCollector("root", "1");
+    PipeRealtimeDataRegionCollector databaseCollector = new PipeRealtimeDataRegionFakeCollector();
+    databaseCollector.customize(
+        new PipeParameters(
+            new HashMap<String, String>() {
+              {
+                put(PipeCollectorConstant.PATTERN_PATTERN_KEY, "root");
+                put(PipeCollectorConstant.PATTERN_DATA_REGION_KEY, "1");
+              }
+            }),
+        null);
     collectorList.add(databaseCollector);
 
     int deviceCollectorNum = 10;
     int seriesCollectorNum = 10;
     for (int i = 0; i < deviceCollectorNum; i++) {
-      PipeRealtimeDataRegionCollector deviceCollector =
-          new PipeRealtimeFakeDataRegionCollector("root." + i, "1");
+      PipeRealtimeDataRegionCollector deviceCollector = new PipeRealtimeDataRegionFakeCollector();
+      int finalI1 = i;
+      deviceCollector.customize(
+          new PipeParameters(
+              new HashMap<String, String>() {
+                {
+                  put(PipeCollectorConstant.PATTERN_PATTERN_KEY, "root." + finalI1);
+                  put(PipeCollectorConstant.PATTERN_DATA_REGION_KEY, "1");
+                }
+              }),
+          null);
       collectorList.add(deviceCollector);
       for (int j = 0; j < seriesCollectorNum; j++) {
-        PipeRealtimeDataRegionCollector seriesCollector =
-            new PipeRealtimeFakeDataRegionCollector("root." + i + "." + j, "1");
+        PipeRealtimeDataRegionCollector seriesCollector = new PipeRealtimeDataRegionFakeCollector();
+        int finalI = i;
+        int finalJ = j;
+        seriesCollector.customize(
+            new PipeParameters(
+                new HashMap<String, String>() {
+                  {
+                    put(PipeCollectorConstant.PATTERN_PATTERN_KEY, "root." + finalI + "." + finalJ);
+                    put(PipeCollectorConstant.PATTERN_DATA_REGION_KEY, "1");
+                  }
+                }),
+            null);
         collectorList.add(seriesCollector);
       }
     }
@@ -115,11 +145,7 @@
     future.get();
   }
 
-  public static class PipeRealtimeFakeDataRegionCollector extends PipeRealtimeDataRegionCollector {
-
-    public PipeRealtimeFakeDataRegionCollector(String pattern, String dataRegionId) {
-      super(pattern, dataRegionId);
-    }
+  public static class PipeRealtimeDataRegionFakeCollector extends PipeRealtimeDataRegionCollector {
 
     @Override
     public Event supply() {
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
index 3d39545..f218e68 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
@@ -23,9 +23,11 @@
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
+import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
 import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionCollector;
-import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeHybridDataRegionCollector;
+import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionHybridCollector;
 import org.apache.iotdb.db.pipe.core.collector.realtime.listener.PipeInsertionDataNodeListener;
+import org.apache.iotdb.pipe.api.customizer.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.EventType;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
@@ -39,6 +41,7 @@
 
 import java.io.File;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -79,14 +82,51 @@
   public void testRealtimeCollectProcess() throws ExecutionException, InterruptedException {
     // set up realtime collector
 
-    try (PipeRealtimeHybridDataRegionCollector collector1 =
-            new PipeRealtimeHybridDataRegionCollector(pattern1, dataRegion1);
-        PipeRealtimeHybridDataRegionCollector collector2 =
-            new PipeRealtimeHybridDataRegionCollector(pattern2, dataRegion1);
-        PipeRealtimeHybridDataRegionCollector collector3 =
-            new PipeRealtimeHybridDataRegionCollector(pattern1, dataRegion2);
-        PipeRealtimeHybridDataRegionCollector collector4 =
-            new PipeRealtimeHybridDataRegionCollector(pattern2, dataRegion2)) {
+    try (PipeRealtimeDataRegionHybridCollector collector1 =
+            new PipeRealtimeDataRegionHybridCollector();
+        PipeRealtimeDataRegionHybridCollector collector2 =
+            new PipeRealtimeDataRegionHybridCollector();
+        PipeRealtimeDataRegionHybridCollector collector3 =
+            new PipeRealtimeDataRegionHybridCollector();
+        PipeRealtimeDataRegionHybridCollector collector4 =
+            new PipeRealtimeDataRegionHybridCollector()) {
+
+      collector1.customize(
+          new PipeParameters(
+              new HashMap<String, String>() {
+                {
+                  put(PipeCollectorConstant.PATTERN_PATTERN_KEY, pattern1);
+                  put(PipeCollectorConstant.PATTERN_DATA_REGION_KEY, dataRegion1);
+                }
+              }),
+          null);
+      collector2.customize(
+          new PipeParameters(
+              new HashMap<String, String>() {
+                {
+                  put(PipeCollectorConstant.PATTERN_PATTERN_KEY, pattern2);
+                  put(PipeCollectorConstant.PATTERN_DATA_REGION_KEY, dataRegion1);
+                }
+              }),
+          null);
+      collector3.customize(
+          new PipeParameters(
+              new HashMap<String, String>() {
+                {
+                  put(PipeCollectorConstant.PATTERN_PATTERN_KEY, pattern1);
+                  put(PipeCollectorConstant.PATTERN_DATA_REGION_KEY, dataRegion2);
+                }
+              }),
+          null);
+      collector4.customize(
+          new PipeParameters(
+              new HashMap<String, String>() {
+                {
+                  put(PipeCollectorConstant.PATTERN_PATTERN_KEY, pattern2);
+                  put(PipeCollectorConstant.PATTERN_DATA_REGION_KEY, dataRegion2);
+                }
+              }),
+          null);
 
       PipeRealtimeDataRegionCollector[] collectors =
           new PipeRealtimeDataRegionCollector[] {collector1, collector2, collector3, collector4};