Fix CPU 100% when deleting namespace (#10337) (#10454)
When deleting the namespace, the namespace Policies will be marked as deleted.
This will trigger topic's `onPoliciesUpdate`
However, in onPoliciesUpdate, the data of the Policies node on zk will be read, such as: `checkReplicationAndRetryOnFailure`
Due to the deletion of the namespace, the zk node may no longer exist at this time.
Failure to read data will trigger infinite retries.
https://github.com/apache/pulsar/blob/e970c2947aff9231202ab72bdbad047d85c55633/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L1175-L1193
If there are many topics, there will be a short-term CPU spike
![image](https://user-images.githubusercontent.com/9758905/115834541-ebc32480-a447-11eb-887a-95c4a3d1adf1.png)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index d301443..d87707a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1087,7 +1087,8 @@
return closeFuture;
}
- private CompletableFuture<Void> checkReplicationAndRetryOnFailure() {
+ @VisibleForTesting
+ CompletableFuture<Void> checkReplicationAndRetryOnFailure() {
CompletableFuture<Void> result = new CompletableFuture<Void>();
checkReplication().thenAccept(res -> {
log.info("[{}] Policies updated successfully", topic);
@@ -2059,6 +2060,10 @@
if (log.isDebugEnabled()) {
log.debug("[{}] isEncryptionRequired changes: {} -> {}", topic, isEncryptionRequired, data.encryption_required);
}
+ if (data.deleted) {
+ log.debug("Ignore the update because it has been deleted : {}", data);
+ return CompletableFuture.completedFuture(null);
+ }
isEncryptionRequired = data.encryption_required;
setSchemaCompatibilityStrategy(data);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
new file mode 100644
index 0000000..f4d11b8
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.Sets;
+import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.common.policies.data.Policies;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker")
+public class PersistentTopicTest extends BrokerTestBase {
+
+ @BeforeMethod(alwaysRun = true)
+ @Override
+ protected void setup() throws Exception {
+ super.baseSetup();
+ }
+
+ @AfterMethod(alwaysRun = true)
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Test
+ public void testDeleteNamespaceInfiniteRetry() throws Exception {
+ //init namespace
+ final String myNamespace = "prop/ns" + UUID.randomUUID();
+ admin.namespaces().createNamespace(myNamespace, Sets.newHashSet("test"));
+ final String topic = "persistent://" + myNamespace + "/testDeleteNamespaceInfiniteRetry";
+ //init topic and policies
+ pulsarClient.newProducer().topic(topic).create().close();
+ admin.namespaces().setMaxConsumersPerTopic(myNamespace, 0);
+ Awaitility.await().atMost(3, TimeUnit.SECONDS).until(()
+ -> admin.namespaces().getMaxConsumersPerTopic(myNamespace) == 0);
+
+ PersistentTopic persistentTopic =
+ spy((PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get());
+
+ Policies policies = new Policies();
+ policies.deleted = true;
+ persistentTopic.onPoliciesUpdate(policies);
+ verify(persistentTopic, times(0)).checkReplicationAndRetryOnFailure();
+
+ policies.deleted = false;
+ persistentTopic.onPoliciesUpdate(policies);
+ verify(persistentTopic, times(1)).checkReplicationAndRetryOnFailure();
+ }
+}
\ No newline at end of file