blob: 841bdd77db3f0b13456a50246e19645a80fef5d4 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.log4j2.appender;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.logging.log4j.core.Appender;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.Message;
import org.mockito.ArgumentCaptor;
import org.testng.annotations.Test;
public class PulsarAppenderClientConfTest extends AbstractPulsarAppenderTest {
public PulsarAppenderClientConfTest() {
super("PulsarAppenderClientConfTest.xml");
}
@Test
public void testAppendWithClientConf() throws Exception {
final Appender appender = ctx.getConfiguration().getAppender("PulsarAppenderWithClientConf");
final LogEvent logEvent = createLogEvent();
appender.append(logEvent);
Message<byte[]> item;
synchronized (history) {
assertEquals(1, history.size());
item = history.get(0);
}
assertNotNull(item);
String msgKey = item.getKey();
assertEquals(msgKey, "key");
assertEquals(new String(item.getData(), StandardCharsets.UTF_8), LOG_MESSAGE);
// verify authPlugin & authParams
ArgumentCaptor<Authentication> authenticationCaptor = ArgumentCaptor.forClass(Authentication.class);
verify(clientBuilder).authentication(authenticationCaptor.capture());
Authentication authentication = authenticationCaptor.getValue();
assertEquals(authentication.getAuthMethodName(), "token");
AuthenticationDataProvider authData = authentication.getAuthData();
assertTrue(authData.hasDataForHttp());
Map<String, String> headers =
authData.getHttpHeaders().stream().collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
assertEquals(headers.size(), 2);
assertEquals(headers.get("X-Pulsar-Auth-Method-Name"), "token");
assertEquals(headers.get("Authorization"), "Bearer TOKEN");
// verify tlsAllowInsecureConnection
verify(clientBuilder).allowTlsInsecureConnection(true);
// verify tlsEnableHostnameVerification
verify(clientBuilder).enableTlsHostnameVerification(true);
// verify tlsTrustStorePassword
ArgumentCaptor<String> tlsTrustStorePasswordCaptor = ArgumentCaptor.forClass(String.class);
verify(clientBuilder).tlsTrustStorePassword(tlsTrustStorePasswordCaptor.capture());
assertEquals(tlsTrustStorePasswordCaptor.getValue(), "_tlsTrustStorePassword_");
// verify tlsTrustStorePath
ArgumentCaptor<String> tlsTrustStorePathCaptor = ArgumentCaptor.forClass(String.class);
verify(clientBuilder).tlsTrustStorePath(tlsTrustStorePathCaptor.capture());
assertEquals(tlsTrustStorePathCaptor.getValue(), "_tlsTrustStorePath_");
// verify tlsTrustCertsFilePath
ArgumentCaptor<String> tlsTrustCertsFilePathCaptor = ArgumentCaptor.forClass(String.class);
verify(clientBuilder).tlsTrustCertsFilePath(tlsTrustCertsFilePathCaptor.capture());
assertEquals(tlsTrustCertsFilePathCaptor.getValue(), "_tlsTrustCertsFilePath_");
// verify tlsTrustStoreType
ArgumentCaptor<String> tlsTrustStoreTypeCaptor = ArgumentCaptor.forClass(String.class);
verify(clientBuilder).tlsTrustStoreType(tlsTrustStoreTypeCaptor.capture());
assertEquals(tlsTrustStoreTypeCaptor.getValue(), "_tlsTrustStoreType_");
// verify useKeyStoreTls
verify(clientBuilder).useKeyStoreTls(true);
// verify loadConf
ArgumentCaptor<Map<String, Object>> loadConfCaptor = ArgumentCaptor.forClass(Map.class);
verify(clientBuilder).loadConf(loadConfCaptor.capture());
Map<String, Object> conf = loadConfCaptor.getValue();
assertEquals(conf.size(), 3);
assertEquals(conf.get("numIoThreads"), "8");
assertEquals(conf.get("connectionsPerBroker"), "5");
assertEquals(conf.get("enableTransaction"), "true");
// verify producer builder loadConf
ArgumentCaptor<Map<String, Object>> producerLoadConfCaptor = ArgumentCaptor.forClass(Map.class);
verify(producerBuilder).loadConf(producerLoadConfCaptor.capture());
Map<String, Object> producerConf = producerLoadConfCaptor.getValue();
assertEquals(producerConf.size(), 2);
assertEquals(producerConf.get("maxPendingMessages"), "20000");
assertEquals(producerConf.get("blockIfQueueFull"), "true");
}
}