[IOTDB-5821] Pipe: PipeCollector Stage (#9789)
* Implemention of DefaultCollector, default collector of IoTDB
* Implemention of PipeCollectorStage
---------
Co-authored-by: Steve Yurong Su <rong@apache.org>
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/DataRegionId.java b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/DataRegionId.java
index 655de94..03ec5e4 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/DataRegionId.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/DataRegionId.java
@@ -27,10 +27,6 @@
this.id = id;
}
- public DataRegionId(String id) {
- this.id = Integer.parseInt(id);
- }
-
@Override
public TConsensusGroupType getType() {
return TConsensusGroupType.DataRegion;
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
index d697264..6e59cab 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
@@ -19,11 +19,15 @@
package org.apache.iotdb.commons.pipe.plugin.builtin;
+import org.apache.iotdb.commons.pipe.plugin.builtin.collector.DefaultCollector;
import org.apache.iotdb.commons.pipe.plugin.builtin.connector.DoNothingConnector;
import org.apache.iotdb.commons.pipe.plugin.builtin.processor.DoNothingProcessor;
public enum BuiltinPipePlugin {
+ // collectors
+ DEFAULT_COLLECTOR("default_collector", DefaultCollector.class),
+
// processors
DO_NOTHING_PROCESSOR("do_nothing_processor", DoNothingProcessor.class),
@@ -35,8 +39,8 @@
private final Class<?> pipePluginClass;
private final String className;
- BuiltinPipePlugin(String functionName, Class<?> pipePluginClass) {
- this.pipePluginName = functionName;
+ BuiltinPipePlugin(String pipePluginName, Class<?> pipePluginClass) {
+ this.pipePluginName = pipePluginName;
this.pipePluginClass = pipePluginClass;
this.className = pipePluginClass.getName();
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/collector/DefaultCollector.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/collector/DefaultCollector.java
new file mode 100644
index 0000000..fed08ae
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/collector/DefaultCollector.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.plugin.builtin.collector;
+
+import org.apache.iotdb.pipe.api.PipeCollector;
+import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.PipeParameters;
+import org.apache.iotdb.pipe.api.customizer.collector.PipeCollectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.event.Event;
+
+/**
+ * This class is a placeholder and should not be initialized. It represents the default collector
+ * when no collector is specified. There is a real implementation in the server module but cannot be
+ * imported here. The pipe agent in the server module will replace this class with the real
+ * implementation when initializing the collector.
+ */
+public class DefaultCollector implements PipeCollector {
+ @Override
+ public void validate(PipeParameterValidator validator) throws Exception {
+ throw new UnsupportedOperationException("This class is a placeholder and should not be used.");
+ }
+
+ @Override
+ public void customize(PipeParameters parameters, PipeCollectorRuntimeConfiguration configuration)
+ throws Exception {
+ throw new UnsupportedOperationException("This class is a placeholder and should not be used.");
+ }
+
+ @Override
+ public void start() throws Exception {
+ throw new UnsupportedOperationException("This class is a placeholder and should not be used.");
+ }
+
+ @Override
+ public Event supply() throws Exception {
+ throw new UnsupportedOperationException("This class is a placeholder and should not be used.");
+ }
+
+ @Override
+ public void close() throws Exception {
+ throw new UnsupportedOperationException("This class is a placeholder and should not be used.");
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
index e65495a..d0b42f6 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
@@ -24,6 +24,8 @@
import org.apache.iotdb.commons.pipe.plugin.service.PipePluginClassLoader;
import org.apache.iotdb.commons.pipe.plugin.service.PipePluginClassLoaderManager;
import org.apache.iotdb.commons.pipe.plugin.service.PipePluginExecutableManager;
+import org.apache.iotdb.db.pipe.core.collector.IoTDBDataRegionCollector;
+import org.apache.iotdb.pipe.api.PipeCollector;
import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.PipePlugin;
import org.apache.iotdb.pipe.api.PipeProcessor;
@@ -188,6 +190,11 @@
}
}
+ public PipeCollector reflectCollector(PipeParameters collectorParameters) {
+ return new IoTDBDataRegionCollector(); // TODO: reflect plugin, use PipeIoTDBCollector as
+ // default collector
+ }
+
public PipeProcessor reflectProcessor(PipeParameters processorParameters) {
throw new UnsupportedOperationException("Not supported yet.");
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeCollectorConstant.java b/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeCollectorConstant.java
new file mode 100644
index 0000000..b75d216
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeCollectorConstant.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.config;
+
+public class PipeCollectorConstant {
+
+ public static final String PATTERN_PATTERN_KEY = "collector.pattern";
+ public static final String PATTERN_DATA_REGION_KEY = "collector.data-region";
+
+ private PipeCollectorConstant() {
+ throw new IllegalStateException("Utility class");
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java
new file mode 100644
index 0000000..0fc9fdf
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.collector;
+
+import org.apache.iotdb.db.pipe.core.collector.historical.PipeHistoricalDataRegionTsFileCollector;
+import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionCollector;
+import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionHybridCollector;
+import org.apache.iotdb.pipe.api.PipeCollector;
+import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.PipeParameters;
+import org.apache.iotdb.pipe.api.customizer.collector.PipeCollectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.event.Event;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class IoTDBDataRegionCollector implements PipeCollector {
+
+ private final AtomicBoolean hasBeenStarted;
+
+ private final PipeRealtimeDataRegionCollector realtimeCollector;
+ // TODO: support pattern in historical collector
+ private final PipeHistoricalDataRegionTsFileCollector historicalCollector;
+
+ public IoTDBDataRegionCollector() {
+ hasBeenStarted = new AtomicBoolean(false);
+ realtimeCollector = new PipeRealtimeDataRegionHybridCollector();
+ historicalCollector = new PipeHistoricalDataRegionTsFileCollector();
+ }
+
+ @Override
+ public void validate(PipeParameterValidator validator) throws Exception {
+ // TODO: require more attributes
+ realtimeCollector.validate(validator);
+ historicalCollector.validate(validator);
+ }
+
+ @Override
+ public void customize(
+ PipeParameters parameters, PipeCollectorRuntimeConfiguration configuration) {
+ realtimeCollector.customize(parameters, configuration);
+ historicalCollector.customize(parameters, configuration);
+ }
+
+ @Override
+ public void start() throws Exception {
+ if (hasBeenStarted.get()) {
+ return;
+ }
+ hasBeenStarted.set(true);
+
+ realtimeCollector.start();
+ historicalCollector.start();
+ }
+
+ @Override
+ public Event supply() throws Exception {
+ return historicalCollector.hasConsumedAll()
+ ? realtimeCollector.supply()
+ : historicalCollector.supply();
+ }
+
+ @Override
+ public void close() throws Exception {
+ historicalCollector.close();
+ realtimeCollector.close();
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalTsFileCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
similarity index 76%
rename from server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalTsFileCollector.java
rename to server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
index d35245f..9ba2f3f 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalTsFileCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
@@ -23,6 +23,7 @@
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.storagegroup.DataRegion;
import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
+import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
import org.apache.iotdb.db.pipe.core.event.impl.PipeTsFileInsertionEvent;
import org.apache.iotdb.pipe.api.PipeCollector;
import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
@@ -30,48 +31,36 @@
import org.apache.iotdb.pipe.api.customizer.collector.PipeCollectorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.event.Event;
-import org.apache.commons.lang.NotImplementedException;
-
import java.util.ArrayDeque;
import java.util.Queue;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
-public class PipeHistoricalTsFileCollector implements PipeCollector {
- private final AtomicBoolean hasBeenStarted;
- private final String dataRegionId;
- private Queue<PipeTsFileInsertionEvent> pendingQueue;
+public class PipeHistoricalDataRegionTsFileCollector implements PipeCollector {
- public PipeHistoricalTsFileCollector(String dataRegionId) {
- this.hasBeenStarted = new AtomicBoolean(false);
- this.dataRegionId = dataRegionId;
- }
+ private int dataRegionId;
+
+ private Queue<PipeTsFileInsertionEvent> pendingQueue;
@Override
public void validate(PipeParameterValidator validator) throws Exception {
- throw new NotImplementedException("Not implement for validate.");
+ validator.validateRequiredAttribute(PipeCollectorConstant.PATTERN_DATA_REGION_KEY);
}
@Override
- public void customize(PipeParameters parameters, PipeCollectorRuntimeConfiguration configuration)
- throws Exception {
- throw new NotImplementedException("Not implement for customize.");
+ public void customize(
+ PipeParameters parameters, PipeCollectorRuntimeConfiguration configuration) {
+ dataRegionId = parameters.getInt(PipeCollectorConstant.PATTERN_DATA_REGION_KEY);
}
@Override
- public void start() {
- if (hasBeenStarted.get()) {
- return;
- }
- hasBeenStarted.set(true);
-
- DataRegion dataRegion =
+ public synchronized void start() {
+ final DataRegion dataRegion =
StorageEngine.getInstance().getDataRegion(new DataRegionId(dataRegionId));
dataRegion.writeLock("Pipe: collect historical TsFile");
try {
dataRegion.syncCloseAllWorkingTsFileProcessors();
- TsFileManager tsFileManager = dataRegion.getTsFileManager();
+ final TsFileManager tsFileManager = dataRegion.getTsFileManager();
tsFileManager.readLock();
try {
pendingQueue = new ArrayDeque<>(tsFileManager.size(true) + tsFileManager.size(false));
@@ -100,6 +89,10 @@
return pendingQueue.poll();
}
+ public synchronized boolean hasConsumedAll() {
+ return pendingQueue != null && pendingQueue.isEmpty();
+ }
+
@Override
public void close() {
if (pendingQueue != null) {
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionCollector.java
index 452831c6..0825b14 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionCollector.java
@@ -19,44 +19,39 @@
package org.apache.iotdb.db.pipe.core.collector.realtime;
+import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
import org.apache.iotdb.db.pipe.core.collector.realtime.listener.PipeInsertionDataNodeListener;
import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
import org.apache.iotdb.pipe.api.PipeCollector;
import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.PipeParameters;
import org.apache.iotdb.pipe.api.customizer.collector.PipeCollectorRuntimeConfiguration;
-import org.apache.iotdb.pipe.api.exception.PipeException;
public abstract class PipeRealtimeDataRegionCollector implements PipeCollector {
- protected final String pattern;
- protected final String dataRegionId;
-
- public PipeRealtimeDataRegionCollector(String pattern, String dataRegionId) {
- this.pattern = pattern;
- this.dataRegionId = dataRegionId;
- }
+ protected String pattern;
+ protected String dataRegionId;
@Override
- public final void validate(PipeParameterValidator validator) throws PipeException {
- // TODO: complete this method
+ public void validate(PipeParameterValidator validator) throws Exception {
+ validator.validateRequiredAttribute(PipeCollectorConstant.PATTERN_PATTERN_KEY);
+ validator.validateRequiredAttribute(PipeCollectorConstant.PATTERN_DATA_REGION_KEY);
}
@Override
public void customize(
PipeParameters parameters, PipeCollectorRuntimeConfiguration configuration) {
- // TODO: complete this method
+ pattern = parameters.getString(PipeCollectorConstant.PATTERN_PATTERN_KEY);
+ dataRegionId = parameters.getString(PipeCollectorConstant.PATTERN_DATA_REGION_KEY);
}
@Override
public void start() {
- // TODO: if the collector is not started, start it. if the collector is started, do nothing.
PipeInsertionDataNodeListener.getInstance().startListenAndAssign(dataRegionId, this);
}
@Override
public void close() {
- // TODO: if the collector is not closed, close it. if the collector is closed, do nothing.
PipeInsertionDataNodeListener.getInstance().stopListenAndAssign(dataRegionId, this);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeHybridDataRegionCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
similarity index 95%
rename from server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeHybridDataRegionCollector.java
rename to server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
index 76aeebc..a48654d 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeHybridDataRegionCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
@@ -30,18 +30,17 @@
import java.util.concurrent.ArrayBlockingQueue;
// TODO: make this collector as a builtin pipe plugin. register it in BuiltinPipePlugin.
-public class PipeRealtimeHybridDataRegionCollector extends PipeRealtimeDataRegionCollector {
+public class PipeRealtimeDataRegionHybridCollector extends PipeRealtimeDataRegionCollector {
private static final Logger LOGGER =
- LoggerFactory.getLogger(PipeRealtimeHybridDataRegionCollector.class);
+ LoggerFactory.getLogger(PipeRealtimeDataRegionHybridCollector.class);
// TODO: memory control
// This queue is used to store pending events collected by the method collect(). The method
// supply() will poll events from this queue and send them to the next pipe plugin.
private final ArrayBlockingQueue<PipeRealtimeCollectEvent> pendingQueue;
- public PipeRealtimeHybridDataRegionCollector(String pattern, String dataRegionId) {
- super(pattern, dataRegionId);
+ public PipeRealtimeDataRegionHybridCollector() {
this.pendingQueue =
new ArrayBlockingQueue<>(
PipeConfig.getInstance().getRealtimeCollectorPendingQueueCapacity());
@@ -168,4 +167,10 @@
// if the state is USING_TABLET, discard the event and poll the next one.
return null;
}
+
+ @Override
+ public void close() {
+ super.close();
+ pendingQueue.clear();
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
index a496376..51a344d 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
@@ -19,44 +19,52 @@
package org.apache.iotdb.db.pipe.task.stage;
-import org.apache.iotdb.db.pipe.execution.executor.PipeAssignerSubtaskExecutor;
-import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutor;
-import org.apache.iotdb.db.pipe.task.subtask.PipeAssignerSubtask;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.task.subtask.PipeSubtask;
+import org.apache.iotdb.pipe.api.PipeCollector;
+import org.apache.iotdb.pipe.api.customizer.PipeParameters;
import org.apache.iotdb.pipe.api.exception.PipeException;
public class PipeTaskCollectorStage implements PipeTaskStage {
- private final PipeSubtaskExecutor executor;
- private final PipeSubtask subtask;
+ private final PipeParameters collectorParameters;
- PipeTaskCollectorStage(PipeAssignerSubtaskExecutor executor, PipeAssignerSubtask subtask) {
- this.executor = executor;
- this.subtask = subtask;
+ private PipeCollector pipeCollector;
+
+ PipeTaskCollectorStage(PipeParameters collectorParameters) {
+ this.collectorParameters = collectorParameters;
}
@Override
public void create() throws PipeException {
- executor.register(subtask);
+ this.pipeCollector = PipeAgent.plugin().reflectCollector(collectorParameters);
}
@Override
public void start() throws PipeException {
- executor.start(subtask.getTaskID());
+ try {
+ pipeCollector.start();
+ } catch (Exception e) {
+ throw new PipeException(e.getMessage(), e);
+ }
}
@Override
public void stop() throws PipeException {
- executor.stop(subtask.getTaskID());
+ // collector continuously collects data, so do nothing in stop
}
@Override
public void drop() throws PipeException {
- executor.deregister(subtask.getTaskID());
+ try {
+ pipeCollector.close();
+ } catch (Exception e) {
+ throw new PipeException(e.getMessage(), e);
+ }
}
@Override
public PipeSubtask getSubtask() {
- return subtask;
+ throw new UnsupportedOperationException("Collector stage does not have subtask.");
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java
index 4a9abbe..c453ee6 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java
@@ -19,9 +19,11 @@
package org.apache.iotdb.db.pipe.core.collector;
+import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionCollector;
import org.apache.iotdb.db.pipe.core.collector.realtime.matcher.CachedSchemaPatternMatcher;
import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
+import org.apache.iotdb.pipe.api.customizer.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
@@ -32,6 +34,7 @@
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
@@ -61,19 +64,46 @@
@Test
public void testCachedMatcher() throws ExecutionException, InterruptedException {
- PipeRealtimeDataRegionCollector databaseCollector =
- new PipeRealtimeFakeDataRegionCollector("root", "1");
+ PipeRealtimeDataRegionCollector databaseCollector = new PipeRealtimeDataRegionFakeCollector();
+ databaseCollector.customize(
+ new PipeParameters(
+ new HashMap<String, String>() {
+ {
+ put(PipeCollectorConstant.PATTERN_PATTERN_KEY, "root");
+ put(PipeCollectorConstant.PATTERN_DATA_REGION_KEY, "1");
+ }
+ }),
+ null);
collectorList.add(databaseCollector);
int deviceCollectorNum = 10;
int seriesCollectorNum = 10;
for (int i = 0; i < deviceCollectorNum; i++) {
- PipeRealtimeDataRegionCollector deviceCollector =
- new PipeRealtimeFakeDataRegionCollector("root." + i, "1");
+ PipeRealtimeDataRegionCollector deviceCollector = new PipeRealtimeDataRegionFakeCollector();
+ int finalI1 = i;
+ deviceCollector.customize(
+ new PipeParameters(
+ new HashMap<String, String>() {
+ {
+ put(PipeCollectorConstant.PATTERN_PATTERN_KEY, "root." + finalI1);
+ put(PipeCollectorConstant.PATTERN_DATA_REGION_KEY, "1");
+ }
+ }),
+ null);
collectorList.add(deviceCollector);
for (int j = 0; j < seriesCollectorNum; j++) {
- PipeRealtimeDataRegionCollector seriesCollector =
- new PipeRealtimeFakeDataRegionCollector("root." + i + "." + j, "1");
+ PipeRealtimeDataRegionCollector seriesCollector = new PipeRealtimeDataRegionFakeCollector();
+ int finalI = i;
+ int finalJ = j;
+ seriesCollector.customize(
+ new PipeParameters(
+ new HashMap<String, String>() {
+ {
+ put(PipeCollectorConstant.PATTERN_PATTERN_KEY, "root." + finalI + "." + finalJ);
+ put(PipeCollectorConstant.PATTERN_DATA_REGION_KEY, "1");
+ }
+ }),
+ null);
collectorList.add(seriesCollector);
}
}
@@ -115,11 +145,7 @@
future.get();
}
- public static class PipeRealtimeFakeDataRegionCollector extends PipeRealtimeDataRegionCollector {
-
- public PipeRealtimeFakeDataRegionCollector(String pattern, String dataRegionId) {
- super(pattern, dataRegionId);
- }
+ public static class PipeRealtimeDataRegionFakeCollector extends PipeRealtimeDataRegionCollector {
@Override
public Event supply() {
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
index 3d39545..f218e68 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
@@ -23,9 +23,11 @@
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
+import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionCollector;
-import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeHybridDataRegionCollector;
+import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionHybridCollector;
import org.apache.iotdb.db.pipe.core.collector.realtime.listener.PipeInsertionDataNodeListener;
+import org.apache.iotdb.pipe.api.customizer.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.EventType;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
@@ -39,6 +41,7 @@
import java.io.File;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -79,14 +82,51 @@
public void testRealtimeCollectProcess() throws ExecutionException, InterruptedException {
// set up realtime collector
- try (PipeRealtimeHybridDataRegionCollector collector1 =
- new PipeRealtimeHybridDataRegionCollector(pattern1, dataRegion1);
- PipeRealtimeHybridDataRegionCollector collector2 =
- new PipeRealtimeHybridDataRegionCollector(pattern2, dataRegion1);
- PipeRealtimeHybridDataRegionCollector collector3 =
- new PipeRealtimeHybridDataRegionCollector(pattern1, dataRegion2);
- PipeRealtimeHybridDataRegionCollector collector4 =
- new PipeRealtimeHybridDataRegionCollector(pattern2, dataRegion2)) {
+ try (PipeRealtimeDataRegionHybridCollector collector1 =
+ new PipeRealtimeDataRegionHybridCollector();
+ PipeRealtimeDataRegionHybridCollector collector2 =
+ new PipeRealtimeDataRegionHybridCollector();
+ PipeRealtimeDataRegionHybridCollector collector3 =
+ new PipeRealtimeDataRegionHybridCollector();
+ PipeRealtimeDataRegionHybridCollector collector4 =
+ new PipeRealtimeDataRegionHybridCollector()) {
+
+ collector1.customize(
+ new PipeParameters(
+ new HashMap<String, String>() {
+ {
+ put(PipeCollectorConstant.PATTERN_PATTERN_KEY, pattern1);
+ put(PipeCollectorConstant.PATTERN_DATA_REGION_KEY, dataRegion1);
+ }
+ }),
+ null);
+ collector2.customize(
+ new PipeParameters(
+ new HashMap<String, String>() {
+ {
+ put(PipeCollectorConstant.PATTERN_PATTERN_KEY, pattern2);
+ put(PipeCollectorConstant.PATTERN_DATA_REGION_KEY, dataRegion1);
+ }
+ }),
+ null);
+ collector3.customize(
+ new PipeParameters(
+ new HashMap<String, String>() {
+ {
+ put(PipeCollectorConstant.PATTERN_PATTERN_KEY, pattern1);
+ put(PipeCollectorConstant.PATTERN_DATA_REGION_KEY, dataRegion2);
+ }
+ }),
+ null);
+ collector4.customize(
+ new PipeParameters(
+ new HashMap<String, String>() {
+ {
+ put(PipeCollectorConstant.PATTERN_PATTERN_KEY, pattern2);
+ put(PipeCollectorConstant.PATTERN_DATA_REGION_KEY, dataRegion2);
+ }
+ }),
+ null);
PipeRealtimeDataRegionCollector[] collectors =
new PipeRealtimeDataRegionCollector[] {collector1, collector2, collector3, collector4};