blob: 09c3ebe4193942c3a7cbaf4e4ece3574b1687816 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.compaction;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Deleted;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Init;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Owned;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Assigning;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Releasing;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Splitting;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.isValidTransition;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MSG_COMPRESSION_TYPE;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData.state;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.commons.lang.reflect.FieldUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateCompactionStrategy;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.impl.ReaderImpl;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@Test(groups = "broker-compaction")
public class ServiceUnitStateCompactionTest extends MockedPulsarServiceBaseTest {
private ScheduledExecutorService compactionScheduler;
private BookKeeper bk;
private Schema<ServiceUnitStateData> schema;
private ServiceUnitStateCompactionStrategy strategy;
// Mutable cursor used by the testValue(...) helpers to build a chain of state transitions
// with increasing version ids; setup() resets both fields before every test method.
private ServiceUnitState testState = Init;
private ServiceUnitStateData testData = null;
// Shared source of randomness for choosing the next (in)valid state. It is unseeded, so the
// generated transition sequences can differ between runs.
private static final Random RANDOM = new Random();
/**
 * Builds the payload for {@code state} and stores it as the new {@code testData}.
 * The version id is one greater than that of the previously stored payload.
 * For {@code Init} the stored payload (and the return value) is {@code null}.
 */
private ServiceUnitStateData testValue(ServiceUnitState state, String broker) {
    testData = (state == Init)
            ? null
            : new ServiceUnitStateData(state, broker, versionId(testData) + 1);
    return testData;
}
/**
 * Advances {@code testState} to a random valid successor and returns its payload.
 * The successor is never Init, Splitting or Deleted.
 */
private ServiceUnitStateData testValue(String broker) {
    ServiceUnitState next = nextValidStateNonSplit(testState);
    testState = next;
    return testValue(next, broker);
}
/**
 * Returns a uniformly random state that {@code from} may legally transition to.
 * {@code Random.nextInt} throws if there is none.
 */
private ServiceUnitState nextValidState(ServiceUnitState from) {
    List<ServiceUnitState> reachable = new ArrayList<>();
    for (ServiceUnitState candidate : ServiceUnitState.values()) {
        if (isValidTransition(from, candidate)) {
            reachable.add(candidate);
        }
    }
    return reachable.get(RANDOM.nextInt(reachable.size()));
}
/**
 * Returns a uniformly random valid successor of {@code from}.
 * Init, Splitting and Deleted are excluded from the candidates.
 */
private ServiceUnitState nextValidStateNonSplit(ServiceUnitState from) {
    List<ServiceUnitState> reachable = new ArrayList<>();
    for (ServiceUnitState candidate : ServiceUnitState.values()) {
        boolean excluded = candidate == Init || candidate == Splitting || candidate == Deleted;
        if (!excluded && isValidTransition(from, candidate)) {
            reachable.add(candidate);
        }
    }
    return reachable.get(RANDOM.nextInt(reachable.size()));
}
/**
 * Returns a uniformly random state that {@code from} may NOT transition to.
 * Falls back to {@code Init} when every state is a valid successor.
 */
private ServiceUnitState nextInvalidState(ServiceUnitState from) {
    List<ServiceUnitState> unreachable = new ArrayList<>();
    for (ServiceUnitState candidate : ServiceUnitState.values()) {
        if (!isValidTransition(from, candidate)) {
            unreachable.add(candidate);
        }
    }
    return unreachable.isEmpty()
            ? Init
            : unreachable.get(RANDOM.nextInt(unreachable.size()));
}
/**
 * Starts the mocked broker and prepares shared fixtures for each test.
 *
 * <p>Creates the "use" cluster, the "my-property" tenant and the "my-property/use/my-ns"
 * namespace. Then creates a single-threaded daemon compaction scheduler, a BookKeeper client,
 * the JSON schema and a compaction strategy with broker checks turned off. Finally it resets
 * the testValue(...) cursor.
 */
@BeforeMethod
@Override
public void setup() throws Exception {
    super.internalSetup();

    var clusterData = ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build();
    admin.clusters().createCluster("use", clusterData);
    var tenantInfo = new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("use"));
    admin.tenants().createTenant("my-property", tenantInfo);
    admin.namespaces().createNamespace("my-property/use/my-ns");

    var threadFactory = new ThreadFactoryBuilder()
            .setNameFormat("compaction-%d")
            .setDaemon(true)
            .build();
    compactionScheduler = Executors.newSingleThreadScheduledExecutor(threadFactory);
    bk = pulsar.getBookKeeperClientFactory().create(this.conf, null, null, Optional.empty(), null);

    schema = Schema.JSON(ServiceUnitStateData.class);
    strategy = new ServiceUnitStateCompactionStrategy();
    // NOTE(review): presumably disables broker validation so synthetic broker names pass — confirm.
    strategy.checkBrokers(false);

    testState = Init;
    testData = null;
}
/**
 * Tears down the broker, then the BookKeeper client and the compaction scheduler.
 *
 * <p>Because this runs with {@code alwaysRun = true}, it may execute after a failed
 * {@code setup()}. In that case {@code bk} or {@code compactionScheduler} can be unset, so
 * each resource is null-checked. Nested finally blocks ensure one failing step does not
 * prevent the remaining resources from being released. Fields are cleared so a later failed
 * setup cannot close a stale client twice.
 */
@AfterMethod(alwaysRun = true)
@Override
public void cleanup() throws Exception {
    try {
        super.internalCleanup();
    } finally {
        try {
            if (bk != null) {
                bk.close();
                bk = null;
            }
        } finally {
            if (compactionScheduler != null) {
                compactionScheduler.shutdownNow();
                compactionScheduler = null;
            }
        }
    }
}
/**
 * Result of {@code generateTestData()}.
 *
 * @param topic    topic the messages were published to
 * @param expected per-key value expected to survive strategic compaction; mutable, and
 *                 drained by the tests while verifying
 * @param all      every published (key, value) pair in publish order; mutable, and drained
 *                 by the tests while verifying
 */
