blob: bd875384aecb320c18374deecc62e46d521d0f4c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.compaction;
import static org.apache.pulsar.client.impl.RawReaderTest.extractKey;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.buffer.ByteBuf;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.RawMessageImpl;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@Test(groups = "broker-compaction")
public class CompactorTest extends MockedPulsarServiceBaseTest {
private ScheduledExecutorService compactionScheduler;
@BeforeMethod
@Override
public void setup() throws Exception {
super.internalSetup();
admin.clusters().createCluster("use",
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
admin.tenants().createTenant("my-property",
new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("use")));
admin.namespaces().createNamespace("my-property/use/my-ns");
compactionScheduler = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("compactor").setDaemon(true).build());
}
@AfterMethod(alwaysRun = true)
@Override
public void cleanup() throws Exception {
super.internalCleanup();
compactionScheduler.shutdownNow();
}
private List<String> compactAndVerify(String topic, Map<String, byte[]> expected, boolean checkMetrics) throws Exception {
BookKeeper bk = pulsar.getBookKeeperClientFactory().create(
this.conf, null, null, Optional.empty(), null);
Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
long compactedLedgerId = compactor.compact(topic).get();
LedgerHandle ledger = bk.openLedger(compactedLedgerId,
Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD);
Assert.assertEquals(ledger.getLastAddConfirmed() + 1, // 0..lac
expected.size(),
"Should have as many entries as there is keys");
List<String> keys = new ArrayList<>();
Enumeration<LedgerEntry> entries = ledger.readEntries(0, ledger.getLastAddConfirmed());
while (entries.hasMoreElements()) {
ByteBuf buf = entries.nextElement().getEntryBuffer();
RawMessage m = RawMessageImpl.deserializeFrom(buf);
String key = extractKey(m);
keys.add(key);
ByteBuf payload = extractPayload(m);
byte[] bytes = new byte[payload.readableBytes()];
payload.readBytes(bytes);
Assert.assertEquals(bytes, expected.remove(key),
"Compacted version should match expected version");
m.close();
}
if (checkMetrics) {
CompactionRecord compactionRecord = compactor.getStats().getCompactionRecordForTopic(topic).get();
long compactedTopicRemovedEventCount = compactionRecord.getLastCompactionRemovedEventCount();
long lastCompactSucceedTimestamp = compactionRecord.getLastCompactionSucceedTimestamp();
long lastCompactFailedTimestamp = compactionRecord.getLastCompactionFailedTimestamp();
long lastCompactDurationTimeInMills = compactionRecord.getLastCompactionDurationTimeInMills();
Assert.assertTrue(compactedTopicRemovedEventCount >= 1);
Assert.assertTrue(lastCompactSucceedTimestamp >= 1L);
Assert.assertTrue(lastCompactDurationTimeInMills >= 0L);
Assert.assertEquals(lastCompactFailedTimestamp, 0L);
}
Assert.assertTrue(expected.isEmpty(), "All expected keys should have been found");
return keys;
}
@Test
public void testCompaction() throws Exception {
String topic = "persistent://my-property/use/my-ns/my-topic1";
final int numMessages = 1000;
final int maxKeys = 10;
Producer<byte[]> producer = pulsarClient.newProducer().topic(topic)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
Map<String, byte[]> expected = new HashMap<>();
Random r = new Random(0);
for (int j = 0; j < numMessages; j++) {
int keyIndex = r.nextInt(maxKeys);
String key = "key"+keyIndex;
byte[] data = ("my-message-" + key + "-" + j).getBytes();
producer.newMessage()
.key(key)
.value(data)
.send();
expected.put(key, data);
}
compactAndVerify(topic, expected, true);
}
@Test
public void testCompactAddCompact() throws Exception {
String topic = "persistent://my-property/use/my-ns/my-topic1";
Producer<byte[]> producer = pulsarClient.newProducer().topic(topic)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
Map<String, byte[]> expected = new HashMap<>();
producer.newMessage()
.key("a")
.value("A_1".getBytes())
.send();
producer.newMessage()
.key("b")
.value("B_1".getBytes())
.send();
producer.newMessage()
.key("a")
.value("A_2".getBytes())
.send();
expected.put("a", "A_2".getBytes());
expected.put("b", "B_1".getBytes());
compactAndVerify(topic, new HashMap<>(expected), false);
producer.newMessage()
.key("b")
.value("B_2".getBytes())
.send();
expected.put("b", "B_2".getBytes());
compactAndVerify(topic, expected, false);
}
@Test
public void testCompactedInOrder() throws Exception {
String topic = "persistent://my-property/use/my-ns/my-topic1";
Producer<byte[]> producer = pulsarClient.newProducer().topic(topic)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
producer.newMessage()
.key("c")
.value("C_1".getBytes()).send();
producer.newMessage()
.key("a")
.value("A_1".getBytes()).send();
producer.newMessage()
.key("b")
.value("B_1".getBytes()).send();
producer.newMessage()
.key("a")
.value("A_2".getBytes()).send();
Map<String, byte[]> expected = new HashMap<>();
expected.put("a", "A_2".getBytes());
expected.put("b", "B_1".getBytes());
expected.put("c", "C_1".getBytes());
List<String> keyOrder = compactAndVerify(topic, expected, false);
Assert.assertEquals(keyOrder, Lists.newArrayList("c", "b", "a"));
}
@Test
public void testCompactEmptyTopic() throws Exception {
String topic = "persistent://my-property/use/my-ns/my-topic1";
// trigger creation of topic on server side
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").subscribe().close();
BookKeeper bk = pulsar.getBookKeeperClientFactory().create(
this.conf, null, null, Optional.empty(), null);
Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
compactor.compact(topic).get();
}
@Test
public void testPhaseOneLoopTimeConfiguration() {
ServiceConfiguration configuration = new ServiceConfiguration();
configuration.setBrokerServiceCompactionPhaseOneLoopTimeInSeconds(60);
TwoPhaseCompactor compactor = new TwoPhaseCompactor(configuration, Mockito.mock(PulsarClientImpl.class),
Mockito.mock(BookKeeper.class), compactionScheduler);
Assert.assertEquals(compactor.getPhaseOneLoopReadTimeoutInSeconds(), 60);
}
public ByteBuf extractPayload(RawMessage m) throws Exception {
ByteBuf payloadAndMetadata = m.getHeadersAndPayload();
Commands.skipChecksumIfPresent(payloadAndMetadata);
int metadataSize = payloadAndMetadata.readInt(); // metadata size
byte[] metadata = new byte[metadataSize];
payloadAndMetadata.readBytes(metadata);
return payloadAndMetadata.slice();
}
}