/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.data.pipeline.scenario.migration.api;

import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
import org.apache.shardingsphere.data.pipeline.core.datanode.DataNodeUtils;
import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeEntry;
import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLine;
import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLineConvertUtils;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.core.datasource.yaml.config.YamlPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper.TableAndSchemaNameMapper;
import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineSchemaUtils;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.pojo.SourceTargetEntry;
import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.updatable.MigrateTableStatement;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobId;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.config.YamlMigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.swapper.YamlMigrationJobConfigurationSwapper;
import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
import org.apache.shardingsphere.infra.database.core.connector.ConnectionProperties;
import org.apache.shardingsphere.infra.database.core.connector.ConnectionPropertiesParser;
import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeFactory;
import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
import org.apache.shardingsphere.infra.datanode.DataNode;
import org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties;
import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.exception.kernel.metadata.resource.storageunit.DuplicateStorageUnitException;
import org.apache.shardingsphere.infra.exception.kernel.metadata.resource.storageunit.MissingRequiredStorageUnitsException;
import org.apache.shardingsphere.infra.exception.kernel.metadata.rule.EmptyRuleException;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.infra.util.json.JsonUtils;
import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
import org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigurationSwapperEngine;
import org.apache.shardingsphere.mode.manager.ContextManager;

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;

/**
* Migration job API.
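 *
 * <p>A minimal usage sketch. The {@code contextKey}, {@code propsMap} and {@code migrateTableStatement}
 * variables are assumed to be supplied by the caller (for example, by the DistSQL handlers) and are
 * illustrative only:</p>
 * <pre>{@code
 * MigrationJobAPI jobAPI = new MigrationJobAPI();
 * // Register the source storage units to migrate from.
 * jobAPI.registerMigrationSourceStorageUnits(contextKey, propsMap);
 * // Start a migration job from a MIGRATE TABLE statement, then commit it once data is migrated.
 * String jobId = jobAPI.start(contextKey, migrateTableStatement);
 * jobAPI.commit(jobId);
 * }</pre>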
*/
@Slf4j
public final class MigrationJobAPI implements TransmissionJobAPI {

    private final PipelineJobManager jobManager;

    private final PipelineJobConfigurationManager jobConfigManager;

    private final PipelineDataSourcePersistService dataSourcePersistService;

public MigrationJobAPI() {
PipelineJobType jobType = new MigrationJobType();
jobManager = new PipelineJobManager(jobType);
jobConfigManager = new PipelineJobConfigurationManager(jobType);
dataSourcePersistService = new PipelineDataSourcePersistService();
    }

/**
* Start migration job.
*
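 * <p>The returned job id can later be passed to {@link #commit(String)} or {@link #rollback(String)}.</p>
 *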
* @param contextKey context key
 * @param param migrate table statement
* @return job id
*/
public String start(final PipelineContextKey contextKey, final MigrateTableStatement param) {
MigrationJobConfiguration jobConfig = new YamlMigrationJobConfigurationSwapper().swapToObject(buildYamlJobConfiguration(contextKey, param));
jobManager.start(jobConfig);
return jobConfig.getJobId();
}
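
    // Build the YAML job configuration from the MIGRATE TABLE statement: resolve each source data node,
    // check that every referenced source storage unit is registered and that all sources share one database
    // type, then wrap the whole target ShardingSphere database as the job target.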
private YamlMigrationJobConfiguration buildYamlJobConfiguration(final PipelineContextKey contextKey, final MigrateTableStatement param) {
YamlMigrationJobConfiguration result = new YamlMigrationJobConfiguration();
result.setTargetDatabaseName(param.getTargetDatabaseName());
Map<String, DataSourcePoolProperties> metaDataDataSource = dataSourcePersistService.load(contextKey, "MIGRATION");
Map<String, List<DataNode>> sourceDataNodes = new LinkedHashMap<>();
Map<String, YamlPipelineDataSourceConfiguration> configSources = new LinkedHashMap<>();
List<SourceTargetEntry> sourceTargetEntries = new ArrayList<>(new HashSet<>(param.getSourceTargetEntries())).stream().sorted(Comparator.comparing(SourceTargetEntry::getTargetTableName)
.thenComparing(each -> DataNodeUtils.formatWithSchema(each.getSource()))).collect(Collectors.toList());
YamlDataSourceConfigurationSwapper dataSourceConfigSwapper = new YamlDataSourceConfigurationSwapper();
for (SourceTargetEntry each : sourceTargetEntries) {
sourceDataNodes.computeIfAbsent(each.getTargetTableName(), key -> new LinkedList<>()).add(each.getSource());
ShardingSpherePreconditions.checkState(1 == sourceDataNodes.get(each.getTargetTableName()).size(),
() -> new PipelineInvalidParameterException("more than one source table for " + each.getTargetTableName()));
String dataSourceName = each.getSource().getDataSourceName();
if (configSources.containsKey(dataSourceName)) {
continue;
}
ShardingSpherePreconditions.checkContainsKey(metaDataDataSource, dataSourceName,
() -> new PipelineInvalidParameterException(dataSourceName + " doesn't exist. Run `SHOW MIGRATION SOURCE STORAGE UNITS;` to verify it."));
Map<String, Object> sourceDataSourcePoolProps = dataSourceConfigSwapper.swapToMap(metaDataDataSource.get(dataSourceName));
StandardPipelineDataSourceConfiguration sourceDataSourceConfig = new StandardPipelineDataSourceConfiguration(sourceDataSourcePoolProps);
configSources.put(dataSourceName, buildYamlPipelineDataSourceConfiguration(sourceDataSourceConfig.getType(), sourceDataSourceConfig.getParameter()));
DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(sourceDataSourceConfig.getDatabaseType()).getDialectDatabaseMetaData();
if (null == each.getSource().getSchemaName() && dialectDatabaseMetaData.isSchemaAvailable()) {
each.getSource().setSchemaName(PipelineSchemaUtils.getDefaultSchema(sourceDataSourceConfig));
}
DatabaseType sourceDatabaseType = sourceDataSourceConfig.getDatabaseType();
if (null == result.getSourceDatabaseType()) {
result.setSourceDatabaseType(sourceDatabaseType.getType());
} else if (!result.getSourceDatabaseType().equals(sourceDatabaseType.getType())) {
throw new PipelineInvalidParameterException("Source storage units have different database types");
}
}
result.setSources(configSources);
ShardingSphereDatabase targetDatabase = PipelineContextManager.getProxyContext().getContextManager().getMetaDataContexts().getMetaData().getDatabase(param.getTargetDatabaseName());
PipelineDataSourceConfiguration targetPipelineDataSourceConfig = buildTargetPipelineDataSourceConfiguration(targetDatabase);
result.setTarget(buildYamlPipelineDataSourceConfiguration(targetPipelineDataSourceConfig.getType(), targetPipelineDataSourceConfig.getParameter()));
result.setTargetDatabaseType(targetPipelineDataSourceConfig.getDatabaseType().getType());
List<JobDataNodeEntry> tablesFirstDataNodes = sourceDataNodes.entrySet().stream()
.map(entry -> new JobDataNodeEntry(entry.getKey(), entry.getValue().subList(0, 1))).collect(Collectors.toList());
result.setTargetTableNames(new ArrayList<>(sourceDataNodes.keySet()).stream().sorted().collect(Collectors.toList()));
result.setTargetTableSchemaMap(buildTargetTableSchemaMap(sourceDataNodes));
result.setTablesFirstDataNodes(new JobDataNodeLine(tablesFirstDataNodes).marshal());
result.setJobShardingDataNodes(JobDataNodeLineConvertUtils.convertDataNodesToLines(sourceDataNodes).stream().map(JobDataNodeLine::marshal).collect(Collectors.toList()));
result.setJobId(PipelineJobIdUtils.marshal(new MigrationJobId(contextKey, result.getJobShardingDataNodes())));
return result;
    }

private YamlPipelineDataSourceConfiguration buildYamlPipelineDataSourceConfiguration(final String type, final String param) {
YamlPipelineDataSourceConfiguration result = new YamlPipelineDataSourceConfiguration();
result.setType(type);
result.setParameter(param);
return result;
}
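
    // The migration target is the entire target ShardingSphere database: its storage units and rule
    // configurations are swapped to a YAML root configuration and wrapped as a pipeline data source.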
private PipelineDataSourceConfiguration buildTargetPipelineDataSourceConfiguration(final ShardingSphereDatabase targetDatabase) {
Map<String, Map<String, Object>> targetPoolProps = new HashMap<>();
YamlDataSourceConfigurationSwapper dataSourceConfigSwapper = new YamlDataSourceConfigurationSwapper();
for (Entry<String, StorageUnit> entry : targetDatabase.getResourceMetaData().getStorageUnits().entrySet()) {
targetPoolProps.put(entry.getKey(), dataSourceConfigSwapper.swapToMap(entry.getValue().getDataSourcePoolProperties()));
}
YamlRootConfiguration targetRootConfig = buildYamlRootConfiguration(targetDatabase.getName(), targetPoolProps, targetDatabase.getRuleMetaData().getConfigurations());
return new ShardingSpherePipelineDataSourceConfiguration(targetRootConfig);
}
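
    // A target database without any rule configurations is rejected with an EmptyRuleException.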
private YamlRootConfiguration buildYamlRootConfiguration(final String databaseName, final Map<String, Map<String, Object>> yamlDataSources, final Collection<RuleConfiguration> rules) {
ShardingSpherePreconditions.checkNotEmpty(rules, () -> new EmptyRuleException(databaseName));
YamlRootConfiguration result = new YamlRootConfiguration();
result.setDatabaseName(databaseName);
result.setDataSources(yamlDataSources);
result.setRules(new YamlRuleConfigurationSwapperEngine().swapToYamlRuleConfigurations(rules));
return result;
    }

private Map<String, String> buildTargetTableSchemaMap(final Map<String, List<DataNode>> sourceDataNodes) {
Map<String, String> result = new LinkedHashMap<>();
sourceDataNodes.forEach((tableName, dataNodes) -> result.put(tableName, dataNodes.get(0).getSchemaName()));
return result;
    }

/**
* Register migration source storage units.
*
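 * <p>Registering a name that already exists fails with {@code DuplicateStorageUnitException}; on success,
 * the new units are merged with the existing registrations and persisted.</p>
 *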
* @param contextKey context key
* @param propsMap data source pool properties map
*/
public void registerMigrationSourceStorageUnits(final PipelineContextKey contextKey, final Map<String, DataSourcePoolProperties> propsMap) {
Map<String, DataSourcePoolProperties> existDataSources = dataSourcePersistService.load(contextKey, getType());
Collection<String> duplicateDataSourceNames = new HashSet<>(propsMap.size(), 1F);
for (Entry<String, DataSourcePoolProperties> entry : propsMap.entrySet()) {
if (existDataSources.containsKey(entry.getKey())) {
duplicateDataSourceNames.add(entry.getKey());
}
}
ShardingSpherePreconditions.checkMustEmpty(duplicateDataSourceNames, () -> new DuplicateStorageUnitException(contextKey.getDatabaseName(), duplicateDataSourceNames));
Map<String, DataSourcePoolProperties> result = new LinkedHashMap<>(existDataSources);
result.putAll(propsMap);
dataSourcePersistService.persist(contextKey, getType(), result);
    }

/**
* Drop migration source resources.
*
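 * <p>Dropping a name that is not registered fails with {@code MissingRequiredStorageUnitsException}.</p>
 *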
* @param contextKey context key
* @param resourceNames resource names
*/
public void dropMigrationSourceResources(final PipelineContextKey contextKey, final Collection<String> resourceNames) {
Map<String, DataSourcePoolProperties> metaDataDataSource = dataSourcePersistService.load(contextKey, getType());
Collection<String> notExistedResources = resourceNames.stream().filter(each -> !metaDataDataSource.containsKey(each)).collect(Collectors.toList());
ShardingSpherePreconditions.checkMustEmpty(notExistedResources, () -> new MissingRequiredStorageUnitsException(contextKey.getDatabaseName(), notExistedResources));
for (String each : resourceNames) {
metaDataDataSource.remove(each);
}
dataSourcePersistService.persist(contextKey, getType(), metaDataDataSource);
    }

/**
 * List migration source resources.
*
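 * <p>Each returned row contains: name, database type, host, port, database, connection timeout,
 * idle timeout, max lifetime, max pool size, min pool size, read only, and remaining custom properties as JSON.</p>
 *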
* @param contextKey context key
* @return migration source resources
*/
public Collection<Collection<Object>> listMigrationSourceResources(final PipelineContextKey contextKey) {
Map<String, DataSourcePoolProperties> propsMap = dataSourcePersistService.load(contextKey, getType());
Collection<Collection<Object>> result = new ArrayList<>(propsMap.size());
for (Entry<String, DataSourcePoolProperties> entry : propsMap.entrySet()) {
String dataSourceName = entry.getKey();
DataSourcePoolProperties value = entry.getValue();
Collection<Object> props = new LinkedList<>();
props.add(dataSourceName);
String url = String.valueOf(value.getConnectionPropertySynonyms().getStandardProperties().get("url"));
DatabaseType databaseType = DatabaseTypeFactory.get(url);
props.add(databaseType.getType());
ConnectionProperties connectionProps = DatabaseTypedSPILoader.getService(ConnectionPropertiesParser.class, databaseType).parse(url, "", null);
props.add(connectionProps.getHostname());
props.add(connectionProps.getPort());
props.add(connectionProps.getCatalog());
Map<String, Object> standardProps = value.getPoolPropertySynonyms().getStandardProperties();
props.add(getStandardProperty(standardProps, "connectionTimeoutMilliseconds"));
props.add(getStandardProperty(standardProps, "idleTimeoutMilliseconds"));
props.add(getStandardProperty(standardProps, "maxLifetimeMilliseconds"));
props.add(getStandardProperty(standardProps, "maxPoolSize"));
props.add(getStandardProperty(standardProps, "minPoolSize"));
props.add(getStandardProperty(standardProps, "readOnly"));
Map<String, Object> otherProps = value.getCustomProperties().getProperties();
props.add(otherProps.isEmpty() ? "" : JsonUtils.toJsonString(otherProps));
result.add(props);
}
return result;
    }

private String getStandardProperty(final Map<String, Object> standardProps, final String key) {
return standardProps.containsKey(key) && null != standardProps.get(key) ? standardProps.get(key).toString() : "";
    }

@Override
public void commit(final String jobId) {
log.info("Commit job {}", jobId);
final long startTimeMillis = System.currentTimeMillis();
jobManager.stop(jobId);
dropCheckJobs(jobId);
MigrationJobConfiguration jobConfig = jobConfigManager.getJobConfiguration(jobId);
refreshTableMetadata(jobId, jobConfig.getTargetDatabaseName());
jobManager.drop(jobId);
log.info("Commit cost {} ms", System.currentTimeMillis() - startTimeMillis);
    }

private void refreshTableMetadata(final String jobId, final String databaseName) {
        // TODO Use the original database name for now; blocked on reloadDatabaseMetaData fixing a case-sensitivity problem
ContextManager contextManager = PipelineContextManager.getContext(PipelineJobIdUtils.parseContextKey(jobId)).getContextManager();
ShardingSphereDatabase database = contextManager.getMetaDataContexts().getMetaData().getDatabase(databaseName);
contextManager.refreshTableMetaData(database);
    }

@Override
public void rollback(final String jobId) throws SQLException {
final long startTimeMillis = System.currentTimeMillis();
dropCheckJobs(jobId);
cleanTempTableOnRollback(jobId);
jobManager.drop(jobId);
log.info("Rollback job {} cost {} ms", jobId, System.currentTimeMillis() - startTimeMillis);
}
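
    // Drop all consistency check jobs attached to this migration job; failures are logged and ignored.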
private void dropCheckJobs(final String jobId) {
Collection<String> checkJobIds = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobFacade().getCheck().listCheckJobIds(jobId);
if (checkJobIds.isEmpty()) {
return;
}
for (String each : checkJobIds) {
try {
jobManager.drop(each);
// CHECKSTYLE:OFF
} catch (final RuntimeException ex) {
// CHECKSTYLE:ON
log.info("drop check job failed, check job id: {}, error: {}", each, ex.getMessage());
}
}
}
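
    // On rollback, drop the (possibly partially populated) target tables using the dialect prepare SQL builder.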
private void cleanTempTableOnRollback(final String jobId) throws SQLException {
MigrationJobConfiguration jobConfig = new PipelineJobConfigurationManager(TypedSPILoader.getService(PipelineJobType.class, getType())).getJobConfiguration(jobId);
PipelinePrepareSQLBuilder pipelineSQLBuilder = new PipelinePrepareSQLBuilder(jobConfig.getTargetDatabaseType());
TableAndSchemaNameMapper mapping = new TableAndSchemaNameMapper(jobConfig.getTargetTableSchemaMap());
try (
PipelineDataSourceWrapper dataSource = new PipelineDataSourceWrapper(jobConfig.getTarget());
Connection connection = dataSource.getConnection()) {
for (String each : jobConfig.getTargetTableNames()) {
String targetSchemaName = mapping.getSchemaName(each);
String sql = pipelineSQLBuilder.buildDropSQL(targetSchemaName, each);
log.info("cleanTempTableOnRollback, targetSchemaName={}, targetTableName={}, sql={}", targetSchemaName, each, sql);
try (Statement statement = connection.createStatement()) {
statement.execute(sql);
}
}
}
    }

@Override
public String getType() {
return "MIGRATION";
}
}