blob: b16fac427aa7c42cd27b8a05c937e332de5082c4 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.ThreadFactory;
import java.util.regex.Pattern;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.pulsar.client.api.MockBrokerServiceHooks.CommandAckHook;
import org.apache.pulsar.client.api.MockBrokerServiceHooks.CommandCloseConsumerHook;
import org.apache.pulsar.client.api.MockBrokerServiceHooks.CommandCloseProducerHook;
import org.apache.pulsar.client.api.MockBrokerServiceHooks.CommandConnectHook;
import org.apache.pulsar.client.api.MockBrokerServiceHooks.CommandFlowHook;
import org.apache.pulsar.client.api.MockBrokerServiceHooks.CommandPartitionLookupHook;
import org.apache.pulsar.client.api.MockBrokerServiceHooks.CommandProducerHook;
import org.apache.pulsar.client.api.MockBrokerServiceHooks.CommandSendHook;
import org.apache.pulsar.client.api.MockBrokerServiceHooks.CommandSubscribeHook;
import org.apache.pulsar.client.api.MockBrokerServiceHooks.CommandTopicLookupHook;
import org.apache.pulsar.client.api.MockBrokerServiceHooks.CommandUnsubscribeHook;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.CommandCloseConsumer;
import org.apache.pulsar.common.api.proto.CommandCloseProducer;
import org.apache.pulsar.common.api.proto.CommandConnect;
import org.apache.pulsar.common.api.proto.CommandFlow;
import org.apache.pulsar.common.api.proto.CommandLookupTopic;
import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse.LookupType;
import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadata;
import org.apache.pulsar.common.api.proto.CommandPing;
import org.apache.pulsar.common.api.proto.CommandPong;
import org.apache.pulsar.common.api.proto.CommandProducer;
import org.apache.pulsar.common.api.proto.CommandSend;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.CommandUnsubscribe;
import org.apache.pulsar.common.lookup.data.LookupData;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.PulsarDecoder;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*/
public class MockBrokerService {
private LookupData lookupData;
private class genericResponseHandler extends AbstractHandler {
private final ObjectMapper objectMapper = new ObjectMapper();
private final String lookupURI = "/lookup/v2/destination/persistent";
private final String partitionMetadataURI = "/admin/persistent";
private final PartitionedTopicMetadata singlePartitionedTopicMetadata = new PartitionedTopicMetadata(1);
private final PartitionedTopicMetadata multiPartitionedTopicMetadata = new PartitionedTopicMetadata(4);
private final PartitionedTopicMetadata nonPartitionedTopicMetadata = new PartitionedTopicMetadata();
// regex to find a partitioned topic
private final Pattern singlePartPattern = Pattern.compile(".*/part-.*");
private final Pattern multiPartPattern = Pattern.compile(".*/multi-part-.*");
@Override
public void handle(String s, Request baseRequest, HttpServletRequest request, HttpServletResponse response)
throws IOException, ServletException {
String responseString;
log.info("Received HTTP request {}", baseRequest.getRequestURI());
if (baseRequest.getRequestURI().startsWith(lookupURI)) {
response.setContentType("application/json;charset=utf-8");
response.setStatus(HttpServletResponse.SC_OK);
responseString = objectMapper.writeValueAsString(lookupData);
} else if (baseRequest.getRequestURI().startsWith(partitionMetadataURI)) {
response.setContentType("application/json;charset=utf-8");
response.setStatus(HttpServletResponse.SC_OK);
if (singlePartPattern.matcher(baseRequest.getRequestURI()).matches()) {
responseString = objectMapper.writeValueAsString(singlePartitionedTopicMetadata);
} else if (multiPartPattern.matcher(baseRequest.getRequestURI()).matches()) {
responseString = objectMapper.writeValueAsString(multiPartitionedTopicMetadata);
} else {
responseString = objectMapper.writeValueAsString(nonPartitionedTopicMetadata);
}
} else {
response.setContentType("text/html;charset=utf-8");
response.setStatus(HttpServletResponse.SC_NOT_FOUND);
responseString = "URI NOT DEFINED";
}
baseRequest.setHandled(true);
response.getWriter().println(responseString);
log.info("Sent response: {}", responseString);
}
}
private class MockServerCnx extends PulsarDecoder {
// local state
ChannelHandlerContext ctx;
long producerId = 0;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
this.ctx = ctx;
}
@Override
protected void messageReceived() {
}
@Override
protected void handleConnect(CommandConnect connect) {
if (handleConnect != null) {
handleConnect.apply(ctx, connect);
return;
}
// default
ctx.writeAndFlush(Commands.newConnected(connect.getProtocolVersion()));
}
@Override
protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata request) {
if (handlePartitionlookup != null) {
handlePartitionlookup.apply(ctx, request);
return;
}
// default
ctx.writeAndFlush(Commands.newPartitionMetadataResponse(0, request.getRequestId()));
}
@Override
protected void handleLookup(CommandLookupTopic lookup) {
if (handleTopiclookup != null) {
handleTopiclookup.apply(ctx, lookup);
return;
}
// default
ctx.writeAndFlush(Commands.newLookupResponse(getBrokerAddress(), null, true,
LookupType.Connect, lookup.getRequestId(), false));
}
@Override
protected void handleSubscribe(CommandSubscribe subscribe) {
if (handleSubscribe != null) {
handleSubscribe.apply(ctx, subscribe);
return;
}
// default
ctx.writeAndFlush(Commands.newSuccess(subscribe.getRequestId()));
}
@Override
protected void handleProducer(CommandProducer producer) {
producerId = producer.getProducerId();
if (handleProducer != null) {
handleProducer.apply(ctx, producer);
return;
}
// default
ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(), "default-producer", SchemaVersion.Empty));
}
@Override
protected void handleSend(CommandSend send, ByteBuf headersAndPayload) {
if (handleSend != null) {
handleSend.apply(ctx, send, headersAndPayload);
return;
}
// default
ctx.writeAndFlush(Commands.newSendReceipt(producerId, send.getSequenceId(), 0, 0, 0));
}
@Override
protected void handleAck(CommandAck ack) {
if (handleAck != null) {
handleAck.apply(ctx, ack);
}
// default: do nothing
}
@Override
protected void handleFlow(CommandFlow flow) {
if (handleFlow != null) {
handleFlow.apply(ctx, flow);
}
// default: do nothing
}
@Override
protected void handleUnsubscribe(CommandUnsubscribe unsubscribe) {
if (handleUnsubscribe != null) {
handleUnsubscribe.apply(ctx, unsubscribe);
return;
}
// default
ctx.writeAndFlush(Commands.newSuccess(unsubscribe.getRequestId()));
}
@Override
protected void handleCloseProducer(CommandCloseProducer closeProducer) {
if (handleCloseProducer != null) {
handleCloseProducer.apply(ctx, closeProducer);
return;
}
// default
ctx.writeAndFlush(Commands.newSuccess(closeProducer.getRequestId()));
}
@Override
protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) {
if (handleCloseConsumer != null) {
handleCloseConsumer.apply(ctx, closeConsumer);
return;
}
// default
ctx.writeAndFlush(Commands.newSuccess(closeConsumer.getRequestId()));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.warn("Got exception", cause);
ctx.close();
}
@Override
final protected void handlePing(CommandPing ping) {
// Immediately reply success to ping requests
ctx.writeAndFlush(Commands.newPong());
}
@Override
final protected void handlePong(CommandPong pong) {
}
}
private final Server server;
EventLoopGroup workerGroup;
private Channel listenChannel;
private CommandConnectHook handleConnect = null;
private CommandTopicLookupHook handleTopiclookup = null;
private CommandPartitionLookupHook handlePartitionlookup = null;
private CommandSubscribeHook handleSubscribe = null;
private CommandProducerHook handleProducer = null;
private CommandSendHook handleSend = null;
private CommandAckHook handleAck = null;
private CommandFlowHook handleFlow = null;
private CommandUnsubscribeHook handleUnsubscribe = null;
private CommandCloseProducerHook handleCloseProducer = null;
private CommandCloseConsumerHook handleCloseConsumer = null;
public MockBrokerService() {
server = new Server(0);
server.setHandler(new genericResponseHandler());
}
public void start() {
try {
server.start();
log.info("Started web service on {}", getHttpAddress());
startMockBrokerService();
log.info("Started mock Pulsar service on {}", getBrokerAddress());
lookupData = new LookupData(getBrokerAddress(), null,
getHttpAddress(), null);
} catch (Exception e) {
log.error("Error starting mock service", e);
}
}
public void stop() {
try {
server.stop();
workerGroup.shutdownGracefully();
} catch (Exception e) {
log.error("Error stopping mock service", e);
}
}
public void startMockBrokerService() throws Exception {
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("mock-pulsar-%s").build();
final int numThreads = 2;
final int MaxMessageSize = 5 * 1024 * 1024;
try {
workerGroup = EventLoopUtil.newEventLoopGroup(numThreads, false, threadFactory);
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(workerGroup, workerGroup);
bootstrap.channel(EventLoopUtil.getServerSocketChannelClass(workerGroup));
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(MaxMessageSize, 0, 4, 0, 4));
ch.pipeline().addLast("handler", new MockServerCnx());
}
});
// Bind and start to accept incoming connections.
listenChannel = bootstrap.bind(0).sync().channel();
} catch (Exception e) {
throw e;
}
}
public void setHandleConnect(CommandConnectHook hook) {
handleConnect = hook;
}
public void resetHandleConnect() {
handleConnect = null;
}
public void setHandlePartitionLookup(CommandPartitionLookupHook hook) {
handlePartitionlookup = hook;
}
public void resetHandlePartitionLookup() {
handlePartitionlookup = null;
}
public void setHandleLookup(CommandTopicLookupHook hook) {
handleTopiclookup = hook;
}
public void resetHandleLookup() {
handleTopiclookup = null;
}
public void setHandleSubscribe(CommandSubscribeHook hook) {
handleSubscribe = hook;
}
public void resetHandleSubscribe() {
handleSubscribe = null;
}
public void setHandleProducer(CommandProducerHook hook) {
handleProducer = hook;
}
public void resetHandleProducer() {
handleProducer = null;
}
public void setHandleSend(CommandSendHook hook) {
handleSend = hook;
}
public void resetHandleSend() {
handleSend = null;
}
public void setHandleAck(CommandAckHook hook) {
handleAck = hook;
}
public void resetHandleAck() {
handleAck = null;
}
public void setHandleFlow(CommandFlowHook hook) {
handleFlow = hook;
}
public void resetHandleFlow() {
handleFlow = null;
}
public void setHandleUnsubscribe(CommandUnsubscribeHook hook) {
handleUnsubscribe = hook;
}
public void resetHandleUnsubscribe() {
handleUnsubscribe = null;
}
public void setHandleCloseProducer(CommandCloseProducerHook hook) {
handleCloseProducer = hook;
}
public void resetHandleCloseProducer() {
handleCloseProducer = null;
}
public void setHandleCloseConsumer(CommandCloseConsumerHook hook) {
handleCloseConsumer = hook;
}
public void resetHandleCloseConsumer() {
handleCloseConsumer = null;
}
public String getHttpAddress() {
return String.format("http://localhost:%d", server.getURI().getPort());
}
public String getBrokerAddress() {
return String.format("pulsar://localhost:%d", ((InetSocketAddress) listenChannel.localAddress()).getPort());
}
private static final Logger log = LoggerFactory.getLogger(MockBrokerService.class);
}