blob: ed829da38a3a39606db7c67de78056f30f163cd8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.shardingsphere.sharding.algorithm.sharding.datetime;
import com.google.common.collect.BoundType;
import com.google.common.collect.Range;
import org.apache.shardingsphere.infra.algorithm.core.exception.AlgorithmInitializationException;
import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.exception.generic.UnsupportedSQLOperationException;
import org.apache.shardingsphere.sharding.api.sharding.standard.PreciseShardingValue;
import org.apache.shardingsphere.sharding.api.sharding.standard.RangeShardingValue;
import org.apache.shardingsphere.sharding.api.sharding.standard.StandardShardingAlgorithm;
import org.apache.shardingsphere.sharding.exception.data.InvalidDatetimeFormatException;
import org.apache.shardingsphere.sharding.exception.data.NullShardingValueException;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.Month;
import java.time.Year;
import java.time.YearMonth;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.time.temporal.ChronoField;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalAccessor;
import java.time.temporal.TemporalQueries;
import java.util.Collection;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
/**
 * Interval sharding algorithm.
 *
 * <p>The time line between {@code datetime-lower} and {@code datetime-upper} is walked in steps of
 * {@code datetime-interval-amount} x {@code datetime-interval-unit}. Every step forms a half-open interval
 * {@code [start, start + step)}; when the sharding value (or value range) intersects that interval, the
 * target names ending with {@code start} formatted by {@code sharding-suffix-pattern} are selected.</p>
 */
public final class IntervalShardingAlgorithm implements StandardShardingAlgorithm<Comparable<?>> {
    
    private static final String DATE_TIME_PATTERN_KEY = "datetime-pattern";
    
    private static final String DATE_TIME_LOWER_KEY = "datetime-lower";
    
    private static final String DATE_TIME_UPPER_KEY = "datetime-upper";
    
    private static final String SHARDING_SUFFIX_FORMAT_KEY = "sharding-suffix-pattern";
    
    private static final String INTERVAL_AMOUNT_KEY = "datetime-interval-amount";
    
    private static final String INTERVAL_UNIT_KEY = "datetime-interval-unit";
    
    private String dateTimePatternString;
    
    private DateTimeFormatter dateTimeFormatter;
    
    private int dateTimePatternLength;
    
    private TemporalAccessor dateTimeLower;
    
    private TemporalAccessor dateTimeUpper;
    
    private DateTimeFormatter tableSuffixPattern;
    
    private int stepAmount;
    
    private ChronoUnit stepUnit;
    
    /**
     * Initialize the algorithm from its properties.
     *
     * <p>{@code datetime-pattern}, {@code datetime-lower} and {@code sharding-suffix-pattern} are required.
     * {@code datetime-upper} defaults to the current local date time, {@code datetime-interval-amount} to 1
     * and {@code datetime-interval-unit} to {@link ChronoUnit#DAYS}.</p>
     *
     * @param props algorithm properties
     */
    @Override
    public void init(final Properties props) {
        dateTimePatternString = getDateTimePattern(props);
        // The formatter must be ready before the lower / upper bounds are parsed with it.
        dateTimeFormatter = DateTimeFormatter.ofPattern(dateTimePatternString);
        dateTimePatternLength = dateTimePatternString.length();
        dateTimeLower = getDateTimeLower(props, dateTimePatternString);
        dateTimeUpper = getDateTimeUpper(props, dateTimePatternString);
        tableSuffixPattern = getTableSuffixPattern(props);
        stepAmount = getStepAmount(props);
        stepUnit = props.containsKey(INTERVAL_UNIT_KEY) ? getStepUnit(props.getProperty(INTERVAL_UNIT_KEY)) : ChronoUnit.DAYS;
    }
    
    private String getDateTimePattern(final Properties props) {
        ShardingSpherePreconditions.checkContainsKey(props, DATE_TIME_PATTERN_KEY, () -> new AlgorithmInitializationException(this, String.format("%s can not be null", DATE_TIME_PATTERN_KEY)));
        return props.getProperty(DATE_TIME_PATTERN_KEY);
    }
    
