/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.compaction;

import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.BiPredicate;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
import org.apache.commons.lang3.tuple.ImmutableTriple;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.client.api.RawReader;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.RawBatchConverter;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Compaction goes through the topic in two passes. The first pass
* selects the latest message id for each key in the topic. The second
* pass then writes those latest values to a ledger.
*
* <p>The two passes are required to avoid holding the payloads of all the
* latest values in memory, as a payload can be many orders of
* magnitude larger than a message id.
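*
* <p>A minimal usage sketch (illustrative only; it assumes the
* {@code compact(String topic)} entry point inherited from {@code Compactor}):
* <pre>{@code
* TwoPhaseCompactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bookKeeper, scheduler);
* compactor.compact("persistent://tenant/ns/my-topic")
*     .thenAccept(ledgerId -> log.info("Compacted into ledger {}", ledgerId));
* }</pre>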
*/
public class TwoPhaseCompactor extends Compactor {
private static final Logger log = LoggerFactory.getLogger(TwoPhaseCompactor.class);
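// Cap on the number of in-flight ledger writes during phase two.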
private static final int MAX_OUTSTANDING = 500;
private final Duration phaseOneLoopReadTimeout;

public TwoPhaseCompactor(ServiceConfiguration conf,
PulsarClient pulsar,
BookKeeper bk,
ScheduledExecutorService scheduler) {
super(conf, pulsar, bk, scheduler);
phaseOneLoopReadTimeout = Duration.ofSeconds(conf.getBrokerServiceCompactionPhaseOneLoopTimeInSeconds());
}

@Override
protected CompletableFuture<Long> doCompaction(RawReader reader, BookKeeper bk) {
return reader.hasMessageAvailableAsync()
.thenCompose(available -> {
if (available) {
return phaseOne(reader).thenCompose(
(r) -> phaseTwo(reader, r.from, r.to, r.lastReadId, r.latestForKey, bk));
} else {
log.info("Skip compaction of the empty topic {}", reader.getTopic());
return CompletableFuture.completedFuture(-1L);
}
});
}
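
/**
* Phase one: scan the topic up to the last message id, recording the
* latest message id observed for each key.
*/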
private CompletableFuture<PhaseOneResult> phaseOne(RawReader reader) {
Map<String, MessageId> latestForKey = new HashMap<>();
CompletableFuture<PhaseOneResult> loopPromise = new CompletableFuture<>();
reader.getLastMessageIdAsync()
.thenAccept(lastMessageId -> {
log.info("Commencing phase one of compaction for {}, reading to {}",
reader.getTopic(), lastMessageId);
// Each entry is processed as a whole; deliberately discard the batchIndex part.
MessageIdImpl lastImpl = (MessageIdImpl) lastMessageId;
MessageIdImpl lastEntryMessageId = new MessageIdImpl(lastImpl.getLedgerId(), lastImpl.getEntryId(),
lastImpl.getPartitionIndex());
phaseOneLoop(reader, Optional.empty(), Optional.empty(), lastEntryMessageId, latestForKey,
loopPromise);
}).exceptionally(ex -> {
loopPromise.completeExceptionally(ex);
return null;
});
return loopPromise;
}
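
/**
* A single iteration of the phase one read loop: read the next message
* (guarded by phaseOneLoopReadTimeout), update latestForKey, then either
* complete the promise once the last message id is reached or recurse.
*/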
private void phaseOneLoop(RawReader reader,
Optional<MessageId> firstMessageId,
Optional<MessageId> toMessageId,
MessageId lastMessageId,
Map<String, MessageId> latestForKey,
CompletableFuture<PhaseOneResult> loopPromise) {
if (loopPromise.isDone()) {
return;
}
CompletableFuture<RawMessage> future = reader.readNextAsync();
FutureUtil.addTimeoutHandling(future,
phaseOneLoopReadTimeout, scheduler,
() -> FutureUtil.createTimeoutException("Timeout", getClass(), "phaseOneLoop(...)"));
future.thenAcceptAsync(m -> {
try {
MessageId id = m.getMessageId();
boolean deletedMessage = false;
boolean replaceMessage = false;
mxBean.addCompactionReadOp(reader.getTopic(), m.getHeadersAndPayload().readableBytes());
if (RawBatchConverter.isReadableBatch(m)) {
try {
for (ImmutableTriple<MessageId, String, Integer> e : extractIdsAndKeysAndSizeFromBatch(m)) {
if (e != null) {
if (e.getRight() > 0) {
MessageId old = latestForKey.put(e.getMiddle(), e.getLeft());
replaceMessage = old != null;
} else {
deletedMessage = true;
latestForKey.remove(e.getMiddle());
}
}
if (replaceMessage || deletedMessage) {
mxBean.addCompactionRemovedEvent(reader.getTopic());
}
}
} catch (IOException ioe) {
log.info("Error decoding batch for message {}. Whole batch will be included in output",
id, ioe);
}
} else {
Pair<String, Integer> keyAndSize = extractKeyAndSize(m);
if (keyAndSize != null) {
if (keyAndSize.getRight() > 0) {
MessageId old = latestForKey.put(keyAndSize.getLeft(), id);
replaceMessage = old != null;
} else {
deletedMessage = true;
latestForKey.remove(keyAndSize.getLeft());
}
}
if (replaceMessage || deletedMessage) {
mxBean.addCompactionRemovedEvent(reader.getTopic());
}
}
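// 'first' tracks the first non-deleted message seen; 'to' tracks the most
// recent non-deleted message (a delete tombstone does not advance it).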
MessageId first = firstMessageId.orElse(deletedMessage ? null : id);
MessageId to = deletedMessage ? toMessageId.orElse(null) : id;
if (id.compareTo(lastMessageId) == 0) {
loopPromise.complete(new PhaseOneResult(first == null ? id : first, to == null ? id : to,
lastMessageId, latestForKey));
} else {
phaseOneLoop(reader,
Optional.ofNullable(first),
Optional.ofNullable(to),
lastMessageId,
latestForKey, loopPromise);
}
} finally {
m.close();
}
}, scheduler).exceptionally(ex -> {
loopPromise.completeExceptionally(ex);
return null;
});
}
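
/**
* Phase two: create the compacted ledger, with metadata pointing back to
* the topic, and write the retained messages into it.
*/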
private CompletableFuture<Long> phaseTwo(RawReader reader, MessageId from, MessageId to, MessageId lastReadId,
Map<String, MessageId> latestForKey, BookKeeper bk) {
Map<String, byte[]> metadata =
LedgerMetadataUtils.buildMetadataForCompactedLedger(reader.getTopic(), to.toByteArray());
return createLedger(bk, metadata).thenCompose((ledger) -> {
log.info("Commencing phase two of compaction for {}, from {} to {}, compacting {} keys to ledger {}",
reader.getTopic(), from, to, latestForKey.size(), ledger.getId());
return phaseTwoSeekThenLoop(reader, from, to, lastReadId, latestForKey, bk, ledger);
});
}
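
/**
* Seek the reader back to the start of the range, run the phase two write
* loop, close the compacted ledger, and cumulatively acknowledge up to
* lastReadId with the compacted ledger id attached as a property. On any
* failure the ledger is deleted before the promise is failed.
*/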
private CompletableFuture<Long> phaseTwoSeekThenLoop(RawReader reader, MessageId from, MessageId to,
MessageId lastReadId, Map<String, MessageId> latestForKey, BookKeeper bk, LedgerHandle ledger) {
CompletableFuture<Long> promise = new CompletableFuture<>();
reader.seekAsync(from).thenCompose((v) -> {
Semaphore outstanding = new Semaphore(MAX_OUTSTANDING);
CompletableFuture<Void> loopPromise = new CompletableFuture<>();
phaseTwoLoop(reader, to, latestForKey, ledger, outstanding, loopPromise);
return loopPromise;
}).thenCompose((v) -> closeLedger(ledger))
.thenCompose((v) -> reader.acknowledgeCumulativeAsync(lastReadId,
Map.of(COMPACTED_TOPIC_LEDGER_PROPERTY, ledger.getId())))
.whenComplete((res, exception) -> {
if (exception != null) {
deleteLedger(bk, ledger).whenComplete((res2, exception2) -> {
if (exception2 != null) {
log.warn("Cleanup of ledger {} failed", ledger, exception2);
}
// complete with original exception
promise.completeExceptionally(exception);
});
} else {
promise.complete(ledger.getId());
}
});
return promise;
}
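
/**
* A single iteration of the phase two write loop: re-read the next message
* and copy it to the compacted ledger if it has no key, or if it is still
* the latest message for its key. Batch messages are rebatched so that only
* the surviving entries are kept. Writes are throttled by the semaphore,
* and the loop completes once the 'to' message id has been processed.
*/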
private void phaseTwoLoop(RawReader reader, MessageId to, Map<String, MessageId> latestForKey,
LedgerHandle lh, Semaphore outstanding, CompletableFuture<Void> promise) {
if (promise.isDone()) {
return;
}
reader.readNextAsync().thenAcceptAsync(m -> {
if (promise.isDone()) {
m.close();
return;
}
try {
MessageId id = m.getMessageId();
Optional<RawMessage> messageToAdd = Optional.empty();
mxBean.addCompactionReadOp(reader.getTopic(), m.getHeadersAndPayload().readableBytes());
if (RawBatchConverter.isReadableBatch(m)) {
try {
messageToAdd = rebatchMessage(
m, (key, subid) -> subid.equals(latestForKey.get(key)));
} catch (IOException ioe) {
log.info("Error decoding batch for message {}. Whole batch will be included in output",
id, ioe);
messageToAdd = Optional.of(m);
}
} else {
Pair<String, Integer> keyAndSize = extractKeyAndSize(m);
MessageId msg;
if (keyAndSize == null) { // pass through messages without a key
messageToAdd = Optional.of(m);
} else if ((msg = latestForKey.get(keyAndSize.getLeft())) != null
&& msg.equals(id)) { // only keep the message if it is still the latest for its key
if (keyAndSize.getRight() <= 0) {
promise.completeExceptionally(new IllegalArgumentException(
"Compaction phase found empty record from sorted key-map"));
}
messageToAdd = Optional.of(m);
}
}
if (messageToAdd.isPresent()) {
RawMessage message = messageToAdd.get();
try {
outstanding.acquire();
CompletableFuture<Void> addFuture = addToCompactedLedger(lh, message, reader.getTopic())
.whenComplete((res, exception2) -> {
outstanding.release();
if (exception2 != null) {
promise.completeExceptionally(exception2);
}
});
if (to.equals(id)) {
addFuture.whenComplete((res, exception2) -> {
if (exception2 == null) {
promise.complete(null);
}
});
}
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
promise.completeExceptionally(ie);
} finally {
if (message != m) {
message.close();
}
}
} else if (to.equals(id)) {
// Reached the last id, but phase one found it was a deleted message while
// iterating over the ledger, so it is not present in latestForKey. Complete the compaction.
try {
// make sure all inflight writes have finished
outstanding.acquire(MAX_OUTSTANDING);
promise.complete(null);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
promise.completeExceptionally(e);
}
return;
}
phaseTwoLoop(reader, to, latestForKey, lh, outstanding, promise);
} finally {
m.close();
}
}, scheduler).exceptionally(ex -> {
promise.completeExceptionally(ex);
return null;
});
}
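
/**
* Create the ledger that will hold the compacted topic, adapting the
* BookKeeper callback API to a CompletableFuture. Ensemble, write quorum
* and ack quorum are taken from the managed ledger defaults.
*/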
protected CompletableFuture<LedgerHandle> createLedger(BookKeeper bk, Map<String, byte[]> metadata) {
CompletableFuture<LedgerHandle> bkf = new CompletableFuture<>();
try {
bk.asyncCreateLedger(conf.getManagedLedgerDefaultEnsembleSize(),
conf.getManagedLedgerDefaultWriteQuorum(),
conf.getManagedLedgerDefaultAckQuorum(),
Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD,
(rc, ledger, ctx) -> {
if (rc != BKException.Code.OK) {
bkf.completeExceptionally(BKException.create(rc));
} else {
bkf.complete(ledger);
}
}, null, metadata);
} catch (Throwable t) {
log.error("Encountered unexpected error when creating compaction ledger", t);
return FutureUtil.failedFuture(t);
}
return bkf;
}
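
/**
* Delete a (partially written) compacted ledger; used to clean up after a
* failed phase two.
*/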
protected CompletableFuture<Void> deleteLedger(BookKeeper bk, LedgerHandle lh) {
CompletableFuture<Void> bkf = new CompletableFuture<>();
try {
bk.asyncDeleteLedger(lh.getId(),
(rc, ctx) -> {
if (rc != BKException.Code.OK) {
bkf.completeExceptionally(BKException.create(rc));
} else {
bkf.complete(null);
}
}, null);
} catch (Throwable t) {
return FutureUtil.failedFuture(t);
}
return bkf;
}
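
/**
* Close the compacted ledger, sealing it so it can be read back.
*/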
protected CompletableFuture<Void> closeLedger(LedgerHandle lh) {
CompletableFuture<Void> bkf = new CompletableFuture<>();
try {
lh.asyncClose((rc, ledger, ctx) -> {
if (rc != BKException.Code.OK) {
bkf.completeExceptionally(BKException.create(rc));
} else {
bkf.complete(null);
}
}, null);
} catch (Throwable t) {
return FutureUtil.failedFuture(t);
}
return bkf;
}
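
/**
* Serialize a raw message and append it to the compacted ledger, recording
* write and latency metrics for the topic.
*/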
private CompletableFuture<Void> addToCompactedLedger(LedgerHandle lh, RawMessage m, String topic) {
CompletableFuture<Void> bkf = new CompletableFuture<>();
ByteBuf serialized = m.serialize();
try {
mxBean.addCompactionWriteOp(topic, m.getHeadersAndPayload().readableBytes());
long start = System.nanoTime();
lh.asyncAddEntry(serialized,
(rc, ledger, eid, ctx) -> {
mxBean.addCompactionLatencyOp(topic, System.nanoTime() - start, TimeUnit.NANOSECONDS);
if (rc != BKException.Code.OK) {
bkf.completeExceptionally(BKException.create(rc));
} else {
bkf.complete(null);
}
}, null);
} catch (Throwable t) {
return FutureUtil.failedFuture(t);
}
return bkf;
}
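
/**
* Extract the partition key and payload size from a message, preferring the
* uncompressed size from the metadata when available. Returns null if the
* message has no key.
*/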
protected Pair<String, Integer> extractKeyAndSize(RawMessage m) {
ByteBuf headersAndPayload = m.getHeadersAndPayload();
MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
if (msgMetadata.hasPartitionKey()) {
int size = headersAndPayload.readableBytes();
if (msgMetadata.hasUncompressedSize()) {
size = msgMetadata.getUncompressedSize();
}
return Pair.of(msgMetadata.getPartitionKey(), size);
} else {
return null;
}
}

protected List<ImmutableTriple<MessageId, String, Integer>> extractIdsAndKeysAndSizeFromBatch(RawMessage msg)
throws IOException {
return RawBatchConverter.extractIdsAndKeysAndSize(msg);
}

protected Optional<RawMessage> rebatchMessage(RawMessage msg, BiPredicate<String, MessageId> filter)
throws IOException {
return RawBatchConverter.rebatchMessage(msg, filter);
}
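
/**
* Immutable carrier for the outcome of phase one.
*/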
private static class PhaseOneResult {
final MessageId from;
final MessageId to; // last undeleted messageId
final MessageId lastReadId; // last read messageId
final Map<String, MessageId> latestForKey;
PhaseOneResult(MessageId from, MessageId to, MessageId lastReadId, Map<String, MessageId> latestForKey) {
this.from = from;
this.to = to;
this.lastReadId = lastReadId;
this.latestForKey = latestForKey;
}
}

public long getPhaseOneLoopReadTimeoutInSeconds() {
return phaseOneLoopReadTimeout.getSeconds();
}
}