perf(ai-proxy): optimize SSE decoder - remove PCRE, add decode_buf, fix comment lines (#13391)
diff --git a/apisix/plugin.lua b/apisix/plugin.lua
index 4f7410a..2e664e6 100644
--- a/apisix/plugin.lua
+++ b/apisix/plugin.lua
@@ -1471,52 +1471,45 @@
-- can detect client disconnection. Defaults to false (async flush).
-- @return boolean, string|nil Always returns (ok, err). On success returns true.
-- On flush failure or print failure returns false, err.
-function _M.lua_response_filter(api_ctx, headers, body, wait)
+function _M.lua_response_filter(api_ctx, headers, body, no_flush, wait)
local plugins = api_ctx.plugins
- if not plugins or #plugins == 0 then
- -- if there is no any plugin, just print the original body to downstream
- local ok, err = ngx_print(body)
- if not ok then
- return false, err
- end
- ok, err = ngx_flush(wait == true)
- if not ok then
- return false, err
- end
- return true
- end
- for i = 1, #plugins, 2 do
- local phase_func = plugins[i]["lua_body_filter"]
- if phase_func then
- local conf = plugins[i + 1]
- if not meta_filter(api_ctx, plugins[i]["name"], conf)then
- goto CONTINUE
- end
-
- run_meta_pre_function(conf, api_ctx, plugins[i]["name"])
- local code, new_body = phase_func(conf, api_ctx, headers, body)
- if code then
- if code ~= ngx_ok then
- ngx.status = code
+ if plugins and #plugins > 0 then
+ for i = 1, #plugins, 2 do
+ local phase_func = plugins[i]["lua_body_filter"]
+ if phase_func then
+ local conf = plugins[i + 1]
+ if not meta_filter(api_ctx, plugins[i]["name"], conf)then
+ goto CONTINUE
end
- ngx_print(new_body)
- ngx_exit(ngx_ok)
- end
- if new_body then
- body = new_body
- end
- end
+ run_meta_pre_function(conf, api_ctx, plugins[i]["name"])
+ local code, new_body = phase_func(conf, api_ctx, headers, body)
+ if code then
+ if code ~= ngx_ok then
+ ngx.status = code
+ end
- ::CONTINUE::
+ ngx_print(new_body)
+ ngx_exit(ngx_ok)
+ end
+ if new_body then
+ body = new_body
+ end
+ end
+
+ ::CONTINUE::
+ end
end
local ok, err = ngx_print(body)
if not ok then
return false, err
end
- ok, err = ngx_flush(wait == true)
- if not ok then
- return false, err
+ if not no_flush then
+ core.log.debug("lua_response_filter: flushing chunk to client")
+ ok, err = ngx_flush(wait == true)
+ if not ok then
+ return false, err
+ end
end
return true
end
diff --git a/apisix/plugins/ai-providers/base.lua b/apisix/plugins/ai-providers/base.lua
index 8cad693..f7cde8a 100644
--- a/apisix/plugins/ai-providers/base.lua
+++ b/apisix/plugins/ai-providers/base.lua
@@ -37,8 +37,8 @@
local deep_merge = require("apisix.plugins.ai-proxy.merge").deep_merge
local ngx = ngx
local ngx_now = ngx.now
-local tonumber = tonumber
local require = require
+local tonumber = tonumber
local table = table
local pairs = pairs
@@ -420,7 +420,12 @@
local body_reader = res.body_reader
local contents = {}
local sse_state = { is_first = true }
- local sse_buf = ""
+ -- SSE framing buffer: holds the unparsed remainder of the previous chunk.
+ -- A new chunk is appended only when a remainder is pending, and the table
+ -- is reset to {remainder} after each decode, so it never exceeds two elements.
+ -- Initialized with "" so the fast-path (sse_parts[1] == "") activates
+ -- immediately on the first chunk, avoiding an unnecessary table.concat.
+ local sse_parts = {""}
-- Track whether any output was sent to the client.
-- When a converter is active but the upstream returns a different SSE format,
-- all events may be skipped and no output produced, leaving the response
@@ -436,59 +441,146 @@
end
local bytes_read = 0
+ -- streaming_flush_interval_ms controls both flush strategy and the thread:
+ -- == 0 : no thread; lua_response_filter flushes synchronously
+ -- per chunk via ngx.flush(true), guaranteeing immediate
+ -- client delivery.
+ -- > 0 (default: 10): background thread calls ngx.flush(false) every N ms;
+ -- lua_response_filter skips per-chunk flush for maximum
+ -- throughput. Useful when the upstream bursts multiple
+ -- tokens at once.
+ local flush_interval_ms = conf and conf.streaming_flush_interval_ms or 0
+ -- async_flush: true when the interval thread is responsible for flushing
+ local async_flush = flush_interval_ms > 0
+ -- needs_flush is set to true immediately after dispatching a chunk so the
+ -- thread always flushes exactly the data that has been written. Cleared
+ -- before ngx.flush() so any new chunks written during the flush yield are
+ -- picked up on the next interval rather than silently dropped.
+ local needs_flush = false
+ local flush_thread
+ local flush_err
+ if async_flush then
+ local interval_s = flush_interval_ms / 1000
+ local spawn_err
+ flush_thread, spawn_err = ngx.thread.spawn(function()
+ while true do
+ ngx.sleep(interval_s)
+ if needs_flush then
+ needs_flush = false
+ local ok, err = ngx.flush(false)
+ if not ok then
+ flush_err = err
+ return
+ end
+ core.log.debug("ai-proxy: flush_thread periodic flush")
+ end
+ end
+ end)
+ if not flush_thread then
+ core.log.error("failed to spawn flush thread: ", spawn_err)
+ async_flush = false
+ end
+ end
+
local function abort_on_disconnect(flush_err)
core.log.info("client disconnected during AI streaming, ",
"aborting upstream read: ", flush_err)
+ if flush_thread then
+ ngx.thread.kill(flush_thread)
+ flush_thread = nil
+ end
if res._httpc then
res._httpc:close()
res._httpc = nil
end
res._upstream_bytes = bytes_read
+ ctx.var.apisix_upstream_response_time = math.floor(
+ (ngx_now() - ctx.llm_request_start_time) * 1000)
ctx.var.llm_request_done = true
end
+ -- Use a local flag instead of reading ctx.var on every chunk.
+ local first_token_set = false
+
while true do
+ if flush_err then
+ abort_on_disconnect(flush_err)
+ return
+ end
+
local chunk, err = body_reader()
- ctx.var.apisix_upstream_response_time = math.floor((ngx_now() -
- ctx.llm_request_start_time) * 1000)
if err then
+ ctx.var.apisix_upstream_response_time = math.floor(
+ (ngx_now() - ctx.llm_request_start_time) * 1000)
core.log.warn("failed to read response chunk: ", err)
res._upstream_bytes = bytes_read
+ if flush_thread then
+ ngx.thread.kill(flush_thread)
+ flush_thread = nil
+ end
return transport_http.handle_error(err)
end
if not chunk then
- if #sse_buf > 0 then
+ local sse_rem = table.concat(sse_parts)
+ if #sse_rem > 0 then
core.log.warn("dropping incomplete stream frame at EOF, size: ",
- #sse_buf)
+ #sse_rem)
end
res._upstream_bytes = bytes_read
+ ctx.var.apisix_upstream_response_time = math.floor(
+ (ngx_now() - ctx.llm_request_start_time) * 1000)
if converter and not output_sent then
+ if flush_thread then
+ ngx.thread.kill(flush_thread)
+ end
local msg = "streaming response completed without producing "
.. "any output; the upstream likely returned a "
.. "different stream format than the converter expects"
core.log.error(msg)
return 502, msg
end
+ -- Final sync flush: ensure the last async-queued bytes reach the client.
+ -- flush_err means client already disconnected; skip to avoid a noisy log.
+ if flush_thread then
+ ngx.thread.kill(flush_thread)
+ flush_thread = nil
+ end
+ if not flush_err then
+ ngx.flush(true)
+ end
return
end
bytes_read = bytes_read + #chunk
- if ctx.var.llm_time_to_first_token == "0" then
+ if not first_token_set then
ctx.var.llm_time_to_first_token = math.floor(
- (ngx_now() - ctx.llm_request_start_time) * 1000)
+ (ngx_now() - ctx.llm_request_start_time) * 1000)
+ first_token_set = true
end
- sse_buf = sse_buf .. chunk
- local complete, remainder = framing.split_buf(sse_buf)
+ -- Skip table.concat when there is no carry-over remainder (common case).
+ -- sse_parts is reset to {remainder} after each iteration; when the
+ -- previous chunk ended on a boundary, sse_parts[1] == "" and we can
+ -- hand the new chunk directly to decode_buf without allocating a concat.
+ local candidate
+ if sse_parts[1] == "" then
+ candidate = chunk
+ else
+ sse_parts[#sse_parts + 1] = chunk
+ candidate = table.concat(sse_parts)
+ end
+
+ -- One-pass split + decode: finds all complete SSE events and the
+ -- trailing remainder in a single forward scan (no PCRE, no double scan).
+ local events, remainder = framing.decode_buf(candidate)
local max_remainder = framing.max_remainder or 1024 * 1024
if #remainder > max_remainder then
core.log.warn("stream remainder exceeded ", max_remainder, " bytes, resetting")
remainder = ""
end
- sse_buf = remainder
- local events = complete ~= "" and framing.decode(complete) or {}
+ sse_parts = {remainder}
ctx.llm_response_contents_in_chunk = {}
local converted_chunks = {}
@@ -532,24 +624,33 @@
::CONTINUE::
end
- -- Output: converter events or passthrough raw chunk.
- -- Pass wait=true for synchronous flush so we can detect client disconnection.
+ -- Dispatch chunk downstream. Plugins run per-chunk so body_filter
+ -- hooks (e.g. content moderation) receive every SSE event individually.
+ -- no_flush=true when the interval thread handles flushing; otherwise
+ -- no_flush=nil + wait=true for synchronous per-chunk delivery guarantee.
+ local no_flush = async_flush or nil
if converter then
for _, c in ipairs(converted_chunks) do
- local ok, flush_err = plugin.lua_response_filter(ctx, res.headers, c, true)
- output_sent = true
+ local ok, flush_err = plugin.lua_response_filter(
+ ctx, res.headers, c, no_flush, true)
if not ok then
abort_on_disconnect(flush_err)
return
end
+ output_sent = true
end
else
- local ok, flush_err = plugin.lua_response_filter(ctx, res.headers, chunk, true)
- output_sent = true
+ local ok, flush_err = plugin.lua_response_filter(
+ ctx, res.headers, chunk, no_flush, true)
if not ok then
abort_on_disconnect(flush_err)
return
end
+ output_sent = true
+ end
+ -- Let the interval flush thread know there is unflushed output.
+ if async_flush then
+ needs_flush = true
end
-- Enforce runaway-upstream safeguards after processing the chunk.
@@ -561,6 +662,10 @@
limit_hit = "max_response_bytes"
end
if limit_hit then
+ if flush_thread then
+ ngx.thread.kill(flush_thread)
+ flush_thread = nil
+ end
local duration_ms = math.floor((ngx_now() -
ctx.llm_request_start_time) * 1000)
core.log.warn("aborting AI stream: ", limit_hit, " exceeded;",
@@ -574,6 +679,7 @@
end
-- Signal downstream filters (e.g. moderation plugins that defer
-- work until request completion) that no more content is coming.
+ ctx.var.apisix_upstream_response_time = duration_ms
ctx.var.llm_request_done = true
res._upstream_bytes = bytes_read
if output_sent then
@@ -602,6 +708,7 @@
-- backpressure, or time out stalled streams. See #13256 for a proper
-- solution.
ngx.sleep(0)
+
end
end
diff --git a/apisix/plugins/ai-proxy/schema.lua b/apisix/plugins/ai-proxy/schema.lua
index c249b41..79384bd 100644
--- a/apisix/plugins/ai-proxy/schema.lua
+++ b/apisix/plugins/ai-proxy/schema.lua
@@ -267,6 +267,16 @@
description = "keepalive timeout in milliseconds",
},
keepalive_pool = {type = "integer", minimum = 1, default = 30},
+ streaming_flush_interval_ms = {
+ type = "integer",
+ minimum = 0,
+ default = 10,
+ description = "A background thread flushes the output buffer every N "
+ .. "milliseconds (async flush). Useful when the upstream bursts "
+ .. "multiple tokens at once and you need to bound client latency. "
+ .. "Set to 0 to disable the background thread and flush each "
+ .. "chunk synchronously inline.",
+ },
ssl_verify = {type = "boolean", default = true },
override = override_schema,
},
@@ -353,6 +363,16 @@
description = "keepalive timeout in milliseconds",
},
keepalive_pool = {type = "integer", minimum = 1, default = 30},
+ streaming_flush_interval_ms = {
+ type = "integer",
+ minimum = 0,
+ default = 10,
+ description = "A background thread flushes the output buffer every N "
+ .. "milliseconds (async flush). Useful when the upstream bursts "
+ .. "multiple tokens at once and you need to bound client latency. "
+ .. "Set to 0 to disable the background thread and flush each "
+ .. "chunk synchronously inline.",
+ },
ssl_verify = {type = "boolean", default = true },
},
required = {"instances"},
diff --git a/apisix/plugins/ai-transport/aws-eventstream.lua b/apisix/plugins/ai-transport/aws-eventstream.lua
index 09f0178..0ae3a19 100644
--- a/apisix/plugins/ai-transport/aws-eventstream.lua
+++ b/apisix/plugins/ai-transport/aws-eventstream.lua
@@ -289,4 +289,15 @@
end
+--- Combined split + decode in one call, matching the interface expected by
+-- ai-providers/base.lua.
+-- @param buf string Accumulated bytes from the upstream socket.
+-- @return table Array of decoded event tables (same as decode()).
+-- @return string Trailing bytes that did not form a complete frame.
+function _M.decode_buf(buf)
+ local complete, remainder = _M.split_buf(buf)
+ return _M.decode(complete), remainder
+end
+
+
return _M
diff --git a/apisix/plugins/ai-transport/sse.lua b/apisix/plugins/ai-transport/sse.lua
index ad8ab27..a8f6709 100644
--- a/apisix/plugins/ai-transport/sse.lua
+++ b/apisix/plugins/ai-transport/sse.lua
@@ -17,73 +17,141 @@
--- SSE (Server-Sent Events) codec and buffer management.
-local core = require("apisix.core")
local table = require("apisix.core.table")
-local ngx_re = require("ngx.re")
local tonumber = tonumber
local tostring = tostring
-local ipairs = ipairs
local _M = {
- -- Cap on bytes split_buf may leave in `remainder`. Used by the streaming
- -- loop in ai-providers.base to bound the buffer when frames don't
- -- complete. SSE frames are small (text events delimited by blank lines),
- -- so 1 MiB is plenty.
+ -- Cap on bytes that decode_buf (and split_buf) may leave in `remainder`.
+ -- Read by the streaming loop in ai-providers/base.lua to bound the buffer
+ -- when frames don't complete. SSE frames are small (text events delimited
+ -- by blank lines), so 1 MiB is plenty.
max_remainder = 1024 * 1024,
}
+-- Parse one raw SSE event block (text between two boundary markers) into
+-- an event table. Returns nil for empty / comment-only blocks.
+local function parse_raw_event(raw)
+ local event = {
+ type = "message",
+ data = {},
+ id = nil,
+ retry = nil,
+ }
+ local has_field = false
+
+ local pos = 1
+ local raw_len = #raw
+ while pos <= raw_len do
+ local nl = raw:find("\n", pos, true)
+ local line
+ if nl then
+ line = raw:sub(pos, nl - 1)
+ pos = nl + 1
+ else
+ line = raw:sub(pos)
+ pos = raw_len + 1
+ end
+
+ -- Strip trailing \r for CRLF line endings.
+ if line:sub(-1) == "\r" then
+ line = line:sub(1, -2)
+ end
+
+ if #line == 0 then
+ -- blank line inside a raw block (shouldn't happen after split, skip)
+ goto NEXT_LINE
+ end
+
+ -- Find the field:value separator. Plain search avoids pattern engine.
+ local colon = line:find(":", 1, true)
+ if not colon then goto NEXT_LINE end
+ -- Lines starting with ":" are SSE comments; skip without setting has_field.
+ if colon == 1 then goto NEXT_LINE end
+
+ local name = line:sub(1, colon - 1):lower()
+ local value = line:sub(colon + 1)
+ -- Strip a single leading space per the SSE spec.
+ if value:sub(1, 1) == " " then
+ value = value:sub(2)
+ end
+
+ has_field = true
+ if name == "data" then
+ table.insert(event.data, value)
+ elseif name == "event" then
+ event.type = value
+ elseif name == "id" then
+ event.id = value
+ elseif name == "retry" then
+ event.retry = tonumber(value)
+ end
+
+ ::NEXT_LINE::
+ end
+
+ if not has_field then
+ return nil
+ end
+ event.data = table.concat(event.data, "\n")
+ return event
+end
+
+
+-- Find the next event boundary (\n\n or \r\n\r\n) starting at `pos`.
+-- Returns (ev_end, next_pos) or (nil, nil) when none found.
+-- ev_end = last byte of the event content (before the boundary)
+-- next_pos = first byte after the boundary
+local function next_boundary(buf, pos)
+ local p_lf = buf:find("\n\n", pos, true)
+ local p_crlf = buf:find("\r\n\r\n", pos, true)
+
+ if p_lf and p_crlf then
+ if p_lf <= p_crlf then
+ return p_lf - 1, p_lf + 2
+ else
+ return p_crlf - 1, p_crlf + 4
+ end
+ elseif p_lf then
+ return p_lf - 1, p_lf + 2
+ elseif p_crlf then
+ return p_crlf - 1, p_crlf + 4
+ end
+ return nil, nil
+end
+
+
--- Decode an SSE text chunk into a list of event tables.
-- Each event has: type (string), data (string), id (string|nil), retry (number|nil).
+-- The chunk is expected to contain only complete events (no trailing partial event).
+-- Replaces the former ngx_re.split-based implementation; no PCRE overhead.
function _M.decode(chunk)
local events = {}
-
- if not chunk then
+ if not chunk or chunk == "" then
return events
end
- local raw_events, err = ngx_re.split(chunk, "\\r?\\n\\r?\\n")
- if not raw_events then
- core.log.warn("failed to split SSE chunk: ", err)
- return events
- end
- for _, raw_event in ipairs(raw_events) do
- local event = {
- type = "message",
- data = {},
- id = nil,
- retry = nil
- }
- local lines, err = ngx_re.split(raw_event, "\\r?\\n")
- if not lines then
- core.log.warn("failed to split event lines: ", err)
- goto CONTINUE
- end
-
- for _, line in ipairs(lines) do
- local name, value = line:match("^([^:]+): ?(.*)$")
- if not name then goto NEXT_LINE end
-
- name = name:lower()
-
- if name == "event" then
- event.type = value
- elseif name == "data" then
- table.insert(event.data, value)
- elseif name == "id" then
- event.id = value
- elseif name == "retry" then
- event.retry = tonumber(value)
+ local pos = 1
+ local len = #chunk
+ while pos <= len do
+ local ev_end, next_pos = next_boundary(chunk, pos)
+ if not ev_end then
+ -- No trailing blank line: treat the remaining content as a complete
+ -- event for backward compatibility with callers that pass full response
+ -- bodies or per-chunk bodies without a terminal blank line.
+ local event = parse_raw_event(chunk:sub(pos))
+ if event then
+ table.insert(events, event)
end
-
- ::NEXT_LINE::
+ break
end
-
- event.data = table.concat(event.data, "\n")
-
- table.insert(events, event)
-
- ::CONTINUE::
+ local raw = chunk:sub(pos, ev_end)
+ local event = parse_raw_event(raw)
+ if event then
+ table.insert(events, event)
+ end
+ pos = next_pos
end
return events
@@ -116,8 +184,36 @@
end
---- Split an SSE buffer at the last complete event boundary.
--- Returns (complete_events, remainder) where complete_events includes
+--- Decode a raw SSE buffer in one forward pass, returning (events, remainder).
+-- Combines the split_buf + decode two-step into a single scan for use in
+-- high-throughput loops (e.g. ai-providers/base.lua).
+-- `remainder` holds any trailing bytes that did not end with a boundary marker.
+function _M.decode_buf(buf)
+ local events = {}
+ local pos = 1
+ local len = #buf
+ local last_complete_pos = 1
+
+ while pos <= len do
+ local ev_end, next_pos = next_boundary(buf, pos)
+ if not ev_end then
+ break
+ end
+ local raw = buf:sub(pos, ev_end)
+ local event = parse_raw_event(raw)
+ if event then
+ table.insert(events, event)
+ end
+ last_complete_pos = next_pos
+ pos = next_pos
+ end
+
+ local remainder = (last_complete_pos <= len) and buf:sub(last_complete_pos) or ""
+ return events, remainder
+end
+
+
+--- Split an SSE buffer: returns (complete_data, remainder) where complete_data includes
-- all data up to and including the last "\n\n" or "\r\n\r\n" boundary,
-- and remainder holds any trailing incomplete event data.
-- Returns ("", buf) when no boundary is found.
diff --git a/docs/en/latest/plugins/ai-proxy.md b/docs/en/latest/plugins/ai-proxy.md
index ab253df..e3e1b59 100644
--- a/docs/en/latest/plugins/ai-proxy.md
+++ b/docs/en/latest/plugins/ai-proxy.md
@@ -121,6 +121,7 @@
| keepalive_timeout | integer | False | 60000 | ≥ 1000 | Keepalive timeout in milliseconds when connecting to the LLM service. |
| keepalive_pool | integer | False | 30 | ≥ 1 | Keepalive pool size for the LLM service connection. |
| ssl_verify | boolean | False | true | | If true, verifies the LLM service's certificate. |
+| streaming_flush_interval_ms | integer | False | 10 | ≥ 0 | Interval in milliseconds for the background flush thread. When `> 0` (default: `10`), a background thread calls `ngx.flush(false)` every N ms, batching output for bursty upstreams. When `0`, the background thread is disabled and each chunk is flushed synchronously via `ngx.flush(true)`, guaranteeing immediate client delivery. |
## Provider-aware `max_tokens` mapping
diff --git a/docs/zh/latest/plugins/ai-proxy.md b/docs/zh/latest/plugins/ai-proxy.md
index 2d49c0c..ba01056 100644
--- a/docs/zh/latest/plugins/ai-proxy.md
+++ b/docs/zh/latest/plugins/ai-proxy.md
@@ -121,6 +121,7 @@
| keepalive_timeout | integer | 否 | 60000 | ≥ 1000 | 连接到 LLM 服务时的保活超时时间(毫秒)。 |
| keepalive_pool | integer | 否 | 30 | ≥ 1 | LLM 服务连接的保活池大小。 |
| ssl_verify | boolean | 否 | true | | 如果为 true,验证 LLM 服务的证书。 |
+| streaming_flush_interval_ms | integer | 否 | 10 | ≥ 0 | 后台刷新线程的间隔时间（毫秒）。`> 0`（默认值：`10`）时，后台线程每隔 N 毫秒调用一次 `ngx.flush(false)`，适合上游批量发送 token 的场景。设为 `0` 时禁用后台线程，改为每个 chunk 同步调用 `ngx.flush(true)` 立即刷新。 |
## Provider-aware `max_tokens` mapping
diff --git a/t/plugin/ai-proxy-client-disconnect.t b/t/plugin/ai-proxy-client-disconnect.t
index 654da7e..b144b57 100644
--- a/t/plugin/ai-proxy-client-disconnect.t
+++ b/t/plugin/ai-proxy-client-disconnect.t
@@ -222,3 +222,137 @@
^ok, upstream aborted after ~\d+ chunks$
--- error_log
client disconnected during AI streaming
+
+
+
+=== TEST 3: set route for async flush disconnect test
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+ local code, body = t('/apisix/admin/routes/2',
+ ngx.HTTP_PUT,
+ [[{
+ "uri": "/anything-async-flush",
+ "plugins": {
+ "ai-proxy": {
+ "provider": "openai",
+ "auth": {
+ "header": {
+ "Authorization": "Bearer token"
+ }
+ },
+ "options": {
+ "model": "gpt-4",
+ "stream": true
+ },
+ "override": {
+ "endpoint": "http://localhost:7750"
+ },
+ "ssl_verify": false,
+ "streaming_flush_interval_ms": 50
+ }
+ }
+ }]]
+ )
+
+ if code >= 300 then
+ ngx.status = code
+ end
+ ngx.say(body)
+ }
+ }
+--- response_body
+passed
+
+
+
+=== TEST 4: async flush disconnect still aborts upstream read early
+--- config
+ location /t {
+ content_by_lua_block {
+ local http = require("resty.http")
+ local httpc = http.new()
+
+ local ok, err = httpc:connect({
+ scheme = "http",
+ host = "localhost",
+ port = ngx.var.server_port,
+ })
+ if not ok then
+ ngx.status = 500
+ ngx.say("connect failed: ", err)
+ return
+ end
+
+ local res, err = httpc:request({
+ method = "POST",
+ headers = { ["Content-Type"] = "application/json" },
+ path = "/anything-async-flush",
+ body = [[{"messages": [{"role": "user", "content": "hi"}]}]],
+ })
+ if not res then
+ ngx.status = 500
+ ngx.say("request failed: ", err)
+ return
+ end
+
+ for i = 1, 3 do
+ local chunk, rerr = res.body_reader()
+ if rerr or not chunk then
+ ngx.status = 500
+ ngx.say("unexpected end of stream at chunk ", i, ": ", rerr)
+ return
+ end
+ end
+ httpc:close()
+
+ ngx.sleep(1.0)
+
+ local probe = http.new()
+ ok, err = probe:connect({ scheme = "http", host = "localhost", port = 7750 })
+ if not ok then
+ ngx.status = 500
+ ngx.say("probe connect failed: ", err)
+ return
+ end
+ local probe_res, probe_err = probe:request({
+ method = "GET",
+ path = "/chunks",
+ headers = { Host = "localhost" },
+ })
+ if not probe_res then
+ ngx.status = 500
+ ngx.say("probe request failed: ", probe_err)
+ return
+ end
+ local count_str = probe_res:read_body()
+ probe:close()
+
+ if probe_res.status ~= 200 then
+ ngx.status = 500
+ ngx.say("probe status unexpected: ", probe_res.status)
+ return
+ end
+
+ local count = tonumber(count_str)
+ if not count then
+ ngx.status = 500
+ ngx.say("invalid probe response: ", count_str or "nil")
+ return
+ end
+
+ if count > 15 then
+ ngx.status = 500
+ ngx.say("upstream was not aborted promptly, chunks: ", count)
+ return
+ end
+ ngx.say("ok, upstream aborted after ~", count, " chunks")
+ }
+ }
+--- response_body_like
+^ok, upstream aborted after ~\d+ chunks$
+--- error_log
+client disconnected during AI streaming
+--- no_error_log
+final flush failed
diff --git a/t/plugin/ai-proxy-flush.t b/t/plugin/ai-proxy-flush.t
new file mode 100644
index 0000000..eac078e
--- /dev/null
+++ b/t/plugin/ai-proxy-flush.t
@@ -0,0 +1,320 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+use t::APISIX 'no_plan';
+
+log_level("debug");
+repeat_each(1);
+no_long_string();
+no_root_location();
+
+
+add_block_preprocessor(sub {
+ my ($block) = @_;
+
+ if (!defined $block->request) {
+ $block->set_value("request", "GET /t");
+ }
+
+ my $http_config = $block->http_config // <<_EOC_;
+ server {
+ server_name mock_openai_sse;
+ listen 7751;
+
+ default_type 'text/event-stream';
+
+ location /v1/chat/completions {
+ content_by_lua_block {
+ local args = ngx.req.get_uri_args()
+ local delay = args["delay"]
+ ngx.header["Content-Type"] = "text/event-stream"
+ local events = {
+ 'data: {"id":"1","choices":[{"delta":{"role":"assistant","content":""},"index":0,"finish_reason":null}]}\\n\\n',
+ 'data: {"id":"1","choices":[{"delta":{"content":"Hello"},"index":0,"finish_reason":null}]}\\n\\n',
+ 'data: {"id":"1","choices":[{"delta":{"content":" world"},"index":0,"finish_reason":null}]}\\n\\n',
+ 'data: {"id":"1","choices":[{"delta":{},"index":0,"finish_reason":"stop"}]}\\n\\n',
+ 'data: [DONE]\\n\\n',
+ }
+ for _, ev in ipairs(events) do
+ ngx.print(ev)
+ ngx.flush(true)
+ if delay then
+ ngx.sleep(0.05)
+ end
+ end
+ }
+ }
+ }
+_EOC_
+
+ $block->set_value("http_config", $http_config);
+});
+
+run_tests();
+
+__DATA__
+
+=== TEST 1: create route with streaming_flush_interval_ms=0 (per-chunk sync flush)
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+ local code, body = t('/apisix/admin/routes/1',
+ ngx.HTTP_PUT,
+ [[{
+ "uri": "/flush-default",
+ "plugins": {
+ "ai-proxy": {
+ "provider": "openai",
+ "auth": {
+ "header": {
+ "Authorization": "Bearer test-key"
+ }
+ },
+ "options": {
+ "model": "gpt-4",
+ "stream": true
+ },
+ "override": {
+ "endpoint": "http://localhost:7751/v1/chat/completions?delay=true"
+ },
+ "ssl_verify": false,
+ "streaming_flush_interval_ms": 0
+ }
+ }
+ }]]
+ )
+ if code >= 300 then
+ ngx.status = code
+ end
+ ngx.say(body)
+ }
+ }
+--- response_body
+passed
+
+
+
+=== TEST 2: interval_ms=0 (per-chunk flush) - flush_thread must NOT appear, sync flush per chunk
+--- config
+ location /t {
+ content_by_lua_block {
+ local http = require("resty.http")
+ local httpc = http.new()
+ local ok, err = httpc:connect({
+ scheme = "http",
+ host = "localhost",
+ port = ngx.var.server_port,
+ })
+ if not ok then
+ ngx.status = 500
+ ngx.say("connect: " .. err)
+ return
+ end
+
+ local res, err = httpc:request({
+ method = "POST",
+ path = "/flush-default",
+ headers = { ["Content-Type"] = "application/json" },
+ body = '{"messages":[{"role":"user","content":"hi"}],"model":"gpt-4","stream":true}',
+ })
+ if not res then
+ ngx.status = 500
+ ngx.say("request: " .. err)
+ return
+ end
+
+ local body = res:read_body()
+ if body:find("Hello", 1, true) and
+ body:find(" world", 1, true) and
+ body:find("[DONE]", 1, true) then
+ ngx.say("ok")
+ else
+ ngx.say("FAIL: unexpected body: " .. body:sub(1, 500))
+ end
+ }
+ }
+--- response_body
+ok
+--- no_error_log
+ai-proxy: flush_thread periodic flush
+--- grep_error_log eval
+qr/lua_response_filter: flushing chunk to client/
+--- grep_error_log_out
+lua_response_filter: flushing chunk to client
+lua_response_filter: flushing chunk to client
+lua_response_filter: flushing chunk to client
+lua_response_filter: flushing chunk to client
+lua_response_filter: flushing chunk to client
+
+
+
+=== TEST 3: create route with streaming_flush_interval_ms=50 (background thread flush)
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+ local code, body = t('/apisix/admin/routes/2',
+ ngx.HTTP_PUT,
+ [[{
+ "uri": "/flush-interval",
+ "plugins": {
+ "ai-proxy": {
+ "provider": "openai",
+ "auth": {
+ "header": {
+ "Authorization": "Bearer test-key"
+ }
+ },
+ "options": {
+ "model": "gpt-4",
+ "stream": true
+ },
+ "override": {
+ "endpoint": "http://localhost:7751/v1/chat/completions?delay=true"
+ },
+ "ssl_verify": false,
+ "streaming_flush_interval_ms": 50
+ }
+ }
+ }]]
+ )
+ if code >= 300 then
+ ngx.status = code
+ end
+ ngx.say(body)
+ }
+ }
+--- response_body
+passed
+
+
+
+=== TEST 4: interval_ms=50 (background thread flush) - flush_thread log must appear
+--- config
+ location /t {
+ content_by_lua_block {
+ local http = require("resty.http")
+ local httpc = http.new()
+ local ok, err = httpc:connect({
+ scheme = "http",
+ host = "localhost",
+ port = ngx.var.server_port,
+ })
+ if not ok then
+ ngx.status = 500
+ ngx.say("connect: " .. err)
+ return
+ end
+
+ local res, err = httpc:request({
+ method = "POST",
+ path = "/flush-interval",
+ headers = { ["Content-Type"] = "application/json" },
+ body = '{"messages":[{"role":"user","content":"hi"}],"model":"gpt-4","stream":true}',
+ })
+ if not res then
+ ngx.status = 500
+ ngx.say("request: " .. err)
+ return
+ end
+
+ local body = res:read_body()
+ if body:find("Hello", 1, true) and
+ body:find(" world", 1, true) and
+ body:find("[DONE]", 1, true) then
+ ngx.say("ok")
+ else
+ ngx.say("FAIL: unexpected body: " .. body:sub(1, 500))
+ end
+ }
+ }
+--- response_body
+ok
+--- error_log
+ai-proxy: flush_thread periodic flush
+--- no_error_log
+lua_response_filter: flushing chunk to client
+
+
+
+=== TEST 5: streaming_flush_interval_ms schema rejects negative value
+--- config
+ location /t {
+ content_by_lua_block {
+ local plugin = require("apisix.plugins.ai-proxy")
+ local ok, err = plugin.check_schema({
+ provider = "openai",
+ auth = { header = { Authorization = "Bearer x" } },
+ options = { model = "gpt-4" },
+ streaming_flush_interval_ms = -1,
+ })
+ if ok then
+ ngx.say("should have failed")
+ else
+ ngx.say("rejected: " .. tostring(err))
+ end
+ }
+ }
+--- response_body_like
+rejected: .*
+
+
+
+=== TEST 6: streaming_flush_interval_ms=0 is accepted (disables background thread)
+--- config
+ location /t {
+ content_by_lua_block {
+ local plugin = require("apisix.plugins.ai-proxy")
+ local ok, err = plugin.check_schema({
+ provider = "openai",
+ auth = { header = { Authorization = "Bearer x" } },
+ options = { model = "gpt-4" },
+ streaming_flush_interval_ms = 0,
+ })
+ if ok then
+ ngx.say("ok")
+ else
+ ngx.say("FAIL: " .. tostring(err))
+ end
+ }
+ }
+--- response_body
+ok
+
+
+
+=== TEST 7: omitting streaming_flush_interval_ms applies default value of 10
+--- config
+ location /t {
+ content_by_lua_block {
+ local plugin = require("apisix.plugins.ai-proxy")
+ local conf = {
+ provider = "openai",
+ auth = { header = { Authorization = "Bearer x" } },
+ options = { model = "gpt-4" },
+ }
+ local ok, err = plugin.check_schema(conf)
+ if not ok then
+ ngx.say("FAIL: " .. tostring(err))
+ return
+ end
+ ngx.say(conf.streaming_flush_interval_ms)
+ }
+ }
+--- response_body
+10