    private TemporalAccessor getDateTimeLower(final Properties props, final String dateTimePattern) {
        ShardingSpherePreconditions.checkContainsKey(props, DATE_TIME_LOWER_KEY, () -> new AlgorithmInitializationException(this, String.format("%s can not be null.", DATE_TIME_LOWER_KEY)));
        return getDateTime(DATE_TIME_LOWER_KEY, props.getProperty(DATE_TIME_LOWER_KEY), dateTimePattern);
    }
    
    private TemporalAccessor getDateTimeUpper(final Properties props, final String dateTimePattern) {
        return props.containsKey(DATE_TIME_UPPER_KEY) ? getDateTime(DATE_TIME_UPPER_KEY, props.getProperty(DATE_TIME_UPPER_KEY), dateTimePattern) : LocalDateTime.now();
    }
    
    /**
     * Parse a configured bound with the algorithm's date time formatter.
     *
     * @param dateTimeKey property key, used for the error message only
     * @param dateTimeValue property value to parse
     * @param dateTimePattern configured pattern, used for the error message only
     * @return parsed temporal
     */
    private TemporalAccessor getDateTime(final String dateTimeKey, final String dateTimeValue, final String dateTimePattern) {
        try {
            return dateTimeFormatter.parse(dateTimeValue);
        } catch (final DateTimeParseException ignored) {
            throw new InvalidDatetimeFormatException(dateTimeKey, dateTimeValue, dateTimePattern);
        }
    }
    
    private DateTimeFormatter getTableSuffixPattern(final Properties props) {
        String suffix = props.getProperty(SHARDING_SUFFIX_FORMAT_KEY);
        ShardingSpherePreconditions.checkNotEmpty(suffix, () -> new AlgorithmInitializationException(this, String.format("%s can not be null or empty.", SHARDING_SUFFIX_FORMAT_KEY)));
        return DateTimeFormatter.ofPattern(suffix);
    }
    
    /**
     * Read and validate the interval amount.
     *
     * <p>A non-positive amount is rejected here: with 0 the stepping loops never advance,
     * and with a negative value every interval would end before it starts.</p>
     *
     * @param props algorithm properties
     * @return positive interval amount, 1 when the property is absent
     */
    private int getStepAmount(final Properties props) {
        int result = Integer.parseInt(props.getOrDefault(INTERVAL_AMOUNT_KEY, 1).toString());
        if (result <= 0) {
            throw new AlgorithmInitializationException(this, String.format("%s must be greater than 0, but was `%d`.", INTERVAL_AMOUNT_KEY, result));
        }
        return result;
    }
    
    /**
     * Resolve the configured interval unit, ignoring case, against the display names of {@link ChronoUnit}.
     *
     * @param stepUnit configured unit text
     * @return matching unit
     */
    private ChronoUnit getStepUnit(final String stepUnit) {
        for (ChronoUnit each : ChronoUnit.values()) {
            if (each.toString().equalsIgnoreCase(stepUnit)) {
                return each;
            }
        }
        throw new UnsupportedSQLOperationException(String.format("Cannot find step unit for specified %s property: `%s`", INTERVAL_UNIT_KEY, stepUnit));
    }
    
    /**
     * Route a single sharding value.
     *
     * @param availableTargetNames candidate target names
     * @param shardingValue precise sharding value, its value must not be null
     * @return one matching target name, or {@code null} when no target matches
     */
    @Override
    public String doSharding(final Collection<String> availableTargetNames, final PreciseShardingValue<Comparable<?>> shardingValue) {
        ShardingSpherePreconditions.checkNotNull(shardingValue.getValue(), NullShardingValueException::new);
        return doSharding(availableTargetNames, Range.singleton(shardingValue.getValue())).stream().findFirst().orElse(null);
    }
    
    /**
     * Route a range of sharding values.
     *
     * @param availableTargetNames candidate target names
     * @param shardingValue range sharding value
     * @return all matching target names, possibly empty
     */
    @Override
    public Collection<String> doSharding(final Collection<String> availableTargetNames, final RangeShardingValue<Comparable<?>> shardingValue) {
        return doSharding(availableTargetNames, shardingValue.getValueRange());
    }
    