public record TestData(String topic,
                       Map<String, ServiceUnitStateData> expected,
                       List<Pair<String, ServiceUnitStateData>> all) {
}
/**
 * Publishes a pseudo-random mix of valid and invalid state transitions over a few keys and
 * records what strategic compaction should retain.
 *
 * <p>Before publishing, the namespace gets unlimited retention and a {@code readCompacted}
 * subscription "sub1" is created. For each message the strategy decides whether the
 * previous value for the key wins ({@code shouldKeepLeft}). If it does not, the new value
 * becomes the expected survivor for that key. The producer is closed once all messages are
 * sent.
 *
 * @return the topic, the expected per-key survivors, and every published message in order
 */
TestData generateTestData() throws PulsarAdminException, PulsarClientException {
    String topic = "persistent://my-property/use/my-ns/my-topic1";
    final int numMessages = 20;
    final int maxKeys = 5;
    // Configure retention to ensure data is retained for readers.
    admin.namespaces().setRetention("my-property/use/my-ns", new RetentionPolicies(-1, -1));
    Map<String, ServiceUnitStateData> expected = new HashMap<>();
    List<Pair<String, ServiceUnitStateData>> all = new ArrayList<>();
    // Seeded, so key choice and the valid/invalid coin flips are repeatable; the concrete
    // next state is still drawn from the unseeded RANDOM.
    Random r = new Random(0);
    try (Producer<ServiceUnitStateData> producer = pulsarClient.newProducer(schema)
            .topic(topic)
            .compressionType(MSG_COMPRESSION_TYPE)
            .enableBatching(true)
            .messageRoutingMode(MessageRoutingMode.SinglePartition)
            .create()) {
        pulsarClient.newConsumer(schema)
                .topic(topic)
                .subscriptionName("sub1")
                .readCompacted(true)
                .subscribe().close();
        for (int j = 0; j < numMessages; j++) {
            String key = "key" + r.nextInt(maxKeys);
            ServiceUnitStateData prev = expected.get(key);
            ServiceUnitState prevState = state(prev);
            boolean invalid = r.nextBoolean();
            ServiceUnitState state = invalid ? nextInvalidState(prevState) : nextValidState(prevState);
            // The boolean constructor argument (presumably ServiceUnitStateData's force flag)
            // is only set for a valid transition back to Init.
            boolean force = !invalid && state == Init;
            ServiceUnitStateData value =
                    new ServiceUnitStateData(state, key + ":" + j, force, versionId(prev) + 1);
            producer.newMessage().key(key).value(value).send();
            if (!strategy.shouldKeepLeft(prev, value)) {
                expected.put(key, value);
            }
            all.add(Pair.of(key, value));
        }
    }
    return new TestData(topic, expected, all);
}
/**
 * Compacts a topic filled by {@code generateTestData()} and checks two reads on "sub1".
 * A compacted read returns exactly the expected survivors. A non-compacted read still
 * returns the full backlog in publish order.
 */
@Test
public void testCompaction() throws Exception {
    TestData testData = generateTestData();
    var topic = testData.topic;
    var expected = testData.expected;
    var all = testData.all;
    StrategicTwoPhaseCompactor compactor
            = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
    compactor.compact(topic, strategy).get();
    PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topic, false);
    // NOTE(review): an assertion that the compacted ledger holds exactly expected.size() entries
    // was commented out here; only the ledger's existence is checked — confirm whether intended.
    Assert.assertTrue(internalStats.compactedLedger.ledgerId > -1);
    Assert.assertFalse(internalStats.compactedLedger.offloaded);
    // A consumer with readCompacted enabled only gets the compacted entries.
    try (Consumer<ServiceUnitStateData> consumer = pulsarClient.newConsumer(schema).topic(topic)
            .subscriptionName("sub1").readCompacted(true).subscribe()) {
        while (!expected.isEmpty()) {
            Message<ServiceUnitStateData> m = consumer.receive(2, TimeUnit.SECONDS);
            // receive(timeout) returns null on timeout; fail with context instead of an NPE.
            assertNotNull(m, "timed out; compacted entries still expected for keys " + expected.keySet());
            Assert.assertEquals(expected.remove(m.getKey()), m.getValue());
        }
    }
    // The full backlog is still readable when readCompacted is disabled.
    try (Consumer<ServiceUnitStateData> consumer = pulsarClient.newConsumer(schema).topic(topic)
            .subscriptionName("sub1").readCompacted(false).subscribe()) {
        while (!all.isEmpty()) {
            Message<ServiceUnitStateData> m = consumer.receive(2, TimeUnit.SECONDS);
            assertNotNull(m, "timed out; " + all.size() + " backlog messages not yet received");
            Pair<String, ServiceUnitStateData> expectedMessage = all.remove(0);
            Assert.assertEquals(expectedMessage.getLeft(), m.getKey());
            Assert.assertEquals(expectedMessage.getRight(), m.getValue());
        }
    }
}
/**
 * Same checks as {@code testCompaction}, but through {@code Reader}s starting at the
 * earliest message instead of a subscription.
 */
