| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.shardingsphere.readwritesplitting.distsql.handler.checker; |
| |
| import com.google.common.base.Strings; |
| import lombok.AccessLevel; |
| import lombok.NoArgsConstructor; |
| import org.apache.shardingsphere.infra.algorithm.loadbalancer.core.LoadBalanceAlgorithm; |
| import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions; |
| import org.apache.shardingsphere.infra.exception.kernel.metadata.resource.storageunit.MissingRequiredStorageUnitsException; |
| import org.apache.shardingsphere.infra.exception.kernel.metadata.rule.DuplicateRuleException; |
| import org.apache.shardingsphere.infra.exception.kernel.metadata.rule.InvalidRuleConfigurationException; |
| import org.apache.shardingsphere.infra.exception.kernel.metadata.rule.MissingRequiredRuleException; |
| import org.apache.shardingsphere.infra.exception.kernel.metadata.rule.MissingRequiredStrategyException; |
| import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase; |
| import org.apache.shardingsphere.infra.metadata.database.resource.ResourceMetaData; |
| import org.apache.shardingsphere.infra.rule.attribute.datasource.DataSourceMapperRuleAttribute; |
| import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader; |
| import org.apache.shardingsphere.readwritesplitting.api.ReadwriteSplittingRuleConfiguration; |
| import org.apache.shardingsphere.readwritesplitting.api.rule.ReadwriteSplittingDataSourceRuleConfiguration; |
| import org.apache.shardingsphere.readwritesplitting.api.transaction.TransactionalReadQueryStrategy; |
| import org.apache.shardingsphere.readwritesplitting.constant.ReadwriteSplittingDataSourceType; |
| import org.apache.shardingsphere.readwritesplitting.distsql.segment.ReadwriteSplittingRuleSegment; |
| import org.apache.shardingsphere.readwritesplitting.exception.ReadwriteSplittingRuleExceptionIdentifier; |
| import org.apache.shardingsphere.readwritesplitting.exception.actual.DuplicateReadwriteSplittingActualDataSourceException; |
| |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashSet; |
| import java.util.LinkedHashSet; |
| import java.util.LinkedList; |
| import java.util.Map.Entry; |
| import java.util.Objects; |
| import java.util.stream.Collectors; |
| |
| /** |
| * Readwrite-splitting rule statement checker. |
| */ |
| @NoArgsConstructor(access = AccessLevel.PRIVATE) |
| public final class ReadwriteSplittingRuleStatementChecker { |
| |
| /** |
| * Check create readwrite-splitting rule statement. |
| * |
| * @param database database |
| * @param segments segments |
| * @param currentRuleConfig current rule config |
| * @param ifNotExists rule if not exists |
| */ |
| public static void checkCreation(final ShardingSphereDatabase database, final Collection<ReadwriteSplittingRuleSegment> segments, |
| final ReadwriteSplittingRuleConfiguration currentRuleConfig, final boolean ifNotExists) { |
| checkDuplicateRuleNames(database, segments, currentRuleConfig, ifNotExists); |
| String databaseName = database.getName(); |
| checkDataSourcesExist(databaseName, segments, database); |
| checkDuplicatedDataSourceNames(databaseName, segments, currentRuleConfig, true); |
| checkTransactionalReadQueryStrategy(segments); |
| checkLoadBalancers(segments); |
| } |
| |
| /** |
| * Check alter readwrite-splitting rule statement. |
| * |
| * @param database database |
| * @param segments segments |
| * @param currentRuleConfig current rule config |
| */ |
| public static void checkAlteration(final ShardingSphereDatabase database, final Collection<ReadwriteSplittingRuleSegment> segments, final ReadwriteSplittingRuleConfiguration currentRuleConfig) { |
| String databaseName = database.getName(); |
| checkDuplicateRuleNamesWithSelf(databaseName, segments); |
| checkRuleNamesExist(segments, currentRuleConfig, databaseName); |
| checkDataSourcesExist(databaseName, segments, database); |
| checkDuplicatedDataSourceNames(databaseName, segments, currentRuleConfig, false); |
| checkTransactionalReadQueryStrategy(segments); |
| checkLoadBalancers(segments); |
| } |
| |
| private static void checkRuleNamesExist(final Collection<ReadwriteSplittingRuleSegment> segments, final ReadwriteSplittingRuleConfiguration currentRuleConfig, final String databaseName) { |
| Collection<String> requiredRuleNames = segments.stream().map(ReadwriteSplittingRuleSegment::getName).collect(Collectors.toList()); |
| Collection<String> currentRuleNames = currentRuleConfig.getDataSources().stream().map(ReadwriteSplittingDataSourceRuleConfiguration::getName).collect(Collectors.toList()); |
| Collection<String> notExistedRuleNames = requiredRuleNames.stream().filter(each -> !currentRuleNames.contains(each)).collect(Collectors.toSet()); |
| ShardingSpherePreconditions.checkMustEmpty(notExistedRuleNames, () -> new MissingRequiredRuleException("Readwrite-splitting", databaseName, notExistedRuleNames)); |
| } |
| |
| private static void checkDuplicateRuleNames(final ShardingSphereDatabase database, |
| final Collection<ReadwriteSplittingRuleSegment> segments, final ReadwriteSplittingRuleConfiguration currentRuleConfig, final boolean ifNotExists) { |
| checkDuplicateRuleNamesWithSelf(database.getName(), segments); |
| checkDuplicateRuleNamesWithExistsDataSources(database, segments); |
| if (!ifNotExists) { |
| checkDuplicateRuleNamesWithRuleConfiguration(database.getName(), currentRuleConfig, segments); |
| } |
| } |
| |
| private static void checkDuplicateRuleNamesWithSelf(final String databaseName, final Collection<ReadwriteSplittingRuleSegment> segments) { |
| Collection<String> duplicatedRuleNames = getDuplicated(segments.stream().map(ReadwriteSplittingRuleSegment::getName).collect(Collectors.toList())); |
| ShardingSpherePreconditions.checkMustEmpty(duplicatedRuleNames, () -> new DuplicateRuleException("Readwrite-splitting", databaseName, duplicatedRuleNames)); |
| } |
| |
| private static Collection<String> getDuplicated(final Collection<String> required) { |
| return required.stream().collect(Collectors.groupingBy(each -> each, Collectors.counting())).entrySet().stream() |
| .filter(each -> each.getValue() > 1).map(Entry::getKey).collect(Collectors.toSet()); |
| } |
| |
| private static void checkDuplicateRuleNamesWithExistsDataSources(final ShardingSphereDatabase database, final Collection<ReadwriteSplittingRuleSegment> segments) { |
| Collection<String> currentRuleNames = new HashSet<>(); |
| ResourceMetaData resourceMetaData = database.getResourceMetaData(); |
| if (null != resourceMetaData && null != resourceMetaData.getStorageUnits()) { |
| currentRuleNames.addAll(resourceMetaData.getStorageUnits().keySet()); |
| } |
| currentRuleNames.addAll(getLogicDataSources(database)); |
| Collection<String> toBeCreatedRuleNames = segments.stream().map(ReadwriteSplittingRuleSegment::getName).filter(currentRuleNames::contains).collect(Collectors.toList()); |
| ShardingSpherePreconditions.checkMustEmpty(toBeCreatedRuleNames, () -> new InvalidRuleConfigurationException("Readwrite-splitting", toBeCreatedRuleNames, |
| Collections.singleton(String.format("%s already exists in storage unit", toBeCreatedRuleNames)))); |
| } |
| |
| private static void checkDuplicateRuleNamesWithRuleConfiguration(final String databaseName, final ReadwriteSplittingRuleConfiguration currentRuleConfig, |
| final Collection<ReadwriteSplittingRuleSegment> segments) { |
| Collection<String> currentRuleNames = new LinkedList<>(); |
| if (null != currentRuleConfig) { |
| currentRuleNames.addAll(currentRuleConfig.getDataSources().stream().map(ReadwriteSplittingDataSourceRuleConfiguration::getName).collect(Collectors.toList())); |
| } |
| Collection<String> toBeCreatedRuleNames = segments.stream().map(ReadwriteSplittingRuleSegment::getName).filter(currentRuleNames::contains).collect(Collectors.toList()); |
| ShardingSpherePreconditions.checkMustEmpty(toBeCreatedRuleNames, () -> new DuplicateRuleException("Readwrite-splitting", databaseName, toBeCreatedRuleNames)); |
| } |
| |
| private static void checkDataSourcesExist(final String databaseName, final Collection<ReadwriteSplittingRuleSegment> segments, final ShardingSphereDatabase database) { |
| Collection<String> requiredDataSources = new LinkedHashSet<>(); |
| segments.forEach(each -> { |
| requiredDataSources.add(each.getWriteDataSource()); |
| requiredDataSources.addAll(each.getReadDataSources()); |
| }); |
| Collection<String> notExistedDataSources = database.getResourceMetaData().getNotExistedDataSources(requiredDataSources); |
| ShardingSpherePreconditions.checkMustEmpty(notExistedDataSources, () -> new MissingRequiredStorageUnitsException(databaseName, notExistedDataSources)); |
| } |
| |
| private static Collection<String> getLogicDataSources(final ShardingSphereDatabase database) { |
| Collection<String> result = new LinkedHashSet<>(); |
| for (DataSourceMapperRuleAttribute each : database.getRuleMetaData().getAttributes(DataSourceMapperRuleAttribute.class)) { |
| result.addAll(each.getDataSourceMapper().keySet()); |
| } |
| return result; |
| } |
| |
| private static void checkDuplicatedDataSourceNames(final String databaseName, final Collection<ReadwriteSplittingRuleSegment> segments, |
| final ReadwriteSplittingRuleConfiguration currentRuleConfig, final boolean isCreating) { |
| Collection<String> existedWriteDataSourceNames = new HashSet<>(); |
| Collection<String> existedReadDataSourceNames = new HashSet<>(); |
| if (null != currentRuleConfig) { |
| Collection<String> toBeAlteredRuleNames = isCreating ? Collections.emptySet() : getToBeAlteredRuleNames(segments); |
| for (ReadwriteSplittingDataSourceRuleConfiguration each : currentRuleConfig.getDataSources()) { |
| if (toBeAlteredRuleNames.contains(each.getName())) { |
| continue; |
| } |
| existedWriteDataSourceNames.add(each.getWriteDataSourceName()); |
| existedReadDataSourceNames.addAll(each.getReadDataSourceNames()); |
| } |
| } |
| checkDuplicateWriteDataSourceNames(segments, databaseName, existedWriteDataSourceNames); |
| checkDuplicateReadDataSourceNames(segments, databaseName, existedReadDataSourceNames); |
| } |
| |
| private static Collection<String> getToBeAlteredRuleNames(final Collection<ReadwriteSplittingRuleSegment> segments) { |
| return segments.stream().map(ReadwriteSplittingRuleSegment::getName).collect(Collectors.toSet()); |
| } |
| |
| private static void checkDuplicateWriteDataSourceNames(final Collection<ReadwriteSplittingRuleSegment> segments, final String databaseName, |
| final Collection<String> writeDataSourceNames) { |
| for (ReadwriteSplittingRuleSegment each : segments) { |
| if (Strings.isNullOrEmpty(each.getWriteDataSource())) { |
| continue; |
| } |
| String writeDataSource = each.getWriteDataSource(); |
| ShardingSpherePreconditions.checkState(writeDataSourceNames.add(writeDataSource), () -> new DuplicateReadwriteSplittingActualDataSourceException( |
| ReadwriteSplittingDataSourceType.WRITE, writeDataSource, new ReadwriteSplittingRuleExceptionIdentifier(databaseName, ""))); |
| } |
| } |
| |
| private static void checkDuplicateReadDataSourceNames(final Collection<ReadwriteSplittingRuleSegment> segments, final String databaseName, |
| final Collection<String> readDataSourceNames) { |
| for (ReadwriteSplittingRuleSegment each : segments) { |
| if (null != each.getReadDataSources()) { |
| checkDuplicateReadDataSourceNames(each, databaseName, readDataSourceNames); |
| } |
| } |
| } |
| |
| private static void checkDuplicateReadDataSourceNames(final ReadwriteSplittingRuleSegment segment, final String databaseName, |
| final Collection<String> readDataSourceNames) { |
| for (String each : segment.getReadDataSources()) { |
| ShardingSpherePreconditions.checkState(readDataSourceNames.add(each), () -> new DuplicateReadwriteSplittingActualDataSourceException( |
| ReadwriteSplittingDataSourceType.READ, each, new ReadwriteSplittingRuleExceptionIdentifier(databaseName, ""))); |
| } |
| } |
| |
| private static void checkTransactionalReadQueryStrategy(final Collection<ReadwriteSplittingRuleSegment> segments) { |
| Collection<String> validStrategyNames = Arrays.stream(TransactionalReadQueryStrategy.values()).map(Enum::name).collect(Collectors.toSet()); |
| for (ReadwriteSplittingRuleSegment each : segments) { |
| if (null != each.getTransactionalReadQueryStrategy()) { |
| ShardingSpherePreconditions.checkContains(validStrategyNames, each.getTransactionalReadQueryStrategy().toUpperCase(), |
| () -> new MissingRequiredStrategyException("Transactional read query", Collections.singleton(each.getTransactionalReadQueryStrategy()))); |
| } |
| } |
| } |
| |
| private static void checkLoadBalancers(final Collection<ReadwriteSplittingRuleSegment> segments) { |
| segments.stream().map(ReadwriteSplittingRuleSegment::getLoadBalancer).filter(Objects::nonNull) |
| .forEach(each -> TypedSPILoader.checkService(LoadBalanceAlgorithm.class, each.getName(), each.getProps())); |
| } |
| } |