    /**
     * Dispatch to the stepping routine matching the fields that {@code datetime-lower} supports:
     * date only, year-month, year, month, time only, or full date time.
     */
    private Collection<String> doSharding(final Collection<String> availableTargetNames, final Range<Comparable<?>> range) {
        TemporalAccessor calculateTime = dateTimeLower;
        if (!calculateTime.isSupported(ChronoField.NANO_OF_DAY)) {
            if (calculateTime.isSupported(ChronoField.EPOCH_DAY)) {
                return doShardingInLocalDate(availableTargetNames, range, calculateTime);
            }
            if (calculateTime.isSupported(ChronoField.YEAR) && calculateTime.isSupported(ChronoField.MONTH_OF_YEAR)) {
                return doShardingInYearMonth(availableTargetNames, range, calculateTime);
            }
            if (calculateTime.isSupported(ChronoField.YEAR)) {
                return doShardingInYear(availableTargetNames, range, calculateTime);
            }
            if (calculateTime.isSupported(ChronoField.MONTH_OF_YEAR)) {
                return doShardingInMonth(availableTargetNames, range, calculateTime);
            }
        }
        if (!calculateTime.isSupported(ChronoField.EPOCH_DAY)) {
            return doShardingInLocalTime(availableTargetNames, range, calculateTime);
        }
        return doShardingInLocalDateTime(availableTargetNames, range, calculateTime);
    }
    
    private Collection<String> doShardingInLocalDateTime(final Collection<String> availableTargetNames, final Range<Comparable<?>> range, final TemporalAccessor calculateTime) {
        Set<String> result = new HashSet<>();
        LocalDateTime current = LocalDateTime.from(calculateTime);
        LocalDateTime upperBound = LocalDateTime.from(dateTimeUpper);
        LocalDateTime lowerBound = LocalDateTime.from(dateTimeLower);
        while (!current.isAfter(upperBound)) {
            LocalDateTime next = current.plus(stepAmount, stepUnit);
            if (hasIntersection(Range.closedOpen(current, next), range, lowerBound, upperBound)) {
                result.addAll(getMatchedTables(current, availableTargetNames));
            }
            current = next;
        }
        return result;
    }
    
    private Collection<String> doShardingInLocalTime(final Collection<String> availableTargetNames, final Range<Comparable<?>> range, final TemporalAccessor calculateTime) {
        Set<String> result = new HashSet<>();
        LocalTime upperBound = dateTimeUpper.query(TemporalQueries.localTime());
        LocalTime lowerBound = dateTimeLower.query(TemporalQueries.localTime());
        LocalTime current = calculateTime.query(TemporalQueries.localTime());
        // NOTE(review): LocalTime.plus wraps around midnight, so an interval crossing midnight would hand
        // Range.closedOpen an end before its start - confirm configurations keep datetime-upper clear of that.
        while (!current.isAfter(upperBound)) {
            LocalTime next = current.plus(stepAmount, stepUnit);
            if (hasIntersection(Range.closedOpen(current, next), range, lowerBound, upperBound)) {
                result.addAll(getMatchedTables(current, availableTargetNames));
            }
            current = next;
        }
        return result;
    }
    
    private Collection<String> doShardingInLocalDate(final Collection<String> availableTargetNames, final Range<Comparable<?>> range, final TemporalAccessor calculateTime) {
        Set<String> result = new HashSet<>();
        LocalDate upperBound = dateTimeUpper.query(TemporalQueries.localDate());
        LocalDate lowerBound = dateTimeLower.query(TemporalQueries.localDate());
        LocalDate current = calculateTime.query(TemporalQueries.localDate());
        while (!current.isAfter(upperBound)) {
            LocalDate next = current.plus(stepAmount, stepUnit);
            if (hasIntersection(Range.closedOpen(current, next), range, lowerBound, upperBound)) {
                result.addAll(getMatchedTables(current, availableTargetNames));
            }
            current = next;
        }
        return result;
    }
    
    private Collection<String> doShardingInYear(final Collection<String> availableTargetNames, final Range<Comparable<?>> range, final TemporalAccessor calculateTime) {
        Set<String> result = new HashSet<>();
        Year upperBound = dateTimeUpper.query(Year::from);
        Year lowerBound = dateTimeLower.query(Year::from);
        Year current = calculateTime.query(Year::from);
        while (!current.isAfter(upperBound)) {
            Year next = current.plus(stepAmount, stepUnit);
            if (hasIntersection(Range.closedOpen(current, next), range, lowerBound, upperBound)) {
                result.addAll(getMatchedTables(current, availableTargetNames));
            }
            current = next;
        }
        return result;
    }
    