@Test
public void testCompactionWithReader() throws Exception {
    TestData testData = generateTestData();
    var topic = testData.topic;
    var expected = testData.expected;
    var all = testData.all;
    StrategicTwoPhaseCompactor compactor
            = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
    compactor.compact(topic, strategy).get();
    // A reader with readCompacted enabled only gets the compacted entries.
    try (Reader<ServiceUnitStateData> reader = pulsarClient.newReader(schema).topic(topic).readCompacted(true)
            .startMessageId(MessageId.earliest).create()) {
        while (!expected.isEmpty()) {
            Message<ServiceUnitStateData> m = reader.readNext(2, TimeUnit.SECONDS);
            // readNext(timeout) returns null on timeout; fail with context instead of an NPE.
            assertNotNull(m, "timed out; compacted entries still expected for keys " + expected.keySet());
            Assert.assertEquals(expected.remove(m.getKey()), m.getValue());
        }
    }
    // The full backlog is still readable when readCompacted is disabled.
    try (Reader<ServiceUnitStateData> reader = pulsarClient.newReader(schema).topic(topic).readCompacted(false)
            .startMessageId(MessageId.earliest).create()) {
        while (!all.isEmpty()) {
            Message<ServiceUnitStateData> m = reader.readNext(2, TimeUnit.SECONDS);
            assertNotNull(m, "timed out; " + all.size() + " backlog messages not yet read");
            Pair<String, ServiceUnitStateData> expectedMessage = all.remove(0);
            Assert.assertEquals(expectedMessage.getLeft(), m.getKey());
            Assert.assertEquals(expectedMessage.getRight(), m.getValue());
        }
    }
}
/**
 * Checks that a TableView configured with ServiceUnitStateCompactionStrategy converges to the
 * expected survivors, both while data is published and, via a fresh view, after compaction.
 * Both views are closed even when an assertion fails.
 */
@Test
public void testCompactionWithTableview() throws Exception {
    var tv = pulsar.getClient().newTableViewBuilder(schema)
            .topic("persistent://my-property/use/my-ns/my-topic1")
            .loadConf(Map.of(
                    "topicCompactionStrategyClassName",
                    ServiceUnitStateCompactionStrategy.class.getName()))
            .create();
    TestData testData;
    try {
        // The view instantiates its own strategy; turn its broker checks off via reflection.
        ((ServiceUnitStateCompactionStrategy)
                FieldUtils.readDeclaredField(tv, "compactionStrategy", true))
                .checkBrokers(false);
        testData = generateTestData();
        var expectedCopy = new HashMap<>(testData.expected);
        Awaitility.await()
                .pollInterval(200, TimeUnit.MILLISECONDS)
                .atMost(10, TimeUnit.SECONDS)
                .untilAsserted(() -> assertEquals(expectedCopy.size(), tv.size()));
        for (var etr : tv.entrySet()) {
            Assert.assertEquals(expectedCopy.remove(etr.getKey()), etr.getValue());
            if (expectedCopy.isEmpty()) {
                break;
            }
        }
        Assert.assertTrue(expectedCopy.isEmpty());
    } finally {
        tv.close();
    }
    var topic = testData.topic;
    var expected = testData.expected;
    StrategicTwoPhaseCompactor compactor
            = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
    compactor.compact(topic, strategy).get();
    // A table view created after compaction must expose exactly the compacted entries.
    var tableview = pulsar.getClient().newTableView(schema)
            .topic(topic)
            .loadConf(Map.of(
                    "topicCompactionStrategyClassName",
                    ServiceUnitStateCompactionStrategy.class.getName()))
            .create();
    try {
        for (var etr : tableview.entrySet()) {
            Assert.assertEquals(expected.remove(etr.getKey()), etr.getValue());
            if (expected.isEmpty()) {
                break;
            }
        }
        Assert.assertTrue(expected.isEmpty());
    } finally {
        tableview.close();
    }
}
/**
 * Before any compaction a readCompacted consumer sees every message. After compaction it
 * sees only the latest value for the key.
 */
@Test
public void testReadCompactedBeforeCompaction() throws Exception {
    final String topic = "persistent://my-property/use/my-ns/my-topic1";
    final String key = "key0";
    Producer<ServiceUnitStateData> producer = pulsarClient.newProducer(schema)
            .topic(topic)
            .compressionType(MSG_COMPRESSION_TYPE)
            .enableBatching(true)
            .create();
    pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();

    List<ServiceUnitStateData> published = Arrays.asList(
            testValue("content0"), testValue("content1"), testValue("content2"));
    for (ServiceUnitStateData payload : published) {
        producer.newMessage().key(key).value(payload).send();
    }

    // Nothing is compacted yet, so all three messages are delivered in order.
    try (Consumer<ServiceUnitStateData> consumer = pulsarClient.newConsumer(schema).topic(topic)
            .subscriptionName("sub1").readCompacted(true).subscribe()) {
        for (ServiceUnitStateData want : published) {
            Message<ServiceUnitStateData> received = consumer.receive();
            Assert.assertEquals(received.getKey(), key);
            Assert.assertEquals(received.getValue(), want);
        }
    }

    new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler)
            .compact(topic, strategy).get();

    // After compaction only the newest value for the key remains.
    try (Consumer<ServiceUnitStateData> consumer = pulsarClient.newConsumer(schema).topic(topic)
            .subscriptionName("sub1").readCompacted(true).subscribe()) {
        Message<ServiceUnitStateData> received = consumer.receive();
        Assert.assertEquals(received.getKey(), key);
        Assert.assertEquals(received.getValue(), published.get(published.size() - 1));
    }
}
/**
 * Messages published after a compaction are delivered after the compacted survivor to a
 * readCompacted consumer.
 */
