blob: 4b4c8dbb98300c373516b8007bad7300892db8c3 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import javax.servlet.Servlet;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletOutputStream;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import lombok.extern.slf4j.Slf4j;
import okhttp3.OkHttpClient;
import okhttp3.Response;
import org.apache.commons.lang3.RandomUtils;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServlet;
import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServletWithClassLoader;
import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServletWithPulsarService;
import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServlets;
import org.apache.pulsar.common.configuration.PulsarConfiguration;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.servlet.ServletHolder;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@Slf4j
@Test(groups = "broker")
public class BrokerAdditionalServletTest extends MockedPulsarServiceBaseTest {
private final String BASE_PATH = "/additional/servlet";
private final String WITH_PULSAR_SERVICE_BASE_PATH = "/additional/servlet/with/pulsar/service";
private final String QUERY_PARAM = "param";
@Override
@BeforeClass
protected void setup() throws Exception {
internalSetup();
}
@Override
@AfterClass(alwaysRun = true)
protected void cleanup() throws Exception {
internalCleanup();
}
@Override
protected void beforePulsarStartMocks(PulsarService pulsar) throws Exception {
mockAdditionalServlet(pulsar);
}
private void mockAdditionalServlet(PulsarService pulsar) {
Servlet servlet = new OrdinaryServlet();
AdditionalServlet brokerAdditionalServlet = Mockito.mock(AdditionalServlet.class);
Mockito.when(brokerAdditionalServlet.getBasePath()).thenReturn(BASE_PATH);
Mockito.when(brokerAdditionalServlet.getServletHolder()).thenReturn(new ServletHolder(servlet));
AdditionalServletWithPulsarService brokerAdditionalServletWithPulsarService =
new AdditionalServletWithPulsarService() {
private PulsarService pulsarService;
@Override
public void setPulsarService(PulsarService pulsarService) {
this.pulsarService = pulsarService;
}
@Override
public void loadConfig(PulsarConfiguration pulsarConfiguration) {
// No-op
}
@Override
public String getBasePath() {
return WITH_PULSAR_SERVICE_BASE_PATH;
}
@Override
public ServletHolder getServletHolder() {
return new ServletHolder(new WithPulsarServiceServlet(pulsarService));
}
@Override
public void close() {
// No-op
}
};
AdditionalServlets brokerAdditionalServlets = Mockito.mock(AdditionalServlets.class);
Map<String, AdditionalServletWithClassLoader> map = new HashMap<>();
map.put("broker-additional-servlet", new AdditionalServletWithClassLoader(brokerAdditionalServlet, null));
map.put("broker-additional-servlet-with-pulsar-service", new AdditionalServletWithClassLoader(brokerAdditionalServletWithPulsarService, null));
Mockito.when(brokerAdditionalServlets.getServlets()).thenReturn(map);
Mockito.when(pulsar.getBrokerAdditionalServlets()).thenReturn(brokerAdditionalServlets);
}
@Test
public void test() throws IOException {
int httpPort = pulsar.getWebService().getListenPortHTTP().get();
log.info("pulsar webService httpPort {}", httpPort);
String paramValue = "value - " + RandomUtils.nextInt();
String response = httpGet("http://localhost:" + httpPort + BASE_PATH + "?" + QUERY_PARAM + "=" + paramValue);
Assert.assertEquals(response, paramValue);
String WithPulsarServiceParamValue = PulsarService.class.getName();
String withPulsarServiceResponse = httpGet("http://localhost:" + httpPort + WITH_PULSAR_SERVICE_BASE_PATH);
Assert.assertEquals(WithPulsarServiceParamValue, withPulsarServiceResponse);
}
private class OrdinaryServlet implements Servlet {
@Override
public void init(ServletConfig servletConfig) throws ServletException {
log.info("[init]");
}
@Override
public ServletConfig getServletConfig() {
log.info("[getServletConfig]");
return null;
}
@Override
public void service(ServletRequest servletRequest, ServletResponse servletResponse) throws ServletException,
IOException {
log.info("[service] path: {}", ((Request) servletRequest).getOriginalURI());
String value = servletRequest.getParameterMap().get(QUERY_PARAM)[0];
ServletOutputStream servletOutputStream = servletResponse.getOutputStream();
servletResponse.setContentLength(value.getBytes().length);
servletOutputStream.write(value.getBytes());
servletOutputStream.flush();
}
@Override
public String getServletInfo() {
log.info("[getServletInfo]");
return null;
}
@Override
public void destroy() {
log.info("[destroy]");
}
}
private class WithPulsarServiceServlet extends OrdinaryServlet {
private final PulsarService pulsarService;
public WithPulsarServiceServlet(PulsarService pulsar) {
this.pulsarService = pulsar;
}
@Override
public void service(ServletRequest servletRequest, ServletResponse servletResponse) throws ServletException,
IOException {
log.info("[service] path: {}", ((Request) servletRequest).getOriginalURI());
String value = pulsarService == null ? "null" : PulsarService.class.getName();
ServletOutputStream servletOutputStream = servletResponse.getOutputStream();
servletResponse.setContentLength(value.getBytes().length);
servletOutputStream.write(value.getBytes());
servletOutputStream.flush();
}
}
String httpGet(String url) throws IOException {
OkHttpClient client = new OkHttpClient();
okhttp3.Request request = new okhttp3.Request.Builder()
.get()
.url(url)
.build();
try (Response response = client.newCall(request).execute()) {
return response.body().string();
}
}
}