---
title: Batch Processor
---
<!--
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
-->
The batch processor aggregates entries (logs or any other data) and processes them in batches.
When `batch_max_size` is set to 1, the processor executes each entry immediately. Setting `batch_max_size` to a value greater
than 1 makes the processor aggregate entries until the batch reaches the maximum size or a timeout expires.
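
The size-triggered part of this behavior can be sketched in plain Lua. This is a simplified model, not the APISIX implementation: `new_buffer`, `push`, and `record` are hypothetical names, and the timeout-based flush is left out.

```lua
-- Simplified model of size-based batching (hypothetical names, no timers).
local function new_buffer(batch_max_size, func)
    local buffer = { entries = {} }

    -- Add one entry and hand the batch to func once it is full.
    function buffer:push(entry)
        table.insert(self.entries, entry)
        if #self.entries >= batch_max_size then
            local batch = self.entries
            self.entries = {}
            func(batch)
        end
    end

    return buffer
end

-- Record the size of every batch that gets processed.
local batch_sizes = {}
local function record(entries)
    table.insert(batch_sizes, #entries)
end

local immediate = new_buffer(1, record)   -- each entry is processed at once
immediate:push("a")
immediate:push("b")

local batched = new_buffer(3, record)     -- entries wait until three are buffered
for i = 1, 4 do
    batched:push(i)
end
```

In this model the fourth entry pushed to `batched` stays in the buffer; in the real processor it would be flushed once a timeout expires.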
## Configurations
The only mandatory parameter for creating a batch processor is a function. The function is executed when the batch reaches the maximum size
or when the buffer duration is exceeded.
| Name | Type | Requirement | Default | Valid | Description |
| ---------------- | ------- | ----------- | ------- | ------- | ------------------------------------------------------------ |
| name | string | optional | logger's name | ["http logger",...] | A unique identifier for the batch processor. It defaults to the name of the logger plugin that calls the batch processor; for example, the `name` for the "http logger" plugin is "http logger". |
| batch_max_size | integer | optional | 1000 | [1,...] | Sets the maximum number of logs sent in each batch. When the number of logs reaches the set maximum, all logs will be automatically pushed to the HTTP/HTTPS service. |
| inactive_timeout | integer | optional | 5 | [1,...] | The maximum time to refresh the buffer (in seconds). When the maximum refresh time is reached, all logs will be automatically pushed to the HTTP/HTTPS service regardless of whether the number of logs in the buffer reaches the maximum number set. |
| buffer_duration | integer | optional | 60 | [1,...] | Maximum age in seconds of the oldest entry in a batch before the batch must be processed. |
| max_retry_count | integer | optional | 0 | [0,...] | Maximum number of retries before removing the entry from the processing pipeline when an error occurs. |
| retry_delay | integer | optional | 1 | [0,...] | Number of seconds the process execution should be delayed if the execution fails. |
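
For quick reference, the numeric defaults and lower bounds from the table can be written as a plain Lua table. The `apply_defaults` helper below is hypothetical and only illustrates how omitted fields fall back to their defaults; it is not how APISIX validates plugin configuration.

```lua
-- Defaults and minimum values copied from the table above.
local defaults = {
    batch_max_size   = { default = 1000, minimum = 1 },
    inactive_timeout = { default = 5,    minimum = 1 },
    buffer_duration  = { default = 60,   minimum = 1 },
    max_retry_count  = { default = 0,    minimum = 0 },
    retry_delay      = { default = 1,    minimum = 0 },
}

-- Hypothetical helper: fill in missing fields and reject values below the minimum.
local function apply_defaults(conf)
    local result = {}
    for key, spec in pairs(defaults) do
        local value = conf[key]
        if value == nil then
            value = spec.default
        end
        if type(value) ~= "number" or value % 1 ~= 0 or value < spec.minimum then
            return nil, key .. " must be an integer >= " .. spec.minimum
        end
        result[key] = value
    end
    return result
end

-- Only two fields are set explicitly; the rest come from the defaults.
local conf = apply_defaults({ batch_max_size = 10, max_retry_count = 1 })

-- A value below the minimum is rejected with an error message.
local bad, err = apply_defaults({ retry_delay = -1 })
```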
The following code shows an example of how to use the batch processor in your plugin:
```lua
local bp_manager_mod = require("apisix.utils.batch-processor-manager")
...
local plugin_name = "xxx-logger"
local batch_processor_manager = bp_manager_mod.new(plugin_name)
local schema = {...}
local _M = {
    ...
    name = plugin_name,
    schema = batch_processor_manager:wrap_schema(schema),
}
...
function _M.log(conf, ctx)
    local entry = {...} -- data to log
    if batch_processor_manager:add_entry(conf, entry) then
        return
    end
    -- create a new processor if not found
    -- entries is an array table of entry, which can be processed in batch
    local func = function(entries)
        -- serialize to json array core.json.encode(entries)
        -- process/send data
        return true
        -- return false, err_msg, first_fail if failed
        -- first_fail(optional) indicates first_fail-1 entries have been successfully processed
        -- and during processing of entries[first_fail], the error occurred. So the batch processor
        -- only retries for the entries having index >= first_fail as per the retry policy.
    end
    batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, func)
end
```
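
The `first_fail` contract described in the comments above can be illustrated without APISIX. The sketch below is a simplified model under stated assumptions: `run_with_retries` is a hypothetical driver that retries immediately instead of waiting `retry_delay` seconds, and `send` is a stand-in for real I/O that fails once on the third entry.

```lua
-- Stand-in for real I/O: fails the first time it sees entry "c".
local sent = {}
local failed_once = false
local function send(entry)
    if entry == "c" and not failed_once then
        failed_once = true
        return false
    end
    table.insert(sent, entry)
    return true
end

-- Processing function following the contract: true on success,
-- or false, err_msg, first_fail on failure.
local function func(entries)
    for i, entry in ipairs(entries) do
        if not send(entry) then
            return false, "failed to send " .. entry, i
        end
    end
    return true
end

-- Hypothetical driver: retry only the entries from first_fail onwards,
-- at most max_retry_count times (no delay between attempts in this model).
local function run_with_retries(entries, max_retry_count)
    local attempts = 0
    while true do
        local ok, err, first_fail = func(entries)
        if ok then
            return true, attempts
        end
        if attempts >= max_retry_count then
            return false, attempts, err
        end
        attempts = attempts + 1
        local remaining = {}
        for i = first_fail or 1, #entries do
            table.insert(remaining, entries[i])
        end
        entries = remaining
    end
end

local ok, retries = run_with_retries({ "a", "b", "c", "d" }, 1)
```

Entries `a` and `b` are sent only once, because the retry starts at index 3. In this model, calling the driver with `max_retry_count` set to 0 would give up after the first failure and never send `c` or `d`.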
The batch processor's configuration will be set inside the plugin's configuration.
For example:
:::note
You can fetch the `admin_key` from `config.yaml` and save it to an environment variable with the following command:
```bash
admin_key=$(yq '.deployment.admin.admin_key[0].key' conf/config.yaml | sed 's/"//g')
```
:::
```shell
curl http://127.0.0.1:9180/apisix/admin/routes/1 -H "X-API-KEY: $admin_key" -X PUT -d '
{
    "plugins": {
        "http-logger": {
            "uri": "http://mockbin.org/bin/:ID",
            "batch_max_size": 10,
            "max_retry_count": 1
        }
    },
    "upstream": {
        "type": "roundrobin",
        "nodes": {
            "127.0.0.1:1980": 1
        }
    },
    "uri": "/hello"
}'
```
If your plugin only uses one global batch processor,
you can also use the processor directly:
```lua
-- module-level requires used by this fragment
local core = require("apisix.core")
local batch_processor = require("apisix.utils.batch-processor")
...
local entry = {...} -- data to log
if log_buffer then
    log_buffer:push(entry)
    return
end
local config_bat = {
    name = config.name,
    retry_delay = config.retry_delay,
    ...
}
local err
-- entries is an array table of entry, which can be processed in batch
local func = function(entries)
    ...
    return true
    -- return false, err_msg, first_fail if failed
end
log_buffer, err = batch_processor:new(func, config_bat)
if not log_buffer then
    core.log.warn("error when creating the batch processor: ", err)
    return
end
log_buffer:push(entry)
```
Note: Make sure the batch max size (entry count) stays within what the processing function can handle in a single execution.
The timer that flushes the batch runs based on the `inactive_timeout` configuration. For optimal usage,
keep `inactive_timeout` smaller than `buffer_duration`.
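
The interaction between the two timeouts can be sketched as follows. This is a simplified model based on the parameter descriptions above, not the APISIX source: `should_flush` and `first_flush_time` are hypothetical functions, timestamps are plain seconds, and the check is assumed to run once every `inactive_timeout` seconds.

```lua
-- Hypothetical flush check, evaluated each time the timer fires.
local function should_flush(now, first_entry_t, last_entry_t, conf)
    -- No new entry arrived for inactive_timeout seconds.
    if now - last_entry_t >= conf.inactive_timeout then
        return true, "inactive"
    end
    -- The oldest entry has been waiting for buffer_duration seconds.
    if now - first_entry_t >= conf.buffer_duration then
        return true, "buffer_duration"
    end
    return false
end

-- Simulate a steady stream of entries starting at t = 0 (a fresh entry has
-- just arrived at every timer tick) and return the time of the first flush.
local function first_flush_time(conf)
    local t = 0
    while true do
        t = t + conf.inactive_timeout        -- next timer tick
        local flush, reason = should_flush(t, 0, t, conf)
        if flush then
            return t, reason
        end
    end
end

-- inactive_timeout smaller than buffer_duration: flushed on time.
local t_ok = first_flush_time({ inactive_timeout = 5, buffer_duration = 60 })

-- inactive_timeout larger than buffer_duration: the first check comes late.
local t_late = first_flush_time({ inactive_timeout = 90, buffer_duration = 60 })
```

Under these assumptions the first configuration flushes the batch at 60 seconds, while the second cannot flush before its first timer tick at 90 seconds, so the oldest entry waits longer than `buffer_duration`.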