blob: 9c327a0ea6e788da809d46c71d179c789621be93 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.intercept;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import javax.servlet.FilterChain;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.http.HttpStatus;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.intercept.InterceptException;
import org.eclipse.jetty.server.Response;
/**
 * {@link BrokerInterceptor} implementation that counts how often each interceptor callback is
 * invoked and records the web-service responses it observes.
 *
 * <p>Every counter is an {@link AtomicInteger} and the response log is a
 * {@link CopyOnWriteArrayList}. The counters are exposed through the getters at the bottom of
 * the class and can be zeroed with {@link #reset()}; the response log is cleared separately with
 * {@link #clearResponseList()}.
 */
@Slf4j
public class CounterBrokerInterceptor implements BrokerInterceptor {

    private final AtomicInteger beforeSendCount = new AtomicInteger();
    private final AtomicInteger beforeSendCountAtConsumerLevel = new AtomicInteger();
    /** Shared counter bumped by Pulsar commands and web-service requests/responses. */
    private final AtomicInteger count = new AtomicInteger();
    private final AtomicInteger connectionCreationCount = new AtomicInteger();
    /** Incremented on producer creation, decremented on producer close. */
    private final AtomicInteger producerCount = new AtomicInteger();
    /** Incremented on consumer creation, decremented on consumer close. */
    private final AtomicInteger consumerCount = new AtomicInteger();
    private final AtomicInteger messagePublishCount = new AtomicInteger();
    private final AtomicInteger messageCount = new AtomicInteger();
    private final AtomicInteger messageDispatchCount = new AtomicInteger();
    private final AtomicInteger messageAckCount = new AtomicInteger();
    private final AtomicInteger handleAckCount = new AtomicInteger();
    private final AtomicInteger txnCount = new AtomicInteger();
    private final AtomicInteger committedTxnCount = new AtomicInteger();
    private final AtomicInteger abortedTxnCount = new AtomicInteger();

    /** Web-service responses captured by {@link #onWebserviceResponse}. */
    private final List<ResponseEvent> responseList = new CopyOnWriteArrayList<>();

    /**
     * Sets every counter back to zero.
     *
     * <p>This includes {@code beforeSendCountAtConsumerLevel} and {@code messagePublishCount},
     * so that no getter reports a stale value after a reset. The recorded response list is not
     * touched; use {@link #clearResponseList()} for that.
     */
    public void reset() {
        beforeSendCount.set(0);
        beforeSendCountAtConsumerLevel.set(0);
        count.set(0);
        connectionCreationCount.set(0);
        producerCount.set(0);
        consumerCount.set(0);
        messagePublishCount.set(0);
        messageCount.set(0);
        messageDispatchCount.set(0);
        messageAckCount.set(0);
        handleAckCount.set(0);
        txnCount.set(0);
        committedTxnCount.set(0);
        abortedTxnCount.set(0);
    }

    /**
     * One observed web-service response: the request URI and the HTTP status that was returned.
     */
    @Data
    @AllArgsConstructor
    public class ResponseEvent {
        private String requestUri;
        private int responseStatus;
    }

    /** Counts a newly created connection. */
    @Override
    public void onConnectionCreated(ServerCnx cnx) {
        if (log.isDebugEnabled()) {
            log.debug("Connection created {}", cnx);
        }
        connectionCreationCount.incrementAndGet();
    }

    /** Increments the producer counter. */
    @Override
    public void producerCreated(ServerCnx cnx, Producer producer,
                                Map<String, String> metadata) {
        if (log.isDebugEnabled()) {
            log.debug("Producer created with name={}, id={}",
                    producer.getProducerName(), producer.getProducerId());
        }
        producerCount.incrementAndGet();
    }

    /** Decrements the producer counter. */
    @Override
    public void producerClosed(ServerCnx cnx, Producer producer,
                               Map<String, String> metadata) {
        if (log.isDebugEnabled()) {
            log.debug("Producer with name={}, id={} closed",
                    producer.getProducerName(), producer.getProducerId());
        }
        producerCount.decrementAndGet();
    }

    /** Increments the consumer counter. */
    @Override
    public void consumerCreated(ServerCnx cnx,
                                Consumer consumer,
                                Map<String, String> metadata) {
        if (log.isDebugEnabled()) {
            log.debug("Consumer created with name={}, id={}",
                    consumer.consumerName(), consumer.consumerId());
        }
        consumerCount.incrementAndGet();
    }

