blob: 62ad40ce696dcbe628157107d709276a7f479fac [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.state.heap;
import org.apache.flink.runtime.state.KeyExtractorFunction;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.PriorityComparator;
import org.apache.flink.util.CollectionUtil;
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.HashMap;
import java.util.Set;
import static org.apache.flink.util.Preconditions.checkArgument;
/**
 * A heap-based priority queue with set semantics, based on {@link HeapPriorityQueue}. The heap is
 * backed by one hash map per key-group for fast contains checks (de-duplication) and deletes.
 * Object identification happens based on {@link Object#equals(Object)} (and, because hash maps
 * are used for the lookup, {@link Object#hashCode()}).
 *
 * <p>Possible future improvements:
 *
 * <ul>
 *   <li>We could also implement shrinking for the heap and the deduplication maps.
 *   <li>We could replace the deduplication maps with more efficient custom implementations. In
 *       particular, a hash set would be enough if it could return existing elements on
 *       unsuccessful adding, etc.
 * </ul>
 *
 * @param <T> type of the contained elements.
 */
public class HeapPriorityQueueSet<T extends HeapPriorityQueueElement> extends HeapPriorityQueue<T>
        implements KeyGroupedInternalPriorityQueue<T> {

    /** Function to extract the key from contained elements. */
    private final KeyExtractorFunction<T> keyExtractor;

    /**
     * This array contains one hash map per key-group of the local range, indexed by the key-group
     * id relative to the start of {@link #keyGroupRange}. Each map stores every contained element
     * mapped to itself, which allows fast de-duplication, deletes, and retrieval of the stored
     * instance for an equal element.
     */
    private final HashMap<T, T>[] deduplicationMapsByKeyGroup;

    /** The key-group range of elements that are managed by this queue. */
    private final KeyGroupRange keyGroupRange;

    /** The total number of key-groups of the job. */
    private final int totalNumberOfKeyGroups;

    /**
     * Creates an empty {@link HeapPriorityQueueSet} with the requested initial capacity.
     *
     * <p>The requested capacity is spread evenly over the de-duplication maps: each map is created
     * for an expected size of {@code 1 + minimumCapacity / numberOfKeyGroupsInRange}. If the given
     * range reports zero key-groups, no de-duplication maps are created; the queue can still be
     * constructed, but every later attempt to add or look up an element is rejected by the
     * key-group range check.
     *
     * @param elementPriorityComparator comparator for the priority of contained elements.
     * @param keyExtractor function to extract a key from the contained elements.
     * @param minimumCapacity the minimum and initial capacity of this priority queue.
     * @param keyGroupRange the key-group range of the elements in this set.
     * @param totalNumberOfKeyGroups the total number of key-groups of the job.
     */
    @SuppressWarnings("unchecked")
    public HeapPriorityQueueSet(
            @Nonnull PriorityComparator<T> elementPriorityComparator,
            @Nonnull KeyExtractorFunction<T> keyExtractor,
            @Nonnegative int minimumCapacity,
            @Nonnull KeyGroupRange keyGroupRange,
            @Nonnegative int totalNumberOfKeyGroups) {

        super(elementPriorityComparator, minimumCapacity);

        this.keyExtractor = keyExtractor;
        this.totalNumberOfKeyGroups = totalNumberOfKeyGroups;
        this.keyGroupRange = keyGroupRange;

        final int keyGroupsInLocalRange = keyGroupRange.getNumberOfKeyGroups();

        // Guard the division: with zero key-groups the loop below creates no maps, so the size
        // hint is irrelevant and must not fail the constructor with a "/ by zero" error.
        final int deduplicationSetSize =
                keyGroupsInLocalRange > 0 ? 1 + minimumCapacity / keyGroupsInLocalRange : 1;

        // Generic array creation is not possible in Java, hence the raw array and the
        // method-level @SuppressWarnings("unchecked").
        this.deduplicationMapsByKeyGroup = new HashMap[keyGroupsInLocalRange];
        for (int i = 0; i < keyGroupsInLocalRange; ++i) {
            deduplicationMapsByKeyGroup[i] =
                    CollectionUtil.newHashMapWithExpectedSize(deduplicationSetSize);
        }
    }

    /**
     * Polls the head element via the superclass and, if there was one, also removes it from the
     * de-duplication map of its key-group.
     *
     * @return the instance that was stored in the de-duplication map for the polled element, or
     *     <code>null</code> if the queue was empty.
     */
    @Override
    @Nullable
    public T poll() {
        final T toRemove = super.poll();
        return toRemove != null ? getDedupMapForElement(toRemove).remove(toRemove) : null;
    }

    /**
     * Adds the element to the queue. In contrast to the superclass and to maintain set semantics,
     * this happens only if no such element is already contained (determined by {@link
     * Object#equals(Object)}).
     *
     * @return <code>true</code> if the operation changed the head element or if it is unclear
     *     whether the head element changed. Returns <code>false</code> only if the head element
     *     was not changed by this operation, which includes the case that an equal element was
     *     already contained and nothing was added.
     */
    @Override
    public boolean add(@Nonnull T element) {
        // Short-circuit: the heap is only touched if the element was newly registered in the map.
        return getDedupMapForElement(element).putIfAbsent(element, element) == null
                && super.add(element);
    }

    /**
     * In contrast to the superclass and to maintain set semantics, removal here is based on
     * comparing the given element via {@link Object#equals(Object)}.
     *
     * @return <code>true</code> if the operation changed the head element or if it is unclear
     *     whether the head element changed. Returns <code>false</code> only if the head element
     *     was not changed by this operation, which includes the case that no equal element was
     *     contained.
     */
    @Override
    public boolean remove(@Nonnull T toRemove) {
        // Look up the instance that is actually stored; that instance (not necessarily the
        // argument itself) is the one handed to the heap for removal.
        T storedElement = getDedupMapForElement(toRemove).remove(toRemove);
        return storedElement != null && super.remove(storedElement);
    }

    /** Clears the heap of the superclass as well as all de-duplication maps. */
    @Override
    public void clear() {
        super.clear();
        for (HashMap<?, ?> elementHashMap : deduplicationMapsByKeyGroup) {
            elementHashMap.clear();
        }
    }

    /**
     * Returns the de-duplication map for the given (global) key-group id.
     *
     * @throws IllegalArgumentException if the key-group is not part of the key-group range of
     *     this queue.
     */
    private HashMap<T, T> getDedupMapForKeyGroup(@Nonnegative int keyGroupId) {
        return deduplicationMapsByKeyGroup[globalKeyGroupToLocalIndex(keyGroupId)];
    }

    /**
     * Returns the de-duplication map responsible for the given element. The key-group is derived
     * from the element's extracted key and the total number of key-groups.
     */
    private HashMap<T, T> getDedupMapForElement(T element) {
        int keyGroup =
                KeyGroupRangeAssignment.assignToKeyGroup(
                        keyExtractor.extractKeyFromElement(element), totalNumberOfKeyGroups);
        return getDedupMapForKeyGroup(keyGroup);
    }

    /**
     * Translates a global key-group id into the index of the corresponding map in {@link
     * #deduplicationMapsByKeyGroup}, after checking that the key-group belongs to the local
     * range.
     */
    private int globalKeyGroupToLocalIndex(int keyGroup) {
        checkArgument(
                keyGroupRange.contains(keyGroup),
                "%s does not contain key group %s",
                keyGroupRange,
                keyGroup);
        return keyGroup - keyGroupRange.getStartKeyGroup();
    }

    /**
     * Returns the elements of the given key-group.
     *
     * <p>The returned set is the key-set view of the internal de-duplication map, not a copy: it
     * reflects later changes to this queue. NOTE(review): removing elements through the returned
     * view would bypass the heap and leave both structures inconsistent; callers should treat
     * the result as read-only.
     *
     * @param keyGroupId the (global) id of the key-group, must be within the range of this queue.
     * @return a view on all contained elements that belong to the given key-group.
     */
    @Nonnull
    @Override
    public Set<T> getSubsetForKeyGroup(int keyGroupId) {
        return getDedupMapForKeyGroup(keyGroupId).keySet();
    }
}