blob: a754c7f635928e757b595a11782bc6d0dfcc9657 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.proxy.server;
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import okhttp3.OkHttpClient;
import okhttp3.Response;
import org.apache.commons.lang3.RandomUtils;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.proxy.server.plugin.servlet.ProxyAdditionalServletWithClassLoader;
import org.apache.pulsar.proxy.server.plugin.servlet.ProxyAdditionalServlets;
import org.apache.pulsar.proxy.server.plugin.servlet.ProxyAdditionalServlet;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.servlet.ServletHolder;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import javax.servlet.Servlet;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletOutputStream;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import java.io.IOException;
import java.net.URL;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import static org.mockito.Mockito.doReturn;
import static org.testng.Assert.assertEquals;
@Slf4j
public class ProxyAdditionalServletTest extends MockedPulsarServiceBaseTest {
private final String BASE_PATH = "/metrics/broker";
private final String QUERY_PARAM = "param";
private final String DUMMY_VALUE = "DUMMY_VALUE";
private ProxyService proxyService;
private WebServer proxyWebServer;
private ProxyConfiguration proxyConfig = new ProxyConfiguration();
@Override
@BeforeClass
protected void setup() throws Exception {
internalSetup();
proxyConfig.setServicePort(Optional.of(0));
proxyConfig.setWebServicePort(Optional.of(0));
proxyConfig.setZookeeperServers(DUMMY_VALUE);
proxyConfig.setConfigurationStoreServers(DUMMY_VALUE);
// enable full parsing feature
proxyConfig.setProxyLogLevel(Optional.of(2));
// this is for nar package test
// addServletNar();
proxyService = Mockito.spy(new ProxyService(proxyConfig,
new AuthenticationService(PulsarConfigurationLoader.convertFrom(proxyConfig))));
doReturn(mockZooKeeperClientFactory).when(proxyService).getZooKeeperClientFactory();
Optional<Integer> proxyLogLevel = Optional.of(2);
assertEquals(proxyLogLevel, proxyService.getConfiguration().getProxyLogLevel());
proxyService.start();
AuthenticationService authService = new AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig));
mockAdditionalServlet();
proxyWebServer = new WebServer(proxyConfig, authService);
ProxyServiceStarter.addWebServerHandlers(proxyWebServer, proxyConfig, proxyService, null);
proxyWebServer.start();
}
// this is for nar package test
private void addServletNar() {
Properties properties = new Properties();
properties.setProperty("basePath", "/metrics-prometheus/broker");
properties.setProperty("mappedPath", "/federate");
properties.setProperty("query", "match[]={job=\"prometheus\"}");
proxyConfig.setProperties(properties);
// set protocol related config
URL testHandlerUrl = this.getClass().getClassLoader().getResource("proxy-additional-servlet-plugin-1.0-SNAPSHOT.nar");
Path handlerPath;
try {
handlerPath = Paths.get(testHandlerUrl.toURI());
} catch (Exception e) {
log.error("failed to get handler Path, handlerUrl: {}. Exception: ", testHandlerUrl, e);
return;
}
String servletDirectory = handlerPath.toFile().getParent();
proxyConfig.setProxyAdditionalServletDirectory(servletDirectory);
proxyConfig.setProxyAdditionalServlets(Sets.newHashSet("prometheus-proxy-servlet"));
}
private void mockAdditionalServlet() {
Servlet servlet = new Servlet() {
@Override
public void init(ServletConfig servletConfig) throws ServletException {
log.info("[init]");
}
@Override
public ServletConfig getServletConfig() {
log.info("[getServletConfig]");
return null;
}
@Override
public void service(ServletRequest servletRequest, ServletResponse servletResponse) throws ServletException, IOException {
log.info("[service] path: {}", ((Request) servletRequest).getOriginalURI());
String value = servletRequest.getParameterMap().get(QUERY_PARAM)[0];
ServletOutputStream servletOutputStream = servletResponse.getOutputStream();
servletResponse.setContentLength(value.getBytes().length);
servletOutputStream.write(value.getBytes());
servletOutputStream.flush();
}
@Override
public String getServletInfo() {
log.info("[getServletInfo]");
return null;
}
@Override
public void destroy() {
log.info("[destroy]");
}
};
ProxyAdditionalServlet proxyAdditionalServlet = Mockito.mock(ProxyAdditionalServlet.class);
Mockito.when(proxyAdditionalServlet.getBasePath()).thenReturn(BASE_PATH);
Mockito.when(proxyAdditionalServlet.getServletHolder()).thenReturn(new ServletHolder(servlet));
ProxyAdditionalServlets proxyAdditionalServlets = Mockito.mock(ProxyAdditionalServlets.class);
Map<String, ProxyAdditionalServletWithClassLoader> map = new HashMap<>();
map.put("prometheus-proxy-servlet", new ProxyAdditionalServletWithClassLoader(proxyAdditionalServlet, null));
Mockito.when(proxyAdditionalServlets.getServlets()).thenReturn(map);
Mockito.when(proxyService.getProxyAdditionalServlets()).thenReturn(proxyAdditionalServlets);
}
@Override
@AfterClass
protected void cleanup() throws Exception {
internalCleanup();
proxyService.close();
}
@Test
public void test() throws IOException {
int httpPort = proxyWebServer.getListenPortHTTP().get();
log.info("proxy service httpPort {}", httpPort);
String paramValue = "value - " + RandomUtils.nextInt();
String response = httpGet("http://localhost:" + httpPort + BASE_PATH + "?" + QUERY_PARAM + "=" + paramValue);
Assert.assertEquals(response, paramValue);
}
String httpGet(String url) throws IOException {
OkHttpClient client = new OkHttpClient();
okhttp3.Request request = new okhttp3.Request.Builder()
.get()
.url(url)
.build();
try (Response response = client.newCall(request).execute()) {
return response.body().string();
}
}
}