blob: c4a9fc2b77407dc8dac9404029176c581718256a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.transaction.buffer.impl;
import io.netty.buffer.ByteBuf;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
import org.apache.pulsar.broker.transaction.buffer.TransactionMeta;
import org.apache.pulsar.broker.transaction.exception.buffer.TransactionBufferException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.policies.data.TransactionBufferStats;
import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.transaction.coordinator.proto.TxnStatus;
/**
* The in-memory implementation of {@link TransactionBuffer}.
*/
class InMemTransactionBuffer implements TransactionBuffer {
/**
* A class represents the buffer of a transaction.
*/
private static class TxnBuffer implements TransactionMeta, AutoCloseable {
private final TxnID txnid;
private final SortedMap<Long, ByteBuf> entries;
private TxnStatus status;
private long committedAtLedgerId = -1L;
private long committedAtEntryId = -1L;
TxnBuffer(TxnID txnid) {
this.txnid = txnid;
this.entries = new TreeMap<>();
this.status = TxnStatus.OPEN;
}
@Override
public TxnID id() {
return txnid;
}
@Override
public synchronized TxnStatus status() {
return status;
}
@Override
public int numEntries() {
synchronized (entries) {
return entries.size();
}
}
@Override
public int numMessageInTxn() throws TransactionBufferException.TransactionStatusException {
return -1;
}
@Override
public long committedAtLedgerId() {
return committedAtLedgerId;
}
@Override
public long committedAtEntryId() {
return committedAtEntryId;
}
@Override
public long lastSequenceId() {
return entries.lastKey();
}
@Override
public CompletableFuture<SortedMap<Long, Position>> readEntries(int num, long startSequenceId) {
return FutureUtil.failedFuture(new UnsupportedOperationException());
}
@Override
public CompletableFuture<Position> appendEntry(long sequenceId, Position position, int batchSize) {
return FutureUtil.failedFuture(new UnsupportedOperationException());
}
@Override
public CompletableFuture<TransactionMeta> committingTxn() {
status = TxnStatus.COMMITTING;
return CompletableFuture.completedFuture(null);
}
@Override
public CompletableFuture<TransactionMeta> commitTxn(long committedAtLedgerId, long committedAtEntryId) {
try {
return CompletableFuture.completedFuture(commitAt(committedAtLedgerId, committedAtEntryId));
} catch (TransactionBufferException.TransactionStatusException e) {
return FutureUtil.failedFuture(e);
}
}
@Override
public CompletableFuture<TransactionMeta> abortTxn() {
try {
return CompletableFuture.completedFuture(abort());
} catch (TransactionBufferException.TransactionStatusException e) {
return FutureUtil.failedFuture(e);
}
}
synchronized TxnBuffer abort() throws TransactionBufferException.TransactionStatusException {
if (TxnStatus.OPEN != status) {
throw new TransactionBufferException.TransactionStatusException(txnid, TxnStatus.OPEN, status);
}
this.status = TxnStatus.ABORTED;
return this;
}
synchronized TxnBuffer commitAt(long committedAtLedgerId, long committedAtEntryId)
throws TransactionBufferException.TransactionStatusException {
if (TxnStatus.OPEN != status) {
throw new TransactionBufferException.TransactionStatusException(txnid, TxnStatus.OPEN, status);
}
this.committedAtLedgerId = committedAtLedgerId;
this.committedAtEntryId = committedAtEntryId;
this.status = TxnStatus.COMMITTED;
return this;
}
@Override
public void close() {
synchronized (entries) {
entries.forEach((sequenceId, buffer) -> {
buffer.release();
});
entries.clear();
}
}
public void appendEntry(long sequenceId, ByteBuf entry) throws
TransactionBufferException.TransactionSealedException {
synchronized (this) {
if (TxnStatus.OPEN != status) {
// the transaction is not open anymore, reject the append operations
throw new TransactionBufferException
.TransactionSealedException("Transaction `" + txnid + "` is already sealed");
}
}
synchronized (this.entries) {
this.entries.put(sequenceId, entry);
}
}
public TransactionBufferReader newReader(long sequenceId) throws
TransactionBufferException.TransactionNotSealedException {
synchronized (this) {
if (TxnStatus.COMMITTED != status) {
// the transaction is not committed yet, hence the buffer is not sealed
throw new TransactionBufferException
.TransactionNotSealedException("Transaction `" + txnid + "` is not sealed yet");
}
}
final SortedMap<Long, ByteBuf> entriesToRead = new TreeMap<>();
synchronized (entries) {
SortedMap<Long, ByteBuf> subEntries = entries.tailMap(sequenceId);
subEntries.values().forEach(value -> value.retain());
entriesToRead.putAll(subEntries);
}
return new InMemTransactionBufferReader(
txnid,
entriesToRead.entrySet().iterator(),
committedAtLedgerId,
committedAtEntryId
);
}
}
final ConcurrentMap<TxnID, TxnBuffer> buffers;
final Map<Long, Set<TxnID>> txnIndex;
public InMemTransactionBuffer(Topic topic) {
this.buffers = new ConcurrentHashMap<>();
this.txnIndex = new HashMap<>();
}
@Override
public CompletableFuture<TransactionMeta> getTransactionMeta(TxnID txnID) {
CompletableFuture<TransactionMeta> getFuture = new CompletableFuture<>();
try {
getFuture.complete(getTxnBufferOrThrowNotFoundException(txnID));
} catch (TransactionBufferException.TransactionNotFoundException e) {
getFuture.completeExceptionally(e);
}
return getFuture;
}
private TxnBuffer getTxnBufferOrThrowNotFoundException(TxnID txnID)
throws TransactionBufferException.TransactionNotFoundException {
TxnBuffer buffer = buffers.get(txnID);
if (null == buffer) {
throw new TransactionBufferException.TransactionNotFoundException(
"Transaction `" + txnID + "` doesn't exist in the transaction buffer");
}
return buffer;
}
private TxnBuffer getTxnBufferOrCreateIfNotExist(TxnID txnID) {
TxnBuffer buffer = buffers.get(txnID);
if (null == buffer) {
TxnBuffer newBuffer = new TxnBuffer(txnID);
TxnBuffer oldBuffer = buffers.putIfAbsent(txnID, newBuffer);
if (null != oldBuffer) {
newBuffer.close();
return oldBuffer;
} else {
return newBuffer;
}
} else {
return buffer;
}
}
@Override
public CompletableFuture<Position> appendBufferToTxn(TxnID txnId,
long sequenceId,
ByteBuf buffer) {
TxnBuffer txnBuffer = getTxnBufferOrCreateIfNotExist(txnId);
CompletableFuture appendFuture = new CompletableFuture();
try {
txnBuffer.appendEntry(sequenceId, buffer);
appendFuture.complete(null);
} catch (TransactionBufferException.TransactionSealedException e) {
appendFuture.completeExceptionally(e);
}
return appendFuture;
}
@Override
public CompletableFuture<TransactionBufferReader> openTransactionBufferReader(
TxnID txnID, long startSequenceId) {
CompletableFuture<TransactionBufferReader> openFuture = new CompletableFuture<>();
try {
TxnBuffer txnBuffer = getTxnBufferOrThrowNotFoundException(txnID);
TransactionBufferReader reader = txnBuffer.newReader(startSequenceId);
openFuture.complete(reader);
} catch (TransactionBufferException.TransactionNotFoundException
| TransactionBufferException.TransactionNotSealedException e) {
openFuture.completeExceptionally(e);
}
return openFuture;
}
@Override
public CompletableFuture<Void> commitTxn(TxnID txnID, long lowWaterMark) {
CompletableFuture<Void> commitFuture = new CompletableFuture<>();
try {
TxnBuffer txnBuffer = getTxnBufferOrThrowNotFoundException(txnID);
synchronized (txnBuffer) {
// the committed position should be generated
long committedAtLedgerId = -1L;
long committedAtEntryId = -1L;
txnBuffer.commitAt(committedAtLedgerId, committedAtEntryId);
addTxnToTxnIdex(txnID, committedAtLedgerId);
}
commitFuture.complete(null);
} catch (TransactionBufferException.TransactionNotFoundException
| TransactionBufferException.TransactionStatusException e) {
commitFuture.completeExceptionally(e);
}
return commitFuture;
}
private void addTxnToTxnIdex(TxnID txnId, long committedAtLedgerId) {
synchronized (txnIndex) {
txnIndex
.computeIfAbsent(committedAtLedgerId, ledgerId -> new HashSet<>())
.add(txnId);
}
}
@Override
public CompletableFuture<Void> abortTxn(TxnID txnID, long lowWaterMark) {
CompletableFuture<Void> abortFuture = new CompletableFuture<>();
try {
TxnBuffer txnBuffer = getTxnBufferOrThrowNotFoundException(txnID);
txnBuffer.abort();
buffers.remove(txnID, txnBuffer);
abortFuture.complete(null);
} catch (TransactionBufferException.TransactionNotFoundException
| TransactionBufferException.TransactionStatusException e) {
abortFuture.completeExceptionally(e);
}
return abortFuture;
}
@Override
public CompletableFuture<Void> purgeTxns(List<Long> dataLedgers) {
List<TxnBuffer> buffersToPurge = new ArrayList<>();
synchronized (txnIndex) {
dataLedgers.forEach(ledger -> {
Set<TxnID> txns = txnIndex.remove(ledger);
txns.forEach(txnId -> {
TxnBuffer tb = buffers.remove(txnId);
if (null != tb) {
buffersToPurge.add(tb);
}
});
});
}
buffersToPurge.forEach(TxnBuffer::close);
return CompletableFuture.completedFuture(null);
}
@Override
public CompletableFuture<Void> clearSnapshot() {
return CompletableFuture.completedFuture(null);
}
@Override
public CompletableFuture<Void> closeAsync() {
buffers.values().forEach(TxnBuffer::close);
return CompletableFuture.completedFuture(null);
}
@Override
public boolean isTxnAborted(TxnID txnID) {
return false;
}
@Override
public void syncMaxReadPositionForNormalPublish(PositionImpl position) {
//no-op
}
@Override
public PositionImpl getMaxReadPosition() {
return PositionImpl.LATEST;
}
@Override
public TransactionInBufferStats getTransactionInBufferStats(TxnID txnID) {
return null;
}
@Override
public TransactionBufferStats getStats(boolean lowWaterMarks) {
return null;
}
@Override
public CompletableFuture<Void> checkIfTBRecoverCompletely(boolean isTxn) {
return CompletableFuture.completedFuture(null);
}
@Override
public long getOngoingTxnCount() {
return this.buffers.values().stream()
.filter(txnBuffer -> txnBuffer.status.equals(TxnStatus.OPEN)
|| txnBuffer.status.equals(TxnStatus.COMMITTING)
|| txnBuffer.status.equals(TxnStatus.ABORTING)
)
.count();
}
@Override
public long getAbortedTxnCount() {
return this.buffers.values().stream()
.filter(txnBuffer -> txnBuffer.status.equals(TxnStatus.ABORTED))
.count();
}
@Override
public long getCommittedTxnCount() {
return this.buffers.values().stream()
.filter(txnBuffer -> txnBuffer.status.equals(TxnStatus.COMMITTED))
.count();
}
}