[FLINK-18006] Always overwrite RestClientFactory in ElasticsearchXDynamicSink

We always overwrite the RestClientFactory in order to workaround an
issue with shading classes in lambdas deserialization method. That way
we never use the default lambda from ElasticsearchSink$Builder which
cannot be deserialized when used from a
flink-sql-connector-elasticsearch module due to shading.

This closes #12455
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java
index bedfbef..680cb2c 100644
--- a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java
+++ b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java
@@ -136,8 +136,9 @@
 			config.getBulkFlushBackoffRetries().ifPresent(builder::setBulkFlushBackoffRetries);
 			config.getBulkFlushBackoffDelay().ifPresent(builder::setBulkFlushBackoffDelay);
 
-			config.getPathPrefix()
-				.ifPresent(pathPrefix -> builder.setRestClientFactory(new DefaultRestClientFactory(pathPrefix)));
+			// we must overwrite the default factory which is defined with a lambda because of a bug
+			// in shading lambda serialization shading see FLINK-18006
+			builder.setRestClientFactory(new DefaultRestClientFactory(config.getPathPrefix().orElse(null)));
 
 			final ElasticsearchSink<RowData> sink = builder.build();
 
diff --git a/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java b/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java
index 408673e..7aa52ea 100644
--- a/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java
+++ b/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java
@@ -136,8 +136,9 @@
 			config.getBulkFlushBackoffRetries().ifPresent(builder::setBulkFlushBackoffRetries);
 			config.getBulkFlushBackoffDelay().ifPresent(builder::setBulkFlushBackoffDelay);
 
-			config.getPathPrefix()
-				.ifPresent(pathPrefix -> builder.setRestClientFactory(new DefaultRestClientFactory(pathPrefix)));
+			// we must overwrite the default factory which is defined with a lambda because of a bug
+			// in shading lambda serialization shading see FLINK-18006
+			builder.setRestClientFactory(new DefaultRestClientFactory(config.getPathPrefix().orElse(null)));
 
 			final ElasticsearchSink<RowData> sink = builder.build();