| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.pulsar.broker.service; |
| |
| import static com.google.common.base.Preconditions.checkArgument; |
| import static org.apache.commons.lang3.StringUtils.isNotBlank; |
| import static org.apache.pulsar.broker.admin.PersistentTopics.getPartitionedTopicMetadata; |
| import static org.apache.pulsar.broker.lookup.DestinationLookup.lookupDestinationAsync; |
| import static org.apache.pulsar.common.api.Commands.newLookupErrorResponse; |
| import static org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion.v5; |
| |
| import java.net.SocketAddress; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.Semaphore; |
| import java.util.concurrent.TimeUnit; |
| |
| import javax.naming.AuthenticationException; |
| import javax.net.ssl.SSLSession; |
| |
| import org.apache.bookkeeper.mledger.Position; |
| import org.apache.bookkeeper.mledger.impl.PositionImpl; |
| import org.apache.bookkeeper.mledger.util.SafeRun; |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.pulsar.broker.PulsarServerException; |
| import org.apache.pulsar.broker.authentication.AuthenticationDataCommand; |
| import org.apache.pulsar.broker.authentication.AuthenticationDataSource; |
| import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException; |
| import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException; |
| import org.apache.pulsar.broker.web.RestException; |
| import org.apache.pulsar.client.api.PulsarClientException; |
| import org.apache.pulsar.client.impl.BatchMessageIdImpl; |
| import org.apache.pulsar.client.impl.ClientCnx; |
| import org.apache.pulsar.client.impl.MessageIdImpl; |
| import org.apache.pulsar.common.api.CommandUtils; |
| import org.apache.pulsar.common.api.Commands; |
| import org.apache.pulsar.common.api.PulsarHandler; |
| import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck; |
| import org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseConsumer; |
| import org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseProducer; |
| import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnect; |
| import org.apache.pulsar.common.api.proto.PulsarApi.CommandConsumerStats; |
| import org.apache.pulsar.common.api.proto.PulsarApi.CommandConsumerStatsResponse; |
| import org.apache.pulsar.common.api.proto.PulsarApi.CommandFlow; |
| import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopic; |
| import org.apache.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadata; |
| import org.apache.pulsar.common.api.proto.PulsarApi.CommandProducer; |
| import org.apache.pulsar.common.api.proto.PulsarApi.CommandRedeliverUnacknowledgedMessages; |
| import org.apache.pulsar.common.api.proto.PulsarApi.CommandSeek; |
| import org.apache.pulsar.common.api.proto.PulsarApi.CommandSend; |
| import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe; |
| import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; |
| import org.apache.pulsar.common.api.proto.PulsarApi.CommandUnsubscribe; |
| import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData; |
| import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata; |
| import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion; |
| import org.apache.pulsar.common.api.proto.PulsarApi.ServerError; |
| import org.apache.pulsar.common.naming.DestinationName; |
| import org.apache.pulsar.common.naming.Metadata; |
| import org.apache.pulsar.common.policies.data.BacklogQuota; |
| import org.apache.pulsar.common.policies.data.ConsumerStats; |
| import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.google.common.collect.Sets; |
| import com.google.protobuf.GeneratedMessageLite; |
| |
| import io.netty.buffer.ByteBuf; |
| import io.netty.channel.ChannelHandler; |
| import io.netty.channel.ChannelHandlerContext; |
| import io.netty.channel.ChannelOption; |
| import io.netty.handler.ssl.SslHandler; |
| |
| public class ServerCnx extends PulsarHandler { |
| private final BrokerService service; |
| private final ConcurrentLongHashMap<CompletableFuture<Producer>> producers; |
| private final ConcurrentLongHashMap<CompletableFuture<Consumer>> consumers; |
| private State state; |
| private volatile boolean isActive = true; |
| String authRole = null; |
| AuthenticationDataSource authenticationData; |
| |
// Max number of pending requests per connection. If multiple producers share the same connection,
// the flow control performed by a single producer might not be enough to prevent write spikes on
// the broker.
| private static final int MaxPendingSendRequests = 1000; |
| private static final int ResumeReadsThreshold = MaxPendingSendRequests / 2; |
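// Reads are resumed only once the pending count drops to half the limit. This hysteresis avoids
// toggling auto-read on every request around the threshold (see startSendOperation() and
// completedSendOperation() below).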
| private int pendingSendRequest = 0; |
| private final String replicatorPrefix; |
| private String clientVersion = null; |
| private int nonPersistentPendingMessages = 0; |
| private final int MaxNonPersistentPendingMessages; |
| private String originalPrincipal = null; |
| private Set<String> proxyRoles; |
| private boolean authenticateOriginalAuthData; |
| |
| enum State { |
| Start, Connected, Failed |
| } |
| |
| public ServerCnx(BrokerService service) { |
| super(service.getKeepAliveIntervalSeconds(), TimeUnit.SECONDS); |
| this.service = service; |
| this.state = State.Start; |
| |
// These maps are not heavily contended since most accesses happen within the cnx thread
| this.producers = new ConcurrentLongHashMap<>(8, 1); |
| this.consumers = new ConcurrentLongHashMap<>(8, 1); |
| this.replicatorPrefix = service.pulsar().getConfiguration().getReplicatorPrefix(); |
| this.MaxNonPersistentPendingMessages = service.pulsar().getConfiguration() |
| .getMaxConcurrentNonPersistentMessagePerConnection(); |
| this.proxyRoles = service.pulsar().getConfiguration().getProxyRoles(); |
| this.authenticateOriginalAuthData = service.pulsar().getConfiguration().authenticateOriginalAuthData(); |
| } |
| |
| @Override |
| public void channelActive(ChannelHandlerContext ctx) throws Exception { |
| super.channelActive(ctx); |
| log.info("New connection from {}", remoteAddress); |
| this.ctx = ctx; |
| } |
| |
| @Override |
| public void channelInactive(ChannelHandlerContext ctx) throws Exception { |
| super.channelInactive(ctx); |
| isActive = false; |
| log.info("Closed connection from {}", remoteAddress); |
| |
| // Connection is gone, close the producers immediately |
| producers.values().forEach((producerFuture) -> { |
| if (producerFuture.isDone() && !producerFuture.isCompletedExceptionally()) { |
| Producer producer = producerFuture.getNow(null); |
| producer.closeNow(); |
| } |
| }); |
| |
| consumers.values().forEach((consumerFuture) -> { |
| Consumer consumer; |
| if (consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) { |
| consumer = consumerFuture.getNow(null); |
| } else { |
| return; |
| } |
| |
| try { |
| consumer.close(); |
| } catch (BrokerServiceException e) { |
| log.warn("Consumer {} was already closed: {}", consumer, e.getMessage(), e); |
| } |
| }); |
| } |
| |
| @Override |
| public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { |
| if (log.isDebugEnabled()) { |
| log.debug("Channel writability has changed to: {}", ctx.channel().isWritable()); |
| } |
| } |
| |
| @Override |
| public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { |
| if (state != State.Failed) { |
| // No need to report stack trace for known exceptions that happen in disconnections |
| log.warn("[{}] Got exception {} : {}", remoteAddress, cause.getClass().getSimpleName(), cause.getMessage(), |
| ClientCnx.isKnownException(cause) ? null : cause); |
| state = State.Failed; |
| } else { |
// At the default info log level, suppress all subsequent exceptions thrown once the connection
// has already failed
| if (log.isDebugEnabled()) { |
| log.debug("[{}] Got exception: {}", remoteAddress, cause.getMessage(), cause); |
| } |
| } |
| ctx.close(); |
| } |
| |
| /* |
| * If authentication and authorization is enabled and if the authRole is one of proxyRoles we want to enforce |
| * - the originalPrincipal is given while connecting |
| * - originalPrincipal is not blank |
| * - originalPrincipal is not a proxy principal |
| */ |
| private boolean invalidOriginalPrincipal(String originalPrincipal) { |
| return (service.isAuthenticationEnabled() && service.isAuthorizationEnabled() && proxyRoles.contains(authRole) |
| && (StringUtils.isBlank(originalPrincipal) || proxyRoles.contains(originalPrincipal))); |
| } |
| |
| // //// |
| // // Incoming commands handling |
| // //// |
| |
| @Override |
| protected void handleLookup(CommandLookupTopic lookup) { |
| final long requestId = lookup.getRequestId(); |
| |
| if (log.isDebugEnabled()) { |
| log.debug("[{}] Received Lookup from {} for {}", lookup.getTopic(), remoteAddress, requestId); |
| } |
| |
| DestinationName topicName = validateTopicName(lookup.getTopic(), requestId, lookup); |
| if (topicName == null) { |
| return; |
| } |
| |
| String originalPrincipal = null; |
| if (authenticateOriginalAuthData && lookup.hasOriginalAuthData()) { |
| originalPrincipal = validateOriginalPrincipal( |
| lookup.hasOriginalAuthData() ? lookup.getOriginalAuthData() : null, |
| lookup.hasOriginalAuthMethod() ? lookup.getOriginalAuthMethod() : null, |
| lookup.hasOriginalPrincipal() ? lookup.getOriginalPrincipal() : this.originalPrincipal, requestId, |
| lookup); |
| |
| if (originalPrincipal == null) { |
| return; |
| } |
| } else { |
| originalPrincipal = lookup.hasOriginalPrincipal() ? lookup.getOriginalPrincipal() : this.originalPrincipal; |
| } |
| |
| final Semaphore lookupSemaphore = service.getLookupRequestSemaphore(); |
| if (lookupSemaphore.tryAcquire()) { |
| if (invalidOriginalPrincipal(originalPrincipal)) { |
| final String msg = "Valid Proxy Client role should be provided for lookup "; |
| log.warn("[{}] {} with role {} and proxyClientAuthRole {} on topic {}", remoteAddress, msg, authRole, |
| originalPrincipal, topicName); |
| ctx.writeAndFlush(newLookupErrorResponse(ServerError.AuthorizationError, msg, requestId)); |
| lookupSemaphore.release(); |
| return; |
| } |
| CompletableFuture<Boolean> isProxyAuthorizedFuture; |
| if (service.isAuthorizationEnabled() && originalPrincipal != null) { |
| isProxyAuthorizedFuture = service.getAuthorizationService().canLookupAsync(topicName, authRole, |
| authenticationData); |
| } else { |
| isProxyAuthorizedFuture = CompletableFuture.completedFuture(true); |
| } |
| String finalOriginalPrincipal = originalPrincipal; |
| isProxyAuthorizedFuture.thenApply(isProxyAuthorized -> { |
| if (isProxyAuthorized) { |
| lookupDestinationAsync(getBrokerService().pulsar(), topicName, lookup.getAuthoritative(), |
| finalOriginalPrincipal != null ? finalOriginalPrincipal : authRole, authenticationData, |
| lookup.getRequestId()).handle((lookupResponse, ex) -> { |
| if (ex == null) { |
| ctx.writeAndFlush(lookupResponse); |
| } else { |
// This should never happen
log.warn("[{}] Lookup on topic {} failed: {}", remoteAddress, topicName,
ex.getMessage(), ex);
| ctx.writeAndFlush(newLookupErrorResponse(ServerError.ServiceNotReady, |
| ex.getMessage(), requestId)); |
| } |
| lookupSemaphore.release(); |
| return null; |
| }); |
| } else { |
| final String msg = "Proxy Client is not authorized to Lookup"; |
| log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, authRole, topicName); |
| ctx.writeAndFlush(newLookupErrorResponse(ServerError.AuthorizationError, msg, requestId)); |
| lookupSemaphore.release(); |
| } |
| return null; |
| }).exceptionally(ex -> { |
final String msg = "Exception occurred while trying to authorize lookup";
| log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, authRole, topicName); |
| ctx.writeAndFlush(newLookupErrorResponse(ServerError.AuthorizationError, msg, requestId)); |
| lookupSemaphore.release(); |
| return null; |
| }); |
| } else { |
| if (log.isDebugEnabled()) { |
| log.debug("[{}] Failed lookup due to too many lookup-requests {}", remoteAddress, topicName); |
| } |
| ctx.writeAndFlush(newLookupErrorResponse(ServerError.TooManyRequests, |
| "Failed due to too many pending lookup requests", requestId)); |
| } |
| } |
| |
| @Override |
| protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata partitionMetadata) { |
| final long requestId = partitionMetadata.getRequestId(); |
| if (log.isDebugEnabled()) { |
| log.debug("[{}] Received PartitionMetadataLookup from {} for {}", partitionMetadata.getTopic(), |
| remoteAddress, requestId); |
| } |
| |
| DestinationName topicName = validateTopicName(partitionMetadata.getTopic(), requestId, partitionMetadata); |
| if (topicName == null) { |
| return; |
| } |
| String originalPrincipal = null; |
| if (authenticateOriginalAuthData && partitionMetadata.hasOriginalAuthData()) { |
| originalPrincipal = validateOriginalPrincipal( |
| partitionMetadata.hasOriginalAuthData() ? partitionMetadata.getOriginalAuthData() : null, |
| partitionMetadata.hasOriginalAuthMethod() ? partitionMetadata.getOriginalAuthMethod() : null, |
| partitionMetadata.hasOriginalPrincipal() ? partitionMetadata.getOriginalPrincipal() |
| : this.originalPrincipal, |
| requestId, partitionMetadata); |
| |
| if (originalPrincipal == null) { |
| return; |
| } |
| } else { |
originalPrincipal = partitionMetadata.hasOriginalPrincipal() ? partitionMetadata.getOriginalPrincipal()
: this.originalPrincipal;
| } |
| |
| final Semaphore lookupSemaphore = service.getLookupRequestSemaphore(); |
| if (lookupSemaphore.tryAcquire()) { |
| if (invalidOriginalPrincipal(originalPrincipal)) { |
| final String msg = "Valid Proxy Client role should be provided for getPartitionMetadataRequest "; |
| log.warn("[{}] {} with role {} and proxyClientAuthRole {} on topic {}", remoteAddress, msg, authRole, |
| originalPrincipal, topicName); |
| ctx.writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.AuthorizationError, |
| msg, requestId)); |
| lookupSemaphore.release(); |
| return; |
| } |
| CompletableFuture<Boolean> isProxyAuthorizedFuture; |
| if (service.isAuthorizationEnabled() && originalPrincipal != null) { |
| isProxyAuthorizedFuture = service.getAuthorizationService() |
| .canLookupAsync(topicName, authRole, authenticationData); |
| } else { |
| isProxyAuthorizedFuture = CompletableFuture.completedFuture(true); |
| } |
| String finalOriginalPrincipal = originalPrincipal; |
| isProxyAuthorizedFuture.thenApply(isProxyAuthorized -> { |
| if (isProxyAuthorized) { |
| getPartitionedTopicMetadata(getBrokerService().pulsar(), |
| finalOriginalPrincipal != null ? finalOriginalPrincipal : authRole, authenticationData, |
| topicName).handle((metadata, ex) -> { |
| if (ex == null) { |
| int partitions = metadata.partitions; |
| ctx.writeAndFlush(Commands.newPartitionMetadataResponse(partitions, requestId)); |
| } else { |
| if (ex instanceof PulsarClientException) { |
| log.warn("Failed to authorize {} at [{}] on topic {} : {}", getRole(), |
| remoteAddress, topicName, ex.getMessage()); |
| ctx.writeAndFlush(Commands.newPartitionMetadataResponse( |
| ServerError.AuthorizationError, ex.getMessage(), requestId)); |
| } else { |
| log.warn("Failed to get Partitioned Metadata [{}] {}: {}", remoteAddress, |
| topicName, ex.getMessage(), ex); |
| ServerError error = (ex instanceof RestException) |
| && ((RestException) ex).getResponse().getStatus() < 500 |
| ? ServerError.MetadataError : ServerError.ServiceNotReady; |
| ctx.writeAndFlush(Commands.newPartitionMetadataResponse(error, |
| ex.getMessage(), requestId)); |
| } |
| } |
| lookupSemaphore.release(); |
| return null; |
| }); |
| } else { |
| final String msg = "Proxy Client is not authorized to Get Partition Metadata"; |
| log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, authRole, topicName); |
| ctx.writeAndFlush( |
| Commands.newPartitionMetadataResponse(ServerError.AuthorizationError, msg, requestId)); |
| lookupSemaphore.release(); |
| } |
| return null; |
| }).exceptionally(ex -> { |
final String msg = "Exception occurred while trying to authorize get Partition Metadata";
| log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, authRole, topicName); |
| ctx.writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.AuthorizationError, msg, requestId)); |
| lookupSemaphore.release(); |
| return null; |
| }); |
| } else { |
| if (log.isDebugEnabled()) { |
| log.debug("[{}] Failed Partition-Metadata lookup due to too many lookup-requests {}", remoteAddress, |
| topicName); |
| } |
| ctx.writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.TooManyRequests, |
| "Failed due to too many pending lookup requests", requestId)); |
| } |
| } |
| |
| @Override |
| protected void handleConsumerStats(CommandConsumerStats commandConsumerStats) { |
| if (log.isDebugEnabled()) { |
| log.debug("Received CommandConsumerStats call from {}", remoteAddress); |
| } |
| |
| final long requestId = commandConsumerStats.getRequestId(); |
| final long consumerId = commandConsumerStats.getConsumerId(); |
CompletableFuture<Consumer> consumerFuture = consumers.get(consumerId);
Consumer consumer = (consumerFuture != null && consumerFuture.isDone()
&& !consumerFuture.isCompletedExceptionally()) ? consumerFuture.getNow(null) : null;
| ByteBuf msg = null; |
| |
| if (consumer == null) { |
| log.error( |
| "Failed to get consumer-stats response - Consumer not found for CommandConsumerStats[remoteAddress = {}, requestId = {}, consumerId = {}]", |
| remoteAddress, requestId, consumerId); |
| msg = Commands.newConsumerStatsResponse(ServerError.ConsumerNotFound, |
| "Consumer " + consumerId + " not found", requestId); |
| } else { |
| if (log.isDebugEnabled()) { |
| log.debug("CommandConsumerStats[requestId = {}, consumer = {}]", requestId, consumer); |
| } |
| msg = Commands.newConsumerStatsResponse(createConsumerStatsResponse(consumer, requestId)); |
| } |
| |
| ctx.writeAndFlush(msg); |
| } |
| |
| CommandConsumerStatsResponse.Builder createConsumerStatsResponse(Consumer consumer, long requestId) { |
| CommandConsumerStatsResponse.Builder commandConsumerStatsResponseBuilder = CommandConsumerStatsResponse |
| .newBuilder(); |
| ConsumerStats consumerStats = consumer.getStats(); |
| commandConsumerStatsResponseBuilder.setRequestId(requestId); |
| commandConsumerStatsResponseBuilder.setMsgRateOut(consumerStats.msgRateOut); |
| commandConsumerStatsResponseBuilder.setMsgThroughputOut(consumerStats.msgThroughputOut); |
| commandConsumerStatsResponseBuilder.setMsgRateRedeliver(consumerStats.msgRateRedeliver); |
| commandConsumerStatsResponseBuilder.setConsumerName(consumerStats.consumerName); |
| commandConsumerStatsResponseBuilder.setAvailablePermits(consumerStats.availablePermits); |
| commandConsumerStatsResponseBuilder.setUnackedMessages(consumerStats.unackedMessages); |
| commandConsumerStatsResponseBuilder.setBlockedConsumerOnUnackedMsgs(consumerStats.blockedConsumerOnUnackedMsgs); |
| commandConsumerStatsResponseBuilder.setAddress(consumerStats.address); |
| commandConsumerStatsResponseBuilder.setConnectedSince(consumerStats.connectedSince); |
| |
| Subscription subscription = consumer.getSubscription(); |
| commandConsumerStatsResponseBuilder.setMsgBacklog(subscription.getNumberOfEntriesInBacklog()); |
| commandConsumerStatsResponseBuilder.setMsgRateExpired(subscription.getExpiredMessageRate()); |
| commandConsumerStatsResponseBuilder.setType(subscription.getTypeString()); |
| |
| return commandConsumerStatsResponseBuilder; |
| } |
| |
private String validateOriginalPrincipal(String originalAuthData, String originalAuthMethod,
String originalPrincipal, Long requestId, GeneratedMessageLite request) {
| ChannelHandler sslHandler = ctx.channel().pipeline().get(PulsarChannelInitializer.TLS_HANDLER); |
| SSLSession sslSession = null; |
| if (sslHandler != null) { |
| sslSession = ((SslHandler) sslHandler).engine().getSession(); |
| } |
| try { |
| return getOriginalPrincipal(originalAuthData, originalAuthMethod, originalPrincipal, sslSession); |
| } catch (AuthenticationException e) { |
String msg = "Unable to authenticate original auth data";
| log.warn("[{}] {}: {}", remoteAddress, msg, e.getMessage()); |
| if (request instanceof CommandLookupTopic) { |
| ctx.writeAndFlush(newLookupErrorResponse(ServerError.AuthenticationError, msg, requestId)); |
| } else if (request instanceof CommandPartitionedTopicMetadata) { |
| ctx.writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.AuthenticationError, msg, requestId)); |
| } |
| return null; |
| } |
| } |
| |
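/**
 * Resolves the principal to use for a proxied client. When {@code authenticateOriginalAuthData}
 * is enabled, the original auth data forwarded by the proxy is re-authenticated by the broker;
 * otherwise the proxy-supplied principal is trusted as-is.
 */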
| private String getOriginalPrincipal(String originalAuthData, String originalAuthMethod, String originalPrincipal, |
| SSLSession sslSession) throws AuthenticationException { |
| if (authenticateOriginalAuthData) { |
| if (originalAuthData != null) { |
| originalPrincipal = getBrokerService().getAuthenticationService().authenticate( |
| new AuthenticationDataCommand(originalAuthData, remoteAddress, sslSession), originalAuthMethod); |
| } else { |
| originalPrincipal = null; |
| } |
| } |
| return originalPrincipal; |
| } |
| |
| @Override |
| protected void handleConnect(CommandConnect connect) { |
| checkArgument(state == State.Start); |
| if (service.isAuthenticationEnabled()) { |
| try { |
| String authMethod = "none"; |
| if (connect.hasAuthMethodName()) { |
| authMethod = connect.getAuthMethodName(); |
} else if (connect.hasAuthMethod()) {
// Legacy clients pass the AuthMethod enum; constant names carry an "AuthMethod" prefix
// (e.g. AuthMethodNone), so strip the first 10 characters to get the method name
authMethod = connect.getAuthMethod().name().substring(10).toLowerCase();
| } |
| |
| String authData = connect.getAuthData().toStringUtf8(); |
| ChannelHandler sslHandler = ctx.channel().pipeline().get(PulsarChannelInitializer.TLS_HANDLER); |
| SSLSession sslSession = null; |
| if (sslHandler != null) { |
| sslSession = ((SslHandler) sslHandler).engine().getSession(); |
| } |
| originalPrincipal = getOriginalPrincipal( |
| connect.hasOriginalAuthData() ? connect.getOriginalAuthData() : null, |
| connect.hasOriginalAuthMethod() ? connect.getOriginalAuthMethod() : null, |
| connect.hasOriginalPrincipal() ? connect.getOriginalPrincipal() : null, |
| sslSession); |
| authenticationData = new AuthenticationDataCommand(authData, remoteAddress, sslSession); |
| authRole = getBrokerService().getAuthenticationService() |
| .authenticate(authenticationData, authMethod); |
| |
log.info("[{}] Client successfully authenticated with {} role {} and originalPrincipal {}",
remoteAddress, authMethod, authRole, originalPrincipal);
| } catch (AuthenticationException e) { |
| String msg = "Unable to authenticate"; |
| log.warn("[{}] {}: {}", remoteAddress, msg, e.getMessage()); |
| ctx.writeAndFlush(Commands.newError(-1, ServerError.AuthenticationError, msg)); |
| close(); |
| return; |
| } |
| } |
| if (log.isDebugEnabled()) { |
| log.debug("Received CONNECT from {}", remoteAddress); |
| } |
| ctx.writeAndFlush(Commands.newConnected(connect.getProtocolVersion())); |
| state = State.Connected; |
| remoteEndpointProtocolVersion = connect.getProtocolVersion(); |
| String version = connect.hasClientVersion() ? connect.getClientVersion() : null; |
| if (isNotBlank(version) && !version.contains(" ") /* ignore default version: pulsar client */) { |
| this.clientVersion = version.intern(); |
| } |
| } |
| |
| @Override |
| protected void handleSubscribe(final CommandSubscribe subscribe) { |
| checkArgument(state == State.Connected); |
| final long requestId = subscribe.getRequestId(); |
| final long consumerId = subscribe.getConsumerId(); |
| DestinationName topicName = validateTopicName(subscribe.getTopic(), requestId, subscribe); |
| if (topicName == null) { |
| return; |
| } |
| |
| if (invalidOriginalPrincipal(originalPrincipal)) { |
| final String msg = "Valid Proxy Client role should be provided while subscribing "; |
| log.warn("[{}] {} with role {} and proxyClientAuthRole {} on topic {}", remoteAddress, msg, authRole, |
| originalPrincipal, topicName); |
| ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, msg)); |
| return; |
| } |
| |
| final String subscriptionName = subscribe.getSubscription(); |
| final SubType subType = subscribe.getSubType(); |
| final String consumerName = subscribe.getConsumerName(); |
| final boolean isDurable = subscribe.getDurable(); |
| final MessageIdImpl startMessageId = subscribe.hasStartMessageId() ? new BatchMessageIdImpl( |
| subscribe.getStartMessageId().getLedgerId(), subscribe.getStartMessageId().getEntryId(), |
| subscribe.getStartMessageId().getPartition(), subscribe.getStartMessageId().getBatchIndex()) |
| : null; |
| |
| final int priorityLevel = subscribe.hasPriorityLevel() ? subscribe.getPriorityLevel() : 0; |
| final boolean readCompacted = subscribe.getReadCompacted(); |
| final Map<String, String> metadata = CommandUtils.metadataFromCommand(subscribe); |
| |
| CompletableFuture<Boolean> isProxyAuthorizedFuture; |
| if (service.isAuthorizationEnabled() && originalPrincipal != null) { |
| isProxyAuthorizedFuture = service.getAuthorizationService().canConsumeAsync(topicName, authRole, |
| authenticationData, subscribe.getSubscription()); |
| } else { |
| isProxyAuthorizedFuture = CompletableFuture.completedFuture(true); |
| } |
| isProxyAuthorizedFuture.thenApply(isProxyAuthorized -> { |
| if (isProxyAuthorized) { |
| CompletableFuture<Boolean> authorizationFuture; |
| if (service.isAuthorizationEnabled()) { |
| authorizationFuture = service.getAuthorizationService().canConsumeAsync(topicName, |
| originalPrincipal != null ? originalPrincipal : authRole, authenticationData, |
| subscribe.getSubscription()); |
| } else { |
| authorizationFuture = CompletableFuture.completedFuture(true); |
| } |
| |
| authorizationFuture.thenApply(isAuthorized -> { |
| if (isAuthorized) { |
| if (log.isDebugEnabled()) { |
| log.debug("[{}] Client is authorized to subscribe with role {}", remoteAddress, authRole); |
| } |
| |
| log.info("[{}] Subscribing on topic {} / {}", remoteAddress, topicName, subscriptionName); |
| try { |
| Metadata.validateMetadata(metadata); |
| } catch (IllegalArgumentException iae) { |
| final String msg = iae.getMessage(); |
| ctx.writeAndFlush(Commands.newError(requestId, ServerError.MetadataError, msg)); |
| return null; |
| } |
| CompletableFuture<Consumer> consumerFuture = new CompletableFuture<>(); |
| CompletableFuture<Consumer> existingConsumerFuture = consumers.putIfAbsent(consumerId, |
| consumerFuture); |
| |
| if (existingConsumerFuture != null) { |
| if (existingConsumerFuture.isDone() && !existingConsumerFuture.isCompletedExceptionally()) { |
| Consumer consumer = existingConsumerFuture.getNow(null); |
| log.info("[{}] Consumer with the same id is already created: {}", remoteAddress, |
| consumer); |
| ctx.writeAndFlush(Commands.newSuccess(requestId)); |
| return null; |
| } else { |
// There was an earlier request to create a consumer with the same consumerId. This can
// happen when the client timeout is lower than the broker timeout. We need to wait
// until the previous consumer creation request either completes or fails.
| log.warn("[{}][{}][{}] Consumer is already present on the connection", remoteAddress, |
| topicName, subscriptionName); |
| ServerError error = !existingConsumerFuture.isDone() ? ServerError.ServiceNotReady |
| : getErrorCode(existingConsumerFuture); |
| ctx.writeAndFlush(Commands.newError(requestId, error, |
| "Consumer is already present on the connection")); |
| return null; |
| } |
| } |
| |
| service.getTopic(topicName.toString()) |
| .thenCompose(topic -> topic.subscribe(ServerCnx.this, subscriptionName, consumerId, |
| subType, priorityLevel, consumerName, isDurable, |
| startMessageId, metadata, readCompacted)) |
| .thenAccept(consumer -> { |
| if (consumerFuture.complete(consumer)) { |
| log.info("[{}] Created subscription on topic {} / {}", remoteAddress, topicName, |
| subscriptionName); |
| ctx.writeAndFlush(Commands.newSuccess(requestId), ctx.voidPromise()); |
| } else { |
| // The consumer future was completed before by a close command |
| try { |
| consumer.close(); |
| log.info("[{}] Cleared consumer created after timeout on client side {}", |
| remoteAddress, consumer); |
| } catch (BrokerServiceException e) { |
| log.warn( |
| "[{}] Error closing consumer created after timeout on client side {}: {}", |
| remoteAddress, consumer, e.getMessage()); |
| } |
| consumers.remove(consumerId, consumerFuture); |
| } |
| |
| }) // |
| .exceptionally(exception -> { |
| if (exception.getCause() instanceof ConsumerBusyException) { |
| if (log.isDebugEnabled()) { |
| log.debug( |
| "[{}][{}][{}] Failed to create consumer because exclusive consumer is already connected: {}", |
| remoteAddress, topicName, subscriptionName, |
| exception.getCause().getMessage()); |
| } |
| } else { |
| log.warn("[{}][{}][{}] Failed to create consumer: {}", remoteAddress, topicName, |
| subscriptionName, exception.getCause().getMessage(), exception); |
| } |
| |
// If the client timed out, the future would have already been completed by a subsequent
// close. Only send the error back to the client if not completed already.
| if (consumerFuture.completeExceptionally(exception)) { |
| ctx.writeAndFlush(Commands.newError(requestId, |
| BrokerServiceException.getClientErrorCode(exception.getCause()), |
| exception.getCause().getMessage())); |
| } |
| consumers.remove(consumerId, consumerFuture); |
| |
| return null; |
| |
| }); |
| } else { |
| String msg = "Client is not authorized to subscribe"; |
| log.warn("[{}] {} with role {}", remoteAddress, msg, authRole); |
| ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, msg)); |
| } |
| return null; |
| }).exceptionally(e -> { |
| String msg = String.format("[%s] %s with role %s", remoteAddress, e.getMessage(), authRole); |
| log.warn(msg); |
| ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, e.getMessage())); |
| return null; |
| }); |
| } else { |
| final String msg = "Proxy Client is not authorized to subscribe"; |
| log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, authRole, topicName); |
| ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, msg)); |
| } |
| return null; |
| }).exceptionally(ex -> { |
| String msg = String.format("[%s] %s with role %s", remoteAddress, ex.getMessage(), authRole); |
| if (ex.getCause() instanceof PulsarServerException) { |
| log.info(msg); |
| } else { |
| log.warn(msg); |
| } |
| ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, ex.getMessage())); |
| return null; |
| }); |
| } |
| |
| @Override |
| protected void handleProducer(final CommandProducer cmdProducer) { |
| checkArgument(state == State.Connected); |
| final long producerId = cmdProducer.getProducerId(); |
| final long requestId = cmdProducer.getRequestId(); |
| // Use producer name provided by client if present |
| final String producerName = cmdProducer.hasProducerName() ? cmdProducer.getProducerName() |
| : service.generateUniqueProducerName(); |
| final boolean isEncrypted = cmdProducer.getEncrypted(); |
| final Map<String, String> metadata = CommandUtils.metadataFromCommand(cmdProducer); |
| |
| DestinationName topicName = validateTopicName(cmdProducer.getTopic(), requestId, cmdProducer); |
| if (topicName == null) { |
| return; |
| } |
| |
| if (invalidOriginalPrincipal(originalPrincipal)) { |
| final String msg = "Valid Proxy Client role should be provided while creating producer "; |
| log.warn("[{}] {} with role {} and proxyClientAuthRole {} on topic {}", remoteAddress, msg, authRole, |
| originalPrincipal, topicName); |
| ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, msg)); |
| return; |
| } |
| |
| CompletableFuture<Boolean> isProxyAuthorizedFuture; |
| if (service.isAuthorizationEnabled() && originalPrincipal != null) { |
| isProxyAuthorizedFuture = service.getAuthorizationService().canProduceAsync(topicName, |
| authRole, authenticationData); |
| } else { |
| isProxyAuthorizedFuture = CompletableFuture.completedFuture(true); |
| } |
| isProxyAuthorizedFuture.thenApply(isProxyAuthorized -> { |
| if (isProxyAuthorized) { |
| CompletableFuture<Boolean> authorizationFuture; |
| if (service.isAuthorizationEnabled()) { |
| authorizationFuture = service.getAuthorizationService().canProduceAsync(topicName, |
| originalPrincipal != null ? originalPrincipal : authRole, authenticationData); |
| } else { |
| authorizationFuture = CompletableFuture.completedFuture(true); |
| } |
| |
| authorizationFuture.thenApply(isAuthorized -> { |
| if (isAuthorized) { |
| if (log.isDebugEnabled()) { |
| log.debug("[{}] Client is authorized to Produce with role {}", remoteAddress, authRole); |
| } |
| CompletableFuture<Producer> producerFuture = new CompletableFuture<>(); |
| CompletableFuture<Producer> existingProducerFuture = producers.putIfAbsent(producerId, |
| producerFuture); |
| |
| if (existingProducerFuture != null) { |
| if (existingProducerFuture.isDone() && !existingProducerFuture.isCompletedExceptionally()) { |
| Producer producer = existingProducerFuture.getNow(null); |
| log.info("[{}] Producer with the same id is already created: {}", remoteAddress, |
| producer); |
| ctx.writeAndFlush(Commands.newProducerSuccess(requestId, producer.getProducerName())); |
| return null; |
| } else { |
// There was an earlier request to create a producer with the same producerId. This can
// happen when the client timeout is lower than the broker timeout. We need to wait
// until the previous producer creation request either completes or fails.
| ServerError error = !existingProducerFuture.isDone() ? ServerError.ServiceNotReady |
| : getErrorCode(existingProducerFuture); |
| log.warn("[{}][{}] Producer is already present on the connection", remoteAddress, |
| topicName); |
| ctx.writeAndFlush(Commands.newError(requestId, error, |
| "Producer is already present on the connection")); |
| return null; |
| } |
| } |
| |
| log.info("[{}][{}] Creating producer. producerId={}", remoteAddress, topicName, producerId); |
| |
| service.getTopic(topicName.toString()).thenAccept((Topic topic) -> { |
// Before creating the producer, check if the backlog quota is exceeded on the topic
| if (topic.isBacklogQuotaExceeded(producerName)) { |
| IllegalStateException illegalStateException = new IllegalStateException( |
| "Cannot create producer on topic with backlog quota exceeded"); |
| BacklogQuota.RetentionPolicy retentionPolicy = topic.getBacklogQuota().getPolicy(); |
| if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_request_hold) { |
| ctx.writeAndFlush( |
| Commands.newError(requestId, ServerError.ProducerBlockedQuotaExceededError, |
| illegalStateException.getMessage())); |
| } else if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_exception) { |
| ctx.writeAndFlush(Commands.newError(requestId, |
| ServerError.ProducerBlockedQuotaExceededException, |
| illegalStateException.getMessage())); |
| } |
| producerFuture.completeExceptionally(illegalStateException); |
| producers.remove(producerId, producerFuture); |
| return; |
| } |
| |
| // Check whether the producer will publish encrypted messages or not |
| if (topic.isEncryptionRequired() && !isEncrypted) { |
| String msg = String.format("Encryption is required in %s", topicName); |
| log.warn("[{}] {}", remoteAddress, msg); |
| ctx.writeAndFlush(Commands.newError(requestId, ServerError.MetadataError, msg)); |
| return; |
| } |
| |
| disableTcpNoDelayIfNeeded(topicName.toString(), producerName); |
| |
| Producer producer = new Producer(topic, ServerCnx.this, producerId, producerName, authRole, |
| isEncrypted, metadata); |
| |
| try { |
| topic.addProducer(producer); |
| |
| if (isActive()) { |
| if (producerFuture.complete(producer)) { |
| log.info("[{}] Created new producer: {}", remoteAddress, producer); |
| ctx.writeAndFlush(Commands.newProducerSuccess(requestId, producerName, |
| producer.getLastSequenceId())); |
| return; |
| } else { |
// The producer's future was completed before by a close command
| producer.closeNow(); |
| log.info("[{}] Cleared producer created after timeout on client side {}", |
| remoteAddress, producer); |
| } |
| } else { |
| producer.closeNow(); |
| log.info("[{}] Cleared producer created after connection was closed: {}", |
| remoteAddress, producer); |
| producerFuture.completeExceptionally( |
| new IllegalStateException("Producer created after connection was closed")); |
| } |
| } catch (BrokerServiceException ise) { |
| log.error("[{}] Failed to add producer to topic {}: {}", remoteAddress, topicName, |
| ise.getMessage()); |
| ctx.writeAndFlush(Commands.newError(requestId, |
| BrokerServiceException.getClientErrorCode(ise), ise.getMessage())); |
| producerFuture.completeExceptionally(ise); |
| } |
| |
| producers.remove(producerId, producerFuture); |
| }).exceptionally(exception -> { |
| Throwable cause = exception.getCause(); |
| if (!(cause instanceof ServiceUnitNotReadyException)) { |
| // Do not print stack traces for expected exceptions |
| log.error("[{}] Failed to create topic {}", remoteAddress, topicName, exception); |
| } |
| |
// If the client timed out, the future would have already been completed by a subsequent
// close. Only send the error back to the client if not completed already.
| if (producerFuture.completeExceptionally(exception)) { |
| ctx.writeAndFlush(Commands.newError(requestId, |
| BrokerServiceException.getClientErrorCode(cause), cause.getMessage())); |
| } |
| producers.remove(producerId, producerFuture); |
| |
| return null; |
| }); |
| } else { |
| String msg = "Client is not authorized to Produce"; |
| log.warn("[{}] {} with role {}", remoteAddress, msg, authRole); |
| ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, msg)); |
| } |
| return null; |
| }).exceptionally(e -> { |
| String msg = String.format("[%s] %s with role %s", remoteAddress, e.getMessage(), authRole); |
| log.warn(msg); |
| ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, e.getMessage())); |
| return null; |
| }); |
| } else { |
| final String msg = "Proxy Client is not authorized to Produce"; |
| log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, authRole, topicName); |
| ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, msg)); |
| } |
| return null; |
| }).exceptionally(ex -> { |
| String msg = String.format("[%s] %s with role %s", remoteAddress, ex.getMessage(), authRole); |
| log.warn(msg); |
| ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, ex.getMessage())); |
| return null; |
| }); |
| } |
| |
| @Override |
| protected void handleSend(CommandSend send, ByteBuf headersAndPayload) { |
| checkArgument(state == State.Connected); |
| |
| CompletableFuture<Producer> producerFuture = producers.get(send.getProducerId()); |
| |
| if (producerFuture == null || !producerFuture.isDone() || producerFuture.isCompletedExceptionally()) { |
| log.warn("[{}] Producer had already been closed: {}", remoteAddress, send.getProducerId()); |
| return; |
| } |
| |
| Producer producer = producerFuture.getNow(null); |
| if (log.isDebugEnabled()) { |
| printSendCommandDebug(send, headersAndPayload); |
| } |
| |
| if (producer.isNonPersistentTopic()) { |
// Avoid processing non-persistent messages if the max concurrent-message limit is reached
| if (nonPersistentPendingMessages > MaxNonPersistentPendingMessages) { |
| final long producerId = send.getProducerId(); |
| final long sequenceId = send.getSequenceId(); |
| service.getTopicOrderedExecutor().submitOrdered(producer.getTopic().getName(), SafeRun.safeRun(() -> { |
| ctx.writeAndFlush(Commands.newSendReceipt(producerId, sequenceId, -1, -1), ctx.voidPromise()); |
| })); |
| producer.recordMessageDrop(send.getNumMessages()); |
| return; |
| } else { |
| nonPersistentPendingMessages++; |
| } |
| } |
| |
| startSendOperation(); |
| |
// Publish the message
| producer.publishMessage(send.getProducerId(), send.getSequenceId(), headersAndPayload, send.getNumMessages()); |
| } |
| |
| private void printSendCommandDebug(CommandSend send, ByteBuf headersAndPayload) { |
| headersAndPayload.markReaderIndex(); |
| MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload); |
| headersAndPayload.resetReaderIndex(); |
| |
| log.debug("[{}] Received send message request. producer: {}:{} {}:{} size: {}", remoteAddress, |
| send.getProducerId(), send.getSequenceId(), msgMetadata.getProducerName(), msgMetadata.getSequenceId(), |
| headersAndPayload.readableBytes()); |
| msgMetadata.recycle(); |
| } |
| |
| @Override |
| protected void handleAck(CommandAck ack) { |
| checkArgument(state == State.Connected); |
| CompletableFuture<Consumer> consumerFuture = consumers.get(ack.getConsumerId()); |
| |
| if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) { |
| consumerFuture.getNow(null).messageAcked(ack); |
| } |
| } |
| |
| @Override |
| protected void handleFlow(CommandFlow flow) { |
| checkArgument(state == State.Connected); |
| if (log.isDebugEnabled()) { |
| log.debug("[{}] Received flow from consumer {} permits: {}", remoteAddress, flow.getConsumerId(), |
| flow.getMessagePermits()); |
| } |
| |
| CompletableFuture<Consumer> consumerFuture = consumers.get(flow.getConsumerId()); |
| |
| if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) { |
| Consumer consumer = consumerFuture.getNow(null); |
| if (consumer != null) { |
| consumer.flowPermits(flow.getMessagePermits()); |
| } else { |
| log.info("[{}] Couldn't find consumer {}", remoteAddress, flow.getConsumerId()); |
| } |
| } |
| } |
| |
| @Override |
| protected void handleRedeliverUnacknowledged(CommandRedeliverUnacknowledgedMessages redeliver) { |
| checkArgument(state == State.Connected); |
| if (log.isDebugEnabled()) { |
| log.debug("[{}] Received Resend Command from consumer {} ", remoteAddress, redeliver.getConsumerId()); |
| } |
| |
| CompletableFuture<Consumer> consumerFuture = consumers.get(redeliver.getConsumerId()); |
| |
| if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) { |
| Consumer consumer = consumerFuture.getNow(null); |
| if (redeliver.getMessageIdsCount() > 0 && consumer.subType() == SubType.Shared) { |
| consumer.redeliverUnacknowledgedMessages(redeliver.getMessageIdsList()); |
| } else { |
| consumer.redeliverUnacknowledgedMessages(); |
| } |
| } |
| } |
| |
| @Override |
| protected void handleUnsubscribe(CommandUnsubscribe unsubscribe) { |
| checkArgument(state == State.Connected); |
| |
| CompletableFuture<Consumer> consumerFuture = consumers.get(unsubscribe.getConsumerId()); |
| |
| if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) { |
| consumerFuture.getNow(null).doUnsubscribe(unsubscribe.getRequestId()); |
| } else { |
| ctx.writeAndFlush( |
| Commands.newError(unsubscribe.getRequestId(), ServerError.MetadataError, "Consumer not found")); |
| } |
| } |
| |
| @Override |
| protected void handleSeek(CommandSeek seek) { |
| checkArgument(state == State.Connected); |
| |
| CompletableFuture<Consumer> consumerFuture = consumers.get(seek.getConsumerId()); |
| |
| // Currently only seeking on a message id is supported |
| if (!seek.hasMessageId()) { |
| ctx.writeAndFlush( |
| Commands.newError(seek.getRequestId(), ServerError.MetadataError, "Message id was not present")); |
| return; |
| } |
| |
| if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) { |
| Consumer consumer = consumerFuture.getNow(null); |
| Subscription subscription = consumer.getSubscription(); |
| MessageIdData msgIdData = seek.getMessageId(); |
| |
| Position position = new PositionImpl(msgIdData.getLedgerId(), msgIdData.getEntryId()); |
| long requestId = seek.getRequestId(); |
| |
| subscription.resetCursor(position).thenRun(() -> { |
| log.info("[{}] [{}][{}] Reset subscription to message id {}", remoteAddress, |
| subscription.getTopic().getName(), subscription.getName(), position); |
| ctx.writeAndFlush(Commands.newSuccess(requestId)); |
| }).exceptionally(ex -> { |
| log.warn("[{}][{}] Failed to reset subscription: {}", remoteAddress, subscription, ex.getMessage(), ex); |
| ctx.writeAndFlush(Commands.newError(seek.getRequestId(), ServerError.UnknownError, |
| "Error when resetting subscription: " + ex.getCause().getMessage())); |
| return null; |
| }); |
| } else { |
| ctx.writeAndFlush(Commands.newError(seek.getRequestId(), ServerError.MetadataError, "Consumer not found")); |
| } |
| } |
| |
| @Override |
| protected void handleCloseProducer(CommandCloseProducer closeProducer) { |
| checkArgument(state == State.Connected); |
| |
| final long producerId = closeProducer.getProducerId(); |
| final long requestId = closeProducer.getRequestId(); |
| |
| CompletableFuture<Producer> producerFuture = producers.get(producerId); |
| if (producerFuture == null) { |
| log.warn("[{}] Producer {} was not registered on the connection", remoteAddress, producerId); |
| ctx.writeAndFlush(Commands.newError(requestId, ServerError.UnknownError, |
| "Producer was not registered on the connection")); |
| return; |
| } |
| |
| if (!producerFuture.isDone() && producerFuture |
| .completeExceptionally(new IllegalStateException("Closed producer before creation was complete"))) { |
// We have received a request to close the producer before its creation completed, so we
// have marked the producer future as failed and can tell the client the close operation
// was successful. When the pending create operation completes, the new producer will be
// discarded.
| log.info("[{}] Closed producer {} before its creation was completed", remoteAddress, producerId); |
| ctx.writeAndFlush(Commands.newSuccess(requestId)); |
| return; |
| } else if (producerFuture.isCompletedExceptionally()) { |
| log.info("[{}] Closed producer {} that already failed to be created", remoteAddress, producerId); |
| ctx.writeAndFlush(Commands.newSuccess(requestId)); |
| return; |
| } |
| |
// Proceed with the normal close of the producer
| Producer producer = producerFuture.getNow(null); |
| log.info("[{}][{}] Closing producer on cnx {}", producer.getTopic(), producer.getProducerName(), remoteAddress); |
| |
| producer.close().thenAccept(v -> { |
| log.info("[{}][{}] Closed producer on cnx {}", producer.getTopic(), producer.getProducerName(), |
| remoteAddress); |
| ctx.writeAndFlush(Commands.newSuccess(requestId)); |
| producers.remove(producerId, producerFuture); |
| }); |
| } |
| |
| @Override |
| protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) { |
| checkArgument(state == State.Connected); |
| log.info("[{}] Closing consumer: {}", remoteAddress, closeConsumer.getConsumerId()); |
| |
| long requestId = closeConsumer.getRequestId(); |
| long consumerId = closeConsumer.getConsumerId(); |
| |
| CompletableFuture<Consumer> consumerFuture = consumers.get(consumerId); |
| if (consumerFuture == null) { |
log.warn("[{}] Consumer {} was not registered on the connection", remoteAddress, consumerId);
| ctx.writeAndFlush(Commands.newError(requestId, ServerError.MetadataError, "Consumer not found")); |
| return; |
| } |
| |
| if (!consumerFuture.isDone() && consumerFuture |
| .completeExceptionally(new IllegalStateException("Closed consumer before creation was complete"))) { |
// We have received a request to close the consumer before its creation completed, so we
// have marked the consumer future as failed and can tell the client the close operation
// was successful. When the pending create operation completes, the new consumer will be
// discarded.
| log.info("[{}] Closed consumer {} before its creation was completed", remoteAddress, consumerId); |
| ctx.writeAndFlush(Commands.newSuccess(requestId)); |
| return; |
| } |
| |
| if (consumerFuture.isCompletedExceptionally()) { |
| log.info("[{}] Closed consumer {} that already failed to be created", remoteAddress, consumerId); |
| ctx.writeAndFlush(Commands.newSuccess(requestId)); |
| return; |
| } |
| |
| // Proceed with normal consumer close |
| Consumer consumer = consumerFuture.getNow(null); |
| try { |
| consumer.close(); |
| consumers.remove(consumerId, consumerFuture); |
| ctx.writeAndFlush(Commands.newSuccess(requestId)); |
| log.info("[{}] Closed consumer {}", remoteAddress, consumer); |
| } catch (BrokerServiceException e) { |
log.warn("[{}] Error closing consumer {}", remoteAddress, consumer, e);
| ctx.writeAndFlush( |
| Commands.newError(requestId, BrokerServiceException.getClientErrorCode(e), e.getMessage())); |
| } |
| } |
| |
| @Override |
| protected boolean isHandshakeCompleted() { |
| return state == State.Connected; |
| } |
| |
| ChannelHandlerContext ctx() { |
| return ctx; |
| } |
| |
| public void closeProducer(Producer producer) { |
// Remove the producer from the connection map and send a close-producer command to the client
| if (log.isDebugEnabled()) { |
| log.debug("[{}] Removed producer: {}", remoteAddress, producer); |
| } |
| long producerId = producer.getProducerId(); |
| producers.remove(producerId); |
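// Since protocol v5, the broker can ask the client to close a single producer without
// dropping the whole connection; older clients get a full connection close instead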
| if (remoteEndpointProtocolVersion >= v5.getNumber()) { |
| ctx.writeAndFlush(Commands.newCloseProducer(producerId, -1L)); |
| } else { |
| close(); |
| } |
| |
| } |
| |
| public void closeConsumer(Consumer consumer) { |
// Remove the consumer from the connection map and send a close-consumer command to the client
| if (log.isDebugEnabled()) { |
| log.debug("[{}] Removed consumer: {}", remoteAddress, consumer); |
| } |
| long consumerId = consumer.consumerId(); |
| consumers.remove(consumerId); |
| if (remoteEndpointProtocolVersion >= v5.getNumber()) { |
| ctx.writeAndFlush(Commands.newCloseConsumer(consumerId, -1L)); |
| } else { |
| close(); |
| } |
| } |
| |
| /** |
| * It closes the connection with client which triggers {@code channelInactive()} which clears all producers and |
| * consumers from connection-map |
| */ |
| protected void close() { |
| ctx.close(); |
| } |
| |
| public SocketAddress clientAddress() { |
| return remoteAddress; |
| } |
| |
| public void removedConsumer(Consumer consumer) { |
| if (log.isDebugEnabled()) { |
| log.debug("[{}] Removed consumer: {}", remoteAddress, consumer); |
| } |
| |
| consumers.remove(consumer.consumerId()); |
| } |
| |
| public void removedProducer(Producer producer) { |
| if (log.isDebugEnabled()) { |
| log.debug("[{}] Removed producer: {}", remoteAddress, producer); |
| } |
| producers.remove(producer.getProducerId()); |
| } |
| |
| public boolean isActive() { |
| return isActive; |
| } |
| |
| public boolean isWritable() { |
| return ctx.channel().isWritable(); |
| } |
| |
| public void startSendOperation() { |
| if (++pendingSendRequest == MaxPendingSendRequests) { |
| // When the quota of pending send requests is reached, stop reading from socket to cause backpressure on |
| // client connection, possibly shared between multiple producers |
| ctx.channel().config().setAutoRead(false); |
| } |
| } |
| |
| public void completedSendOperation(boolean isNonPersistentTopic) { |
| if (--pendingSendRequest == ResumeReadsThreshold) { |
| // Resume reading from socket |
| ctx.channel().config().setAutoRead(true); |
| } |
| if (isNonPersistentTopic) { |
| nonPersistentPendingMessages--; |
| } |
| } |
| |
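// If the future completed exceptionally, getNow() throws a CompletionException wrapping the
// original failure; inspect its cause to map it to a client-visible error code.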
| private <T> ServerError getErrorCode(CompletableFuture<T> future) { |
| ServerError error = ServerError.UnknownError; |
| try { |
| future.getNow(null); |
| } catch (Exception e) { |
| if (e.getCause() instanceof BrokerServiceException) { |
| error = BrokerServiceException.getClientErrorCode((BrokerServiceException) e.getCause()); |
| } |
| } |
| return error; |
| } |
| |
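/**
 * Replication producers (identified by the replicator prefix in the producer name) move bulk
 * traffic, so Nagle's algorithm is left enabled on their connections, presumably trading a
 * little latency for fewer, larger TCP packets.
 */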
private void disableTcpNoDelayIfNeeded(String topic, String producerName) {
| if (producerName != null && producerName.startsWith(replicatorPrefix)) { |
// Re-enable Nagle's algorithm on connections used for replication
try {
if (ctx.channel().config().getOption(ChannelOption.TCP_NODELAY).booleanValue()) {
| ctx.channel().config().setOption(ChannelOption.TCP_NODELAY, false); |
| } |
| } catch (Throwable t) { |
log.warn("[{}] [{}] Failed to remove TCP no-delay property on client cnx {}", topic, producerName,
ctx.channel(), t);
| } |
| } |
| } |
| |
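/**
 * Parses and validates the topic name. On failure, replies with an {@code InvalidTopicName}
 * error matched to the request type and returns null.
 */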
| private DestinationName validateTopicName(String topic, long requestId, GeneratedMessageLite requestCommand) { |
| try { |
| return DestinationName.get(topic); |
| } catch (Throwable t) { |
| if (log.isDebugEnabled()) { |
| log.debug("[{}] Failed to parse topic name '{}'", remoteAddress, topic, t); |
| } |
| |
| if (requestCommand instanceof CommandLookupTopic) { |
| ctx.writeAndFlush(Commands.newLookupErrorResponse(ServerError.InvalidTopicName, |
| "Invalid topic name: " + t.getMessage(), requestId)); |
| } else if (requestCommand instanceof CommandPartitionedTopicMetadata) { |
| ctx.writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.InvalidTopicName, |
| "Invalid topic name: " + t.getMessage(), requestId)); |
| } else { |
| ctx.writeAndFlush(Commands.newError(requestId, ServerError.InvalidTopicName, |
| "Invalid topic name: " + t.getMessage())); |
| } |
| |
| return null; |
| } |
| } |
| |
| private static final Logger log = LoggerFactory.getLogger(ServerCnx.class); |
| |
| /** |
| * Helper method for testability |
| * |
| * @return |
| */ |
| public State getState() { |
| return state; |
| } |
| |
| public BrokerService getBrokerService() { |
| return service; |
| } |
| |
| public String getRole() { |
| return authRole; |
| } |
| |
| boolean hasConsumer(long consumerId) { |
| return consumers.containsKey(consumerId); |
| } |
| |
| public boolean isBatchMessageCompatibleVersion() { |
| return remoteEndpointProtocolVersion >= ProtocolVersion.v4.getNumber(); |
| } |
| |
| public String getClientVersion() { |
| return clientVersion; |
| } |
| } |