@Test
public void testReadEntriesAfterCompaction() throws Exception {
    final String topic = "persistent://my-property/use/my-ns/my-topic1";
    Producer<ServiceUnitStateData> producer = pulsarClient.newProducer(schema)
            .topic(topic)
            .compressionType(MSG_COMPRESSION_TYPE)
            .enableBatching(true)
            .create();
    pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();
    final String key = "key0";
    List<ServiceUnitStateData> values = new ArrayList<>();
    for (String content : Arrays.asList("content0", "content1", "content2", "content3")) {
        values.add(testValue(content));
    }

    // The first three values are published before compaction...
    for (int idx = 0; idx < 3; idx++) {
        producer.newMessage().key(key).value(values.get(idx)).send();
    }
    new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler)
            .compact(topic, strategy).get();
    // ...and the fourth only afterwards, so it is not part of the compacted ledger.
    producer.newMessage().key(key).value(values.get(3)).send();

    // Expect the compacted survivor (index 2), then the post-compaction message (index 3).
    try (Consumer<ServiceUnitStateData> consumer = pulsarClient.newConsumer(schema).topic(topic)
            .subscriptionName("sub1").readCompacted(true).subscribe()) {
        for (int idx = 2; idx <= 3; idx++) {
            Message<ServiceUnitStateData> received = consumer.receive();
            Assert.assertEquals(received.getKey(), key);
            Assert.assertEquals(received.getValue(), values.get(idx));
        }
    }
}
/**
 * After compaction, seeking to earliest on a readCompacted consumer yields only the latest
 * value. A non-compacted consumer that seeks to earliest sees the whole history.
 */
@Test
public void testSeekEarliestAfterCompaction() throws Exception {
    final String topic = "persistent://my-property/use/my-ns/my-topic1";
    final String key = "key0";
    Producer<ServiceUnitStateData> producer = pulsarClient.newProducer(schema)
            .topic(topic)
            .compressionType(MSG_COMPRESSION_TYPE)
            .enableBatching(true)
            .create();
    List<ServiceUnitStateData> published = Arrays.asList(
            testValue("content0"),
            testValue("content1"),
            testValue("content2"));
    for (ServiceUnitStateData payload : published) {
        producer.newMessage().key(key).value(payload).send();
    }
    new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler)
            .compact(topic, strategy).get();

    ServiceUnitStateData latest = published.get(published.size() - 1);
    try (Consumer<ServiceUnitStateData> consumer = pulsarClient.newConsumer(schema).topic(topic)
            .subscriptionName("sub1").readCompacted(true).subscribe()) {
        consumer.seek(MessageId.earliest);
        Message<ServiceUnitStateData> received = consumer.receive();
        Assert.assertEquals(received.getKey(), key);
        Assert.assertEquals(received.getValue(), latest);
    }
    try (Consumer<ServiceUnitStateData> consumer = pulsarClient.newConsumer(schema).topic(topic)
            .subscriptionName("sub1").readCompacted(false).subscribe()) {
        consumer.seek(MessageId.earliest);
        for (ServiceUnitStateData want : published) {
            Message<ServiceUnitStateData> received = consumer.receive();
            Assert.assertEquals(received.getKey(), key);
            Assert.assertEquals(received.getValue(), want);
        }
    }
}
/**
 * A table view whose listener blocks while handling Assigning updates must still converge to
 * the latest Owned value once unblocked, across repeated compactions.
 *
 * <p>"fastTV" lives on this broker and "slowTV" on a second Pulsar service; slowTV's
 * listener parks on a semaphore for each Assigning update. Each loop round publishes
 * Releasing/Assigning/Owned (each twice) and compacts. It checks that fastTV already shows
 * Owned while slowTV is stuck at Assigning, then releases slowTV and checks it, and a fresh
 * view, reach Owned. Resources are closed, and the listener unblocked, even on failure.
 */
