Pulsar Log4j2 Appender: allow configuration of the client (#17)

Fixes #15, #16

### Motivation

When Pulsar security is enabled, the current implementation is unusable. This PR also covers the feature request #16 to allow configuring most client and producer configuration properties.
The issue #15 makes the Pulsar Log4j2 Appender unusable in many use cases. This PR fixes the issue by removing the hard coded producer name.
The current build for Pulsar Log4j2 Appender is broken. This PR fixes the build by properly configuring Log4j2 plugin annotation processor for the maven build. There were broken tests which were unnecessary and testing Log4j2 internals instead of the appender. Those tests have been removed

### Modifications

- add support for configuring Pulsar client and producer configuration properties (#16)
- fix maven build (Log4j2 plugin annotation processor)
- remove broken tests which were unnecessary
- remove the hard coded producer name which prevented using the appender in many use cases (#15)
diff --git a/pulsar-log4j2-appender/pom.xml b/pulsar-log4j2-appender/pom.xml
index 7058d72..6d8f1a8 100644
--- a/pulsar-log4j2-appender/pom.xml
+++ b/pulsar-log4j2-appender/pom.xml
@@ -102,6 +102,7 @@
       <scope>test</scope>
     </dependency>
   </dependencies>
+
   <build>
     <plugins>
       <plugin>
diff --git a/pulsar-log4j2-appender/src/main/java/org/apache/pulsar/log4j2/appender/PulsarAppender.java b/pulsar-log4j2-appender/src/main/java/org/apache/pulsar/log4j2/appender/PulsarAppender.java
index 45aeca9..7a965b1 100644
--- a/pulsar-log4j2-appender/src/main/java/org/apache/pulsar/log4j2/appender/PulsarAppender.java
+++ b/pulsar-log4j2-appender/src/main/java/org/apache/pulsar/log4j2/appender/PulsarAppender.java
@@ -21,7 +21,6 @@
 import java.io.Serializable;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
-
 import org.apache.logging.log4j.core.AbstractLifeCycle;
 import org.apache.logging.log4j.core.Appender;
 import org.apache.logging.log4j.core.Filter;
@@ -186,7 +185,7 @@
         try {
             manager.startup();
         } catch (Exception e) {
-            // fail to start the manager
+            LOGGER.error("Failed to start pulsar manager", e);
         }
     }
 
diff --git a/pulsar-log4j2-appender/src/main/java/org/apache/pulsar/log4j2/appender/PulsarManager.java b/pulsar-log4j2-appender/src/main/java/org/apache/pulsar/log4j2/appender/PulsarManager.java
index 9051171..e74b0f3 100644
--- a/pulsar-log4j2-appender/src/main/java/org/apache/pulsar/log4j2/appender/PulsarManager.java
+++ b/pulsar-log4j2-appender/src/main/java/org/apache/pulsar/log4j2/appender/PulsarManager.java
@@ -18,13 +18,17 @@
  */
 package org.apache.pulsar.log4j2.appender;
 
+import java.util.Arrays;
+import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
-
+import java.util.stream.Collectors;
 import org.apache.logging.log4j.core.LoggerContext;
 import org.apache.logging.log4j.core.appender.AbstractManager;
 import org.apache.logging.log4j.core.config.Property;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
@@ -34,6 +38,7 @@
 
 public class PulsarManager extends AbstractManager {
 
+    public static final String PRODUCER_PROPERTY_PREFIX = "producer.";
     static Supplier<ClientBuilder> PULSAR_CLIENT_BUILDER = PulsarClient::builder;
 
     private PulsarClient client;
@@ -41,6 +46,7 @@
 
     private final String serviceUrl;
     private final String topic;
+    private final Property[] properties;
     private final String key;
     private final boolean syncSend;
 
@@ -55,6 +61,7 @@
         this.serviceUrl = Objects.requireNonNull(serviceUrl, "serviceUrl");
         this.topic = Objects.requireNonNull(topic, "topic");
         this.syncSend = syncSend;
+        this.properties = properties;
         this.key = key;
     }
 
@@ -66,17 +73,17 @@
             } catch (Exception e) {
                 // exceptions on closing
                 LOGGER.warn("Failed to close producer within {} milliseconds",
-                    timeUnit.toMillis(timeout), e);
+                        timeUnit.toMillis(timeout), e);
             }
         }
         return true;
     }
 
