[FLINK-11720][connectors] Bump ElasticSearch5 to 5.3.3
The ES5 connector has caused numerous issues in end to end and integration tests (on CI and during release test).
The NOTICE file has been updated according to this maven shade output:
[INFO] --- maven-shade-plugin:3.1.1:shade (shade-flink) @ flink-connector-elasticsearch5_2.11 ---
[INFO] Including org.apache.flink:flink-connector-elasticsearch-base_2.11:jar:1.11-SNAPSHOT in the shaded jar.
[INFO] Including org.elasticsearch.client:transport:jar:5.3.3 in the shaded jar.
[INFO] Including org.elasticsearch:elasticsearch:jar:5.3.3 in the shaded jar.
[INFO] Including org.apache.lucene:lucene-core:jar:6.4.2 in the shaded jar.
[INFO] Including org.apache.lucene:lucene-analyzers-common:jar:6.4.2 in the shaded jar.
[INFO] Including org.apache.lucene:lucene-backward-codecs:jar:6.4.2 in the shaded jar.
[INFO] Including org.apache.lucene:lucene-grouping:jar:6.4.2 in the shaded jar.
[INFO] Including org.apache.lucene:lucene-highlighter:jar:6.4.2 in the shaded jar.
[INFO] Including org.apache.lucene:lucene-join:jar:6.4.2 in the shaded jar.
[INFO] Including org.apache.lucene:lucene-memory:jar:6.4.2 in the shaded jar.
[INFO] Including org.apache.lucene:lucene-misc:jar:6.4.2 in the shaded jar.
[INFO] Including org.apache.lucene:lucene-queries:jar:6.4.2 in the shaded jar.
[INFO] Including org.apache.lucene:lucene-queryparser:jar:6.4.2 in the shaded jar.
[INFO] Including org.apache.lucene:lucene-sandbox:jar:6.4.2 in the shaded jar.
[INFO] Including org.apache.lucene:lucene-spatial:jar:6.4.2 in the shaded jar.
[INFO] Including org.apache.lucene:lucene-spatial-extras:jar:6.4.2 in the shaded jar.
[INFO] Including org.apache.lucene:lucene-spatial3d:jar:6.4.2 in the shaded jar.
[INFO] Including org.apache.lucene:lucene-suggest:jar:6.4.2 in the shaded jar.
[INFO] Including org.elasticsearch:securesm:jar:1.1 in the shaded jar.
[INFO] Including net.sf.jopt-simple:jopt-simple:jar:5.0.2 in the shaded jar.
[INFO] Including com.carrotsearch:hppc:jar:0.7.1 in the shaded jar.
[INFO] Including joda-time:joda-time:jar:2.5 in the shaded jar.
[INFO] Including org.yaml:snakeyaml:jar:1.25 in the shaded jar.
[INFO] Including com.fasterxml.jackson.core:jackson-core:jar:2.10.1 in the shaded jar.
[INFO] Including com.fasterxml.jackson.dataformat:jackson-dataformat-smile:jar:2.10.1 in the shaded jar.
[INFO] Including com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:jar:2.10.1 in the shaded jar.
[INFO] Including com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:jar:2.10.1 in the shaded jar.
[INFO] Including com.tdunning:t-digest:jar:3.0 in the shaded jar.
[INFO] Including org.hdrhistogram:HdrHistogram:jar:2.1.6 in the shaded jar.
[INFO] Including net.java.dev.jna:jna:jar:4.2.2 in the shaded jar.
[INFO] Including org.elasticsearch.plugin:transport-netty3-client:jar:5.3.3 in the shaded jar.
[INFO] Including org.elasticsearch.plugin:transport-netty4-client:jar:5.3.3 in the shaded jar.
[INFO] Including io.netty:netty-buffer:jar:4.1.7.Final in the shaded jar.
[INFO] Including io.netty:netty-codec:jar:4.1.7.Final in the shaded jar.
[INFO] Including io.netty:netty-codec-http:jar:4.1.7.Final in the shaded jar.
[INFO] Including io.netty:netty-common:jar:4.1.7.Final in the shaded jar.
[INFO] Including io.netty:netty-handler:jar:4.1.7.Final in the shaded jar.
[INFO] Including io.netty:netty-resolver:jar:4.1.7.Final in the shaded jar.
[INFO] Including io.netty:netty-transport:jar:4.1.7.Final in the shaded jar.
[INFO] Including org.elasticsearch.plugin:reindex-client:jar:5.3.3 in the shaded jar.
[INFO] Including org.elasticsearch.client:rest:jar:5.3.3 in the shaded jar.
[INFO] Including org.apache.httpcomponents:httpclient:jar:4.5.3 in the shaded jar.
[INFO] Including org.apache.httpcomponents:httpcore:jar:4.4.6 in the shaded jar.
[INFO] Including org.apache.httpcomponents:httpasyncclient:jar:4.1.2 in the shaded jar.
[INFO] Including org.apache.httpcomponents:httpcore-nio:jar:4.4.5 in the shaded jar.
[INFO] Including commons-codec:commons-codec:jar:1.10 in the shaded jar.
[INFO] Including commons-logging:commons-logging:jar:1.1.3 in the shaded jar.
[INFO] Including org.elasticsearch.plugin:lang-mustache-client:jar:5.3.3 in the shaded jar.
[INFO] Including com.github.spullara.mustache.java:compiler:jar:0.9.3 in the shaded jar.
[INFO] Including org.elasticsearch.plugin:percolator-client:jar:5.3.3 in the shaded jar.
[INFO] Including io.netty:netty:jar:3.10.6.Final in the shaded jar.
[INFO] Including org.apache.flink:force-shading:jar:1.11-SNAPSHOT in the shaded jar.
[WARNING] Discovered module-info.class. Shading will break its strong encapsulation.
[WARNING] Discovered module-info.class. Shading will break its strong encapsulation.
[WARNING] Discovered module-info.class. Shading will break its strong encapsulation.
[WARNING] Discovered module-info.class. Shading will break its strong encapsulation.
[INFO] Replacing original artifact with shaded artifact.
[INFO] Replacing /Users/robert/Projects/flink/flink-connectors/flink-connector-elasticsearch5/target/flink-connector-elasticsearch5_2.11-1.11-SNAPSHOT.jar with /Users/robert/Projects/flink/flink-connectors/flink-connector-elasticsearch5/target/flink-connector-elasticsearch5_2.11-1.11-SNAPSHOT-shaded.jar
[INFO] Dependency-reduced POM written at: /Users/robert/Projects/flink/flink-connectors/flink-connector-elasticsearch5/target/dependency-reduced-pom.xml
[INFO]
[INFO] --- maven-surefire-plugin:2.22.1:test (integration-tests) @ flink-connector-elasticsearch5_2.11 ---
diff --git a/flink-connectors/flink-connector-elasticsearch-base/pom.xml b/flink-connectors/flink-connector-elasticsearch-base/pom.xml
index 113a475..f5c4252 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/pom.xml
+++ b/flink-connectors/flink-connector-elasticsearch-base/pom.xml
@@ -37,7 +37,7 @@
<!-- Allow users to pass custom connector versions -->
<properties>
- <elasticsearch.version>5.1.2</elasticsearch.version>
+ <elasticsearch.version>5.3.3</elasticsearch.version>
</properties>
<dependencies>
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
index c8df2c0..c285112 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
@@ -29,6 +29,7 @@
import org.apache.flink.util.InstantiationUtil;
import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
@@ -389,6 +390,7 @@
}
private class BulkProcessorListener implements BulkProcessor.Listener {
+
@Override
public void beforeBulk(long executionId, BulkRequest request) { }
@@ -398,6 +400,7 @@
BulkItemResponse itemResponse;
Throwable failure;
RestStatus restStatus;
+ DocWriteRequest actionRequest;
try {
for (int i = 0; i < response.getItems().length; i++) {
@@ -407,10 +410,19 @@
LOG.error("Failed Elasticsearch item request: {}", itemResponse.getFailureMessage(), failure);
restStatus = itemResponse.getFailure().getStatus();
+ actionRequest = request.requests().get(i);
if (restStatus == null) {
- failureHandler.onFailure(request.requests().get(i), failure, -1, failureRequestIndexer);
+ if (actionRequest instanceof ActionRequest) {
+ failureHandler.onFailure((ActionRequest) actionRequest, failure, -1, failureRequestIndexer);
+ } else {
+ throw new UnsupportedOperationException("The sink currently only supports ActionRequests");
+ }
} else {
- failureHandler.onFailure(request.requests().get(i), failure, restStatus.getStatus(), failureRequestIndexer);
+ if (actionRequest instanceof ActionRequest) {
+ failureHandler.onFailure((ActionRequest) actionRequest, failure, restStatus.getStatus(), failureRequestIndexer);
+ } else {
+ throw new UnsupportedOperationException("The sink currently only supports ActionRequests");
+ }
}
}
}
@@ -431,8 +443,12 @@
LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure);
try {
- for (ActionRequest action : request.requests()) {
- failureHandler.onFailure(action, failure, -1, failureRequestIndexer);
+ for (DocWriteRequest writeRequest : request.requests()) {
+ if (writeRequest instanceof ActionRequest) {
+ failureHandler.onFailure((ActionRequest) writeRequest, failure, -1, failureRequestIndexer);
+ } else {
+ throw new UnsupportedOperationException("The sink currently only supports ActionRequests");
+ }
}
} catch (Throwable t) {
// fail the sink and skip the rest of the items
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
index f0e401e..0648ed9 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
@@ -26,6 +26,7 @@
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
@@ -419,7 +420,7 @@
private transient BulkRequest nextBulkRequest = new BulkRequest();
private transient MultiShotLatch flushLatch = new MultiShotLatch();
- private List<? extends Throwable> mockItemFailuresList;
+ private List<? extends Exception> mockItemFailuresList;
private Throwable nextBulkFailure;
public DummyElasticsearchSink(
@@ -454,7 +455,7 @@
* <p>The list is used with corresponding order to the requests in the bulk, i.e. the first
* request uses the response at index 0, the second requests uses the response at index 1, etc.
*/
- public void setMockItemFailuresListForNextBulkItemResponses(List<? extends Throwable> mockItemFailuresList) {
+ public void setMockItemFailuresListForNextBulkItemResponses(List<? extends Exception> mockItemFailuresList) {
this.mockItemFailuresList = mockItemFailuresList;
}
@@ -506,14 +507,14 @@
if (nextBulkFailure == null) {
BulkItemResponse[] mockResponses = new BulkItemResponse[currentBulkRequest.requests().size()];
for (int i = 0; i < currentBulkRequest.requests().size(); i++) {
- Throwable mockItemFailure = mockItemFailuresList.get(i);
+ Exception mockItemFailure = mockItemFailuresList.get(i);
if (mockItemFailure == null) {
// the mock response for the item is success
- mockResponses[i] = new BulkItemResponse(i, "opType", mock(DocWriteResponse.class));
+ mockResponses[i] = new BulkItemResponse(i, DocWriteRequest.OpType.INDEX, mock(DocWriteResponse.class));
} else {
// the mock response for the item is failure
- mockResponses[i] = new BulkItemResponse(i, "opType", new BulkItemResponse.Failure("index", "type", "id", mockItemFailure));
+ mockResponses[i] = new BulkItemResponse(i, DocWriteRequest.OpType.INDEX, new BulkItemResponse.Failure("index", "type", "id", mockItemFailure));
}
}