Change ``ContextClassLoader`` to ``NarClassLoader`` in BrokerInterceptor (#13589)

### Motivation

It's ``BrokerInterceptor`` side change, like #13501 

### Modifications

Change context class loader through Thread.currentThread().setContextClassLoader(classLoader) before every interceptor handler callback, and change it back to original class loader afterwards.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java
index 6725f67..0f8c182 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java
@@ -28,6 +28,7 @@
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.Entry;
+import org.apache.pulsar.broker.ClassLoaderSwitcher;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.Producer;
@@ -56,76 +57,102 @@
                                   Entry entry,
                                   long[] ackSet,
                                   MessageMetadata msgMetadata) {
-        this.interceptor.beforeSendMessage(
-            subscription, entry, ackSet, msgMetadata);
+        try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) {
+            this.interceptor.beforeSendMessage(
+                    subscription, entry, ackSet, msgMetadata);
+        }
     }
 
     @Override
     public void producerCreated(ServerCnx cnx, Producer producer,
                                 Map<String, String> metadata){
-        this.interceptor.producerCreated(cnx, producer, metadata);
+        try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) {
+            this.interceptor.producerCreated(cnx, producer, metadata);
+        }
     }
 
     @Override
     public void consumerCreated(ServerCnx cnx,
                                 Consumer consumer,
                                 Map<String, String> metadata) {
-        this.interceptor.consumerCreated(
-                cnx, consumer, metadata);
+        try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) {
+            this.interceptor.consumerCreated(
+                    cnx, consumer, metadata);
+        }
     }
 
     @Override
     public void messageProduced(ServerCnx cnx, Producer producer, long startTimeNs, long ledgerId,
                                 long entryId, Topic.PublishContext publishContext) {
-        this.interceptor.messageProduced(cnx, producer, startTimeNs, ledgerId, entryId, publishContext);
+        try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) {
+            this.interceptor.messageProduced(cnx, producer, startTimeNs, ledgerId, entryId, publishContext);
+        }
     }
 
     @Override
     public  void messageDispatched(ServerCnx cnx, Consumer consumer, long ledgerId,
                                    long entryId, ByteBuf headersAndPayload) {
-        this.interceptor.messageDispatched(cnx, consumer, ledgerId, entryId, headersAndPayload);
+        try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) {
+            this.interceptor.messageDispatched(cnx, consumer, ledgerId, entryId, headersAndPayload);
+        }
     }
 
     @Override
     public void messageAcked(ServerCnx cnx, Consumer consumer,
                              CommandAck ackCmd) {
-        this.interceptor.messageAcked(cnx, consumer, ackCmd);
+        try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) {
+            this.interceptor.messageAcked(cnx, consumer, ackCmd);
+        }
     }
 
     @Override
     public void onConnectionCreated(ServerCnx cnx) {
-        this.interceptor.onConnectionCreated(cnx);
+        try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) {
+            this.interceptor.onConnectionCreated(cnx);
+        }
     }
 
     @Override
     public void onPulsarCommand(BaseCommand command, ServerCnx cnx) throws InterceptException {
-        this.interceptor.onPulsarCommand(command, cnx);
+        try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) {
+            this.interceptor.onPulsarCommand(command, cnx);
+        }
     }
 
     @Override
     public void onConnectionClosed(ServerCnx cnx) {
-        this.interceptor.onConnectionClosed(cnx);
+        try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) {
+            this.interceptor.onConnectionClosed(cnx);
+        }
     }
 
     @Override
     public void onWebserviceRequest(ServletRequest request) throws IOException, ServletException, InterceptException {
-        this.interceptor.onWebserviceRequest(request);
+        try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) {
+            this.interceptor.onWebserviceRequest(request);
+        }
     }
 
     @Override
     public void onWebserviceResponse(ServletRequest request, ServletResponse response)
             throws IOException, ServletException {
-        this.interceptor.onWebserviceResponse(request, response);
+        try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) {
+            this.interceptor.onWebserviceResponse(request, response);
+        }
     }
 
     @Override
     public void initialize(PulsarService pulsarService) throws Exception {
-        this.interceptor.initialize(pulsarService);
+        try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) {
+            this.interceptor.initialize(pulsarService);
+        }
     }
 
     @Override
     public void close() {
-        interceptor.close();
+        try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) {
+            interceptor.close();
+        }
         try {
             classLoader.close();
         } catch (IOException e) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoaderTest.java
