Add InventoryDumperCreator SPI to decouple InventoryDumper and ScalingEntry (#20157)
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumperCreator.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumperCreator.java
new file mode 100644
index 0000000..0bc3df8
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumperCreator.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.ingest.dumper;
+
+import javax.sql.DataSource;
+import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumper;
+import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
+import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
+
+/**
+ * Inventory dumper creator.
+ */
+@SingletonSPI
+public interface InventoryDumperCreator extends TypedSPI {
+
+ /**
+ * Create Inventory Dumper.
+ * @param inventoryDumperConfig inventoryDumperConfig
+ * @param channel chanel
+ * @param sourceDataSource sourceDataSource
+ * @param sourceMetaDataLoader sourceMetaDataLoader
+ * @return InventoryDumper
+ */
+ InventoryDumper createInventoryDumper(InventoryDumperConfiguration inventoryDumperConfig, PipelineChannel channel,
+ DataSource sourceDataSource, PipelineTableMetaDataLoader sourceMetaDataLoader);
+}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumperCreatorFactory.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumperCreatorFactory.java
new file mode 100644
index 0000000..f291b6a
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumperCreatorFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.ingest.dumper;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
+import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPIRegistry;
+
+/**
+ * InventoryDumper creator factory.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class InventoryDumperCreatorFactory {
+
+ static {
+ ShardingSphereServiceLoader.register(InventoryDumperCreator.class);
+ }
+
+ /**
+ * Get inventoryDumper creator instance.
+ * @param databaseType databaseType
+ * @return InventoryDumperCreator
+ */
+ public static InventoryDumperCreator getInstance(final String databaseType) {
+ return TypedSPIRegistry.getRegisteredService(InventoryDumperCreator.class, databaseType);
+ }
+}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
index ffc9f32..b3b7965 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
@@ -34,11 +34,11 @@
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobExecutionException;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
+import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumperCreatorFactory;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreatorFactory;
import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.Dumper;
-import org.apache.shardingsphere.scaling.core.job.dumper.DumperFactory;
import javax.sql.DataSource;
import java.util.List;
@@ -72,7 +72,8 @@
this.importerExecuteEngine = importerExecuteEngine;
taskId = generateTaskId(inventoryDumperConfig);
channel = createChannel(pipelineChannelCreator);
- dumper = DumperFactory.createInventoryDumper(inventoryDumperConfig, channel, sourceDataSource, sourceMetaDataLoader);
+ dumper = InventoryDumperCreatorFactory.getInstance(importerConfig.getDataSourceConfig().getDatabaseType().getType()).createInventoryDumper(inventoryDumperConfig, channel, sourceDataSource,
+ sourceMetaDataLoader);
importer = ImporterCreatorFactory.getInstance(importerConfig.getDataSourceConfig().getDatabaseType().getType()).createImporter(importerConfig, dataSourceManager, channel, jobProgressListener);
position = inventoryDumperConfig.getPosition();
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/dumper/DumperFactory.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/dumper/DumperFactory.java
index 35d3fc7..3301824 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/dumper/DumperFactory.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/dumper/DumperFactory.java
@@ -21,16 +21,13 @@
import lombok.NoArgsConstructor;
import lombok.SneakyThrows;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumper;
-import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumper;
import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
import org.apache.shardingsphere.scaling.core.spi.ScalingEntryFactory;
-import javax.sql.DataSource;
import java.lang.reflect.Constructor;
/**
@@ -40,24 +37,6 @@
public final class DumperFactory {
/**
- * Create inventory dumper.
- *
- * @param inventoryDumperConfig inventory dumper configuration
- * @param channel channel
- * @param sourceDataSource data source
- * @param sourceMetaDataLoader metadata loader
- * @return inventory dumper
- */
- @SneakyThrows(ReflectiveOperationException.class)
- public static InventoryDumper createInventoryDumper(final InventoryDumperConfiguration inventoryDumperConfig, final PipelineChannel channel,
- final DataSource sourceDataSource, final PipelineTableMetaDataLoader sourceMetaDataLoader) {
- ScalingEntry scalingEntry = ScalingEntryFactory.getInstance(inventoryDumperConfig.getDataSourceConfig().getDatabaseType().getType());
- Constructor<? extends InventoryDumper> constructor = scalingEntry.getInventoryDumperClass()
- .getConstructor(InventoryDumperConfiguration.class, PipelineChannel.class, DataSource.class, PipelineTableMetaDataLoader.class);
- return constructor.newInstance(inventoryDumperConfig, channel, sourceDataSource, sourceMetaDataLoader);
- }
-
- /**
* Create incremental dumper.
*
* @param dumperConfig dumper configuration
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java
index 5c1dfd0..16a446b 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java
@@ -18,7 +18,6 @@
package org.apache.shardingsphere.scaling.core.spi;
import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumper;
-import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumper;
import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
@@ -29,13 +28,6 @@
public interface ScalingEntry extends TypedSPI {
/**
- * Get inventory dumper type.
- *
- * @return inventory dumper type
- */
- Class<? extends InventoryDumper> getInventoryDumperClass();
-
- /**
* Get incremental dumper type.
*
* @return incremental dumper type
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/scaling/core/spi/fixture/ScalingEntryFixture.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/scaling/core/spi/fixture/ScalingEntryFixture.java
index 68ab8ff..a7977bd 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/scaling/core/spi/fixture/ScalingEntryFixture.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/scaling/core/spi/fixture/ScalingEntryFixture.java
@@ -18,17 +18,11 @@
package org.apache.shardingsphere.scaling.core.spi.fixture;
import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumper;
-import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumper;
import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
public final class ScalingEntryFixture implements ScalingEntry {
@Override
- public Class<? extends InventoryDumper> getInventoryDumperClass() {
- return InventoryDumper.class;
- }
-
- @Override
public Class<? extends IncrementalDumper> getIncrementalDumperClass() {
return IncrementalDumper.class;
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/MySQLScalingEntry.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/MySQLScalingEntry.java
index 71c4c23..3943324 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/MySQLScalingEntry.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/MySQLScalingEntry.java
@@ -18,7 +18,6 @@
package org.apache.shardingsphere.data.pipeline.mysql;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.MySQLIncrementalDumper;
-import org.apache.shardingsphere.data.pipeline.mysql.ingest.MySQLInventoryDumper;
import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
/**
@@ -27,11 +26,6 @@
public final class MySQLScalingEntry implements ScalingEntry {
@Override
- public Class<MySQLInventoryDumper> getInventoryDumperClass() {
- return MySQLInventoryDumper.class;
- }
-
- @Override
public Class<MySQLIncrementalDumper> getIncrementalDumperClass() {
return MySQLIncrementalDumper.class;
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/MySqlInventoryDumperCreator.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/MySqlInventoryDumperCreator.java
new file mode 100644
index 0000000..7697ee5
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/MySqlInventoryDumperCreator.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.mysql;
+
+import javax.sql.DataSource;
+import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumperCreator;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.mysql.ingest.MySQLInventoryDumper;
+import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumper;
+
+public class MySqlInventoryDumperCreator implements InventoryDumperCreator {
+
+ @Override
+ public InventoryDumper createInventoryDumper(final InventoryDumperConfiguration inventoryDumperConfig,
+ final PipelineChannel channel, final DataSource sourceDataSource,
+ final PipelineTableMetaDataLoader sourceMetaDataLoader) {
+ return new MySQLInventoryDumper(inventoryDumperConfig, channel, sourceDataSource, sourceMetaDataLoader);
+ }
+
+ @Override
+ public String getType() {
+ return "MySQL";
+ }
+}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumperCreator b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumperCreator
new file mode 100644
index 0000000..031f57d
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumperCreator
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.data.pipeline.mysql.MySqlInventoryDumperCreator
+
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/MySQLScalingEntryTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/MySQLScalingEntryTest.java
index 5ce566c..e90fa49 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/MySQLScalingEntryTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/MySQLScalingEntryTest.java
@@ -18,7 +18,6 @@
package org.apache.shardingsphere.data.pipeline.mysql;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.MySQLIncrementalDumper;
-import org.apache.shardingsphere.data.pipeline.mysql.ingest.MySQLInventoryDumper;
import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
import org.apache.shardingsphere.scaling.core.spi.ScalingEntryFactory;
import org.junit.Test;
@@ -33,7 +32,6 @@
public void assertGetScalingEntryByDatabaseType() {
ScalingEntry scalingEntry = ScalingEntryFactory.getInstance("MySQL");
assertThat(scalingEntry, instanceOf(MySQLScalingEntry.class));
- assertThat(scalingEntry.getInventoryDumperClass(), equalTo(MySQLInventoryDumper.class));
assertThat(scalingEntry.getIncrementalDumperClass(), equalTo(MySQLIncrementalDumper.class));
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussScalingEntry.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussScalingEntry.java
index 36a072b..8cdbadb 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussScalingEntry.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussScalingEntry.java
@@ -18,7 +18,6 @@
package org.apache.shardingsphere.data.pipeline.opengauss;
import org.apache.shardingsphere.data.pipeline.opengauss.ingest.OpenGaussWalDumper;
-import org.apache.shardingsphere.data.pipeline.postgresql.ingest.PostgreSQLInventoryDumper;
import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
/**
@@ -27,11 +26,6 @@
public final class OpenGaussScalingEntry implements ScalingEntry {
@Override
- public Class<PostgreSQLInventoryDumper> getInventoryDumperClass() {
- return PostgreSQLInventoryDumper.class;
- }
-
- @Override
public Class<OpenGaussWalDumper> getIncrementalDumperClass() {
return OpenGaussWalDumper.class;
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussScalingEntryTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussScalingEntryTest.java
index 8c64808..b3534b9 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussScalingEntryTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussScalingEntryTest.java
@@ -18,7 +18,6 @@
package org.apache.shardingsphere.data.pipeline.opengauss;
import org.apache.shardingsphere.data.pipeline.opengauss.ingest.OpenGaussWalDumper;
-import org.apache.shardingsphere.data.pipeline.postgresql.ingest.PostgreSQLInventoryDumper;
import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
import org.apache.shardingsphere.scaling.core.spi.ScalingEntryFactory;
import org.junit.Test;
@@ -33,7 +32,6 @@
public void assertGetScalingEntryByDatabaseType() {
ScalingEntry actual = ScalingEntryFactory.getInstance("openGauss");
assertThat(actual, instanceOf(OpenGaussScalingEntry.class));
- assertThat(actual.getInventoryDumperClass(), equalTo(PostgreSQLInventoryDumper.class));
assertThat(actual.getIncrementalDumperClass(), equalTo(OpenGaussWalDumper.class));
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLInventoryDumperCreator.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLInventoryDumperCreator.java
new file mode 100644
index 0000000..357edb2
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLInventoryDumperCreator.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.postgresql;
+
+import javax.sql.DataSource;
+import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumperCreator;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.PostgreSQLInventoryDumper;
+import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumper;
+
+/**
+ * PostgreSQL inventory dumper creator.
+ */
+public class PostgreSQLInventoryDumperCreator implements InventoryDumperCreator {
+
+ @Override
+ public InventoryDumper createInventoryDumper(final InventoryDumperConfiguration inventoryDumperConfig,
+ final PipelineChannel channel, final DataSource sourceDataSource,
+ final PipelineTableMetaDataLoader sourceMetaDataLoader) {
+ return new PostgreSQLInventoryDumper(inventoryDumperConfig, channel, sourceDataSource, sourceMetaDataLoader);
+ }
+
+ @Override
+ public String getType() {
+ return "PostgreSQL";
+ }
+}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLScalingEntry.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLScalingEntry.java
index b21636c..1d04d0d 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLScalingEntry.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLScalingEntry.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.data.pipeline.postgresql;
-import org.apache.shardingsphere.data.pipeline.postgresql.ingest.PostgreSQLInventoryDumper;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.PostgreSQLWalDumper;
import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
@@ -27,11 +26,6 @@
public final class PostgreSQLScalingEntry implements ScalingEntry {
@Override
- public Class<PostgreSQLInventoryDumper> getInventoryDumperClass() {
- return PostgreSQLInventoryDumper.class;
- }
-
- @Override
public Class<PostgreSQLWalDumper> getIncrementalDumperClass() {
return PostgreSQLWalDumper.class;
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumperCreator b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumperCreator
new file mode 100644
index 0000000..4864bf5
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumperCreator
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.data.pipeline.postgresql.PostgreSQLInventoryDumperCreator
+
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLScalingEntryTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLScalingEntryTest.java
index 700e139..067db98 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLScalingEntryTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLScalingEntryTest.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.data.pipeline.postgresql;
-import org.apache.shardingsphere.data.pipeline.postgresql.ingest.PostgreSQLInventoryDumper;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.PostgreSQLWalDumper;
import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
import org.apache.shardingsphere.scaling.core.spi.ScalingEntryFactory;
@@ -33,7 +32,6 @@
public void assertGetScalingEntryByDatabaseType() {
ScalingEntry scalingEntry = ScalingEntryFactory.getInstance("PostgreSQL");
assertThat(scalingEntry, instanceOf(PostgreSQLScalingEntry.class));
- assertThat(scalingEntry.getInventoryDumperClass(), equalTo(PostgreSQLInventoryDumper.class));
assertThat(scalingEntry.getIncrementalDumperClass(), equalTo(PostgreSQLWalDumper.class));
}
}
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumperCreatorFactoryTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumperCreatorFactoryTest.java
new file mode 100644
index 0000000..1a0bdbe
--- /dev/null
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumperCreatorFactoryTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.dumper;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertThat;
+import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
+import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
+import org.apache.shardingsphere.data.pipeline.core.fixture.FixtureInventoryDumper;
+import org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.SimpleMemoryPipelineChannel;
+import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumperCreatorFactory;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.mysql.ingest.MySQLInventoryDumper;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.PostgreSQLInventoryDumper;
+import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumper;
+import org.junit.Before;
+import org.junit.Test;
+
+public final class InventoryDumperCreatorFactoryTest {
+
+ private PipelineDataSourceWrapper dataSource;
+
+ @Before
+ public void setUp() {
+ PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager();
+ InventoryDumperConfiguration dumperConfig = mockInventoryDumperConfiguration();
+ dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
+ }
+
+ @Test
+ public void assertInventoryDumperCreatorForMysql() {
+ InventoryDumper actual = InventoryDumperCreatorFactory.getInstance("MySql")
+ .createInventoryDumper(mockInventoryDumperConfiguration(), new SimpleMemoryPipelineChannel(100), dataSource, new PipelineTableMetaDataLoader(dataSource));
+ assertThat(actual, instanceOf(MySQLInventoryDumper.class));
+ }
+
+ @Test
+ public void assertInventoryDumperCreatorForPostgreSQL() {
+ InventoryDumper actual = InventoryDumperCreatorFactory.getInstance("PostgreSQL")
+ .createInventoryDumper(mockInventoryDumperConfiguration(), new SimpleMemoryPipelineChannel(100), dataSource, new PipelineTableMetaDataLoader(dataSource));
+ assertThat(actual, instanceOf(PostgreSQLInventoryDumper.class));
+ }
+
+ @Test
+ public void assertInventoryDumperCreatorForFixture() {
+ InventoryDumper actual = InventoryDumperCreatorFactory.getInstance("Fixture")
+ .createInventoryDumper(mockInventoryDumperConfiguration(), new SimpleMemoryPipelineChannel(100), dataSource, new PipelineTableMetaDataLoader(dataSource));
+ assertThat(actual, instanceOf(FixtureInventoryDumper.class));
+ }
+
+ private InventoryDumperConfiguration mockInventoryDumperConfiguration() {
+ DumperConfiguration dumperConfig = mockDumperConfiguration();
+ InventoryDumperConfiguration result = new InventoryDumperConfiguration(dumperConfig);
+ result.setActualTableName("t_order");
+ result.setLogicTableName("t_order");
+ return result;
+ }
+
+ private DumperConfiguration mockDumperConfiguration() {
+ DumperConfiguration result = new DumperConfiguration();
+ result.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=PostgreSQL", "root", "root"));
+ return result;
+ }
+}
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureInventoryDumperCreator.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureInventoryDumperCreator.java
new file mode 100644
index 0000000..d099e0a
--- /dev/null
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureInventoryDumperCreator.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.fixture;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import javax.sql.DataSource;
+import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumperCreator;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumper;
+
+/**
+ * Fixture iInventory dumper creator.
+ */
+public class FixtureInventoryDumperCreator implements InventoryDumperCreator {
+
+ private static final Collection<String> TYPE_ALIASES = Collections.unmodifiableList(Arrays.asList("Fixture", "H2"));
+
+ @Override
+ public InventoryDumper createInventoryDumper(final InventoryDumperConfiguration inventoryDumperConfig,
+ final PipelineChannel channel, final DataSource sourceDataSource,
+ final PipelineTableMetaDataLoader sourceMetaDataLoader) {
+ return new FixtureInventoryDumper(inventoryDumperConfig, channel, sourceDataSource, sourceMetaDataLoader);
+ }
+
+ @Override
+ public String getType() {
+ return "Fixture";
+ }
+
+ @Override
+ public Collection<String> getTypeAliases() {
+ return TYPE_ALIASES;
+ }
+}
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/H2ScalingEntryFixture.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/H2ScalingEntryFixture.java
index d599be9..6b8f521 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/H2ScalingEntryFixture.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/H2ScalingEntryFixture.java
@@ -18,17 +18,11 @@
package org.apache.shardingsphere.data.pipeline.core.fixture;
import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumper;
-import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumper;
import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
public final class H2ScalingEntryFixture implements ScalingEntry {
@Override
- public Class<? extends InventoryDumper> getInventoryDumperClass() {
- return FixtureInventoryDumper.class;
- }
-
- @Override
public Class<? extends IncrementalDumper> getIncrementalDumperClass() {
return FixtureIncrementalDumper.class;
}
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumperCreator b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumperCreator
new file mode 100644
index 0000000..2989bc7
--- /dev/null
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumperCreator
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.data.pipeline.core.fixture.FixtureInventoryDumperCreator