kafka-1984; java producer may miss an available partition; patched by Jun Rao; reviewed by Ewen Cheslack-Postava, Jay Kreps, and Guozhang Wang
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java
index 483899d..f5abdd1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java
@@ -55,14 +55,15 @@
+ "].");
return record.partition();
} else if (record.key() == null) {
- // choose the next available node in a round-robin fashion
- for (int i = 0; i < numPartitions; i++) {
- int partition = Utils.abs(counter.getAndIncrement()) % numPartitions;
- if (partitions.get(partition).leader() != null)
- return partition;
+ int nextValue = counter.getAndIncrement();
+ List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(record.topic());
+ if (availablePartitions.size() > 0) {
+ int part = Utils.abs(nextValue) % availablePartitions.size();
+ return availablePartitions.get(part).partition();
+ } else {
+ // no partitions are available, give a non-available partition
+ return Utils.abs(nextValue) % numPartitions;
}
- // no partitions are available, give a non-available partition
- return Utils.abs(counter.getAndIncrement()) % numPartitions;
} else {
// hash the key to choose a partition
return Utils.abs(Utils.murmur2(record.key())) % numPartitions;
diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java
index d3299b9..1e54527 100644
--- a/clients/src/main/java/org/apache/kafka/common/Cluster.java
+++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java
@@ -25,6 +25,7 @@
private final List<Node> nodes;
private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
private final Map<String, List<PartitionInfo>> partitionsByTopic;
+ private final Map<String, List<PartitionInfo>> availablePartitionsByTopic;
private final Map<Integer, List<PartitionInfo>> partitionsByNode;
/**
@@ -63,8 +64,18 @@
}
}
this.partitionsByTopic = new HashMap<String, List<PartitionInfo>>(partsForTopic.size());
- for (Map.Entry<String, List<PartitionInfo>> entry : partsForTopic.entrySet())
- this.partitionsByTopic.put(entry.getKey(), Collections.unmodifiableList(entry.getValue()));
+ this.availablePartitionsByTopic = new HashMap<String, List<PartitionInfo>>(partsForTopic.size());
+ for (Map.Entry<String, List<PartitionInfo>> entry : partsForTopic.entrySet()) {
+ String topic = entry.getKey();
+ List<PartitionInfo> partitionList = entry.getValue();
+ this.partitionsByTopic.put(topic, Collections.unmodifiableList(partitionList));
+ List<PartitionInfo> availablePartitions = new ArrayList<PartitionInfo>();
+ for (PartitionInfo part : partitionList) {
+ if (part.leader() != null)
+ availablePartitions.add(part);
+ }
+ this.availablePartitionsByTopic.put(topic, Collections.unmodifiableList(availablePartitions));
+ }
this.partitionsByNode = new HashMap<Integer, List<PartitionInfo>>(partsForNode.size());
for (Map.Entry<Integer, List<PartitionInfo>> entry : partsForNode.entrySet())
this.partitionsByNode.put(entry.getKey(), Collections.unmodifiableList(entry.getValue()));
@@ -130,6 +141,15 @@
}
/**
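+ * Get the list of available partitions (those that currently have a non-null leader) for this topic
+ * @param topic The topic name
+ * @return A list of the topic's partitions that have a leader (possibly empty), or null if the topic is not in this cluster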
+ */
+ public List<PartitionInfo> availablePartitionsForTopic(String topic) {
+ return this.availablePartitionsByTopic.get(topic);
+ }
+
+ /**
* Get the list of partitions whose leader is this node
* @param nodeId The node id
* @return A list of partitions
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java
index 1002f05..2519ce4 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java
@@ -38,9 +38,10 @@
private Node node2 = new Node(2, "localhost", 101);
private Node[] nodes = new Node[] { node0, node1, node2 };
private String topic = "test";
- private List<PartitionInfo> partitions = asList(new PartitionInfo(topic, 0, node0, nodes, nodes),
- new PartitionInfo(topic, 1, node1, nodes, nodes),
- new PartitionInfo(topic, 2, null, nodes, nodes));
+ // Intentionally list the partitions out of partition-id order so that no partition's list index equals its partition id.
+ private List<PartitionInfo> partitions = asList(new PartitionInfo(topic, 1, null, nodes, nodes),
+ new PartitionInfo(topic, 2, node1, nodes, nodes),
+ new PartitionInfo(topic, 0, node0, nodes, nodes));
private Cluster cluster = new Cluster(asList(node0, node1, node2), partitions);
@Test
@@ -59,20 +60,19 @@
}
@Test
- public void testRoundRobinIsStable() {
- int startPart = partitioner.partition(new ProducerRecord<byte[], byte[]>("test", value), cluster);
+ public void testRoundRobinWithUnavailablePartitions() {
+ // When there are some unavailable partitions, we want to make sure that (1) we always pick an available partition,
+ // and (2) the available partitions are selected in a round robin way.
+ int countForPart0 = 0;
+ int countForPart2 = 0;
for (int i = 1; i <= 100; i++) {
- int partition = partitioner.partition(new ProducerRecord<byte[], byte[]>("test", value), cluster);
- assertEquals("Should yield a different partition each call with round-robin partitioner",
- partition, (startPart + i) % 2);
- }
- }
-
- @Test
- public void testRoundRobinWithDownNode() {
- for (int i = 0; i < partitions.size(); i++) {
int part = partitioner.partition(new ProducerRecord<byte[], byte[]>("test", value), cluster);
- assertTrue("We should never choose a leader-less node in round robin", part >= 0 && part < 2);
+ assertTrue("We should never choose a leader-less node in round robin", part == 0 || part == 2);
+ if (part == 0)
+ countForPart0++;
+ else
+ countForPart2++;
}
+ assertEquals("The distribution between two available partitions should be even", countForPart0, countForPart2);
}
}