@Test
public void testSlowTableviewAfterCompaction() throws Exception {
    String topic = "persistent://my-property/use/my-ns/my-topic1";
    String strategyClassName = "topicCompactionStrategyClassName";
    strategy.checkBrokers(true);
    pulsarClient.newConsumer(schema)
            .topic(topic)
            .subscriptionName("sub1")
            .readCompacted(true)
            .subscribe().close();
    var fastTV = pulsar.getClient().newTableViewBuilder(schema)
            .topic(topic)
            .subscriptionName("fastTV")
            .loadConf(Map.of(
                    strategyClassName,
                    ServiceUnitStateCompactionStrategy.class.getName()))
            .create();
    var defaultConf = getDefaultConf();
    var additionalPulsarTestContext = createAdditionalPulsarTestContext(defaultConf);
    var pulsar2 = additionalPulsarTestContext.getPulsarService();
    var slowTV = pulsar2.getClient().newTableViewBuilder(schema)
            .topic(topic)
            .subscriptionName("slowTV")
            .loadConf(Map.of(
                    strategyClassName,
                    ServiceUnitStateCompactionStrategy.class.getName()))
            .create();
    var semaphore = new Semaphore(0);
    // Cleared when slowTV's listener starts handling Assigning; set when it handles Releasing.
    AtomicBoolean handledReleased = new AtomicBoolean(false);
    slowTV.listen((k, v) -> {
        if (v.state() == Assigning) {
            try {
                // Stuck at handling Assigning until the test releases a permit.
                handledReleased.set(false);
                semaphore.acquire();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        } else if (v.state() == Releasing) {
            handledReleased.set(true);
        }
    });
    // Configure retention to ensure data is retained for readers.
    admin.namespaces().setRetention("my-property/use/my-ns",
            new RetentionPolicies(-1, -1));
    Producer<ServiceUnitStateData> producer = pulsarClient.newProducer(schema)
            .topic(topic)
            .compressionType(MSG_COMPRESSION_TYPE)
            .enableBatching(true)
            .messageRoutingMode(MessageRoutingMode.SinglePartition)
            .create();
    try {
        StrategicTwoPhaseCompactor compactor
                = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
        String bundle = "bundle1";
        String src = "broker0";
        String dst = "broker1";
        long versionId = 1;
        producer.newMessage().key(bundle).value(new ServiceUnitStateData(Owned, src, versionId++)).send();
        for (int i = 0; i < 3; i++) {
            // Each state is published twice with the same version id.
            var releasedStateData = new ServiceUnitStateData(Releasing, dst, src, versionId++);
            producer.newMessage().key(bundle).value(releasedStateData).send();
            producer.newMessage().key(bundle).value(releasedStateData).send();
            var assignedStateData = new ServiceUnitStateData(Assigning, dst, src, versionId++);
            producer.newMessage().key(bundle).value(assignedStateData).send();
            producer.newMessage().key(bundle).value(assignedStateData).send();
            var ownedStateData = new ServiceUnitStateData(Owned, dst, src, versionId++);
            producer.newMessage().key(bundle).value(ownedStateData).send();
            producer.newMessage().key(bundle).value(ownedStateData).send();
            compactor.compact(topic, strategy).get();
            Awaitility.await()
                    .pollInterval(200, TimeUnit.MILLISECONDS)
                    .atMost(10, TimeUnit.SECONDS)
                    .untilAsserted(() -> assertEquals(fastTV.get(bundle), ownedStateData));
            Awaitility.await()
                    .pollInterval(200, TimeUnit.MILLISECONDS)
                    .atMost(10, TimeUnit.SECONDS)
                    .untilAsserted(() -> assertEquals(slowTV.get(bundle), assignedStateData));
            Assert.assertFalse(handledReleased.get());
            semaphore.release();
            Awaitility.await()
                    .pollInterval(200, TimeUnit.MILLISECONDS)
                    .atMost(10, TimeUnit.SECONDS)
                    .untilAsserted(() -> assertEquals(slowTV.get(bundle), ownedStateData));
            var newTv = pulsar.getClient().newTableView(schema)
                    .topic(topic)
                    .loadConf(Map.of(
                            strategyClassName,
                            ServiceUnitStateCompactionStrategy.class.getName()))
                    .create();
            try {
                Awaitility.await()
                        .pollInterval(200, TimeUnit.MILLISECONDS)
                        .atMost(10, TimeUnit.SECONDS)
                        .untilAsserted(() -> assertEquals(newTv.get(bundle), ownedStateData));
            } finally {
                newTv.close();
            }
            src = dst;
            dst = "broker" + (i + 2);
        }
    } finally {
        // Unblock slowTV's listener in case an assertion failed while it was parked in acquire().
        semaphore.release();
        producer.close();
        slowTV.close();
        fastTV.close();
        pulsar2.close();
    }
}
/**
 * Checks that a table view converges to the latest value when a large backlog is published
 * and compacted while one of its receives is in flight.
 *
 * <p>The table view's internal reader consumer is swapped for a Mockito spy whose
 * {@code receiveAsync()} is stubbed. On the first stubbed call ({@code cnt} goes 1 -> 0)
 * the answer calls {@code consumer.receiveAsync()} on the spy again. That nested call
 * re-enters the answer with {@code cnt} below zero, which resets the spy and returns the
 * real method's future. The outer answer then publishes {@code msgAddCount} more Owned
 * updates, compacts the topic synchronously, and only then returns that pending future.
 * After the reset, every later call goes to the real method.
 */
@Test
public void testSlowReceiveTableviewAfterCompaction() throws Exception {
String topic = "persistent://my-property/use/my-ns/my-topic1";
String strategyClassName = "topicCompactionStrategyClassName";
// Create the compacted subscription before anything is published.
pulsarClient.newConsumer(schema)
.topic(topic)
.subscriptionName("sub1")
.readCompacted(true)
.subscribe().close();
var tv = pulsar.getClient().newTableViewBuilder(schema)
.topic(topic)
.subscriptionName("slowTV")
.loadConf(Map.of(
strategyClassName,
ServiceUnitStateCompactionStrategy.class.getName()))
.create();
// Configure retention to ensure data is retained for readers.
admin.namespaces().setRetention("my-property/use/my-ns",
new RetentionPolicies(-1, -1));
Producer<ServiceUnitStateData> producer = pulsarClient.newProducer(schema)
.topic(topic)
.compressionType(MSG_COMPRESSION_TYPE)
.enableBatching(true)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
StrategicTwoPhaseCompactor compactor
= new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
// Reach into the table view's private "reader" future and replace its "consumer" with a spy.
var reader = ((CompletableFuture<ReaderImpl<ServiceUnitStateData>>) FieldUtils
.readDeclaredField(tv, "reader", true)).get();
var consumer = spy(reader.getConsumer());
FieldUtils.writeDeclaredField(reader, "consumer", consumer, true);
String bundle = "bundle1";
// versionId doubles as the numeric suffix of the destination broker name.
final AtomicInteger versionId = new AtomicInteger(0);
// Only the call that decrements cnt to exactly 0 takes the "slow" branch below.
final AtomicInteger cnt = new AtomicInteger(1);
int msgAddCount = 1000; // has to be big enough to cover compacted cursor fast-forward.
doAnswer(invocationOnMock -> {
if (cnt.decrementAndGet() == 0) {
// Re-enters this answer (cnt becomes negative), which resets the spy and
// returns the real receiveAsync() future.
var msg = consumer.receiveAsync();
for (int i = 0; i < msgAddCount; i++) {
producer.newMessage().key(bundle).value(
new ServiceUnitStateData(Owned, "broker" + versionId.incrementAndGet(), true,
versionId.get())).send();
}
compactor.compact(topic, strategy).join();
return msg;
}
// Call the real method
reset(consumer);
return consumer.receiveAsync();
}).when(consumer).receiveAsync();
producer.newMessage().key(bundle).value(
new ServiceUnitStateData(Owned, "broker", true,
versionId.incrementAndGet())).send();
producer.newMessage().key(bundle).value(
new ServiceUnitStateData(Owned, "broker" + versionId.incrementAndGet(), true,
versionId.get())).send();
// The view must end on the last published destination broker.
Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(
() -> {
var val = tv.get(bundle);
assertNotNull(val);
assertEquals(val.dstBroker(), "broker" + versionId.get());
}
);
producer.close();
tv.close();
}
/**
 * The compacted view survives a broker restart. While the broker is down, subscribing must
 * fail with a PulsarClientException.
 */
@Test
public void testBrokerRestartAfterCompaction() throws Exception {
    final String topic = "persistent://my-property/use/my-ns/my-topic1";
    final String key = "key0";
    Producer<ServiceUnitStateData> producer = pulsarClient.newProducer(schema)
            .topic(topic)
            .compressionType(MSG_COMPRESSION_TYPE)
            .enableBatching(true)
            .create();
    pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();
    List<ServiceUnitStateData> published = Arrays.asList(
            testValue("content0"), testValue("content1"), testValue("content2"));
    for (ServiceUnitStateData payload : published) {
        producer.newMessage().key(key).value(payload).send();
    }
    new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler)
            .compact(topic, strategy).get();
    ServiceUnitStateData latest = published.get(published.size() - 1);

    try (Consumer<ServiceUnitStateData> consumer = pulsarClient.newConsumer(schema).topic(topic)
            .subscriptionName("sub1").readCompacted(true).subscribe()) {
        Message<ServiceUnitStateData> received = consumer.receive();
        Assert.assertEquals(received.getKey(), key);
        Assert.assertEquals(received.getValue(), latest);
    }

    stopBroker();
    try (Consumer<ServiceUnitStateData> consumer = pulsarClient.newConsumer(schema).topic(topic)
            .subscriptionName("sub1").readCompacted(true).subscribe()) {
        consumer.receive();
        Assert.fail("Shouldn't have been able to receive anything");
    } catch (PulsarClientException expectedWhileStopped) {
        // correct behaviour
    }

    startBroker();
    try (Consumer<ServiceUnitStateData> consumer = pulsarClient.newConsumer(schema).topic(topic)
            .subscriptionName("sub1").readCompacted(true).subscribe()) {
        Message<ServiceUnitStateData> received = consumer.receive();
        Assert.assertEquals(received.getKey(), key);
        Assert.assertEquals(received.getValue(), latest);
    }
}
/**
 * Compacting a topic with no messages succeeds. A message published afterwards is then
 * delivered to a readCompacted consumer.
 */