    /** Decrements the consumer counter. */
    @Override
    public void consumerClosed(ServerCnx cnx,
                               Consumer consumer,
                               Map<String, String> metadata) {
        if (log.isDebugEnabled()) {
            log.debug("Consumer with name={}, id={} closed",
                    consumer.consumerName(), consumer.consumerId());
        }
        consumerCount.decrementAndGet();
    }

    /** Counts an {@code onMessagePublish} callback. */
    @Override
    public void onMessagePublish(Producer producer, ByteBuf headersAndPayload, Topic.PublishContext publishContext) {
        if (log.isDebugEnabled()) {
            log.debug("Message broker received topic={}, producer={}",
                    producer.getTopic().getName(), producer.getProducerName());
        }
        messagePublishCount.incrementAndGet();
    }

    /** Counts a {@code messageProduced} callback; exposed via {@link #getMessageProducedCount()}. */
    @Override
    public void messageProduced(ServerCnx cnx, Producer producer, long startTimeNs, long ledgerId,
                                long entryId,
                                Topic.PublishContext publishContext) {
        if (log.isDebugEnabled()) {
            log.debug("Message published topic={}, producer={}",
                    producer.getTopic().getName(), producer.getProducerName());
        }
        messageCount.incrementAndGet();
    }

    /** Counts a {@code messageDispatched} callback. */
    @Override
    public void messageDispatched(ServerCnx cnx, Consumer consumer, long ledgerId,
                                  long entryId, ByteBuf headersAndPayload) {
        if (log.isDebugEnabled()) {
            log.debug("Message dispatched topic={}, consumer={}",
                    consumer.getSubscription().getTopic().getName(), consumer.consumerName());
        }
        messageDispatchCount.incrementAndGet();
    }

    /** Counts a {@code messageAcked} callback. */
    @Override
    public void messageAcked(ServerCnx cnx, Consumer consumer,
                             CommandAck ack) {
        messageAckCount.incrementAndGet();
    }

    /** Counts the subscription-level {@code beforeSendMessage} callback. */
    @Override
    public void beforeSendMessage(Subscription subscription,
                                  Entry entry,
                                  long[] ackSet,
                                  MessageMetadata msgMetadata) {
        if (log.isDebugEnabled()) {
            log.debug("Send message to topic {}, subscription {}",
                    subscription.getTopic(), subscription.getName());
        }
        beforeSendCount.incrementAndGet();
    }

    /**
     * Counts the consumer-level {@code beforeSendMessage} callback. This uses its own counter and
     * does not touch the subscription-level one.
     */
    @Override
    public void beforeSendMessage(Subscription subscription,
                                  Entry entry,
                                  long[] ackSet,
                                  MessageMetadata msgMetadata,
                                  Consumer consumer) {
        if (log.isDebugEnabled()) {
            log.debug("Send message to topic {}, subscription {}, consumer {}",
                    subscription.getTopic(), subscription.getName(), consumer.consumerName());
        }
        beforeSendCountAtConsumerLevel.incrementAndGet();
    }