    /**
     * Month-only stepping. The step is always {@code stepAmount} months; {@code stepUnit} is not consulted here.
     */
    private Collection<String> doShardingInMonth(final Collection<String> availableTargetNames, final Range<Comparable<?>> range, final TemporalAccessor calculateTime) {
        Set<String> result = new HashSet<>();
        Month upperBound = dateTimeUpper.query(Month::from);
        Month lowerBound = dateTimeLower.query(Month::from);
        Month current = calculateTime.query(Month::from);
        // Month.plus wraps around the year, so stepping stops before the interval end would pass December.
        // NOTE(review): as a consequence no visited interval ever contains December - confirm this is intended.
        while (current.getValue() <= upperBound.getValue() && (current.getValue() + stepAmount) <= Month.DECEMBER.getValue()) {
            Month next = current.plus(stepAmount);
            if (hasIntersection(Range.closedOpen(current, next), range, lowerBound, upperBound)) {
                result.addAll(getMatchedTables(current, availableTargetNames));
            }
            current = next;
        }
        return result;
    }
    
    private Collection<String> doShardingInYearMonth(final Collection<String> availableTargetNames, final Range<Comparable<?>> range, final TemporalAccessor calculateTime) {
        Set<String> result = new HashSet<>();
        YearMonth upperBound = dateTimeUpper.query(YearMonth::from);
        YearMonth lowerBound = dateTimeLower.query(YearMonth::from);
        YearMonth current = calculateTime.query(YearMonth::from);
        while (!current.isAfter(upperBound)) {
            YearMonth next = current.plus(stepAmount, stepUnit);
            if (hasIntersection(Range.closedOpen(current, next), range, lowerBound, upperBound)) {
                result.addAll(getMatchedTables(current, availableTargetNames));
            }
            current = next;
        }
        return result;
    }
    
    private boolean hasIntersection(final Range<LocalDateTime> calculateRange, final Range<Comparable<?>> range, final LocalDateTime dateTimeLower, final LocalDateTime dateTimeUpper) {
        LocalDateTime lower = range.hasLowerBound() ? parseLocalDateTime(range.lowerEndpoint()) : dateTimeLower;
        LocalDateTime upper = range.hasUpperBound() ? parseLocalDateTime(range.upperEndpoint()) : dateTimeUpper;
        return isIntersected(calculateRange, range, lower, upper);
    }
    
    private boolean hasIntersection(final Range<LocalDate> calculateRange, final Range<Comparable<?>> range, final LocalDate dateTimeLower, final LocalDate dateTimeUpper) {
        LocalDate lower = range.hasLowerBound() ? parseLocalDate(range.lowerEndpoint()) : dateTimeLower;
        LocalDate upper = range.hasUpperBound() ? parseLocalDate(range.upperEndpoint()) : dateTimeUpper;
        return isIntersected(calculateRange, range, lower, upper);
    }
    
    private boolean hasIntersection(final Range<LocalTime> calculateRange, final Range<Comparable<?>> range, final LocalTime dateTimeLower, final LocalTime dateTimeUpper) {
        LocalTime lower = range.hasLowerBound() ? parseLocalTime(range.lowerEndpoint()) : dateTimeLower;
        LocalTime upper = range.hasUpperBound() ? parseLocalTime(range.upperEndpoint()) : dateTimeUpper;
        return isIntersected(calculateRange, range, lower, upper);
    }
    
    private boolean hasIntersection(final Range<Year> calculateRange, final Range<Comparable<?>> range, final Year dateTimeLower, final Year dateTimeUpper) {
        Year lower = range.hasLowerBound() ? parseYear(range.lowerEndpoint()) : dateTimeLower;
        Year upper = range.hasUpperBound() ? parseYear(range.upperEndpoint()) : dateTimeUpper;
        return isIntersected(calculateRange, range, lower, upper);
    }
    
    private boolean hasIntersection(final Range<Month> calculateRange, final Range<Comparable<?>> range, final Month dateTimeLower, final Month dateTimeUpper) {
        Month lower = range.hasLowerBound() ? parseMonth(range.lowerEndpoint()) : dateTimeLower;
        Month upper = range.hasUpperBound() ? parseMonth(range.upperEndpoint()) : dateTimeUpper;
        return isIntersected(calculateRange, range, lower, upper);
    }
    