@Test
public void testCompactEmptyTopic() throws Exception {
    final String topic = "persistent://my-property/use/my-ns/my-topic1";
    Producer<ServiceUnitStateData> producer = pulsarClient.newProducer(schema)
            .topic(topic)
            .compressionType(MSG_COMPRESSION_TYPE)
            .enableBatching(true)
            .create();
    pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();
    new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler)
            .compact(topic, strategy).get();

    ServiceUnitStateData payload = testValue("content0");
    producer.newMessage().key("key0").value(payload).send();

    try (Consumer<ServiceUnitStateData> consumer = pulsarClient.newConsumer(schema).topic(topic)
            .subscriptionName("sub1").readCompacted(true).subscribe()) {
        Message<ServiceUnitStateData> received = consumer.receive();
        Assert.assertEquals(received.getKey(), "key0");
        Assert.assertEquals(received.getValue(), payload);
    }
}
/**
 * A batch whose every message is superseded is dropped entirely by compaction. Only the
 * later, non-batched "my-message-4" survives.
 */
@Test
public void testWholeBatchCompactedOut() throws Exception {
    String topic = "persistent://my-property/use/my-ns/my-topic1";
    // subscribe before sending anything, so that we get all messages
    pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1")
            .readCompacted(true).subscribe().close();
    try (Producer<ServiceUnitStateData> producerNormal = pulsarClient.newProducer(schema).topic(topic)
            .enableBatching(true)
            .messageRoutingMode(MessageRoutingMode.SinglePartition)
            .create();
         Producer<ServiceUnitStateData> producerBatch = pulsarClient.newProducer(schema).topic(topic)
            .maxPendingMessages(3)
            .enableBatching(true)
            .batchingMaxMessages(3)
            .batchingMaxPublishDelay(1, TimeUnit.HOURS)
            .messageRoutingMode(MessageRoutingMode.SinglePartition)
            .create()) {
        // Three messages fill one batch (batchingMaxMessages = 3).
        List<CompletableFuture<MessageId>> batchSends = Arrays.asList(
                producerBatch.newMessage().key("key1").value(testValue("my-message-1")).sendAsync(),
                producerBatch.newMessage().key("key1").value(testValue("my-message-2")).sendAsync(),
                producerBatch.newMessage().key("key1").value(testValue("my-message-3")).sendAsync());
        // Wait until the batch is persisted so "my-message-4", sent by another producer, is
        // guaranteed to land after it rather than racing it.
        FutureUtil.waitForAll(batchSends).get();
        producerNormal.newMessage().key("key1").value(testValue("my-message-4")).send();
    }
    // compact the topic
    StrategicTwoPhaseCompactor compactor
            = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
    compactor.compact(topic, strategy).get();
    try (Consumer<ServiceUnitStateData> consumer = pulsarClient.newConsumer(schema).topic(topic)
            .subscriptionName("sub1").readCompacted(true).subscribe()) {
        Message<ServiceUnitStateData> message = consumer.receive();
        Assert.assertEquals(message.getKey(), "key1");
        Assert.assertEquals(message.getValue().dstBroker(), "my-message-4");
    }
}
/**
 * Values for keys "1", "2" and "3" are followed by null values for "1" and "2". After
 * compaction, the first compacted message is for key "3".
 */
