perf(ai-proxy): optimize SSE decoder - remove PCRE, add decode_buf, fix comment lines (#13391)
diff --git a/apisix/plugin.lua b/apisix/plugin.lua
index 4f7410a..2e664e6 100644
--- a/apisix/plugin.lua
+++ b/apisix/plugin.lua
@@ -1471,52 +1471,45 @@
 --   can detect client disconnection. Defaults to false (async flush).
 -- @return boolean, string|nil Always returns (ok, err). On success returns true.
 --   On flush failure or print failure returns false, err.
-function _M.lua_response_filter(api_ctx, headers, body, wait)
+function _M.lua_response_filter(api_ctx, headers, body, no_flush, wait)
     local plugins = api_ctx.plugins
-    if not plugins or #plugins == 0 then
-        -- if there is no any plugin, just print the original body to downstream
-        local ok, err = ngx_print(body)
-        if not ok then
-            return false, err
-        end
-        ok, err = ngx_flush(wait == true)
-        if not ok then
-            return false, err
-        end
-        return true
-    end
-    for i = 1, #plugins, 2 do
-        local phase_func = plugins[i]["lua_body_filter"]
-        if phase_func then
-            local conf = plugins[i + 1]
-            if not meta_filter(api_ctx, plugins[i]["name"], conf)then
-                goto CONTINUE
-            end
-
-            run_meta_pre_function(conf, api_ctx, plugins[i]["name"])
-            local code, new_body = phase_func(conf, api_ctx, headers, body)
-            if code then
-                if code ~= ngx_ok then
-                    ngx.status = code
+    if plugins and #plugins > 0 then
+        for i = 1, #plugins, 2 do
+            local phase_func = plugins[i]["lua_body_filter"]
+            if phase_func then
+                local conf = plugins[i + 1]
+                if not meta_filter(api_ctx, plugins[i]["name"], conf)then
+                    goto CONTINUE
                 end
 
-                ngx_print(new_body)
-                ngx_exit(ngx_ok)
-            end
-            if new_body then
-                body = new_body
-            end
-        end
+                run_meta_pre_function(conf, api_ctx, plugins[i]["name"])
+                local code, new_body = phase_func(conf, api_ctx, headers, body)
+                if code then
+                    if code ~= ngx_ok then
+                        ngx.status = code
+                    end
 
-        ::CONTINUE::
+                    ngx_print(new_body)
+                    ngx_exit(ngx_ok)
+                end
+                if new_body then
+                    body = new_body
+                end
+            end
+
+            ::CONTINUE::
+        end
     end
     local ok, err = ngx_print(body)
     if not ok then
         return false, err
     end
-    ok, err = ngx_flush(wait == true)
-    if not ok then
-        return false, err
+    if not no_flush then
+        core.log.debug("lua_response_filter: flushing chunk to client")
+        ok, err = ngx_flush(wait == true)
+        if not ok then
+            return false, err
+        end
     end
     return true
 end
diff --git a/apisix/plugins/ai-providers/base.lua b/apisix/plugins/ai-providers/base.lua
index 8cad693..f7cde8a 100644
--- a/apisix/plugins/ai-providers/base.lua
+++ b/apisix/plugins/ai-providers/base.lua
@@ -37,8 +37,8 @@
 local deep_merge = require("apisix.plugins.ai-proxy.merge").deep_merge
 local ngx = ngx
 local ngx_now = ngx.now
-local tonumber = tonumber
 local require = require
+local tonumber = tonumber
 
 local table = table
 local pairs = pairs
@@ -420,7 +420,12 @@
     local body_reader = res.body_reader
     local contents = {}
     local sse_state = { is_first = true }
-    local sse_buf = ""
+    -- SSE framing buffer: holds the undecoded remainder of the previous chunk.
+    -- A new chunk is appended and joined with table.concat only when that
+    -- remainder is non-empty; the table is reset to {remainder} after every
+    -- decode, so it never grows beyond two elements.  It starts as {""} so the
+    -- fast path (sse_parts[1] == "") already applies to the first chunk.
+    local sse_parts = {""}
     -- Track whether any output was sent to the client.
     -- When a converter is active but the upstream returns a different SSE format,
     -- all events may be skipped and no output produced, leaving the response
@@ -436,59 +441,146 @@
     end
     local bytes_read = 0
 
+    -- streaming_flush_interval_ms controls both flush strategy and the thread:
+    --   == 0          : no thread; lua_response_filter flushes synchronously
+    --                   per chunk via ngx.flush(true), guaranteeing immediate
+    --                   client delivery.
+    --   >  0 (default: 10): background thread calls ngx.flush(false) every N ms;
+    --                   lua_response_filter skips per-chunk flush for maximum
+    --                   throughput. Useful when the upstream bursts multiple
+    --                   tokens at once.
+    local flush_interval_ms = conf and conf.streaming_flush_interval_ms or 0
+    -- async_flush: true when the interval thread is responsible for flushing
+    local async_flush = flush_interval_ms > 0
+    -- needs_flush is set to true immediately after dispatching a chunk so the
+    -- thread always flushes exactly the data that has been written.  Cleared
+    -- before ngx.flush() so any new chunks written during the flush yield are
+    -- picked up on the next interval rather than silently dropped.
+    local needs_flush = false
+    local flush_thread
+    local flush_err
+    if async_flush then
+        local interval_s = flush_interval_ms / 1000
+        local spawn_err
+        flush_thread, spawn_err = ngx.thread.spawn(function()
+            while true do
+                ngx.sleep(interval_s)
+                if needs_flush then
+                    needs_flush = false
+                    local ok, err = ngx.flush(false)
+                    if not ok then
+                        flush_err = err
+                        return
+                    end
+                    core.log.debug("ai-proxy: flush_thread periodic flush")
+                end
+            end
+        end)
+        if not flush_thread then
+            core.log.error("failed to spawn flush thread: ", spawn_err)
+            async_flush = false
+        end
+    end
+
     local function abort_on_disconnect(flush_err)
         core.log.info("client disconnected during AI streaming, ",
                       "aborting upstream read: ", flush_err)
+        if flush_thread then
+            ngx.thread.kill(flush_thread)
+            flush_thread = nil
+        end
         if res._httpc then
             res._httpc:close()
             res._httpc = nil
         end
         res._upstream_bytes = bytes_read
+        ctx.var.apisix_upstream_response_time = math.floor(
+            (ngx_now() - ctx.llm_request_start_time) * 1000)
         ctx.var.llm_request_done = true
     end
 
+    -- Use a local flag instead of reading ctx.var on every chunk.
+    local first_token_set = false
+
     while true do
+        if flush_err then
+            abort_on_disconnect(flush_err)
+            return
+        end
+
         local chunk, err = body_reader()
-        ctx.var.apisix_upstream_response_time = math.floor((ngx_now() -
-                                         ctx.llm_request_start_time) * 1000)
         if err then
+            ctx.var.apisix_upstream_response_time = math.floor(
+                (ngx_now() - ctx.llm_request_start_time) * 1000)
             core.log.warn("failed to read response chunk: ", err)
             res._upstream_bytes = bytes_read
+            if flush_thread then
+                ngx.thread.kill(flush_thread)
+                flush_thread = nil
+            end
             return transport_http.handle_error(err)
         end
         if not chunk then
-            if #sse_buf > 0 then
+            local sse_rem = table.concat(sse_parts)
+            if #sse_rem > 0 then
                 core.log.warn("dropping incomplete stream frame at EOF, size: ",
-                              #sse_buf)
+                              #sse_rem)
             end
 
             res._upstream_bytes = bytes_read
+            ctx.var.apisix_upstream_response_time = math.floor(
+                (ngx_now() - ctx.llm_request_start_time) * 1000)
             if converter and not output_sent then
+                if flush_thread then
+                    ngx.thread.kill(flush_thread)
+                end
                 local msg = "streaming response completed without producing "
                             .. "any output; the upstream likely returned a "
                             .. "different stream format than the converter expects"
                 core.log.error(msg)
                 return 502, msg
             end
+            -- Final sync flush: ensure the last async-queued bytes reach the client.
+            -- flush_err means client already disconnected; skip to avoid a noisy log.
+            if flush_thread then
+                ngx.thread.kill(flush_thread)
+                flush_thread = nil
+            end
+            if not flush_err then
+                ngx.flush(true)
+            end
             return
         end
 
         bytes_read = bytes_read + #chunk
 
-        if ctx.var.llm_time_to_first_token == "0" then
+        if not first_token_set then
             ctx.var.llm_time_to_first_token = math.floor(
-                                            (ngx_now() - ctx.llm_request_start_time) * 1000)
+                (ngx_now() - ctx.llm_request_start_time) * 1000)
+            first_token_set = true
         end
 
-        sse_buf = sse_buf .. chunk
-        local complete, remainder = framing.split_buf(sse_buf)
+        -- Skip table.concat when there is no carry-over remainder (common case).
+        -- sse_parts is reset to {remainder} after each iteration; when the
+        -- previous chunk ended on a boundary, sse_parts[1] == "" and we can
+        -- hand the new chunk directly to decode_buf without allocating a concat.
+        local candidate
+        if sse_parts[1] == "" then
+            candidate = chunk
+        else
+            sse_parts[#sse_parts + 1] = chunk
+            candidate = table.concat(sse_parts)
+        end
+
+        -- One-pass split + decode: finds all complete SSE events and the
+        -- trailing remainder in a single forward scan (no PCRE, no double scan).
+        local events, remainder = framing.decode_buf(candidate)
         local max_remainder = framing.max_remainder or 1024 * 1024
         if #remainder > max_remainder then
             core.log.warn("stream remainder exceeded ", max_remainder, " bytes, resetting")
             remainder = ""
         end
-        sse_buf = remainder
-        local events = complete ~= "" and framing.decode(complete) or {}
+        sse_parts = {remainder}
         ctx.llm_response_contents_in_chunk = {}
         local converted_chunks = {}
 
@@ -532,24 +624,33 @@
             ::CONTINUE::
         end
 
-        -- Output: converter events or passthrough raw chunk.
-        -- Pass wait=true for synchronous flush so we can detect client disconnection.
+        -- Dispatch chunk downstream.  Plugins run per-chunk so body_filter
+        -- hooks (e.g. content moderation) receive every SSE event individually.
+        -- no_flush=true when the interval thread handles flushing; otherwise
+        -- no_flush=nil + wait=true for synchronous per-chunk delivery guarantee.
+        local no_flush = async_flush or nil
         if converter then
             for _, c in ipairs(converted_chunks) do
-                local ok, flush_err = plugin.lua_response_filter(ctx, res.headers, c, true)
-                output_sent = true
+                local ok, flush_err = plugin.lua_response_filter(
+                    ctx, res.headers, c, no_flush, true)
                 if not ok then
                     abort_on_disconnect(flush_err)
                     return
                 end
+                output_sent = true
             end
         else
-            local ok, flush_err = plugin.lua_response_filter(ctx, res.headers, chunk, true)
-            output_sent = true
+            local ok, flush_err = plugin.lua_response_filter(
+                ctx, res.headers, chunk, no_flush, true)
             if not ok then
                 abort_on_disconnect(flush_err)
                 return
             end
+            output_sent = true
+        end
+        -- Let the interval flush thread know there is unflushed output.
+        if async_flush then
+            needs_flush = true
         end
 
         -- Enforce runaway-upstream safeguards after processing the chunk.
@@ -561,6 +662,10 @@
             limit_hit = "max_response_bytes"
         end
         if limit_hit then
+            if flush_thread then
+                ngx.thread.kill(flush_thread)
+                flush_thread = nil
+            end
             local duration_ms = math.floor((ngx_now() -
                                             ctx.llm_request_start_time) * 1000)
             core.log.warn("aborting AI stream: ", limit_hit, " exceeded;",
@@ -574,6 +679,7 @@
             end
             -- Signal downstream filters (e.g. moderation plugins that defer
             -- work until request completion) that no more content is coming.
+            ctx.var.apisix_upstream_response_time = duration_ms
             ctx.var.llm_request_done = true
             res._upstream_bytes = bytes_read
             if output_sent then
@@ -602,6 +708,7 @@
         -- backpressure, or time out stalled streams. See #13256 for a proper
         -- solution.
         ngx.sleep(0)
+
     end
 end
 
diff --git a/apisix/plugins/ai-proxy/schema.lua b/apisix/plugins/ai-proxy/schema.lua
index c249b41..79384bd 100644
--- a/apisix/plugins/ai-proxy/schema.lua
+++ b/apisix/plugins/ai-proxy/schema.lua
@@ -267,6 +267,16 @@
             description = "keepalive timeout in milliseconds",
         },
         keepalive_pool = {type = "integer", minimum = 1, default = 30},
+        streaming_flush_interval_ms = {
+            type = "integer",
+            minimum = 0,
+            default = 10,
+            description = "A background thread flushes the output buffer every N "
+                       .. "milliseconds (async flush). Useful when the upstream bursts "
+                       .. "multiple tokens at once and you need to bound client latency. "
+                       .. "Set to 0 to disable the background thread and flush each "
+                       .. "chunk synchronously inline.",
+        },
         ssl_verify = {type = "boolean", default = true },
         override = override_schema,
     },
@@ -353,6 +363,16 @@
             description = "keepalive timeout in milliseconds",
         },
         keepalive_pool = {type = "integer", minimum = 1, default = 30},
