blob: e6be9bf0ebe0ca5eed858b53e64c6feef4622f07 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.pulsar.common.api.proto.IntRange;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.util.Murmur3_32Hash;
/**
* This is a sticky-key consumer selector based user provided range.
* User is responsible for making sure provided range for all consumers cover the rangeSize
* else there'll be chance that a key fall in a `whole` that not handled by any consumer.
*/
public class HashRangeExclusiveStickyKeyConsumerSelector implements StickyKeyConsumerSelector {
private final int rangeSize;
private final ConcurrentSkipListMap<Integer, Consumer> rangeMap;
public HashRangeExclusiveStickyKeyConsumerSelector() {
this(DEFAULT_RANGE_SIZE);
}
public HashRangeExclusiveStickyKeyConsumerSelector(int rangeSize) {
super();
if (rangeSize < 1) {
throw new IllegalArgumentException("range size must greater than 0");
}
this.rangeSize = rangeSize;
this.rangeMap = new ConcurrentSkipListMap<>();
}
@Override
public void addConsumer(Consumer consumer) throws BrokerServiceException.ConsumerAssignException {
validateKeySharedMeta(consumer);
for (IntRange intRange : consumer.getKeySharedMeta().getHashRangesList()) {
rangeMap.put(intRange.getStart(), consumer);
rangeMap.put(intRange.getEnd(), consumer);
}
}
@Override
public void removeConsumer(Consumer consumer) {
rangeMap.entrySet().removeIf(entry -> entry.getValue().equals(consumer));
}
@Override
public Consumer select(byte[] stickyKey) {
return select(Murmur3_32Hash.getInstance().makeHash(stickyKey));
}
@Override
public Map<String, List<String>> getConsumerKeyHashRanges() {
Map<String, List<String>> result = new HashMap<>();
Map.Entry<Integer, Consumer> prev = null;
for (Map.Entry<Integer, Consumer> entry: rangeMap.entrySet()) {
if (prev == null) {
prev = entry;
} else {
if (prev.getValue().equals(entry.getValue())) {
result.computeIfAbsent(entry.getValue().consumerName(), key -> new ArrayList<>())
.add("[" + prev.getKey() + ", " + entry.getKey() + "]");
}
prev = null;
}
}
return result;
}
Consumer select(int hash) {
if (rangeMap.size() > 0) {
int slot = hash % rangeSize;
Map.Entry<Integer, Consumer> ceilingEntry = rangeMap.ceilingEntry(slot);
Map.Entry<Integer, Consumer> floorEntry = rangeMap.floorEntry(slot);
Consumer ceilingConsumer = ceilingEntry != null ? ceilingEntry.getValue() : null;
Consumer floorConsumer = floorEntry != null ? floorEntry.getValue() : null;
if (floorConsumer != null && floorConsumer.equals(ceilingConsumer)) {
return ceilingConsumer;
} else {
return null;
}
} else {
return null;
}
}
private void validateKeySharedMeta(Consumer consumer) throws BrokerServiceException.ConsumerAssignException {
if (consumer.getKeySharedMeta() == null) {
throw new BrokerServiceException.ConsumerAssignException("Must specify key shared meta for consumer.");
}
List<IntRange> ranges = consumer.getKeySharedMeta().getHashRangesList();
if (ranges.isEmpty()) {
throw new BrokerServiceException.ConsumerAssignException("Ranges for KeyShared policy must not be empty.");
}
for (IntRange intRange : ranges) {
if (intRange.getStart() > intRange.getEnd()) {
throw new BrokerServiceException.ConsumerAssignException("Fixed hash range start > end");
}
Map.Entry<Integer, Consumer> ceilingEntry = rangeMap.ceilingEntry(intRange.getStart());
Map.Entry<Integer, Consumer> floorEntry = rangeMap.floorEntry(intRange.getEnd());
if (floorEntry != null && floorEntry.getKey() >= intRange.getStart()) {
throw new BrokerServiceException.ConsumerAssignException("Range conflict with consumer "
+ floorEntry.getValue());
}
if (ceilingEntry != null && ceilingEntry.getKey() <= intRange.getEnd()) {
throw new BrokerServiceException.ConsumerAssignException("Range conflict with consumer "
+ ceilingEntry.getValue());
}
if (ceilingEntry != null && floorEntry != null && ceilingEntry.getValue().equals(floorEntry.getValue())) {
KeySharedMeta keySharedMeta = ceilingEntry.getValue().getKeySharedMeta();
for (IntRange range : keySharedMeta.getHashRangesList()) {
int start = Math.max(intRange.getStart(), range.getStart());
int end = Math.min(intRange.getEnd(), range.getEnd());
if (end >= start) {
throw new BrokerServiceException.ConsumerAssignException("Range conflict with consumer "
+ ceilingEntry.getValue());
}
}
}
}
}
Map<Integer, Consumer> getRangeConsumer() {
return Collections.unmodifiableMap(rangeMap);
}
}