| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.druid.timeline.partition; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Preconditions; |
| import com.google.common.collect.FluentIterable; |
| import com.google.common.collect.Iterators; |
| import it.unimi.dsi.fastutil.objects.AbstractObjectCollection; |
| import it.unimi.dsi.fastutil.objects.ObjectCollection; |
| import it.unimi.dsi.fastutil.objects.ObjectIterator; |
| import it.unimi.dsi.fastutil.objects.ObjectIterators; |
| import it.unimi.dsi.fastutil.objects.ObjectSortedSet; |
| import it.unimi.dsi.fastutil.objects.ObjectSortedSets; |
| import it.unimi.dsi.fastutil.shorts.AbstractShort2ObjectSortedMap; |
| import it.unimi.dsi.fastutil.shorts.Short2ObjectMap; |
| import it.unimi.dsi.fastutil.shorts.Short2ObjectRBTreeMap; |
| import it.unimi.dsi.fastutil.shorts.Short2ObjectSortedMap; |
| import it.unimi.dsi.fastutil.shorts.ShortComparator; |
| import it.unimi.dsi.fastutil.shorts.ShortComparators; |
| import it.unimi.dsi.fastutil.shorts.ShortSortedSet; |
| import it.unimi.dsi.fastutil.shorts.ShortSortedSets; |
| import org.apache.druid.java.util.common.IAE; |
| import org.apache.druid.java.util.common.ISE; |
| import org.apache.druid.timeline.Overshadowable; |
| |
| import javax.annotation.Nullable; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.NoSuchElementException; |
| import java.util.Objects; |
| import java.util.Set; |
| import java.util.TreeMap; |
| |
| /** |
| * OvershadowableManager manages the state of {@link AtomicUpdateGroup}. See the below {@link State} for details. |
| * Note that an AtomicUpdateGroup can consist of {@link Overshadowable}s of the same majorVersion, minorVersion, |
| * rootPartition range, and atomicUpdateGroupSize. |
| * In {@link org.apache.druid.timeline.VersionedIntervalTimeline}, this class is used to manage segments in the same |
| * timeChunk. |
| * |
| * This class is not thread-safe. |
| */ |
| class OvershadowableManager<T extends Overshadowable<T>> |
| { |
| /** |
| * The three states an atomicUpdateGroup can be in. Each state is backed by its own map in this class. |
| * There could be at most one visible atomicUpdateGroup per rootPartitionRange at any time in a non-empty |
| * overshadowableManager. |
| * |
| * - Visible: fully available atomicUpdateGroup of the highest version if any. |
| * If there's no fully available atomicUpdateGroup, the standby atomicUpdateGroup of the highest version |
| * becomes visible. |
| * - Standby: all atomicUpdateGroups of higher versions than that of the visible atomicUpdateGroup. |
| * - Overshadowed: all atomicUpdateGroups of lower versions than that of the visible atomicUpdateGroup. |
| */ |
@VisibleForTesting |
| enum State |
| { |
| // Groups of higher versions than the visible group. |
| STANDBY, |
| // The group currently exposed for its rootPartitionRange. |
| VISIBLE, |
| // Groups of lower versions than the visible group. |
| OVERSHADOWED |
| } |
| |
| // Chunks registered through addChunk(), keyed by their chunk number. |
| private final Map<Integer, PartitionChunk<T>> knownPartitionChunks; // served segments |
| |
| // (start partitionId, end partitionId) -> minorVersion -> atomicUpdateGroup |
| private final TreeMap<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> standbyGroups; |
| /** |
| * The values in this map must always be {@link SingleEntryShort2ObjectSortedMap}, hence there is at most one visible |
| * group per range. The same type is used as in {@link #standbyGroups} and {@link #overshadowedGroups} to reuse helper |
| * functions across all {@link State}s. |
| */ |
| private final TreeMap<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> visibleGroupPerRange; |
| // Same layout as standbyGroups; holds the groups of State.OVERSHADOWED. |
| private final TreeMap<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> overshadowedGroups; |
| |
| /** |
| * Creates an empty manager: no known chunks and no groups in any {@link State}. |
| */ |
| OvershadowableManager() |
| { |
|   this.visibleGroupPerRange = new TreeMap<>(); |
|   this.standbyGroups = new TreeMap<>(); |
|   this.overshadowedGroups = new TreeMap<>(); |
|   this.knownPartitionChunks = new HashMap<>(); |
| } |
| |
| /** |
| * Creates a new manager holding copies of only the visible groups of the given manager. |
| * The chunks of every copied group are registered in the known chunks of the new manager. |
| * |
| * @param original the manager to copy the visible groups from |
| * |
| * @return a new manager containing the visible groups only |
| */ |
| public static <T extends Overshadowable<T>> OvershadowableManager<T> copyVisible(OvershadowableManager<T> original) |
| { |
|   final OvershadowableManager<T> visibleOnly = new OvershadowableManager<>(); |
|   for (Entry<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> rangeAndGroups |
|       : original.visibleGroupPerRange.entrySet()) { |
|     // A visible range holds a single group, so the first value is the only one. |
|     final AtomicUpdateGroup<T> visibleGroup = rangeAndGroups.getValue().values().iterator().next(); |
|     visibleGroup.getChunks().forEach( |
|         chunk -> visibleOnly.knownPartitionChunks.put(chunk.getChunkNumber(), chunk) |
|     ); |
| |
|     final RootPartitionRange partitionRange = rangeAndGroups.getKey(); |
|     visibleOnly.visibleGroupPerRange.put( |
|         partitionRange, |
|         new SingleEntryShort2ObjectSortedMap<>(visibleGroup.getMinorVersion(), AtomicUpdateGroup.copy(visibleGroup)) |
|     ); |
|   } |
|   return visibleOnly; |
| } |
| |
| /** |
| * Creates a copy of the given manager including its visible, overshadowed, and standby groups. |
| * |
| * Unlike the visible state, a single rootPartitionRange can hold several overshadowed or standby groups |
| * (one per minorVersion). Every one of them is copied, and the copies are stored in the same kind of map that |
| * {@link #createMinorVersionToAugMap} creates for that state, so that the copy can keep accepting more groups. |
| * |
| * @param original the manager to copy |
| * |
| * @return a new manager holding copies of all groups of the given manager |
| */ |
| public static <T extends Overshadowable<T>> OvershadowableManager<T> deepCopy(OvershadowableManager<T> original) |
| { |
|   final OvershadowableManager<T> copy = copyVisible(original); |
|   copyAllGroupsOfState(original, copy, State.OVERSHADOWED); |
|   copyAllGroupsOfState(original, copy, State.STANDBY); |
|   return copy; |
| } |
| |
| /** |
| * Copies every group of the given state from one manager into another, registering the chunks of the copied |
| * groups in the known chunks of the target manager. |
| */ |
| private static <T extends Overshadowable<T>> void copyAllGroupsOfState( |
|     OvershadowableManager<T> from, |
|     OvershadowableManager<T> to, |
|     State state |
| ) |
| { |
|   from.getStateMap(state).forEach((partitionRange, versionToGroups) -> { |
|     final Short2ObjectSortedMap<AtomicUpdateGroup<T>> copiedVersionToGroups = to.createMinorVersionToAugMap(state); |
|     // A range can contain multiple minor versions in this state; copy all of them, not just the first. |
|     for (AtomicUpdateGroup<T> group : versionToGroups.values()) { |
|       group.getChunks().forEach(chunk -> to.knownPartitionChunks.put(chunk.getChunkNumber(), chunk)); |
|       copiedVersionToGroups.put(group.getMinorVersion(), AtomicUpdateGroup.copy(group)); |
|     } |
|     // Never register an empty per-range map; lookups such as firstShortKey() expect at least one entry. |
|     if (!copiedVersionToGroups.isEmpty()) { |
|       to.getStateMap(state).put(partitionRange, copiedVersionToGroups); |
|     } |
|   }); |
| } |
| |
| private TreeMap<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> getStateMap(State state) |
| { |
| switch (state) { |
| case STANDBY: |
| return standbyGroups; |
| case VISIBLE: |
| return visibleGroupPerRange; |
| case OVERSHADOWED: |
| return overshadowedGroups; |
| default: |
| throw new ISE("Unknown state[%s]", state); |
| } |
| } |
| |
| private Short2ObjectSortedMap<AtomicUpdateGroup<T>> createMinorVersionToAugMap(State state) |
| { |
| switch (state) { |
| case STANDBY: |
| case OVERSHADOWED: |
| return new Short2ObjectRBTreeMap<>(); |
| case VISIBLE: |
| return new SingleEntryShort2ObjectSortedMap<>(); |
| default: |
| throw new ISE("Unknown state[%s]", state); |
| } |
| } |
| |
| private void transitAtomicUpdateGroupState(AtomicUpdateGroup<T> atomicUpdateGroup, State from, State to) |
| { |
| Preconditions.checkNotNull(atomicUpdateGroup, "atomicUpdateGroup"); |
| Preconditions.checkArgument(!atomicUpdateGroup.isEmpty(), "empty atomicUpdateGroup"); |
| |
| removeFrom(atomicUpdateGroup, from); |
| addAtomicUpdateGroupWithState(atomicUpdateGroup, to, false); |
| } |
| |
| /** |
| * Swaps the visible groups: oldVisibleGroups leave the visible map and newVisibleGroups enter it. |
| * The work is done in three passes: |
| * |
| * 1) remove every non-empty old visible group from the visible map, |
| * 2) move every new visible group from its old state map to the visible map, |
| * 3) add every non-empty old visible group to its new state map. |
| * |
| * @param oldVisibleGroups           groups that are currently visible; empty groups are skipped |
| * @param newStateOfOldVisibleGroup  the state the old visible groups are moved to |
| * @param newVisibleGroups           groups that should become visible |
| * @param oldStateOfNewVisibleGroups the state the new visible groups are currently in |
| */ |
| private void replaceVisibleWith( |
|     Collection<AtomicUpdateGroup<T>> oldVisibleGroups, |
|     State newStateOfOldVisibleGroup, |
|     Collection<AtomicUpdateGroup<T>> newVisibleGroups, |
|     State oldStateOfNewVisibleGroups |
| ) |
| { |
|   for (AtomicUpdateGroup<T> oldVisible : oldVisibleGroups) { |
|     if (oldVisible.isEmpty()) { |
|       continue; |
|     } |
|     removeFrom(oldVisible, State.VISIBLE); |
|   } |
| |
|   for (AtomicUpdateGroup<T> newVisible : newVisibleGroups) { |
|     transitAtomicUpdateGroupState(newVisible, oldStateOfNewVisibleGroups, State.VISIBLE); |
|   } |
| |
|   for (AtomicUpdateGroup<T> oldVisible : oldVisibleGroups) { |
|     if (oldVisible.isEmpty()) { |
|       continue; |
|     } |
|     addAtomicUpdateGroupWithState(oldVisible, newStateOfOldVisibleGroup, false); |
|   } |
| } |
| |
| /** |
| * Looks up, in the map of the given state, the {@link AtomicUpdateGroup} that has the same |
| * {@link RootPartitionRange} and minorVersion as the given {@link PartitionChunk}. |
| * |
| * @return the matching group, or null if the state has no such group |
| */ |
| @Nullable |
| private AtomicUpdateGroup<T> findAtomicUpdateGroupWith(PartitionChunk<T> chunk, State state) |
| { |
|   final TreeMap<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> stateMap = getStateMap(state); |
|   final Short2ObjectSortedMap<AtomicUpdateGroup<T>> minorVersionToGroup = stateMap.get(RootPartitionRange.of(chunk)); |
|   if (minorVersionToGroup == null) { |
|     return null; |
|   } |
|   return minorVersionToGroup.get(chunk.getObject().getMinorVersion()); |
| } |
| |
| /** |
| * Removes the given chunk from its atomicUpdateGroup in the given state, if such a group exists. |
| * A group that becomes empty is dropped from the state map, together with its range entry when nothing else is |
| * left for that range. After the removal the visible group is re-evaluated. |
| * |
| * @return the group the chunk was removed from (it can be empty), or null if no group is found for the state |
| */ |
| @Nullable |
| private AtomicUpdateGroup<T> tryRemoveChunkFromGroupWithState(PartitionChunk<T> chunk, State state) |
| { |
|   final RootPartitionRange rangeKey = RootPartitionRange.of(chunk); |
|   final Short2ObjectSortedMap<AtomicUpdateGroup<T>> minorVersionToGroup = getStateMap(state).get(rangeKey); |
|   if (minorVersionToGroup == null) { |
|     return null; |
|   } |
| |
|   final AtomicUpdateGroup<T> groupOfChunk = minorVersionToGroup.get(chunk.getObject().getMinorVersion()); |
|   if (groupOfChunk == null) { |
|     return null; |
|   } |
| |
|   groupOfChunk.remove(chunk); |
|   if (groupOfChunk.isEmpty()) { |
|     // Clean up the now-empty group, and the range entry too if this was its last group. |
|     minorVersionToGroup.remove(chunk.getObject().getMinorVersion()); |
|     if (minorVersionToGroup.isEmpty()) { |
|       getStateMap(state).remove(rangeKey); |
|     } |
|   } |
| |
|   determineVisibleGroupAfterRemove( |
|       groupOfChunk, |
|       RootPartitionRange.of(chunk), |
|       chunk.getObject().getMinorVersion(), |
|       state |
|   ); |
|   return groupOfChunk; |
| } |
| |
| /** |
| * Convenience overload: finds the groups of the given state overshadowed by the rootPartitionRange and |
| * minorVersion of the given atomicUpdateGroup. |
| */ |
| private List<AtomicUpdateGroup<T>> findOvershadowedBy( |
|     AtomicUpdateGroup<T> aug, |
|     State fromState |
| ) |
| { |
|   return findOvershadowedBy(RootPartitionRange.of(aug), aug.getMinorVersion(), fromState); |
| } |
| |
| /** |
| * Find all atomicUpdateGroups of the given state overshadowed by the minorVersion in the given rootPartitionRange. |
| * An atomicUpdateGroup of a higher minorVersion can have a wider RootPartitionRange, so the search starts from |
| * the entries returned by {@link #entryIteratorGreaterThan} for the start of the given range and walks forward |
| * until it meets an entry whose rootPartitionRange is not contained by the given rootPartitionRange. |
| * |
| * @param rangeOfAug   the partition range to search for overshadowed groups. |
| * @param minorVersion the minor version to check overshadow relation. The found groups will have lower minor versions |
| *                     than this. |
| * @param fromState    the state to search for overshadowed groups. |
| * |
| * @return a list of found atomicUpdateGroups. It could be empty if no groups are found. |
| */ |
@VisibleForTesting |
| List<AtomicUpdateGroup<T>> findOvershadowedBy(RootPartitionRange rangeOfAug, short minorVersion, State fromState) |
| { |
|   final TreeMap<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> stateMap = getStateMap(fromState); |
|   final List<AtomicUpdateGroup<T>> overshadowed = new ArrayList<>(); |
|   final Iterator<Entry<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>>> candidates = |
|       entryIteratorGreaterThan(rangeOfAug.startPartitionId, stateMap); |
| |
|   while (candidates.hasNext()) { |
|     final Entry<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> candidate = candidates.next(); |
|     if (!rangeOfAug.contains(candidate.getKey())) { |
|       // The first range that is not contained ends the search. |
|       break; |
|     } |
| |
|     // The map is sorted by minorVersion; headMap(minorVersion) is the part strictly below minorVersion. |
|     final Short2ObjectSortedMap<AtomicUpdateGroup<T>> minorVersionToGroup = candidate.getValue(); |
|     // Short2ObjectRBTreeMap.SubMap.short2ObjectEntrySet() implementation, especially size(), is not optimized. |
|     // size() is indirectly called in ArrayList.addAll() when ObjectSortedSet.toArray() is called |
|     // (see AbstractObjectCollection.toArray()), so the sub map is only built when it can be non-empty. |
|     // If you see performance degradation here, probably the addAll() below needs to be improved. |
|     if (minorVersionToGroup.firstShortKey() < minorVersion) { |
|       overshadowed.addAll(minorVersionToGroup.headMap(minorVersion).values()); |
|     } |
|   } |
|   return overshadowed; |
| } |
| |
| private List<AtomicUpdateGroup<T>> findOvershadows(AtomicUpdateGroup<T> aug, State fromState) |
| { |
| return findOvershadows(RootPartitionRange.of(aug), aug.getMinorVersion(), fromState); |
| } |
| |
| /** |
| * Find all atomicUpdateGroups which overshadow others of the given minorVersion in the given rootPartitionRange. |
| * Similar to {@link #findOvershadowedBy}. |
| * |
| * Note that one atomicUpdateGroup can overshadow multiple other groups. If you're finding overshadowing |
| * atomicUpdateGroups by calling this method in a loop, the results of this method can contain duplicate groups. |
| * |
| * @param rangeOfAug   the partition range to search for overshadowing groups. |
| * @param minorVersion the minor version to check overshadow relation. The found groups will have higher minor |
| *                     versions than this. |
| * @param fromState    the state to search for overshadowed groups. |
| * |
| * @return a list of found atomicUpdateGroups. It could be empty if no groups are found. |
| */ |
@VisibleForTesting |
| List<AtomicUpdateGroup<T>> findOvershadows(RootPartitionRange rangeOfAug, short minorVersion, State fromState) |
| { |
|   final TreeMap<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> stateMap = getStateMap(fromState); |
|   final Iterator<Entry<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>>> candidates = |
|       entryIteratorSmallerThan(rangeOfAug.endPartitionId, stateMap); |
| |
|   // Walk the candidates and collect from every entry whose RootPartitionRange contains the given rangeOfAug. |
|   // Note that RootPartitionRange of entries are always consecutive, so the first non-overlapping entry ends |
|   // the search. |
|   final List<AtomicUpdateGroup<T>> overshadowing = new ArrayList<>(); |
|   while (candidates.hasNext()) { |
|     final Entry<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> candidate = candidates.next(); |
|     final RootPartitionRange candidateRange = candidate.getKey(); |
|     if (!candidateRange.overlaps(rangeOfAug)) { |
|       break; |
|     } |
|     if (!candidateRange.contains(rangeOfAug)) { |
|       continue; |
|     } |
| |
|     // The map is sorted by minorVersion; tailMap(minorVersion) is the part equal to or above minorVersion. |
|     // NOTE(review): tailMap() is inclusive, so an entry of exactly minorVersion in this range would be |
|     // returned too - confirm that this cannot happen or is intended. |
|     final Short2ObjectSortedMap<AtomicUpdateGroup<T>> minorVersionToGroup = candidate.getValue(); |
|     // Short2ObjectRBTreeMap.SubMap.short2ObjectEntrySet() implementation, especially size(), is not optimized. |
|     // size() is indirectly called in ArrayList.addAll() when ObjectSortedSet.toArray() is called |
|     // (see AbstractObjectCollection.toArray()), so the sub map is only built when a higher version exists. |
|     // If you see performance degradation here, probably the addAll() below needs to be improved. |
|     if (minorVersionToGroup.lastShortKey() > minorVersion) { |
|       overshadowing.addAll(minorVersionToGroup.tailMap(minorVersion).values()); |
|     } |
|   } |
|   return overshadowing; |
| } |
| |
| /** |
| * Checks whether some visible group has a rootPartitionRange containing the given partitionRange and a |
| * minorVersion higher than the given one. |
| * |
| * @param partitionRange the partition range to check |
| * @param minorVersion   the minor version to check |
| * |
| * @return true if such a visible group exists |
| */ |
| boolean isOvershadowedByVisibleGroup(RootPartitionRange partitionRange, short minorVersion) |
| { |
|   final Iterator<Entry<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>>> visibleCandidates = |
|       entryIteratorSmallerThan(partitionRange.endPartitionId, visibleGroupPerRange); |
| |
|   // Note that RootPartitionRange of entries are always consecutive, so the first visible range that does not |
|   // overlap the given range ends the search. |
|   while (visibleCandidates.hasNext()) { |
|     final Entry<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> candidate = |
|         visibleCandidates.next(); |
|     final RootPartitionRange visibleRange = candidate.getKey(); |
|     if (!visibleRange.overlaps(partitionRange)) { |
|       return false; |
|     } |
|     // The map is sorted by minorVersion, so its last key is the highest minorVersion of the range. |
|     if (visibleRange.contains(partitionRange) && candidate.getValue().lastShortKey() > minorVersion) { |
|       return true; |
|     } |
|   } |
|   return false; |
| } |
| |
| /** |
| * Returns an iterator of entries that has a {@link RootPartitionRange} smaller than the given partitionId. |
| * A RootPartitionRange is smaller than a partitionId if {@link RootPartitionRange#startPartitionId} < partitionId. |
| */ |
| private Iterator<Entry<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>>> entryIteratorSmallerThan( |
| short partitionId, |
| TreeMap<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> stateMap |
| ) |
| { |
| final RootPartitionRange lowFench = new RootPartitionRange((short) 0, (short) 0); |
| final RootPartitionRange highFence = new RootPartitionRange(partitionId, partitionId); |
| return stateMap.subMap(lowFench, false, highFence, false).descendingMap().entrySet().iterator(); |
| } |
| |
| /** |
| * Returns an iterator of entries that has a {@link RootPartitionRange} greater than the given partitionId. |
| * A RootPartitionRange is greater than a partitionId if {@link RootPartitionRange#startPartitionId} >= partitionId |
| * and {@link RootPartitionRange#endPartitionId} > partitionId. |
| */ |
| private Iterator<Entry<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>>> entryIteratorGreaterThan( |
| short partitionId, |
| TreeMap<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> stateMap |
| ) |
| { |
| final RootPartitionRange lowFench = new RootPartitionRange(partitionId, partitionId); |
| final RootPartitionRange highFence = new RootPartitionRange(Short.MAX_VALUE, Short.MAX_VALUE); |
| return stateMap.subMap(lowFench, false, highFence, false).entrySet().iterator(); |
| } |
| |
| /** |
| * Determine the visible group after a new chunk is added to the given atomicUpdateGroup. |
| * Only groups in the standby or overshadowed state can trigger a change; nothing is done for other states. |
| * |
| * @param aug        the group the new chunk was added to |
| * @param stateOfAug the state the group is currently in |
| */ |
| private void determineVisibleGroupAfterAdd(AtomicUpdateGroup<T> aug, State stateOfAug) |
| { |
|   if (stateOfAug == State.OVERSHADOWED) { |
|     checkVisibleIsFullyAvailableAndTryToMoveOvershadowedToVisible(aug, stateOfAug); |
|     return; |
|   } |
|   if (stateOfAug == State.STANDBY) { |
|     moveNewStandbyToVisibleIfNecessary(aug, stateOfAug); |
|   } |
| } |
| |
| /** |
| * This method is called in {@link #determineVisibleGroupAfterAdd}. |
| * The given standby group can be visible in the below two cases: |
| * |
| * - The standby group is full. Since every standby group has a higher version than the current visible group, |
| * it should become visible immediately when it's full. |
| * - The standby group is not full but not empty and the current visible is not full. If there's no fully available |
| * group, the group of the highest version should be the visible. |
| * |
| * Whenever the given group becomes visible, the standby groups it overshadows are moved to the overshadowed state. |
| * |
| * @param standbyGroup the standby group which a chunk was just added to |
| * @param stateOfGroup the state of the given group; asserted to be {@link State#STANDBY} |
| */ |
| private void moveNewStandbyToVisibleIfNecessary(AtomicUpdateGroup<T> standbyGroup, State stateOfGroup) |
| { |
| assert stateOfGroup == State.STANDBY; |
| |
| // A standby atomicUpdateGroup becomes visible when all of its segments are available. |
| if (standbyGroup.isFull()) { |
| // A current visible atomicUpdateGroup becomes overshadowed when a fully available standby atomicUpdateGroup |
| // becomes visible. |
| replaceVisibleWith( |
| findOvershadowedBy(standbyGroup, State.VISIBLE), |
| State.OVERSHADOWED, |
| Collections.singletonList(standbyGroup), |
| State.STANDBY |
| ); |
| // Remaining standby groups overshadowed by the new visible group become overshadowed as well. |
| findOvershadowedBy(standbyGroup, State.STANDBY) |
| .forEach(entry -> transitAtomicUpdateGroupState(entry, State.STANDBY, State.OVERSHADOWED)); |
| } else { |
| // The given atomicUpdateGroup is in the standby state which means it's not overshadowed by the visible group. |
| // If the visible group is not fully available, then the new standby group should be visible since it has a |
| // higher minor version. |
| if (!standbyGroup.isEmpty()) { |
| // Check there are visible atomicUpdateGroups overshadowed by the given atomicUpdateGroup. |
| final List<AtomicUpdateGroup<T>> overshadowedVisibles = findOvershadowedBy( |
| standbyGroup, |
| State.VISIBLE |
| ); |
| if (overshadowedVisibles.isEmpty()) { |
| // There is no visible atomicUpdateGroup for the rootPartitionRange of the given aug. |
| // The given aug should be visible. |
| transitAtomicUpdateGroupState(standbyGroup, State.STANDBY, State.VISIBLE); |
| findOvershadowedBy(standbyGroup, State.STANDBY) |
| .forEach(entry -> transitAtomicUpdateGroupState(entry, State.STANDBY, State.OVERSHADOWED)); |
| } else { |
| // Check there is any missing chunk in the current visible groups. |
| // If the current visible groups don't cover the partition range of the given standby group, |
| // the given standby group should be visible. |
| final boolean fullyCoverAugRange = doGroupsFullyCoverPartitionRange( |
| overshadowedVisibles, |
| standbyGroup.getStartRootPartitionId(), |
| standbyGroup.getEndRootPartitionId() |
| ); |
| if (!fullyCoverAugRange) { |
| replaceVisibleWith( |
| overshadowedVisibles, |
| State.OVERSHADOWED, |
| Collections.singletonList(standbyGroup), |
| State.STANDBY |
| ); |
| findOvershadowedBy(standbyGroup, State.STANDBY) |
| .forEach(entry -> transitAtomicUpdateGroupState(entry, State.STANDBY, State.OVERSHADOWED)); |
| |
| } |
| // If all visible atomicUpdateGroups are full, then the given atomicUpdateGroup should stay in the standby |
| // state. |
| } |
| } |
| } |
| } |
| |
| /** |
| * This method is called in {@link #determineVisibleGroupAfterAdd}. It first checks the current visible group is |
| * fully available. If not, it checks there are overshadowed groups which can cover the rootPartitionRange of |
| * the visible groups and are fully available. If it finds such groups, they become visible. |
| * Nothing is done unless the given group is full. |
| * |
| * @param group        the overshadowed group which a chunk was just added to |
| * @param stateOfGroup the state of the given group; asserted to be {@link State#OVERSHADOWED} |
| * |
| * @throws ISE if the given group is full but neither a visible nor a standby group overshadowing it is found |
| */ |
| private void checkVisibleIsFullyAvailableAndTryToMoveOvershadowedToVisible( |
| AtomicUpdateGroup<T> group, |
| State stateOfGroup |
| ) |
| { |
| assert stateOfGroup == State.OVERSHADOWED; |
| if (group.isFull()) { |
| // Since this atomicUpdateGroup is full, it could be changed to visible if the current visible group is not |
| // fully available. To check this, we first check the current visible is fully available. |
| // And if not, we check the overshadowed groups are fully available and can cover the partition range of |
| // the atomicUpdateGroups overshadow the given overshadowed group. |
| |
| // Visible or standby groups overshadowing the given group. |
| // Used to both 1) check fully available visible group and |
| // 2) get the partition range which the fully available overshadowed groups should cover to become visible. |
| final List<AtomicUpdateGroup<T>> groupsOvershadowingAug; |
| final boolean isOvershadowingGroupsFull; |
| |
| final List<AtomicUpdateGroup<T>> overshadowingVisibles = findOvershadows(group, State.VISIBLE); |
| if (overshadowingVisibles.isEmpty()) { |
| // No visible group overshadows the given group; fall back to the standby groups overshadowing it. |
| final List<AtomicUpdateGroup<T>> overshadowingStandbys = findLatestNonFullyAvailableAtomicUpdateGroups( |
| findOvershadows(group, State.STANDBY) |
| ); |
| if (overshadowingStandbys.isEmpty()) { |
| throw new ISE("Unexpected state: atomicUpdateGroup[%s] is overshadowed, but nothing overshadows it", group); |
| } |
| groupsOvershadowingAug = overshadowingStandbys; |
| isOvershadowingGroupsFull = false; |
| } else { |
| groupsOvershadowingAug = overshadowingVisibles; |
| // The visible groups are checked against the range spanned by their own first start and last end. |
| isOvershadowingGroupsFull = doGroupsFullyCoverPartitionRange( |
| groupsOvershadowingAug, |
| groupsOvershadowingAug.get(0).getStartRootPartitionId(), |
| groupsOvershadowingAug.get(groupsOvershadowingAug.size() - 1).getEndRootPartitionId() |
| ); |
| } |
| |
| // If groupsOvershadowingAug is the standby groups, isOvershadowingGroupsFull is always false. |
| // If groupsOvershadowingAug is the visible groups, isOvershadowingGroupsFull indicates the visible group is |
| // fully available or not. |
| if (!isOvershadowingGroupsFull) { |
| // Let's check the overshadowed groups can cover the partition range of groupsOvershadowingAug |
| // and are fully available. |
| //noinspection ConstantConditions |
| final List<AtomicUpdateGroup<T>> latestFullGroups = FluentIterable |
| .from(groupsOvershadowingAug) |
| .transformAndConcat(eachFullgroup -> findLatestFullyAvailableOvershadowedAtomicUpdateGroups( |
| RootPartitionRange.of(eachFullgroup), |
| eachFullgroup.getMinorVersion() |
| )) |
| .toList(); |
| |
| if (!latestFullGroups.isEmpty()) { |
| final boolean isOvershadowedGroupsFull = doGroupsFullyCoverPartitionRange( |
| latestFullGroups, |
| groupsOvershadowingAug.get(0).getStartRootPartitionId(), |
| groupsOvershadowingAug.get(groupsOvershadowingAug.size() - 1).getEndRootPartitionId() |
| ); |
| |
| if (isOvershadowedGroupsFull) { |
| // The visible groups overshadowing the given group (if any) become standby, |
| // and the fully available overshadowed groups become visible. |
| replaceVisibleWith(overshadowingVisibles, State.STANDBY, latestFullGroups, State.OVERSHADOWED); |
| } |
| } |
| } |
| } |
| } |
| |
| /** |
| * Checks if the given groups fully cover the given partition range. To fully cover the range, the given groups |
| * should satisfy the below: |
| * |
| * - All groups must be full. |
| * - All groups must be adjacent. |
| * - The lowest startPartitionId and the highest endPartitionId must be same with the given startPartitionId and |
| * the given endPartitionId, respectively. |
| * |
| * @param groups atomicUpdateGroups sorted by their rootPartitionRange |
| * @param startRootPartitionId the start partitionId of the root partition range to check the coverage |
| * @param endRootPartitionId the end partitionId of the root partition range to check the coverage |
| * |
| * @return true if the given groups fully cover the given partition range. |
| */ |
| private boolean doGroupsFullyCoverPartitionRange( |
| List<AtomicUpdateGroup<T>> groups, |
| int startRootPartitionId, |
| int endRootPartitionId |
| ) |
| { |
| final int startRootPartitionIdOfOvershadowed = groups.get(0).getStartRootPartitionId(); |
| final int endRootPartitionIdOfOvershadowed = groups.get(groups.size() - 1).getEndRootPartitionId(); |
| if (startRootPartitionId != startRootPartitionIdOfOvershadowed |
| || endRootPartitionId != endRootPartitionIdOfOvershadowed) { |
| return false; |
| } else { |
| int prevEndPartitionId = groups.get(0).getStartRootPartitionId(); |
| for (AtomicUpdateGroup<T> group : groups) { |
| if (!group.isFull() || prevEndPartitionId != group.getStartRootPartitionId()) { |
| // If any visible atomicUpdateGroup overshadowed by the given standby atomicUpdateGroup is not full, |
| // then the given atomicUpdateGroup should be visible since it has a higher version. |
| return false; |
| } |
| prevEndPartitionId = group.getEndRootPartitionId(); |
| } |
| } |
| return true; |
| } |
| |
| /** |
| * Registers the given atomicUpdateGroup in the map of the given state, under its rootPartitionRange and |
| * minorVersion. |
| * |
| * @param aug              the group to register |
| * @param state            the state to register the group in |
| * @param determineVisible whether to re-evaluate the visible group after the registration |
| * |
| * @throws ISE if the state already had a group for the same rootPartitionRange and minorVersion. Note that the |
| *             given group has already been put in the map when this is thrown. |
| */ |
| private void addAtomicUpdateGroupWithState(AtomicUpdateGroup<T> aug, State state, boolean determineVisible) |
| { |
|   final Short2ObjectSortedMap<AtomicUpdateGroup<T>> minorVersionToGroup = getStateMap(state).computeIfAbsent( |
|       RootPartitionRange.of(aug), |
|       k -> createMinorVersionToAugMap(state) |
|   ); |
|   final AtomicUpdateGroup<T> previous = minorVersionToGroup.put(aug.getMinorVersion(), aug); |
|   if (previous != null) { |
|     throw new ISE("AtomicUpdateGroup[%s] is already in state[%s]", previous, state); |
|   } |
| |
|   if (determineVisible) { |
|     determineVisibleGroupAfterAdd(aug, state); |
|   } |
| } |
| |
| /** |
| * Adds the given chunk to this manager. The chunk joins the existing atomicUpdateGroup of the same |
| * rootPartitionRange and minorVersion, searched in the overshadowed, standby, and visible states in this order. |
| * If there is no such group, a new group is created for the chunk. |
| * |
| * @param chunk the chunk to add |
| * |
| * @return false if an equal chunk is already known for the same chunk number, true if the chunk was added |
| * |
| * @throws ISE if a different chunk is already known for the same chunk number, if the matching visible group is |
| *             already full, or if the matching visible group unexpectedly contains the chunk. Note that the |
| *             chunk has already been put in the known chunks when any of these is thrown. |
| */ |
| boolean addChunk(PartitionChunk<T> chunk) |
| { |
|   // Sanity check. The previous chunk should be usually null. |
|   final PartitionChunk<T> previousChunk = knownPartitionChunks.put(chunk.getChunkNumber(), chunk); |
|   if (previousChunk != null) { |
|     if (previousChunk.equals(chunk)) { |
|       // A new chunk of the same major version and partitionId can be added in segment handoff |
|       // from stream ingestion tasks to historicals |
|       return false; |
|     } |
|     throw new ISE( |
|         "existingChunk[%s] is different from newChunk[%s] for partitionId[%d]", |
|         previousChunk, |
|         chunk, |
|         chunk.getChunkNumber() |
|     ); |
|   } |
| |
|   // 1) The chunk belongs to an overshadowed group. |
|   final AtomicUpdateGroup<T> overshadowedGroup = findAtomicUpdateGroupWith(chunk, State.OVERSHADOWED); |
|   if (overshadowedGroup != null) { |
|     overshadowedGroup.add(chunk); |
|     // If overshadowed atomicUpdateGroup is full and visible atomicUpdateGroup is not full, |
|     // move overshadowed one to visible. |
|     determineVisibleGroupAfterAdd(overshadowedGroup, State.OVERSHADOWED); |
|     return true; |
|   } |
| |
|   // 2) The chunk belongs to a standby group. |
|   final AtomicUpdateGroup<T> standbyGroup = findAtomicUpdateGroupWith(chunk, State.STANDBY); |
|   if (standbyGroup != null) { |
|     standbyGroup.add(chunk); |
|     determineVisibleGroupAfterAdd(standbyGroup, State.STANDBY); |
|     return true; |
|   } |
| |
|   // 3) The chunk belongs to a visible group. |
|   final AtomicUpdateGroup<T> visibleGroup = findAtomicUpdateGroupWith(chunk, State.VISIBLE); |
|   if (visibleGroup != null) { |
|     if (visibleGroup.findChunk(chunk.getChunkNumber()) != null) { |
|       // If this chunk is already in the atomicUpdateGroup, it should be in knownPartitionChunks |
|       // and this code must not be executed. |
|       throw new ISE( |
|           "Unexpected state: chunk[%s] is in the atomicUpdateGroup[%s] but not in knownPartitionChunks[%s]", |
|           chunk, |
|           visibleGroup, |
|           knownPartitionChunks |
|       ); |
|     } |
|     // The chunk is not in the group yet, so it can be added only if the group still has room. |
|     if (visibleGroup.isFull()) { |
|       throw new ISE("Can't add chunk[%s] to a full atomicUpdateGroup[%s]", chunk, visibleGroup); |
|     } |
|     visibleGroup.add(chunk); |
|     return true; |
|   } |
| |
|   // 4) No group exists for the chunk. Create one and decide its initial state. |
|   final AtomicUpdateGroup<T> newGroup = new AtomicUpdateGroup<>(chunk); |
|   final boolean overshadowed = isOvershadowedByVisibleGroup( |
|       RootPartitionRange.of(newGroup), |
|       newGroup.getMinorVersion() |
|   ); |
|   final State initialState = overshadowed ? State.OVERSHADOWED : State.STANDBY; |
|   addAtomicUpdateGroupWithState(newGroup, initialState, true); |
|   return true; |
| } |
| |
| /** |
| * Handles the removal of an empty atomicUpdateGroup from a state. |
| */ |
| private void determineVisibleGroupAfterRemove( |
| AtomicUpdateGroup<T> augOfRemovedChunk, |
| RootPartitionRange rangeOfAug, |
| short minorVersion, |
| State stateOfRemovedAug |
| ) |
| { |
| // If an atomicUpdateGroup is overshadowed by another non-visible atomicUpdateGroup, there must be another visible |
| // atomicUpdateGroup which also overshadows the same atomicUpdateGroup. |
| // As a result, the state of overshadowed atomicUpdateGroup should be updated only when a visible atomicUpdateGroup |
| // is removed. |
| |
| if (stateOfRemovedAug == State.VISIBLE) { |
| // A chunk is removed from the current visible group. |
| // Fall back to |
| // 1) the latest fully available overshadowed group if any |
| // 2) the latest standby group if any |
| // 3) the latest overshadowed group if any |
| |
| // Check there is a fully available latest overshadowed atomicUpdateGroup. |
| final List<AtomicUpdateGroup<T>> latestFullAugs = findLatestFullyAvailableOvershadowedAtomicUpdateGroups( |
| rangeOfAug, |
| minorVersion |
| ); |
| |
| // If there are fully available overshadowed groups, then the latest one becomes visible. |
| if (!latestFullAugs.isEmpty()) { |
| // The current visible atomicUpdateGroup becomes standby |
| // and the fully available overshadowed atomicUpdateGroups become visible |
| final Set<AtomicUpdateGroup<T>> overshadowsLatestFullAugsInVisible = FluentIterable |
| .from(latestFullAugs) |
| .transformAndConcat(group -> findOvershadows(group, State.VISIBLE)) |
| .toSet(); |
| replaceVisibleWith( |
| overshadowsLatestFullAugsInVisible, |
| State.STANDBY, |
| latestFullAugs, |
| State.OVERSHADOWED |
| ); |
| FluentIterable.from(latestFullAugs) |
| .transformAndConcat(group -> findOvershadows(group, State.OVERSHADOWED)) |
| .forEach(group -> transitAtomicUpdateGroupState(group, State.OVERSHADOWED, State.STANDBY)); |
| } else { |
| // Find the latest non-fully available atomicUpdateGroups |
| final List<AtomicUpdateGroup<T>> latestStandby = findLatestNonFullyAvailableAtomicUpdateGroups( |
| findOvershadows(rangeOfAug, minorVersion, State.STANDBY) |
| ); |
| if (!latestStandby.isEmpty()) { |
| final List<AtomicUpdateGroup<T>> overshadowedByLatestStandby = FluentIterable |
| .from(latestStandby) |
| .transformAndConcat(group -> findOvershadowedBy(group, State.VISIBLE)) |
| .toList(); |
| replaceVisibleWith(overshadowedByLatestStandby, State.OVERSHADOWED, latestStandby, State.STANDBY); |
| |
| // All standby groups overshadowed by the new visible group should be moved to overshadowed |
| FluentIterable |
| .from(latestStandby) |
| .transformAndConcat(group -> findOvershadowedBy(group, State.STANDBY)) |
| .forEach(aug -> transitAtomicUpdateGroupState(aug, State.STANDBY, State.OVERSHADOWED)); |
| } else if (augOfRemovedChunk.isEmpty()) { |
| // Visible is empty. Move the latest overshadowed to visible. |
| final List<AtomicUpdateGroup<T>> latestOvershadowed = findLatestNonFullyAvailableAtomicUpdateGroups( |
| findOvershadowedBy(rangeOfAug, minorVersion, State.OVERSHADOWED) |
| ); |
| if (!latestOvershadowed.isEmpty()) { |
| latestOvershadowed.forEach(aug -> transitAtomicUpdateGroupState(aug, State.OVERSHADOWED, State.VISIBLE)); |
| } |
| } |
| } |
| } |
| } |
| |
| /** |
| * Find the latest NON-FULLY available atomicUpdateGroups from the given groups. |
| * |
| * This method MUST be called only when there is no fully available ones in the given groups. If the given groups |
| * are in the overshadowed state, calls {@link #findLatestFullyAvailableOvershadowedAtomicUpdateGroups} first |
| * to check there is any fully available group. |
| * If the given groups are in the standby state, you can freely call this method because there should be no fully |
| * available one in the standby groups at any time. |
| */ |
| private List<AtomicUpdateGroup<T>> findLatestNonFullyAvailableAtomicUpdateGroups(List<AtomicUpdateGroup<T>> groups) |
| { |
| if (groups.isEmpty()) { |
| return Collections.emptyList(); |
| } |
| |
| final TreeMap<RootPartitionRange, AtomicUpdateGroup<T>> rangeToGroup = new TreeMap<>(); |
| for (AtomicUpdateGroup<T> group : groups) { |
| rangeToGroup.put(RootPartitionRange.of(group), group); |
| } |
| final List<AtomicUpdateGroup<T>> visibles = new ArrayList<>(); |
| // rangeToGroup is sorted by RootPartitionRange which means, the groups of the wider range will appear later |
| // in the rangeToGroup map. Since the wider groups have higer minor versions than narrower groups, |
| // we iterate the rangeToGroup from the last entry in descending order. |
| Entry<RootPartitionRange, AtomicUpdateGroup<T>> currEntry = rangeToGroup.lastEntry(); |
| while (currEntry != null) { |
| final Entry<RootPartitionRange, AtomicUpdateGroup<T>> lowerEntry = rangeToGroup.lowerEntry(currEntry.getKey()); |
| if (lowerEntry != null) { |
| if (lowerEntry.getKey().endPartitionId != currEntry.getKey().startPartitionId) { |
| return Collections.emptyList(); |
| } |
| } |
| visibles.add(currEntry.getValue()); |
| currEntry = lowerEntry; |
| } |
| // visibles should be sorted. |
| visibles.sort(Comparator.comparing(RootPartitionRange::of)); |
| return visibles; |
| } |
| |
| private List<AtomicUpdateGroup<T>> findLatestFullyAvailableOvershadowedAtomicUpdateGroups( |
| RootPartitionRange rangeOfAug, |
| short minorVersion |
| ) |
| { |
| final List<AtomicUpdateGroup<T>> overshadowedGroups = findOvershadowedBy( |
| rangeOfAug, |
| minorVersion, |
| State.OVERSHADOWED |
| ); |
| |
| // Filter out non-fully available groups. |
| final TreeMap<RootPartitionRange, AtomicUpdateGroup<T>> fullGroups = new TreeMap<>(); |
| for (AtomicUpdateGroup<T> group : FluentIterable.from(overshadowedGroups).filter(AtomicUpdateGroup::isFull)) { |
| fullGroups.put(RootPartitionRange.of(group), group); |
| } |
| if (fullGroups.isEmpty()) { |
| return Collections.emptyList(); |
| } |
| if (fullGroups.firstKey().startPartitionId != rangeOfAug.startPartitionId |
| || fullGroups.lastKey().endPartitionId != rangeOfAug.endPartitionId) { |
| return Collections.emptyList(); |
| } |
| |
| // Find latest fully available groups. |
| final List<AtomicUpdateGroup<T>> visibles = new ArrayList<>(); |
| // fullGroups is sorted by RootPartitionRange which means, the groups of the wider range will appear later |
| // in the fullGroups map. Since the wider groups have higer minor versions than narrower groups, |
| // we iterate the fullGroups from the last entry in descending order. |
| Entry<RootPartitionRange, AtomicUpdateGroup<T>> currEntry = fullGroups.lastEntry(); |
| while (currEntry != null) { |
| final Entry<RootPartitionRange, AtomicUpdateGroup<T>> lowerEntry = fullGroups.lowerEntry(currEntry.getKey()); |
| if (lowerEntry != null) { |
| if (lowerEntry.getKey().endPartitionId != currEntry.getKey().startPartitionId) { |
| return Collections.emptyList(); |
| } |
| } |
| visibles.add(currEntry.getValue()); |
| currEntry = lowerEntry; |
| } |
| // visibles should be sorted. |
| visibles.sort(Comparator.comparing(RootPartitionRange::of)); |
| return visibles; |
| } |
| |
| private void removeFrom(AtomicUpdateGroup<T> aug, State state) |
| { |
| final RootPartitionRange rangeKey = RootPartitionRange.of(aug); |
| final Short2ObjectSortedMap<AtomicUpdateGroup<T>> versionToGroup = getStateMap(state).get(rangeKey); |
| if (versionToGroup == null) { |
| throw new ISE("Unknown atomicUpdateGroup[%s] in state[%s]", aug, state); |
| } |
| |
| final AtomicUpdateGroup<T> removed = versionToGroup.remove(aug.getMinorVersion()); |
| if (removed == null) { |
| throw new ISE("Unknown atomicUpdateGroup[%s] in state[%s]", aug, state); |
| } |
| |
| if (!removed.equals(aug)) { |
| throw new ISE( |
| "Unexpected state: Removed atomicUpdateGroup[%s] is different from expected atomicUpdateGroup[%s]", |
| removed, |
| aug |
| ); |
| } |
| |
| if (versionToGroup.isEmpty()) { |
| getStateMap(state).remove(rangeKey); |
| } |
| } |
| |
| @Nullable |
| PartitionChunk<T> removeChunk(PartitionChunk<T> partitionChunk) |
| { |
| final PartitionChunk<T> knownChunk = knownPartitionChunks.get(partitionChunk.getChunkNumber()); |
| if (knownChunk == null) { |
| return null; |
| } |
| |
| if (!knownChunk.equals(partitionChunk)) { |
| throw new ISE( |
| "Unexpected state: Same partitionId[%d], but known partition[%s] is different from the input partition[%s]", |
| partitionChunk.getChunkNumber(), |
| knownChunk, |
| partitionChunk |
| ); |
| } |
| |
| AtomicUpdateGroup<T> augOfRemovedChunk = tryRemoveChunkFromGroupWithState(partitionChunk, State.STANDBY); |
| |
| if (augOfRemovedChunk == null) { |
| augOfRemovedChunk = tryRemoveChunkFromGroupWithState(partitionChunk, State.VISIBLE); |
| if (augOfRemovedChunk == null) { |
| augOfRemovedChunk = tryRemoveChunkFromGroupWithState(partitionChunk, State.OVERSHADOWED); |
| if (augOfRemovedChunk == null) { |
| throw new ISE("Can't find atomicUpdateGroup for partitionChunk[%s]", partitionChunk); |
| } |
| } |
| } |
| |
| return knownPartitionChunks.remove(partitionChunk.getChunkNumber()); |
| } |
| |
| public boolean isEmpty() |
| { |
| return visibleGroupPerRange.isEmpty(); |
| } |
| |
| public boolean isComplete() |
| { |
| return Iterators.all( |
| visibleGroupPerRange.values().iterator(), |
| map -> { |
| SingleEntryShort2ObjectSortedMap<AtomicUpdateGroup<T>> singleMap = |
| (SingleEntryShort2ObjectSortedMap<AtomicUpdateGroup<T>>) map; |
| //noinspection ConstantConditions |
| return singleMap.val.isFull(); |
| } |
| ); |
| } |
| |
| @Nullable |
| PartitionChunk<T> getChunk(int partitionId) |
| { |
| final PartitionChunk<T> chunk = knownPartitionChunks.get(partitionId); |
| if (chunk == null) { |
| return null; |
| } |
| final AtomicUpdateGroup<T> aug = findAtomicUpdateGroupWith(chunk, State.VISIBLE); |
| if (aug == null) { |
| return null; |
| } else { |
| return Preconditions.checkNotNull( |
| aug.findChunk(partitionId), |
| "Can't find partitionChunk for partitionId[%s] in atomicUpdateGroup[%s]", |
| partitionId, |
| aug |
| ); |
| } |
| } |
| |
| Iterator<PartitionChunk<T>> visibleChunksIterator() |
| { |
| final FluentIterable<Short2ObjectSortedMap<AtomicUpdateGroup<T>>> versionToGroupIterable = FluentIterable.from( |
| visibleGroupPerRange.values() |
| ); |
| return versionToGroupIterable |
| .transformAndConcat(map -> { |
| SingleEntryShort2ObjectSortedMap<AtomicUpdateGroup<T>> singleMap = |
| (SingleEntryShort2ObjectSortedMap<AtomicUpdateGroup<T>>) map; |
| //noinspection ConstantConditions |
| return singleMap.val.getChunks(); |
| }).iterator(); |
| } |
| |
| List<PartitionChunk<T>> getOvershadowedChunks() |
| { |
| return getAllChunks(overshadowedGroups); |
| } |
| |
| @VisibleForTesting |
| List<PartitionChunk<T>> getStandbyChunks() |
| { |
| return getAllChunks(standbyGroups); |
| } |
| |
| private static <T extends Overshadowable<T>> List<PartitionChunk<T>> getAllChunks( |
| TreeMap<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> stateMap |
| ) |
| { |
| final List<PartitionChunk<T>> allChunks = new ArrayList<>(); |
| for (Short2ObjectSortedMap<AtomicUpdateGroup<T>> treeMap : stateMap.values()) { |
| for (AtomicUpdateGroup<T> aug : treeMap.values()) { |
| allChunks.addAll(aug.getChunks()); |
| } |
| } |
| return allChunks; |
| } |
| |
| @Override |
| public boolean equals(Object o) |
| { |
| if (this == o) { |
| return true; |
| } |
| if (o == null || getClass() != o.getClass()) { |
| return false; |
| } |
| OvershadowableManager<?> that = (OvershadowableManager<?>) o; |
| return Objects.equals(knownPartitionChunks, that.knownPartitionChunks) && |
| Objects.equals(standbyGroups, that.standbyGroups) && |
| Objects.equals(visibleGroupPerRange, that.visibleGroupPerRange) && |
| Objects.equals(overshadowedGroups, that.overshadowedGroups); |
| } |
| |
| @Override |
| public int hashCode() |
| { |
| return Objects.hash(knownPartitionChunks, standbyGroups, visibleGroupPerRange, overshadowedGroups); |
| } |
| |
| @Override |
| public String toString() |
| { |
| return "OvershadowableManager{" + |
| "knownPartitionChunks=" + knownPartitionChunks + |
| ", standbyGroups=" + standbyGroups + |
| ", visibleGroupPerRangePerVersion=" + visibleGroupPerRange + |
| ", overshadowedGroups=" + overshadowedGroups + |
| '}'; |
| } |
| |
| @VisibleForTesting |
| static class RootPartitionRange implements Comparable<RootPartitionRange> |
| { |
| private final short startPartitionId; |
| private final short endPartitionId; |
| |
| @VisibleForTesting |
| static RootPartitionRange of(int startPartitionId, int endPartitionId) |
| { |
| return new RootPartitionRange((short) startPartitionId, (short) endPartitionId); |
| } |
| |
| private static <T extends Overshadowable<T>> RootPartitionRange of(PartitionChunk<T> chunk) |
| { |
| return of(chunk.getObject().getStartRootPartitionId(), chunk.getObject().getEndRootPartitionId()); |
| } |
| |
| private static <T extends Overshadowable<T>> RootPartitionRange of(AtomicUpdateGroup<T> aug) |
| { |
| return of(aug.getStartRootPartitionId(), aug.getEndRootPartitionId()); |
| } |
| |
| private RootPartitionRange(short startPartitionId, short endPartitionId) |
| { |
| this.startPartitionId = startPartitionId; |
| this.endPartitionId = endPartitionId; |
| } |
| |
| public boolean contains(RootPartitionRange that) |
| { |
| return Short.toUnsignedInt(startPartitionId) <= Short.toUnsignedInt(that.startPartitionId) |
| && Short.toUnsignedInt(this.endPartitionId) >= Short.toUnsignedInt(that.endPartitionId); |
| } |
| |
| public boolean overlaps(RootPartitionRange that) |
| { |
| return Short.toUnsignedInt(startPartitionId) <= Short.toUnsignedInt(that.startPartitionId) |
| && Short.toUnsignedInt(endPartitionId) > Short.toUnsignedInt(that.startPartitionId) |
| || Short.toUnsignedInt(startPartitionId) >= Short.toUnsignedInt(that.startPartitionId) |
| && Short.toUnsignedInt(startPartitionId) < Short.toUnsignedInt(that.endPartitionId); |
| } |
| |
| @Override |
| public int compareTo(RootPartitionRange o) |
| { |
| if (startPartitionId != o.startPartitionId) { |
| return Integer.compare(Short.toUnsignedInt(startPartitionId), Short.toUnsignedInt(o.startPartitionId)); |
| } else { |
| return Integer.compare(Short.toUnsignedInt(endPartitionId), Short.toUnsignedInt(o.endPartitionId)); |
| } |
| } |
| |
| @Override |
| public boolean equals(Object o) |
| { |
| if (this == o) { |
| return true; |
| } |
| if (o == null || getClass() != o.getClass()) { |
| return false; |
| } |
| RootPartitionRange that = (RootPartitionRange) o; |
| return startPartitionId == that.startPartitionId && |
| endPartitionId == that.endPartitionId; |
| } |
| |
| @Override |
| public int hashCode() |
| { |
| return Objects.hash(startPartitionId, endPartitionId); |
| } |
| |
| @Override |
| public String toString() |
| { |
| return "RootPartitionRange{" + |
| "startPartitionId=" + startPartitionId + |
| ", endPartitionId=" + endPartitionId + |
| '}'; |
| } |
| } |
| |
| /** |
| * Map can store at most a single entry. |
| * Comparing to{@link it.unimi.dsi.fastutil.shorts.Short2ObjectSortedMaps.Singleton}, it's different from the |
| * perspective of that this class supports update. |
| */ |
| private static class SingleEntryShort2ObjectSortedMap<V> extends AbstractShort2ObjectSortedMap<V> |
| { |
| private short key; |
| private V val; |
| |
| private SingleEntryShort2ObjectSortedMap() |
| { |
| key = -1; |
| val = null; |
| } |
| |
| private SingleEntryShort2ObjectSortedMap(short key, V val) |
| { |
| this.key = key; |
| this.val = val; |
| } |
| |
| @Override |
| public Short2ObjectSortedMap<V> subMap(short fromKey, short toKey) |
| { |
| if (fromKey <= key && toKey > key) { |
| return this; |
| } else { |
| throw new IAE("fromKey: %s, toKey: %s, key: %s", fromKey, toKey, key); |
| } |
| } |
| |
| @Override |
| public Short2ObjectSortedMap<V> headMap(short toKey) |
| { |
| if (toKey > key) { |
| return this; |
| } else { |
| throw new IAE("toKey: %s, key: %s", toKey, key); |
| } |
| } |
| |
| @Override |
| public Short2ObjectSortedMap<V> tailMap(short fromKey) |
| { |
| if (fromKey <= key) { |
| return this; |
| } else { |
| throw new IAE("fromKey: %s, key: %s", fromKey, key); |
| } |
| } |
| |
| @Override |
| public short firstShortKey() |
| { |
| if (key < 0) { |
| throw new NoSuchElementException(); |
| } |
| return key; |
| } |
| |
| @Override |
| public short lastShortKey() |
| { |
| if (key < 0) { |
| throw new NoSuchElementException(); |
| } |
| return key; |
| } |
| |
| @Override |
| public ObjectSortedSet<Short2ObjectMap.Entry<V>> short2ObjectEntrySet() |
| { |
| return isEmpty() ? ObjectSortedSets.EMPTY_SET : ObjectSortedSets.singleton(new BasicEntry<>(key, val)); |
| } |
| |
| @Override |
| public ShortSortedSet keySet() |
| { |
| return isEmpty() ? ShortSortedSets.EMPTY_SET : ShortSortedSets.singleton(key); |
| } |
| |
| @Override |
| public ObjectCollection<V> values() |
| { |
| return new AbstractObjectCollection<V>() |
| { |
| @Override |
| public ObjectIterator<V> iterator() |
| { |
| return size() > 0 ? ObjectIterators.singleton(val) : ObjectIterators.emptyIterator(); |
| } |
| |
| @Override |
| public int size() |
| { |
| return key < 0 ? 0 : 1; |
| } |
| }; |
| } |
| |
| @Override |
| public V put(final short key, final V value) |
| { |
| if (isEmpty()) { |
| this.key = key; |
| this.val = value; |
| return null; |
| } else { |
| if (this.key == key) { |
| final V existing = this.val; |
| this.val = value; |
| return existing; |
| } else { |
| throw new ISE( |
| "Can't add [%d, %s] to non-empty SingleEntryShort2ObjectSortedMap[%d, %s]", |
| key, |
| value, |
| this.key, |
| this.val |
| ); |
| } |
| } |
| } |
| |
| @Override |
| public V get(short key) |
| { |
| return this.key == key ? val : null; |
| } |
| |
| @Override |
| public V remove(final short key) |
| { |
| if (this.key == key) { |
| this.key = -1; |
| return val; |
| } else { |
| return null; |
| } |
| } |
| |
| @Override |
| public boolean containsKey(short key) |
| { |
| return this.key == key; |
| } |
| |
| @Override |
| public ShortComparator comparator() |
| { |
| return ShortComparators.NATURAL_COMPARATOR; |
| } |
| |
| @Override |
| public int size() |
| { |
| return key < 0 ? 0 : 1; |
| } |
| |
| @Override |
| public void defaultReturnValue(V rv) |
| { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public V defaultReturnValue() |
| { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public boolean isEmpty() |
| { |
| return key < 0; |
| } |
| |
| @Override |
| public boolean containsValue(Object value) |
| { |
| if (key < 0) { |
| return false; |
| } else { |
| return Objects.equals(val, value); |
| } |
| } |
| |
| @Override |
| public void putAll(Map<? extends Short, ? extends V> m) |
| { |
| if (!m.isEmpty()) { |
| if (m.size() == 1) { |
| final Map.Entry<? extends Short, ? extends V> entry = m.entrySet().iterator().next(); |
| this.key = entry.getKey(); |
| this.val = entry.getValue(); |
| } else { |
| throw new IllegalArgumentException(); |
| } |
| } |
| } |
| } |
| } |