/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.unsafeGetPartitionedTopicMetadataAsync;
import static org.apache.pulsar.broker.lookup.TopicLookupBase.lookupTopicAsync;
import static org.apache.pulsar.common.api.proto.ProtocolVersion.v5;
import static org.apache.pulsar.common.protocol.Commands.newLookupErrorResponse;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.handler.codec.haproxy.HAProxyMessage;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.concurrent.Promise;
import io.prometheus.client.Gauge;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.util.SafeRun;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.TransactionMetadataStoreService;
import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationState;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionNotFoundException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicNotFoundException;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.schema.SchemaInfoUtil;
import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.CommandAddPartitionToTxn;
import org.apache.pulsar.common.api.proto.CommandAddSubscriptionToTxn;
import org.apache.pulsar.common.api.proto.CommandAuthResponse;
import org.apache.pulsar.common.api.proto.CommandCloseConsumer;
import org.apache.pulsar.common.api.proto.CommandCloseProducer;
import org.apache.pulsar.common.api.proto.CommandConnect;
import org.apache.pulsar.common.api.proto.CommandConsumerStats;
import org.apache.pulsar.common.api.proto.CommandEndTxn;
import org.apache.pulsar.common.api.proto.CommandEndTxnOnPartition;
import org.apache.pulsar.common.api.proto.CommandEndTxnOnSubscription;
import org.apache.pulsar.common.api.proto.CommandFlow;
import org.apache.pulsar.common.api.proto.CommandGetLastMessageId;
import org.apache.pulsar.common.api.proto.CommandGetOrCreateSchema;
import org.apache.pulsar.common.api.proto.CommandGetSchema;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.api.proto.CommandLookupTopic;
import org.apache.pulsar.common.api.proto.CommandNewTxn;
import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadata;
import org.apache.pulsar.common.api.proto.CommandProducer;
import org.apache.pulsar.common.api.proto.CommandRedeliverUnacknowledgedMessages;
import org.apache.pulsar.common.api.proto.CommandSeek;
import org.apache.pulsar.common.api.proto.CommandSend;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.CommandUnsubscribe;
import org.apache.pulsar.common.api.proto.FeatureFlags;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.api.proto.KeySharedMode;
import org.apache.pulsar.common.api.proto.KeyValue;
import org.apache.pulsar.common.api.proto.MessageIdData;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.ProducerAccessMode;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.apache.pulsar.common.api.proto.Schema;
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.intercept.InterceptException;
import org.apache.pulsar.common.naming.Metadata;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
import org.apache.pulsar.common.protocol.ByteBufPair;
import org.apache.pulsar.common.protocol.CommandUtils;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.PulsarHandler;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.apache.pulsar.functions.utils.Exceptions;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
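/**
 * Server-side handler for a single client (or proxy) connection to the broker: it processes the
 * incoming protocol commands (connect/auth, lookup, producer/consumer creation, publish, ack,
 * flow, seek, ...) and tracks the producers and consumers created on this connection.
 */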
public class ServerCnx extends PulsarHandler implements TransportCnx {
private final BrokerService service;
private final SchemaRegistryService schemaService;
private final ConcurrentLongHashMap<CompletableFuture<Producer>> producers;
private final ConcurrentLongHashMap<CompletableFuture<Consumer>> consumers;
private State state;
private volatile boolean isActive = true;
String authRole = null;
private volatile AuthenticationDataSource authenticationData;
AuthenticationProvider authenticationProvider;
AuthenticationState authState;
// In the case of a proxy, if the authentication credentials are forwardable,
// these hold the credentials of the original client
AuthenticationState originalAuthState;
AuthenticationDataSource originalAuthData;
private boolean pendingAuthChallengeResponse = false;
// Max number of pending requests per connection. If multiple producers are sharing the same connection, the flow
// control done by a single producer might not be enough to prevent write spikes on the broker.
private final int maxPendingSendRequests;
private final int resumeReadsThreshold;
private int pendingSendRequest = 0;
private final String replicatorPrefix;
private String clientVersion = null;
private int nonPersistentPendingMessages = 0;
private final int maxNonPersistentPendingMessages;
private String originalPrincipal = null;
private Set<String> proxyRoles;
private boolean authenticateOriginalAuthData;
private final boolean schemaValidationEnforced;
private String authMethod = "none";
private final int maxMessageSize;
private boolean preciseDispatcherFlowControl;
private boolean preciseTopicPublishRateLimitingEnable;
private boolean encryptionRequireOnProducer;
// Flag to manage the throttling rate by atomically enabling/disabling the read channel.
private volatile boolean autoReadDisabledRateLimiting = false;
private FeatureFlags features;
private PulsarCommandSender commandSender;
private static final KeySharedMeta emptyKeySharedMeta = new KeySharedMeta()
.setKeySharedMode(KeySharedMode.AUTO_SPLIT);
// Flag to manage publish-buffer throttling by atomically enabling/disabling the read channel.
private boolean autoReadDisabledPublishBufferLimiting = false;
private final long maxPendingBytesPerThread;
private final long resumeThresholdPendingBytesPerThread;
// Number of bytes pending to be published from a single specific IO thread.
private static final FastThreadLocal<MutableLong> pendingBytesPerThread = new FastThreadLocal<MutableLong>() {
@Override
protected MutableLong initialValue() throws Exception {
return new MutableLong();
}
};
// A set of connections tied to the current thread
private static final FastThreadLocal<Set<ServerCnx>> cnxsPerThread = new FastThreadLocal<Set<ServerCnx>>() {
@Override
protected Set<ServerCnx> initialValue() throws Exception {
return Collections.newSetFromMap(new IdentityHashMap<>());
}
};
enum State {
Start, Connected, Failed, Connecting
}
public ServerCnx(PulsarService pulsar) {
super(pulsar.getBrokerService().getKeepAliveIntervalSeconds(), TimeUnit.SECONDS);
this.service = pulsar.getBrokerService();
this.schemaService = pulsar.getSchemaRegistryService();
this.state = State.Start;
ServiceConfiguration conf = pulsar.getConfiguration();
// These maps are not heavily contended since most accesses are within the cnx thread
this.producers = new ConcurrentLongHashMap<>(8, 1);
this.consumers = new ConcurrentLongHashMap<>(8, 1);
this.replicatorPrefix = conf.getReplicatorPrefix();
this.maxNonPersistentPendingMessages = conf.getMaxConcurrentNonPersistentMessagePerConnection();
this.proxyRoles = conf.getProxyRoles();
this.authenticateOriginalAuthData = conf.isAuthenticateOriginalAuthData();
this.schemaValidationEnforced = conf.isSchemaValidationEnforced();
this.maxMessageSize = conf.getMaxMessageSize();
this.maxPendingSendRequests = conf.getMaxPendingPublishRequestsPerConnection();
this.resumeReadsThreshold = maxPendingSendRequests / 2;
this.preciseDispatcherFlowControl = conf.isPreciseDispatcherFlowControl();
this.preciseTopicPublishRateLimitingEnable = conf.isPreciseTopicPublishRateLimiterEnable();
this.encryptionRequireOnProducer = conf.isEncryptionRequireOnProducer();
// Assign a portion of max-pending bytes to each IO thread
this.maxPendingBytesPerThread = conf.getMaxMessagePublishBufferSizeInMB() * 1024L * 1024L
/ conf.getNumIOThreads();
this.resumeThresholdPendingBytesPerThread = this.maxPendingBytesPerThread / 2;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
log.info("New connection from {}", remoteAddress);
this.ctx = ctx;
this.commandSender = new PulsarCommandSenderImpl(getBrokerService().getInterceptor(), this);
this.service.getPulsarStats().recordConnectionCreate();
cnxsPerThread.get().add(this);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
isActive = false;
log.info("Closed connection from {}", remoteAddress);
BrokerInterceptor brokerInterceptor = getBrokerService().getInterceptor();
if (brokerInterceptor != null) {
brokerInterceptor.onConnectionClosed(this);
}
cnxsPerThread.get().remove(this);
// Connection is gone, close the producers immediately
producers.forEach((__, producerFuture) -> {
if (producerFuture.isDone() && !producerFuture.isCompletedExceptionally()) {
Producer producer = producerFuture.getNow(null);
producer.closeNow(true);
}
});
consumers.forEach((__, consumerFuture) -> {
Consumer consumer;
if (consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
consumer = consumerFuture.getNow(null);
} else {
return;
}
try {
consumer.close();
} catch (BrokerServiceException e) {
log.warn("Consumer {} was already closed: {}", consumer, e);
}
});
this.service.getPulsarStats().recordConnectionClose();
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
if (log.isDebugEnabled()) {
log.debug("Channel writability has changed to: {}", ctx.channel().isWritable());
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (state != State.Failed) {
// No need to report stack trace for known exceptions that happen in disconnections
log.warn("[{}] Got exception {}", remoteAddress,
ClientCnx.isKnownException(cause) ? cause : ExceptionUtils.getStackTrace(cause));
state = State.Failed;
if (log.isDebugEnabled()) {
log.debug("[{}] connect state change to : [{}]", remoteAddress, State.Failed.name());
}
} else {
// At the default info level, suppress all subsequent exceptions that are thrown once the connection
// has already failed
if (log.isDebugEnabled()) {
log.debug("[{}] Got exception: {}", remoteAddress, cause);
}
}
ctx.close();
}
/*
 * If authentication and authorization are enabled (and not SASL), and
 * the authRole is one of the proxyRoles, we want to enforce that:
 * - the originalPrincipal is provided while connecting
 * - the originalPrincipal is not blank
 * - the originalPrincipal is not a proxy principal
 */
private boolean invalidOriginalPrincipal(String originalPrincipal) {
return (service.isAuthenticationEnabled() && service.isAuthorizationEnabled()
&& proxyRoles.contains(authRole) && (StringUtils.isBlank(originalPrincipal)
|| proxyRoles.contains(originalPrincipal)));
}
// ////
// // Incoming commands handling
// ////
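/**
 * Checks whether the given topic operation is allowed on this connection: when authorization is
 * enabled, both the original principal (when the client connects through a proxy) and the
 * connection's own role must be authorized; otherwise the check completes with {@code true}.
 */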
private CompletableFuture<Boolean> isTopicOperationAllowed(TopicName topicName, TopicOperation operation) {
CompletableFuture<Boolean> isProxyAuthorizedFuture;
CompletableFuture<Boolean> isAuthorizedFuture;
if (service.isAuthorizationEnabled()) {
if (originalPrincipal != null) {
isProxyAuthorizedFuture = service.getAuthorizationService().allowTopicOperationAsync(
topicName, operation, originalPrincipal, getAuthenticationData());
} else {
isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
}
isAuthorizedFuture = service.getAuthorizationService().allowTopicOperationAsync(
topicName, operation, authRole, authenticationData);
} else {
isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
isAuthorizedFuture = CompletableFuture.completedFuture(true);
}
return isProxyAuthorizedFuture.thenCombine(isAuthorizedFuture, (isProxyAuthorized, isAuthorized) -> {
if (!isProxyAuthorized) {
log.warn("OriginalRole {} is not authorized to perform operation {} on topic {}",
originalPrincipal, operation, topicName);
}
if (!isAuthorized) {
log.warn("Role {} is not authorized to perform operation {} on topic {}",
authRole, operation, topicName);
}
return isProxyAuthorized && isAuthorized;
});
}
private CompletableFuture<Boolean> isTopicOperationAllowed(TopicName topicName, String subscriptionName,
TopicOperation operation) {
CompletableFuture<Boolean> isProxyAuthorizedFuture;
CompletableFuture<Boolean> isAuthorizedFuture;
if (service.isAuthorizationEnabled()) {
if (authenticationData == null) {
authenticationData = new AuthenticationDataCommand("", subscriptionName);
} else {
authenticationData.setSubscription(subscriptionName);
}
if (originalAuthData != null) {
originalAuthData.setSubscription(subscriptionName);
}
return isTopicOperationAllowed(topicName, operation);
} else {
isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
isAuthorizedFuture = CompletableFuture.completedFuture(true);
}
return isProxyAuthorizedFuture.thenCombine(isAuthorizedFuture, (isProxyAuthorized, isAuthorized) -> {
if (!isProxyAuthorized) {
log.warn("OriginalRole {} is not authorized to perform operation {} on topic {}, subscription {}",
originalPrincipal, operation, topicName, subscriptionName);
}
if (!isAuthorized) {
log.warn("Role {} is not authorized to perform operation {} on topic {}, subscription {}",
authRole, operation, topicName, subscriptionName);
}
return isProxyAuthorized && isAuthorized;
});
}
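/**
 * Handles a topic lookup request: validates the topic name, enforces the lookup-request
 * semaphore, checks LOOKUP authorization and then delegates to {@code lookupTopicAsync},
 * replying with either a lookup response or a lookup error.
 */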
@Override
protected void handleLookup(CommandLookupTopic lookup) {
final long requestId = lookup.getRequestId();
final boolean authoritative = lookup.isAuthoritative();
final String advertisedListenerName = lookup.hasAdvertisedListenerName() ? lookup.getAdvertisedListenerName()
: null;
if (log.isDebugEnabled()) {
log.debug("[{}] Received Lookup from {} for {}", lookup.getTopic(), remoteAddress, requestId);
}
TopicName topicName = validateTopicName(lookup.getTopic(), requestId, lookup);
if (topicName == null) {
return;
}
final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
if (lookupSemaphore.tryAcquire()) {
if (invalidOriginalPrincipal(originalPrincipal)) {
final String msg = "Valid Proxy Client role should be provided for lookup ";
log.warn("[{}] {} with role {} and proxyClientAuthRole {} on topic {}", remoteAddress, msg, authRole,
originalPrincipal, topicName);
ctx.writeAndFlush(newLookupErrorResponse(ServerError.AuthorizationError, msg, requestId));
lookupSemaphore.release();
return;
}
isTopicOperationAllowed(topicName, TopicOperation.LOOKUP).thenApply(isAuthorized -> {
if (isAuthorized) {
lookupTopicAsync(getBrokerService().pulsar(), topicName, authoritative,
getPrincipal(), getAuthenticationData(),
requestId, advertisedListenerName).handle((lookupResponse, ex) -> {
if (ex == null) {
ctx.writeAndFlush(lookupResponse);
} else {
// This should never happen
log.warn("[{}] Lookup on topic {} failed with error {}", remoteAddress, topicName,
ex.getMessage(), ex);
ctx.writeAndFlush(newLookupErrorResponse(ServerError.ServiceNotReady,
ex.getMessage(), requestId));
}
lookupSemaphore.release();
return null;
});
} else {
final String msg = "Proxy Client is not authorized to Lookup";
log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, getPrincipal(), topicName);
ctx.writeAndFlush(newLookupErrorResponse(ServerError.AuthorizationError, msg, requestId));
lookupSemaphore.release();
}
return null;
}).exceptionally(ex -> {
logAuthException(remoteAddress, "lookup", getPrincipal(), Optional.of(topicName), ex);
final String msg = "Exception occurred while trying to authorize lookup";
ctx.writeAndFlush(newLookupErrorResponse(ServerError.AuthorizationError, msg, requestId));
lookupSemaphore.release();
return null;
});
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] Failed lookup due to too many lookup-requests {}", remoteAddress, topicName);
}
ctx.writeAndFlush(newLookupErrorResponse(ServerError.TooManyRequests,
"Failed due to too many pending lookup requests", requestId));
}
}
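/**
 * Handles a partitioned-topic metadata request: after authorization and semaphore checks,
 * fetches the partitioned metadata and replies with the partition count or an appropriate error.
 */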
@Override
protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata partitionMetadata) {
final long requestId = partitionMetadata.getRequestId();
if (log.isDebugEnabled()) {
log.debug("[{}] Received PartitionMetadataLookup from {} for {}", partitionMetadata.getTopic(),
remoteAddress, requestId);
}
TopicName topicName = validateTopicName(partitionMetadata.getTopic(), requestId, partitionMetadata);
if (topicName == null) {
return;
}
final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
if (lookupSemaphore.tryAcquire()) {
if (invalidOriginalPrincipal(originalPrincipal)) {
final String msg = "Valid Proxy Client role should be provided for getPartitionMetadataRequest ";
log.warn("[{}] {} with role {} and proxyClientAuthRole {} on topic {}", remoteAddress, msg, authRole,
originalPrincipal, topicName);
commandSender.sendPartitionMetadataResponse(ServerError.AuthorizationError, msg, requestId);
lookupSemaphore.release();
return;
}
isTopicOperationAllowed(topicName, TopicOperation.LOOKUP).thenApply(isAuthorized -> {
if (isAuthorized) {
unsafeGetPartitionedTopicMetadataAsync(getBrokerService().pulsar(), topicName)
.handle((metadata, ex) -> {
if (ex == null) {
int partitions = metadata.partitions;
commandSender.sendPartitionMetadataResponse(partitions, requestId);
} else {
if (ex instanceof PulsarClientException) {
log.warn("Failed to authorize {} at [{}] on topic {} : {}", getRole(),
remoteAddress, topicName, ex.getMessage());
commandSender.sendPartitionMetadataResponse(ServerError.AuthorizationError,
ex.getMessage(), requestId);
} else {
log.warn("Failed to get Partitioned Metadata [{}] {}: {}", remoteAddress,
topicName, ex.getMessage(), ex);
ServerError error = (ex instanceof RestException)
&& ((RestException) ex).getResponse().getStatus() < 500
? ServerError.MetadataError
: ServerError.ServiceNotReady;
commandSender.sendPartitionMetadataResponse(error, ex.getMessage(), requestId);
}
}
lookupSemaphore.release();
return null;
});
} else {
final String msg = "Proxy Client is not authorized to Get Partition Metadata";
log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, getPrincipal(), topicName);
ctx.writeAndFlush(
Commands.newPartitionMetadataResponse(ServerError.AuthorizationError, msg, requestId));
lookupSemaphore.release();
}
return null;
}).exceptionally(ex -> {
logAuthException(remoteAddress, "partition-metadata", getPrincipal(), Optional.of(topicName), ex);
final String msg = "Exception occurred while trying to authorize get Partition Metadata";
ctx.writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.AuthorizationError, msg,
requestId));
lookupSemaphore.release();
return null;
});
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] Failed Partition-Metadata lookup due to too many lookup-requests {}", remoteAddress,
topicName);
}
commandSender.sendPartitionMetadataResponse(ServerError.TooManyRequests,
"Failed due to too many pending lookup requests", requestId);
}
}
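/**
 * Handles a consumer-stats request by looking up the consumer registered on this connection and
 * replying with its current stats, or with a ConsumerNotFound error.
 */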
@Override
protected void handleConsumerStats(CommandConsumerStats commandConsumerStats) {
if (log.isDebugEnabled()) {
log.debug("Received CommandConsumerStats call from {}", remoteAddress);
}
final long requestId = commandConsumerStats.getRequestId();
final long consumerId = commandConsumerStats.getConsumerId();
CompletableFuture<Consumer> consumerFuture = consumers.get(consumerId);
// Guard against a missing or failed consumer future so that the error response below is sent instead of throwing
Consumer consumer = (consumerFuture != null && consumerFuture.isDone()
&& !consumerFuture.isCompletedExceptionally()) ? consumerFuture.getNow(null) : null;
ByteBuf msg = null;
if (consumer == null) {
log.error(
"Failed to get consumer-stats response - Consumer not found for"
+ " CommandConsumerStats[remoteAddress = {}, requestId = {}, consumerId = {}]",
remoteAddress, requestId, consumerId);
msg = Commands.newConsumerStatsResponse(ServerError.ConsumerNotFound,
"Consumer " + consumerId + " not found", requestId);
} else {
if (log.isDebugEnabled()) {
log.debug("CommandConsumerStats[requestId = {}, consumer = {}]", requestId, consumer);
}
msg = createConsumerStatsResponse(consumer, requestId);
}
ctx.writeAndFlush(msg);
}
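/**
 * Builds the consumer-stats response command for the given consumer and request id.
 */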
ByteBuf createConsumerStatsResponse(Consumer consumer, long requestId) {
ConsumerStatsImpl consumerStats = consumer.getStats();
Subscription subscription = consumer.getSubscription();
BaseCommand cmd = Commands.newConsumerStatsResponseCommand(ServerError.UnknownError, null, requestId);
cmd.getConsumerStatsResponse()
.clearErrorCode()
.setRequestId(requestId)
.setMsgRateOut(consumerStats.msgRateOut)
.setMsgThroughputOut(consumerStats.msgThroughputOut)
.setMsgRateRedeliver(consumerStats.msgRateRedeliver)
.setConsumerName(consumerStats.consumerName)
.setAvailablePermits(consumerStats.availablePermits)
.setUnackedMessages(consumerStats.unackedMessages)
.setBlockedConsumerOnUnackedMsgs(consumerStats.blockedConsumerOnUnackedMsgs)
.setAddress(consumerStats.getAddress())
.setConnectedSince(consumerStats.getConnectedSince())
.setMsgBacklog(subscription.getNumberOfEntriesInBacklog(false))
.setMsgRateExpired(subscription.getExpiredMessageRate())
.setType(subscription.getTypeString());
return Commands.serializeWithSize(cmd);
}
// Complete the connection setup and send the newConnected command
private void completeConnect(int clientProtoVersion, String clientVersion) {
ctx.writeAndFlush(Commands.newConnected(clientProtoVersion, maxMessageSize));
state = State.Connected;
service.getPulsarStats().recordConnectionCreateSuccess();
if (log.isDebugEnabled()) {
log.debug("[{}] connect state change to : [{}]", remoteAddress, State.Connected.name());
}
setRemoteEndpointProtocolVersion(clientProtoVersion);
if (isNotBlank(clientVersion) && !clientVersion.contains(" ") /* ignore default version: pulsar client */) {
this.clientVersion = clientVersion.intern();
}
}
// Depending on the auth result, send either a newConnected or a newAuthChallenge command.
private State doAuthentication(AuthData clientData,
int clientProtocolVersion,
String clientVersion) throws Exception {
// The original auth state can only be set on subsequent auth attempts (and only
// in presence of a proxy and if the proxy is forwarding the credentials).
// In this case, the re-validation needs to be done against the original client
// credentials.
boolean useOriginalAuthState = (originalAuthState != null);
AuthenticationState authState = useOriginalAuthState ? originalAuthState : this.authState;
String authRole = useOriginalAuthState ? originalPrincipal : this.authRole;
AuthData brokerData = authState.authenticate(clientData);
if (log.isDebugEnabled()) {
log.debug("Authenticate using original auth state : {}, role = {}", useOriginalAuthState, authRole);
}
if (authState.isComplete()) {
// Authentication has completed. It was either:
// 1. the first time the authentication process was done, in which case we'll send
// a `CommandConnected` response
// 2. an authentication refresh, in which case we need to refresh authenticationData
String newAuthRole = authState.getAuthRole();
// Refresh the auth data.
this.authenticationData = authState.getAuthDataSource();
if (log.isDebugEnabled()) {
log.debug("[{}] Auth data refreshed for role={}", remoteAddress, this.authRole);
}
if (!useOriginalAuthState) {
this.authRole = newAuthRole;
}
if (log.isDebugEnabled()) {
log.debug("[{}] Client successfully authenticated with {} role {} and originalPrincipal {}",
remoteAddress, authMethod, this.authRole, originalPrincipal);
}
if (state != State.Connected) {
// First time authentication is done
completeConnect(clientProtocolVersion, clientVersion);
} else {
// If the connection was already ready, it means we're doing a refresh
if (!StringUtils.isEmpty(authRole)) {
if (!authRole.equals(newAuthRole)) {
log.warn("[{}] Principal cannot change during an authentication refresh expected={} got={}",
remoteAddress, authRole, newAuthRole);
ctx.close();
} else {
log.info("[{}] Refreshed authentication credentials for role {}", remoteAddress, authRole);
}
}
}
return State.Connected;
}
// Auth is not complete yet; continue the auth exchange with the client.
ctx.writeAndFlush(Commands.newAuthChallenge(authMethod, brokerData, clientProtocolVersion));
if (log.isDebugEnabled()) {
log.debug("[{}] Authentication in progress client by method {}.",
remoteAddress, authMethod);
log.debug("[{}] connect state change to : [{}]", remoteAddress, State.Connecting.name());
}
return State.Connecting;
}
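/**
 * Checks whether the connection's (or the forwarded original) credentials have expired and, if
 * so, sends an auth challenge to the client; the connection is closed if the client does not
 * support credential refresh or a previous challenge went unanswered.
 */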
public void refreshAuthenticationCredentials() {
AuthenticationState authState = this.originalAuthState != null ? originalAuthState : this.authState;
if (authState == null) {
// Authentication is disabled or there's no local state to refresh
return;
} else if (getState() != State.Connected || !isActive) {
// Connection is either still being established or already closed.
return;
} else if (authState != null && !authState.isExpired()) {
// Credentials are still valid. Nothing to do at this point
return;
} else if (originalPrincipal != null && originalAuthState == null) {
log.info(
"[{}] Cannot revalidate user credential when using proxy and"
+ " not forwarding the credentials. Closing connection",
remoteAddress);
return;
}
ctx.executor().execute(SafeRun.safeRun(() -> {
log.info("[{}] Refreshing authentication credentials for originalPrincipal {} and authRole {}",
remoteAddress, originalPrincipal, this.authRole);
if (!supportsAuthenticationRefresh()) {
log.warn("[{}] Closing connection because client doesn't support auth credentials refresh",
remoteAddress);
ctx.close();
return;
}
if (pendingAuthChallengeResponse) {
log.warn("[{}] Closing connection after timeout on refreshing auth credentials",
remoteAddress);
ctx.close();
return;
}
try {
AuthData brokerData = authState.refreshAuthentication();
ctx.writeAndFlush(Commands.newAuthChallenge(authMethod, brokerData,
getRemoteEndpointProtocolVersion()));
if (log.isDebugEnabled()) {
log.debug("[{}] Sent auth challenge to client to refresh credentials with method: {}.",
remoteAddress, authMethod);
}
pendingAuthChallengeResponse = true;
} catch (AuthenticationException e) {
log.warn("[{}] Failed to refresh authentication: {}", remoteAddress, e);
ctx.close();
}
}));
}
private static final byte[] emptyArray = new byte[0];
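/**
 * Handles the CONNECT command: records the client version and feature flags, runs the configured
 * authentication provider (including validation of the original principal forwarded by a proxy,
 * when enabled), and either completes the connection or starts an auth challenge exchange.
 */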
@Override
protected void handleConnect(CommandConnect connect) {
checkArgument(state == State.Start);
if (log.isDebugEnabled()) {
log.debug("Received CONNECT from {}, auth enabled: {}:"
+ " has original principal = {}, original principal = {}",
remoteAddress,
service.isAuthenticationEnabled(),
connect.hasOriginalPrincipal(),
connect.hasOriginalPrincipal() ? connect.getOriginalPrincipal() : null);
}
String clientVersion = connect.getClientVersion();
int clientProtocolVersion = connect.getProtocolVersion();
features = new FeatureFlags();
if (connect.hasFeatureFlags()) {
features.copyFrom(connect.getFeatureFlags());
}
if (!service.isAuthenticationEnabled()) {
completeConnect(clientProtocolVersion, clientVersion);
return;
}
try {
byte[] authData = connect.hasAuthData() ? connect.getAuthData() : emptyArray;
AuthData clientData = AuthData.of(authData);
// init authentication
if (connect.hasAuthMethodName()) {
authMethod = connect.getAuthMethodName();
} else if (connect.hasAuthMethod()) {
// Legacy client is passing enum
authMethod = connect.getAuthMethod().name().substring(10).toLowerCase();
} else {
authMethod = "none";
}
authenticationProvider = getBrokerService()
.getAuthenticationService()
.getAuthenticationProvider(authMethod);
// No provider was found for the given authMethod. This is mostly used in tests.
// With AuthenticationDisabled, authMethod is set to "none".
if (authenticationProvider == null) {
authRole = getBrokerService().getAuthenticationService().getAnonymousUserRole()
.orElseThrow(() ->
new AuthenticationException("No anonymous role, and no authentication provider configured"));
completeConnect(clientProtocolVersion, clientVersion);
return;
}
// Initialize authState and other authentication variables
ChannelHandler sslHandler = ctx.channel().pipeline().get(PulsarChannelInitializer.TLS_HANDLER);
SSLSession sslSession = null;
if (sslHandler != null) {
sslSession = ((SslHandler) sslHandler).engine().getSession();
}
authState = authenticationProvider.newAuthState(clientData, remoteAddress, sslSession);
authenticationData = authState.getAuthDataSource();
if (log.isDebugEnabled()) {
log.debug("[{}] Authenticate role : {}", remoteAddress,
authState != null ? authState.getAuthRole() : null);
}
state = doAuthentication(clientData, clientProtocolVersion, clientVersion);
// This will fail the check if:
// 1. the client is coming through a proxy
// 2. we are required to validate the original credentials
// 3. no credentials were passed
if (connect.hasOriginalPrincipal() && service.getPulsar().getConfig().isAuthenticateOriginalAuthData()) {
// init authentication
String originalAuthMethod;
if (connect.hasOriginalAuthMethod()) {
originalAuthMethod = connect.getOriginalAuthMethod();
} else {
originalAuthMethod = "none";
}
AuthenticationProvider originalAuthenticationProvider = getBrokerService()
.getAuthenticationService()
.getAuthenticationProvider(originalAuthMethod);
if (originalAuthenticationProvider == null) {
throw new AuthenticationException(
String.format("Can't find AuthenticationProvider for original role"
+ " using auth method [%s] is not available", originalAuthMethod));
}
originalAuthState = originalAuthenticationProvider.newAuthState(
AuthData.of(connect.getOriginalAuthData().getBytes()),
remoteAddress,
sslSession);
originalAuthData = originalAuthState.getAuthDataSource();
originalPrincipal = originalAuthState.getAuthRole();
if (log.isDebugEnabled()) {
log.debug("[{}] Authenticate original role : {}", remoteAddress, originalPrincipal);
}
} else {
originalPrincipal = connect.hasOriginalPrincipal() ? connect.getOriginalPrincipal() : null;
if (log.isDebugEnabled()) {
log.debug("[{}] Authenticate original role (forwarded from proxy): {}",
remoteAddress, originalPrincipal);
}
}
} catch (Exception e) {
service.getPulsarStats().recordConnectionCreateFail();
logAuthException(remoteAddress, "connect", getPrincipal(), Optional.empty(), e);
String msg = "Unable to authenticate";
ctx.writeAndFlush(Commands.newError(-1, ServerError.AuthenticationError, msg));
close();
}
}
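/**
 * Handles the client's response to an auth challenge, continuing the multi-step authentication
 * (or refresh) exchange started in {@link #handleConnect} or
 * {@link #refreshAuthenticationCredentials}.
 */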
@Override
protected void handleAuthResponse(CommandAuthResponse authResponse) {
checkArgument(authResponse.hasResponse());
checkArgument(authResponse.getResponse().hasAuthData() && authResponse.getResponse().hasAuthMethodName());
pendingAuthChallengeResponse = false;
if (log.isDebugEnabled()) {
log.debug("Received AuthResponse from {}, auth method: {}",
remoteAddress, authResponse.getResponse().getAuthMethodName());
}
try {
AuthData clientData = AuthData.of(authResponse.getResponse().getAuthData());
doAuthentication(clientData, authResponse.getProtocolVersion(), authResponse.getClientVersion());
} catch (AuthenticationException e) {
service.getPulsarStats().recordConnectionCreateFail();
log.warn("[{}] Authentication failed: {} ", remoteAddress, e.getMessage());
ctx.writeAndFlush(Commands.newError(-1, ServerError.AuthenticationError, e.getMessage()));
close();
} catch (Exception e) {
service.getPulsarStats().recordConnectionCreateFail();
String msg = "Unable to handleAuthResponse";
log.warn("[{}] {} ", remoteAddress, msg, e);
ctx.writeAndFlush(Commands.newError(-1, ServerError.UnknownError, msg));
close();
}
}
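/**
 * Handles a subscribe request: after CONSUME authorization it validates the metadata, guards
 * against duplicate consumer ids on this connection, optionally creates the topic, checks schema
 * compatibility and finally creates the subscription/consumer, replying with success or an error.
 */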
@Override
protected void handleSubscribe(final CommandSubscribe subscribe) {
checkArgument(state == State.Connected);
final long requestId = subscribe.getRequestId();
final long consumerId = subscribe.getConsumerId();
TopicName topicName = validateTopicName(subscribe.getTopic(), requestId, subscribe);
if (topicName == null) {
return;
}
if (log.isDebugEnabled()) {
log.debug("[{}] Handle subscribe command: auth role = {}, original auth role = {}",
remoteAddress, authRole, originalPrincipal);
}
if (invalidOriginalPrincipal(originalPrincipal)) {
final String msg = "Valid Proxy Client role should be provided while subscribing ";
log.warn("[{}] {} with role {} and proxyClientAuthRole {} on topic {}", remoteAddress, msg, authRole,
originalPrincipal, topicName);
commandSender.sendErrorResponse(requestId, ServerError.AuthorizationError, msg);
return;
}
final String subscriptionName = subscribe.getSubscription();
final SubType subType = subscribe.getSubType();
final String consumerName = subscribe.getConsumerName();
final boolean isDurable = subscribe.isDurable();
final MessageIdImpl startMessageId = subscribe.hasStartMessageId() ? new BatchMessageIdImpl(
subscribe.getStartMessageId().getLedgerId(), subscribe.getStartMessageId().getEntryId(),
subscribe.getStartMessageId().getPartition(), subscribe.getStartMessageId().getBatchIndex())
: null;
final int priorityLevel = subscribe.hasPriorityLevel() ? subscribe.getPriorityLevel() : 0;
final boolean readCompacted = subscribe.isReadCompacted();
final Map<String, String> metadata = CommandUtils.metadataFromCommand(subscribe);
final InitialPosition initialPosition = subscribe.getInitialPosition();
final long startMessageRollbackDurationSec = subscribe.hasStartMessageRollbackDurationSec()
? subscribe.getStartMessageRollbackDurationSec()
: -1;
final SchemaData schema = subscribe.hasSchema() ? getSchema(subscribe.getSchema()) : null;
final boolean isReplicated = subscribe.hasReplicateSubscriptionState()
&& subscribe.isReplicateSubscriptionState();
final boolean forceTopicCreation = subscribe.isForceTopicCreation();
final KeySharedMeta keySharedMeta = subscribe.hasKeySharedMeta()
? new KeySharedMeta().copyFrom(subscribe.getKeySharedMeta())
: emptyKeySharedMeta;
CompletableFuture<Boolean> isAuthorizedFuture = isTopicOperationAllowed(
topicName,
subscriptionName,
TopicOperation.CONSUME
);
isAuthorizedFuture.thenApply(isAuthorized -> {
if (isAuthorized) {
if (log.isDebugEnabled()) {
log.debug("[{}] Client is authorized to subscribe with role {}",
remoteAddress, getPrincipal());
}
log.info("[{}] Subscribing on topic {} / {}", remoteAddress, topicName, subscriptionName);
try {
Metadata.validateMetadata(metadata);
} catch (IllegalArgumentException iae) {
final String msg = iae.getMessage();
commandSender.sendErrorResponse(requestId, ServerError.MetadataError, msg);
return null;
}
CompletableFuture<Consumer> consumerFuture = new CompletableFuture<>();
CompletableFuture<Consumer> existingConsumerFuture = consumers.putIfAbsent(consumerId,
consumerFuture);
if (existingConsumerFuture != null) {
if (existingConsumerFuture.isDone() && !existingConsumerFuture.isCompletedExceptionally()) {
Consumer consumer = existingConsumerFuture.getNow(null);
log.info("[{}] Consumer with the same id is already created:"
+ " consumerId={}, consumer={}",
remoteAddress, consumerId, consumer);
commandSender.sendSuccessResponse(requestId);
return null;
} else {
// There was an earlier request to create a consumer with the same consumerId. This can
// happen when the client timeout is lower than the broker timeout. We need to wait until
// the previous consumer creation request either completes or fails.
log.warn("[{}][{}][{}] Consumer with id is already present on the connection,"
+ " consumerId={}", remoteAddress, topicName, subscriptionName, consumerId);
ServerError error = null;
if (!existingConsumerFuture.isDone()) {
error = ServerError.ServiceNotReady;
} else {
error = getErrorCode(existingConsumerFuture);
consumers.remove(consumerId, existingConsumerFuture);
}
commandSender.sendErrorResponse(requestId, error,
"Consumer is already present on the connection");
return null;
}
}
boolean createTopicIfDoesNotExist = forceTopicCreation
&& service.isAllowAutoTopicCreation(topicName.toString());
service.getTopic(topicName.toString(), createTopicIfDoesNotExist)
.thenCompose(optTopic -> {
if (!optTopic.isPresent()) {
return FutureUtil
.failedFuture(new TopicNotFoundException("Topic does not exist"));
}
Topic topic = optTopic.get();
boolean rejectSubscriptionIfDoesNotExist = isDurable
&& !service.isAllowAutoSubscriptionCreation(topicName.toString())
&& !topic.getSubscriptions().containsKey(subscriptionName);
if (rejectSubscriptionIfDoesNotExist) {
return FutureUtil
.failedFuture(
new SubscriptionNotFoundException(
"Subscription does not exist"));
}
if (schema != null) {
return topic.addSchemaIfIdleOrCheckCompatible(schema)
.thenCompose(v -> topic.subscribe(
ServerCnx.this, subscriptionName, consumerId,
subType, priorityLevel, consumerName, isDurable,
startMessageId, metadata,
readCompacted, initialPosition, startMessageRollbackDurationSec,
isReplicated, keySharedMeta));
} else {
return topic.subscribe(ServerCnx.this, subscriptionName, consumerId,
subType, priorityLevel, consumerName, isDurable,
startMessageId, metadata, readCompacted, initialPosition,
startMessageRollbackDurationSec, isReplicated, keySharedMeta);
}
})
.thenAccept(consumer -> {
if (consumerFuture.complete(consumer)) {
log.info("[{}] Created subscription on topic {} / {}",
remoteAddress, topicName, subscriptionName);
commandSender.sendSuccessResponse(requestId);
} else {
// The consumer future was completed before by a close command
try {
consumer.close();
log.info("[{}] Cleared consumer created after timeout on client side {}",
remoteAddress, consumer);
} catch (BrokerServiceException e) {
log.warn(
"[{}] Error closing consumer created"
+ " after timeout on client side {}: {}",
remoteAddress, consumer, e.getMessage());
}
consumers.remove(consumerId, consumerFuture);
}
})
.exceptionally(exception -> {
if (exception.getCause() instanceof ConsumerBusyException) {
if (log.isDebugEnabled()) {
log.debug(
"[{}][{}][{}] Failed to create consumer because exclusive consumer"
+ " is already connected: {}",
remoteAddress, topicName, subscriptionName,
exception.getCause().getMessage());
}
} else if (exception.getCause() instanceof BrokerServiceException) {
log.warn("[{}][{}][{}] Failed to create consumer: consumerId={}, {}",
remoteAddress, topicName, subscriptionName,
consumerId, exception.getCause().getMessage());
} else {
log.warn("[{}][{}][{}] Failed to create consumer: consumerId={}, {}",
remoteAddress, topicName, subscriptionName,
consumerId, exception.getCause().getMessage(), exception);
}
// If the client timed out, the future would have been completed by a subsequent close.
// Send the error back to the client only if the future is not completed already.
if (consumerFuture.completeExceptionally(exception)) {
commandSender.sendErrorResponse(requestId,
BrokerServiceException.getClientErrorCode(exception),
exception.getCause().getMessage());
}
consumers.remove(consumerId, consumerFuture);
return null;
});
} else {
String msg = "Client is not authorized to subscribe";
log.warn("[{}] {} with role {}", remoteAddress, msg, getPrincipal());
ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, msg));
}
return null;
}).exceptionally(ex -> {
logAuthException(remoteAddress, "subscribe", getPrincipal(), Optional.of(topicName), ex);
commandSender.sendErrorResponse(requestId, ServerError.AuthorizationError, ex.getMessage());
return null;
});
}
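/**
 * Converts the schema carried in a protocol command into the broker-side {@link SchemaData}
 * representation.
 */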
private SchemaData getSchema(Schema protocolSchema) {
return SchemaData.builder()
.data(protocolSchema.getSchemaData())
.isDeleted(false)
.timestamp(System.currentTimeMillis())
.user(Strings.nullToEmpty(originalPrincipal))
.type(Commands.getSchemaType(protocolSchema.getType()))
.props(protocolSchema.getPropertiesList().stream().collect(
Collectors.toMap(
KeyValue::getKey,
KeyValue::getValue
)
)).build();
}
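/**
 * Handles a producer creation request: after PRODUCE authorization it guards against duplicate
 * producer ids, enforces backlog quota and encryption requirements, registers the schema if
 * needed and attaches the new producer to the topic, replying with success or an error.
 */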
@Override
protected void handleProducer(final CommandProducer cmdProducer) {
checkArgument(state == State.Connected);
final long producerId = cmdProducer.getProducerId();
final long requestId = cmdProducer.getRequestId();
// Use producer name provided by client if present
final String producerName = cmdProducer.hasProducerName() ? cmdProducer.getProducerName()
: service.generateUniqueProducerName();
final long epoch = cmdProducer.getEpoch();
final boolean userProvidedProducerName = cmdProducer.isUserProvidedProducerName();
final boolean isEncrypted = cmdProducer.isEncrypted();
final Map<String, String> metadata = CommandUtils.metadataFromCommand(cmdProducer);
final SchemaData schema = cmdProducer.hasSchema() ? getSchema(cmdProducer.getSchema()) : null;
final ProducerAccessMode producerAccessMode = cmdProducer.getProducerAccessMode();
final Optional<Long> topicEpoch = cmdProducer.hasTopicEpoch()
? Optional.of(cmdProducer.getTopicEpoch()) : Optional.empty();
TopicName topicName = validateTopicName(cmdProducer.getTopic(), requestId, cmdProducer);
if (topicName == null) {
return;
}
if (invalidOriginalPrincipal(originalPrincipal)) {
final String msg = "Valid Proxy Client role should be provided while creating producer ";
log.warn("[{}] {} with role {} and proxyClientAuthRole {} on topic {}", remoteAddress, msg, authRole,
originalPrincipal, topicName);
commandSender.sendErrorResponse(requestId, ServerError.AuthorizationError, msg);
return;
}
CompletableFuture<Boolean> isAuthorizedFuture = isTopicOperationAllowed(
topicName, TopicOperation.PRODUCE
);
isAuthorizedFuture.thenApply(isAuthorized -> {
if (isAuthorized) {
if (log.isDebugEnabled()) {
log.debug("[{}] Client is authorized to Produce with role {}",
remoteAddress, getPrincipal());
}
CompletableFuture<Producer> producerFuture = new CompletableFuture<>();
CompletableFuture<Producer> existingProducerFuture = producers.putIfAbsent(producerId,
producerFuture);
if (existingProducerFuture != null) {
if (existingProducerFuture.isDone() && !existingProducerFuture.isCompletedExceptionally()) {
Producer producer = existingProducerFuture.getNow(null);
log.info("[{}] Producer with the same id is already created:"
+ " producerId={}, producer={}", remoteAddress, producerId, producer);
commandSender.sendProducerSuccessResponse(requestId, producer.getProducerName(),
producer.getSchemaVersion());
return null;
} else {
// There was an earlier request to create a producer with the same producerId. This can
// happen when the client timeout is lower than the broker timeout. We need to wait until
// the previous producer creation request either completes or fails.
ServerError error = null;
if (!existingProducerFuture.isDone()) {
error = ServerError.ServiceNotReady;
} else {
error = getErrorCode(existingProducerFuture);
// remove producer with producerId as it's already completed with exception
producers.remove(producerId, existingProducerFuture);
}
log.warn("[{}][{}] Producer with id is already present on the connection,"
+ " producerId={}", remoteAddress, topicName, producerId);
commandSender.sendErrorResponse(requestId, error,
"Producer is already present on the connection");
return null;
}
}
log.info("[{}][{}] Creating producer. producerId={}", remoteAddress, topicName, producerId);
service.getOrCreateTopic(topicName.toString()).thenAccept((Topic topic) -> {
// Before creating the producer, check if the backlog quota is exceeded on the topic
if (topic.isBacklogQuotaExceeded(producerName)) {
IllegalStateException illegalStateException = new IllegalStateException(
"Cannot create producer on topic with backlog quota exceeded");
BacklogQuota.RetentionPolicy retentionPolicy = topic.getBacklogQuota().getPolicy();
if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_request_hold) {
commandSender.sendErrorResponse(requestId,
ServerError.ProducerBlockedQuotaExceededError,
illegalStateException.getMessage());
} else if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_exception) {
commandSender.sendErrorResponse(requestId,
ServerError.ProducerBlockedQuotaExceededException,
illegalStateException.getMessage());
}
producerFuture.completeExceptionally(illegalStateException);
producers.remove(producerId, producerFuture);
return;
}
// Check whether the producer will publish encrypted messages or not
if ((topic.isEncryptionRequired() || encryptionRequireOnProducer) && !isEncrypted) {
String msg = String.format("Encryption is required in %s", topicName);
log.warn("[{}] {}", remoteAddress, msg);
commandSender.sendErrorResponse(requestId, ServerError.MetadataError, msg);
producers.remove(producerId, producerFuture);
return;
}
disableTcpNoDelayIfNeeded(topicName.toString(), producerName);
CompletableFuture<SchemaVersion> schemaVersionFuture = tryAddSchema(topic, schema);
schemaVersionFuture.exceptionally(exception -> {
String message = exception.getMessage();
if (exception.getCause() != null) {
message += (" caused by " + exception.getCause());
}
commandSender.sendErrorResponse(requestId,
BrokerServiceException.getClientErrorCode(exception),
message);
producers.remove(producerId, producerFuture);
return null;
});
schemaVersionFuture.thenAccept(schemaVersion -> {
CompletableFuture<Void> producerQueuedFuture = new CompletableFuture<>();
Producer producer = new Producer(topic, ServerCnx.this, producerId, producerName,
getPrincipal(), isEncrypted, metadata, schemaVersion, epoch,
userProvidedProducerName, producerAccessMode, topicEpoch);
topic.addProducer(producer, producerQueuedFuture).thenAccept(newTopicEpoch -> {
if (isActive()) {
if (producerFuture.complete(producer)) {
log.info("[{}] Created new producer: {}", remoteAddress, producer);
commandSender.sendProducerSuccessResponse(requestId, producerName,
producer.getLastSequenceId(), producer.getSchemaVersion(),
newTopicEpoch, true /* producer is ready now */);
return;
} else {
// The producer's future was completed before by a close command
producer.closeNow(true);
log.info("[{}] Cleared producer created after"
+ " timeout on client side {}",
remoteAddress, producer);
}
} else {
producer.closeNow(true);
log.info("[{}] Cleared producer created after connection was closed: {}",
remoteAddress, producer);
producerFuture.completeExceptionally(
new IllegalStateException(
"Producer created after connection was closed"));
}
producers.remove(producerId, producerFuture);
}).exceptionally(ex -> {
log.error("[{}] Failed to add producer to topic {}: producerId={}, {}",
remoteAddress, topicName, producerId, ex.getMessage());
producer.closeNow(true);
if (producerFuture.completeExceptionally(ex)) {
commandSender.sendErrorResponse(requestId,
BrokerServiceException.getClientErrorCode(ex), ex.getMessage());
}
return null;
});
producerQueuedFuture.thenRun(() -> {
// If the producer is queued waiting, we will get an immediate notification
// that we need to pass on to the client
if (isActive()) {
log.info("[{}] Producer is waiting in queue: {}", remoteAddress, producer);
commandSender.sendProducerSuccessResponse(requestId, producerName,
producer.getLastSequenceId(), producer.getSchemaVersion(),
Optional.empty(), false/* producer is not ready now */);
}
});
});
}).exceptionally(exception -> {
Throwable cause = exception.getCause();
if (cause instanceof NoSuchElementException) {
cause = new TopicNotFoundException("Topic Not Found.");
}
if (!Exceptions.areExceptionsPresentInChain(cause,
ServiceUnitNotReadyException.class, ManagedLedgerException.class)) {
// Do not print stack traces for expected exceptions
log.error("[{}] Failed to create topic {}, producerId={}",
remoteAddress, topicName, producerId, exception);
}
// If the client timed out, the future would have been completed by a subsequent close.
// Send the error back to the client only if the future is not completed already.
if (producerFuture.completeExceptionally(exception)) {
commandSender.sendErrorResponse(requestId,
BrokerServiceException.getClientErrorCode(cause), cause.getMessage());
}
producers.remove(producerId, producerFuture);
return null;
});
} else {
String msg = "Client is not authorized to Produce";
log.warn("[{}] {} with role {}", remoteAddress, msg, getPrincipal());
ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, msg));
}
return null;
}).exceptionally(ex -> {
logAuthException(remoteAddress, "producer", getPrincipal(), Optional.of(topicName), ex);
commandSender.sendErrorResponse(requestId, ServerError.AuthorizationError, ex.getMessage());
return null;
});
}
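/**
 * Handles an incoming message publish: applies the non-persistent pending-message limit, starts
 * flow-control accounting for the payload and forwards the message to the producer, either as a
 * transactional or a regular publish.
 */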
@Override
protected void handleSend(CommandSend send, ByteBuf headersAndPayload) {
checkArgument(state == State.Connected);
CompletableFuture<Producer> producerFuture = producers.get(send.getProducerId());
if (producerFuture == null || !producerFuture.isDone() || producerFuture.isCompletedExceptionally()) {
log.warn("[{}] Producer had already been closed: {}", remoteAddress, send.getProducerId());
return;
}
Producer producer = producerFuture.getNow(null);
if (log.isDebugEnabled()) {
printSendCommandDebug(send, headersAndPayload);
}
if (producer.isNonPersistentTopic()) {
// Avoid processing non-persistent messages once the max concurrent-message limit is reached
if (nonPersistentPendingMessages > maxNonPersistentPendingMessages) {
final long producerId = send.getProducerId();
final long sequenceId = send.getSequenceId();
final long highestSequenceId = send.getHighestSequenceId();
service.getTopicOrderedExecutor().executeOrdered(producer.getTopic().getName(), SafeRun.safeRun(() -> {
commandSender.sendSendReceiptResponse(producerId, sequenceId, highestSequenceId, -1, -1);
}));
producer.recordMessageDrop(send.getNumMessages());
return;
} else {
nonPersistentPendingMessages++;
}
}
startSendOperation(producer, headersAndPayload.readableBytes(), send.getNumMessages());
if (send.hasTxnidMostBits() && send.hasTxnidLeastBits()) {
TxnID txnID = new TxnID(send.getTxnidMostBits(), send.getTxnidLeastBits());
producer.publishTxnMessage(txnID, producer.getProducerId(), send.getSequenceId(),
send.getHighestSequenceId(), headersAndPayload, send.getNumMessages(), send.isIsChunk());
return;
}
// Persist the message
if (send.hasHighestSequenceId() && send.getSequenceId() <= send.getHighestSequenceId()) {
producer.publishMessage(send.getProducerId(), send.getSequenceId(), send.getHighestSequenceId(),
headersAndPayload, send.getNumMessages(), send.isIsChunk());
} else {
producer.publishMessage(send.getProducerId(), send.getSequenceId(), headersAndPayload,
send.getNumMessages(), send.isIsChunk());
}
}
private void printSendCommandDebug(CommandSend send, ByteBuf headersAndPayload) {
headersAndPayload.markReaderIndex();
MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
headersAndPayload.resetReaderIndex();
if (log.isDebugEnabled()) {
log.debug("[{}] Received send message request. producer: {}:{} {}:{} size: {},"
+ " partition key is: {}, ordering key is {}",
remoteAddress, send.getProducerId(), send.getSequenceId(), msgMetadata.getProducerName(),
msgMetadata.getSequenceId(), headersAndPayload.readableBytes(),
msgMetadata.hasPartitionKey() ? msgMetadata.getPartitionKey() : null,
msgMetadata.hasOrderingKey() ? msgMetadata.getOrderingKey() : null);
}
}
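/**
 * Handles a message acknowledgment from a consumer and, when the command carries a request id,
 * replies with an ack response or the corresponding error.
 */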
@Override
protected void handleAck(CommandAck ack) {
checkArgument(state == State.Connected);
CompletableFuture<Consumer> consumerFuture = consumers.get(ack.getConsumerId());
final boolean hasRequestId = ack.hasRequestId();
final long requestId = hasRequestId ? ack.getRequestId() : 0;
final long consumerId = ack.getConsumerId();
if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
consumerFuture.getNow(null).messageAcked(ack).thenRun(() -> {
if (hasRequestId) {
ctx.writeAndFlush(Commands.newAckResponse(
requestId, null, null, consumerId));
}
}).exceptionally(e -> {
if (hasRequestId) {
ctx.writeAndFlush(Commands.newAckResponse(requestId,
BrokerServiceException.getClientErrorCode(e),
e.getMessage(), consumerId));
}
return null;
});
}
}
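/**
 * Handles a flow command by granting the requested number of message permits to the consumer.
 */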
@Override
protected void handleFlow(CommandFlow flow) {
checkArgument(state == State.Connected);
if (log.isDebugEnabled()) {
log.debug("[{}] Received flow from consumer {} permits: {}", remoteAddress, flow.getConsumerId(),
flow.getMessagePermits());
}
CompletableFuture<Consumer> consumerFuture = consumers.get(flow.getConsumerId());
if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
Consumer consumer = consumerFuture.getNow(null);
if (consumer != null) {
consumer.flowPermits(flow.getMessagePermits());
} else {
log.info("[{}] Couldn't find consumer {}", remoteAddress, flow.getConsumerId());
}
}
}
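/**
 * Handles a redelivery request, redelivering either the listed message ids (for individual-ack
 * subscriptions) or all unacknowledged messages of the consumer.
 */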
@Override
protected void handleRedeliverUnacknowledged(CommandRedeliverUnacknowledgedMessages redeliver) {
checkArgument(state == State.Connected);
if (log.isDebugEnabled()) {
log.debug("[{}] Received Resend Command from consumer {} ", remoteAddress, redeliver.getConsumerId());
}
CompletableFuture<Consumer> consumerFuture = consumers.get(redeliver.getConsumerId());
if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
Consumer consumer = consumerFuture.getNow(null);
if (redeliver.getMessageIdsCount() > 0 && Subscription.isIndividualAckMode(consumer.subType())) {
consumer.redeliverUnacknowledgedMessages(redeliver.getMessageIdsList());
} else {
consumer.redeliverUnacknowledgedMessages();
}
}
}
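/**
* Handles an unsubscribe command by delegating to the consumer, or responds with a metadata error if the
* consumer is not registered on this connection.
*/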
@Override
protected void handleUnsubscribe(CommandUnsubscribe unsubscribe) {
checkArgument(state == State.Connected);
CompletableFuture<Consumer> consumerFuture = consumers.get(unsubscribe.getConsumerId());
if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
consumerFuture.getNow(null).doUnsubscribe(unsubscribe.getRequestId());
} else {
commandSender.sendErrorResponse(unsubscribe.getRequestId(), ServerError.MetadataError,
"Consumer not found");
}
}
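/**
* Handles a seek command by resetting the subscription cursor either to a specific message id (optionally
* carrying an ack-set for batch messages) or to a message publish timestamp.
*/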
@Override
protected void handleSeek(CommandSeek seek) {
checkArgument(state == State.Connected);
final long requestId = seek.getRequestId();
CompletableFuture<Consumer> consumerFuture = consumers.get(seek.getConsumerId());
if (!seek.hasMessageId() && !seek.hasMessagePublishTime()) {
commandSender.sendErrorResponse(requestId, ServerError.MetadataError,
"Message id and message publish time were not present");
return;
}
boolean consumerCreated = consumerFuture != null
&& consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally();
if (consumerCreated && seek.hasMessageId()) {
Consumer consumer = consumerFuture.getNow(null);
Subscription subscription = consumer.getSubscription();
MessageIdData msgIdData = seek.getMessageId();
long[] ackSet = null;
if (msgIdData.getAckSetsCount() > 0) {
ackSet = new long[msgIdData.getAckSetsCount()];
for (int i = 0; i < ackSet.length; i++) {
ackSet[i] = msgIdData.getAckSetAt(i);
}
}
Position position = new PositionImpl(msgIdData.getLedgerId(),
msgIdData.getEntryId(), ackSet);
subscription.resetCursor(position).thenRun(() -> {
log.info("[{}] [{}][{}] Reset subscription to message id {}", remoteAddress,
subscription.getTopic().getName(), subscription.getName(), position);
commandSender.sendSuccessResponse(requestId);
}).exceptionally(ex -> {
log.warn("[{}][{}] Failed to reset subscription: {}",
remoteAddress, subscription, ex.getMessage(), ex);
commandSender.sendErrorResponse(requestId, ServerError.UnknownError,
"Error when resetting subscription: " + ex.getCause().getMessage());
return null;
});
} else if (consumerCreated && seek.hasMessagePublishTime()){
Consumer consumer = consumerFuture.getNow(null);
Subscription subscription = consumer.getSubscription();
long timestamp = seek.getMessagePublishTime();
subscription.resetCursor(timestamp).thenRun(() -> {
log.info("[{}] [{}][{}] Reset subscription to publish time {}", remoteAddress,
subscription.getTopic().getName(), subscription.getName(), timestamp);
commandSender.sendSuccessResponse(requestId);
}).exceptionally(ex -> {
log.warn("[{}][{}] Failed to reset subscription: {}", remoteAddress,
subscription, ex.getMessage(), ex);
commandSender.sendErrorResponse(requestId, ServerError.UnknownError,
"Reset subscription to publish time error: " + ex.getCause().getMessage());
return null;
});
} else {
commandSender.sendErrorResponse(requestId, ServerError.MetadataError, "Consumer not found");
}
}
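/**
* Handles a close-producer command. If the producer is still being created, its future is failed and the client
* is told the close succeeded; a producer that already failed to be created is acknowledged in the same way.
* Otherwise the fully created producer is closed and removed from the connection.
*/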
@Override
protected void handleCloseProducer(CommandCloseProducer closeProducer) {
checkArgument(state == State.Connected);
final long producerId = closeProducer.getProducerId();
final long requestId = closeProducer.getRequestId();
CompletableFuture<Producer> producerFuture = producers.get(producerId);
if (producerFuture == null) {
log.warn("[{}] Producer was not registered on the connection. producerId={}", remoteAddress, producerId);
commandSender.sendErrorResponse(requestId, ServerError.UnknownError,
"Producer was not registered on the connection");
return;
}
if (!producerFuture.isDone() && producerFuture
.completeExceptionally(new IllegalStateException("Closed producer before creation was complete"))) {
// We have received a request to close the producer before its creation completed. The producer future has
// been marked as failed, so we can tell the client the close operation was successful.
log.info("[{}] Closed producer before its creation was completed. producerId={}",
remoteAddress, producerId);
commandSender.sendSuccessResponse(requestId);
producers.remove(producerId, producerFuture);
return;
} else if (producerFuture.isCompletedExceptionally()) {
log.info("[{}] Closed producer that already failed to be created. producerId={}",
remoteAddress, producerId);
commandSender.sendSuccessResponse(requestId);
producers.remove(producerId, producerFuture);
return;
}
// Proceed with the normal close of the producer
Producer producer = producerFuture.getNow(null);
log.info("[{}][{}] Closing producer on cnx {}. producerId={}",
producer.getTopic(), producer.getProducerName(), remoteAddress, producerId);
producer.close(true).thenAccept(v -> {
log.info("[{}][{}] Closed producer on cnx {}. producerId={}",
producer.getTopic(), producer.getProducerName(),
remoteAddress, producerId);
commandSender.sendSuccessResponse(requestId);
producers.remove(producerId, producerFuture);
});
}
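/**
* Handles a close-consumer command. A consumer that is still being created has its future failed (the pending
* creation result will be discarded), a consumer that already failed to be created is acknowledged directly,
* and a fully created consumer is closed and removed from the connection.
*/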
@Override
protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) {
checkArgument(state == State.Connected);
log.info("[{}] Closing consumer: consumerId={}", remoteAddress, closeConsumer.getConsumerId());
long requestId = closeConsumer.getRequestId();
long consumerId = closeConsumer.getConsumerId();
CompletableFuture<Consumer> consumerFuture = consumers.get(consumerId);
if (consumerFuture == null) {
log.warn("[{}] Consumer was not registered on the connection: consumerId={}", remoteAddress, consumerId);
commandSender.sendErrorResponse(requestId, ServerError.MetadataError, "Consumer not found");
return;
}
if (!consumerFuture.isDone() && consumerFuture
.completeExceptionally(new IllegalStateException("Closed consumer before creation was complete"))) {
// We have received a request to close the consumer before its creation completed. The consumer future has
// been marked as failed, so we can tell the client the close operation was successful. When the pending
// create operation eventually completes, the new consumer will be discarded.
log.info("[{}] Closed consumer before its creation was completed. consumerId={}",
remoteAddress, consumerId);
commandSender.sendSuccessResponse(requestId);
return;
}
if (consumerFuture.isCompletedExceptionally()) {
log.info("[{}] Closed consumer that already failed to be created. consumerId={}",
remoteAddress, consumerId);
commandSender.sendSuccessResponse(requestId);
return;
}
// Proceed with normal consumer close
Consumer consumer = consumerFuture.getNow(null);
try {
consumer.close();
consumers.remove(consumerId, consumerFuture);
commandSender.sendSuccessResponse(requestId);
log.info("[{}] Closed consumer, consumerId={}", remoteAddress, consumerId);
} catch (BrokerServiceException e) {
log.warn("[{]] Error closing consumer {} : {}", remoteAddress, consumer, e);
commandSender.sendErrorResponse(requestId, BrokerServiceException.getClientErrorCode(e), e.getMessage());
}
}
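/**
* Handles a get-last-message-id request by resolving the last position of the consumer's topic (and, for
* persistent subscriptions, the mark-delete position) and replying with the id of the last message, including
* the largest batch index when it can be determined.
*/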
@Override
protected void handleGetLastMessageId(CommandGetLastMessageId getLastMessageId) {
checkArgument(state == State.Connected);
CompletableFuture<Consumer> consumerFuture = consumers.get(getLastMessageId.getConsumerId());
if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
Consumer consumer = consumerFuture.getNow(null);
long requestId = getLastMessageId.getRequestId();
Topic topic = consumer.getSubscription().getTopic();
Position lastPosition = topic.getLastPosition();
int partitionIndex = TopicName.getPartitionIndex(topic.getName());
Position markDeletePosition = null;
if (consumer.getSubscription() instanceof PersistentSubscription) {
markDeletePosition = ((PersistentSubscription) consumer.getSubscription()).getCursor()
.getMarkDeletedPosition();
}
getLargestBatchIndexWhenPossible(
topic,
(PositionImpl) lastPosition,
(PositionImpl) markDeletePosition,
partitionIndex,
requestId,
consumer.getSubscription().getName());
} else {
ctx.writeAndFlush(Commands.newError(getLastMessageId.getRequestId(),
ServerError.MetadataError, "Consumer not found"));
}
}
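/**
* Sends the get-last-message-id response. When the last position points to a valid entry, that entry is read
* back from the managed ledger and its metadata parsed to report the largest batch index; otherwise the
* response carries a batch index of -1.
*/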
private void getLargestBatchIndexWhenPossible(
Topic topic,
PositionImpl lastPosition,
PositionImpl markDeletePosition,
int partitionIndex,
long requestId,
String subscriptionName) {
PersistentTopic persistentTopic = (PersistentTopic) topic;
ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
// If the last position does not point to a valid entry, respond with the message id of the current position.
if (lastPosition.getEntryId() == -1) {
ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId,
lastPosition.getLedgerId(), lastPosition.getEntryId(), partitionIndex, -1,
markDeletePosition != null ? markDeletePosition.getLedgerId() : -1,
markDeletePosition != null ? markDeletePosition.getEntryId() : -1));
return;
}
// For a valid position, we read the entry out and parse the batch size from its metadata.
CompletableFuture<Entry> entryFuture = new CompletableFuture<>();
ml.asyncReadEntry(lastPosition, new AsyncCallbacks.ReadEntryCallback() {
@Override
public void readEntryComplete(Entry entry, Object ctx) {
entryFuture.complete(entry);
}
@Override
public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
entryFuture.completeExceptionally(exception);
}
}, null);
CompletableFuture<Integer> batchSizeFuture = entryFuture.thenApply(entry -> {
MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer());
int batchSize = metadata.getNumMessagesInBatch();
entry.release();
return metadata.hasNumMessagesInBatch() ? batchSize : -1;
});
batchSizeFuture.whenComplete((batchSize, e) -> {
if (e != null) {
ctx.writeAndFlush(Commands.newError(
requestId, ServerError.MetadataError,
"Failed to get batch size for entry " + e.getMessage()));
} else {
int largestBatchIndex = batchSize > 0 ? batchSize - 1 : -1;
if (log.isDebugEnabled()) {
log.debug("[{}] [{}][{}] Get LastMessageId {} partitionIndex {}", remoteAddress,
topic.getName(), subscriptionName, lastPosition, partitionIndex);
}
ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, lastPosition.getLedgerId(),
lastPosition.getEntryId(), partitionIndex, largestBatchIndex,
markDeletePosition != null ? markDeletePosition.getLedgerId() : -1,
markDeletePosition != null ? markDeletePosition.getEntryId() : -1));
}
});
}
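/**
* Handles a get-topics-of-namespace request by listing the topics of the namespace through the namespace
* service and returning them to the client.
*/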
@Override
protected void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGetTopicsOfNamespace) {
final long requestId = commandGetTopicsOfNamespace.getRequestId();
final String namespace = commandGetTopicsOfNamespace.getNamespace();
final CommandGetTopicsOfNamespace.Mode mode = commandGetTopicsOfNamespace.getMode();
final NamespaceName namespaceName = NamespaceName.get(namespace);
getBrokerService().pulsar().getNamespaceService().getListOfTopics(namespaceName, mode)
.thenAccept(topics -> {
if (log.isDebugEnabled()) {
log.debug("[{}] Received CommandGetTopicsOfNamespace for namespace [//{}] by {}, size:{}",
remoteAddress, namespace, requestId, topics.size());
}
commandSender.sendGetTopicsOfNamespaceResponse(topics, requestId);
})
.exceptionally(ex -> {
log.warn("[{}] Error GetTopicsOfNamespace for namespace [//{}] by {}",
remoteAddress, namespace, requestId);
commandSender.sendErrorResponse(requestId,
BrokerServiceException.getClientErrorCode(new ServerMetadataException(ex)),
ex.getMessage());
return null;
});
}
@Override
protected void handleGetSchema(CommandGetSchema commandGetSchema) {
if (log.isDebugEnabled()) {
if (commandGetSchema.hasSchemaVersion()) {
log.debug("Received CommandGetSchema call from {}, schemaVersion: {}, topic: {}, requestId: {}",
remoteAddress, new String(commandGetSchema.getSchemaVersion()),
commandGetSchema.getTopic(), commandGetSchema.getRequestId());
} else {
log.debug("Received CommandGetSchema call from {}, schemaVersion: {}, topic: {}, requestId: {}",
remoteAddress, null,
commandGetSchema.getTopic(), commandGetSchema.getRequestId());
}
}
long requestId = commandGetSchema.getRequestId();
SchemaVersion schemaVersion = SchemaVersion.Latest;
if (commandGetSchema.hasSchemaVersion()) {
schemaVersion = schemaService.versionFromBytes(commandGetSchema.getSchemaVersion());
}
String schemaName;
try {
schemaName = TopicName.get(commandGetSchema.getTopic()).getSchemaName();
} catch (Throwable t) {
commandSender.sendGetSchemaErrorResponse(requestId, ServerError.InvalidTopicName, t.getMessage());
return;
}
schemaService.getSchema(schemaName, schemaVersion).thenAccept(schemaAndMetadata -> {
if (schemaAndMetadata == null) {
commandSender.sendGetSchemaErrorResponse(requestId, ServerError.TopicNotFound,
"Topic not found or no-schema");
} else {
commandSender.sendGetSchemaResponse(requestId,
SchemaInfoUtil.newSchemaInfo(schemaName, schemaAndMetadata.schema), schemaAndMetadata.version);
}
}).exceptionally(ex -> {
commandSender.sendGetSchemaErrorResponse(requestId, ServerError.UnknownError, ex.getMessage());
return null;
});
}
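/**
* Handles a get-or-create-schema request: when the topic exists, the supplied schema is registered through
* {@link #tryAddSchema} and the resulting schema version (or an error) is sent back; otherwise a
* topic-not-found error is returned.
*/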
@Override
protected void handleGetOrCreateSchema(CommandGetOrCreateSchema commandGetOrCreateSchema) {
if (log.isDebugEnabled()) {
log.debug("Received CommandGetOrCreateSchema call from {}", remoteAddress);
}
long requestId = commandGetOrCreateSchema.getRequestId();
String topicName = commandGetOrCreateSchema.getTopic();
SchemaData schemaData = getSchema(commandGetOrCreateSchema.getSchema());
SchemaData schema = schemaData.getType() == SchemaType.NONE ? null : schemaData;
service.getTopicIfExists(topicName).thenAccept(topicOpt -> {
if (topicOpt.isPresent()) {
Topic topic = topicOpt.get();
CompletableFuture<SchemaVersion> schemaVersionFuture = tryAddSchema(topic, schema);
schemaVersionFuture.exceptionally(ex -> {
ServerError errorCode = BrokerServiceException.getClientErrorCode(ex);
String message = ex.getMessage();
if (ex.getCause() != null) {
message += (" caused by " + ex.getCause());
}
commandSender.sendGetOrCreateSchemaErrorResponse(requestId, errorCode, message);
return null;
}).thenAccept(schemaVersion -> {
commandSender.sendGetOrCreateSchemaResponse(requestId, schemaVersion);
});
} else {
commandSender.sendGetOrCreateSchemaErrorResponse(requestId, ServerError.TopicNotFound,
"Topic not found");
}
}).exceptionally(ex -> {
ServerError errorCode = BrokerServiceException.getClientErrorCode(ex);
commandSender.sendGetOrCreateSchemaErrorResponse(requestId, errorCode, ex.getMessage());
return null;
});
}
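/**
* Handles a new-transaction request by asking the transaction metadata store service to open a transaction on
* the requested coordinator, replying with the new txn id, or with an error if the transaction manager is not
* enabled or the operation fails.
*/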
@Override
protected void handleNewTxn(CommandNewTxn command) {
final long requestId = command.getRequestId();
final TransactionCoordinatorID tcId = TransactionCoordinatorID.get(command.getTcId());
if (log.isDebugEnabled()) {
log.debug("Receive new txn request {} to transaction meta store {} from {}.",
requestId, tcId, remoteAddress);
}
TransactionMetadataStoreService transactionMetadataStoreService =
service.pulsar().getTransactionMetadataStoreService();
if (transactionMetadataStoreService == null) {
CoordinatorException.CoordinatorNotFoundException ex =
new CoordinatorException.CoordinatorNotFoundException(
"Transaction manager is not started or not enabled");
ctx.writeAndFlush(Commands.newTxnResponse(requestId, tcId.getId(),
BrokerServiceException.getClientErrorCode(ex), ex.getMessage()));
return;
}
transactionMetadataStoreService.newTransaction(tcId, command.getTxnTtlSeconds())
.whenComplete(((txnID, ex) -> {
if (ex == null) {
if (log.isDebugEnabled()) {
log.debug("Send response {} for new txn request {}", tcId.getId(), requestId);
}
ctx.writeAndFlush(Commands.newTxnResponse(requestId, txnID.getLeastSigBits(),
txnID.getMostSigBits()));
} else {
if (log.isDebugEnabled()) {
log.debug("Send response error for new txn request {}", requestId, ex);
}
ctx.writeAndFlush(Commands.newTxnResponse(requestId, tcId.getId(),
BrokerServiceException.getClientErrorCode(ex), ex.getMessage()));
}
}));
}
@Override
protected void handleAddPartitionToTxn(CommandAddPartitionToTxn command) {
final TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits());
final long requestId = command.getRequestId();
if (log.isDebugEnabled()) {
command.getPartitionsList().forEach(partition ->
log.debug("Received add published partition to txn request {} "
+ "from {} with txnId {}, topic: [{}]", requestId, remoteAddress, txnID, partition));
}
service.pulsar().getTransactionMetadataStoreService().addProducedPartitionToTxn(txnID,
command.getPartitionsList())
.whenComplete(((v, ex) -> {
if (ex == null) {
if (log.isDebugEnabled()) {
log.debug("Send response success for add published partition to txn request {}", requestId);
}
ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(requestId,
txnID.getLeastSigBits(), txnID.getMostSigBits()));
} else {
if (log.isDebugEnabled()) {
log.debug("Send response error for add published partition to txn request {}", requestId,
ex);
}
ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(requestId, txnID.getMostSigBits(),
BrokerServiceException.getClientErrorCode(ex), ex.getMessage()));
}
}));
}
@Override
protected void handleEndTxn(CommandEndTxn command) {
final long requestId = command.getRequestId();
final int txnAction = command.getTxnAction().getValue();
TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits());
service.pulsar().getTransactionMetadataStoreService()
.endTransaction(txnID, txnAction, false)
.thenRun(() -> ctx.writeAndFlush(Commands.newEndTxnResponse(requestId,
txnID.getLeastSigBits(), txnID.getMostSigBits())))
.exceptionally(throwable -> {
log.error("Send response error for end txn request.", throwable);
ctx.writeAndFlush(Commands.newEndTxnResponse(requestId, txnID.getMostSigBits(),
BrokerServiceException.getClientErrorCode(throwable.getCause()), throwable.getMessage()));
return null; });
}
@Override
protected void handleEndTxnOnPartition(CommandEndTxnOnPartition command) {
final long requestId = command.getRequestId();
final String topic = command.getTopic();
final int txnAction = command.getTxnAction().getValue();
TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits());
if (log.isDebugEnabled()) {
log.debug("[{}] handleEndTxnOnPartition txnId: [{}], txnAction: [{}]", topic,
txnID, txnAction);
}
CompletableFuture<Optional<Topic>> topicFuture = service.getTopics().get(TopicName.get(topic).toString());
if (topicFuture != null) {
topicFuture.whenComplete((optionalTopic, t) -> {
if (!optionalTopic.isPresent()) {
log.error("handleEndTxnOnPartition fail ! The topic {} does not exist in broker, "
+ "txnId: [{}], txnAction: [{}]", topic, txnID, TxnAction.valueOf(txnAction));
ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(
requestId, ServerError.ServiceNotReady,
"Topic " + topic + " is not found."));
return;
}
optionalTopic.get().endTxn(txnID, txnAction, command.getTxnidLeastBitsOfLowWatermark())
.whenComplete((ignored, throwable) -> {
if (throwable != null) {
log.error("Handle endTxnOnPartition {} failed.", topic, throwable);
ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(
requestId, BrokerServiceException.getClientErrorCode(throwable),
throwable.getMessage()));
return;
}
ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId,
txnID.getLeastSigBits(), txnID.getMostSigBits()));
});
});
} else {
log.error("handleEndTxnOnPartition faile ! The topic {} does not exist in broker, "
+ "txnId: [{}], txnAction: [{}]", topic, txnID, TxnAction.valueOf(txnAction));
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(),
ServerError.ServiceNotReady,
"The topic " + topic + " is not exist in broker."));
}
}
@Override
protected void handleEndTxnOnSubscription(CommandEndTxnOnSubscription command) {
final long requestId = command.getRequestId();
final long txnidMostBits = command.getTxnidMostBits();
final long txnidLeastBits = command.getTxnidLeastBits();
final String topic = command.getSubscription().getTopic();
final String subName = command.getSubscription().getSubscription();
final int txnAction = command.getTxnAction().getValue();
if (log.isDebugEnabled()) {
log.debug("[{}] handleEndTxnOnSubscription txnId: [{}], txnAction: [{}]", topic,
new TxnID(txnidMostBits, txnidLeastBits), txnAction);
}
CompletableFuture<Optional<Topic>> topicFuture = service.getTopics().get(TopicName.get(topic).toString());
if (topicFuture != null) {
topicFuture.thenAccept(optionalTopic -> {
if (!optionalTopic.isPresent()) {
log.error("handleEndTxnOnSubscription fail! The topic {} does not exist in broker, txnId: "
+ "[{}], txnAction: [{}]", topic,
new TxnID(txnidMostBits, txnidLeastBits), TxnAction.valueOf(txnAction));
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
requestId, txnidLeastBits, txnidMostBits,
ServerError.ServiceNotReady,
"The topic " + topic + " is not exist in broker."));
return;
}
Subscription subscription = optionalTopic.get().getSubscription(subName);
if (subscription == null) {
log.error("Topic {} subscription {} is not exist.", optionalTopic.get().getName(), subName);
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
requestId, txnidLeastBits, txnidMostBits,
ServerError.ServiceNotReady,
"Topic " + optionalTopic.get().getName()
+ " subscription " + subName + " is not exist."));
return;
}
CompletableFuture<Void> completableFuture =
subscription.endTxn(txnidMostBits, txnidLeastBits, txnAction,
command.getTxnidLeastBitsOfLowWatermark());
completableFuture.whenComplete((ignored, throwable) -> {
if (throwable != null) {
log.error("Handle end txn on subscription failed for request {}", requestId, throwable);
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
requestId, txnidLeastBits, txnidMostBits,
BrokerServiceException.getClientErrorCode(throwable),
"Handle end txn on subscription failed."));
return;
}
ctx.writeAndFlush(
Commands.newEndTxnOnSubscriptionResponse(requestId, txnidLeastBits, txnidMostBits));
});
}).exceptionally(e -> {
log.error("Handle end txn on subscription failed for request {}", requestId, e);
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
requestId, txnidLeastBits, txnidMostBits,
ServerError.ServiceNotReady,
"Handle end txn on subscription failed."));
return null;
});
} else {
log.error("handleEndTxnOnSubscription fail! The topic {} does not exist in broker, txnId: "
+ "[{}], txnAction: [{}]", topic,
new TxnID(txnidMostBits, txnidLeastBits), TxnAction.valueOf(txnAction));
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
requestId, txnidLeastBits, txnidMostBits,
ServerError.ServiceNotReady,
"The topic " + topic + " is not exist in broker."));
}
}
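/**
* Adds the given schema to the topic. When no schema is supplied, an empty schema version is returned unless
* the topic already has a schema and schema validation is enforced, in which case the returned future is failed
* with an {@link IncompatibleSchemaException}.
*/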
private CompletableFuture<SchemaVersion> tryAddSchema(Topic topic, SchemaData schema) {
if (schema != null) {
return topic.addSchema(schema);
} else {
return topic.hasSchema().thenCompose((hasSchema) -> {
log.info("[{}] {} configured with schema {}",
remoteAddress, topic.getName(), hasSchema);
CompletableFuture<SchemaVersion> result = new CompletableFuture<>();
if (hasSchema && (schemaValidationEnforced || topic.getSchemaValidationEnforced())) {
result.completeExceptionally(new IncompatibleSchemaException(
"Producers cannot connect or send message without a schema to topics with a schema"));
} else {
result.complete(SchemaVersion.Empty);
}
return result;
});
}
}
@Override
protected void handleAddSubscriptionToTxn(CommandAddSubscriptionToTxn command) {
final TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits());
final long requestId = command.getRequestId();
if (log.isDebugEnabled()) {
log.debug("Receive add published partition to txn request {} from {} with txnId {}",
requestId, remoteAddress, txnID);
}
service.pulsar().getTransactionMetadataStoreService().addAckedPartitionToTxn(txnID,
MLTransactionMetadataStore.subscriptionToTxnSubscription(command.getSubscriptionsList()))
.whenComplete(((v, ex) -> {
if (ex == null) {
if (log.isDebugEnabled()) {
log.debug("Send response success for add published partition to txn request {}",
requestId);
}
ctx.writeAndFlush(Commands.newAddSubscriptionToTxnResponse(requestId,
txnID.getLeastSigBits(), txnID.getMostSigBits()));
log.info("handle add partition to txn finish.");
} else {
if (log.isDebugEnabled()) {
log.debug("Send response error for add published partition to txn request {}",
requestId, ex);
}
ctx.writeAndFlush(Commands.newAddSubscriptionToTxnResponse(requestId,
txnID.getMostSigBits(), BrokerServiceException.getClientErrorCode(ex),
ex.getMessage()));
}
}));
}
@Override
protected boolean isHandshakeCompleted() {
return state == State.Connected;
}
public ChannelHandlerContext ctx() {
return ctx;
}
@Override
protected void interceptCommand(BaseCommand command) throws InterceptException {
if (getBrokerService().getInterceptor() != null) {
getBrokerService().getInterceptor().onPulsarCommand(command, this);
}
}
public void closeProducer(Producer producer) {
// Remove the producer from the connection map and send a close-producer command to the client
safelyRemoveProducer(producer);
if (getRemoteEndpointProtocolVersion() >= v5.getValue()) {
ctx.writeAndFlush(Commands.newCloseProducer(producer.getProducerId(), -1L));
} else {
close();
}
}
@Override
public void closeConsumer(Consumer consumer) {
// Remove the consumer from the connection map and send a close-consumer command to the client
safelyRemoveConsumer(consumer);
if (getRemoteEndpointProtocolVersion() >= v5.getValue()) {
ctx.writeAndFlush(Commands.newCloseConsumer(consumer.consumerId(), -1L));
} else {
close();
}
}
/**
* Closes the connection with the client, which triggers {@code channelInactive()} and clears all producers and
* consumers from the connection map.
*/
protected void close() {
ctx.close();
}
@Override
public SocketAddress clientAddress() {
return remoteAddress;
}
@Override
public void removedConsumer(Consumer consumer) {
safelyRemoveConsumer(consumer);
}
@Override
public void removedProducer(Producer producer) {
safelyRemoveProducer(producer);
}
private void safelyRemoveProducer(Producer producer) {
long producerId = producer.getProducerId();
if (log.isDebugEnabled()) {
log.debug("[{}] Removed producer: producerId={}, producer={}", remoteAddress, producerId, producer);
}
CompletableFuture<Producer> future = producers.get(producerId);
if (future != null) {
future.whenComplete((producer2, exception) -> {
if (exception != null || producer2 == producer) {
producers.remove(producerId, future);
}
});
}
}
private void safelyRemoveConsumer(Consumer consumer) {
long consumerId = consumer.consumerId();
if (log.isDebugEnabled()) {
log.debug("[{}] Removed consumer: consumerId={}, consumer={}", remoteAddress, consumerId, consumer);
}
CompletableFuture<Consumer> future = consumers.get(consumerId);
if (future != null) {
future.whenComplete((consumer2, exception) -> {
if (exception != null || consumer2 == consumer) {
consumers.remove(consumerId, future);
}
});
}
}
@Override
public boolean isActive() {
return isActive;
}
@Override
public boolean isWritable() {
return ctx.channel().isWritable();
}
private static final Gauge throttledConnections = Gauge.build()
.name("pulsar_broker_throttled_connections")
.help("Counter of connections throttled because of per-connection limit")
.register();
private static final Gauge throttledConnectionsGlobal = Gauge.build()
.name("pulsar_broker_throttled_connections_global_limit")
.help("Counter of connections throttled because of per-connection limit")
.register();
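/**
* Accounts for an incoming publish: applies topic and broker publish rate limits, pauses reading from this
* connection when the pending-send quota is reached, and pauses all connections served by this IO thread when
* the per-thread pending-bytes limit is exceeded.
*/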
public void startSendOperation(Producer producer, int msgSize, int numMessages) {
boolean isPublishRateExceeded = false;
if (preciseTopicPublishRateLimitingEnable) {
boolean isPreciseTopicPublishRateExceeded =
producer.getTopic().isTopicPublishRateExceeded(numMessages, msgSize);
if (isPreciseTopicPublishRateExceeded) {
producer.getTopic().disableCnxAutoRead();
return;
}
isPublishRateExceeded = producer.getTopic().isBrokerPublishRateExceeded();
} else {
isPublishRateExceeded = producer.getTopic().isPublishRateExceeded();
}
if (++pendingSendRequest == maxPendingSendRequests || isPublishRateExceeded) {
// When the quota of pending send requests is reached, stop reading from socket to cause backpressure on
// client connection, possibly shared between multiple producers
ctx.channel().config().setAutoRead(false);
autoReadDisabledRateLimiting = isPublishRateExceeded;
throttledConnections.inc();
}
if (pendingBytesPerThread.get().addAndGet(msgSize) >= maxPendingBytesPerThread
&& !autoReadDisabledPublishBufferLimiting
&& maxPendingBytesPerThread > 0) {
// Disable reading from all the connections associated with this thread
MutableInt pausedConnections = new MutableInt();
cnxsPerThread.get().forEach(cnx -> {
if (cnx.hasProducers() && !cnx.autoReadDisabledPublishBufferLimiting) {
cnx.disableCnxAutoRead();
cnx.autoReadDisabledPublishBufferLimiting = true;
pausedConnections.increment();
}
});
getBrokerService().pausedConnections(pausedConnections.intValue());
}
}
@Override
public void completedSendOperation(boolean isNonPersistentTopic, int msgSize) {
if (pendingBytesPerThread.get().addAndGet(-msgSize) < resumeThresholdPendingBytesPerThread
&& autoReadDisabledPublishBufferLimiting) {
// Re-enable reading on all the blocked connections
MutableInt resumedConnections = new MutableInt();
cnxsPerThread.get().forEach(cnx -> {
if (cnx.autoReadDisabledPublishBufferLimiting) {
cnx.autoReadDisabledPublishBufferLimiting = false;
cnx.enableCnxAutoRead();
resumedConnections.increment();
}
});
getBrokerService().resumedConnections(resumedConnections.intValue());
}
if (--pendingSendRequest == resumeReadsThreshold) {
enableCnxAutoRead();
}
if (isNonPersistentTopic) {
nonPersistentPendingMessages--;
}
}
@Override
public void enableCnxAutoRead() {
// We could also check (pendingSendRequest < maxPendingSendRequests) here, but that would require
// pendingSendRequest to be volatile, which is expensive on the write path. This method is only called when
// throttling is enabled on the topic, so skipping the pendingSendRequest check is fine.
if (ctx != null && !ctx.channel().config().isAutoRead()
&& !autoReadDisabledRateLimiting && !autoReadDisabledPublishBufferLimiting) {
// Resume reading from the socket if the pending requests have not reached the threshold
ctx.channel().config().setAutoRead(true);
// triggers channel read
ctx.read();
throttledConnections.dec();
}
}
@Override
public void disableCnxAutoRead() {
if (ctx != null && ctx.channel().config().isAutoRead()) {
ctx.channel().config().setAutoRead(false);
}
}
@Override
public void cancelPublishRateLimiting() {
if (autoReadDisabledRateLimiting) {
autoReadDisabledRateLimiting = false;
}
}
@Override
public void cancelPublishBufferLimiting() {
if (autoReadDisabledPublishBufferLimiting) {
autoReadDisabledPublishBufferLimiting = false;
throttledConnectionsGlobal.dec();
}
}
private <T> ServerError getErrorCode(CompletableFuture<T> future) {
ServerError error = ServerError.UnknownError;
try {
future.getNow(null);
} catch (Exception e) {
if (e.getCause() instanceof BrokerServiceException) {
error = BrokerServiceException.getClientErrorCode(e.getCause());
}
}
return error;
}
private void disableTcpNoDelayIfNeeded(String topic, String producerName) {
if (producerName != null && producerName.startsWith(replicatorPrefix)) {
// Re-enable Nagle's algorithm on connections used for replication purposes
try {
if (ctx.channel().config().getOption(ChannelOption.TCP_NODELAY)) {
ctx.channel().config().setOption(ChannelOption.TCP_NODELAY, false);
}
} catch (Throwable t) {
log.warn("[{}] [{}] Failed to remove TCP no-delay property on client cnx {}", topic, producerName,
ctx.channel());
}
}
}
private TopicName validateTopicName(String topic, long requestId, Object requestCommand) {
try {
return TopicName.get(topic);
} catch (Throwable t) {
if (log.isDebugEnabled()) {
log.debug("[{}] Failed to parse topic name '{}'", remoteAddress, topic, t);
}
if (requestCommand instanceof CommandLookupTopic) {
ctx.writeAndFlush(Commands.newLookupErrorResponse(ServerError.InvalidTopicName,
"Invalid topic name: " + t.getMessage(), requestId));
} else if (requestCommand instanceof CommandPartitionedTopicMetadata) {
ctx.writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.InvalidTopicName,
"Invalid topic name: " + t.getMessage(), requestId));
} else {
ctx.writeAndFlush(Commands.newError(requestId, ServerError.InvalidTopicName,
"Invalid topic name: " + t.getMessage()));
}
return null;
}
}
public ByteBufPair newMessageAndIntercept(long consumerId, long ledgerId, long entryId, int partition,
int redeliveryCount, ByteBuf metadataAndPayload, long[] ackSet, String topic) {
BaseCommand command = Commands.newMessageCommand(consumerId, ledgerId, entryId, partition, redeliveryCount,
ackSet);
ByteBufPair res = Commands.serializeCommandMessageWithSize(command, metadataAndPayload);
try {
getBrokerService().getInterceptor().onPulsarCommand(command, this);
} catch (Exception e) {
log.error("Exception occur when intercept messages.", e);
}
return res;
}
private static final Logger log = LoggerFactory.getLogger(ServerCnx.class);
/**
* Helper method for testability.
*
* @return the connection state
*/
public State getState() {
return state;
}
public SocketAddress getRemoteAddress() {
return remoteAddress;
}
public BrokerService getBrokerService() {
return service;
}
public String getRole() {
return authRole;
}
@Override
public Promise<Void> newPromise() {
return ctx.newPromise();
}
@Override
public HAProxyMessage getHAProxyMessage() {
return proxyMessage;
}
@Override
public boolean hasHAProxyMessage() {
return proxyMessage != null;
}
boolean hasConsumer(long consumerId) {
return consumers.containsKey(consumerId);
}
@Override
public boolean isBatchMessageCompatibleVersion() {
return getRemoteEndpointProtocolVersion() >= ProtocolVersion.v4.getValue();
}
boolean supportsAuthenticationRefresh() {
return features != null && features.isSupportsAuthRefresh();
}
boolean supportBrokerMetadata() {
return features != null && features.isSupportsBrokerEntryMetadata();
}
@Override
public String getClientVersion() {
return clientVersion;
}
@VisibleForTesting
void setAutoReadDisabledRateLimiting(boolean isLimiting) {
this.autoReadDisabledRateLimiting = isLimiting;
}
@Override
public boolean isPreciseDispatcherFlowControl() {
return preciseDispatcherFlowControl;
}
public AuthenticationState getAuthState() {
return authState;
}
@Override
public AuthenticationDataSource getAuthenticationData() {
return originalAuthData != null ? originalAuthData : authenticationData;
}
public String getPrincipal() {
return originalPrincipal != null ? originalPrincipal : authRole;
}
public AuthenticationProvider getAuthenticationProvider() {
return authenticationProvider;
}
@Override
public String getAuthRole() {
return authRole;
}
public String getAuthMethod() {
return authMethod;
}
public ConcurrentLongHashMap<CompletableFuture<Consumer>> getConsumers() {
return consumers;
}
public ConcurrentLongHashMap<CompletableFuture<Producer>> getProducers() {
return producers;
}
@Override
public PulsarCommandSender getCommandSender() {
return commandSender;
}
@Override
public void execute(Runnable runnable) {
ctx.channel().eventLoop().execute(runnable);
}
@Override
public String clientSourceAddress() {
if (proxyMessage != null) {
return proxyMessage.sourceAddress();
} else if (remoteAddress instanceof InetSocketAddress) {
InetSocketAddress inetAddress = (InetSocketAddress) remoteAddress;
return inetAddress.getAddress().getHostAddress();
} else {
return null;
}
}
private static void logAuthException(SocketAddress remoteAddress, String operation,
String principal, Optional<TopicName> topic, Throwable ex) {
String topicString = topic.map(t -> ", topic=" + t.toString()).orElse("");
if (ex instanceof AuthenticationException) {
log.info("[{}] Failed to authenticate: operation={}, principal={}{}, reason={}",
remoteAddress, operation, principal, topicString, ex.getMessage());
} else {
log.error("[{}] Error trying to authenticate: operation={}, principal={}{}",
remoteAddress, operation, principal, topicString, ex);
}
}
public boolean hasProducers() {
return !producers.isEmpty();
}
}