[type:fix] fix response writer plugin. (#5228)

* [type:fix] fix response writer plugin.

* [type:fix] fix testConcurrentTokenBucket bug.

* [type:fix] fix testConcurrentTokenBucket bug.

* [type:fix] fix testConcurrentTokenBucket bug.
diff --git a/shenyu-plugin/shenyu-plugin-response/src/main/java/org/apache/shenyu/plugin/response/strategy/NettyClientMessageWriter.java b/shenyu-plugin/shenyu-plugin-response/src/main/java/org/apache/shenyu/plugin/response/strategy/NettyClientMessageWriter.java
index 3b37080..a5540ef 100644
--- a/shenyu-plugin/shenyu-plugin-response/src/main/java/org/apache/shenyu/plugin/response/strategy/NettyClientMessageWriter.java
+++ b/shenyu-plugin/shenyu-plugin-response/src/main/java/org/apache/shenyu/plugin/response/strategy/NettyClientMessageWriter.java
@@ -21,6 +21,10 @@
 import org.apache.shenyu.common.constant.Constants;
 import org.apache.shenyu.common.enums.RpcTypeEnum;
 import org.apache.shenyu.plugin.api.ShenyuPluginChain;
+import org.apache.shenyu.plugin.api.result.ShenyuResultEnum;
+import org.apache.shenyu.plugin.api.result.ShenyuResultWrap;
+import org.apache.shenyu.plugin.api.utils.WebFluxResultUtils;
+import org.springframework.core.io.buffer.DataBufferUtils;
 import org.springframework.core.io.buffer.NettyDataBuffer;
 import org.springframework.core.io.buffer.NettyDataBufferFactory;
 import org.springframework.http.HttpStatus;
@@ -55,7 +59,8 @@
         return chain.execute(exchange).doOnError(throwable -> cleanup(exchange)).then(Mono.defer(() -> {
             Connection connection = exchange.getAttribute(Constants.CLIENT_RESPONSE_CONN_ATTR);
             if (Objects.isNull(connection)) {
-                return Mono.empty();
+                Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.SERVICE_RESULT_ERROR);
+                return WebFluxResultUtils.result(exchange, error);
             }
             ServerHttpResponse response = exchange.getResponse();
             NettyDataBufferFactory factory = (NettyDataBufferFactory) response.bufferFactory();
@@ -73,7 +78,7 @@
             // watcher httpStatus
             final Consumer<HttpStatus> consumer = exchange.getAttribute(Constants.WATCHER_HTTP_STATUS);
             Optional.ofNullable(consumer).ifPresent(c -> c.accept(response.getStatusCode()));
-            return responseMono;
+            return responseMono.onErrorResume(error -> releaseIfNotConsumed(body, error));
         })).doOnCancel(() -> cleanup(exchange));
     }
     
@@ -89,6 +94,10 @@
         }
     }
 
+    private static <T> Mono<T> releaseIfNotConsumed(final Flux<NettyDataBuffer> dataBufferDody, final Throwable ex) {
+        return dataBufferDody != null ? dataBufferDody.map(DataBufferUtils::release).then(Mono.error(ex)) : Mono.error(ex);
+    }
+
     private boolean isStreamingMediaType(@Nullable final MediaType contentType) {
         return Objects.nonNull(contentType) && this.streamingMediaTypes.stream().anyMatch(contentType::isCompatibleWith);
     }
diff --git a/shenyu-plugin/shenyu-plugin-response/src/main/java/org/apache/shenyu/plugin/response/strategy/WebClientMessageWriter.java b/shenyu-plugin/shenyu-plugin-response/src/main/java/org/apache/shenyu/plugin/response/strategy/WebClientMessageWriter.java
index d981c1a..0ae9538 100644
--- a/shenyu-plugin/shenyu-plugin-response/src/main/java/org/apache/shenyu/plugin/response/strategy/WebClientMessageWriter.java
+++ b/shenyu-plugin/shenyu-plugin-response/src/main/java/org/apache/shenyu/plugin/response/strategy/WebClientMessageWriter.java
@@ -25,6 +25,7 @@
 import org.apache.shenyu.plugin.api.result.ShenyuResultWrap;
 import org.apache.shenyu.plugin.api.utils.WebFluxResultUtils;
 import org.springframework.core.io.buffer.DataBuffer;
+import org.springframework.core.io.buffer.DataBufferUtils;
 import org.springframework.http.HttpHeaders;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.ResponseEntity;
@@ -80,7 +81,7 @@
             Mono<Void> responseMono;
             if (Objects.nonNull(fluxResponseEntity.getBody())) {
                 responseMono = exchange.getResponse().writeWith(fluxResponseEntity.getBody())
-                        .onErrorResume(error -> releaseIfNotConsumed(fluxResponseEntity, error))
+                        .onErrorResume(error -> releaseIfNotConsumed(fluxResponseEntity.getBody(), error))
                         .doOnCancel(() -> clean(exchange));
             } else {
                 responseMono = exchange.getResponse().writeWith(Mono.empty());
@@ -120,14 +121,14 @@
         response.getHeaders().putAll(httpHeaders);
     }
 
-    private static <T> Mono<T> releaseIfNotConsumed(final ResponseEntity<Flux<DataBuffer>> fluxResponseEntity, final Throwable ex) {
-        return fluxResponseEntity.getBody().onErrorResume(ex2 -> Mono.empty()).then(Mono.error(ex));
+    private static <T> Mono<T> releaseIfNotConsumed(final Flux<DataBuffer> dataBufferDody, final Throwable ex) {
+        return dataBufferDody.map(DataBufferUtils::release).then(Mono.error(ex));
     }
 
     private void clean(final ServerWebExchange exchange) {
         ResponseEntity<Flux<DataBuffer>> fluxResponseEntity = exchange.getAttribute(Constants.CLIENT_RESPONSE_ATTR);
         if (Objects.nonNull(fluxResponseEntity) && Objects.nonNull(fluxResponseEntity.getBody())) {
-            fluxResponseEntity.getBody().subscribe();
+            fluxResponseEntity.getBody().map(DataBufferUtils::release).subscribe();
         }
     }