| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.pulsar.broker.transaction.buffer.impl; |
| |
| import io.netty.buffer.ByteBuf; |
| import io.netty.util.HashedWheelTimer; |
| import io.netty.util.Recycler; |
| import io.netty.util.ReferenceCountUtil; |
| import java.net.InetSocketAddress; |
| import java.net.URI; |
| import java.net.URISyntaxException; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.ConcurrentSkipListMap; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; |
| import java.util.concurrent.atomic.AtomicLong; |
| import lombok.extern.slf4j.Slf4j; |
| import org.apache.pulsar.broker.PulsarServerException; |
| import org.apache.pulsar.broker.PulsarService; |
| import org.apache.pulsar.broker.namespace.NamespaceEphemeralData; |
| import org.apache.pulsar.broker.namespace.NamespaceService; |
| import org.apache.pulsar.client.api.PulsarClientException; |
| import org.apache.pulsar.client.api.transaction.TransactionBufferClientException; |
| import org.apache.pulsar.client.api.transaction.TxnID; |
| import org.apache.pulsar.client.impl.ClientCnx; |
| import org.apache.pulsar.client.impl.PulsarClientImpl; |
| import org.apache.pulsar.client.impl.transaction.TransactionBufferHandler; |
| import org.apache.pulsar.common.api.proto.CommandEndTxnOnPartitionResponse; |
| import org.apache.pulsar.common.api.proto.CommandEndTxnOnSubscriptionResponse; |
| import org.apache.pulsar.common.api.proto.TxnAction; |
| import org.apache.pulsar.common.naming.NamespaceBundle; |
| import org.apache.pulsar.common.naming.TopicName; |
| import org.apache.pulsar.common.protocol.Commands; |
| import org.apache.pulsar.common.util.FutureUtil; |
| import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue; |
| |
| @Slf4j |
| public class TransactionBufferHandlerImpl implements TransactionBufferHandler { |
| |
| private final ConcurrentSkipListMap<Long, OpRequestSend> outstandingRequests; |
| private final GrowableArrayBlockingQueue<OpRequestSend> pendingRequests; |
| private final AtomicLong requestIdGenerator = new AtomicLong(); |
| private final long operationTimeoutInMills; |
| private final HashedWheelTimer timer; |
| private final PulsarService pulsarService; |
| private final PulsarClientImpl pulsarClient; |
| |
| private static final AtomicIntegerFieldUpdater<TransactionBufferHandlerImpl> REQUEST_CREDITS_UPDATER = |
| AtomicIntegerFieldUpdater.newUpdater(TransactionBufferHandlerImpl.class, "requestCredits"); |
| private volatile int requestCredits; |
| |
| public TransactionBufferHandlerImpl(PulsarService pulsarService, HashedWheelTimer timer, |
| int maxConcurrentRequests, long operationTimeoutInMills) throws PulsarServerException { |
| this.pulsarService = pulsarService; |
| this.pulsarClient = (PulsarClientImpl) pulsarService.getClient(); |
| this.outstandingRequests = new ConcurrentSkipListMap<>(); |
| this.pendingRequests = new GrowableArrayBlockingQueue<>(); |
| this.operationTimeoutInMills = operationTimeoutInMills; |
| this.timer = timer; |
| this.requestCredits = Math.max(100, maxConcurrentRequests); |
| } |
| |
| @Override |
| public CompletableFuture<TxnID> endTxnOnTopic(String topic, long txnIdMostBits, long txnIdLeastBits, |
| TxnAction action, long lowWaterMark) { |
| if (log.isDebugEnabled()) { |
| log.debug("[{}] endTxnOnTopic txnId: [{}], txnAction: [{}]", |
| topic, new TxnID(txnIdMostBits, txnIdLeastBits), action.getValue()); |
| } |
| CompletableFuture<TxnID> cb = new CompletableFuture<>(); |
| long requestId = requestIdGenerator.getAndIncrement(); |
| ByteBuf cmd = Commands.newEndTxnOnPartition(requestId, txnIdLeastBits, txnIdMostBits, |
| topic, action, lowWaterMark); |
| |
| OpRequestSend op = OpRequestSend.create(requestId, topic, cmd, cb, getClientCnx(topic)); |
| if (checkRequestCredits(op)) { |
| endTxn(op); |
| } |
| return cb; |
| } |
| |
| @Override |
| public CompletableFuture<TxnID> endTxnOnSubscription(String topic, String subscription, |
| long txnIdMostBits, long txnIdLeastBits, |
| TxnAction action, long lowWaterMark) { |
| if (log.isDebugEnabled()) { |
| log.debug("[{}] endTxnOnSubscription txnId: [{}], txnAction: [{}]", |
| topic, new TxnID(txnIdMostBits, txnIdLeastBits), action.getValue()); |
| } |
| CompletableFuture<TxnID> cb = new CompletableFuture<>(); |
| long requestId = requestIdGenerator.getAndIncrement(); |
| ByteBuf cmd = Commands.newEndTxnOnSubscription(requestId, txnIdLeastBits, txnIdMostBits, |
| topic, subscription, action, lowWaterMark); |
| OpRequestSend op = OpRequestSend.create(requestId, topic, cmd, cb, getClientCnx(topic)); |
| if (checkRequestCredits(op)) { |
| endTxn(op); |
| } |
| return cb; |
| } |
| |
| private boolean checkRequestCredits(OpRequestSend op) { |
| int currentPermits = REQUEST_CREDITS_UPDATER.get(this); |
| if (currentPermits > 0 && pendingRequests.peek() == null) { |
| if (REQUEST_CREDITS_UPDATER.compareAndSet(this, currentPermits, currentPermits - 1)) { |
| return true; |
| } else { |
| return checkRequestCredits(op); |
| } |
| } else { |
| pendingRequests.add(op); |
| return false; |
| } |
| } |
| |
| public void endTxn(OpRequestSend op) { |
| op.cnx.whenComplete((clientCnx, ex) -> { |
| if (ex == null) { |
| if (clientCnx.ctx().channel().isActive()) { |
| clientCnx.registerTransactionBufferHandler(TransactionBufferHandlerImpl.this); |
| outstandingRequests.put(op.requestId, op); |
| timer.newTimeout(timeout -> { |
| OpRequestSend peek = outstandingRequests.remove(op.requestId); |
| if (peek != null && !peek.cb.isDone() && !peek.cb.isCompletedExceptionally()) { |
| peek.cb.completeExceptionally(new TransactionBufferClientException |
| .RequestTimeoutException()); |
| onResponse(peek); |
| } |
| }, operationTimeoutInMills, TimeUnit.MILLISECONDS); |
| op.cmd.retain(); |
| clientCnx.ctx().writeAndFlush(op.cmd, clientCnx.ctx().voidPromise()); |
| } else { |
| op.cb.completeExceptionally( |
| new PulsarClientException.LookupException(op.topic + " endTxn channel is not active")); |
| onResponse(op); |
| } |
| } else { |
| Throwable cause = FutureUtil.unwrapCompletionException(ex); |
| log.error("endTxn error topic: [{}]", op.topic, cause); |
| if (cause instanceof PulsarClientException.BrokerMetadataException) { |
| op.cb.complete(null); |
| } else { |
| op.cb.completeExceptionally( |
| new PulsarClientException.LookupException(cause.getMessage())); |
| } |
| onResponse(op); |
| } |
| }); |
| } |
| |
| @Override |
| public void handleEndTxnOnTopicResponse(long requestId, CommandEndTxnOnPartitionResponse response) { |
| OpRequestSend op = outstandingRequests.remove(requestId); |
| if (op == null) { |
| if (log.isDebugEnabled()) { |
| log.debug("Got end txn on topic response for timeout {} - {}", response.getTxnidMostBits(), |
| response.getTxnidLeastBits()); |
| } |
| return; |
| } |
| try { |
| if (!response.hasError()) { |
| if (log.isDebugEnabled()) { |
| log.debug("[{}] Got end txn on topic response for for request {}", op.topic, |
| response.getRequestId()); |
| } |
| op.cb.complete(new TxnID(response.getTxnidMostBits(), response.getTxnidLeastBits())); |
| } else { |
| log.error("[{}] Got end txn on topic response for request {} error {}", op.topic, |
| response.getRequestId(), |
| response.getError()); |
| op.cb.completeExceptionally(ClientCnx.getPulsarClientException(response.getError(), |
| response.getMessage())); |
| } |
| } finally { |
| onResponse(op); |
| } |
| } |
| |
| @Override |
| public void handleEndTxnOnSubscriptionResponse(long requestId, |
| CommandEndTxnOnSubscriptionResponse response) { |
| OpRequestSend op = outstandingRequests.remove(requestId); |
| if (op == null) { |
| if (log.isDebugEnabled()) { |
| log.debug("Got end txn on subscription response for timeout {} - {}", response.getTxnidMostBits(), |
| response.getTxnidLeastBits()); |
| } |
| return; |
| } |
| |
| try { |
| if (!response.hasError()) { |
| if (log.isDebugEnabled()) { |
| log.debug("[{}] Got end txn on subscription response for for request {}", |
| op.topic, response.getRequestId()); |
| } |
| op.cb.complete(new TxnID(response.getTxnidMostBits(), response.getTxnidLeastBits())); |
| } else { |
| log.error("[{}] Got end txn on subscription response for request {} error {}", |
| op.topic, response.getRequestId(), response.getError()); |
| op.cb.completeExceptionally(ClientCnx.getPulsarClientException(response.getError(), |
| response.getMessage())); |
| } |
| } finally { |
| onResponse(op); |
| } |
| } |
| |
| public void onResponse(OpRequestSend op) { |
| REQUEST_CREDITS_UPDATER.incrementAndGet(this); |
| if (op != null) { |
| ReferenceCountUtil.safeRelease(op.cmd); |
| op.recycle(); |
| } |
| checkPendingRequests(); |
| } |
| |
| private void checkPendingRequests() { |
| while (true) { |
| int permits = REQUEST_CREDITS_UPDATER.get(this); |
| if (permits > 0 && pendingRequests.peek() != null) { |
| if (REQUEST_CREDITS_UPDATER.compareAndSet(this, permits, permits - 1)) { |
| OpRequestSend polled = pendingRequests.poll(); |
| if (polled != null) { |
| CompletableFuture<ClientCnx> clientCnx = getClientCnx(polled.topic); |
| if (polled.cnx != clientCnx) { |
| OpRequestSend invalid = polled; |
| polled = OpRequestSend.create(invalid.requestId, invalid.topic, invalid.cmd, invalid.cb, |
| clientCnx); |
| invalid.recycle(); |
| } |
| endTxn(polled); |
| } else { |
| REQUEST_CREDITS_UPDATER.incrementAndGet(this); |
| } |
| } |
| } else { |
| break; |
| } |
| } |
| } |
| |
| public static final class OpRequestSend { |
| |
| long requestId; |
| String topic; |
| ByteBuf cmd; |
| CompletableFuture<TxnID> cb; |
| long createdAt; |
| CompletableFuture<ClientCnx> cnx; |
| |
| static OpRequestSend create(long requestId, String topic, ByteBuf cmd, CompletableFuture<TxnID> cb, |
| CompletableFuture<ClientCnx> cnx) { |
| OpRequestSend op = RECYCLER.get(); |
| op.requestId = requestId; |
| op.topic = topic; |
| op.cmd = cmd; |
| op.cb = cb; |
| op.createdAt = System.currentTimeMillis(); |
| op.cnx = cnx; |
| return op; |
| } |
| |
| void recycle() { |
| recyclerHandle.recycle(this); |
| } |
| |
| private OpRequestSend(Recycler.Handle<OpRequestSend> recyclerHandle) { |
| this.recyclerHandle = recyclerHandle; |
| } |
| |
| private final Recycler.Handle<OpRequestSend> recyclerHandle; |
| |
| private static final Recycler<OpRequestSend> RECYCLER = new Recycler<OpRequestSend>() { |
| @Override |
| protected OpRequestSend newObject(Handle<OpRequestSend> handle) { |
| return new OpRequestSend(handle); |
| } |
| }; |
| } |
| |
| public CompletableFuture<ClientCnx> getClientCnxWithLookup(String topic) { |
| return pulsarClient.getConnection(topic); |
| } |
| |
| public CompletableFuture<ClientCnx> getClientCnx(String topic) { |
| NamespaceService namespaceService = pulsarService.getNamespaceService(); |
| CompletableFuture<NamespaceBundle> nsBundle = namespaceService.getBundleAsync(TopicName.get(topic)); |
| return nsBundle |
| .thenCompose(bundle -> namespaceService.getOwnerAsync(bundle)) |
| .thenCompose(data -> { |
| if (data.isPresent()) { |
| NamespaceEphemeralData ephemeralData = data.get(); |
| try { |
| if (!ephemeralData.isDisabled()) { |
| URI uri; |
| if (pulsarClient.getConfiguration().isUseTls()) { |
| uri = new URI(ephemeralData.getNativeUrlTls()); |
| } else { |
| uri = new URI(ephemeralData.getNativeUrl()); |
| } |
| InetSocketAddress brokerAddress = |
| InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort()); |
| return pulsarClient.getConnection(brokerAddress, brokerAddress); |
| } else { |
| // Bundle is unloading, lookup topic |
| return getClientCnxWithLookup(topic); |
| } |
| } catch (URISyntaxException e) { |
| // Should never go here |
| return getClientCnxWithLookup(topic); |
| } |
| } else { |
| // Bundle is not loaded yet, lookup topic |
| return getClientCnxWithLookup(topic); |
| } |
| }); |
| } |
| |
| @Override |
| public void close() { |
| this.timer.stop(); |
| } |
| |
| @Override |
| public int getAvailableRequestCredits() { |
| return REQUEST_CREDITS_UPDATER.get(this); |
| } |
| |
| @Override |
| public int getPendingRequestsCount() { |
| return pendingRequests.size(); |
| } |
| } |