Improve scaling packages and classes name (#20156)
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java
index 9811501..bb0aabd 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java
@@ -18,13 +18,13 @@
package org.apache.shardingsphere.sharding.data.pipeline;
import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
+import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.YamlRuleAlteredJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigrationJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigrationJobConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeEntry;
import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
@@ -74,15 +74,15 @@
public final class ShardingRuleAlteredJobConfigurationPreparer implements RuleAlteredJobConfigurationPreparer {
@Override
- public void extendJobConfiguration(final YamlRuleAlteredJobConfiguration yamlJobConfig) {
- Map<String, List<DataNode>> actualDataNodes = getActualDataNodes(new RuleAlteredJobConfigurationSwapper().swapToObject(yamlJobConfig));
+ public void extendJobConfiguration(final YamlMigrationJobConfiguration yamlJobConfig) {
+ Map<String, List<DataNode>> actualDataNodes = getActualDataNodes(new YamlMigrationJobConfigurationSwapper().swapToObject(yamlJobConfig));
yamlJobConfig.setJobShardingDataNodes(getJobShardingDataNodes(actualDataNodes));
yamlJobConfig.setLogicTables(getLogicTables(actualDataNodes.keySet()));
yamlJobConfig.setTablesFirstDataNodes(getTablesFirstDataNodes(actualDataNodes));
yamlJobConfig.setSchemaTablesMap(getSchemaTablesMap(yamlJobConfig.getDatabaseName(), actualDataNodes.keySet()));
}
- private static Map<String, List<DataNode>> getActualDataNodes(final RuleAlteredJobConfiguration jobConfig) {
+ private static Map<String, List<DataNode>> getActualDataNodes(final MigrationJobConfiguration jobConfig) {
PipelineDataSourceConfiguration sourceDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getSource().getType(), jobConfig.getSource().getParameter());
ShardingSpherePipelineDataSourceConfiguration source = (ShardingSpherePipelineDataSourceConfiguration) sourceDataSourceConfig;
ShardingRuleConfiguration sourceRuleConfig = ShardingRuleConfigurationConverter.findAndConvertShardingRuleConfiguration(source.getRootConfig().getRules());
@@ -153,7 +153,7 @@
// TODO use jobConfig as parameter, jobShardingItem
@Override
- public TaskConfiguration createTaskConfiguration(final RuleAlteredJobConfiguration jobConfig, final int jobShardingItem, final PipelineProcessConfiguration pipelineProcessConfig) {
+ public TaskConfiguration createTaskConfiguration(final MigrationJobConfiguration jobConfig, final int jobShardingItem, final PipelineProcessConfiguration pipelineProcessConfig) {
ShardingSpherePipelineDataSourceConfiguration sourceConfig = getSourceConfiguration(jobConfig);
ShardingRuleConfiguration sourceRuleConfig = ShardingRuleConfigurationConverter.findAndConvertShardingRuleConfiguration(sourceConfig.getRootConfig().getRules());
Map<String, DataSourceProperties> dataSourcePropsMap = new YamlDataSourceConfigurationSwapper().getDataSourcePropertiesMap(sourceConfig.getRootConfig());
@@ -177,11 +177,11 @@
return result;
}
- private static ShardingSpherePipelineDataSourceConfiguration getSourceConfiguration(final RuleAlteredJobConfiguration jobConfig) {
+ private static ShardingSpherePipelineDataSourceConfiguration getSourceConfiguration(final MigrationJobConfiguration jobConfig) {
return (ShardingSpherePipelineDataSourceConfiguration) PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getSource().getType(), jobConfig.getSource().getParameter());
}
- private static Optional<ShardingRuleConfiguration> getTargetRuleConfiguration(final RuleAlteredJobConfiguration jobConfig) {
+ private static Optional<ShardingRuleConfiguration> getTargetRuleConfiguration(final MigrationJobConfiguration jobConfig) {
PipelineDataSourceConfiguration targetDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getTarget().getType(), jobConfig.getTarget().getParameter());
if (!(targetDataSourceConfig instanceof ShardingSpherePipelineDataSourceConfiguration)) {
return Optional.empty();
@@ -237,7 +237,7 @@
return result;
}
- private static ImporterConfiguration createImporterConfiguration(final RuleAlteredJobConfiguration jobConfig, final PipelineProcessConfiguration pipelineProcessConfig,
+ private static ImporterConfiguration createImporterConfiguration(final MigrationJobConfiguration jobConfig, final PipelineProcessConfiguration pipelineProcessConfig,
final Map<LogicTableName, Set<String>> shardingColumnsMap, final TableNameSchemaNameMapping tableNameSchemaNameMapping) {
PipelineDataSourceConfiguration dataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getTarget().getType(), jobConfig.getTarget().getParameter());
int batchSize = pipelineProcessConfig.getOutput().getBatchSize();
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/ImporterConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ImporterConfiguration.java
similarity index 94%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/ImporterConfiguration.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ImporterConfiguration.java
index deea0fa..a5dbba8 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/ImporterConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ImporterConfiguration.java
@@ -15,15 +15,15 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.api.config.rulealtered;
+package org.apache.shardingsphere.data.pipeline.api.config;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.ToString;
-import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
+import org.apache.shardingsphere.infra.database.type.DatabaseTypeFactory;
import java.util.Collection;
import java.util.Collections;
@@ -31,7 +31,6 @@
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
-import org.apache.shardingsphere.infra.database.type.DatabaseTypeFactory;
/**
* Importer configuration.
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/TaskConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/TaskConfiguration.java
similarity index 94%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/TaskConfiguration.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/TaskConfiguration.java
index 5356a16..67c7187 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/TaskConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/TaskConfiguration.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.api.config.rulealtered;
+package org.apache.shardingsphere.data.pipeline.api.config;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/MigrationJobConfiguration.java
similarity index 90%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfiguration.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/MigrationJobConfiguration.java
index f112b51..d3d77cb 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/MigrationJobConfiguration.java
@@ -15,25 +15,24 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.api.config.rulealtered;
+package org.apache.shardingsphere.data.pipeline.api.config.job;
import com.google.common.base.Splitter;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import java.util.List;
import java.util.Map;
/**
- * Rule altered job configuration.
+ * Migration job configuration.
*/
@RequiredArgsConstructor
@Getter
@Slf4j
-public final class RuleAlteredJobConfiguration implements PipelineJobConfiguration {
+public final class MigrationJobConfiguration implements PipelineJobConfiguration {
private final String jobId;
@@ -98,7 +97,7 @@
@Override
public String toString() {
- return "RuleAlteredJobConfiguration{"
+ return "MigrationJobConfiguration{"
+ "jobId='" + jobId + '\'' + ", databaseName='" + databaseName + '\''
+ ", activeVersion=" + activeVersion + ", newVersion=" + newVersion
+ ", sourceDatabaseType='" + sourceDatabaseType + '\'' + ", targetDatabaseType='" + targetDatabaseType + '\''
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/yaml/YamlRuleAlteredJobConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlMigrationJobConfiguration.java
similarity index 90%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/yaml/YamlRuleAlteredJobConfiguration.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlMigrationJobConfiguration.java
index efc26ef..3971d8b 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/yaml/YamlRuleAlteredJobConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlMigrationJobConfiguration.java
@@ -15,25 +15,24 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml;
+package org.apache.shardingsphere.data.pipeline.api.config.job.yaml;
import com.google.common.base.Preconditions;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
import java.util.List;
import java.util.Map;
/**
- * Rule altered job configuration for YAML.
+ * Migration job configuration for YAML.
*/
@Getter
@Setter
@Slf4j
-public final class YamlRuleAlteredJobConfiguration implements YamlPipelineJobConfiguration {
+public final class YamlMigrationJobConfiguration implements YamlPipelineJobConfiguration {
private String jobId;
@@ -106,7 +105,7 @@
@Override
public String toString() {
- return "YamlRuleAlteredJobConfiguration{"
+ return "YamlMigrationJobConfiguration{"
+ "jobId='" + jobId + '\'' + ", databaseName='" + databaseName + '\''
+ ", activeVersion=" + activeVersion + ", newVersion=" + newVersion
+ ", sourceDatabaseType='" + sourceDatabaseType + '\'' + ", targetDatabaseType='" + targetDatabaseType + '\''
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/yaml/RuleAlteredJobConfigurationSwapper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlMigrationJobConfigurationSwapper.java
similarity index 72%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/yaml/RuleAlteredJobConfigurationSwapper.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlMigrationJobConfigurationSwapper.java
index ac6682b..9c28e43 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/yaml/RuleAlteredJobConfigurationSwapper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlMigrationJobConfigurationSwapper.java
@@ -15,26 +15,26 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml;
+package org.apache.shardingsphere.data.pipeline.api.config.job.yaml;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfigurationSwapper;
-import org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
/**
- * Rule altered job configuration swapper.
+ * YAML migration job configuration swapper.
*/
-// TODO add RuleAlteredJobConfigurationSwapper test
-public final class RuleAlteredJobConfigurationSwapper implements YamlConfigurationSwapper<YamlRuleAlteredJobConfiguration, RuleAlteredJobConfiguration> {
+// TODO add test
+public final class YamlMigrationJobConfigurationSwapper implements YamlConfigurationSwapper<YamlMigrationJobConfiguration, MigrationJobConfiguration> {
- private static final RuleAlteredJobConfigurationSwapper JOB_CONFIG_SWAPPER = new RuleAlteredJobConfigurationSwapper();
+ private static final YamlMigrationJobConfigurationSwapper JOB_CONFIG_SWAPPER = new YamlMigrationJobConfigurationSwapper();
private final YamlPipelineDataSourceConfigurationSwapper dataSourceConfigSwapper = new YamlPipelineDataSourceConfigurationSwapper();
@Override
- public YamlRuleAlteredJobConfiguration swapToYamlConfiguration(final RuleAlteredJobConfiguration data) {
- YamlRuleAlteredJobConfiguration result = new YamlRuleAlteredJobConfiguration();
+ public YamlMigrationJobConfiguration swapToYamlConfiguration(final MigrationJobConfiguration data) {
+ YamlMigrationJobConfiguration result = new YamlMigrationJobConfiguration();
result.setJobId(data.getJobId());
result.setDatabaseName(data.getDatabaseName());
result.setActiveVersion(data.getActiveVersion());
@@ -54,8 +54,8 @@
}
@Override
- public RuleAlteredJobConfiguration swapToObject(final YamlRuleAlteredJobConfiguration yamlConfig) {
- return new RuleAlteredJobConfiguration(yamlConfig.getJobId(), yamlConfig.getDatabaseName(),
+ public MigrationJobConfiguration swapToObject(final YamlMigrationJobConfiguration yamlConfig) {
+ return new MigrationJobConfiguration(yamlConfig.getJobId(), yamlConfig.getDatabaseName(),
yamlConfig.getActiveVersion(), yamlConfig.getNewVersion(),
yamlConfig.getSourceDatabaseType(), yamlConfig.getTargetDatabaseType(),
dataSourceConfigSwapper.swapToObject(yamlConfig.getSource()), dataSourceConfigSwapper.swapToObject(yamlConfig.getTarget()),
@@ -70,8 +70,8 @@
* @param jobParameter job parameter
* @return job configuration
*/
- public static RuleAlteredJobConfiguration swapToObject(final String jobParameter) {
- YamlRuleAlteredJobConfiguration yamlJobConfig = YamlEngine.unmarshal(jobParameter, YamlRuleAlteredJobConfiguration.class, true);
+ public static MigrationJobConfiguration swapToObject(final String jobParameter) {
+ YamlMigrationJobConfiguration yamlJobConfig = YamlEngine.unmarshal(jobParameter, YamlMigrationJobConfiguration.class, true);
return JOB_CONFIG_SWAPPER.swapToObject(yamlJobConfig);
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/ImporterCreator.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/ImporterCreator.java
index 8289e76..eed6f41 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/ImporterCreator.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/ImporterCreator.java
@@ -17,7 +17,7 @@
package org.apache.shardingsphere.data.pipeline.spi.importer;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredJobConfigurationPreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredJobConfigurationPreparer.java
index 70f4ade..2137bfd 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredJobConfigurationPreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredJobConfigurationPreparer.java
@@ -17,9 +17,9 @@
package org.apache.shardingsphere.data.pipeline.spi.rulealtered;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.YamlRuleAlteredJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigrationJobConfiguration;
import org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
import org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPI;
@@ -33,7 +33,7 @@
*
* @param yamlJobConfig YAML job configuration
*/
- void extendJobConfiguration(YamlRuleAlteredJobConfiguration yamlJobConfig);
+ void extendJobConfiguration(YamlMigrationJobConfiguration yamlJobConfig);
/**
* Create task configuration, used by underlying scheduler.
@@ -43,5 +43,5 @@
* @param pipelineProcessConfig pipeline process configuration
* @return task configuration
*/
- TaskConfiguration createTaskConfiguration(RuleAlteredJobConfiguration jobConfig, int jobShardingItem, PipelineProcessConfiguration pipelineProcessConfig);
+ TaskConfiguration createTaskConfiguration(MigrationJobConfiguration jobConfig, int jobShardingItem, PipelineProcessConfiguration pipelineProcessConfig);
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/config/MigrationJobConfigurationTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/config/MigrationJobConfigurationTest.java
new file mode 100644
index 0000000..504ce95
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/config/MigrationJobConfigurationTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.api.config;
+
+import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigrationJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigrationJobConfigurationSwapper;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+public final class MigrationJobConfigurationTest {
+
+ private static final YamlMigrationJobConfigurationSwapper JOB_CONFIG_SWAPPER = new YamlMigrationJobConfigurationSwapper();
+
+ @Test
+ public void assertGetJobShardingCountByNull() {
+ YamlMigrationJobConfiguration yamlJobConfig = new YamlMigrationJobConfiguration();
+ MigrationJobConfiguration jobConfig = JOB_CONFIG_SWAPPER.swapToObject(yamlJobConfig);
+ assertThat(jobConfig.getJobShardingCount(), is(0));
+ }
+
+ @Test
+ public void assertGetJobShardingCount() {
+ YamlMigrationJobConfiguration yamlJobConfig = new YamlMigrationJobConfiguration();
+ yamlJobConfig.setJobShardingDataNodes(Arrays.asList("node1", "node2"));
+ MigrationJobConfiguration jobConfig = JOB_CONFIG_SWAPPER.swapToObject(yamlJobConfig);
+ assertThat(jobConfig.getJobShardingCount(), is(2));
+ }
+
+ @Test
+ public void assertSplitLogicTableNames() {
+ YamlMigrationJobConfiguration yamlJobConfig = new YamlMigrationJobConfiguration();
+ yamlJobConfig.setLogicTables("foo_tbl,bar_tbl");
+ MigrationJobConfiguration jobConfig = JOB_CONFIG_SWAPPER.swapToObject(yamlJobConfig);
+ assertThat(jobConfig.splitLogicTableNames(), is(Arrays.asList("foo_tbl", "bar_tbl")));
+ }
+}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfigurationTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfigurationTest.java
deleted file mode 100644
index 2d783b9..0000000
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfigurationTest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.api.config.rulealtered;
-
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.YamlRuleAlteredJobConfiguration;
-import org.junit.Test;
-
-import java.util.Arrays;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
-
-public final class RuleAlteredJobConfigurationTest {
-
- private static final RuleAlteredJobConfigurationSwapper JOB_CONFIG_SWAPPER = new RuleAlteredJobConfigurationSwapper();
-
- @Test
- public void assertGetJobShardingCountByNull() {
- YamlRuleAlteredJobConfiguration yamlJobConfig = new YamlRuleAlteredJobConfiguration();
- RuleAlteredJobConfiguration jobConfig = JOB_CONFIG_SWAPPER.swapToObject(yamlJobConfig);
- assertThat(jobConfig.getJobShardingCount(), is(0));
- }
-
- @Test
- public void assertGetJobShardingCount() {
- YamlRuleAlteredJobConfiguration yamlJobConfig = new YamlRuleAlteredJobConfiguration();
- yamlJobConfig.setJobShardingDataNodes(Arrays.asList("node1", "node2"));
- RuleAlteredJobConfiguration jobConfig = JOB_CONFIG_SWAPPER.swapToObject(yamlJobConfig);
- assertThat(jobConfig.getJobShardingCount(), is(2));
- }
-
- @Test
- public void assertSplitLogicTableNames() {
- YamlRuleAlteredJobConfiguration yamlJobConfig = new YamlRuleAlteredJobConfiguration();
- yamlJobConfig.setLogicTables("foo_tbl,bar_tbl");
- RuleAlteredJobConfiguration jobConfig = JOB_CONFIG_SWAPPER.swapToObject(yamlJobConfig);
- assertThat(jobConfig.splitLogicTableNames(), is(Arrays.asList("foo_tbl", "bar_tbl")));
- }
-}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/spi/fixture/RuleAlteredJobConfigurationPreparerFixture.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/spi/fixture/RuleAlteredJobConfigurationPreparerFixture.java
index e454bd4..ab73dbd 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/spi/fixture/RuleAlteredJobConfigurationPreparerFixture.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/spi/fixture/RuleAlteredJobConfigurationPreparerFixture.java
@@ -17,20 +17,20 @@
package org.apache.shardingsphere.data.pipeline.spi.fixture;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.YamlRuleAlteredJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredJobConfigurationPreparer;
import org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
public final class RuleAlteredJobConfigurationPreparerFixture implements RuleAlteredJobConfigurationPreparer {
@Override
- public void extendJobConfiguration(final YamlRuleAlteredJobConfiguration yamlJobConfig) {
+ public void extendJobConfiguration(final YamlMigrationJobConfiguration yamlJobConfig) {
}
@Override
- public TaskConfiguration createTaskConfiguration(final RuleAlteredJobConfiguration jobConfig, final int jobShardingItem, final PipelineProcessConfiguration pipelineProcessConfig) {
+ public TaskConfiguration createTaskConfiguration(final MigrationJobConfiguration jobConfig, final int jobShardingItem, final PipelineProcessConfiguration pipelineProcessConfig) {
return null;
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
index 3da0bc5..a858e15 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
@@ -21,15 +21,15 @@
import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
-import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCreationException;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobNotFoundException;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineVerifyFailedException;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
-import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJob;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJob;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
@@ -66,7 +66,7 @@
log.warn("jobId already exists in registry center, ignore, jobConfigKey={}", jobConfigKey);
return Optional.of(jobId);
}
- repositoryAPI.persist(PipelineMetaDataNode.getScalingJobPath(jobId), RuleAlteredJob.class.getName());
+ repositoryAPI.persist(PipelineMetaDataNode.getScalingJobPath(jobId), MigrationJob.class.getName());
repositoryAPI.persist(jobConfigKey, convertJobConfigurationToText(jobConfig));
return Optional.of(jobId);
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
index e165e11..3a18c77 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
@@ -23,7 +23,7 @@
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyContentCheckResult;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCountCheckResult;
import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
@@ -68,13 +68,13 @@
public final class DataConsistencyChecker {
// TODO remove jobConfig for common usage
- private final RuleAlteredJobConfiguration jobConfig;
+ private final MigrationJobConfiguration jobConfig;
private final Collection<String> logicTableNames;
private final TableNameSchemaNameMapping tableNameSchemaNameMapping;
- public DataConsistencyChecker(final RuleAlteredJobConfiguration jobConfig) {
+ public DataConsistencyChecker(final MigrationJobConfiguration jobConfig) {
this.jobConfig = jobConfig;
logicTableNames = jobConfig.splitLogicTableNames();
tableNameSchemaNameMapping = new TableNameSchemaNameMapping(TableNameSchemaNameMapping.convert(jobConfig.getSchemaTablesMap()));
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
index 373fa3a..7c05017 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
@@ -18,16 +18,16 @@
package org.apache.shardingsphere.data.pipeline.core.execute;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPIFactory;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
+import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigrationJobConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
-import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJob;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
-import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobPreparer;
import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJob;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIFactory;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobPreparer;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
@@ -74,12 +74,12 @@
String jobId = jobConfigPOJO.getJobName();
log.info("jobId={}, deleted={}, disabled={}", jobId, isDeleted, isDisabled);
// TODO refactor: dispatch to different job types
- RuleAlteredJobConfiguration jobConfig = RuleAlteredJobConfigurationSwapper.swapToObject(jobConfigPOJO.getJobParameter());
+ MigrationJobConfiguration jobConfig = YamlMigrationJobConfigurationSwapper.swapToObject(jobConfigPOJO.getJobParameter());
if (isDeleted) {
- new RuleAlteredJobPreparer().cleanup(jobConfig);
- } else if (PipelineJobProgressDetector.isJobSuccessful(jobConfig.getJobShardingCount(), RuleAlteredJobAPIFactory.getInstance().getJobProgress(jobConfig).values())) {
+ new MigrationJobPreparer().cleanup(jobConfig);
+ } else if (PipelineJobProgressDetector.isJobSuccessful(jobConfig.getJobShardingCount(), MigrationJobAPIFactory.getInstance().getJobProgress(jobConfig).values())) {
log.info("isJobSuccessful=true");
- new RuleAlteredJobPreparer().cleanup(jobConfig);
+ new MigrationJobPreparer().cleanup(jobConfig);
}
PipelineJobCenter.stop(jobId);
return;
@@ -100,7 +100,7 @@
}
private void execute(final JobConfigurationPOJO jobConfigPOJO) {
- RuleAlteredJob job = new RuleAlteredJob();
+ MigrationJob job = new MigrationJob();
PipelineJobCenter.addJob(jobConfigPOJO.getJobName(), job);
OneOffJobBootstrap oneOffJobBootstrap = new OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(), job, jobConfigPOJO.toJobConfiguration());
oneOffJobBootstrap.execute();
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java
index 2e069cc..0b17237 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java
@@ -20,7 +20,7 @@
import lombok.AccessLevel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporterCreator.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporterCreator.java
index c8d5b4c..fb7b51c 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporterCreator.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporterCreator.java
@@ -17,7 +17,7 @@
package org.apache.shardingsphere.data.pipeline.core.importer;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/AbstractPipelineJobId.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJobId.java
similarity index 89%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/AbstractPipelineJobId.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJobId.java
index 2b46033..af9a505 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/AbstractPipelineJobId.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJobId.java
@@ -15,12 +15,13 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.api.job;
+package org.apache.shardingsphere.data.pipeline.core.job;
import lombok.Getter;
import lombok.NonNull;
import lombok.Setter;
import lombok.ToString;
+import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
/**
* Abstract pipeline job id.
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
index 6bb7b0d..01e2dbf 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
@@ -18,15 +18,15 @@
package org.apache.shardingsphere.data.pipeline.core.job;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPI;
-import org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPIFactory;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
+import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigrationJobConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.api.detect.RuleAlteredJobAlmostCompletedParameter;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
-import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredContext;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationContext;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPI;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIFactory;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobWorker;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
@@ -39,14 +39,14 @@
@Slf4j
public final class FinishedCheckJob implements SimpleJob {
- private final RuleAlteredJobAPI ruleAlteredJobAPI = RuleAlteredJobAPIFactory.getInstance();
+ private final MigrationJobAPI jobAPI = MigrationJobAPIFactory.getInstance();
private final Set<String> onCheckJobIds = new ConcurrentSkipListSet<>();
// TODO only one proxy node could do data consistency check in proxy cluster
@Override
public void execute(final ShardingContext shardingContext) {
- List<JobInfo> jobInfos = ruleAlteredJobAPI.list();
+ List<JobInfo> jobInfos = jobAPI.list();
for (JobInfo jobInfo : jobInfos) {
if (!jobInfo.isActive()) {
continue;
@@ -63,22 +63,22 @@
onCheckJobIds.add(jobId);
try {
// TODO refactor: dispatch to different job types
- RuleAlteredJobConfiguration jobConfig = RuleAlteredJobConfigurationSwapper.swapToObject(jobInfo.getJobParameter());
- RuleAlteredContext ruleAlteredContext = RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
- if (null == ruleAlteredContext.getCompletionDetectAlgorithm()) {
+ MigrationJobConfiguration jobConfig = YamlMigrationJobConfigurationSwapper.swapToObject(jobInfo.getJobParameter());
+ MigrationContext migrationContext = RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
+ if (null == migrationContext.getCompletionDetectAlgorithm()) {
log.info("completionDetector not configured, auto switch will not be enabled. You could query job progress and switch config manually with DistSQL.");
continue;
}
- RuleAlteredJobAlmostCompletedParameter parameter = new RuleAlteredJobAlmostCompletedParameter(jobInfo.getShardingTotalCount(), ruleAlteredJobAPI.getJobProgress(jobConfig).values());
- if (!ruleAlteredContext.getCompletionDetectAlgorithm().isAlmostCompleted(parameter)) {
+ RuleAlteredJobAlmostCompletedParameter parameter = new RuleAlteredJobAlmostCompletedParameter(jobInfo.getShardingTotalCount(), jobAPI.getJobProgress(jobConfig).values());
+ if (!migrationContext.getCompletionDetectAlgorithm().isAlmostCompleted(parameter)) {
continue;
}
log.info("scaling job {} almost finished.", jobId);
try {
- ruleAlteredJobAPI.stopClusterWriteDB(jobConfig);
- if (!ruleAlteredJobAPI.isDataConsistencyCheckNeeded(jobConfig)) {
+ jobAPI.stopClusterWriteDB(jobConfig);
+ if (!jobAPI.isDataConsistencyCheckNeeded(jobConfig)) {
log.info("DataConsistencyCalculatorAlgorithm is not configured, data consistency check is ignored.");
- ruleAlteredJobAPI.switchClusterConfiguration(jobConfig);
+ jobAPI.switchClusterConfiguration(jobConfig);
continue;
}
if (!dataConsistencyCheck(jobConfig)) {
@@ -87,7 +87,7 @@
}
switchClusterConfiguration(jobConfig);
} finally {
- ruleAlteredJobAPI.restoreClusterWriteDB(jobConfig);
+ jobAPI.restoreClusterWriteDB(jobConfig);
}
log.info("job {} finished", jobId);
// CHECKSTYLE:OFF
@@ -101,7 +101,7 @@
}
private boolean isNotAllowDataCheck(final String jobId) {
- Map<Integer, InventoryIncrementalJobItemProgress> jobItemProgressMap = ruleAlteredJobAPI.getJobProgress(jobId);
+ Map<Integer, InventoryIncrementalJobItemProgress> jobItemProgressMap = jobAPI.getJobProgress(jobId);
for (InventoryIncrementalJobItemProgress each : jobItemProgressMap.values()) {
if (null == each || !JobStatus.EXECUTE_INCREMENTAL_TASK.equals(each.getStatus())) {
return true;
@@ -110,13 +110,13 @@
return false;
}
- private boolean dataConsistencyCheck(final RuleAlteredJobConfiguration jobConfig) {
+ private boolean dataConsistencyCheck(final MigrationJobConfiguration jobConfig) {
String jobId = jobConfig.getJobId();
log.info("dataConsistencyCheck for job {}", jobId);
- return ruleAlteredJobAPI.aggregateDataConsistencyCheckResults(jobId, ruleAlteredJobAPI.dataConsistencyCheck(jobConfig));
+ return jobAPI.aggregateDataConsistencyCheckResults(jobId, jobAPI.dataConsistencyCheck(jobConfig));
}
- private void switchClusterConfiguration(final RuleAlteredJobConfiguration jobConfig) {
- ruleAlteredJobAPI.switchClusterConfiguration(jobConfig);
+ private void switchClusterConfiguration(final MigrationJobConfiguration jobConfig) {
+ jobAPI.switchClusterConfiguration(jobConfig);
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/PipelineJobIdUtils.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java
similarity index 90%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/PipelineJobIdUtils.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java
index a25ec5d..6e7beb6 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/PipelineJobIdUtils.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java
@@ -15,7 +15,10 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.api.job;
+package org.apache.shardingsphere.data.pipeline.core.job;
+
+import org.apache.shardingsphere.data.pipeline.api.job.JobType;
+import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
/**
* Pipeline job id utils.
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
index 20066a1..73e185a 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
@@ -19,9 +19,9 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
-import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
import java.util.Collections;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
similarity index 98%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
index 5740d80..072ea27 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
@@ -15,14 +15,14 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.scenario.rulealtered.prepare;
+package org.apache.shardingsphere.data.pipeline.core.prepare;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.api.context.PipelineProcessContext;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineJobPreparerUtils.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java
similarity index 98%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineJobPreparerUtils.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java
index 71f590b..fd4eb6b 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineJobPreparerUtils.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java
@@ -15,11 +15,11 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.util;
+package org.apache.shardingsphere.data.pipeline.core.prepare;
import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
index ed7a7323..820b87f 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
@@ -18,7 +18,7 @@
package org.apache.shardingsphere.data.pipeline.core.prepare.datasource;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
@@ -83,7 +83,7 @@
}
// TODO the invocation is disabled for now, it might be used again for next new feature
- protected final PipelineDataSourceWrapper getSourceCachedDataSource(final RuleAlteredJobConfiguration jobConfig, final PipelineDataSourceManager dataSourceManager) {
+ protected final PipelineDataSourceWrapper getSourceCachedDataSource(final MigrationJobConfiguration jobConfig, final PipelineDataSourceManager dataSourceManager) {
return dataSourceManager.getDataSource(PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getSource().getType(), jobConfig.getSource().getParameter()));
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
index df8d085..dd93578 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
@@ -20,8 +20,8 @@
import lombok.Getter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
index e9b9c15..3ac38ed 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
@@ -23,13 +23,13 @@
import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
-import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobItemAPI;
import org.apache.shardingsphere.data.pipeline.core.api.impl.InventoryIncrementalJobItemAPIImpl;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
import java.util.Collection;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
index b3b7965..762ae27 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
@@ -20,8 +20,8 @@
import lombok.Getter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationContext.java
similarity index 91%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationContext.java
index 81991f5..3386acb 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationContext.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
+package org.apache.shardingsphere.data.pipeline.scenario.migration;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -31,11 +31,11 @@
import org.apache.shardingsphere.infra.yaml.config.swapper.rule.rulealtered.YamlOnRuleAlteredActionConfigurationSwapper;
/**
- * Rule altered context.
+ * Migration context.
*/
@Getter
@Slf4j
-public final class RuleAlteredContext extends AbstractPipelineProcessContext {
+public final class MigrationContext extends AbstractPipelineProcessContext {
private static final YamlOnRuleAlteredActionConfigurationSwapper SWAPPER = new YamlOnRuleAlteredActionConfigurationSwapper();
@@ -44,7 +44,7 @@
private final DataConsistencyCalculateAlgorithm dataConsistencyCalculateAlgorithm;
@SuppressWarnings("unchecked")
- public RuleAlteredContext(final String jobId, final OnRuleAlteredActionConfiguration actionConfig) {
+ public MigrationContext(final String jobId, final OnRuleAlteredActionConfiguration actionConfig) {
super(jobId, new PipelineProcessConfiguration(actionConfig.getInput(), actionConfig.getOutput(), actionConfig.getStreamChannel()));
AlgorithmConfiguration completionDetector = actionConfig.getCompletionDetector();
completionDetectAlgorithm = null != completionDetector ? JobCompletionDetectAlgorithmFactory.newInstance(completionDetector) : null;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
similarity index 80%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index 358e0af2..001da10 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -15,18 +15,17 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
+package org.apache.shardingsphere.data.pipeline.scenario.migration;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
+import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigrationJobConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.PipelineJob;
import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
-import org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineIgnoredException;
import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
@@ -37,16 +36,16 @@
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
/**
- * Rule altered job.
+ * Migration job.
*/
@Slf4j
@RequiredArgsConstructor
-public final class RuleAlteredJob extends AbstractPipelineJob implements SimpleJob, PipelineJob {
+public final class MigrationJob extends AbstractPipelineJob implements SimpleJob, PipelineJob {
private final PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager();
// Shared by all sharding items
- private final RuleAlteredJobPreparer jobPreparer = new RuleAlteredJobPreparer();
+ private final MigrationJobPreparer jobPreparer = new MigrationJobPreparer();
@Override
public void execute(final ShardingContext shardingContext) {
@@ -56,9 +55,9 @@
return;
}
setJobId(shardingContext.getJobName());
- RuleAlteredJobConfiguration jobConfig = RuleAlteredJobConfigurationSwapper.swapToObject(shardingContext.getJobParameter());
- InventoryIncrementalJobItemProgress initProgress = RuleAlteredJobAPIFactory.getInstance().getJobItemProgress(shardingContext.getJobName(), shardingContext.getShardingItem());
- RuleAlteredJobContext jobItemContext = new RuleAlteredJobContext(jobConfig, shardingContext.getShardingItem(), initProgress, dataSourceManager);
+ MigrationJobConfiguration jobConfig = YamlMigrationJobConfigurationSwapper.swapToObject(shardingContext.getJobParameter());
+ InventoryIncrementalJobItemProgress initProgress = MigrationJobAPIFactory.getInstance().getJobItemProgress(shardingContext.getJobName(), shardingContext.getShardingItem());
+ MigrationJobItemContext jobItemContext = new MigrationJobItemContext(jobConfig, shardingContext.getShardingItem(), initProgress, dataSourceManager);
int shardingItem = jobItemContext.getShardingItem();
if (getTasksRunnerMap().containsKey(shardingItem)) {
log.warn("tasksRunnerMap contains shardingItem {}, ignore", shardingItem);
@@ -75,7 +74,7 @@
PipelineJobProgressPersistService.addJobProgressPersistContext(getJobId(), shardingItem);
}
- private void prepare(final RuleAlteredJobContext jobItemContext) {
+ private void prepare(final MigrationJobItemContext jobItemContext) {
try {
jobPreparer.prepare(jobItemContext);
} catch (final PipelineIgnoredException ex) {
@@ -87,7 +86,7 @@
log.error("job prepare failed, {}-{}", getJobId(), jobItemContext.getShardingItem(), ex);
PipelineJobCenter.stop(getJobId());
jobItemContext.setStatus(JobStatus.PREPARING_FAILURE);
- RuleAlteredJobAPIFactory.getInstance().persistJobItemProgress(jobItemContext);
+ MigrationJobAPIFactory.getInstance().persistJobItemProgress(jobItemContext);
throw ex;
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/RuleAlteredJobAPI.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPI.java
similarity index 76%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/RuleAlteredJobAPI.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPI.java
index 468a0c9..b945b00 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/RuleAlteredJobAPI.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPI.java
@@ -15,25 +15,26 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.api;
+package org.apache.shardingsphere.data.pipeline.scenario.migration;
import org.apache.shardingsphere.data.pipeline.api.MigrationJobPublicAPI;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI;
import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
import org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPI;
import java.util.Map;
/**
- * Rule altered job API.
+ * Migration job API.
*/
@SingletonSPI
-public interface RuleAlteredJobAPI extends PipelineJobAPI, MigrationJobPublicAPI, RequiredSPI {
+public interface MigrationJobAPI extends PipelineJobAPI, MigrationJobPublicAPI, RequiredSPI {
@Override
- RuleAlteredJobConfiguration getJobConfiguration(String jobId);
+ MigrationJobConfiguration getJobConfiguration(String jobId);
/**
* Get job progress.
@@ -41,7 +42,7 @@
* @param jobConfig job configuration
* @return each sharding item progress
*/
- Map<Integer, InventoryIncrementalJobItemProgress> getJobProgress(RuleAlteredJobConfiguration jobConfig);
+ Map<Integer, InventoryIncrementalJobItemProgress> getJobProgress(MigrationJobConfiguration jobConfig);
@Override
InventoryIncrementalJobItemProgress getJobItemProgress(String jobId, int shardingItem);
@@ -51,14 +52,14 @@
*
* @param jobConfig job configuration
*/
- void stopClusterWriteDB(RuleAlteredJobConfiguration jobConfig);
+ void stopClusterWriteDB(MigrationJobConfiguration jobConfig);
/**
* Restore cluster writing.
*
* @param jobConfig job configuration
*/
- void restoreClusterWriteDB(RuleAlteredJobConfiguration jobConfig);
+ void restoreClusterWriteDB(MigrationJobConfiguration jobConfig);
/**
* Is data consistency check needed.
@@ -66,7 +67,7 @@
* @param jobConfig job configuration
* @return data consistency check needed or not
*/
- boolean isDataConsistencyCheckNeeded(RuleAlteredJobConfiguration jobConfig);
+ boolean isDataConsistencyCheckNeeded(MigrationJobConfiguration jobConfig);
/**
* Do data consistency check.
@@ -74,7 +75,7 @@
* @param jobConfig job configuration
* @return each logic table check result
*/
- Map<String, DataConsistencyCheckResult> dataConsistencyCheck(RuleAlteredJobConfiguration jobConfig);
+ Map<String, DataConsistencyCheckResult> dataConsistencyCheck(MigrationJobConfiguration jobConfig);
/**
* Aggregate data consistency check results.
@@ -90,5 +91,5 @@
*
* @param jobConfig job configuration
*/
- void switchClusterConfiguration(RuleAlteredJobConfiguration jobConfig);
+ void switchClusterConfiguration(MigrationJobConfiguration jobConfig);
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/RuleAlteredJobAPIFactory.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIFactory.java
similarity index 73%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/RuleAlteredJobAPIFactory.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIFactory.java
index e0a87e0..0c58d6a 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/RuleAlteredJobAPIFactory.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIFactory.java
@@ -15,26 +15,26 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.api;
+package org.apache.shardingsphere.data.pipeline.scenario.migration;
import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
import org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPIRegistry;
/**
- * Rule altered job API factory.
+ * Migration job API factory.
*/
-public final class RuleAlteredJobAPIFactory {
+public final class MigrationJobAPIFactory {
static {
- ShardingSphereServiceLoader.register(RuleAlteredJobAPI.class);
+ ShardingSphereServiceLoader.register(MigrationJobAPI.class);
}
/**
- * Get instance of rule altered job API.
+ * Get instance of migration job API.
*
* @return got instance
*/
- public static RuleAlteredJobAPI getInstance() {
- return RequiredSPIRegistry.getRegisteredService(RuleAlteredJobAPI.class);
+ public static MigrationJobAPI getInstance() {
+ return RequiredSPIRegistry.getRegisteredService(MigrationJobAPI.class);
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
similarity index 80%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
index 60e170a..a2da011 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
@@ -15,39 +15,38 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.api.impl;
+package org.apache.shardingsphere.data.pipeline.scenario.migration;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.binary.Hex;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigrationJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigrationJobConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.YamlRuleAlteredJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.JobType;
import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
-import org.apache.shardingsphere.data.pipeline.api.job.RuleAlteredJobId;
import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
import org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobItemAPI;
-import org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPI;
+import org.apache.shardingsphere.data.pipeline.core.api.impl.AbstractPipelineJobAPIImpl;
+import org.apache.shardingsphere.data.pipeline.core.api.impl.InventoryIncrementalJobItemAPIImpl;
import org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCalculateAlgorithmFactory;
import org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyChecker;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineVerifyFailedException;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
-import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredContext;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobWorker;
import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
import org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredJobConfigurationPreparerFactory;
@@ -73,23 +72,23 @@
import java.util.stream.Stream;
/**
- * Rule altered job API impl.
+ * Migration job API impl.
*/
@Slf4j
-public final class RuleAlteredJobAPIImpl extends AbstractPipelineJobAPIImpl implements RuleAlteredJobAPI {
+public final class MigrationJobAPIImpl extends AbstractPipelineJobAPIImpl implements MigrationJobAPI {
private final PipelineJobItemAPI jobItemAPI = new InventoryIncrementalJobItemAPIImpl();
@Override
protected String marshalJobIdLeftPart(final PipelineJobId pipelineJobId) {
- RuleAlteredJobId jobId = (RuleAlteredJobId) pipelineJobId;
+ MigrationJobId jobId = (MigrationJobId) pipelineJobId;
String text = jobId.getFormatVersion() + "|" + jobId.getCurrentMetadataVersion() + "T" + jobId.getNewMetadataVersion() + "|" + jobId.getDatabaseName();
return Hex.encodeHexString(text.getBytes(StandardCharsets.UTF_8), true);
}
@Override
public void extendYamlJobConfiguration(final YamlPipelineJobConfiguration yamlJobConfig) {
- YamlRuleAlteredJobConfiguration config = (YamlRuleAlteredJobConfiguration) yamlJobConfig;
+ YamlMigrationJobConfiguration config = (YamlMigrationJobConfiguration) yamlJobConfig;
if (null == config.getJobShardingDataNodes()) {
RuleAlteredJobConfigurationPreparerFactory.getInstance().extendJobConfiguration(config);
}
@@ -106,10 +105,10 @@
}
}
- private String generateJobId(final YamlRuleAlteredJobConfiguration config) {
- RuleAlteredJobId jobId = new RuleAlteredJobId();
+ private String generateJobId(final YamlMigrationJobConfiguration config) {
+ MigrationJobId jobId = new MigrationJobId();
jobId.setTypeCode(JobType.MIGRATION.getTypeCode());
- jobId.setFormatVersion(RuleAlteredJobId.CURRENT_VERSION);
+ jobId.setFormatVersion(MigrationJobId.CURRENT_VERSION);
jobId.setCurrentMetadataVersion(config.getActiveVersion());
jobId.setNewMetadataVersion(config.getNewVersion());
jobId.setDatabaseName(config.getDatabaseName());
@@ -118,16 +117,16 @@
@Override
protected YamlPipelineJobConfiguration swapToYamlJobConfiguration(final PipelineJobConfiguration jobConfig) {
- return new RuleAlteredJobConfigurationSwapper().swapToYamlConfiguration((RuleAlteredJobConfiguration) jobConfig);
+ return new YamlMigrationJobConfigurationSwapper().swapToYamlConfiguration((MigrationJobConfiguration) jobConfig);
}
@Override
- public RuleAlteredJobConfiguration getJobConfiguration(final String jobId) {
+ public MigrationJobConfiguration getJobConfiguration(final String jobId) {
return getJobConfiguration(getElasticJobConfigPOJO(jobId));
}
- private RuleAlteredJobConfiguration getJobConfiguration(final JobConfigurationPOJO jobConfigPOJO) {
- return RuleAlteredJobConfigurationSwapper.swapToObject(jobConfigPOJO.getJobParameter());
+ private MigrationJobConfiguration getJobConfiguration(final JobConfigurationPOJO jobConfigPOJO) {
+ return YamlMigrationJobConfigurationSwapper.swapToObject(jobConfigPOJO.getJobParameter());
}
@Override
@@ -149,7 +148,7 @@
private JobInfo getJobInfo(final String jobName) {
JobInfo result = new JobInfo(jobName);
JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(result.getJobId());
- RuleAlteredJobConfiguration jobConfig = getJobConfiguration(jobConfigPOJO);
+ MigrationJobConfiguration jobConfig = getJobConfiguration(jobConfigPOJO);
result.setActive(!jobConfigPOJO.isDisabled());
result.setShardingTotalCount(jobConfig.getJobShardingCount());
result.setTables(jobConfig.getLogicTables());
@@ -166,7 +165,7 @@
}
@Override
- public Map<Integer, InventoryIncrementalJobItemProgress> getJobProgress(final RuleAlteredJobConfiguration jobConfig) {
+ public Map<Integer, InventoryIncrementalJobItemProgress> getJobProgress(final MigrationJobConfiguration jobConfig) {
String jobId = jobConfig.getJobId();
JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
return IntStream.range(0, jobConfig.getJobShardingCount()).boxed().collect(LinkedHashMap::new, (map, each) -> {
@@ -193,14 +192,14 @@
jobItemAPI.updateJobItemStatus(jobId, shardingItem, status);
}
- private void verifyManualMode(final RuleAlteredJobConfiguration jobConfig) {
- RuleAlteredContext ruleAlteredContext = RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
- if (null != ruleAlteredContext.getCompletionDetectAlgorithm()) {
+ private void verifyManualMode(final MigrationJobConfiguration jobConfig) {
+ MigrationContext migrationContext = RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
+ if (null != migrationContext.getCompletionDetectAlgorithm()) {
throw new PipelineVerifyFailedException("It's not necessary to do it in auto mode.");
}
}
- private void verifyJobNotCompleted(final RuleAlteredJobConfiguration jobConfig) {
+ private void verifyJobNotCompleted(final MigrationJobConfiguration jobConfig) {
if (PipelineJobProgressDetector.isJobCompleted(jobConfig.getJobShardingCount(), getJobProgress(jobConfig).values())) {
throw new PipelineVerifyFailedException("Job is completed, it's not necessary to do it.");
}
@@ -211,7 +210,7 @@
checkModeConfig();
log.info("stopClusterWriteDB for job {}", jobId);
JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
- RuleAlteredJobConfiguration jobConfig = getJobConfiguration(jobConfigPOJO);
+ MigrationJobConfiguration jobConfig = getJobConfiguration(jobConfigPOJO);
verifyManualMode(jobConfig);
verifyJobNotStopped(jobConfigPOJO);
verifyJobNotCompleted(jobConfig);
@@ -219,7 +218,7 @@
}
@Override
- public void stopClusterWriteDB(final RuleAlteredJobConfiguration jobConfig) {
+ public void stopClusterWriteDB(final MigrationJobConfiguration jobConfig) {
String databaseName = jobConfig.getDatabaseName();
LockContext lockContext = PipelineContext.getContextManager().getInstanceContext().getLockContext();
LockDefinition lockDefinition = new ExclusiveLockDefinition(databaseName);
@@ -239,13 +238,13 @@
checkModeConfig();
log.info("restoreClusterWriteDB for job {}", jobId);
JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
- RuleAlteredJobConfiguration jobConfig = getJobConfiguration(jobConfigPOJO);
+ MigrationJobConfiguration jobConfig = getJobConfiguration(jobConfigPOJO);
verifyManualMode(jobConfig);
restoreClusterWriteDB(jobConfig);
}
@Override
- public void restoreClusterWriteDB(final RuleAlteredJobConfiguration jobConfig) {
+ public void restoreClusterWriteDB(final MigrationJobConfiguration jobConfig) {
String databaseName = jobConfig.getDatabaseName();
LockContext lockContext = PipelineContext.getContextManager().getInstanceContext().getLockContext();
LockDefinition lockDefinition = new ExclusiveLockDefinition(databaseName);
@@ -272,49 +271,49 @@
@Override
public boolean isDataConsistencyCheckNeeded(final String jobId) {
log.info("isDataConsistencyCheckNeeded for job {}", jobId);
- RuleAlteredJobConfiguration jobConfig = getJobConfiguration(jobId);
+ MigrationJobConfiguration jobConfig = getJobConfiguration(jobId);
return isDataConsistencyCheckNeeded(jobConfig);
}
@Override
- public boolean isDataConsistencyCheckNeeded(final RuleAlteredJobConfiguration jobConfig) {
- RuleAlteredContext ruleAlteredContext = RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
- return isDataConsistencyCheckNeeded(ruleAlteredContext);
+ public boolean isDataConsistencyCheckNeeded(final MigrationJobConfiguration jobConfig) {
+ MigrationContext migrationContext = RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
+ return isDataConsistencyCheckNeeded(migrationContext);
}
- private boolean isDataConsistencyCheckNeeded(final RuleAlteredContext ruleAlteredContext) {
- return null != ruleAlteredContext.getDataConsistencyCalculateAlgorithm();
+ private boolean isDataConsistencyCheckNeeded(final MigrationContext migrationContext) {
+ return null != migrationContext.getDataConsistencyCalculateAlgorithm();
}
@Override
public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final String jobId) {
checkModeConfig();
log.info("Data consistency check for job {}", jobId);
- RuleAlteredJobConfiguration jobConfig = getJobConfiguration(getElasticJobConfigPOJO(jobId));
+ MigrationJobConfiguration jobConfig = getJobConfiguration(getElasticJobConfigPOJO(jobId));
verifyDataConsistencyCheck(jobConfig);
return dataConsistencyCheck(jobConfig);
}
@Override
- public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final RuleAlteredJobConfiguration jobConfig) {
- RuleAlteredContext ruleAlteredContext = RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
- if (!isDataConsistencyCheckNeeded(ruleAlteredContext)) {
+ public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final MigrationJobConfiguration jobConfig) {
+ MigrationContext migrationContext = RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
+ if (!isDataConsistencyCheckNeeded(migrationContext)) {
log.info("DataConsistencyCalculatorAlgorithm is not configured, data consistency check is ignored.");
return Collections.emptyMap();
}
- return dataConsistencyCheck(jobConfig, ruleAlteredContext.getDataConsistencyCalculateAlgorithm());
+ return dataConsistencyCheck(jobConfig, migrationContext.getDataConsistencyCalculateAlgorithm());
}
@Override
public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final String jobId, final String algorithmType, final Properties algorithmProps) {
checkModeConfig();
log.info("Data consistency check for job {}, algorithmType: {}", jobId, algorithmType);
- RuleAlteredJobConfiguration jobConfig = getJobConfiguration(getElasticJobConfigPOJO(jobId));
+ MigrationJobConfiguration jobConfig = getJobConfiguration(getElasticJobConfigPOJO(jobId));
verifyDataConsistencyCheck(jobConfig);
return dataConsistencyCheck(jobConfig, DataConsistencyCalculateAlgorithmFactory.newInstance(algorithmType, algorithmProps));
}
- private Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final RuleAlteredJobConfiguration jobConfig, final DataConsistencyCalculateAlgorithm calculator) {
+ private Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final MigrationJobConfiguration jobConfig, final DataConsistencyCalculateAlgorithm calculator) {
String jobId = jobConfig.getJobId();
Map<String, DataConsistencyCheckResult> result = new DataConsistencyChecker(jobConfig).check(calculator);
log.info("Scaling job {} with check algorithm '{}' data consistency checker result {}", jobId, calculator.getType(), result);
@@ -322,7 +321,7 @@
return result;
}
- private void verifyDataConsistencyCheck(final RuleAlteredJobConfiguration jobConfig) {
+ private void verifyDataConsistencyCheck(final MigrationJobConfiguration jobConfig) {
verifyManualMode(jobConfig);
}
@@ -348,7 +347,7 @@
checkModeConfig();
log.info("Switch cluster configuration for job {}", jobId);
JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
- RuleAlteredJobConfiguration jobConfig = getJobConfiguration(jobConfigPOJO);
+ MigrationJobConfiguration jobConfig = getJobConfiguration(jobConfigPOJO);
verifyManualMode(jobConfig);
verifyJobNotStopped(jobConfigPOJO);
verifyJobNotCompleted(jobConfig);
@@ -356,11 +355,11 @@
}
@Override
- public void switchClusterConfiguration(final RuleAlteredJobConfiguration jobConfig) {
+ public void switchClusterConfiguration(final MigrationJobConfiguration jobConfig) {
String jobId = jobConfig.getJobId();
- RuleAlteredContext ruleAlteredContext = RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
+ MigrationContext migrationContext = RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI();
- if (isDataConsistencyCheckNeeded(ruleAlteredContext)) {
+ if (isDataConsistencyCheckNeeded(migrationContext)) {
Optional<Boolean> checkResult = repositoryAPI.getJobCheckResult(jobId);
if (!checkResult.isPresent() || !checkResult.get()) {
throw new PipelineVerifyFailedException("Data consistency check is not finished or failed.");
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/RuleAlteredJobId.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobId.java
similarity index 82%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/RuleAlteredJobId.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobId.java
index f21780e..f204d9d 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/RuleAlteredJobId.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobId.java
@@ -15,21 +15,21 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.api.job;
+package org.apache.shardingsphere.data.pipeline.scenario.migration;
import lombok.Getter;
import lombok.NonNull;
import lombok.Setter;
import lombok.ToString;
+import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJobId;
/**
- * Job id.
+ * Migration job id.
*/
@Getter
@Setter
@ToString(callSuper = true)
-// TODO rename and change fields
-public final class RuleAlteredJobId extends AbstractPipelineJobId {
+public final class MigrationJobId extends AbstractPipelineJobId {
public static final String CURRENT_VERSION = "01";
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
similarity index 84%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
index 142497c..c58ed2d 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
+package org.apache.shardingsphere.data.pipeline.scenario.migration;
import lombok.Getter;
import lombok.Setter;
@@ -23,8 +23,8 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.concurrent.ConcurrentException;
import org.apache.commons.lang3.concurrent.LazyInitializer;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
@@ -33,17 +33,18 @@
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
+import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobWorker;
import java.util.Collection;
import java.util.LinkedList;
/**
- * Rule altered job item context.
+ * Migration job item context.
*/
@Getter
@Setter
@Slf4j
-public final class RuleAlteredJobContext implements InventoryIncrementalJobItemContext {
+public final class MigrationJobItemContext implements InventoryIncrementalJobItemContext {
private final String jobId;
@@ -61,9 +62,9 @@
private final Collection<IncrementalTask> incrementalTasks = new LinkedList<>();
- private final RuleAlteredJobConfiguration jobConfig;
+ private final MigrationJobConfiguration jobConfig;
- private final RuleAlteredContext jobProcessContext;
+ private final MigrationContext jobProcessContext;
private final PipelineDataSourceManager dataSourceManager;
@@ -83,8 +84,9 @@
}
};
- public RuleAlteredJobContext(final RuleAlteredJobConfiguration jobConfig, final int jobShardingItem, final InventoryIncrementalJobItemProgress initProgress,
- final PipelineDataSourceManager dataSourceManager) {
+ public MigrationJobItemContext(final MigrationJobConfiguration jobConfig, final int jobShardingItem, final InventoryIncrementalJobItemProgress initProgress,
+ final PipelineDataSourceManager dataSourceManager) {
+ // TODO refactor RuleAlteredJobWorker
jobProcessContext = RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
this.jobConfig = jobConfig;
jobId = jobConfig.getJobId();
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
similarity index 86%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
index ebf4b7d..e1fbed4 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
@@ -15,30 +15,28 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
+package org.apache.shardingsphere.data.pipeline.scenario.migration;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrementalTasksProgress;
-import org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPI;
-import org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineIgnoredException;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.DefaultPipelineJobProgressListener;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.core.prepare.InventoryTaskSplitter;
+import org.apache.shardingsphere.data.pipeline.core.prepare.PipelineJobPreparerUtils;
import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetSchemasParameter;
import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetTablesParameter;
import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
-import org.apache.shardingsphere.data.pipeline.core.util.PipelineJobPreparerUtils;
-import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.prepare.InventoryTaskSplitter;
import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeFactory;
@@ -50,19 +48,19 @@
import java.util.Collections;
/**
- * Rule altered job preparer.
+ * Migration job preparer.
*/
@Slf4j
-public final class RuleAlteredJobPreparer {
+public final class MigrationJobPreparer {
- private static final RuleAlteredJobAPI JOB_API = RuleAlteredJobAPIFactory.getInstance();
+ private static final MigrationJobAPI JOB_API = MigrationJobAPIFactory.getInstance();
/**
* Do prepare work for scaling job.
*
* @param jobItemContext job item context
*/
- public void prepare(final RuleAlteredJobContext jobItemContext) {
+ public void prepare(final MigrationJobItemContext jobItemContext) {
PipelineJobPreparerUtils.checkSourceDataSource(jobItemContext.getJobConfig().getSourceDatabaseType(), Collections.singleton(jobItemContext.getSourceDataSource()));
if (jobItemContext.isStopping()) {
throw new PipelineIgnoredException("Job stopping, jobId=" + jobItemContext.getJobId());
@@ -86,8 +84,8 @@
}
}
- private void prepareAndCheckTargetWithLock(final RuleAlteredJobContext jobItemContext) {
- RuleAlteredJobConfiguration jobConfig = jobItemContext.getJobConfig();
+ private void prepareAndCheckTargetWithLock(final MigrationJobItemContext jobItemContext) {
+ MigrationJobConfiguration jobConfig = jobItemContext.getJobConfig();
String lockName = "prepare-" + jobConfig.getJobId();
LockContext lockContext = PipelineContext.getContextManager().getInstanceContext().getLockContext();
LockDefinition lockDefinition = new ExclusiveLockDefinition(lockName);
@@ -115,7 +113,7 @@
}
}
- private void prepareAndCheckTarget(final RuleAlteredJobContext jobItemContext) {
+ private void prepareAndCheckTarget(final MigrationJobItemContext jobItemContext) {
prepareTarget(jobItemContext);
InventoryIncrementalJobItemProgress initProgress = jobItemContext.getInitProgress();
if (null == initProgress || initProgress.getStatus() == JobStatus.PREPARING_FAILURE) {
@@ -125,8 +123,8 @@
}
}
- private void prepareTarget(final RuleAlteredJobContext jobItemContext) {
- RuleAlteredJobConfiguration jobConfig = jobItemContext.getJobConfig();
+ private void prepareTarget(final MigrationJobItemContext jobItemContext) {
+ MigrationJobConfiguration jobConfig = jobItemContext.getJobConfig();
TableNameSchemaNameMapping tableNameSchemaNameMapping = jobItemContext.getTaskConfig().getDumperConfig().getTableNameSchemaNameMapping();
String targetDatabaseType = jobConfig.getTargetDatabaseType();
if (isSourceAndTargetSchemaAvailable(jobConfig)) {
@@ -141,7 +139,7 @@
PipelineJobPreparerUtils.prepareTargetTables(targetDatabaseType, prepareTargetTablesParameter);
}
- private boolean isSourceAndTargetSchemaAvailable(final RuleAlteredJobConfiguration jobConfig) {
+ private boolean isSourceAndTargetSchemaAvailable(final MigrationJobConfiguration jobConfig) {
DatabaseType sourceDatabaseType = DatabaseTypeFactory.getInstance(jobConfig.getSourceDatabaseType());
DatabaseType targetDatabaseType = DatabaseTypeFactory.getInstance(jobConfig.getTargetDatabaseType());
if (!sourceDatabaseType.isSchemaAvailable() || !targetDatabaseType.isSchemaAvailable()) {
@@ -151,13 +149,13 @@
return true;
}
- private void initInventoryTasks(final RuleAlteredJobContext jobItemContext) {
+ private void initInventoryTasks(final MigrationJobItemContext jobItemContext) {
InventoryTaskSplitter inventoryTaskSplitter = new InventoryTaskSplitter(jobItemContext.getSourceMetaDataLoader(), jobItemContext.getDataSourceManager(),
jobItemContext.getJobProcessContext().getImporterExecuteEngine(), jobItemContext.getSourceDataSource(), jobItemContext.getTaskConfig(), jobItemContext.getInitProgress());
jobItemContext.getInventoryTasks().addAll(inventoryTaskSplitter.splitInventoryData(jobItemContext));
}
- private void initIncrementalTasks(final RuleAlteredJobContext jobItemContext) throws SQLException {
+ private void initIncrementalTasks(final MigrationJobItemContext jobItemContext) throws SQLException {
PipelineChannelCreator pipelineChannelCreator = jobItemContext.getJobProcessContext().getPipelineChannelCreator();
ExecuteEngine incrementalDumperExecuteEngine = jobItemContext.getJobProcessContext().getIncrementalDumperExecuteEngine();
TaskConfiguration taskConfig = jobItemContext.getTaskConfig();
@@ -176,7 +174,7 @@
*
* @param jobConfig job configuration
*/
- public void cleanup(final RuleAlteredJobConfiguration jobConfig) {
+ public void cleanup(final MigrationJobConfiguration jobConfig) {
try {
PipelineJobPreparerUtils.destroyPosition(jobConfig.getSource());
} catch (final SQLException ex) {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
index 0e35371..330e0dd 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
@@ -21,10 +21,10 @@
import com.google.common.eventbus.Subscribe;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.YamlRuleAlteredJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigrationJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigrationJobConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
@@ -33,11 +33,12 @@
import org.apache.shardingsphere.data.pipeline.api.job.JobType;
import org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
-import org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCreationException;
import org.apache.shardingsphere.data.pipeline.core.execute.FinishedCheckJobExecutor;
import org.apache.shardingsphere.data.pipeline.core.execute.PipelineJobExecutor;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationContext;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIFactory;
import org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredDetector;
import org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredDetectorFactory;
import org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredJobConfigurationPreparerFactory;
@@ -115,7 +116,7 @@
* @param jobConfig job configuration
* @return rule altered context
*/
- public static RuleAlteredContext createRuleAlteredContext(final RuleAlteredJobConfiguration jobConfig) {
+ public static MigrationContext createRuleAlteredContext(final MigrationJobConfiguration jobConfig) {
YamlRootConfiguration targetRootConfig = getYamlRootConfig(jobConfig);
YamlRuleConfiguration yamlRuleConfig = null;
for (YamlRuleConfiguration each : targetRootConfig.getRules()) {
@@ -135,7 +136,7 @@
log.error("rule altered action enabled but actor is not configured, ignored, ruleConfig={}", ruleConfig);
throw new PipelineJobCreationException("rule altered actor not configured");
}
- return new RuleAlteredContext(jobConfig.getJobId(), onRuleAlteredActionConfig.get());
+ return new MigrationContext(jobConfig.getJobId(), onRuleAlteredActionConfig.get());
}
/**
@@ -144,7 +145,7 @@
* @param jobConfig job configuration
* @return YAML root configuration
*/
- private static YamlRootConfiguration getYamlRootConfig(final RuleAlteredJobConfiguration jobConfig) {
+ private static YamlRootConfiguration getYamlRootConfig(final MigrationJobConfiguration jobConfig) {
PipelineDataSourceConfiguration targetDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getTarget().getType(), jobConfig.getTarget().getParameter());
if (targetDataSourceConfig instanceof ShardingSpherePipelineDataSourceConfiguration) {
return ((ShardingSpherePipelineDataSourceConfiguration) targetDataSourceConfig).getRootConfig();
@@ -165,9 +166,9 @@
log.warn("There is uncompleted job with the same database name, please handle it first, current job will be ignored");
return;
}
- Optional<RuleAlteredJobConfiguration> jobConfig = createJobConfig(event);
+ Optional<MigrationJobConfiguration> jobConfig = createJobConfig(event);
if (jobConfig.isPresent()) {
- RuleAlteredJobAPIFactory.getInstance().start(jobConfig.get());
+ MigrationJobAPIFactory.getInstance().start(jobConfig.get());
} else {
log.info("Switch rule configuration immediately.");
ScalingTaskFinishedEvent taskFinishedEvent = new ScalingTaskFinishedEvent(event.getDatabaseName(), event.getActiveVersion(), event.getNewVersion());
@@ -175,7 +176,7 @@
}
}
- private Optional<RuleAlteredJobConfiguration> createJobConfig(final StartScalingEvent event) {
+ private Optional<MigrationJobConfiguration> createJobConfig(final StartScalingEvent event) {
YamlRootConfiguration sourceRootConfig = getYamlRootConfiguration(event.getDatabaseName(), event.getSourceDataSource(), event.getSourceRule());
YamlRootConfiguration targetRootConfig = getYamlRootConfiguration(event.getDatabaseName(), event.getTargetDataSource(), event.getTargetRule());
Map<String, List<String>> alteredRuleYamlClassNameTablesMap = new HashMap<>();
@@ -199,7 +200,7 @@
log.error("more than 1 rule altered");
throw new PipelineJobCreationException("more than 1 rule altered");
}
- YamlRuleAlteredJobConfiguration result = new YamlRuleAlteredJobConfiguration();
+ YamlMigrationJobConfiguration result = new YamlMigrationJobConfiguration();
result.setDatabaseName(event.getDatabaseName());
result.setAlteredRuleYamlClassNameTablesMap(alteredRuleYamlClassNameTablesMap);
result.setActiveVersion(event.getActiveVersion());
@@ -207,7 +208,7 @@
result.setSource(createYamlPipelineDataSourceConfiguration(sourceRootConfig));
result.setTarget(createYamlPipelineDataSourceConfiguration(targetRootConfig));
PipelineAPIFactory.getPipelineJobAPI(JobType.MIGRATION).extendYamlJobConfiguration(result);
- return Optional.of(new RuleAlteredJobConfigurationSwapper().swapToObject(result));
+ return Optional.of(new YamlMigrationJobConfigurationSwapper().swapToObject(result));
}
private Collection<Pair<YamlRuleConfiguration, YamlRuleConfiguration>> groupSourceTargetRuleConfigsByType(final Collection<YamlRuleConfiguration> sourceRules,
@@ -254,18 +255,18 @@
* @param pipelineProcessConfig pipeline process configuration
* @return task configuration
*/
- public static TaskConfiguration buildTaskConfig(final RuleAlteredJobConfiguration jobConfig, final int jobShardingItem, final PipelineProcessConfiguration pipelineProcessConfig) {
+ public static TaskConfiguration buildTaskConfig(final MigrationJobConfiguration jobConfig, final int jobShardingItem, final PipelineProcessConfiguration pipelineProcessConfig) {
return RuleAlteredJobConfigurationPreparerFactory.getInstance().createTaskConfiguration(jobConfig, jobShardingItem, pipelineProcessConfig);
}
private boolean hasUncompletedJobOfSameDatabaseName(final String databaseName) {
boolean result = false;
- for (JobInfo each : RuleAlteredJobAPIFactory.getInstance().list()) {
- if (RuleAlteredJobAPIFactory.getInstance().getJobProgress(each.getJobId()).values().stream()
+ for (JobInfo each : MigrationJobAPIFactory.getInstance().list()) {
+ if (MigrationJobAPIFactory.getInstance().getJobProgress(each.getJobId()).values().stream()
.allMatch(progress -> null != progress && progress.getStatus().equals(JobStatus.FINISHED))) {
continue;
}
- RuleAlteredJobConfiguration jobConfig = RuleAlteredJobConfigurationSwapper.swapToObject(each.getJobParameter());
+ MigrationJobConfiguration jobConfig = YamlMigrationJobConfigurationSwapper.swapToObject(each.getJobParameter());
if (databaseName.equals(jobConfig.getDatabaseName())) {
result = true;
break;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.api.MigrationJobPublicAPI b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.api.MigrationJobPublicAPI
index 76f42a0..90857b2 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.api.MigrationJobPublicAPI
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.api.MigrationJobPublicAPI
@@ -15,4 +15,4 @@
# limitations under the License.
#
-org.apache.shardingsphere.data.pipeline.core.api.impl.RuleAlteredJobAPIImpl
+org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIImpl
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI
index 76f42a0..90857b2 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI
@@ -15,4 +15,4 @@
# limitations under the License.
#
-org.apache.shardingsphere.data.pipeline.core.api.impl.RuleAlteredJobAPIImpl
+org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIImpl
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPI b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPI
deleted file mode 100644
index 76f42a0..0000000
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPI
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.shardingsphere.data.pipeline.core.api.impl.RuleAlteredJobAPIImpl
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPI b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPI
similarity index 90%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPI
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPI
index dbf2c59..90857b2 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPI
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPI
@@ -15,4 +15,4 @@
# limitations under the License.
#
-org.apache.shardingsphere.data.pipeline.core.fixture.RuleAlteredJobAPIFixture
+org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIImpl
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/RuleAlteredJobAPIFactoryTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/MigrationJobAPIFactoryTest.java
similarity index 76%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/RuleAlteredJobAPIFactoryTest.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/MigrationJobAPIFactoryTest.java
index 6a8459b..06ca59c 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/RuleAlteredJobAPIFactoryTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/MigrationJobAPIFactoryTest.java
@@ -17,17 +17,17 @@
package org.apache.shardingsphere.data.pipeline.core;
-import org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPIFactory;
-import org.apache.shardingsphere.data.pipeline.core.fixture.RuleAlteredJobAPIFixture;
+import org.apache.shardingsphere.data.pipeline.core.fixture.MigrationJobAPIFixture;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIFactory;
import org.junit.Test;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertThat;
-public final class RuleAlteredJobAPIFactoryTest {
+public final class MigrationJobAPIFactoryTest {
@Test
public void assertGetInstance() {
- assertThat(RuleAlteredJobAPIFactory.getInstance(), instanceOf(RuleAlteredJobAPIFixture.class));
+ assertThat(MigrationJobAPIFactory.getInstance(), instanceOf(MigrationJobAPIFixture.class));
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/RuleAlteredJobAPIFixture.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/MigrationJobAPIFixture.java
similarity index 84%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/RuleAlteredJobAPIFixture.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/MigrationJobAPIFixture.java
index 1856230..7588730 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/RuleAlteredJobAPIFixture.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/MigrationJobAPIFixture.java
@@ -18,16 +18,16 @@
package org.apache.shardingsphere.data.pipeline.core.fixture;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
import org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
-import org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPI;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPI;
import java.util.Collection;
import java.util.List;
@@ -35,7 +35,7 @@
import java.util.Optional;
import java.util.Properties;
-public final class RuleAlteredJobAPIFixture implements RuleAlteredJobAPI {
+public final class MigrationJobAPIFixture implements MigrationJobAPI {
@Override
public String marshalJobId(final PipelineJobId pipelineJobId) {
@@ -74,7 +74,7 @@
}
@Override
- public Map<Integer, InventoryIncrementalJobItemProgress> getJobProgress(final RuleAlteredJobConfiguration jobConfig) {
+ public Map<Integer, InventoryIncrementalJobItemProgress> getJobProgress(final MigrationJobConfiguration jobConfig) {
return null;
}
@@ -83,7 +83,7 @@
}
@Override
- public void stopClusterWriteDB(final RuleAlteredJobConfiguration jobConfig) {
+ public void stopClusterWriteDB(final MigrationJobConfiguration jobConfig) {
}
@Override
@@ -91,7 +91,7 @@
}
@Override
- public void restoreClusterWriteDB(final RuleAlteredJobConfiguration jobConfig) {
+ public void restoreClusterWriteDB(final MigrationJobConfiguration jobConfig) {
}
@Override
@@ -105,7 +105,7 @@
}
@Override
- public boolean isDataConsistencyCheckNeeded(final RuleAlteredJobConfiguration jobConfig) {
+ public boolean isDataConsistencyCheckNeeded(final MigrationJobConfiguration jobConfig) {
return false;
}
@@ -115,7 +115,7 @@
}
@Override
- public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final RuleAlteredJobConfiguration jobConfig) {
+ public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final MigrationJobConfiguration jobConfig) {
return null;
}
@@ -134,7 +134,7 @@
}
@Override
- public void switchClusterConfiguration(final RuleAlteredJobConfiguration jobConfig) {
+ public void switchClusterConfiguration(final MigrationJobConfiguration jobConfig) {
}
@Override
@@ -142,13 +142,13 @@
}
@Override
- public RuleAlteredJobConfiguration getJobConfiguration(final String jobId) {
+ public MigrationJobConfiguration getJobConfiguration(final String jobId) {
return null;
}
@Override
public boolean isDefault() {
- return RuleAlteredJobAPI.super.isDefault();
+ return MigrationJobAPI.super.isDefault();
}
@Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtilsTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtilsTest.java
index 9712866..2b9ceeb 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtilsTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtilsTest.java
@@ -18,8 +18,7 @@
package org.apache.shardingsphere.data.pipeline.core.job;
import org.apache.shardingsphere.data.pipeline.api.job.JobType;
-import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobIdUtils;
-import org.apache.shardingsphere.data.pipeline.api.job.RuleAlteredJobId;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobId;
import org.junit.Test;
import static org.hamcrest.CoreMatchers.is;
@@ -29,9 +28,9 @@
@Test
public void assertCodec() {
- RuleAlteredJobId pipelineJobId = new RuleAlteredJobId();
+ MigrationJobId pipelineJobId = new MigrationJobId();
pipelineJobId.setTypeCode(JobType.MIGRATION.getTypeCode());
- pipelineJobId.setFormatVersion(RuleAlteredJobId.CURRENT_VERSION);
+ pipelineJobId.setFormatVersion(MigrationJobId.CURRENT_VERSION);
pipelineJobId.setDatabaseName("sharding_db");
pipelineJobId.setCurrentMetadataVersion(0);
pipelineJobId.setNewMetadataVersion(1);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPI b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPI
similarity index 90%
copy from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPI
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPI
index dbf2c59..d4f8cbc7 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPI
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPI
@@ -15,4 +15,4 @@
# limitations under the License.
#
-org.apache.shardingsphere.data.pipeline.core.fixture.RuleAlteredJobAPIFixture
+org.apache.shardingsphere.data.pipeline.core.fixture.MigrationJobAPIFixture
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
index 64995ee..5d55e49 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
@@ -17,7 +17,7 @@
package org.apache.shardingsphere.data.pipeline.mysql.prepare.datasource;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
@@ -53,7 +53,7 @@
private PrepareTargetTablesParameter prepareTargetTablesParameter;
@Mock
- private RuleAlteredJobConfiguration jobConfig;
+ private MigrationJobConfiguration jobConfig;
@Mock
private PipelineDataSourceConfiguration sourcePipelineDataSourceConfig;
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
index 4133515..3117e8a 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
@@ -17,9 +17,9 @@
package org.apache.shardingsphere.data.pipeline.api.impl;
+import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
@@ -33,7 +33,7 @@
import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
-import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobItemContext;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
import org.junit.BeforeClass;
@@ -67,7 +67,7 @@
@Test
public void assertPersistJobProgress() {
- RuleAlteredJobContext jobItemContext = mockJobItemContext();
+ MigrationJobItemContext jobItemContext = mockJobItemContext();
governanceRepositoryAPI.persistJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem(), "testValue");
String actual = governanceRepositoryAPI.getJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem());
assertThat(actual, is("testValue"));
@@ -75,7 +75,7 @@
@Test
public void assertPersistJobCheckResult() {
- RuleAlteredJobContext jobItemContext = mockJobItemContext();
+ MigrationJobItemContext jobItemContext = mockJobItemContext();
governanceRepositoryAPI.persistJobCheckResult(jobItemContext.getJobId(), true);
Optional<Boolean> checkResult = governanceRepositoryAPI.getJobCheckResult(jobItemContext.getJobId());
assertTrue(checkResult.isPresent() && checkResult.get());
@@ -118,15 +118,15 @@
@Test
public void assertGetShardingItems() {
- RuleAlteredJobContext jobItemContext = mockJobItemContext();
+ MigrationJobItemContext jobItemContext = mockJobItemContext();
governanceRepositoryAPI.persistJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem(), "testValue");
List<Integer> shardingItems = governanceRepositoryAPI.getShardingItems(jobItemContext.getJobId());
assertThat(shardingItems.size(), is(1));
assertThat(shardingItems.get(0), is(jobItemContext.getShardingItem()));
}
- private RuleAlteredJobContext mockJobItemContext() {
- RuleAlteredJobContext result = new RuleAlteredJobContext(JobConfigurationBuilder.createJobConfiguration(),
+ private MigrationJobItemContext mockJobItemContext() {
+ MigrationJobItemContext result = new MigrationJobItemContext(JobConfigurationBuilder.createJobConfiguration(),
0, new InventoryIncrementalJobItemProgress(), new DefaultPipelineDataSourceManager());
TaskConfiguration taskConfig = result.getTaskConfig();
result.getInventoryTasks().add(mockInventoryTask(taskConfig));
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
similarity index 67%
rename from shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java
rename to shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
index 8a0ce93..6466aab 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
@@ -22,7 +22,7 @@
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyContentCheckResult;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCountCheckResult;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
@@ -30,14 +30,14 @@
import org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
-import org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPI;
-import org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.datasource.creator.PipelineDataSourceCreatorFactory;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineVerifyFailedException;
import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
-import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPI;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIFactory;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobItemContext;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -61,19 +61,19 @@
import static org.junit.Assert.assertTrue;
@Slf4j
-public final class RuleAlteredJobAPIImplTest {
+public final class MigrationJobAPIImplTest {
- private static RuleAlteredJobAPI ruleAlteredJobAPI;
+ private static MigrationJobAPI jobAPI;
@BeforeClass
public static void beforeClass() {
PipelineContextUtil.mockModeConfigAndContextManager();
- ruleAlteredJobAPI = RuleAlteredJobAPIFactory.getInstance();
+ jobAPI = MigrationJobAPIFactory.getInstance();
}
@Test
public void assertStartAndList() {
- Optional<String> jobId = ruleAlteredJobAPI.start(JobConfigurationBuilder.createJobConfiguration());
+ Optional<String> jobId = jobAPI.start(JobConfigurationBuilder.createJobConfiguration());
assertTrue(jobId.isPresent());
JobInfo jobInfo = getNonNullJobInfo(jobId.get());
assertTrue(jobInfo.isActive());
@@ -82,7 +82,7 @@
}
private Optional<JobInfo> getJobInfo(final String jobId) {
- return ruleAlteredJobAPI.list().stream().filter(each -> Objects.equals(each.getJobId(), jobId)).reduce((a, b) -> a);
+ return jobAPI.list().stream().filter(each -> Objects.equals(each.getJobId(), jobId)).reduce((a, b) -> a);
}
private JobInfo getNonNullJobInfo(final String jobId) {
@@ -93,64 +93,64 @@
@Test
public void assertStartOrStopById() {
- Optional<String> jobId = ruleAlteredJobAPI.start(JobConfigurationBuilder.createJobConfiguration());
+ Optional<String> jobId = jobAPI.start(JobConfigurationBuilder.createJobConfiguration());
assertTrue(jobId.isPresent());
assertTrue(getNonNullJobInfo(jobId.get()).isActive());
- ruleAlteredJobAPI.stop(jobId.get());
+ jobAPI.stop(jobId.get());
assertFalse(getNonNullJobInfo(jobId.get()).isActive());
- ruleAlteredJobAPI.startDisabledJob(jobId.get());
+ jobAPI.startDisabledJob(jobId.get());
assertTrue(getNonNullJobInfo(jobId.get()).isActive());
}
@Test
public void assertRemove() {
- Optional<String> jobId = ruleAlteredJobAPI.start(JobConfigurationBuilder.createJobConfiguration());
+ Optional<String> jobId = jobAPI.start(JobConfigurationBuilder.createJobConfiguration());
assertTrue(jobId.isPresent());
assertTrue(getJobInfo(jobId.get()).isPresent());
- ruleAlteredJobAPI.stop(jobId.get());
- ruleAlteredJobAPI.remove(jobId.get());
+ jobAPI.stop(jobId.get());
+ jobAPI.remove(jobId.get());
assertFalse(getJobInfo(jobId.get()).isPresent());
}
@Test
public void assertGetProgress() {
- Optional<String> jobId = ruleAlteredJobAPI.start(JobConfigurationBuilder.createJobConfiguration());
+ Optional<String> jobId = jobAPI.start(JobConfigurationBuilder.createJobConfiguration());
assertTrue(jobId.isPresent());
- Map<Integer, InventoryIncrementalJobItemProgress> jobProgressMap = ruleAlteredJobAPI.getJobProgress(jobId.get());
+ Map<Integer, InventoryIncrementalJobItemProgress> jobProgressMap = jobAPI.getJobProgress(jobId.get());
assertThat(jobProgressMap.size(), is(1));
}
@Test
public void assertIsDataConsistencyCheckNeeded() {
- Optional<String> jobId = ruleAlteredJobAPI.start(JobConfigurationBuilder.createJobConfiguration());
+ Optional<String> jobId = jobAPI.start(JobConfigurationBuilder.createJobConfiguration());
assertTrue(jobId.isPresent());
- assertTrue(ruleAlteredJobAPI.isDataConsistencyCheckNeeded(jobId.get()));
+ assertTrue(jobAPI.isDataConsistencyCheckNeeded(jobId.get()));
}
@Test
public void assertDataConsistencyCheck() {
- Optional<String> jobId = ruleAlteredJobAPI.start(JobConfigurationBuilder.createJobConfiguration());
+ Optional<String> jobId = jobAPI.start(JobConfigurationBuilder.createJobConfiguration());
assertTrue(jobId.isPresent());
- RuleAlteredJobConfiguration jobConfig = ruleAlteredJobAPI.getJobConfiguration(jobId.get());
+ MigrationJobConfiguration jobConfig = jobAPI.getJobConfiguration(jobId.get());
if (null == jobConfig.getSource()) {
log.error("source is null, jobConfig={}", YamlEngine.marshal(jobConfig));
}
initTableData(jobConfig);
- ruleAlteredJobAPI.stopClusterWriteDB(jobConfig);
- Map<String, DataConsistencyCheckResult> checkResultMap = ruleAlteredJobAPI.dataConsistencyCheck(jobId.get());
- ruleAlteredJobAPI.restoreClusterWriteDB(jobConfig);
+ jobAPI.stopClusterWriteDB(jobConfig);
+ Map<String, DataConsistencyCheckResult> checkResultMap = jobAPI.dataConsistencyCheck(jobId.get());
+ jobAPI.restoreClusterWriteDB(jobConfig);
assertThat(checkResultMap.size(), is(1));
}
@Test
public void assertDataConsistencyCheckWithAlgorithm() {
- Optional<String> jobId = ruleAlteredJobAPI.start(JobConfigurationBuilder.createJobConfiguration());
+ Optional<String> jobId = jobAPI.start(JobConfigurationBuilder.createJobConfiguration());
assertTrue(jobId.isPresent());
- RuleAlteredJobConfiguration jobConfig = ruleAlteredJobAPI.getJobConfiguration(jobId.get());
+ MigrationJobConfiguration jobConfig = jobAPI.getJobConfiguration(jobId.get());
initTableData(jobConfig);
- ruleAlteredJobAPI.stopClusterWriteDB(jobConfig);
- Map<String, DataConsistencyCheckResult> checkResultMap = ruleAlteredJobAPI.dataConsistencyCheck(jobId.get(), "FIXTURE", null);
- ruleAlteredJobAPI.restoreClusterWriteDB(jobConfig);
+ jobAPI.stopClusterWriteDB(jobConfig);
+ Map<String, DataConsistencyCheckResult> checkResultMap = jobAPI.dataConsistencyCheck(jobId.get(), "FIXTURE", null);
+ jobAPI.restoreClusterWriteDB(jobConfig);
assertThat(checkResultMap.size(), is(1));
assertTrue(checkResultMap.get("t_order").getCountCheckResult().isMatched());
assertThat(checkResultMap.get("t_order").getCountCheckResult().getTargetRecordsCount(), is(2L));
@@ -159,7 +159,7 @@
@Test
public void assertAggregateEmptyDataConsistencyCheckResults() {
- assertFalse(ruleAlteredJobAPI.aggregateDataConsistencyCheckResults("foo_job", Collections.emptyMap()));
+ assertFalse(jobAPI.aggregateDataConsistencyCheckResults("foo_job", Collections.emptyMap()));
}
@Test
@@ -170,7 +170,7 @@
Map<String, DataConsistencyCheckResult> checkResults = new LinkedHashMap<>(2, 1);
checkResults.put("foo_tbl", new DataConsistencyCheckResult(equalCountCheckResult, equalContentCheckResult));
checkResults.put("bar_tbl", new DataConsistencyCheckResult(notEqualCountCheckResult, equalContentCheckResult));
- assertFalse(ruleAlteredJobAPI.aggregateDataConsistencyCheckResults("foo_job", checkResults));
+ assertFalse(jobAPI.aggregateDataConsistencyCheckResults("foo_job", checkResults));
}
@Test
@@ -181,7 +181,7 @@
Map<String, DataConsistencyCheckResult> checkResults = new LinkedHashMap<>(2, 1);
checkResults.put("foo_tbl", new DataConsistencyCheckResult(equalCountCheckResult, equalContentCheckResult));
checkResults.put("bar_tbl", new DataConsistencyCheckResult(equalCountCheckResult, notEqualContentCheckResult));
- assertFalse(ruleAlteredJobAPI.aggregateDataConsistencyCheckResults("foo_job", checkResults));
+ assertFalse(jobAPI.aggregateDataConsistencyCheckResults("foo_job", checkResults));
}
@Test
@@ -191,34 +191,34 @@
Map<String, DataConsistencyCheckResult> checkResults = new LinkedHashMap<>(2, 1);
checkResults.put("foo_tbl", new DataConsistencyCheckResult(equalCountCheckResult, equalContentCheckResult));
checkResults.put("bar_tbl", new DataConsistencyCheckResult(equalCountCheckResult, equalContentCheckResult));
- assertTrue(ruleAlteredJobAPI.aggregateDataConsistencyCheckResults("foo_job", checkResults));
+ assertTrue(jobAPI.aggregateDataConsistencyCheckResults("foo_job", checkResults));
}
@Test(expected = PipelineVerifyFailedException.class)
public void assertSwitchClusterConfigurationAlreadyFinished() {
- final RuleAlteredJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration();
- Optional<String> jobId = ruleAlteredJobAPI.start(jobConfig);
+ final MigrationJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration();
+ Optional<String> jobId = jobAPI.start(jobConfig);
assertTrue(jobId.isPresent());
final GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI();
- RuleAlteredJobContext jobItemContext = new RuleAlteredJobContext(jobConfig, 0, new InventoryIncrementalJobItemProgress(), new DefaultPipelineDataSourceManager());
- ruleAlteredJobAPI.persistJobItemProgress(jobItemContext);
+ MigrationJobItemContext jobItemContext = new MigrationJobItemContext(jobConfig, 0, new InventoryIncrementalJobItemProgress(), new DefaultPipelineDataSourceManager());
+ jobAPI.persistJobItemProgress(jobItemContext);
repositoryAPI.persistJobCheckResult(jobId.get(), true);
- ruleAlteredJobAPI.updateJobItemStatus(jobId.get(), 0, JobStatus.FINISHED);
- ruleAlteredJobAPI.switchClusterConfiguration(jobId.get());
+ jobAPI.updateJobItemStatus(jobId.get(), 0, JobStatus.FINISHED);
+ jobAPI.switchClusterConfiguration(jobId.get());
}
@Test
public void assertSwitchClusterConfigurationSucceed() {
- final RuleAlteredJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration();
- Optional<String> jobId = ruleAlteredJobAPI.start(jobConfig);
+ final MigrationJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration();
+ Optional<String> jobId = jobAPI.start(jobConfig);
assertTrue(jobId.isPresent());
GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI();
- RuleAlteredJobContext jobItemContext = new RuleAlteredJobContext(jobConfig, 0, new InventoryIncrementalJobItemProgress(), new DefaultPipelineDataSourceManager());
- ruleAlteredJobAPI.persistJobItemProgress(jobItemContext);
+ MigrationJobItemContext jobItemContext = new MigrationJobItemContext(jobConfig, 0, new InventoryIncrementalJobItemProgress(), new DefaultPipelineDataSourceManager());
+ jobAPI.persistJobItemProgress(jobItemContext);
repositoryAPI.persistJobCheckResult(jobId.get(), true);
- ruleAlteredJobAPI.updateJobItemStatus(jobId.get(), jobItemContext.getShardingItem(), JobStatus.EXECUTE_INVENTORY_TASK);
- ruleAlteredJobAPI.switchClusterConfiguration(jobId.get());
- Map<Integer, InventoryIncrementalJobItemProgress> progress = ruleAlteredJobAPI.getJobProgress(jobId.get());
+ jobAPI.updateJobItemStatus(jobId.get(), jobItemContext.getShardingItem(), JobStatus.EXECUTE_INVENTORY_TASK);
+ jobAPI.switchClusterConfiguration(jobId.get());
+ Map<Integer, InventoryIncrementalJobItemProgress> progress = jobAPI.getJobProgress(jobId.get());
for (Entry<Integer, InventoryIncrementalJobItemProgress> entry : progress.entrySet()) {
assertSame(entry.getValue().getStatus(), JobStatus.FINISHED);
}
@@ -226,18 +226,18 @@
@Test
public void assertResetTargetTable() {
- Optional<String> jobId = ruleAlteredJobAPI.start(JobConfigurationBuilder.createJobConfiguration());
+ Optional<String> jobId = jobAPI.start(JobConfigurationBuilder.createJobConfiguration());
assertTrue(jobId.isPresent());
- RuleAlteredJobConfiguration jobConfig = ruleAlteredJobAPI.getJobConfiguration(jobId.get());
+ MigrationJobConfiguration jobConfig = jobAPI.getJobConfiguration(jobId.get());
initTableData(jobConfig);
- ruleAlteredJobAPI.stop(jobId.get());
- ruleAlteredJobAPI.reset(jobId.get());
- Map<String, DataConsistencyCheckResult> checkResultMap = ruleAlteredJobAPI.dataConsistencyCheck(jobConfig);
+ jobAPI.stop(jobId.get());
+ jobAPI.reset(jobId.get());
+ Map<String, DataConsistencyCheckResult> checkResultMap = jobAPI.dataConsistencyCheck(jobConfig);
assertThat(checkResultMap.get("t_order").getCountCheckResult().getTargetRecordsCount(), is(2L));
}
@SneakyThrows(SQLException.class)
- private void initTableData(final RuleAlteredJobConfiguration jobConfig) {
+ private void initTableData(final MigrationJobConfiguration jobConfig) {
PipelineDataSourceConfiguration sourceDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getSource().getType(), jobConfig.getSource().getParameter());
initTableData(PipelineDataSourceCreatorFactory.getInstance(sourceDataSourceConfig.getType()).createPipelineDataSource(sourceDataSourceConfig.getDataSourceConfiguration()));
PipelineDataSourceConfiguration targetDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getTarget().getType(), jobConfig.getTarget().getParameter());
@@ -256,11 +256,11 @@
@Test
public void assertRenewJobStatus() {
- final RuleAlteredJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration();
- RuleAlteredJobContext jobItemContext = new RuleAlteredJobContext(jobConfig, 0, new InventoryIncrementalJobItemProgress(), new DefaultPipelineDataSourceManager());
- ruleAlteredJobAPI.persistJobItemProgress(jobItemContext);
- ruleAlteredJobAPI.updateJobItemStatus(jobConfig.getJobId(), 0, JobStatus.FINISHED);
- InventoryIncrementalJobItemProgress actual = ruleAlteredJobAPI.getJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem());
+ final MigrationJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration();
+ MigrationJobItemContext jobItemContext = new MigrationJobItemContext(jobConfig, 0, new InventoryIncrementalJobItemProgress(), new DefaultPipelineDataSourceManager());
+ jobAPI.persistJobItemProgress(jobItemContext);
+ jobAPI.updateJobItemStatus(jobConfig.getJobId(), 0, JobStatus.FINISHED);
+ InventoryIncrementalJobItemProgress actual = jobAPI.getJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem());
assertNotNull(actual);
assertThat(actual.getStatus(), is(JobStatus.FINISHED));
}
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerTest.java
index 743e5ee..373c74c 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerTest.java
@@ -18,14 +18,14 @@
package org.apache.shardingsphere.data.pipeline.core.check.consistency;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.fixture.DataConsistencyCalculateAlgorithmFixture;
import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
-import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobItemContext;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -53,8 +53,8 @@
assertTrue(actual.get("t_order").getContentCheckResult().isMatched());
}
- private RuleAlteredJobConfiguration createJobConfiguration() throws SQLException {
- RuleAlteredJobContext jobItemContext = new RuleAlteredJobContext(JobConfigurationBuilder.createJobConfiguration(), 0,
+ private MigrationJobConfiguration createJobConfiguration() throws SQLException {
+ MigrationJobItemContext jobItemContext = new MigrationJobItemContext(JobConfigurationBuilder.createJobConfiguration(), 0,
new InventoryIncrementalJobItemProgress(), new DefaultPipelineDataSourceManager());
initTableData(jobItemContext.getTaskConfig().getDumperConfig().getDataSourceConfig());
initTableData(jobItemContext.getTaskConfig().getImporterConfig().getDataSourceConfig());
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/DefaultPipelineDataSourceManagerTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/DefaultPipelineDataSourceManagerTest.java
index d7b80a3..e9eaf95 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/DefaultPipelineDataSourceManagerTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/DefaultPipelineDataSourceManagerTest.java
@@ -17,7 +17,7 @@
package org.apache.shardingsphere.data.pipeline.core.datasource;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
@@ -39,7 +39,7 @@
public final class DefaultPipelineDataSourceManagerTest {
- private RuleAlteredJobConfiguration jobConfig;
+ private MigrationJobConfiguration jobConfig;
@BeforeClass
public static void beforeClass() {
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporter.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporter.java
index ec3d2ef..0645d0f 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporter.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporter.java
@@ -17,7 +17,7 @@
package org.apache.shardingsphere.data.pipeline.core.fixture;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporterCreator.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporterCreator.java
index 9d0ae14..f0566a6 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporterCreator.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporterCreator.java
@@ -17,7 +17,7 @@
package org.apache.shardingsphere.data.pipeline.core.fixture;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporterTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporterTest.java
index e1f29a5..98b4cee 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporterTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporterTest.java
@@ -17,8 +17,8 @@
package org.apache.shardingsphere.data.pipeline.core.importer;
+import org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterCreatorFactoryTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterCreatorFactoryTest.java
index 9b09961..5dc9c17 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterCreatorFactoryTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterCreatorFactoryTest.java
@@ -17,8 +17,8 @@
package org.apache.shardingsphere.data.pipeline.core.importer;
+import org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJobTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJobTest.java
index d9545f6..93c2b82 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJobTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJobTest.java
@@ -18,12 +18,12 @@
package org.apache.shardingsphere.data.pipeline.core.job;
import lombok.SneakyThrows;
-import org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPIFactory;
-import org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPI;
import org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
import org.apache.shardingsphere.data.pipeline.core.util.ReflectionUtil;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPI;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIFactory;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -43,7 +43,7 @@
private static FinishedCheckJob finishedCheckJob;
@Mock
- private RuleAlteredJobAPI ruleAlteredJobAPI;
+ private MigrationJobAPI jobAPI;
@BeforeClass
public static void beforeClass() {
@@ -54,26 +54,26 @@
@Before
@SneakyThrows(ReflectiveOperationException.class)
public void setUp() {
- ReflectionUtil.setFieldValue(finishedCheckJob, "ruleAlteredJobAPI", ruleAlteredJobAPI);
+ ReflectionUtil.setFieldValue(finishedCheckJob, "jobAPI", jobAPI);
}
@Test
public void assertExecuteAllDisabledJob() {
- Optional<String> jobId = RuleAlteredJobAPIFactory.getInstance().start(JobConfigurationBuilder.createJobConfiguration());
+ Optional<String> jobId = MigrationJobAPIFactory.getInstance().start(JobConfigurationBuilder.createJobConfiguration());
assertTrue(jobId.isPresent());
- List<JobInfo> jobInfos = RuleAlteredJobAPIFactory.getInstance().list();
+ List<JobInfo> jobInfos = MigrationJobAPIFactory.getInstance().list();
jobInfos.forEach(each -> each.setActive(false));
- when(ruleAlteredJobAPI.list()).thenReturn(jobInfos);
+ when(jobAPI.list()).thenReturn(jobInfos);
finishedCheckJob.execute(null);
}
@Test
public void assertExecuteActiveJob() {
- Optional<String> jobId = RuleAlteredJobAPIFactory.getInstance().start(JobConfigurationBuilder.createJobConfiguration());
+ Optional<String> jobId = MigrationJobAPIFactory.getInstance().start(JobConfigurationBuilder.createJobConfiguration());
assertTrue(jobId.isPresent());
- List<JobInfo> jobInfos = RuleAlteredJobAPIFactory.getInstance().list();
+ List<JobInfo> jobInfos = MigrationJobAPIFactory.getInstance().list();
jobInfos.forEach(each -> each.setActive(true));
- when(ruleAlteredJobAPI.list()).thenReturn(jobInfos);
+ when(jobAPI.list()).thenReturn(jobInfos);
finishedCheckJob.execute(null);
}
}
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
similarity index 91%
rename from shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java
rename to shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
index 449e484..a2ab03f 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
@@ -15,21 +15,21 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.scenario.rulealtered.prepare;
+package org.apache.shardingsphere.data.pipeline.core.prepare;
+import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IntegerPrimaryKeyPosition;
import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
-import org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCreationException;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
-import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIFactory;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobItemContext;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -46,7 +46,7 @@
public final class InventoryTaskSplitterTest {
- private RuleAlteredJobContext jobItemContext;
+ private MigrationJobItemContext jobItemContext;
private TaskConfiguration taskConfig;
@@ -67,9 +67,9 @@
}
private void initJobItemContext() {
- RuleAlteredJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration();
- InventoryIncrementalJobItemProgress initProgress = RuleAlteredJobAPIFactory.getInstance().getJobItemProgress(jobConfig.getJobId(), 0);
- jobItemContext = new RuleAlteredJobContext(jobConfig, 0, initProgress, new DefaultPipelineDataSourceManager());
+ MigrationJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration();
+ InventoryIncrementalJobItemProgress initProgress = MigrationJobAPIFactory.getInstance().getJobItemProgress(jobConfig.getJobId(), 0);
+ jobItemContext = new MigrationJobItemContext(jobConfig, 0, initProgress, new DefaultPipelineDataSourceManager());
dataSourceManager = jobItemContext.getDataSourceManager();
taskConfig = jobItemContext.getTaskConfig();
}
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
index bbbd8b3..bc9bddf 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
@@ -17,7 +17,7 @@
package org.apache.shardingsphere.data.pipeline.core.task;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
@@ -26,7 +26,7 @@
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
-import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobItemContext;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -48,7 +48,7 @@
@Before
public void setUp() {
- TaskConfiguration taskConfig = new RuleAlteredJobContext(JobConfigurationBuilder.createJobConfiguration(), 0, new InventoryIncrementalJobItemProgress(),
+ TaskConfiguration taskConfig = new MigrationJobItemContext(JobConfigurationBuilder.createJobConfiguration(), 0, new InventoryIncrementalJobItemProgress(),
new DefaultPipelineDataSourceManager()).getTaskConfig();
taskConfig.getDumperConfig().setPosition(new PlaceholderPosition());
PipelineTableMetaDataLoader metaDataLoader = new PipelineTableMetaDataLoader(mock(PipelineDataSourceWrapper.class));
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
index fa507f8..5809f42 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
@@ -17,9 +17,9 @@
package org.apache.shardingsphere.data.pipeline.core.task;
+import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IntegerPrimaryKeyPosition;
@@ -30,7 +30,7 @@
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
-import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobItemContext;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -62,7 +62,8 @@
@Before
public void setUp() {
- taskConfig = new RuleAlteredJobContext(JobConfigurationBuilder.createJobConfiguration(), 0, new InventoryIncrementalJobItemProgress(), new DefaultPipelineDataSourceManager()).getTaskConfig();
+ taskConfig = new MigrationJobItemContext(JobConfigurationBuilder.createJobConfiguration(), 0,
+ new InventoryIncrementalJobItemProgress(), new DefaultPipelineDataSourceManager()).getTaskConfig();
}
@Test(expected = IngestException.class)
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java
index d20d2ca..3c8cd16 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java
@@ -19,9 +19,9 @@
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.YamlRuleAlteredJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigrationJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigrationJobConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
@@ -44,8 +44,8 @@
*
* @return created job configuration
*/
- public static RuleAlteredJobConfiguration createJobConfiguration() {
- YamlRuleAlteredJobConfiguration result = new YamlRuleAlteredJobConfiguration();
+ public static MigrationJobConfiguration createJobConfiguration() {
+ YamlMigrationJobConfiguration result = new YamlMigrationJobConfiguration();
result.setDatabaseName("logic_db");
result.setAlteredRuleYamlClassNameTablesMap(Collections.singletonMap(YamlShardingRuleConfiguration.class.getName(), Collections.singletonList("t_order")));
int activeVersion = ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE - 10) + 1;
@@ -56,7 +56,7 @@
new ShardingSpherePipelineDataSourceConfiguration(ConfigurationFileUtil.readFile("config_sharding_sphere_jdbc_source.yaml"))));
result.setTarget(createYamlPipelineDataSourceConfiguration(new StandardPipelineDataSourceConfiguration(ConfigurationFileUtil.readFile("config_standard_jdbc_target.yaml"))));
PipelineAPIFactory.getPipelineJobAPI(JobType.MIGRATION).extendYamlJobConfiguration(result);
- return new RuleAlteredJobConfigurationSwapper().swapToObject(result);
+ return new YamlMigrationJobConfigurationSwapper().swapToObject(result);
}
private static YamlPipelineDataSourceConfiguration createYamlPipelineDataSourceConfiguration(final PipelineDataSourceConfiguration config) {
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparerTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparerTest.java
deleted file mode 100644
index d3591f5..0000000
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparerTest.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
-
-public final class RuleAlteredJobPreparerTest {
- // TODO simple unit test can't cover prepare() method, it's a combination method
-}
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java
index a05b884..df383c2 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java
@@ -18,15 +18,14 @@
package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
import org.apache.commons.io.FileUtils;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.YamlRuleAlteredJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigrationJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigrationJobConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
-import org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCreationException;
import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
@@ -34,6 +33,8 @@
import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
import org.apache.shardingsphere.data.pipeline.core.util.ReflectionUtil;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIFactory;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobItemContext;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.cache.event.StartScalingEvent;
import org.apache.shardingsphere.sharding.api.config.ShardingRuleConfiguration;
@@ -61,9 +62,9 @@
@Test(expected = PipelineJobCreationException.class)
public void assertCreateRuleAlteredContextNoAlteredRule() {
- RuleAlteredJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration();
- RuleAlteredJobConfigurationSwapper swapper = new RuleAlteredJobConfigurationSwapper();
- YamlRuleAlteredJobConfiguration yamlJobConfig = swapper.swapToYamlConfiguration(jobConfig);
+ MigrationJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration();
+ YamlMigrationJobConfigurationSwapper swapper = new YamlMigrationJobConfigurationSwapper();
+ YamlMigrationJobConfiguration yamlJobConfig = swapper.swapToYamlConfiguration(jobConfig);
yamlJobConfig.setAlteredRuleYamlClassNameTablesMap(Collections.emptyMap());
RuleAlteredJobWorker.createRuleAlteredContext(swapper.swapToObject(yamlJobConfig));
}
@@ -97,11 +98,11 @@
// TODO improve assertHasUncompletedJob, refactor hasUncompletedJobOfSameDatabaseName for easier unit test
// @Test
public void assertHasUncompletedJob() throws InvocationTargetException, NoSuchMethodException, IllegalAccessException, IOException {
- final RuleAlteredJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration();
- RuleAlteredJobContext jobItemContext = new RuleAlteredJobContext(jobConfig, 0, new InventoryIncrementalJobItemProgress(), new DefaultPipelineDataSourceManager());
+ final MigrationJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration();
+ MigrationJobItemContext jobItemContext = new MigrationJobItemContext(jobConfig, 0, new InventoryIncrementalJobItemProgress(), new DefaultPipelineDataSourceManager());
jobItemContext.setStatus(JobStatus.PREPARING);
GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI();
- RuleAlteredJobAPIFactory.getInstance().persistJobItemProgress(jobItemContext);
+ MigrationJobAPIFactory.getInstance().persistJobItemProgress(jobItemContext);
URL jobConfigUrl = getClass().getClassLoader().getResource("scaling/rule_alter/scaling_job_config.yaml");
assertNotNull(jobConfigUrl);
repositoryAPI.persist(PipelineMetaDataNode.getJobConfigPath(jobItemContext.getJobId()), FileUtils.readFileToString(new File(jobConfigUrl.getFile()), StandardCharsets.UTF_8));
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/logback-test.xml b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/logback-test.xml
index 40ecd46..bffa220 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/logback-test.xml
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/logback-test.xml
@@ -23,8 +23,8 @@
</encoder>
</appender>
<logger name="org.apache.zookeeper" level="off" />
- <logger name="org.apache.shardingsphere.data.pipeline.scenario.rulealtered.prepare.InventoryTaskSplitter" level="off" />
- <logger name="org.apache.shardingsphere.data.pipeline.core.api.impl.RuleAlteredJobAPIImpl" level="off" />
+ <logger name="org.apache.shardingsphere.data.pipeline.core.prepare.InventoryTaskSplitter" level="off" />
+ <logger name="org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIImpl" level="off" />
<logger name="org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractInventoryDumper" level="off" />
<root>