[FLINK-18359] Log failures in handler instead of ElasticsearchSinkBase

This allows more control for the handler whether or not to log error
messages. In some cases, users know that they will get a lot of
failures, for example when back-filling existing data in ES. For those,
you don't want your log flooded with ERROR messages.
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
index 2811008..d19fba6 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
@@ -39,8 +39,6 @@
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.rest.RestStatus;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
 import java.util.HashMap;
@@ -71,8 +69,6 @@
 
 	private static final long serialVersionUID = -1007596293618451942L;
 
-	private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSinkBase.class);
-
 	// ------------------------------------------------------------------------
 	//  Internal bulk processor configuration
 	// ------------------------------------------------------------------------
@@ -408,8 +404,6 @@
 						itemResponse = response.getItems()[i];
 						failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResponse);
 						if (failure != null) {
-							LOG.error("Failed Elasticsearch item request: {}", itemResponse.getFailureMessage(), failure);
-
 							restStatus = itemResponse.getFailure().getStatus();
 							actionRequest = request.requests().get(i);
 							if (restStatus == null) {
@@ -441,8 +435,6 @@
 
 		@Override
 		public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
-			LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure);
-
 			try {
 				for (DocWriteRequest writeRequest : request.requests()) {
 					if (writeRequest instanceof ActionRequest) {
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpFailureHandler.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpFailureHandler.java
index 4726dc1..c076fc8 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpFailureHandler.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpFailureHandler.java
@@ -22,6 +22,8 @@
 import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
 
 import org.elasticsearch.action.ActionRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * An {@link ActionRequestFailureHandler} that simply fails the sink on any failures.
@@ -31,8 +33,11 @@
 
 	private static final long serialVersionUID = 737941343410827885L;
 
+	private static final Logger LOG = LoggerFactory.getLogger(NoOpFailureHandler.class);
+
 	@Override
 	public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
+		LOG.error("Failed Elasticsearch item request: {}", failure.getMessage(), failure);
 		// simply fail the sink
 		throw failure;
 	}
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/RetryRejectedExecutionFailureHandler.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/RetryRejectedExecutionFailureHandler.java
index ca710cb..98b58f9 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/RetryRejectedExecutionFailureHandler.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/RetryRejectedExecutionFailureHandler.java
@@ -25,6 +25,8 @@
 
 import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * An {@link ActionRequestFailureHandler} that re-adds requests that failed due to temporary
@@ -36,8 +38,11 @@
 
 	private static final long serialVersionUID = -7423562912824511906L;
 
+	private static final Logger LOG = LoggerFactory.getLogger(RetryRejectedExecutionFailureHandler.class);
+
 	@Override
 	public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
+		LOG.error("Failed Elasticsearch item request: {}", failure.getMessage(), failure);
 		if (ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {
 			indexer.add(action);
 		} else {