[type:fix] fix response writer plugin. (#5228)
* [type:fix] fix response writer plugin.
* [type:fix] fix testConcurrentTokenBucket bug.
* [type:fix] fix testConcurrentTokenBucket bug.
* [type:fix] fix testConcurrentTokenBucket bug.
diff --git a/shenyu-plugin/shenyu-plugin-response/src/main/java/org/apache/shenyu/plugin/response/strategy/NettyClientMessageWriter.java b/shenyu-plugin/shenyu-plugin-response/src/main/java/org/apache/shenyu/plugin/response/strategy/NettyClientMessageWriter.java
index 3b37080..a5540ef 100644
--- a/shenyu-plugin/shenyu-plugin-response/src/main/java/org/apache/shenyu/plugin/response/strategy/NettyClientMessageWriter.java
+++ b/shenyu-plugin/shenyu-plugin-response/src/main/java/org/apache/shenyu/plugin/response/strategy/NettyClientMessageWriter.java
@@ -21,6 +21,10 @@
import org.apache.shenyu.common.constant.Constants;
import org.apache.shenyu.common.enums.RpcTypeEnum;
import org.apache.shenyu.plugin.api.ShenyuPluginChain;
+import org.apache.shenyu.plugin.api.result.ShenyuResultEnum;
+import org.apache.shenyu.plugin.api.result.ShenyuResultWrap;
+import org.apache.shenyu.plugin.api.utils.WebFluxResultUtils;
+import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.NettyDataBuffer;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.http.HttpStatus;
@@ -55,7 +59,8 @@
return chain.execute(exchange).doOnError(throwable -> cleanup(exchange)).then(Mono.defer(() -> {
Connection connection = exchange.getAttribute(Constants.CLIENT_RESPONSE_CONN_ATTR);
if (Objects.isNull(connection)) {
- return Mono.empty();
+ Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.SERVICE_RESULT_ERROR);
+ return WebFluxResultUtils.result(exchange, error);
}
ServerHttpResponse response = exchange.getResponse();
NettyDataBufferFactory factory = (NettyDataBufferFactory) response.bufferFactory();
@@ -73,7 +78,7 @@
// watcher httpStatus
final Consumer<HttpStatus> consumer = exchange.getAttribute(Constants.WATCHER_HTTP_STATUS);
Optional.ofNullable(consumer).ifPresent(c -> c.accept(response.getStatusCode()));
- return responseMono;
+ return responseMono.onErrorResume(error -> releaseIfNotConsumed(body, error));
})).doOnCancel(() -> cleanup(exchange));
}
@@ -89,6 +94,10 @@
}
}
+ private static <T> Mono<T> releaseIfNotConsumed(final Flux<NettyDataBuffer> dataBufferDody, final Throwable ex) {
+ return dataBufferDody != null ? dataBufferDody.map(DataBufferUtils::release).then(Mono.error(ex)) : Mono.error(ex);
+ }
+
private boolean isStreamingMediaType(@Nullable final MediaType contentType) {
return Objects.nonNull(contentType) && this.streamingMediaTypes.stream().anyMatch(contentType::isCompatibleWith);
}
diff --git a/shenyu-plugin/shenyu-plugin-response/src/main/java/org/apache/shenyu/plugin/response/strategy/WebClientMessageWriter.java b/shenyu-plugin/shenyu-plugin-response/src/main/java/org/apache/shenyu/plugin/response/strategy/WebClientMessageWriter.java
index d981c1a..0ae9538 100644
--- a/shenyu-plugin/shenyu-plugin-response/src/main/java/org/apache/shenyu/plugin/response/strategy/WebClientMessageWriter.java
+++ b/shenyu-plugin/shenyu-plugin-response/src/main/java/org/apache/shenyu/plugin/response/strategy/WebClientMessageWriter.java
@@ -25,6 +25,7 @@
import org.apache.shenyu.plugin.api.result.ShenyuResultWrap;
import org.apache.shenyu.plugin.api.utils.WebFluxResultUtils;
import org.springframework.core.io.buffer.DataBuffer;
+import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
@@ -80,7 +81,7 @@
Mono<Void> responseMono;
if (Objects.nonNull(fluxResponseEntity.getBody())) {
responseMono = exchange.getResponse().writeWith(fluxResponseEntity.getBody())
- .onErrorResume(error -> releaseIfNotConsumed(fluxResponseEntity, error))
+ .onErrorResume(error -> releaseIfNotConsumed(fluxResponseEntity.getBody(), error))
.doOnCancel(() -> clean(exchange));
} else {
responseMono = exchange.getResponse().writeWith(Mono.empty());
@@ -120,14 +121,14 @@
response.getHeaders().putAll(httpHeaders);
}
- private static <T> Mono<T> releaseIfNotConsumed(final ResponseEntity<Flux<DataBuffer>> fluxResponseEntity, final Throwable ex) {
- return fluxResponseEntity.getBody().onErrorResume(ex2 -> Mono.empty()).then(Mono.error(ex));
+ private static <T> Mono<T> releaseIfNotConsumed(final Flux<DataBuffer> dataBufferDody, final Throwable ex) {
+ return dataBufferDody.map(DataBufferUtils::release).then(Mono.error(ex));
}
private void clean(final ServerWebExchange exchange) {
ResponseEntity<Flux<DataBuffer>> fluxResponseEntity = exchange.getAttribute(Constants.CLIENT_RESPONSE_ATTR);
if (Objects.nonNull(fluxResponseEntity) && Objects.nonNull(fluxResponseEntity.getBody())) {
- fluxResponseEntity.getBody().subscribe();
+ fluxResponseEntity.getBody().map(DataBufferUtils::release).subscribe();
}
}