[FLINK-18359] Log failures in handler instead of ElasticsearchSinkBase
This allows more control for the handler whether or not to log error
messages. In some cases, users know that they will get a lot of
failures, for example when back-filling existing data in ES. For those,
you don't want your log flooded with ERROR messages.
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
index 2811008..d19fba6 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
@@ -39,8 +39,6 @@
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.rest.RestStatus;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.util.HashMap;
@@ -71,8 +69,6 @@
private static final long serialVersionUID = -1007596293618451942L;
- private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSinkBase.class);
-
// ------------------------------------------------------------------------
// Internal bulk processor configuration
// ------------------------------------------------------------------------
@@ -408,8 +404,6 @@
itemResponse = response.getItems()[i];
failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResponse);
if (failure != null) {
- LOG.error("Failed Elasticsearch item request: {}", itemResponse.getFailureMessage(), failure);
-
restStatus = itemResponse.getFailure().getStatus();
actionRequest = request.requests().get(i);
if (restStatus == null) {
@@ -441,8 +435,6 @@
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
- LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure);
-
try {
for (DocWriteRequest writeRequest : request.requests()) {
if (writeRequest instanceof ActionRequest) {
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpFailureHandler.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpFailureHandler.java
index 4726dc1..c076fc8 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpFailureHandler.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpFailureHandler.java
@@ -22,6 +22,8 @@
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.elasticsearch.action.ActionRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* An {@link ActionRequestFailureHandler} that simply fails the sink on any failures.
@@ -31,8 +33,11 @@
private static final long serialVersionUID = 737941343410827885L;
+ private static final Logger LOG = LoggerFactory.getLogger(NoOpFailureHandler.class);
+
@Override
public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
+ LOG.error("Failed Elasticsearch item request: {}", failure.getMessage(), failure);
// simply fail the sink
throw failure;
}
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/RetryRejectedExecutionFailureHandler.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/RetryRejectedExecutionFailureHandler.java
index ca710cb..98b58f9 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/RetryRejectedExecutionFailureHandler.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/RetryRejectedExecutionFailureHandler.java
@@ -25,6 +25,8 @@
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* An {@link ActionRequestFailureHandler} that re-adds requests that failed due to temporary
@@ -36,8 +38,11 @@
private static final long serialVersionUID = -7423562912824511906L;
+ private static final Logger LOG = LoggerFactory.getLogger(RetryRejectedExecutionFailureHandler.class);
+
@Override
public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
+ LOG.error("Failed Elasticsearch item request: {}", failure.getMessage(), failure);
if (ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {
indexer.add(action);
} else {