    private boolean hasIntersection(final Range<YearMonth> calculateRange, final Range<Comparable<?>> range, final YearMonth dateTimeLower, final YearMonth dateTimeUpper) {
        YearMonth lower = range.hasLowerBound() ? parseYearMonth(range.lowerEndpoint()) : dateTimeLower;
        YearMonth upper = range.hasUpperBound() ? parseYearMonth(range.upperEndpoint()) : dateTimeUpper;
        return isIntersected(calculateRange, range, lower, upper);
    }
    
    /**
     * Decide whether one shard interval overlaps the queried range.
     *
     * @param calculateRange half-open interval covered by the shard being examined
     * @param range original sharding value range, consulted for its bound types only
     * @param lower resolved lower endpoint (parsed from {@code range}, or the configured lower bound when unbounded)
     * @param upper resolved upper endpoint (parsed from {@code range}, or the configured upper bound when unbounded)
     * @param <T> temporal type of the current granularity
     * @return {@code true} when the two ranges share at least one value
     */
    private static <T extends Comparable<? super T>> boolean isIntersected(final Range<T> calculateRange, final Range<Comparable<?>> range, final T lower, final T upper) {
        int order = lower.compareTo(upper);
        if (order > 0) {
            // Only reachable when one side was defaulted and the query lies wholly outside the configured window,
            // e.g. "> X" with X after datetime-upper. Range.range would reject such bounds with IllegalArgumentException.
            return false;
        }
        Range<T> dateTimeRange;
        if (0 == order && range.hasLowerBound() && range.hasUpperBound()) {
            // Both user endpoints landed on the same value after being cut to datetime-pattern precision.
            // Keeping an OPEN bound would give an empty range, or an invalid one when both are OPEN,
            // and the shard owning that value would be skipped, so treat it as the single value.
            dateTimeRange = Range.singleton(lower);
        } else {
            BoundType lowerBoundType = range.hasLowerBound() ? range.lowerBoundType() : BoundType.CLOSED;
            BoundType upperBoundType = range.hasUpperBound() ? range.upperBoundType() : BoundType.CLOSED;
            dateTimeRange = Range.range(lower, lowerBoundType, upper, upperBoundType);
        }
        return calculateRange.isConnected(dateTimeRange) && !calculateRange.intersection(dateTimeRange).isEmpty();
    }
    
    private LocalDateTime parseLocalDateTime(final Comparable<?> endpoint) {
        return parseEndpoint(endpoint, LocalDateTime::from);
    }
    
    private LocalDate parseLocalDate(final Comparable<?> endpoint) {
        return parseEndpoint(endpoint, LocalDate::from);
    }
    
    private LocalTime parseLocalTime(final Comparable<?> endpoint) {
        return parseEndpoint(endpoint, LocalTime::from);
    }
    
    private Year parseYear(final Comparable<?> endpoint) {
        return parseEndpoint(endpoint, Year::from);
    }
    
    private YearMonth parseYearMonth(final Comparable<?> endpoint) {
        return parseEndpoint(endpoint, YearMonth::from);
    }
    
    /**
     * Convert a range endpoint to the temporal type of the current granularity.
     *
     * <p>Text at least as long as {@code datetime-pattern} is cut to the pattern length and parsed with the
     * configured formatter; shorter text is parsed with a formatter built from the matching pattern prefix.</p>
     *
     * @param endpoint range endpoint of unknown runtime class
     * @param query conversion to the wanted temporal type, e.g. {@code LocalDate::from}
     * @param <T> wanted temporal type
     * @return parsed temporal
     */
    private <T> T parseEndpoint(final Comparable<?> endpoint, final java.time.temporal.TemporalQuery<T> query) {
        String dateTimeText = getDateTimeText(endpoint);
        if (dateTimeText.length() >= dateTimePatternLength) {
            return dateTimeFormatter.parse(dateTimeText.substring(0, dateTimePatternLength), query);
        }
        return createRelaxedDateTimeFormatter(dateTimeText).parse(dateTimeText, query);
    }
    
