# PIP-143: Support split bundle by specified boundaries
A namespace bundle may contain many topic partitions that belong to different topics.
The throughput of these topics can vary greatly: some topics have a very high rate/throughput while others have a very low one.
Partitions with a high rate/throughput can overload a broker and trigger bundle unloading.
At that point, if we split the bundle manually with the `range_equally_divide` or `topic_count_equally_divide` split algorithm, it may take many splits before these high rate/throughput partitions are assigned to different bundles.
For convenience, this PIP calls these high-throughput topics `outstanding topic` and their partitions `outstanding partition`.
## Goal
Our goal is to make it easier to split `outstanding partition`s into new bundles. This PIP therefore introduces a more flexible algorithm for splitting a namespace bundle.
The main idea is to first get the hash position of every topic in a bundle. With these hash positions, it is much easier to decide where to split the bundle. We can split it into two bundles of equal throughput or into multiple bundles of equal throughput.
For example, suppose there is a bundle with boundaries `0x00000000` to `0x00000200` and four topics: `t1`, `t2`, `t3`, `t4`.
**Step one. Get the hash position of these topics**
`t1` with hashcode 10
`t2` with hashcode 20
`t3` with hashcode 80
`t4` with hashcode 90
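The hash codes above are illustrative, and this PIP does not spell out how a position is derived. As a minimal sketch, assuming the position is the unsigned 32-bit CRC32 of the full topic name (the hash function, the class name `TopicHashSketch`, and the inclusive-lower/exclusive-upper range check are all assumptions made for illustration), a position could be computed and tested against a bundle range like this:

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

final class TopicHashSketch {
    /** Assumption: position = CRC32 of the UTF-8 topic name, read as an unsigned 32-bit value. */
    static long hashPosition(String topicName) {
        CRC32 crc = new CRC32();
        crc.update(topicName.getBytes(StandardCharsets.UTF_8));
        return crc.getValue(); // CRC32.getValue() is always within [0, 0xFFFFFFFF]
    }

    /** Assumption: a bundle owns positions in [lower, upper). */
    static boolean inBundle(long position, long lower, long upper) {
        return position >= lower && position < upper;
    }
}
```

Whatever hash the broker actually uses, the property that matters is that the same topic name always maps to the same position, so a split boundary chosen from these positions stays valid.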
**Step two. Split the bundle**
Here we have multiple choices, such as:
- split the bundle into two bundles with an equal number of topics, as `topic_count_equally_divide` does; we can split at any position between 21 and 80
- split the bundle into four bundles with one topic each; we can split at positions 15, 50 and 85
- split based on the topics' throughput
- ...
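The choices above can be sketched in code. The helper below is illustrative only: the class and method names are hypothetical and not part of this PIP, and it assumes that a boundary `b` sends positions below `b` to the lower bundle. `isolateEachTopic` places a boundary halfway between adjacent hash positions, and `splitByThroughput` picks the single boundary that best balances an assumed per-position throughput figure.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class SplitPositionSketch {
    /** One boundary between each pair of adjacent distinct positions, in (lo, hi]. */
    static List<Long> isolateEachTopic(List<Long> hashPositions) {
        List<Long> sorted = new ArrayList<>(hashPositions);
        Collections.sort(sorted);
        List<Long> boundaries = new ArrayList<>();
        for (int i = 1; i < sorted.size(); i++) {
            long lo = sorted.get(i - 1);
            long hi = sorted.get(i);
            if (hi > lo) { // topics sharing a position cannot be separated
                boundaries.add(lo + (hi - lo + 1) / 2); // midpoint, rounded up
            }
        }
        return boundaries;
    }

    /** Single boundary that best balances throughput between the two resulting bundles. */
    static long splitByThroughput(Map<Long, Double> throughputByPosition) {
        TreeMap<Long, Double> sorted = new TreeMap<>(throughputByPosition);
        if (sorted.size() < 2) {
            throw new IllegalArgumentException("need at least two distinct positions");
        }
        double total = 0;
        for (double v : sorted.values()) {
            total += v;
        }
        double below = 0; // throughput of all positions below the candidate boundary
        double bestDiff = Double.MAX_VALUE;
        long best = -1;
        Long prev = null;
        for (Map.Entry<Long, Double> e : sorted.entrySet()) {
            if (prev != null) {
                double diff = Math.abs(total - 2 * below); // |upper half - lower half|
                if (diff < bestDiff) {
                    bestDiff = diff;
                    best = prev + (e.getKey() - prev + 1) / 2;
                }
            }
            below += e.getValue();
            prev = e.getKey();
        }
        return best;
    }
}
```

For the four topics in the example, `isolateEachTopic` on positions 10, 20, 80 and 90 yields the boundaries 15, 50 and 85 listed above.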
## API Changes
We need two API changes for this PIP.
1. Add a new API to get the positions for one or more topics
```JAVA
/**
 * Get positions for topic list in a bundle.
 *
 * @param namespace
 * @param bundle range of bundle
 * @param topicList
 * @return hash positions for all topics in topicList
 * @throws PulsarAdminException
 */
TopicHashPositions getTopicHashPositions(String namespace, String bundle, List<String> topicList) throws PulsarAdminException;
```
2. Change the bundle split API to support splitting a bundle at one or more specified hash positions
```JAVA
/**
 * Split namespace bundle.
 *
 * @param namespace
 * @param bundle range of bundle to split
 * @param unloadSplitBundles
 * @param splitAlgorithmName
 * @param splitBoundaries
 * @throws PulsarAdminException
 */
void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles,
                          String splitAlgorithmName, List<Long> splitBoundaries) throws PulsarAdminException;
```
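Taken together, the two calls support a look-up-then-split workflow. The sketch below mirrors the two signatures above on a local, hypothetical `BundleAdminApi` interface so that it can run without a broker. The interface name, the omission of `PulsarAdminException`, and the assumption that hash positions come back as a topic-to-position map (rather than a `TopicHashPositions` object) are simplifications for illustration, not part of the proposal.

```java
import java.util.List;
import java.util.Map;

final class SplitWorkflowSketch {
    /** Hypothetical local mirror of the two admin calls proposed above. */
    interface BundleAdminApi {
        // Assumption: the result is simplified to a topic -> hash position map.
        Map<String, Long> getTopicHashPositions(String namespace, String bundle, List<String> topicList);

        void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles,
                                  String splitAlgorithmName, List<Long> splitBoundaries);
    }

    /** Looks up two hot topics and splits the bundle between them; returns the boundary used. */
    static long separate(BundleAdminApi admin, String namespace, String bundle,
                         String hotTopicA, String hotTopicB) {
        Map<String, Long> positions =
                admin.getTopicHashPositions(namespace, bundle, List.of(hotTopicA, hotTopicB));
        long a = positions.get(hotTopicA);
        long b = positions.get(hotTopicB);
        long lo = Math.min(a, b);
        long hi = Math.max(a, b);
        if (lo == hi) {
            throw new IllegalStateException("topics share a hash position and cannot be separated");
        }
        long boundary = lo + (hi - lo + 1) / 2; // midpoint rounded up, always in (lo, hi]
        admin.splitNamespaceBundle(namespace, bundle, true,
                "specified_positions_divide", List.of(boundary));
        return boundary;
    }
}
```

With `t2` at 20 and `t3` at 80 from the earlier example, `separate` would request a single split boundary at 50.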
## Implementation
### New API for getting topic positions
Add a new admin command `GetTopicHashPositions` to `CmdNamespaces`:
```java
private class GetTopicHashPositions extends CliCommand {
    @Parameter(
            names = { "--bundle", "-b" },
            description = "{start-boundary}_{end-boundary} format namespace bundle",
            required = false)
    private String bundle;

    @Parameter(
            names = { "--topic-list", "-tl" },
            description = "The list of topics to get positions in this bundle",
            required = false)
    private List<String> topicList;
}
```
Add a new GET method `getTopicHashPositions` to `Namespaces`:
```java
@GET
@Path("/{tenant}/{namespace}/{bundle}/topicHashPositions")
@ApiOperation(value = "Get hash positions for topics")
@ApiResponses(value = {
        @ApiResponse(code = 403, message = "Don't have admin permission"),
        @ApiResponse(code = 404, message = "Namespace does not exist")})
public TopicHashPositions getTopicHashPositions(
        @PathParam("tenant") String tenant,
        @PathParam("namespace") String namespace,
        @PathParam("bundle") String bundleRange,
        @QueryParam("topicList") List<String> topicList) {
    validateNamespaceName(tenant, namespace);
    return internalGetTopicHashPositions(bundleRange, new ArrayList<>(topicList));
}
```
### Add support for splitting a bundle at specified hash positions
Change the admin API in `CmdNamespaces` to support splitting a bundle at specified hash positions (split boundaries):
```java
private class SplitBundle extends CliCommand {
    @Parameter(names = { "--split-algorithm-name", "-san" }, description = "Algorithm name for split "
            + "namespace bundle. Valid options are: [range_equally_divide, topic_count_equally_divide, "
            + "specified_positions_divide]. Use broker side config if absent", required = false)
    private String splitAlgorithmName;

    @Parameter(names = { "--split-boundaries",
            "-sb" }, description = "Specified split boundary for bundle split, will split one bundle "
            + "to multi bundles only works with specified_positions_divide algorithm", required = false)
    private List<Long> splitBoundaries;
```
Change the `splitNamespaceBundle` method of `Namespaces` to add a parameter for split boundaries:
```java
public void splitNamespaceBundle(
        @Suspended final AsyncResponse asyncResponse,
        @PathParam("tenant") String tenant,
        @PathParam("namespace") String namespace,
        @PathParam("bundle") String bundleRange,
        @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
        @QueryParam("unload") @DefaultValue("false") boolean unload,
        @QueryParam("splitAlgorithmName") String splitAlgorithmName,
        @QueryParam("splitBoundaries") List<Long> splitBoundaries) {
```
For code consistency, encapsulate all the parameters for a bundle split in a new class `BundleSplitOption`:
```java
public class BundleSplitOption {
    private NamespaceService service;
    private NamespaceBundle bundle;
    private List<Long> positions;
}
```
Then add a new `NamespaceBundleSplitAlgorithm` named `SpecifiedPositionsBundleSplitAlgorithm`, which validates the requested split boundaries and returns the final split boundaries. Positions that do not fall strictly inside the bundle range are dropped.
```java
public class SpecifiedPositionsBundleSplitAlgorithm implements NamespaceBundleSplitAlgorithm {
    @Override
    public CompletableFuture<List<Long>> getSplitBoundary(BundleSplitOption bundleSplitOption) {
        NamespaceService service = bundleSplitOption.getService();
        NamespaceBundle bundle = bundleSplitOption.getBundle();
        List<Long> positions = bundleSplitOption.getPositions();
        if (positions == null || positions.size() == 0) {
            return CompletableFuture.completedFuture(null);
        }
        // sort all positions
        Collections.sort(positions);
        return service.getOwnedTopicListForNamespaceBundle(bundle).thenCompose(topics -> {
            if (topics == null || topics.size() <= 1) {
                return CompletableFuture.completedFuture(null);
            }
            // keep only the positions strictly inside the bundle range
            List<Long> splitBoundaries = positions
                    .stream()
                    .filter(position -> position > bundle.getLowerEndpoint() && position < bundle.getUpperEndpoint())
                    .collect(Collectors.toList());
            if (splitBoundaries.size() == 0) {
                return CompletableFuture.completedFuture(null);
            }
            return CompletableFuture.completedFuture(splitBoundaries);
        });
    }
}
```
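To make the filtering behaviour concrete, here is a standalone sketch of the same validation on plain values, without `NamespaceService`. The class name, the de-duplication step, and returning an empty list instead of `null` are choices made for this illustration only. The `0x%08x` range formatting follows the `{start-boundary}_{end-boundary}` convention mentioned earlier but is likewise an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

final class SplitBoundarySketch {
    /** Keeps only positions strictly inside (lower, upper), sorted and de-duplicated. */
    static List<Long> validBoundaries(long lower, long upper, List<Long> positions) {
        TreeSet<Long> kept = new TreeSet<>();
        for (long p : positions) {
            if (p > lower && p < upper) {
                kept.add(p);
            }
        }
        return new ArrayList<>(kept);
    }

    /** Renders the bundle ranges produced by cutting lower..upper at each boundary in order. */
    static List<String> resultingRanges(long lower, long upper, List<Long> sortedBoundaries) {
        List<String> ranges = new ArrayList<>();
        long start = lower;
        for (long b : sortedBoundaries) {
            ranges.add(String.format("0x%08x_0x%08x", start, b));
            start = b;
        }
        ranges.add(String.format("0x%08x_0x%08x", start, upper));
        return ranges;
    }
}
```

For a bundle `0x00000000_0x00000200` and requested positions 85, 15, 0, 50 and 600, this sketch keeps 15, 50 and 85 and produces four ranges; 0 and 600 are dropped because they do not lie strictly inside the bundle.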
Also, add the new bundle split algorithm to `conf/broker.conf`:
```shell
supportedNamespaceBundleSplitAlgorithms=range_equally_divide,topic_count_equally_divide,specified_positions_divide
```
## Rejected Alternatives
Splitting the bundle by `outstanding topic`: each split divides the bundle into two new bundles that each contain an equal share of the `outstanding partition`s. The disadvantage of this algorithm is that it can only deal with one `outstanding topic` at a time.