blob: e1ce83b76d2a3a098e63601c71f8fbb49a84e893 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
use t::APISIX 'no_plan';
log_level('debug');
repeat_each(1);
no_long_string();
no_root_location();
run_tests;
__DATA__
=== TEST 1: send invalid arguments for constructor
--- config
location /t {
content_by_lua_block {
local Batch = require("apisix.utils.batch-processor")
local config = {
max_retry_count = 2,
batch_max_size = 1,
retry_delay = 0,
}
local func_to_send = function(elements)
return true
end
local log_buffer, err = Batch:new("", config)
if log_buffer then
log_buffer:push({hello='world'})
ngx.say("done")
end
if not log_buffer then
ngx.say("failed")
end
}
}
--- request
GET /t
--- response_body
failed
--- wait: 0.5
=== TEST 2: sanity
--- config
location /t {
content_by_lua_block {
local Batch = require("apisix.utils.batch-processor")
local func_to_send = function(elements)
return true
end
local config = {
max_retry_count = 2,
batch_max_size = 1,
retry_delay = 0,
}
local log_buffer, err = Batch:new(func_to_send, config)
if not log_buffer then
ngx.say(err)
end
log_buffer:push({hello='world'})
ngx.say("done")
}
}
--- request
GET /t
--- response_body
done
--- error_log
Batch Processor[log buffer] successfully processed the entries
--- wait: 0.5
=== TEST 3: batch processor timeout exceeded
--- config
location /t {
content_by_lua_block {
local Batch = require("apisix.utils.batch-processor")
local config = {
max_retry_count = 2,
batch_max_size = 2,
retry_delay = 0,
inactive_timeout = 1
}
local func_to_send = function(elements)
return true
end
local log_buffer, err = Batch:new(func_to_send, config)
if not log_buffer then
ngx.say(err)
end
log_buffer:push({hello='world'})
ngx.say("done")
}
}
--- request
GET /t
--- response_body
done
--- error_log
Batch Processor[log buffer] buffer duration exceeded, activating buffer flush
Batch Processor[log buffer] successfully processed the entries
--- wait: 3
=== TEST 4: batch processor batch max size exceeded
--- config
location /t {
content_by_lua_block {
local Batch = require("apisix.utils.batch-processor")
local config = {
max_retry_count = 2,
batch_max_size = 2,
retry_delay = 0,
}
local func_to_send = function(elements)
return true
end
local log_buffer, err = Batch:new(func_to_send, config)
if not log_buffer then
ngx.say(err)
end
log_buffer:push({hello='world'})
log_buffer:push({hello='world'})
ngx.say("done")
}
}
--- request
GET /t
--- response_body
done
--- no_error_log
Batch Processor[log buffer] buffer duration exceeded, activating buffer flush
--- error_log
Batch Processor[log buffer] batch max size has exceeded
Batch Processor[log buffer] successfully processed the entries
--- wait: 1
=== TEST 5: first failed to process and second try success
--- config
location /t {
content_by_lua_block {
local Batch = require("apisix.utils.batch-processor")
local core = require("apisix.core")
local retry = false
local config = {
max_retry_count = 2,
batch_max_size = 2,
retry_delay = 0,
}
local func_to_send = function(elements)
if not retry then
retry = true
return false
end
return true
end
local log_buffer, err = Batch:new(func_to_send, config)
if not log_buffer then
ngx.say(err)
end
log_buffer:push({hello='world'})
log_buffer:push({hello='world'})
ngx.say("done")
}
}
--- request
GET /t
--- response_body
done
--- error_log
Batch Processor[log buffer] failed to process entries
Batch Processor[log buffer] successfully processed the entries
--- wait: 0.5
=== TEST 6: Exceeding max retry count
--- config
location /t {
content_by_lua_block {
local Batch = require("apisix.utils.batch-processor")
local config = {
max_retry_count = 2,
batch_max_size = 2,
retry_delay = 0,
}
local func_to_send = function(elements)
return false
end
local log_buffer, err = Batch:new(func_to_send, config)
if not log_buffer then
ngx.say(err)
end
log_buffer:push({hello='world'})
log_buffer:push({hello='world'})
ngx.say("done")
}
}
--- request
GET /t
--- response_body
done
--- no_error_log
Batch Processor[log buffer] buffer duration exceeded, activating buffer flush
--- error_log
Batch Processor[log buffer] failed to process entries
Batch Processor[log buffer] exceeded the max_retry_count
--- wait: 0.5
=== TEST 7: two batches
--- config
location /t {
content_by_lua_block {
local Batch = require("apisix.utils.batch-processor")
local core = require("apisix.core")
local count = 0
local config = {
max_retry_count = 2,
batch_max_size = 2,
retry_delay = 0,
}
local func_to_send = function(elements)
count = count + 1
core.log.info("batch[", count , "] sent")
return true
end
local log_buffer, err = Batch:new(func_to_send, config)
if not log_buffer then
ngx.say(err)
end
log_buffer:push({hello='world'})
log_buffer:push({hello='world'})
log_buffer:push({hello='world'})
log_buffer:push({hello='world'})
ngx.say("done")
}
}
--- request
GET /t
--- response_body
done
--- no_error_log
Batch Processor[log buffer] activating flush due to no activity
--- error_log
batch[1] sent
batch[2] sent
--- wait: 0.5
=== TEST 8: batch processor retry count 0 and fail processing
--- config
location /t {
content_by_lua_block {
local Batch = require("apisix.utils.batch-processor")
local config = {
max_retry_count = 0,
batch_max_size = 2,
retry_delay = 0,
}
local func_to_send = function(elements)
return false
end
local log_buffer, err = Batch:new(func_to_send, config)
if not log_buffer then
ngx.say(err)
end
log_buffer:push({hello='world'})
log_buffer:push({hello='world'})
ngx.say("done")
}
}
--- request
GET /t
--- response_body
done
--- no_error_log
Batch Processor[log buffer] activating flush due to no activity
--- error_log
Batch Processor[log buffer] exceeded the max_retry_count
--- wait: 0.5
=== TEST 9: batch processor timeout exceeded
--- config
location /t {
content_by_lua_block {
local Batch = require("apisix.utils.batch-processor")
local config = {
max_retry_count = 2,
batch_max_size = 2,
retry_delay = 0,
buffer_duration = 60,
inactive_timeout = 1,
}
local func_to_send = function(elements)
return true
end
local log_buffer, err = Batch:new(func_to_send, config)
if not log_buffer then
ngx.say(err)
end
log_buffer:push({hello='world'})
ngx.say("done")
}
}
--- request
GET /t
--- response_body
done
--- error_log
Batch Processor[log buffer] buffer duration exceeded, activating buffer flush
Batch Processor[log buffer] successfully processed the entries
--- wait: 3
=== TEST 10: json encode and log elements
--- config
location /t {
content_by_lua_block {
local Batch = require("apisix.utils.batch-processor")
local core = require("apisix.core")
local config = {
max_retry_count = 2,
batch_max_size = 2,
retry_delay = 0,
}
local func_to_send = function(elements)
core.log.info(require("toolkit.json").encode(elements))
return true
end
local log_buffer, err = Batch:new(func_to_send, config)
if not log_buffer then
ngx.say(err)
end
log_buffer:push({msg='1'})
log_buffer:push({msg='2'})
log_buffer:push({msg='3'})
log_buffer:push({msg='4'})
ngx.say("done")
}
}
--- request
GET /t
--- response_body
done
--- no_error_log
Batch Processor[log buffer] activating flush due to no activity
--- error_log
[{"msg":"1"},{"msg":"2"}]
[{"msg":"3"},{"msg":"4"}]
--- wait: 0.5
=== TEST 11: extend timer
--- config
location /t {
content_by_lua_block {
local Batch = require("apisix.utils.batch-processor")
local core = require("apisix.core")
local config = {
max_retry_count = 1,
batch_max_size = 3,
retry_delay = 0,
inactive_timeout = 1
}
local func_to_send = function(elements)
core.log.info(require("toolkit.json").encode(elements))
return true
end
local log_buffer, err = Batch:new(func_to_send, config)
if not log_buffer then
ngx.say(err)
end
log_buffer:push({msg='1'})
ngx.sleep(0.3)
log_buffer:push({msg='2'})
log_buffer:push({msg='3'})
log_buffer:push({msg='4'})
ngx.say("done")
}
}
--- request
GET /t
--- response_body
done
--- no_error_log
Batch Processor[log buffer] activating flush due to no activity
--- error_log
Batch Processor[log buffer] extending buffer timer
--- wait: 3
=== TEST 12: partially consumed entries
--- config
location /t {
content_by_lua_block {
local Batch = require("apisix.utils.batch-processor")
local core = require("apisix.core")
local config = {
max_retry_count = 1,
batch_max_size = 3,
retry_delay = 0,
inactive_timeout = 1
}
local func_to_send = function(elements)
core.log.info(require("toolkit.json").encode(elements))
return false, "error after consuming single entry", 2
end
local log_buffer, err = Batch:new(func_to_send, config)
if not log_buffer then
ngx.say(err)
end
log_buffer:push({msg='1'})
log_buffer:push({msg='2'})
log_buffer:push({msg='3'})
log_buffer:push({msg='4'})
ngx.say("done")
}
}
--- request
GET /t
--- response_body
done
--- error_log
[{"msg":"1"},{"msg":"2"},{"msg":"3"}]
Batch Processor[log buffer] failed to process entries [2/3]: error after consuming single entry
[{"msg":"2"},{"msg":"3"}]
Batch Processor[log buffer] failed to process entries [1/2]: error after consuming single entry
[{"msg":"4"}]
--- wait: 2