+        streaming_flush_interval_ms = {
+            type = "integer",
+            minimum = 0,
+            default = 10,
+            description = "A background thread flushes the output buffer every N "
+                       .. "milliseconds (async flush). Useful when the upstream bursts "
+                       .. "multiple tokens at once and you need to bound client latency. "
+                       .. "Set to 0 to disable the background thread and flush each "
+                       .. "chunk synchronously inline.",
+        },
         ssl_verify = {type = "boolean", default = true },
     },
     required = {"instances"},
diff --git a/apisix/plugins/ai-transport/aws-eventstream.lua b/apisix/plugins/ai-transport/aws-eventstream.lua
index 09f0178..0ae3a19 100644
--- a/apisix/plugins/ai-transport/aws-eventstream.lua
+++ b/apisix/plugins/ai-transport/aws-eventstream.lua
@@ -289,4 +289,15 @@
 end
 
 
+--- Combined split + decode in one call, matching the interface expected by
+-- ai-providers/base.lua.  decode() is not called when the complete part is empty.
+-- @param buf string  Accumulated bytes from the upstream socket.
+-- @return table  Array of decoded event tables; {} when nothing was decoded.
+-- @return string  Trailing bytes that did not form a complete frame.
+function _M.decode_buf(buf)
+    local complete, remainder = _M.split_buf(buf)
+    return complete ~= "" and _M.decode(complete) or {}, remainder
+end
+
+
 return _M
