| # |
| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| use Cwd qw(cwd); |
| use t::APISIX 'no_plan'; |
| |
| # Harness configuration shared by every test block in this file. |
| repeat_each(1); |
| no_long_string(); |
| no_root_location(); |
| |
| # Base directory of the checkout: $APISIX_HOME when it is defined, otherwise |
| # the current working directory. |
| my $apisix_home = $ENV{APISIX_HOME} // cwd(); |
| |
| # Preprocessor registered for the test blocks: it fills in the "init" value |
| # and a default request. |
| add_block_preprocessor(sub { |
| my ($block) = @_; |
| |
| # Text of a backtick shell command that symlinks the apisix source directory |
| # into t/servroot; $apisix_home is interpolated when the heredoc is built. |
| my $block_init = <<_EOC_; |
| `ln -sf $apisix_home/apisix $apisix_home/t/servroot/apisix`; |
| _EOC_ |
| |
| $block->set_value("init", $block_init); |
| |
| # Blocks that do not declare a request fall back to "GET /t". |
| if (!defined $block->request) { |
| $block->set_value("request", "GET /t"); |
| } |
| }); |
| |
| # Cleanup handler: removes the t/servroot/apisix symlink again. |
| add_test_cleanup_handler(sub { |
| `rm -f $apisix_home/t/servroot/apisix`; |
| }); |
| |
| run_tests(); |
| |
| __DATA__ |
| |
| === TEST 1: setup all-in-one test |
| --- config |
|     # Creates, through the admin API, the five Kafka routes that the other |
|     # tests in this file connect to; one result line is printed per route. |
|     location /t { |
|         content_by_lua_block { |
|             local admin_request = require("lib.test_admin").test |
| |
|             -- Each entry pairs an admin API path with the JSON route |
|             -- definition that is PUT to it. |
|             local routes = { |
|                 -- upstream node 127.0.0.1:9092 |
|                 { |
|                     path = "/apisix/admin/routes/kafka", |
|                     body = [[{ |
| "upstream": { |
| "nodes": { |
| "127.0.0.1:9092": 1 |
| }, |
| "type": "none", |
| "scheme": "kafka" |
| }, |
| "uri": "/kafka" |
| }]], |
|                 }, |
|                 -- upstream node 127.0.0.1:59092 |
|                 { |
|                     path = "/apisix/admin/routes/kafka-invalid", |
|                     body = [[{ |
| "upstream": { |
| "nodes": { |
| "127.0.0.1:59092": 1 |
| }, |
| "type": "none", |
| "scheme": "kafka" |
| }, |
| "uri": "/kafka-invalid" |
| }]], |
|                 }, |
|                 -- upstream node 127.0.0.1:9093, tls.verify = true |
|                 { |
|                     path = "/apisix/admin/routes/kafka-tlsv", |
|                     body = [[{ |
| "upstream": { |
| "nodes": { |
| "127.0.0.1:9093": 1 |
| }, |
| "type": "none", |
| "scheme": "kafka", |
| "tls": { |
| "verify": true |
| } |
| }, |
| "uri": "/kafka-tlsv" |
| }]], |
|                 }, |
|                 -- upstream node 127.0.0.1:9093, tls.verify = false |
|                 { |
|                     path = "/apisix/admin/routes/kafka-tls", |
|                     body = [[{ |
| "upstream": { |
| "nodes": { |
| "127.0.0.1:9093": 1 |
| }, |
| "type": "none", |
| "scheme": "kafka", |
| "tls": { |
| "verify": false |
| } |
| }, |
| "uri": "/kafka-tls" |
| }]], |
|                 }, |
|                 -- upstream node 127.0.0.1:9094, kafka-proxy plugin with |
|                 -- SASL username/password |
|                 { |
|                     path = "/apisix/admin/routes/kafka-sasl", |
|                     body = [[{ |
| "upstream": { |
| "nodes": { |
| "127.0.0.1:9094": 1 |
| }, |
| "type": "none", |
| "scheme": "kafka" |
| }, |
| "uri": "/kafka-sasl", |
| "plugins": { |
| "kafka-proxy": { |
| "sasl": { |
| "username": "admin", |
| "password": "admin-secret" |
| } |
| } |
| } |
| }]], |
|                 }, |
|             } |
| |
|             for idx = 1, #routes do |
|                 local route = routes[idx] |
|                 -- Only the second return value of the helper is printed; |
|                 -- the status code it returns first is not inspected. |
|                 local _, result = admin_request(route.path, ngx.HTTP_PUT, route.body) |
|                 ngx.say(result) |
|             end |
|         } |
|     } |
| --- response_body eval |
| "passed\n"x5 |
| |
| |
| |
| === TEST 2: hit route (with HTTP request) |
| # A plain HTTP GET against /kafka (no "upgrade" request header) is expected |
| # to be answered with status 400 and to leave the pubsub error below in the log. |
| --- request |
| GET /kafka |
| --- error_code: 400 |
| --- error_log |
| failed to initialize pubsub module, err: bad "upgrade" request header: nil |
| |
| |
| |
| === TEST 3: hit route (Kafka) |
| --- config |
|     # The messages used in this test are produced by the |
|     # linux-ci-init-service.sh script that prepares the CI environment. |
|     location /t { |
|         content_by_lua_block { |
|             local pb = require("pb") |
|             local pubsub = require("lib.pubsub") |
|             local ws = pubsub.new_ws("ws://127.0.0.1:1984/kafka") |
| |
|             -- Commands are sent one at a time, in this order; every |
|             -- response is printed prefixed with its sequence number. |
|             local commands = { |
|                 -- List offset on the "not-exist" topic |
|                 { |
|                     sequence = 0, |
|                     cmd_kafka_list_offset = { |
|                         topic = "not-exist", |
|                         partition = 0, |
|                         timestamp = -1, |
|                     }, |
|                 }, |
|                 -- Fetch from the "not-exist" topic |
|                 { |
|                     sequence = 1, |
|                     cmd_kafka_fetch = { |
|                         topic = "not-exist", |
|                         partition = 0, |
|                         offset = 0, |
|                     }, |
|                 }, |
|                 -- Query the offset of the first message |
|                 { |
|                     sequence = 2, |
|                     cmd_kafka_list_offset = { |
|                         topic = "test-consumer", |
|                         partition = 0, |
|                         timestamp = -2, |
|                     }, |
|                 }, |
|                 -- Query the offset of the last message |
|                 { |
|                     sequence = 3, |
|                     cmd_kafka_list_offset = { |
|                         topic = "test-consumer", |
|                         partition = 0, |
|                         timestamp = -1, |
|                     }, |
|                 }, |
|                 -- Query by timestamp: 9999999999999 is later than the |
|                 -- production time of any message |
|                 { |
|                     sequence = 4, |
|                     cmd_kafka_list_offset = { |
|                         topic = "test-consumer", |
|                         partition = 0, |
|                         timestamp = "9999999999999", |
|                     }, |
|                 }, |
|                 -- Query by timestamp: 1500000000000 is earlier than the |
|                 -- production time of any message |
|                 { |
|                     sequence = 5, |
|                     cmd_kafka_list_offset = { |
|                         topic = "test-consumer", |
|                         partition = 0, |
|                         timestamp = "1500000000000", |
|                     }, |
|                 }, |
|                 -- Fetch starting at offset 14 |
|                 { |
|                     sequence = 6, |
|                     cmd_kafka_fetch = { |
|                         topic = "test-consumer", |
|                         partition = 0, |
|                         offset = 14, |
|                     }, |
|                 }, |
|                 -- Fetch starting at offset 999 |
|                 { |
|                     sequence = 7, |
|                     cmd_kafka_fetch = { |
|                         topic = "test-consumer", |
|                         partition = 0, |
|                         offset = 999, |
|                     }, |
|                 }, |
|             } |
| |
|             for _, cmd in ipairs(commands) do |
|                 -- force clear state |
|                 pb.state(nil) |
| |
|                 local resp = ws:send_recv_ws_binary(cmd) |
|                 local seq = resp.sequence |
| |
|                 local failure = resp.error_resp |
|                 if failure then |
|                     ngx.say(seq .. failure.message) |
|                 end |
| |
|                 local listed = resp.kafka_list_offset_resp |
|                 if listed then |
|                     ngx.say(seq .. "offset: " .. listed.offset) |
|                 end |
| |
|                 local fetched = resp.kafka_fetch_resp |
|                 if fetched then |
|                     -- Only the first message of a fetch response is reported. |
|                     local first = fetched.messages[1] |
|                     ngx.say(seq .. "offset: " .. first.offset .. |
|                             " msg: " .. first.value) |
|                 end |
|             end |
| |
|             ws:close_ws() |
|         } |
|     } |
| --- response_body |
| 0failed to list offset, topic: not-exist, partition: 0, err: not found topic |
| 1failed to fetch message, topic: not-exist, partition: 0, err: not found topic |
| 2offset: 0 |
| 3offset: 30 |
| 4offset: -1 |
| 5offset: 0 |
| 6offset: 14 msg: testmsg15 |
| 7failed to fetch message, topic: test-consumer, partition: 0, err: OFFSET_OUT_OF_RANGE |
| |
| |
| |
| === TEST 4: hit route (Kafka with invalid node ip) |
| --- config |
|     # The kafka-invalid route targets 127.0.0.1:59092 (see TEST 1). The test |
|     # expects the list-offset command to come back as an error response and |
|     # the error log to report that all brokers failed. |
|     # (The test-consumer messages are produced by linux-ci-init-service.sh, |
|     # the script that prepares the CI environment.) |
|     location /t { |
|         content_by_lua_block { |
|             local pubsub = require("lib.pubsub") |
|             local ws = pubsub.new_ws("ws://127.0.0.1:1984/kafka-invalid") |
| |
|             local list_offset_cmd = { |
|                 sequence = 0, |
|                 cmd_kafka_list_offset = { |
|                     topic = "test-consumer", |
|                     partition = 0, |
|                     timestamp = -2, |
|                 }, |
|             } |
| |
|             local resp = ws:send_recv_ws_binary(list_offset_cmd) |
|             local failure = resp.error_resp |
|             if failure then |
|                 ngx.say(resp.sequence .. failure.message) |
|             end |
| |
|             ws:close_ws() |
|         } |
|     } |
| --- response_body |
| 0failed to list offset, topic: test-consumer, partition: 0, err: not found topic |
| --- error_log |
| all brokers failed in fetch topic metadata |
| |
| |
| |
| === TEST 5: hit route (Kafka with TLS) |
| --- config |
|     # Sends one list-offset command through the kafka-tls route |
|     # (tls.verify = false, see TEST 1) and prints the returned offset. |
|     location /t { |
|         content_by_lua_block { |
|             local pubsub = require("lib.pubsub") |
|             local ws = pubsub.new_ws("ws://127.0.0.1:1984/kafka-tls") |
| |
|             local list_offset_cmd = { |
|                 sequence = 0, |
|                 cmd_kafka_list_offset = { |
|                     topic = "test-consumer", |
|                     partition = 0, |
|                     timestamp = -1, |
|                 }, |
|             } |
| |
|             local resp = ws:send_recv_ws_binary(list_offset_cmd) |
|             local listed = resp.kafka_list_offset_resp |
|             if listed then |
|                 ngx.say(resp.sequence .. "offset: " .. listed.offset) |
|             end |
| |
|             ws:close_ws() |
|         } |
|     } |
| --- response_body |
| 0offset: 30 |
| |
| |
| |
| === TEST 6: hit route (Kafka with TLS + ssl verify) |
| --- config |
|     # Sends one list-offset command through the kafka-tlsv route |
|     # (tls.verify = true, see TEST 1). No response body is asserted; the |
|     # test only checks the error log for a self-signed certificate message. |
|     location /t { |
|         content_by_lua_block { |
|             local pubsub = require("lib.pubsub") |
|             local ws = pubsub.new_ws("ws://127.0.0.1:1984/kafka-tlsv") |
| |
|             local list_offset_cmd = { |
|                 sequence = 0, |
|                 cmd_kafka_list_offset = { |
|                     topic = "test-consumer", |
|                     partition = 0, |
|                     timestamp = -1, |
|                 }, |
|             } |
| |
|             local resp = ws:send_recv_ws_binary(list_offset_cmd) |
|             local listed = resp.kafka_list_offset_resp |
|             if listed then |
|                 ngx.say(resp.sequence .. "offset: " .. listed.offset) |
|             end |
| |
|             ws:close_ws() |
|         } |
|     } |
| --- error_log eval |
| qr/self[- ]signed certificate/ |
| |
| |
| |
| === TEST 7: hit route (Kafka with SASL) |
| --- config |
|     # Sends one list-offset command through the kafka-sasl route (kafka-proxy |
|     # plugin with SASL credentials, see TEST 1) and prints the returned offset. |
|     location /t { |
|         content_by_lua_block { |
|             local pubsub = require("lib.pubsub") |
|             local ws = pubsub.new_ws("ws://127.0.0.1:1984/kafka-sasl") |
| |
|             local list_offset_cmd = { |
|                 sequence = 0, |
|                 cmd_kafka_list_offset = { |
|                     topic = "test-consumer", |
|                     partition = 0, |
|                     timestamp = -1, |
|                 }, |
|             } |
| |
|             local resp = ws:send_recv_ws_binary(list_offset_cmd) |
|             local listed = resp.kafka_list_offset_resp |
|             if listed then |
|                 ngx.say(resp.sequence .. "offset: " .. listed.offset) |
|             end |
| |
|             ws:close_ws() |
|         } |
|     } |
| --- response_body |
| 0offset: 30 |