@Test
public void testCompactionWithLastDeletedKey() throws Exception {
    String topic = "persistent://my-property/use/my-ns/my-topic1";
    try (Producer<ServiceUnitStateData> producer = pulsarClient.newProducer(schema).topic(topic)
            .compressionType(MSG_COMPRESSION_TYPE)
            .enableBatching(true)
            .messageRoutingMode(MessageRoutingMode.SinglePartition).create()) {
        pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();
        producer.newMessage().key("1").value(testValue("1")).send();
        producer.newMessage().key("2").value(testValue("3")).send();
        producer.newMessage().key("3").value(testValue("5")).send();
        // Null values for keys "1" and "2"; only key "3" is expected to remain.
        producer.newMessage().key("1").value(null).send();
        producer.newMessage().key("2").value(null).send();
    }
    StrategicTwoPhaseCompactor compactor
            = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
    compactor.compact(topic, strategy).get();
    Set<String> expected = Sets.newHashSet("3");
    // consumer with readCompacted enabled only gets compacted entries
    try (Consumer<ServiceUnitStateData> consumer = pulsarClient.newConsumer(schema).topic(topic)
            .subscriptionName("sub1").readCompacted(true).subscribe()) {
        Message<ServiceUnitStateData> m = consumer.receive(2, TimeUnit.SECONDS);
        // receive(timeout) returns null on timeout; fail with context instead of an NPE.
        assertNotNull(m, "timed out waiting for the surviving key \"3\"");
        assertTrue(expected.remove(m.getKey()));
    }
}
/**
 * Publishes an Owned value for keys "1" and "2", then null values for both keys. After
 * compaction a readCompacted consumer receives nothing within two seconds.
 */
@Test(timeOut = 20000)
public void testEmptyCompactionLedger() throws Exception {
    final String topic = "persistent://my-property/use/my-ns/my-topic1";
    Producer<ServiceUnitStateData> producer = pulsarClient.newProducer(schema)
            .topic(topic)
            .compressionType(MSG_COMPRESSION_TYPE)
            .enableBatching(true)
            .messageRoutingMode(MessageRoutingMode.SinglePartition)
            .create();
    pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();

    ServiceUnitStateData first = testValue(Owned, "1");
    producer.newMessage().key("1").value(first).send();
    ServiceUnitStateData second = testValue(Owned, "3");
    producer.newMessage().key("2").value(second).send();
    for (String nulledKey : Arrays.asList("1", "2")) {
        producer.newMessage().key(nulledKey).value(null).send();
    }

    new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler)
            .compact(topic, strategy).get();

    try (Consumer<ServiceUnitStateData> consumer = pulsarClient.newConsumer(schema).topic(topic)
            .subscriptionName("sub1").readCompacted(true).subscribe()) {
        assertNull(consumer.receive(2, TimeUnit.SECONDS));
    }
}
/**
 * A topic that contains only null values for a single key compacts to nothing. A
 * readCompacted consumer starting at Earliest receives no message.
 */