diff --git a/apisix/plugins/ai-transport/sse.lua b/apisix/plugins/ai-transport/sse.lua
index ad8ab27..a8f6709 100644
--- a/apisix/plugins/ai-transport/sse.lua
+++ b/apisix/plugins/ai-transport/sse.lua
@@ -17,73 +17,141 @@
 
 --- SSE (Server-Sent Events) codec and buffer management.
 
-local core = require("apisix.core")
 local table = require("apisix.core.table")
-local ngx_re = require("ngx.re")
 local tonumber = tonumber
 local tostring = tostring
-local ipairs = ipairs
 
 local _M = {
-    -- Cap on bytes split_buf may leave in `remainder`. Used by the streaming
-    -- loop in ai-providers.base to bound the buffer when frames don't
-    -- complete. SSE frames are small (text events delimited by blank lines),
-    -- so 1 MiB is plenty.
+    -- Cap on bytes that decode_buf (and split_buf) may leave in `remainder`.
+    -- Read by the streaming loop in ai-providers/base.lua to bound the buffer
+    -- when frames don't complete. SSE frames are small (text events delimited
+    -- by blank lines), so 1 MiB is plenty.
     max_remainder = 1024 * 1024,
 }
 
 
+-- Convert one raw SSE event block (the text between two boundary markers)
+-- into an event table with `type`, `data`, `id` and `retry` entries.
+-- Field names are matched case-insensitively.
+-- The block is scanned line by line using plain-text searches only.
+-- @param raw string  Event block without its terminating blank line.
+-- @return table|nil  The event, or nil when the block has no field lines
+--                    (for example an empty or comment-only block).
+local function parse_raw_event(raw)
+    -- Values of data: lines are collected here and joined once at the end.
+    local data_lines = {}
+    local ev = {
+        -- "message" stays the type unless an event: line replaces it.
+        type = "message",
+        data = data_lines,
+    }
+    local seen_field = false
+
+    local total = #raw
+    local cursor = 1
+    while cursor <= total do
+        -- Cut out the next line; the final one may lack a closing "\n".
+        local eol = raw:find("\n", cursor, true)
+        local last = eol and (eol - 1) or total
+        local line = raw:sub(cursor, last)
+        cursor = (eol or total) + 1
+
+        -- Drop one trailing "\r" so CRLF input parses like LF input.
+        if line:sub(-1) == "\r" then
+            line = line:sub(1, -2)
+        end
+
+        -- Plain (non-pattern) search for the name/value separator.  A line
+        -- is ignored when it is blank, has no colon at all, or starts with
+        -- a colon (an SSE comment); none of those count as a field.
+        local sep = line:find(":", 1, true)
+        if sep and sep > 1 then
+            local field = line:sub(1, sep - 1):lower()
+            local val = line:sub(sep + 1)
+            -- One leading space after the colon is not part of the value.
+            if val:sub(1, 1) == " " then
+                val = val:sub(2)
+            end
+
+            -- Any named line marks the block as a real event, even when
+            -- the field name is not one of the four handled below.
+            seen_field = true
+            if field == "retry" then
+                -- tonumber() yields nil for a non-numeric retry value.
+                ev.retry = tonumber(val)
+            elseif field == "id" then
+                ev.id = val
+            elseif field == "event" then
+                ev.type = val
+            elseif field == "data" then
+                table.insert(data_lines, val)
+            end
+        end
+    end
+
+    if not seen_field then
+        return nil
+    end
+
+    -- Multiple data lines are joined with "\n" into a single string.
+    ev.data = table.concat(data_lines, "\n")
+    return ev
+end
+
+
+-- Locate the first SSE event delimiter at or after `pos`: either a blank line
+-- written with bare LFs ("\n\n") or one written with CRLFs ("\r\n\r\n").
+-- @param buf string  Buffer to search.
+-- @param pos number  1-based index at which the search starts.
+-- @return number|nil  Index of the last byte before the delimiter.
+-- @return number|nil  Index of the first byte after the delimiter.
+-- Both results are nil when no delimiter exists from `pos` onwards.
+local function next_boundary(buf, pos)
+    local lf_at   = buf:find("\n\n",     pos, true)
+    local crlf_at = buf:find("\r\n\r\n", pos, true)
+
+    -- The CRLF form applies only when it exists and no LF delimiter starts
+    -- at or before it; otherwise the LF form (if any) is the nearest one.
+    if crlf_at and not (lf_at and lf_at <= crlf_at) then
+        return crlf_at - 1, crlf_at + 4
+    end
+    if lf_at then
+        return lf_at - 1, lf_at + 2
+    end
+    return nil, nil
+end
+
+
 --- Decode an SSE text chunk into a list of event tables.
 -- Each event has: type (string), data (string), id (string|nil), retry (number|nil).