-    public void send(final byte[] msg)  {
+    public void send(final byte[] msg) {
         if (producer != null) {
             String newKey = null;
 
-            if(key != null && key.contains("${")) {
+            if (key != null && key.contains("${")) {
                 newKey = getLoggerContext().getConfiguration().getStrSubstitutor().replace(key);
             } else if (key != null) {
                 newKey = key;
@@ -98,37 +105,99 @@
                 }
             } else {
                 messageBuilder.sendAsync()
-                    .exceptionally(cause -> {
-                        LOGGER.error("Unable to write to Pulsar in appender [" + getName() + "]", cause);
-                        return null;
-                    });
+                        .exceptionally(cause -> {
+                            LOGGER.error("Unable to write to Pulsar in appender [" + getName() + "]", cause);
+                            return null;
+                        });
             }
         }
     }
 
     public void startup() throws Exception {
-        try {
-            client = PULSAR_CLIENT_BUILDER.get()
-                .serviceUrl(serviceUrl)
-                .build();
-            ProducerBuilder<byte[]> producerBuilder = client.newProducer()
+        createClient();
+        ProducerBuilder<byte[]> producerBuilder = client.newProducer()
                 .topic(topic)
-                .producerName("pulsar-log4j2-appender-" + topic)
                 .blockIfQueueFull(false);
-            if (syncSend) {
-                // disable batching for sync send
-                producerBuilder = producerBuilder.enableBatching(false);
-            } else {
-                // enable batching in 10 ms for async send
-                producerBuilder = producerBuilder
+        if (syncSend) {
+            // disable batching for sync send
+            producerBuilder = producerBuilder.enableBatching(false);
+        } else {
+            // enable batching in 10 ms for async send
+            producerBuilder = producerBuilder
                     .enableBatching(true)
                     .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS);
-            }
-            producer = producerBuilder.create();
-        } catch (Exception t) {
-            LOGGER.error("Failed to start pulsar manager", t);
-            throw t;
         }
+        Map<String, Object> producerConfiguration = propertiesToProducerConfiguration();
+        if (!producerConfiguration.isEmpty()) {
+            producerBuilder.loadConf(producerConfiguration);
+        }
+        producer = producerBuilder.create();
+    }
+
+    private void createClient() throws PulsarClientException {
+        Map<String, Object> configuration = propertiesToClientConfiguration();
+
+        // must be the same as
+        // https://pulsar.apache.org/docs/en/security-tls-keystore/#configuring-clients
+        String authPluginClassName = getAndRemoveString("authPlugin", "", configuration);
+        String authParamsString = getAndRemoveString("authParams", "", configuration);
+        Authentication authentication =
+                AuthenticationFactory.create(authPluginClassName, authParamsString);
+        boolean tlsAllowInsecureConnection =
+                Boolean.parseBoolean(
+                        getAndRemoveString("tlsAllowInsecureConnection", "false", configuration));
+
+        boolean tlsEnableHostnameVerification =
+                Boolean.parseBoolean(
+                        getAndRemoveString("tlsEnableHostnameVerification", "false", configuration));
+        final String tlsTrustCertsFilePath =
+                getAndRemoveString("tlsTrustCertsFilePath", "", configuration);
+
+        boolean useKeyStoreTls =
+                Boolean.parseBoolean(getAndRemoveString("useKeyStoreTls", "false", configuration));
+        String tlsTrustStoreType = getAndRemoveString("tlsTrustStoreType", "JKS", configuration);
+        String tlsTrustStorePath = getAndRemoveString("tlsTrustStorePath", "", configuration);
+        String tlsTrustStorePassword =
+                getAndRemoveString("tlsTrustStorePassword", "", configuration);
+
+        ClientBuilder clientBuilder =
+                PULSAR_CLIENT_BUILDER.get()
+                        .loadConf(configuration)
+                        .tlsTrustStorePassword(tlsTrustStorePassword)
+                        .tlsTrustStorePath(tlsTrustStorePath)
+                        .tlsTrustCertsFilePath(tlsTrustCertsFilePath)
+                        .tlsTrustStoreType(tlsTrustStoreType)
+                        .useKeyStoreTls(useKeyStoreTls)
+                        .enableTlsHostnameVerification(tlsEnableHostnameVerification)
+                        .allowTlsInsecureConnection(tlsAllowInsecureConnection)
+                        .authentication(authentication);
+        if (!serviceUrl.isEmpty()) {
+            clientBuilder.serviceUrl(serviceUrl);
+        }
+        client = clientBuilder.build();
+    }
+
+    private Map<String, Object> propertiesToClientConfiguration() {
+        return propertiesToConfiguration(false);
+    }
+
+    private Map<String, Object> propertiesToProducerConfiguration() {
+        return propertiesToConfiguration(true);
+    }
+
+    private Map<String, Object> propertiesToConfiguration(boolean producerProperties) {
+        return Arrays.stream(properties).filter(property -> property.getValue() != null
+                && property.getName().startsWith(PRODUCER_PROPERTY_PREFIX) == producerProperties)
+                .collect(Collectors.toMap(
+                        property -> producerProperties ?
+                                property.getName().substring(PRODUCER_PROPERTY_PREFIX.length()) : property.getName(),
+                        Property::getValue));
+    }
+
+    private static String getAndRemoveString(
+            String name, String defaultValue, Map<String, Object> properties) {
+        Object value = properties.remove(name);
+        return value != null ? value.toString() : defaultValue;
     }
 
     public String getServiceUrl() {
diff --git a/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/AbstractPulsarAppenderTest.java b/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/AbstractPulsarAppenderTest.java
new file mode 100644
index 0000000..9feaf7b
--- /dev/null
+++ b/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/AbstractPulsarAppenderTest.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.log4j2.appender;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.config.Configurator;
+import org.apache.logging.log4j.core.impl.Log4jLogEvent;
+import org.apache.logging.log4j.message.SimpleMessage;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.ClientBuilderImpl;
+import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
+import org.testng.annotations.BeforeMethod;
+
+abstract class AbstractPulsarAppenderTest {
+    static final String LOG_MESSAGE = "Hello, world!";
+    private final String resourceName;
+    protected ProducerBuilder<byte[]> producerBuilder;
+    protected ClientBuilderImpl clientBuilder;
+    protected PulsarClient client;
+    protected Producer<byte[]> producer;
+    protected List<Message<byte[]>> history;
+
+    protected LoggerContext ctx;
+
+    protected AbstractPulsarAppenderTest(String resourceName) {
+        this.resourceName = resourceName;
+    }
+
+    class MockedMessageBuilder extends TypedMessageBuilderImpl<byte[]> {
+
+        MockedMessageBuilder() {
+            super(null, Schema.BYTES);
+        }
+
+        @Override
+        public MessageId send() {
+            synchronized (history) {
+                history.add(getMessage());
+            }
+
+            return mock(MessageId.class);
+        }
+
+        @Override
+        public CompletableFuture<MessageId> sendAsync() {
+            synchronized (history) {
+                history.add(getMessage());
+            }
+
+            return CompletableFuture.completedFuture(mock(MessageId.class));
+        }
+    }
+
+    static Log4jLogEvent createLogEvent() {
+        return Log4jLogEvent.newBuilder()
+                .setLoggerName(AbstractPulsarAppenderTest.class.getName())
+                .setLoggerFqcn(AbstractPulsarAppenderTest.class.getName())
+                .setLevel(Level.INFO)
+                .setMessage(new SimpleMessage(LOG_MESSAGE))
+                .build();
+    }
+
+    @BeforeMethod
+    public void setUp() throws Exception {
+        history = new LinkedList<>();
+
+        client = mock(PulsarClient.class);
+        producer = mock(Producer.class);
+        clientBuilder = mock(ClientBuilderImpl.class);
+
+        doReturn(client).when(clientBuilder).build();
+        doReturn(clientBuilder).when(clientBuilder).serviceUrl(anyString());
+        doReturn(clientBuilder).when(clientBuilder).loadConf(any());
+        doReturn(clientBuilder).when(clientBuilder).authentication(any());
+        doReturn(clientBuilder).when(clientBuilder).tlsTrustStorePassword(anyString());
+        doReturn(clientBuilder).when(clientBuilder).tlsTrustStorePath(anyString());
+        doReturn(clientBuilder).when(clientBuilder).tlsTrustCertsFilePath(anyString());
+        doReturn(clientBuilder).when(clientBuilder).tlsTrustStoreType(anyString());
+        doReturn(clientBuilder).when(clientBuilder).useKeyStoreTls(anyBoolean());
+        doReturn(clientBuilder).when(clientBuilder).enableTlsHostnameVerification(anyBoolean());
+        doReturn(clientBuilder).when(clientBuilder).allowTlsInsecureConnection(anyBoolean());
+
+        producerBuilder = mock(ProducerBuilder.class);
+        when(client.newProducer()).thenReturn(producerBuilder);
+        doReturn(producerBuilder).when(producerBuilder).topic(anyString());
+        doReturn(producerBuilder).when(producerBuilder).producerName(anyString());
+        doReturn(producerBuilder).when(producerBuilder).enableBatching(anyBoolean());
+        doReturn(producerBuilder).when(producerBuilder).batchingMaxPublishDelay(anyLong(), any(TimeUnit.class));
+        doReturn(producerBuilder).when(producerBuilder).blockIfQueueFull(anyBoolean());
+        doReturn(producerBuilder).when(producerBuilder).loadConf(any());
+        doReturn(producer).when(producerBuilder).create();
+        doReturn(CompletableFuture.completedFuture(null)).when(producer).closeAsync();
+
+        when(producer.newMessage()).then(invocation -> new MockedMessageBuilder());
+        when(producer.send(any(byte[].class)))
+                .thenAnswer(invocationOnMock -> {
+                    Message<byte[]> msg = invocationOnMock.getArgument(0);
+                    synchronized (history) {
+                        history.add(msg);
+                    }
+                    return null;
+                });
+
+        when(producer.sendAsync(any(byte[].class)))
+                .thenAnswer(invocationOnMock -> {
+                    Message<byte[]> msg = invocationOnMock.getArgument(0);
+                    synchronized (history) {
+                        history.add(msg);
+                    }
+                    CompletableFuture<MessageId> future = new CompletableFuture<>();
+                    future.complete(mock(MessageId.class));
+                    return future;
+                });
+
+        PulsarManager.PULSAR_CLIENT_BUILDER = () -> clientBuilder;
+
+        ctx = Configurator.initialize(
+                "PulsarAppenderTest",
+                getClass().getClassLoader(),
+                getClass().getClassLoader().getResource(resourceName).toURI());
+    }
+
+    protected LogEvent deserializeLogEvent(final byte[] data) throws IOException, ClassNotFoundException {
+        final ByteArrayInputStream bis = new ByteArrayInputStream(data);
+        try (ObjectInput ois = new ObjectInputStream(bis)) {
+            return (LogEvent) ois.readObject();
+        }
+    }
+}
\ No newline at end of file
diff --git a/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/PulsarAppenderClientConfTest.java b/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/PulsarAppenderClientConfTest.java
new file mode 100644
index 0000000..0cf25b3
--- /dev/null
+++ b/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/PulsarAppenderClientConfTest.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.log4j2.appender;
+
+import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.logging.log4j.core.Appender;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.apache.pulsar.client.api.Message;
+import org.mockito.ArgumentCaptor;
+import org.testng.annotations.Test;
+
+public class PulsarAppenderClientConfTest extends AbstractPulsarAppenderTest {
+    public PulsarAppenderClientConfTest() {
+        super("PulsarAppenderClientConfTest.xml");
+    }
+
+    @Test
+    public void testAppendWithClientConf() throws Exception {
+        final Appender appender = ctx.getConfiguration().getAppender("PulsarAppenderWithClientConf");
+        final LogEvent logEvent = createLogEvent();
+        appender.append(logEvent);
+        Message<byte[]> item;
+        synchronized (history) {
+            assertEquals(1, history.size());
+            item = history.get(0);
+        }
+        assertNotNull(item);
+        String msgKey = item.getKey();
+        assertEquals(msgKey, "key");
+        assertEquals(new String(item.getData(), StandardCharsets.UTF_8), LOG_MESSAGE);
+
+        // verify authPlugin & authParams
+        ArgumentCaptor<Authentication> authenticationCaptor = ArgumentCaptor.forClass(Authentication.class);
+        verify(clientBuilder).authentication(authenticationCaptor.capture());
+        Authentication authentication = authenticationCaptor.getValue();
+        assertEquals(authentication.getAuthMethodName(), "token");
+        AuthenticationDataProvider authData = authentication.getAuthData();
+        assertTrue(authData.hasDataForHttp());
+        Map<String, String> headers =
+                authData.getHttpHeaders().stream().collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+        assertEquals(headers.size(), 1);
+        assertEquals(headers.get("Authorization"), "Bearer TOKEN");
+
+        // verify tlsAllowInsecureConnection
+        verify(clientBuilder).allowTlsInsecureConnection(true);
+
+        // verify tlsEnableHostnameVerification
+        verify(clientBuilder).enableTlsHostnameVerification(true);
+
+        // verify tlsTrustStorePassword
+        ArgumentCaptor<String> tlsTrustStorePasswordCaptor = ArgumentCaptor.forClass(String.class);
+        verify(clientBuilder).tlsTrustStorePassword(tlsTrustStorePasswordCaptor.capture());
+        assertEquals(tlsTrustStorePasswordCaptor.getValue(), "_tlsTrustStorePassword_");
+
+        // verify tlsTrustStorePath
+        ArgumentCaptor<String> tlsTrustStorePathCaptor = ArgumentCaptor.forClass(String.class);
+        verify(clientBuilder).tlsTrustStorePath(tlsTrustStorePathCaptor.capture());
+        assertEquals(tlsTrustStorePathCaptor.getValue(), "_tlsTrustStorePath_");
+
+        // verify tlsTrustCertsFilePath
+        ArgumentCaptor<String> tlsTrustCertsFilePathCaptor = ArgumentCaptor.forClass(String.class);
+        verify(clientBuilder).tlsTrustCertsFilePath(tlsTrustCertsFilePathCaptor.capture());
+        assertEquals(tlsTrustCertsFilePathCaptor.getValue(), "_tlsTrustCertsFilePath_");
+
+        // verify tlsTrustStoreType
+        ArgumentCaptor<String> tlsTrustStoreTypeCaptor = ArgumentCaptor.forClass(String.class);
+        verify(clientBuilder).tlsTrustStoreType(tlsTrustStoreTypeCaptor.capture());
+        assertEquals(tlsTrustStoreTypeCaptor.getValue(), "_tlsTrustStoreType_");
+
+        // verify useKeyStoreTls
+        verify(clientBuilder).useKeyStoreTls(true);
+
+        // verify loadConf
+        ArgumentCaptor<Map<String, Object>> loadConfCaptor = ArgumentCaptor.forClass(Map.class);
+        verify(clientBuilder).loadConf(loadConfCaptor.capture());
+        Map<String, Object> conf = loadConfCaptor.getValue();
+        assertEquals(conf.size(), 3);
+        assertEquals(conf.get("numIoThreads"), "8");
+        assertEquals(conf.get("connectionsPerBroker"), "5");
+        assertEquals(conf.get("enableTransaction"), "true");
+
+        // verify producer builder loadConf
+        ArgumentCaptor<Map<String, Object>> producerLoadConfCaptor = ArgumentCaptor.forClass(Map.class);
+        verify(producerBuilder).loadConf(producerLoadConfCaptor.capture());
+        Map<String, Object> producerConf = producerLoadConfCaptor.getValue();
+        assertEquals(producerConf.size(), 2);
+        assertEquals(producerConf.get("maxPendingMessages"), "20000");
+        assertEquals(producerConf.get("blockIfQueueFull"), "true");
+    }
+
+}
\ No newline at end of file
diff --git a/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/PulsarAppenderTest.java b/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/PulsarAppenderTest.java
index 73cbadf..ca7df68 100644
--- a/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/PulsarAppenderTest.java
+++ b/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/PulsarAppenderTest.java
@@ -24,6 +24,7 @@
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertFalse;
 import static org.testng.AssertJUnit.assertEquals;
@@ -48,6 +49,7 @@
 import org.apache.logging.log4j.core.config.Configurator;
 import org.apache.logging.log4j.core.impl.Log4jLogEvent;
 import org.apache.logging.log4j.message.SimpleMessage;
+import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
@@ -56,101 +58,13 @@
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.ClientBuilderImpl;
 import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
+import org.mockito.ArgumentCaptor;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-public class PulsarAppenderTest {
-
-    private static final String LOG_MESSAGE = "Hello, world!";
-
-    private static Log4jLogEvent createLogEvent() {
-        return Log4jLogEvent.newBuilder()
-            .setLoggerName(PulsarAppenderTest.class.getName())
-            .setLoggerFqcn(PulsarAppenderTest.class.getName())
-            .setLevel(Level.INFO)
-            .setMessage(new SimpleMessage(LOG_MESSAGE))
-            .build();
-    }
-
-    private ClientBuilderImpl clientBuilder;
-    private PulsarClient client;
-    private Producer<byte[]> producer;
-    private List<Message<byte[]>> history;
-
-    private LoggerContext ctx;
-
-    private class MockedMessageBuilder extends TypedMessageBuilderImpl<byte[]> {
-
-        MockedMessageBuilder() {
-            super(null, Schema.BYTES);
-        }
-
-        @Override
-        public MessageId send() {
-            synchronized (history) {
-                history.add(getMessage());
-            }
-
-            return mock(MessageId.class);
-        }
-
-        @Override
-        public CompletableFuture<MessageId> sendAsync() {
-            synchronized (history) {
-                history.add(getMessage());
-            }
-
-            return CompletableFuture.completedFuture(mock(MessageId.class));
-        }
-    }
-
-    @BeforeMethod
-    public void setUp() throws Exception {
-        history = new LinkedList<>();
-
-        client = mock(PulsarClient.class);
-        producer = mock(Producer.class);
-        clientBuilder = mock(ClientBuilderImpl.class);
-
-        doReturn(client).when(clientBuilder).build();
-        doReturn(clientBuilder).when(clientBuilder).serviceUrl(anyString());
-
-        ProducerBuilder<byte[]> producerBuilder = mock(ProducerBuilder.class);
-        when(client.newProducer()).thenReturn(producerBuilder);
-        doReturn(producerBuilder).when(producerBuilder).topic(anyString());
-        doReturn(producerBuilder).when(producerBuilder).producerName(anyString());
-        doReturn(producerBuilder).when(producerBuilder).enableBatching(anyBoolean());
-        doReturn(producerBuilder).when(producerBuilder).batchingMaxPublishDelay(anyLong(), any(TimeUnit.class));
-        doReturn(producerBuilder).when(producerBuilder).blockIfQueueFull(anyBoolean());
-        doReturn(producer).when(producerBuilder).create();
-
-        when(producer.newMessage()).then(invocation -> new MockedMessageBuilder());
-        when(producer.send(any(byte[].class)))
-            .thenAnswer(invocationOnMock -> {
-                Message<byte[]> msg = invocationOnMock.getArgument(0);
-                synchronized (history) {
-                    history.add(msg);
-                }
-                return null;
-            });
-
-        when(producer.sendAsync(any(byte[].class)))
-            .thenAnswer(invocationOnMock -> {
-                Message<byte[]> msg = invocationOnMock.getArgument(0);
-                synchronized (history) {
-                    history.add(msg);
-                }
-                CompletableFuture<MessageId> future = new CompletableFuture<>();
-                future.complete(mock(MessageId.class));
-                return future;
-            });
-
-        PulsarManager.PULSAR_CLIENT_BUILDER = () -> clientBuilder;
-
-        ctx = Configurator.initialize(
-            "PulsarAppenderTest",
-            getClass().getClassLoader(),
-            getClass().getClassLoader().getResource("PulsarAppenderTest.xml").toURI());
+public class PulsarAppenderTest extends AbstractPulsarAppenderTest {
+    public PulsarAppenderTest() {
+        super("PulsarAppenderTest.xml");
     }
 
     @Test
@@ -229,12 +143,4 @@
         assertEquals(item.getKey(), keyValue);
         assertEquals(LOG_MESSAGE, new String(item.getData(), StandardCharsets.UTF_8));
     }
-
-    private LogEvent deserializeLogEvent(final byte[] data) throws IOException, ClassNotFoundException {
-        final ByteArrayInputStream bis = new ByteArrayInputStream(data);
-        try (ObjectInput ois = new ObjectInputStream(bis)) {
-            return (LogEvent) ois.readObject();
-        }
-    }
-
 }
\ No newline at end of file
diff --git a/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/builder/ConfigurationAssemblerTest.java b/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/builder/ConfigurationAssemblerTest.java
deleted file mode 100644
index c4c09f0..0000000
--- a/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/builder/ConfigurationAssemblerTest.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.log4j2.appender.builder;
-
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.AssertJUnit.assertEquals;
-import static org.testng.AssertJUnit.assertTrue;
-
-import java.util.List;
-import java.util.Map;
-import org.apache.logging.log4j.Level;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-import org.apache.logging.log4j.core.Appender;
-import org.apache.logging.log4j.core.Filter;
-import org.apache.logging.log4j.core.LifeCycle;
-import org.apache.logging.log4j.core.LoggerContext;
-import org.apache.logging.log4j.core.config.Configuration;
-import org.apache.logging.log4j.core.config.ConfigurationFactory;
-import org.apache.logging.log4j.core.config.Configurator;
-import org.apache.logging.log4j.core.config.CustomLevelConfig;
-import org.apache.logging.log4j.core.config.LoggerConfig;
-import org.apache.logging.log4j.core.config.builder.api.ConfigurationBuilder;
-import org.apache.logging.log4j.core.config.builder.api.ConfigurationBuilderFactory;
-import org.apache.logging.log4j.core.config.builder.impl.BuiltConfiguration;
-import org.apache.logging.log4j.core.filter.ThresholdFilter;
-import org.apache.logging.log4j.core.layout.GelfLayout;
-import org.apache.logging.log4j.core.util.Constants;
-import org.apache.pulsar.log4j2.appender.PulsarAppender;
-import org.testng.annotations.Test;
-
-public class ConfigurationAssemblerTest {
-
-    @Test
-    public void testBuildConfiguration() {
-        try {
-            System.setProperty(Constants.LOG4J_CONTEXT_SELECTOR,
-                    "org.apache.logging.log4j.core.async.AsyncLoggerContextSelector");
-            final ConfigurationBuilder<BuiltConfiguration> builder = ConfigurationBuilderFactory
-                    .newConfigurationBuilder();
-            CustomConfigurationFactory.addTestFixtures("config name", builder);
-            final Configuration configuration = builder.build();
-            try (LoggerContext ctx = Configurator.initialize(configuration)) {
-                validate(configuration);
-            }
-        } finally {
-            System.getProperties().remove(Constants.LOG4J_CONTEXT_SELECTOR);
-        }
-    }
-
-    @Test
-    public void testCustomConfigurationFactory() {
-        try {
-            System.setProperty(ConfigurationFactory.CONFIGURATION_FACTORY_PROPERTY,
-                    "org.apache.pulsar.log4j2.appender.builder.CustomConfigurationFactory");
-            System.setProperty(Constants.LOG4J_CONTEXT_SELECTOR,
-                    "org.apache.logging.log4j.core.async.AsyncLoggerContextSelector");
-            final Configuration config = ((LoggerContext) LogManager.getContext(false)).getConfiguration();
-            validate(config);
-        } finally {
-            System.getProperties().remove(Constants.LOG4J_CONTEXT_SELECTOR);
-            System.getProperties().remove(ConfigurationFactory.CONFIGURATION_FACTORY_PROPERTY);
-        }
-    }
-
-    private void validate(final Configuration config) {
-        assertNotNull(config);
-        assertNotNull(config.getName());
-        assertFalse(config.getName().isEmpty());
-        assertNotNull(config, "No configuration created");
-        assertEquals("Incorrect State: " + config.getState(), config.getState(), LifeCycle.State.STARTED);
-        final Map<String, Appender> appenders = config.getAppenders();
-        assertNotNull(appenders);
-        assertEquals("Incorrect number of Appenders: " + appenders.size(), appenders.size(), 2);
-        final PulsarAppender pulsarAppender = (PulsarAppender) appenders.get("Pulsar");
-        final GelfLayout gelfLayout = (GelfLayout) pulsarAppender.getLayout();
-        final Map<String, LoggerConfig> loggers = config.getLoggers();
-        assertNotNull(loggers);
-        assertEquals("Incorrect number of LoggerConfigs: " + loggers.size(), loggers.size(), 2);
-        final LoggerConfig rootLoggerConfig = loggers.get("");
-        assertEquals(Level.ERROR, rootLoggerConfig.getLevel());
-        assertFalse(rootLoggerConfig.isIncludeLocation());
-        final LoggerConfig loggerConfig = loggers.get("org.apache.logging.log4j");
-        assertEquals(Level.DEBUG, loggerConfig.getLevel());
-        assertTrue(loggerConfig.isIncludeLocation());
-        final Filter filter = config.getFilter();
-        assertNotNull(filter, "No Filter");
-        assertTrue("Not a Threshold Filter", filter instanceof ThresholdFilter);
-        final List<CustomLevelConfig> customLevels = config.getCustomLevels();
-        assertNotNull(filter, "No CustomLevels");
-        assertEquals(1, customLevels.size());
-        final CustomLevelConfig customLevel = customLevels.get(0);
-        assertEquals("Panic", customLevel.getLevelName());
-        assertEquals(17, customLevel.getIntLevel());
-        final Logger logger = LogManager.getLogger(getClass());
-        logger.info("Welcome to Log4j!");
-    }
-}
diff --git a/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/builder/ConfigurationBuilderTest.java b/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/builder/ConfigurationBuilderTest.java
deleted file mode 100644
index 0d99471..0000000
--- a/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/builder/ConfigurationBuilderTest.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.log4j2.appender.builder;
-
-import static org.testng.Assert.assertEquals;
-
-import java.util.concurrent.TimeUnit;
-import org.apache.logging.log4j.Level;
-import org.apache.logging.log4j.core.Filter;
-import org.apache.logging.log4j.core.appender.ConsoleAppender;
-import org.apache.logging.log4j.core.config.builder.api.AppenderComponentBuilder;
-import org.apache.logging.log4j.core.config.builder.api.ConfigurationBuilder;
-import org.apache.logging.log4j.core.config.builder.api.ConfigurationBuilderFactory;
-import org.apache.logging.log4j.core.config.builder.impl.BuiltConfiguration;
-import org.testng.annotations.Test;
-
-public class ConfigurationBuilderTest {
-
-    private static final String INDENT = "  ";
-    private static final String EOL = System.lineSeparator();
-
-    private void addTestFixtures(final String name, final ConfigurationBuilder<BuiltConfiguration> builder) {
-        builder.setConfigurationName(name);
-        builder.setStatusLevel(Level.ERROR);
-        builder.setShutdownTimeout(5000, TimeUnit.MILLISECONDS);
-        builder.add(builder.newScriptFile("target/test-classes/scripts/filter.groovy").addIsWatched(true));
-        builder.add(builder.newFilter("ThresholdFilter", Filter.Result.ACCEPT, Filter.Result.NEUTRAL)
-                .addAttribute("level", Level.DEBUG));
-
-        final AppenderComponentBuilder appenderBuilder = builder.newAppender("Stdout", "CONSOLE").addAttribute("target", ConsoleAppender.Target.SYSTEM_OUT);
-        appenderBuilder.add(builder.newLayout("PatternLayout").
-                addAttribute("pattern", "%d [%t] %-5level: %msg%n%throwable"));
-        appenderBuilder.add(builder.newFilter("MarkerFilter", Filter.Result.DENY,
-                Filter.Result.NEUTRAL).addAttribute("marker", "FLOW"));
-        builder.add(appenderBuilder);
-
-        final AppenderComponentBuilder appenderBuilder2 = builder.newAppender("Pulsar", "Pulsar")
-            .addAttribute("serviceUrl", "pulsar://localhost:6650")
-            .addAttribute("topic", "my-topic");
-        appenderBuilder2.add(builder.newLayout("GelfLayout").
-            addAttribute("host", "my-host").
-            addComponent(builder.newKeyValuePair("extraField", "extraValue")));
-        builder.add(appenderBuilder2);
-
-        builder.add(builder.newLogger("org.apache.logging.log4j", Level.DEBUG, true).
-                    add(builder.newAppenderRef("Stdout")).
-                    addAttribute("additivity", false));
-        builder.add(builder.newLogger("org.apache.logging.log4j.core", Level.DEBUG).
-                    add(builder.newAppenderRef("Stdout")));
-        builder.add(builder.newRootLogger(Level.ERROR).add(builder.newAppenderRef("Stdout")));
-
-        builder.addProperty("MyKey", "MyValue");
-        builder.add(builder.newCustomLevel("Panic", 17));
-        builder.setPackages("foo,bar");
-    }
-
-    private final static String expectedXml =
-            "<?xml version=\"1.0\" ?>" + EOL +
-            "<Configuration name=\"config name\" status=\"ERROR\" packages=\"foo,bar\" shutdownTimeout=\"5000\">" + EOL +
-                INDENT + "<Properties>" + EOL +
-                INDENT + INDENT + "<Property name=\"MyKey\">MyValue</Property>" + EOL +
-                INDENT + "</Properties>" + EOL +
-                INDENT + "<Scripts>" + EOL +
-                INDENT + INDENT + "<ScriptFile name=\"target/test-classes/scripts/filter.groovy\" path=\"target/test-classes/scripts/filter.groovy\" isWatched=\"true\"/>" + EOL +
-                INDENT + "</Scripts>" + EOL +
-                INDENT + "<CustomLevels>" + EOL +
-                INDENT + INDENT + "<CustomLevel name=\"Panic\" intLevel=\"17\"/>" + EOL +
-                INDENT + "</CustomLevels>" + EOL +
-                INDENT + "<ThresholdFilter onMatch=\"ACCEPT\" onMismatch=\"NEUTRAL\" level=\"DEBUG\"/>" + EOL +
-                INDENT + "<Appenders>" + EOL +
-                INDENT + INDENT + "<CONSOLE name=\"Stdout\" target=\"SYSTEM_OUT\">" + EOL +
-                INDENT + INDENT + INDENT + "<PatternLayout pattern=\"%d [%t] %-5level: %msg%n%throwable\"/>" + EOL +
-                INDENT + INDENT + INDENT + "<MarkerFilter onMatch=\"DENY\" onMismatch=\"NEUTRAL\" marker=\"FLOW\"/>" + EOL +
-                INDENT + INDENT + "</CONSOLE>" + EOL +
-                INDENT + INDENT + "<Pulsar name=\"Pulsar\" serviceUrl=\"pulsar://localhost:6650\" topic=\"my-topic\">" + EOL +
-                INDENT + INDENT + INDENT + "<GelfLayout host=\"my-host\">" + EOL +
-                INDENT + INDENT + INDENT + INDENT + "<KeyValuePair key=\"extraField\" value=\"extraValue\"/>" + EOL +
-                INDENT + INDENT + INDENT + "</GelfLayout>" + EOL +
-                INDENT + INDENT + "</Pulsar>" + EOL +
-                INDENT + "</Appenders>" + EOL +
-                INDENT + "<Loggers>" + EOL +
-                INDENT + INDENT + "<Logger name=\"org.apache.logging.log4j\" level=\"DEBUG\" includeLocation=\"true\" additivity=\"false\">" + EOL +
-                INDENT + INDENT + INDENT + "<AppenderRef ref=\"Stdout\"/>" + EOL +
-                INDENT + INDENT + "</Logger>" + EOL +
-                INDENT + INDENT + "<Logger name=\"org.apache.logging.log4j.core\" level=\"DEBUG\">" + EOL +
-                INDENT + INDENT + INDENT + "<AppenderRef ref=\"Stdout\"/>" + EOL +
-                INDENT + INDENT + "</Logger>" + EOL +
-                INDENT + INDENT + "<Root level=\"ERROR\">" + EOL +
-                INDENT + INDENT + INDENT + "<AppenderRef ref=\"Stdout\"/>" + EOL +
-                INDENT + INDENT + "</Root>" + EOL +
-                INDENT + "</Loggers>" + EOL +
-            "</Configuration>" + EOL;
-
-    // TODO make test run properly on Windows
-    @Test
-    public void testXmlConstructing() {
-        //assumeTrue(System.lineSeparator().length() == 1); // Only run test on platforms with single character line endings (such as Linux), not on Windows
-        final ConfigurationBuilder<BuiltConfiguration> builder = ConfigurationBuilderFactory.newConfigurationBuilder();
-        addTestFixtures("config name", builder);
-        final String xmlConfiguration = builder.toXmlConfiguration();
-        assertEquals(expectedXml, xmlConfiguration);
-    }
-
-}
diff --git a/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/builder/CustomConfigurationFactory.java b/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/builder/CustomConfigurationFactory.java
deleted file mode 100644
index 15052a6..0000000
--- a/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/builder/CustomConfigurationFactory.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.log4j2.appender.builder;
-
-import java.net.URI;
-import org.apache.logging.log4j.Level;
-import org.apache.logging.log4j.core.Filter;
-import org.apache.logging.log4j.core.LoggerContext;
-import org.apache.logging.log4j.core.appender.ConsoleAppender;
-import org.apache.logging.log4j.core.config.Configuration;
-import org.apache.logging.log4j.core.config.ConfigurationFactory;
-import org.apache.logging.log4j.core.config.ConfigurationSource;
-import org.apache.logging.log4j.core.config.builder.api.AppenderComponentBuilder;
-import org.apache.logging.log4j.core.config.builder.api.ConfigurationBuilder;
-import org.apache.logging.log4j.core.config.builder.impl.BuiltConfiguration;
-
-/**
- * Normally this would be a plugin. However, we don't want it used for everything so it will be defined
- * via a system property.
- */
-public class CustomConfigurationFactory extends ConfigurationFactory {
-
-    static Configuration addTestFixtures(final String name, final ConfigurationBuilder<BuiltConfiguration> builder) {
-        builder.setConfigurationName(name);
-        builder.setStatusLevel(Level.ERROR);
-        builder.add(builder.newScriptFile("target/test-classes/scripts/filter.groovy").addIsWatched(true));
-        builder.add(builder.newFilter("ThresholdFilter", Filter.Result.ACCEPT, Filter.Result.NEUTRAL)
-                .addAttribute("level", Level.DEBUG));
-
-        final AppenderComponentBuilder appenderBuilder = builder.newAppender("Stdout", "CONSOLE").addAttribute("target", ConsoleAppender.Target.SYSTEM_OUT);
-        appenderBuilder.add(builder.newLayout("PatternLayout").
-                addAttribute("pattern", "%d [%t] %-5level: %msg%n%throwable"));
-        appenderBuilder.add(builder.newFilter("MarkerFilter", Filter.Result.DENY,
-                Filter.Result.NEUTRAL).addAttribute("marker", "FLOW"));
-        builder.add(appenderBuilder);
-
-        final AppenderComponentBuilder appenderBuilder2 = builder.newAppender("Pulsar", "Pulsar")
-            .addAttribute("serviceUrl", "pulsar://localhost:6650")
-            .addAttribute("topic", "my-topic");
-        appenderBuilder2.add(builder.newLayout("GelfLayout").
-            addAttribute("host", "my-host").
-            addComponent(builder.newKeyValuePair("extraField", "extraValue")));
-        builder.add(appenderBuilder2);
-
-        builder.add(builder.newLogger("org.apache.logging.log4j", Level.DEBUG, true).
-                    add(builder.newAppenderRef("Stdout")).
-                    addAttribute("additivity", false));
-        builder.add(builder.newRootLogger(Level.ERROR).add(builder.newAppenderRef("Stdout")));
-
-        builder.add(builder.newCustomLevel("Panic", 17));
-
-        return builder.build();
-    }
-
-    @Override
-    public Configuration getConfiguration(final LoggerContext loggerContext, final ConfigurationSource source) {
-        return getConfiguration(loggerContext, source.toString(), null);
-    }
-
-    @Override
-    public Configuration getConfiguration(final LoggerContext loggerContext, final String name, final URI configLocation) {
-        final ConfigurationBuilder<BuiltConfiguration> builder = newConfigurationBuilder();
-        return addTestFixtures(name, builder);
-    }
-
-    @Override
-    protected String[] getSupportedTypes() {
-        return new String[] {"*"};
-    }
-}
diff --git a/pulsar-log4j2-appender/src/test/resources/PulsarAppenderClientConfTest.xml b/pulsar-log4j2-appender/src/test/resources/PulsarAppenderClientConfTest.xml
new file mode 100644
index 0000000..848cc3e
--- /dev/null
+++ b/pulsar-log4j2-appender/src/test/resources/PulsarAppenderClientConfTest.xml
@@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<Configuration name="PulsarAppenderTest" status="info" packages="org.apache.pulsar.log4j2.appender">
+  <Appenders>
+    <Pulsar name="PulsarAppenderWithClientConf" serviceUrl="pulsar+ssl://localhost:6651" topic="persistent://t/c/n/pulsar-topic" key="key" avoidRecursive="false">
+      <Property name="authPlugin">org.apache.pulsar.client.impl.auth.AuthenticationToken</Property>
+      <Property name="authParams">token:TOKEN</Property>
+      <Property name="tlsAllowInsecureConnection">true</Property>
+      <Property name="tlsEnableHostnameVerification">true</Property>
+      <Property name="useKeyStoreTls">true</Property>
+      <Property name="tlsTrustStorePassword">_tlsTrustStorePassword_</Property>
+      <Property name="tlsTrustStorePath">_tlsTrustStorePath_</Property>
+      <Property name="tlsTrustCertsFilePath">_tlsTrustCertsFilePath_</Property>
+      <Property name="tlsTrustStoreType">_tlsTrustStoreType_</Property>
+      <Property name="numIoThreads">8</Property>
+      <Property name="connectionsPerBroker">5</Property>
+      <Property name="enableTransaction">true</Property>
+      <Property name="producer.maxPendingMessages">20000</Property>
+      <Property name="producer.blockIfQueueFull">true</Property>
+      <PatternLayout pattern="%m"/>
+    </Pulsar>
+  </Appenders>
+  <Loggers>
+    <Root level="info">
+      <AppenderRef ref="PulsarAppenderWithClientConf"/>
+    </Root>
+  </Loggers>
+</Configuration>
\ No newline at end of file
diff --git a/pulsar-log4j2-appender/src/test/resources/PulsarAppenderTest.xml b/pulsar-log4j2-appender/src/test/resources/PulsarAppenderTest.xml
index a5743fb..4367e37 100644
--- a/pulsar-log4j2-appender/src/test/resources/PulsarAppenderTest.xml
+++ b/pulsar-log4j2-appender/src/test/resources/PulsarAppenderTest.xml
@@ -19,7 +19,7 @@
     under the License.
 
 -->
-<Configuration name="PulsarAppenderTest" status="OFF">
+<Configuration name="PulsarAppenderTest" status="info" packages="org.apache.pulsar.log4j2.appender">
   <Appenders>
     <Pulsar name="PulsarAppenderWithLayout" serviceUrl="pulsar://localhost:6650" topic="persistent://t/c/n/pulsar-topic" avoidRecursive="false">
       <PatternLayout pattern="[%m]"/>