blob: f0e45aa734afb5761cc517272017ae728499efeb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.authentication;
import com.google.common.collect.ImmutableSet;
import java.io.File;
import java.io.FileWriter;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import javax.security.auth.login.Configuration;
import lombok.Cleanup;
import org.apache.commons.io.FileUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.auth.AuthenticationSasl;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.proxy.server.ProxyConfiguration;
import org.apache.pulsar.proxy.server.ProxyService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
public class ProxySaslAuthenticationTest extends ProducerConsumerBase {
private static final Logger log = LoggerFactory.getLogger(ProxySaslAuthenticationTest.class);
public static File kdcDir;
public static File kerberosWorkDir;
public static File brokerSecretKeyFile;
public static File proxySecretKeyFile;
private static MiniKdc kdc;
private static Properties properties;
private static String localHostname = "localhost";
@BeforeClass
public static void startMiniKdc() throws Exception {
kdcDir = Files.createTempDirectory("test-kdc-dir").toFile();
kerberosWorkDir = Files.createTempDirectory("test-kerberos-work-dir").toFile();
properties = MiniKdc.createConf();
kdc = new MiniKdc(properties, kdcDir);
kdc.start();
String principalBrokerNoRealm = "broker/" + localHostname;
String principalBroker = "broker/" + localHostname + "@" + kdc.getRealm();
log.info("principalBroker: " + principalBroker);
String principalClientNoRealm = "client/" + localHostname;
String principalClient = principalClientNoRealm + "@" + kdc.getRealm();
log.info("principalClient: " + principalClient);
String principalProxyNoRealm = "proxy/" + localHostname;
String principalProxy = principalProxyNoRealm + "@" + kdc.getRealm();
log.info("principalProxy: " + principalProxy);
File keytabClient = new File(kerberosWorkDir, "pulsarclient.keytab");
kdc.createPrincipal(keytabClient, principalClientNoRealm);
File keytabBroker = new File(kerberosWorkDir, "pulsarbroker.keytab");
kdc.createPrincipal(keytabBroker, principalBrokerNoRealm);
File keytabProxy = new File(kerberosWorkDir, "pulsarproxy.keytab");
kdc.createPrincipal(keytabProxy, principalProxyNoRealm);
File jaasFile = new File(kerberosWorkDir, "jaas.conf");
try (FileWriter writer = new FileWriter(jaasFile)) {
writer.write("\n"
+ "PulsarBroker {\n"
+ " com.sun.security.auth.module.Krb5LoginModule required debug=true\n"
+ " useKeyTab=true\n"
+ " keyTab=\"" + keytabBroker.getAbsolutePath() + "\n"
+ " storeKey=true\n"
+ " useTicketCache=false\n" // won't test useTicketCache=true on JUnit tests
+ " principal=\"" + principalBroker + "\";\n"
+ "};\n"
+ "\n"
+ "\n"
+ "\n"
+ "PulsarProxy{\n"
+ " com.sun.security.auth.module.Krb5LoginModule required debug=true\n"
+ " useKeyTab=true\n"
+ " keyTab=\"" + keytabProxy.getAbsolutePath() + "\n"
+ " storeKey=true\n"
+ " useTicketCache=false\n" // won't test useTicketCache=true on JUnit tests
+ " principal=\"" + principalProxy + "\";\n"
+ "};\n"
+ "\n"
+ "\n"
+ "\n"
+ "PulsarClient {\n"
+ " com.sun.security.auth.module.Krb5LoginModule required debug=true\n"
+ " useKeyTab=true\n"
+ " keyTab=\"" + keytabClient.getAbsolutePath() + "\n"
+ " storeKey=true\n"
+ " useTicketCache=false\n"
+ " principal=\"" + principalClient + "\";\n"
+ "};\n"
);
}
File krb5file = new File(kerberosWorkDir, "krb5.conf");
try (FileWriter writer = new FileWriter(krb5file)) {
String conf = "[libdefaults]\n"
+ " default_realm = " + kdc.getRealm() + "\n"
+ " udp_preference_limit = 1\n" // force use TCP
+ "\n"
+ "\n"
+ "[realms]\n"
+ " " + kdc.getRealm() + " = {\n"
+ " kdc = " + kdc.getHost() + ":" + kdc.getPort() + "\n"
+ " }";
writer.write(conf);
log.info("krb5.conf:\n" + conf);
}
System.setProperty("java.security.auth.login.config", jaasFile.getAbsolutePath());
System.setProperty("java.security.krb5.conf", krb5file.getAbsolutePath());
Configuration.getConfiguration().refresh();
// Client config
log.info("created AuthenticationSasl");
}
@AfterClass(alwaysRun = true)
public static void stopMiniKdc() {
System.clearProperty("java.security.auth.login.config");
System.clearProperty("java.security.krb5.conf");
if (kdc != null) {
kdc.stop();
}
FileUtils.deleteQuietly(kdcDir);
FileUtils.deleteQuietly(kerberosWorkDir);
Assert.assertFalse(kdcDir.exists());
Assert.assertFalse(kerberosWorkDir.exists());
}
@BeforeMethod
@Override
protected void setup() throws Exception {
log.info("-- {} --, start at host: {}", methodName, localHostname);
isTcpLookup = true;
conf.setAdvertisedAddress(localHostname);
conf.setAuthenticationEnabled(true);
conf.setSaslJaasClientAllowedIds(".*" + localHostname + ".*");
conf.setSaslJaasServerSectionName("PulsarBroker");
brokerSecretKeyFile = File.createTempFile("saslRoleTokenSignerSecret", ".key");
Files.write(Paths.get(brokerSecretKeyFile.toString()), "PulsarSecret".getBytes());
conf.setSaslJaasServerRoleTokenSignerSecretPath(brokerSecretKeyFile.toString());
Set<String> providers = new HashSet<>();
providers.add(AuthenticationProviderSasl.class.getName());
conf.setAuthenticationProviders(providers);
conf.setClusterName("test");
conf.setSuperUserRoles(ImmutableSet.of("client/" + localHostname + "@" + kdc.getRealm()));
// set admin auth, to verify admin web resources
Map<String, String> clientSaslConfig = new HashMap<>();
clientSaslConfig.put("saslJaasClientSectionName", "PulsarClient");
clientSaslConfig.put("serverType", "broker");
conf.setBrokerClientAuthenticationPlugin(AuthenticationSasl.class.getName());
conf.setBrokerClientAuthenticationParameters(ObjectMapperFactory
.getMapper().getObjectMapper().writeValueAsString(clientSaslConfig));
super.init();
lookupUrl = new URI(pulsar.getBrokerServiceUrl());
log.info("set client jaas section name: PulsarClient");
admin = PulsarAdmin.builder()
.serviceHttpUrl(brokerUrl.toString())
.authentication(AuthenticationFactory.create(AuthenticationSasl.class.getName(), clientSaslConfig))
.build();
super.producerBaseSetup();
log.info("-- {} --, end.", methodName);
}
@Override
@AfterMethod(alwaysRun = true)
protected void cleanup() throws Exception {
FileUtils.deleteQuietly(brokerSecretKeyFile);
Assert.assertFalse(brokerSecretKeyFile.exists());
FileUtils.deleteQuietly(proxySecretKeyFile);
Assert.assertFalse(proxySecretKeyFile.exists());
super.internalCleanup();
}
@Test
void testAuthentication() throws Exception {
log.info("-- Starting {} test --", methodName);
// Step 1: Create Admin Client
// create a client which connects to proxy and pass authData
String topicName = "persistent://my-property/my-ns/my-topic1";
ProxyConfiguration proxyConfig = new ProxyConfiguration();
proxyConfig.setAuthenticationEnabled(true);
proxyConfig.setServicePort(Optional.of(0));
proxyConfig.setBrokerProxyAllowedTargetPorts("*");
proxyConfig.setWebServicePort(Optional.of(0));
proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
proxyConfig.setSaslJaasClientAllowedIds(".*" + localHostname + ".*");
proxyConfig.setSaslJaasServerSectionName("PulsarProxy");
// proxy connect to broker
proxyConfig.setBrokerClientAuthenticationPlugin(AuthenticationSasl.class.getName());
proxyConfig.setBrokerClientAuthenticationParameters(
"{\"saslJaasClientSectionName\": " + "\"PulsarProxy\"," +
"\"serverType\": " + "\"broker\"}");
proxySecretKeyFile = File.createTempFile("saslRoleTokenSignerSecret", ".key");
Files.write(Paths.get(proxySecretKeyFile.toString()), "PulsarSecret".getBytes());
proxyConfig.setSaslJaasServerRoleTokenSignerSecretPath(proxySecretKeyFile.toString());
// proxy as a server, it will use sasl to authn
Set<String> providers = new HashSet<>();
providers.add(AuthenticationProviderSasl.class.getName());
proxyConfig.setAuthenticationProviders(providers);
proxyConfig.setForwardAuthorizationCredentials(true);
AuthenticationService authenticationService = new AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig));
ProxyService proxyService = new ProxyService(proxyConfig, authenticationService);
proxyService.start();
final String proxyServiceUrl = "pulsar://localhost:" + proxyService.getListenPort().get();
log.info("1 proxy service started {}", proxyService);
// Step 3: Pass correct client params
@Cleanup
PulsarClient proxyClient = createProxyClient(proxyServiceUrl, 1);
log.info("2 create proxy client {}, {}", proxyServiceUrl, proxyClient);
Producer<byte[]> producer = proxyClient.newProducer(Schema.BYTES).topic(topicName).create();
log.info("3 created producer.");
Consumer<byte[]> consumer = proxyClient.newConsumer(Schema.BYTES).topic(topicName).subscriptionName("test-sub").subscribe();
log.info("4 created consumer.");
for (int i = 0; i < 10; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
log.info("Produced message: [{}]", message);
}
Message<byte[]> msg = null;
Set<String> messageSet = new HashSet<>();
for (int i = 0; i < 10; i++) {
msg = consumer.receive(5, TimeUnit.SECONDS);
String receivedMessage = new String(msg.getData());
log.info("Received message: [{}]", receivedMessage);
String expectedMessage = "my-message-" + i;
testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
}
// Acknowledge the consumption of all messages at once
consumer.acknowledgeCumulative(msg);
consumer.close();
proxyService.close();
}
private PulsarClient createProxyClient(String proxyServiceUrl, int numberOfConnections) throws PulsarClientException {
Map<String, String> clientSaslConfig = new HashMap<>();
clientSaslConfig.put("saslJaasClientSectionName", "PulsarClient");
clientSaslConfig.put("serverType", "proxy");
log.info("set client jaas section name: PulsarClient, serverType: proxy");
Authentication authSasl = AuthenticationFactory.create(AuthenticationSasl.class.getName(), clientSaslConfig);
return PulsarClient.builder().serviceUrl(proxyServiceUrl)
.authentication(authSasl).connectionsPerBroker(numberOfConnections).build();
}
}