+-- The chunk is expected to contain only complete events (no trailing partial event).
+-- Replaces the former ngx_re.split-based implementation; no PCRE overhead.
 function _M.decode(chunk)
     local events = {}
-
-    if not chunk then
+    if not chunk or chunk == "" then
         return events
     end
 
-    local raw_events, err = ngx_re.split(chunk, "\\r?\\n\\r?\\n")
-    if not raw_events then
-        core.log.warn("failed to split SSE chunk: ", err)
-        return events
-    end
-    for _, raw_event in ipairs(raw_events) do
-        local event = {
-            type = "message",
-            data = {},
-            id = nil,
-            retry = nil
-        }
-        local lines, err = ngx_re.split(raw_event, "\\r?\\n")
-        if not lines then
-            core.log.warn("failed to split event lines: ", err)
-            goto CONTINUE
-        end
-
-        for _, line in ipairs(lines) do
-            local name, value = line:match("^([^:]+): ?(.*)$")
-            if not name then goto NEXT_LINE end
-
-            name = name:lower()
-
-            if name == "event" then
-                event.type = value
-            elseif name == "data" then
-                table.insert(event.data, value)
-            elseif name == "id" then
-                event.id = value
-            elseif name == "retry" then
-                event.retry = tonumber(value)
+    local pos = 1
+    local len = #chunk
+    while pos <= len do
+        local ev_end, next_pos = next_boundary(chunk, pos)
+        if not ev_end then
+            -- No trailing blank line: treat the remaining content as a complete
+            -- event for backward compatibility with callers that pass full response
+            -- bodies or per-chunk bodies without a terminal blank line.
+            local event = parse_raw_event(chunk:sub(pos))
+            if event then
+                table.insert(events, event)
             end