@Test(timeOut = 20000)
public void testAllEmptyCompactionLedger() throws Exception {
    final String topic =
            "persistent://my-property/use/my-ns/testAllEmptyCompactionLedger" + UUID.randomUUID();
    final int messages = 10;

    // 1. Publish only null values, batched messages / 5 at a time.
    Producer<ServiceUnitStateData> producer = pulsarClient.newProducer(schema)
            .compressionType(MSG_COMPRESSION_TYPE)
            .topic(topic)
            .batchingMaxMessages(messages / 5)
            .create();
    List<CompletableFuture<MessageId>> pending = new ArrayList<>(messages);
    for (int sent = 0; sent < messages; sent++) {
        pending.add(producer.newMessage().key("1").value(null).sendAsync());
    }
    FutureUtil.waitForAll(pending).get();

    // 2. Compact the topic.
    new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler)
            .compact(topic, strategy).get();

    // 3. Nothing should be readable from the compacted view.
    try (Consumer<ServiceUnitStateData> consumer = pulsarClient.newConsumer(schema).topic(topic)
            .subscriptionName("sub1").readCompacted(true)
            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe()) {
        assertNull(consumer.receive(2, TimeUnit.SECONDS));
    }
}
@Test(timeOut = 20000)
public void testCompactMultipleTimesWithoutEmptyMessage()
        throws PulsarClientException, ExecutionException, InterruptedException {
    final String topic =
            "persistent://my-property/use/my-ns/testCompactMultipleTimesWithoutEmptyMessage" + UUID.randomUUID()
                    .toString();
    final int messages = 10;
    final String key = "1";
    // Bounded wait for expected messages: a missing message fails with a clear assertion
    // instead of blocking until the TestNG timeout (a bare receive() never returns null).
    final int receiveTimeoutSeconds = 5;
    // 1. Create a batching producer and publish values "0".."9" for the same key.
    ProducerBuilder<ServiceUnitStateData> builder = pulsarClient.newProducer(schema).topic(topic);
    builder.compressionType(MSG_COMPRESSION_TYPE);
    builder.enableBatching(true);
    try (Producer<ServiceUnitStateData> producer = builder.create()) {
        List<CompletableFuture<MessageId>> futures = new ArrayList<>(messages);
        for (int i = 0; i < messages; i++) {
            futures.add(producer.newMessage().key(key).value(testValue(String.valueOf(i))).sendAsync());
        }
        FutureUtil.waitForAll(futures).get();
        // 2. Compact the topic.
        StrategicTwoPhaseCompactor compactor
                = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
        compactor.compact(topic, strategy).get();
        // 3. Send ten more messages ("10".."19") for the same key.
        futures.clear();
        for (int i = 0; i < messages; i++) {
            futures.add(producer.newMessage().key(key).value(testValue(String.valueOf(i + 10))).sendAsync());
        }
        FutureUtil.waitForAll(futures).get();
        // 4. Compact again.
        compactor.compact(topic, strategy).get();
    }
    // 5. After the second compaction only the latest value ("19") for the key remains.
    try (Consumer<ServiceUnitStateData> consumer = pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1")
            .readCompacted(true).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe()) {
        Message<ServiceUnitStateData> m1 = consumer.receive(receiveTimeoutSeconds, TimeUnit.SECONDS);
        assertNotNull(m1, "expected the compacted value for key " + key);
        assertEquals(m1.getKey(), key);
        assertEquals(m1.getValue().dstBroker(), "19");
        Message<ServiceUnitStateData> none = consumer.receive(2, TimeUnit.SECONDS);
        assertNull(none);
    }
}
@Test(timeOut = 200000)
public void testReadUnCompacted()
        throws PulsarClientException, ExecutionException, InterruptedException {
    final String topic = "persistent://my-property/use/my-ns/testReadUnCompacted" + UUID.randomUUID().toString();
    final int messages = 10;
    final String key = "1";
    // Bounded wait for expected messages: a missing message fails with a clear assertion
    // instead of blocking until the TestNG timeout (a bare receive() never returns null).
    final int receiveTimeoutSeconds = 5;
    // 1. Create a producer (at most messages / 5 messages per batch) and publish "0".."9".
    ProducerBuilder<ServiceUnitStateData> builder = pulsarClient.newProducer(schema).topic(topic);
    builder.compressionType(MSG_COMPRESSION_TYPE);
    builder.batchingMaxMessages(messages / 5);
    try (Producer<ServiceUnitStateData> producer = builder.create()) {
        List<CompletableFuture<MessageId>> futures = new ArrayList<>(messages);
        for (int i = 0; i < messages; i++) {
            futures.add(producer.newMessage().key(key).value(testValue(String.valueOf(i))).sendAsync());
        }
        FutureUtil.waitForAll(futures).get();
        // 2. Compact the topic.
        StrategicTwoPhaseCompactor compactor
                = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
        compactor.compact(topic, strategy).get();
        // 3. Send ten more messages ("10".."19") that are not compacted yet.
        futures.clear();
        for (int i = 0; i < messages; i++) {
            futures.add(producer.newMessage().key(key).value(testValue(String.valueOf(i + 10))).sendAsync());
        }
        FutureUtil.waitForAll(futures).get();
        // A readCompacted consumer sees the compacted value "9" followed by the ten
        // uncompacted values "10".."19": messages + 1 messages whose values are i + 9.
        try (Consumer<ServiceUnitStateData> consumer = pulsarClient.newConsumer(schema)
                .topic(topic)
                .subscriptionName("sub1")
                .readCompacted(true)
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe()) {
            for (int i = 0; i < messages + 1; i++) {
                Message<ServiceUnitStateData> received = consumer.receive(receiveTimeoutSeconds, TimeUnit.SECONDS);
                assertNotNull(received, "expected message #" + i + " on sub1");
                assertEquals(received.getKey(), key);
                assertEquals(received.getValue().dstBroker(), String.valueOf(i + 9));
                consumer.acknowledge(received);
            }
            Message<ServiceUnitStateData> none = consumer.receive(2, TimeUnit.SECONDS);
            assertNull(none);
        }
        // 4. Send an empty message to delete the key-value in the compacted topic.
        producer.newMessage().key(key).value(null).send();
        // 5. Compact the topic; the key should no longer be readable.
        compactor.compact(topic, strategy).get();
        try (Consumer<ServiceUnitStateData> consumer = pulsarClient.newConsumer(schema)
                .topic(topic)
                .subscriptionName("sub2")
                .readCompacted(true)
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe()) {
            Message<ServiceUnitStateData> none = consumer.receive(2, TimeUnit.SECONDS);
            assertNull(none);
        }
        // 6. Publish "20".."29"; these are read uncompacted after the deleted key.
        // Drop the already-completed futures from step 3 so only the new sends are awaited.
        futures.clear();
        for (int i = 0; i < messages; i++) {
            futures.add(producer.newMessage().key(key).value(testValue(String.valueOf(i + 20))).sendAsync());
        }
        FutureUtil.waitForAll(futures).get();
        try (Consumer<ServiceUnitStateData> consumer = pulsarClient.newConsumer(schema)
                .topic(topic)
                .subscriptionName("sub3")
                .readCompacted(true)
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe()) {
            for (int i = 0; i < messages; i++) {
                Message<ServiceUnitStateData> received = consumer.receive(receiveTimeoutSeconds, TimeUnit.SECONDS);
                assertNotNull(received, "expected message #" + i + " on sub3");
                assertEquals(received.getKey(), key);
                assertEquals(received.getValue().dstBroker(), String.valueOf(i + 20));
                consumer.acknowledge(received);
            }
            Message<ServiceUnitStateData> none = consumer.receive(2, TimeUnit.SECONDS);
            assertNull(none);
        }
    }
}
/**
 * Returns the version id carried by {@code data}.
 *
 * @param data the state value, possibly {@code null}
 * @return {@code data.versionId()} when {@code data} is present, otherwise one less than
 *         {@code ServiceUnitStateChannelImpl.VERSION_ID_INIT}
 */
public static long versionId(ServiceUnitStateData data) {
    if (data != null) {
        return data.versionId();
    }
    return ServiceUnitStateChannelImpl.VERSION_ID_INIT - 1;
}
}