blob: 3f9a083787bb5fdd56cb646819c4f34e4be8e13f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.transaction.buffer.impl;
import io.netty.buffer.ByteBuf;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Recycler;
import io.netty.util.ReferenceCountUtil;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.transaction.TransactionBufferClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.transaction.TransactionBufferHandler;
import org.apache.pulsar.common.api.proto.CommandEndTxnOnPartitionResponse;
import org.apache.pulsar.common.api.proto.CommandEndTxnOnSubscriptionResponse;
import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
@Slf4j
public class TransactionBufferHandlerImpl implements TransactionBufferHandler {
private final ConcurrentSkipListMap<Long, OpRequestSend> outstandingRequests;
private final GrowableArrayBlockingQueue<OpRequestSend> pendingRequests;
private final AtomicLong requestIdGenerator = new AtomicLong();
private final long operationTimeoutInMills;
private final HashedWheelTimer timer;
private final PulsarService pulsarService;
private final PulsarClientImpl pulsarClient;
private static final AtomicIntegerFieldUpdater<TransactionBufferHandlerImpl> REQUEST_CREDITS_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(TransactionBufferHandlerImpl.class, "requestCredits");
private volatile int requestCredits;
public TransactionBufferHandlerImpl(PulsarService pulsarService, HashedWheelTimer timer,
int maxConcurrentRequests, long operationTimeoutInMills) throws PulsarServerException {
this.pulsarService = pulsarService;
this.pulsarClient = (PulsarClientImpl) pulsarService.getClient();
this.outstandingRequests = new ConcurrentSkipListMap<>();
this.pendingRequests = new GrowableArrayBlockingQueue<>();
this.operationTimeoutInMills = operationTimeoutInMills;
this.timer = timer;
this.requestCredits = Math.max(100, maxConcurrentRequests);
}
@Override
public CompletableFuture<TxnID> endTxnOnTopic(String topic, long txnIdMostBits, long txnIdLeastBits,
TxnAction action, long lowWaterMark) {
if (log.isDebugEnabled()) {
log.debug("[{}] endTxnOnTopic txnId: [{}], txnAction: [{}]",
topic, new TxnID(txnIdMostBits, txnIdLeastBits), action.getValue());
}
CompletableFuture<TxnID> cb = new CompletableFuture<>();
long requestId = requestIdGenerator.getAndIncrement();
ByteBuf cmd = Commands.newEndTxnOnPartition(requestId, txnIdLeastBits, txnIdMostBits,
topic, action, lowWaterMark);
OpRequestSend op = OpRequestSend.create(requestId, topic, cmd, cb, getClientCnx(topic));
if (checkRequestCredits(op)) {
endTxn(op);
}
return cb;
}
@Override
public CompletableFuture<TxnID> endTxnOnSubscription(String topic, String subscription,
long txnIdMostBits, long txnIdLeastBits,
TxnAction action, long lowWaterMark) {
if (log.isDebugEnabled()) {
log.debug("[{}] endTxnOnSubscription txnId: [{}], txnAction: [{}]",
topic, new TxnID(txnIdMostBits, txnIdLeastBits), action.getValue());
}
CompletableFuture<TxnID> cb = new CompletableFuture<>();
long requestId = requestIdGenerator.getAndIncrement();
ByteBuf cmd = Commands.newEndTxnOnSubscription(requestId, txnIdLeastBits, txnIdMostBits,
topic, subscription, action, lowWaterMark);
OpRequestSend op = OpRequestSend.create(requestId, topic, cmd, cb, getClientCnx(topic));
if (checkRequestCredits(op)) {
endTxn(op);
}
return cb;
}
private boolean checkRequestCredits(OpRequestSend op) {
int currentPermits = REQUEST_CREDITS_UPDATER.get(this);
if (currentPermits > 0 && pendingRequests.peek() == null) {
if (REQUEST_CREDITS_UPDATER.compareAndSet(this, currentPermits, currentPermits - 1)) {
return true;
} else {
return checkRequestCredits(op);
}
} else {
pendingRequests.add(op);
return false;
}
}
public void endTxn(OpRequestSend op) {
op.cnx.whenComplete((clientCnx, ex) -> {
if (ex == null) {
if (clientCnx.ctx().channel().isActive()) {
clientCnx.registerTransactionBufferHandler(TransactionBufferHandlerImpl.this);
outstandingRequests.put(op.requestId, op);
timer.newTimeout(timeout -> {
OpRequestSend peek = outstandingRequests.remove(op.requestId);
if (peek != null && !peek.cb.isDone() && !peek.cb.isCompletedExceptionally()) {
peek.cb.completeExceptionally(new TransactionBufferClientException
.RequestTimeoutException());
onResponse(peek);
}
}, operationTimeoutInMills, TimeUnit.MILLISECONDS);
op.cmd.retain();
clientCnx.ctx().writeAndFlush(op.cmd, clientCnx.ctx().voidPromise());
} else {
op.cb.completeExceptionally(
new PulsarClientException.LookupException(op.topic + " endTxn channel is not active"));
onResponse(op);
}
} else {
Throwable cause = FutureUtil.unwrapCompletionException(ex);
log.error("endTxn error topic: [{}]", op.topic, cause);
if (cause instanceof PulsarClientException.BrokerMetadataException) {
op.cb.complete(null);
} else {
op.cb.completeExceptionally(
new PulsarClientException.LookupException(cause.getMessage()));
}
onResponse(op);
}
});
}
@Override
public void handleEndTxnOnTopicResponse(long requestId, CommandEndTxnOnPartitionResponse response) {
OpRequestSend op = outstandingRequests.remove(requestId);
if (op == null) {
if (log.isDebugEnabled()) {
log.debug("Got end txn on topic response for timeout {} - {}", response.getTxnidMostBits(),
response.getTxnidLeastBits());
}
return;
}
try {
if (!response.hasError()) {
if (log.isDebugEnabled()) {
log.debug("[{}] Got end txn on topic response for for request {}", op.topic,
response.getRequestId());
}
op.cb.complete(new TxnID(response.getTxnidMostBits(), response.getTxnidLeastBits()));
} else {
log.error("[{}] Got end txn on topic response for request {} error {}", op.topic,
response.getRequestId(),
response.getError());
op.cb.completeExceptionally(ClientCnx.getPulsarClientException(response.getError(),
response.getMessage()));
}
} finally {
onResponse(op);
}
}
@Override
public void handleEndTxnOnSubscriptionResponse(long requestId,
CommandEndTxnOnSubscriptionResponse response) {
OpRequestSend op = outstandingRequests.remove(requestId);
if (op == null) {
if (log.isDebugEnabled()) {
log.debug("Got end txn on subscription response for timeout {} - {}", response.getTxnidMostBits(),
response.getTxnidLeastBits());
}
return;
}
try {
if (!response.hasError()) {
if (log.isDebugEnabled()) {
log.debug("[{}] Got end txn on subscription response for for request {}",
op.topic, response.getRequestId());
}
op.cb.complete(new TxnID(response.getTxnidMostBits(), response.getTxnidLeastBits()));
} else {
log.error("[{}] Got end txn on subscription response for request {} error {}",
op.topic, response.getRequestId(), response.getError());
op.cb.completeExceptionally(ClientCnx.getPulsarClientException(response.getError(),
response.getMessage()));
}
} finally {
onResponse(op);
}
}
public void onResponse(OpRequestSend op) {
REQUEST_CREDITS_UPDATER.incrementAndGet(this);
if (op != null) {
ReferenceCountUtil.safeRelease(op.cmd);
op.recycle();
}
checkPendingRequests();
}
private void checkPendingRequests() {
while (true) {
int permits = REQUEST_CREDITS_UPDATER.get(this);
if (permits > 0 && pendingRequests.peek() != null) {
if (REQUEST_CREDITS_UPDATER.compareAndSet(this, permits, permits - 1)) {
OpRequestSend polled = pendingRequests.poll();
if (polled != null) {
CompletableFuture<ClientCnx> clientCnx = getClientCnx(polled.topic);
if (polled.cnx != clientCnx) {
OpRequestSend invalid = polled;
polled = OpRequestSend.create(invalid.requestId, invalid.topic, invalid.cmd, invalid.cb,
clientCnx);
invalid.recycle();
}
endTxn(polled);
} else {
REQUEST_CREDITS_UPDATER.incrementAndGet(this);
}
}
} else {
break;
}
}
}
public static final class OpRequestSend {
long requestId;
String topic;
ByteBuf cmd;
CompletableFuture<TxnID> cb;
long createdAt;
CompletableFuture<ClientCnx> cnx;
static OpRequestSend create(long requestId, String topic, ByteBuf cmd, CompletableFuture<TxnID> cb,
CompletableFuture<ClientCnx> cnx) {
OpRequestSend op = RECYCLER.get();
op.requestId = requestId;
op.topic = topic;
op.cmd = cmd;
op.cb = cb;
op.createdAt = System.currentTimeMillis();
op.cnx = cnx;
return op;
}
void recycle() {
recyclerHandle.recycle(this);
}
private OpRequestSend(Recycler.Handle<OpRequestSend> recyclerHandle) {
this.recyclerHandle = recyclerHandle;
}
private final Recycler.Handle<OpRequestSend> recyclerHandle;
private static final Recycler<OpRequestSend> RECYCLER = new Recycler<OpRequestSend>() {
@Override
protected OpRequestSend newObject(Handle<OpRequestSend> handle) {
return new OpRequestSend(handle);
}
};
}
public CompletableFuture<ClientCnx> getClientCnxWithLookup(String topic) {
return pulsarClient.getConnection(topic);
}
public CompletableFuture<ClientCnx> getClientCnx(String topic) {
NamespaceService namespaceService = pulsarService.getNamespaceService();
CompletableFuture<NamespaceBundle> nsBundle = namespaceService.getBundleAsync(TopicName.get(topic));
return nsBundle
.thenCompose(bundle -> namespaceService.getOwnerAsync(bundle))
.thenCompose(data -> {
if (data.isPresent()) {
NamespaceEphemeralData ephemeralData = data.get();
try {
if (!ephemeralData.isDisabled()) {
URI uri;
if (pulsarClient.getConfiguration().isUseTls()) {
uri = new URI(ephemeralData.getNativeUrlTls());
} else {
uri = new URI(ephemeralData.getNativeUrl());
}
InetSocketAddress brokerAddress =
InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort());
return pulsarClient.getConnection(brokerAddress, brokerAddress);
} else {
// Bundle is unloading, lookup topic
return getClientCnxWithLookup(topic);
}
} catch (URISyntaxException e) {
// Should never go here
return getClientCnxWithLookup(topic);
}
} else {
// Bundle is not loaded yet, lookup topic
return getClientCnxWithLookup(topic);
}
});
}
@Override
public void close() {
this.timer.stop();
}
@Override
public int getAvailableRequestCredits() {
return REQUEST_CREDITS_UPDATER.get(this);
}
@Override
public int getPendingRequestsCount() {
return pendingRequests.size();
}
}