-
-            ::NEXT_LINE::
+            break
         end
-
-        event.data = table.concat(event.data, "\n")
-
-        table.insert(events, event)
-
-        ::CONTINUE::
+        local raw = chunk:sub(pos, ev_end)
+        local event = parse_raw_event(raw)
+        if event then
+            table.insert(events, event)
+        end
+        pos = next_pos
     end
 
     return events
@@ -116,8 +184,36 @@
 end
 
 
---- Split an SSE buffer at the last complete event boundary.
--- Returns (complete_events, remainder) where complete_events includes
+--- Decode a raw SSE buffer in one forward pass, returning (events, remainder).
+-- Combines the split_buf + decode two-step into a single scan for use in
+-- high-throughput loops (e.g. ai-providers/base.lua).  `remainder` holds the
+-- bytes after the last boundary marker; a nil or empty `buf` yields ({}, "").
+function _M.decode_buf(buf)
+    local events = {}
+    if not buf or buf == "" then
+        return events, ""
+    end
+
+    -- `pos` always sits just past the last boundary consumed, so it is also
+    -- where the remainder starts once no further boundary is found.
+    local pos, len = 1, #buf
+    while pos <= len do
+        local ev_end, next_pos = next_boundary(buf, pos)
+        if not ev_end then
+            break
+        end
+        -- Blocks that parse to nil (no field lines) are not reported.
+        local event = parse_raw_event(buf:sub(pos, ev_end))
+        if event then
+            table.insert(events, event)
+        end
+        pos = next_pos
+    end
+    return events, pos <= len and buf:sub(pos) or ""
+end
+
+
+--- Split an SSE buffer: returns (complete_data, remainder) where complete_data includes
 -- all data up to and including the last "\n\n" or "\r\n\r\n" boundary,
 -- and remainder holds any trailing incomplete event data.
 -- Returns ("", buf) when no boundary is found.
diff --git a/docs/en/latest/plugins/ai-proxy.md b/docs/en/latest/plugins/ai-proxy.md
index ab253df..e3e1b59 100644
--- a/docs/en/latest/plugins/ai-proxy.md
+++ b/docs/en/latest/plugins/ai-proxy.md
@@ -121,6 +121,7 @@
 | keepalive_timeout | integer | False | 60000  | ≥ 1000                                   | Keepalive timeout in milliseconds when connecting to the LLM service. |
 | keepalive_pool | integer | False    | 30       | ≥ 1                                      | Keepalive pool size for the LLM service connection. |
 | ssl_verify     | boolean | False    | true   |                                          | If true, verifies the LLM service's certificate. |
+| streaming_flush_interval_ms | integer | False | 10 | ≥ 0 | Interval in milliseconds for the background flush thread. When `> 0` (default: `10`), the background thread calls `ngx.flush(false)` every N ms, batching output for bursty upstreams. When `0`, the background thread is disabled and each chunk is flushed synchronously via `ngx.flush(true)`, guaranteeing immediate client delivery. |
 
 ## Provider-aware `max_tokens` mapping
 
diff --git a/docs/zh/latest/plugins/ai-proxy.md b/docs/zh/latest/plugins/ai-proxy.md
index 2d49c0c..ba01056 100644
--- a/docs/zh/latest/plugins/ai-proxy.md
+++ b/docs/zh/latest/plugins/ai-proxy.md
@@ -121,6 +121,7 @@
 | keepalive_timeout | integer | 否 | 60000  | ≥ 1000                                   | 连接到 LLM 服务时的保活超时时间(毫秒)。 |
 | keepalive_pool | integer | 否    | 30       | ≥ 1                                      | LLM 服务连接的保活池大小。 |
 | ssl_verify     | boolean | 否    | true   |                                          | 如果为 true,验证 LLM 服务的证书。 |