    /**
     * Convert a range endpoint to a {@link Month}.
     *
     * <p>After the sharding key is formatted as a {@link String},
     * a text shorter than {@code datetime-pattern} usually means there is a problem with the sharding key,
     * so no relaxed parsing is attempted here and the substring call fails for such input.</p>
     *
     * @param endpoint a value carrying time information, of unknown runtime class
     * @return {@link java.time.Month}
     */
    private Month parseMonth(final Comparable<?> endpoint) {
        return Month.of(Integer.parseInt(getDateTimeText(endpoint).substring(0, dateTimePatternLength)));
    }
    
    /**
     * Build a formatter from the leading part of {@code datetime-pattern}.
     *
     * <p>When the sharding key is a {@link String} shorter than the configured {@code datetime-pattern},
     * ShardingSphere tries to parse it with a prefix of the pattern of the same length.
     * This is to be compatible with the behavior of ORM libraries such as
     * <a href="https://github.com/go-gorm/gorm">go-gorm/gorm</a>.</p>
     *
     * @param dateTimeText sharding key as text
     * @return formatter for the pattern prefix whose length equals the length of the sharding key
     */
    private DateTimeFormatter createRelaxedDateTimeFormatter(final String dateTimeText) {
        String dateTimeFormatterString = dateTimePatternString.substring(0, dateTimeText.length());
        return DateTimeFormatter.ofPattern(dateTimeFormatterString);
    }
    
    /**
     * Render a range endpoint as text.
     *
     * <p>Temporal and legacy date values are formatted with the configured formatter
     * ({@link Instant} and {@link java.util.Date} via the system default time zone);
     * any other value is rendered with {@code toString()}.</p>
     *
     * @param endpoint range endpoint of unknown runtime class
     * @return textual form of the endpoint
     */
    private String getDateTimeText(final Comparable<?> endpoint) {
        if (endpoint instanceof Instant) {
            return dateTimeFormatter.format(((Instant) endpoint).atZone(ZoneId.systemDefault()));
        }
        if (endpoint instanceof TemporalAccessor) {
            return dateTimeFormatter.format((TemporalAccessor) endpoint);
        }
        // java.sql.Date must be tested before java.util.Date, which it extends.
        if (endpoint instanceof java.sql.Date) {
            return dateTimeFormatter.format(((java.sql.Date) endpoint).toLocalDate());
        }
        if (endpoint instanceof java.util.Date) {
            return dateTimeFormatter.format(((java.util.Date) endpoint).toInstant().atZone(ZoneId.systemDefault()).toLocalDateTime());
        }
        return endpoint.toString();
    }
    
    /**
     * Collect the target names that end with the suffix derived from an interval start.
     *
     * @param dateTime interval start
     * @param availableTargetNames candidate target names
     * @return target names ending with the formatted suffix, possibly empty
     */
    private Collection<String> getMatchedTables(final TemporalAccessor dateTime, final Collection<String> availableTargetNames) {
        String tableSuffix = tableSuffixPattern.format(getSuffixTemporal(dateTime));
        // A sequential stream is enough here; the result is the same set as a parallel one would produce.
        return availableTargetNames.stream().filter(each -> each.endsWith(tableSuffix)).collect(Collectors.toSet());
    }
    
    /**
     * Narrow an interval start to the temporal type of its granularity before it is formatted as a suffix.
     */
    private TemporalAccessor getSuffixTemporal(final TemporalAccessor dateTime) {
        if (!dateTime.isSupported(ChronoField.NANO_OF_DAY)) {
            if (dateTime.isSupported(ChronoField.EPOCH_DAY)) {
                return dateTime.query(TemporalQueries.localDate());
            }
            if (dateTime.isSupported(ChronoField.YEAR) && dateTime.isSupported(ChronoField.MONTH_OF_YEAR)) {
                return dateTime.query(YearMonth::from);
            }
            if (dateTime.isSupported(ChronoField.YEAR)) {
                return dateTime.query(Year::from);
            }
            if (dateTime.isSupported(ChronoField.MONTH_OF_YEAR)) {
                return dateTime.query(Month::from);
            }
        }
        if (!dateTime.isSupported(ChronoField.EPOCH_DAY)) {
            return dateTime.query(TemporalQueries.localTime());
        }
        return LocalDateTime.from(dateTime);
    }
    
    @Override
    public String getType() {
        return "INTERVAL";
    }
}