index ae66937..668323b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoaderTest.java
@@ -18,14 +18,30 @@
  */
 package org.apache.pulsar.broker.intercept;
 
-import org.apache.pulsar.broker.PulsarService;
-import org.apache.pulsar.common.nar.NarClassLoader;
-import org.testng.annotations.Test;
-
 import static org.mockito.ArgumentMatchers.same;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertEquals;
+import com.google.common.collect.Maps;
+import io.netty.buffer.ByteBuf;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.service.Consumer;
+import org.apache.pulsar.broker.service.Producer;
+import org.apache.pulsar.broker.service.ServerCnx;
+import org.apache.pulsar.broker.service.Subscription;
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.common.api.proto.BaseCommand;
+import org.apache.pulsar.common.api.proto.CommandAck;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.intercept.InterceptException;
+import org.apache.pulsar.common.nar.NarClassLoader;
+import org.testng.annotations.Test;
+import javax.servlet.FilterChain;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+import java.util.Map;
 
 /**
  * Unit test {@link BrokerInterceptorWithClassLoader}.
@@ -44,4 +60,127 @@
         verify(h, times(1)).initialize(same(pulsarService));
     }
 
+
+    @Test
+    public void testClassLoaderSwitcher() throws Exception {
+        NarClassLoader narLoader = mock(NarClassLoader.class);
+        BrokerInterceptor interceptor = new BrokerInterceptor() {
+            @Override
+            public void beforeSendMessage(Subscription subscription, Entry entry, long[] ackSet, MessageMetadata msgMetadata) {
+                assertEquals(Thread.currentThread().getContextClassLoader(), narLoader);
+            }
+            @Override
+            public void onConnectionCreated(ServerCnx cnx) {
+                assertEquals(Thread.currentThread().getContextClassLoader(), narLoader);
+            }
+            @Override
+            public void producerCreated(ServerCnx cnx, Producer producer, Map<String, String> metadata) {
+                assertEquals(Thread.currentThread().getContextClassLoader(), narLoader);
+            }
+            @Override
+            public void consumerCreated(ServerCnx cnx, Consumer consumer, Map<String, String> metadata) {
+                assertEquals(Thread.currentThread().getContextClassLoader(), narLoader);
+            }
+            @Override
+            public void messageProduced(ServerCnx cnx, Producer producer, long startTimeNs,
+                                        long ledgerId, long entryId, Topic.PublishContext publishContext) {
+                assertEquals(Thread.currentThread().getContextClassLoader(), narLoader);
+            }
+            @Override
+            public void messageDispatched(ServerCnx cnx, Consumer consumer, long ledgerId,
+                                          long entryId, ByteBuf headersAndPayload) {
+                assertEquals(Thread.currentThread().getContextClassLoader(), narLoader);
+            }
+            @Override
+            public void messageAcked(ServerCnx cnx, Consumer consumer, CommandAck ackCmd) {
+                assertEquals(Thread.currentThread().getContextClassLoader(), narLoader);
+            }
+            @Override
+            public void onPulsarCommand(BaseCommand command, ServerCnx cnx) throws InterceptException {
+                assertEquals(Thread.currentThread().getContextClassLoader(), narLoader);
+            }
+            @Override
+            public void onConnectionClosed(ServerCnx cnx) {
+                assertEquals(Thread.currentThread().getContextClassLoader(), narLoader);
+            }
+            @Override
+            public void onWebserviceRequest(ServletRequest request) {
+                assertEquals(Thread.currentThread().getContextClassLoader(), narLoader);
+            }
+            @Override
+            public void onWebserviceResponse(ServletRequest request, ServletResponse response) {
+                assertEquals(Thread.currentThread().getContextClassLoader(), narLoader);
+            }
+            @Override
+            public void onFilter(ServletRequest request, ServletResponse response, FilterChain chain) {
+                assertEquals(Thread.currentThread().getContextClassLoader(), narLoader);
+            }
+            @Override
+            public void initialize(PulsarService pulsarService) throws Exception {
+                assertEquals(Thread.currentThread().getContextClassLoader(), narLoader);
+            }
+            @Override
+            public void close() {
+                assertEquals(Thread.currentThread().getContextClassLoader(), narLoader);
+            }
+        };
+
+        BrokerInterceptorWithClassLoader brokerInterceptorWithClassLoader =
+                new BrokerInterceptorWithClassLoader(interceptor, narLoader);
+        ClassLoader curClassLoader = Thread.currentThread().getContextClassLoader();
+        // test class loader
+        assertEquals(brokerInterceptorWithClassLoader.getClassLoader(), narLoader);
+        // test initialize
+        brokerInterceptorWithClassLoader.initialize(mock(PulsarService.class));
+        assertEquals(Thread.currentThread().getContextClassLoader(), curClassLoader);
+        // test onFilter
+        brokerInterceptorWithClassLoader.onFilter(mock(ServletRequest.class)
+                , mock(ServletResponse.class), mock(FilterChain.class));
+        assertEquals(Thread.currentThread().getContextClassLoader(), curClassLoader);
+        // test onWebserviceResponse
+        brokerInterceptorWithClassLoader.onWebserviceResponse(mock(ServletRequest.class)
+                , mock(ServletResponse.class));
+        assertEquals(Thread.currentThread().getContextClassLoader(), curClassLoader);
+        // test onWebserviceRequest
+        brokerInterceptorWithClassLoader.onWebserviceRequest(mock(ServletRequest.class));
+        assertEquals(Thread.currentThread().getContextClassLoader(), curClassLoader);
+        // test onConnectionClosed
+        brokerInterceptorWithClassLoader.onConnectionClosed(mock(ServerCnx.class));
+        assertEquals(Thread.currentThread().getContextClassLoader(), curClassLoader);
+        // test onPulsarCommand
+        brokerInterceptorWithClassLoader.onPulsarCommand(null, mock(ServerCnx.class));
+        assertEquals(Thread.currentThread().getContextClassLoader(), curClassLoader);
+        // test messageAcked
+        brokerInterceptorWithClassLoader
+                .messageAcked(mock(ServerCnx.class), mock(Consumer.class), null);
+        assertEquals(Thread.currentThread().getContextClassLoader(), curClassLoader);
+        // test messageDispatched
+        brokerInterceptorWithClassLoader
+                .messageDispatched(mock(ServerCnx.class), mock(Consumer.class), 1, 1, null);
+        assertEquals(Thread.currentThread().getContextClassLoader(), curClassLoader);
+        // test messageProduced
+        brokerInterceptorWithClassLoader
+                .messageProduced(mock(ServerCnx.class), mock(Producer.class), 1, 1, 1, null);
+        assertEquals(Thread.currentThread().getContextClassLoader(), curClassLoader);
+        // test consumerCreated
+        brokerInterceptorWithClassLoader
+                .consumerCreated(mock(ServerCnx.class), mock(Consumer.class), Maps.newHashMap());
+        assertEquals(Thread.currentThread().getContextClassLoader(), curClassLoader);
+        // test producerCreated
+        brokerInterceptorWithClassLoader
+                .producerCreated(mock(ServerCnx.class), mock(Producer.class), Maps.newHashMap());
+        assertEquals(Thread.currentThread().getContextClassLoader(), curClassLoader);
+        // test onConnectionCreated
+        brokerInterceptorWithClassLoader
+                .onConnectionCreated(mock(ServerCnx.class));
+        assertEquals(Thread.currentThread().getContextClassLoader(), curClassLoader);
+        // test beforeSendMessage
+        brokerInterceptorWithClassLoader
+                .beforeSendMessage(mock(Subscription.class), mock(Entry.class), null, null);
+        assertEquals(Thread.currentThread().getContextClassLoader(), curClassLoader);
+        // test close
+        brokerInterceptorWithClassLoader.close();
+        assertEquals(Thread.currentThread().getContextClassLoader(), curClassLoader);
+
+    }
 }