    /**
     * Increments the shared counter for every command, and additionally the ack-handling counter
     * when the command type is {@code ACK}.
     */
    @Override
    public void onPulsarCommand(BaseCommand command, ServerCnx cnx) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] On [{}] Pulsar command", count, command.getType().name());
        }
        if (command.getType().equals(BaseCommand.Type.ACK)) {
            handleAckCount.incrementAndGet();
        }
        count.incrementAndGet();
    }

    @Override
    public void onConnectionClosed(ServerCnx cnx) {
        // no-op
    }

    /**
     * Increments the shared counter and rejects requests whose URL contains one of three fixed
     * "test-interceptor-failed-*" admin paths.
     *
     * @param request the incoming request; it is cast to {@link HttpServletRequest}
     * @throws InterceptException with status {@code HttpStatus.SC_PRECONDITION_FAILED} when the
     *         request URL contains one of the rejected tenant, namespace or topic paths
     */
    @Override
    public void onWebserviceRequest(ServletRequest request) throws IOException, ServletException, InterceptException {
        count.incrementAndGet();
        String url = ((HttpServletRequest) request).getRequestURL().toString();
        if (log.isDebugEnabled()) {
            log.debug("[{}] On [{}] Webservice request", count, url);
        }
        if (url.contains("/admin/v2/tenants/test-interceptor-failed-tenant")) {
            throw new InterceptException(HttpStatus.SC_PRECONDITION_FAILED, "Create tenant failed");
        }
        if (url.contains("/admin/v2/namespaces/public/test-interceptor-failed-namespace")) {
            throw new InterceptException(HttpStatus.SC_PRECONDITION_FAILED, "Create namespace failed");
        }
        if (url.contains("/admin/v2/persistent/public/default/test-interceptor-failed-topic")) {
            throw new InterceptException(HttpStatus.SC_PRECONDITION_FAILED, "Create topic failed");
        }
    }

    /**
     * Increments the shared counter and, when the response is a Jetty {@link Response}, appends
     * its request URI and status to the response list.
     */
    @Override
    public void onWebserviceResponse(ServletRequest request, ServletResponse response) {
        count.incrementAndGet();
        if (log.isDebugEnabled()) {
            log.debug("[{}] On [{}] Webservice response {}",
                    count, ((HttpServletRequest) request).getRequestURL().toString(), response);
        }
        if (response instanceof Response) {
            Response res = (Response) response;
            responseList.add(new ResponseEvent(res.getHttpChannel().getRequest().getRequestURI(), res.getStatus()));
        }
    }

    /**
     * Overwrites the shared counter with the fixed value 100 and then passes the request on to the
     * rest of the filter chain.
     */
    @Override
    public void onFilter(ServletRequest request, ServletResponse response, FilterChain chain)
            throws IOException, ServletException {
        count.set(100);
        chain.doFilter(request, response);
    }

    /** Counts an opened transaction. */
    @Override
    public void txnOpened(long tcId, String txnID) {
        txnCount.incrementAndGet();
    }

    /**
     * Counts an ended transaction as committed when {@code txnAction} equals
     * {@code TxnAction.COMMIT_VALUE}; any other value is counted as aborted.
     */
    @Override
    public void txnEnded(String txnID, long txnAction) {
        if (txnAction == TxnAction.COMMIT_VALUE) {
            committedTxnCount.incrementAndGet();
        } else {
            abortedTxnCount.incrementAndGet();
        }
    }

    @Override
    public void initialize(PulsarService pulsarService) throws Exception {
        // nothing to initialize
    }

    @Override
    public void close() {
        // nothing to release
    }

    /** @return number of {@code ACK} commands seen by {@link #onPulsarCommand} */
    public int getHandleAckCount() {
        return handleAckCount.get();
    }

    /** @return current value of the shared command/web-service counter */
    public int getCount() {
        return count.get();
    }

    /** @return producer creations minus producer closes */
    public int getProducerCount() {
        return producerCount.get();
    }

    /** @return consumer creations minus consumer closes */
    public int getConsumerCount() {
        return consumerCount.get();
    }

    /** @return number of {@link #onMessagePublish} callbacks */
    public int getMessagePublishCount() {
        return messagePublishCount.get();
    }

    /** @return number of {@link #messageProduced} callbacks */
    public int getMessageProducedCount() {
        return messageCount.get();
    }

    /** @return number of {@link #messageDispatched} callbacks */
    public int getMessageDispatchCount() {
        return messageDispatchCount.get();
    }

    /** @return number of {@link #messageAcked} callbacks */
    public int getMessageAckCount() {
        return messageAckCount.get();
    }

    /** @return number of subscription-level {@code beforeSendMessage} callbacks */
    public int getBeforeSendCount() {
        return beforeSendCount.get();
    }

    /** @return number of consumer-level {@code beforeSendMessage} callbacks */
    public int getBeforeSendCountAtConsumerLevel() {
        return beforeSendCountAtConsumerLevel.get();
    }

    /** @return number of {@link #onConnectionCreated} callbacks */
    public int getConnectionCreationCount() {
        return connectionCreationCount.get();
    }

    /** Removes every recorded {@link ResponseEvent}. */
    public void clearResponseList() {
        responseList.clear();
    }

    /** @return the list of recorded responses (the internal list itself, not a copy) */
    public List<ResponseEvent> getResponseList() {
        return responseList;
    }

    /** @return number of {@link #txnOpened} callbacks */
    public int getTxnCount() {
        return txnCount.get();
    }

    /** @return number of transactions that ended with the commit action */
    public int getCommittedTxnCount() {
        return committedTxnCount.get();
    }

    /** @return number of transactions that ended with any non-commit action */
    public int getAbortedTxnCount() {
        return abortedTxnCount.get();
    }
}