+| streaming_flush_interval_ms | integer | 否 | 10 | ≥ 0 | 后台刷新线程的间隔时间(毫秒)。`> 0`(默认值:`10`)时,后台定时器每隔 N 毫秒调用一次 `ngx.flush(false)`,适合上游批量发送 token 的场景。设为 `0` 时禁用后台线程,改为每个 chunk 同步调用 `ngx.flush(true)` 立即刷新。 |
 
 ## Provider-aware `max_tokens` mapping
 
diff --git a/t/plugin/ai-proxy-client-disconnect.t b/t/plugin/ai-proxy-client-disconnect.t
index 654da7e..b144b57 100644
--- a/t/plugin/ai-proxy-client-disconnect.t
+++ b/t/plugin/ai-proxy-client-disconnect.t
@@ -222,3 +222,137 @@
 ^ok, upstream aborted after ~\d+ chunks$
 --- error_log
 client disconnected during AI streaming
+
+
+
+=== TEST 3: set route for async flush disconnect test
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/2',
+                 ngx.HTTP_PUT,
+                 [[{
+                    "uri": "/anything-async-flush",
+                    "plugins": {
+                        "ai-proxy": {
+                            "provider": "openai",
+                            "auth": {
+                                "header": {
+                                    "Authorization": "Bearer token"
+                                }
+                            },
+                            "options": {
+                                "model": "gpt-4",
+                                "stream": true
+                            },
+                            "override": {
+                                "endpoint": "http://localhost:7750"
+                            },
+                            "ssl_verify": false,
+                            "streaming_flush_interval_ms": 50
+                        }
+                    }
+                }]]
+            )
+
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 4: async flush disconnect still aborts upstream read early
+--- config
+    location /t {
+        content_by_lua_block {
+            local http = require("resty.http")
+            local httpc = http.new()
+
+            local ok, err = httpc:connect({
+                scheme = "http",
+                host = "localhost",
+                port = ngx.var.server_port,
+            })
+            if not ok then
+                ngx.status = 500
+                ngx.say("connect failed: ", err)
+                return
+            end
+
+            local res, err = httpc:request({
+                method = "POST",
+                headers = { ["Content-Type"] = "application/json" },
+                path = "/anything-async-flush",
+                body = [[{"messages": [{"role": "user", "content": "hi"}]}]],
+            })
+            if not res then
+                ngx.status = 500
+                ngx.say("request failed: ", err)
+                return
+            end
+
+            for i = 1, 3 do
+                local chunk, rerr = res.body_reader()
+                if rerr or not chunk then
+                    ngx.status = 500
+                    ngx.say("unexpected end of stream at chunk ", i, ": ", rerr)
+                    return
+                end
+            end
+            httpc:close()
+
+            ngx.sleep(1.0)
+
+            local probe = http.new()
+            ok, err = probe:connect({ scheme = "http", host = "localhost", port = 7750 })
+            if not ok then
+                ngx.status = 500
+                ngx.say("probe connect failed: ", err)
+                return
+            end
+            local probe_res, probe_err = probe:request({
+                method = "GET",
+                path = "/chunks",
+                headers = { Host = "localhost" },
+            })
+            if not probe_res then
+                ngx.status = 500
+                ngx.say("probe request failed: ", probe_err)
+                return
+            end
+            local count_str = probe_res:read_body()
+            probe:close()
+
+            if probe_res.status ~= 200 then
+                ngx.status = 500
+                ngx.say("probe status unexpected: ", probe_res.status)
+                return
+            end
+
+            local count = tonumber(count_str)
+            if not count then
+                ngx.status = 500
+                ngx.say("invalid probe response: ", count_str or "nil")
+                return
+            end
+
+            if count > 15 then
+                ngx.status = 500
+                ngx.say("upstream was not aborted promptly, chunks: ", count)
+                return
+            end
+            ngx.say("ok, upstream aborted after ~", count, " chunks")
+        }
+    }
+--- response_body_like
+^ok, upstream aborted after ~\d+ chunks$
+--- error_log
+client disconnected during AI streaming
+--- no_error_log
+final flush failed
diff --git a/t/plugin/ai-proxy-flush.t b/t/plugin/ai-proxy-flush.t
new file mode 100644
index 0000000..eac078e
--- /dev/null
+++ b/t/plugin/ai-proxy-flush.t
@@ -0,0 +1,320 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+use t::APISIX 'no_plan';  # the number of tests is not declared up front
+
+log_level("debug");  # presumably needed so the flush log lines matched below are emitted - confirm
+repeat_each(1);  # run every test block exactly once
+no_long_string();  # keep failure output in the short string format
+no_root_location();  # do not let the scaffold generate its own "location /"
+
+
+add_block_preprocessor(sub {  # fills in a default request and http_config for each test block
+    my ($block) = @_;  # the test block being prepared
+
+    if (!defined $block->request) {  # blocks without a request section issue GET /t
+        $block->set_value("request", "GET /t");
+    }
+
+    my $http_config = $block->http_config // <<_EOC_;  # a block-level http_config wins; otherwise use the mock SSE upstream below
+        server {
+            server_name mock_openai_sse;
+            listen 7751;
+
+            default_type 'text/event-stream';
+
+            location /v1/chat/completions {
+                content_by_lua_block {
+                    local args = ngx.req.get_uri_args()
+                    local delay = args["delay"]
+                    ngx.header["Content-Type"] = "text/event-stream"
+                    local events = {
+                        'data: {"id":"1","choices":[{"delta":{"role":"assistant","content":""},"index":0,"finish_reason":null}]}\\n\\n',
+                        'data: {"id":"1","choices":[{"delta":{"content":"Hello"},"index":0,"finish_reason":null}]}\\n\\n',
+                        'data: {"id":"1","choices":[{"delta":{"content":" world"},"index":0,"finish_reason":null}]}\\n\\n',
+                        'data: {"id":"1","choices":[{"delta":{},"index":0,"finish_reason":"stop"}]}\\n\\n',
+                        'data: [DONE]\\n\\n',
+                    }
+                    for _, ev in ipairs(events) do
+                        ngx.print(ev)
+                        ngx.flush(true)
+                        if delay then
+                            ngx.sleep(0.05)
+                        end
+                    end
+                }
+            }
+        }
+_EOC_
+
+    $block->set_value("http_config", $http_config);  # mock on port 7751 emits five SSE events, flushing each; with a delay query arg it sleeps 0.05 s after every event
+});
+
+run_tests();  # execute the blocks that follow __DATA__
+
+__DATA__
+
+=== TEST 1: create route with streaming_flush_interval_ms=0 (per-chunk sync flush)
+--- config
+    location /t {
+        content_by_lua_block {
+            local admin = require("lib.test_admin").test  -- helper that issues Admin API requests
+            local route_conf = [[{
+                    "uri": "/flush-default",
+                    "plugins": {
+                        "ai-proxy": {
+                            "provider": "openai",
+                            "auth": {
+                                "header": {
+                                    "Authorization": "Bearer test-key"
+                                }
+                            },
+                            "options": {
+                                "model": "gpt-4",
+                                "stream": true
+                            },
+                            "override": {
+                                "endpoint": "http://localhost:7751/v1/chat/completions?delay=true"
+                            },
+                            "ssl_verify": false,
+                            "streaming_flush_interval_ms": 0
+                        }
+                    }
+                }]]  -- interval 0 is set explicitly here; TEST 7 shows the schema default is 10
+
+            local status, reply = admin('/apisix/admin/routes/1', ngx.HTTP_PUT,
+                                        route_conf)  -- route 1 is exercised by TEST 2
+            if status >= 300 then  -- surface an Admin API failure through the response status
+                ngx.status = status
+            end
+            ngx.say(reply)
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 2: interval_ms=0 (per-chunk flush) - flush_thread must NOT appear, sync flush per chunk
+--- config
+    location /t {
+        content_by_lua_block {
+            local http = require("resty.http")
+            local httpc = http.new()
+            local ok, err = httpc:connect({ scheme = "http", host = "localhost",
+                                            port = ngx.var.server_port })  -- call back into this test server
+            if not ok then
+                ngx.status = 500
+                ngx.say("connect: " .. err)
+                return
+            end
+
+            local res, req_err = httpc:request({  -- route 1 from TEST 1: flush interval 0
+                method = "POST", path = "/flush-default",
+                headers = { ["Content-Type"] = "application/json" },
+                body = '{"messages":[{"role":"user","content":"hi"}],"model":"gpt-4","stream":true}',
+            })
+            if not res then
+                ngx.status = 500
+                ngx.say("request: " .. req_err)
+                return
+            end
+
+            local body, read_err = res:read_body()  -- read_body returns nil plus an error on failure
+            if not body then  -- report the read error instead of crashing on a nil body below
+                ngx.status = 500
+                ngx.say("read body: " .. tostring(read_err))
+                return
+            end
+            if body:find("Hello", 1, true) and body:find(" world", 1, true) and
+               body:find("[DONE]", 1, true) then  -- plain-text finds: all streamed pieces must arrive
+                ngx.say("ok")
+            else
+                ngx.say("FAIL: unexpected body: " .. body:sub(1, 500))
+            end
+        }
+    }
+--- response_body
+ok
+--- no_error_log
+ai-proxy: flush_thread periodic flush
+--- grep_error_log eval
+qr/lua_response_filter: flushing chunk to client/
+--- grep_error_log_out
+lua_response_filter: flushing chunk to client
+lua_response_filter: flushing chunk to client
+lua_response_filter: flushing chunk to client
+lua_response_filter: flushing chunk to client
+lua_response_filter: flushing chunk to client
+
+
+
+=== TEST 3: create route with streaming_flush_interval_ms=50 (background thread flush)
+--- config
+    location /t {
+        content_by_lua_block {
+            local admin = require("lib.test_admin").test  -- helper that issues Admin API requests
+            local route_conf = [[{
+                    "uri": "/flush-interval",
+                    "plugins": {
+                        "ai-proxy": {
+                            "provider": "openai",
+                            "auth": {
+                                "header": {
+                                    "Authorization": "Bearer test-key"
+                                }
+                            },
+                            "options": {
+                                "model": "gpt-4",
+                                "stream": true
+                            },
+                            "override": {
+                                "endpoint": "http://localhost:7751/v1/chat/completions?delay=true"
+                            },
+                            "ssl_verify": false,
+                            "streaming_flush_interval_ms": 50
+                        }
+                    }
+                }]]  -- same as route 1 except for the uri and the 50 ms flush interval
+
+            local status, reply = admin('/apisix/admin/routes/2', ngx.HTTP_PUT,
+                                        route_conf)  -- route 2 is exercised by TEST 4
+            if status >= 300 then  -- surface an Admin API failure through the response status
+                ngx.status = status
+            end
+            ngx.say(reply)
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 4: interval_ms=50 (background thread flush) - flush_thread log must appear
+--- config
+    location /t {
+        content_by_lua_block {
+            local http = require("resty.http")
+            local httpc = http.new()
+            local ok, err = httpc:connect({ scheme = "http", host = "localhost",
+                                            port = ngx.var.server_port })  -- call back into this test server
+            if not ok then
+                ngx.status = 500
+                ngx.say("connect: " .. err)
+                return
+            end
+
+            local res, req_err = httpc:request({  -- route 2 from TEST 3: flush interval 50 ms
+                method = "POST", path = "/flush-interval",
+                headers = { ["Content-Type"] = "application/json" },
+                body = '{"messages":[{"role":"user","content":"hi"}],"model":"gpt-4","stream":true}',
+            })
+            if not res then
+                ngx.status = 500
+                ngx.say("request: " .. req_err)
+                return
+            end
+
+            local body, read_err = res:read_body()  -- read_body returns nil plus an error on failure
+            if not body then  -- report the read error instead of crashing on a nil body below
+                ngx.status = 500
+                ngx.say("read body: " .. tostring(read_err))
+                return
+            end
+            if body:find("Hello", 1, true) and body:find(" world", 1, true) and
+               body:find("[DONE]", 1, true) then  -- plain-text finds: all streamed pieces must arrive
+                ngx.say("ok")
+            else
+                ngx.say("FAIL: unexpected body: " .. body:sub(1, 500))
+            end
+        }
+    }
+--- response_body
+ok
+--- error_log
+ai-proxy: flush_thread periodic flush
+--- no_error_log
+lua_response_filter: flushing chunk to client
+
+
+
+=== TEST 5: streaming_flush_interval_ms schema rejects negative value
+--- config
+    location /t {
+        content_by_lua_block {
+            local ai_proxy = require("apisix.plugins.ai-proxy")
+            local valid, reason = ai_proxy.check_schema({
+                provider = "openai",
+                auth = { header = { Authorization = "Bearer x" } },
+                options = { model = "gpt-4" },
+                streaming_flush_interval_ms = -1,  -- negative interval, expected to be refused
+            })
+            local verdict = "should have failed"  -- printed only if validation wrongly succeeds
+            if not valid then
+                verdict = "rejected: " .. tostring(reason)
+            end
+            ngx.say(verdict)
+        }
+    }
+--- response_body_like
+rejected: .*
+
+
+
+=== TEST 6: streaming_flush_interval_ms=0 is accepted (disables background thread)
+--- config
+    location /t {
+        content_by_lua_block {
+            local ai_proxy = require("apisix.plugins.ai-proxy")
+            local valid, reason = ai_proxy.check_schema({
+                provider = "openai",
+                auth = { header = { Authorization = "Bearer x" } },
+                options = { model = "gpt-4" },
+                streaming_flush_interval_ms = 0,  -- zero must pass validation
+            })
+            local verdict = "ok"
+            if not valid then  -- include the schema error so a failure is diagnosable
+                verdict = "FAIL: " .. tostring(reason)
+            end
+            ngx.say(verdict)
+        }
+    }
+--- response_body
+ok
+
+
+
+=== TEST 7: omitting streaming_flush_interval_ms applies default value of 10
+--- config
+    location /t {
+        content_by_lua_block {
+            local ai_proxy = require("apisix.plugins.ai-proxy")
+            local plugin_conf = {  -- streaming_flush_interval_ms is left out on purpose
+                provider = "openai",
+                auth = { header = { Authorization = "Bearer x" } },
+                options = { model = "gpt-4" },
+            }
+            local valid, reason = ai_proxy.check_schema(plugin_conf)
+            if valid then  -- the test relies on check_schema writing defaults into plugin_conf
+                ngx.say(plugin_conf.streaming_flush_interval_ms)
+            else
+                ngx.say("FAIL: " .. tostring(reason))
+            end
+        }
+    }
+--- response_body
+10