blob: ae611d7e4eb98b33be5fe9e06cec9a2af04fc772 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.web;
import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
import static org.mockito.Mockito.doReturn;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Sets;
import com.google.common.io.CharStreams;
import com.google.common.io.Closeables;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URL;
import java.security.KeyStore;
import java.security.PrivateKey;
import java.security.SecureRandom;
import java.security.cert.Certificate;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import lombok.Cleanup;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.MockedBookKeeperClientFactory;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.SecurityUtility;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.apache.zookeeper.MockZooKeeper;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.BoundRequestBuilder;
import org.asynchttpclient.DefaultAsyncHttpClient;
import org.asynchttpclient.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;
/**
* Tests for the {@code WebService} class. Note that this test only covers the newly added ApiVersionFilter related
* tests for now as this test class was added quite a bit after the class was written.
*/
@Test(groups = "broker")
public class WebServiceTest {
private PulsarService pulsar;
private String BROKER_LOOKUP_URL;
private String BROKER_LOOKUP_URL_TLS;
private static final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/certificate/server.crt";
private static final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/certificate/server.key";
private static final String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/certificate/client.crt";
private static final String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/certificate/client.key";
/**
* Test that the {@WebService} class properly passes the allowUnversionedClients value. We do this by setting
* allowUnversionedClients to true, then making a request with no version, which should go through.
*
*/
@Test
public void testDefaultClientVersion() throws Exception {
setupEnv(true, "1.0", true, false, false, false, -1, false);
try {
// Make an HTTP request to lookup a namespace. The request should
// succeed
makeHttpRequest(false, false);
} catch (Exception e) {
Assert.fail("HTTP request to lookup a namespace shouldn't fail ", e);
}
}
/**
* Test that if enableTls option is enabled, WebServcie is available both on HTTP and HTTPS.
*
* @throws Exception
*/
@Test
public void testTlsEnabled() throws Exception {
setupEnv(false, "1.0", false, true, false, false, -1, false);
// Make requests both HTTP and HTTPS. The requests should succeed
try {
makeHttpRequest(false, false);
} catch (Exception e) {
Assert.fail("HTTP request shouldn't fail ", e);
}
try {
makeHttpRequest(true, false);
} catch (Exception e) {
Assert.fail("HTTPS request shouldn't fail ", e);
}
}
/**
* Test that if enableTls option is disabled, WebServcie is available only on HTTP.
*
* @throws Exception
*/
@Test
public void testTlsDisabled() throws Exception {
setupEnv(false, "1.0", false, false, false, false, -1, false);
// Make requests both HTTP and HTTPS. Only the HTTP request should succeed
try {
makeHttpRequest(false, false);
} catch (Exception e) {
Assert.fail("HTTP request shouldn't fail ", e);
}
try {
makeHttpRequest(true, false);
Assert.fail("HTTPS request should fail ");
} catch (Exception e) {
// Expected
}
}
/**
* Test that if enableAuth option and allowInsecure option are enabled, WebServcie requires trusted/untrusted client
* certificate.
*
* @throws Exception
*/
@Test
public void testTlsAuthAllowInsecure() throws Exception {
setupEnv(false, "1.0", false, true, true, true, -1, false);
// Only the request with client certificate should succeed
try {
makeHttpRequest(true, false);
Assert.fail("Request without client certficate should fail");
} catch (Exception e) {
Assert.assertTrue(e.getMessage().contains("HTTP response code: 401"));
}
try {
makeHttpRequest(true, true);
} catch (Exception e) {
Assert.fail("Request with client certificate shouldn't fail", e);
}
}
/**
* Test that if enableAuth option is enabled, WebServcie requires trusted client certificate.
*
* @throws Exception
*/
@Test
public void testTlsAuthDisallowInsecure() throws Exception {
setupEnv(false, "1.0", false, true, true, false, -1, false);
// Only the request with trusted client certificate should succeed
try {
makeHttpRequest(true, false);
Assert.fail("Request without client certficate should fail");
} catch (Exception e) {
Assert.assertTrue(e.getMessage().contains("HTTP response code: 401"));
}
try {
makeHttpRequest(true, true);
} catch (Exception e) {
Assert.fail("Request with client certificate shouldn't fail", e);
}
}
@Test
public void testRateLimiting() throws Exception {
setupEnv(false, "1.0", false, false, false, false, 10.0, false);
// Make requests without exceeding the max rate
for (int i = 0; i < 5; i++) {
makeHttpRequest(false, false);
Thread.sleep(200);
}
try {
for (int i = 0; i < 500; i++) {
makeHttpRequest(false, false);
}
fail("Some request should have failed");
} catch (IOException e) {
assertTrue(e.getMessage().contains("429"));
}
}
@Test
public void testSplitPath() {
String result = PulsarWebResource.splitPath("prop/cluster/ns/topic1", 4);
Assert.assertEquals(result, "topic1");
}
@Test
public void testDisableHttpTraceAndTrackMethods() throws Exception {
setupEnv(true, "1.0", true, false, false, false, -1, true);
String url = pulsar.getWebServiceAddress() + "/admin/v2/tenants/my-tenant" + System.currentTimeMillis();
@Cleanup
AsyncHttpClient client = new DefaultAsyncHttpClient();
BoundRequestBuilder builder = client.prepare("TRACE", url);
Response res = builder.execute().get();
// This should have failed
assertEquals(res.getStatusCode(), 405);
builder = client.prepare("TRACK", url);
res = builder.execute().get();
// This should have failed
assertEquals(res.getStatusCode(), 405);
}
@Test
public void testMaxRequestSize() throws Exception {
setupEnv(true, "1.0", true, false, false, false, -1, false);
String url = pulsar.getWebServiceAddress() + "/admin/v2/tenants/my-tenant" + System.currentTimeMillis();
@Cleanup
AsyncHttpClient client = new DefaultAsyncHttpClient();
BoundRequestBuilder builder = client.preparePut(url)
.setHeader("Accept", "application/json")
.setHeader("Content-Type", "application/json");
// HTTP server is configured to reject everything > 10K
TenantInfo info1 = TenantInfo.builder()
.adminRoles(Collections.singleton(StringUtils.repeat("*", 20 * 1024)))
.build();
builder.setBody(ObjectMapperFactory.getThreadLocal().writeValueAsBytes(info1));
Response res = builder.execute().get();
// This should have failed
assertEquals(res.getStatusCode(), 400);
// Create local cluster
String localCluster = "test";
pulsar.getPulsarResources().getClusterResources().createCluster(localCluster, ClusterDataImpl.builder().build());
TenantInfo info2 = TenantInfo.builder()
.adminRoles(Collections.singleton(StringUtils.repeat("*", 1 * 1024)))
.allowedClusters(Sets.newHashSet(localCluster))
.build();
builder.setBody(ObjectMapperFactory.getThreadLocal().writeValueAsBytes(info2));
Response res2 = builder.execute().get();
assertEquals(res2.getStatusCode(), 204);
// Simple GET without content size should go through
Response res3 = client.prepareGet(url)
.setHeader("Accept", "application/json")
.setHeader("Content-Type", "application/json")
.execute()
.get();
assertEquals(res3.getStatusCode(), 200);
}
@Test
public void testBrokerReady() throws Exception {
setupEnv(true, "1.0", true, false, false, false, -1, false);
String url = pulsar.getWebServiceAddress() + "/admin/v2/brokers/ready";
@Cleanup
AsyncHttpClient client = new DefaultAsyncHttpClient();
Response res = client.prepareGet(url).execute().get();
assertEquals(res.getStatusCode(), 200);
assertEquals(res.getResponseBody(), "ok");
}
private String makeHttpRequest(boolean useTls, boolean useAuth) throws Exception {
InputStream response = null;
try {
if (useTls) {
KeyManager[] keyManagers = null;
if (useAuth) {
Certificate[] tlsCert = SecurityUtility.loadCertificatesFromPemFile(TLS_CLIENT_CERT_FILE_PATH);
PrivateKey tlsKey = SecurityUtility.loadPrivateKeyFromPemFile(TLS_CLIENT_KEY_FILE_PATH);
KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
ks.load(null, null);
ks.setKeyEntry("private", tlsKey, "".toCharArray(), tlsCert);
KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
kmf.init(ks, "".toCharArray());
keyManagers = kmf.getKeyManagers();
}
TrustManager[] trustManagers = InsecureTrustManagerFactory.INSTANCE.getTrustManagers();
SSLContext sslCtx = SSLContext.getInstance("TLS");
sslCtx.init(keyManagers, trustManagers, new SecureRandom());
HttpsURLConnection.setDefaultSSLSocketFactory(sslCtx.getSocketFactory());
response = new URL(BROKER_LOOKUP_URL_TLS).openStream();
} else {
response = new URL(BROKER_LOOKUP_URL).openStream();
}
String resp = CharStreams.toString(new InputStreamReader(response));
log.info("Response: {}", resp);
return resp;
} finally {
Closeables.close(response, false);
}
}
private void setupEnv(boolean enableFilter, String minApiVersion, boolean allowUnversionedClients,
boolean enableTls, boolean enableAuth, boolean allowInsecure, double rateLimit,
boolean disableTrace) throws Exception {
if (pulsar != null) {
throw new Exception("broker already started");
}
Set<String> providers = new HashSet<>();
providers.add("org.apache.pulsar.broker.authentication.AuthenticationProviderTls");
Set<String> roles = new HashSet<>();
roles.add("client");
ServiceConfiguration config = new ServiceConfiguration();
config.setAdvertisedAddress("localhost");
config.setBrokerShutdownTimeoutMs(0L);
config.setBrokerServicePort(Optional.of(0));
config.setWebServicePort(Optional.of(0));
if (enableTls) {
config.setWebServicePortTls(Optional.of(0));
}
config.setClientLibraryVersionCheckEnabled(enableFilter);
config.setAuthenticationEnabled(enableAuth);
config.setAuthenticationProviders(providers);
config.setAuthorizationEnabled(false);
config.setSuperUserRoles(roles);
config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
config.setTlsAllowInsecureConnection(allowInsecure);
config.setTlsTrustCertsFilePath(allowInsecure ? "" : TLS_CLIENT_CERT_FILE_PATH);
config.setClusterName("local");
config.setAdvertisedAddress("localhost"); // TLS certificate expects localhost
config.setZookeeperServers("localhost:2181");
config.setHttpMaxRequestSize(10 * 1024);
config.setDisableHttpDebugMethods(disableTrace);
if (rateLimit > 0) {
config.setHttpRequestsLimitEnabled(true);
config.setHttpRequestsMaxPerSecond(rateLimit);
}
pulsar = spyWithClassAndConstructorArgs(PulsarService.class, config);
// mock zk
MockZooKeeper mockZooKeeper = MockedPulsarServiceBaseTest.createMockZooKeeper();
doReturn(new ZKMetadataStore(mockZooKeeper)).when(pulsar).createConfigurationMetadataStore();
doReturn(new ZKMetadataStore(mockZooKeeper)).when(pulsar).createLocalMetadataStore();
doReturn(new MockedBookKeeperClientFactory()).when(pulsar).newBookKeeperClientFactory();
pulsar.start();
try {
pulsar.getLocalMetadataStore().delete("/minApiVersion", Optional.empty()).join();
} catch (Exception ex) {
}
pulsar.getLocalMetadataStore().put("/minApiVersion", minApiVersion.getBytes(), Optional.of(-1L)).join();
String BROKER_URL_BASE = "http://localhost:" + pulsar.getListenPortHTTP().get();
String BROKER_URL_BASE_TLS = "https://localhost:" + pulsar.getListenPortHTTPS().orElse(-1);
String serviceUrl = BROKER_URL_BASE;
PulsarAdminBuilder adminBuilder = PulsarAdmin.builder();
if (enableTls && enableAuth) {
serviceUrl = BROKER_URL_BASE_TLS;
Map<String, String> authParams = new HashMap<>();
authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
adminBuilder.authentication(AuthenticationTls.class.getName(), authParams).allowTlsInsecureConnection(true);
}
BROKER_LOOKUP_URL = BROKER_URL_BASE
+ "/lookup/v2/destination/persistent/my-property/local/my-namespace/my-topic";
BROKER_LOOKUP_URL_TLS = BROKER_URL_BASE_TLS
+ "/lookup/v2/destination/persistent/my-property/local/my-namespace/my-topic";
PulsarAdmin pulsarAdmin = adminBuilder.serviceHttpUrl(serviceUrl).build();
try {
pulsarAdmin.clusters().createCluster(config.getClusterName(),
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
} catch (ConflictException ce) {
// This is OK.
} finally {
pulsarAdmin.close();
}
}
@AfterMethod(alwaysRun = true)
void teardown() {
if (pulsar != null) {
try {
pulsar.close();
pulsar = null;
} catch (Exception e) {
Assert.fail("Got exception while closing the pulsar instance ", e);
}
}
}
private static final Logger log = LoggerFactory.getLogger(WebServiceTest.class);
}