blob: c894b7d77c43a26a6028851dd76e1f78683831e1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.protocol;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.socket.SocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.common.util.PortManager;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.io.File;
import java.io.FileOutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort;
import static org.testng.Assert.assertEquals;
@Slf4j
@Test(groups = "broker")
public abstract class SimpleProtocolHandlerTestsBase extends BrokerTestBase {
public static final class MyProtocolHandler implements ProtocolHandler {
private ServiceConfiguration conf;
private final List<Integer> ports = new ArrayList<>();
@Override
public String protocolName() {
return "test";
}
@Override
public boolean accept(String protocol) {
return "test".equals(protocol);
}
@Override
public void initialize(ServiceConfiguration conf) throws Exception {
this.conf = conf;
}
@Override
public String getProtocolDataToAdvertise() {
return "test";
}
@Override
public void start(BrokerService service) {
}
@Override
public Map<InetSocketAddress, ChannelInitializer<SocketChannel>> newChannelInitializers() {
int port = nextLockedFreePort();
this.ports.add(port);
return Collections.singletonMap(new InetSocketAddress(conf.getBindAddress(), port),
new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(final ChannelHandlerContext ctx) {
final ByteBuf resp = ctx.alloc().buffer();
resp.writeBytes("ok".getBytes(StandardCharsets.UTF_8));
final ChannelFuture f = ctx.writeAndFlush(resp);
f.addListener((ChannelFutureListener) future -> ctx.close());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
log.error("error", cause);
ctx.close();
}
});
}
});
}
@Override
public void close() {
ports.removeIf(PortManager::releaseLockedPort);
}
}
private File tempDirectory;
private boolean useSeparateThreadPool;
public SimpleProtocolHandlerTestsBase(boolean useSeparateThreadPool) {
this.useSeparateThreadPool = useSeparateThreadPool;
}
@BeforeClass
@Override
protected void setup() throws Exception {
tempDirectory = Files.createTempDirectory("SimpleProtocolHandlerTest").toFile();
conf.setUseSeparateThreadPoolForProtocolHandlers(useSeparateThreadPool);
conf.setProtocolHandlerDirectory(tempDirectory.getAbsolutePath());
conf.setMessagingProtocols(Collections.singleton("test"));
buildMockNarFile(tempDirectory);
super.baseSetup();
}
@Test
public void testBootstrapProtocolHandler() throws Exception {
SocketAddress address =
pulsar.getProtocolHandlers()
.getEndpoints()
.entrySet()
.stream()
.filter(e -> e.getValue().equals("test"))
.map(Map.Entry::getKey)
.findAny()
.get();
try (Socket socket = new Socket();) {
socket.connect(address);
String res = IOUtils.toString(socket.getInputStream(), StandardCharsets.UTF_8);
assertEquals(res, "ok");
}
}
@AfterClass(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
if (tempDirectory != null) {
FileUtils.deleteDirectory(tempDirectory);
}
}
private static void buildMockNarFile(File tempDirectory) throws Exception {
File file = new File(tempDirectory, "temp.nar");
try (ZipOutputStream zipfile = new ZipOutputStream(new FileOutputStream(file))) {
zipfile.putNextEntry(new ZipEntry("META-INF/"));
zipfile.putNextEntry(new ZipEntry("META-INF/services/"));
zipfile.putNextEntry(new ZipEntry("META-INF/bundled-dependencies/"));
ZipEntry manifest = new ZipEntry("META-INF/services/"
+ ProtocolHandlerUtils.PULSAR_PROTOCOL_HANDLER_DEFINITION_FILE);
zipfile.putNextEntry(manifest);
String yaml = "name: test\n" +
"description: this is a test\n" +
"handlerClass: " + MyProtocolHandler.class.getName() + "\n";
zipfile.write(yaml.getBytes(StandardCharsets.UTF_8));
zipfile.closeEntry();
}
}
}