[Feature]Support ES query index parameters (#254)
diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-elasticsearch/src/main/java/org/apache/seatunnel/datasource/plugin/elasticsearch/ElasticSearchDataSourceChannel.java b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-elasticsearch/src/main/java/org/apache/seatunnel/datasource/plugin/elasticsearch/ElasticSearchDataSourceChannel.java
index 77c12f2..29c8472 100644
--- a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-elasticsearch/src/main/java/org/apache/seatunnel/datasource/plugin/elasticsearch/ElasticSearchDataSourceChannel.java
+++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-elasticsearch/src/main/java/org/apache/seatunnel/datasource/plugin/elasticsearch/ElasticSearchDataSourceChannel.java
@@ -60,9 +60,10 @@
String database,
Map<String, String> option) {
databaseCheck(database);
+
try (EsRestClient client =
EsRestClient.createInstance(ConfigFactory.parseMap(requestParams))) {
- return client.listIndex();
+ return client.listIndex(option.get("filterName"));
}
}
diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-elasticsearch/src/main/java/org/apache/seatunnel/datasource/plugin/elasticsearch/client/EsRestClient.java b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-elasticsearch/src/main/java/org/apache/seatunnel/datasource/plugin/elasticsearch/client/EsRestClient.java
index 01d20bb..64cb100 100644
--- a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-elasticsearch/src/main/java/org/apache/seatunnel/datasource/plugin/elasticsearch/client/EsRestClient.java
+++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-elasticsearch/src/main/java/org/apache/seatunnel/datasource/plugin/elasticsearch/client/EsRestClient.java
@@ -24,6 +24,7 @@
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.datasource.plugin.elasticsearch.ElasticSearchOptionRule;
+import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHost;
import org.apache.http.HttpStatus;
import org.apache.http.auth.AuthScope;
@@ -252,27 +253,7 @@
}
public List<String> listIndex() {
- String endpoint = "/_cat/indices?format=json";
- Request request = new Request("GET", endpoint);
- try {
- Response response = restClient.performRequest(request);
- if (response == null) {
- throw new ResponseException("GET " + endpoint + " response null");
- }
- if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
- String entity = EntityUtils.toString(response.getEntity());
- return JsonUtils.toList(entity, Map.class).stream()
- .map(map -> map.get("index").toString())
- .collect(Collectors.toList());
- } else {
- throw new ResponseException(
- String.format(
- "GET %s response status code=%d",
- endpoint, response.getStatusLine().getStatusCode()));
- }
- } catch (IOException ex) {
- throw new ResponseException(ex);
- }
+ return this.listIndex(null);
}
public void dropIndex(String tableName) {
@@ -365,4 +346,41 @@
}
return mapping;
}
+
+ public List<String> listIndex(String filterName) {
+ String endpoint = "/_cat/indices?format=json";
+ Request request = new Request("GET", endpoint);
+ try {
+ Response response = restClient.performRequest(request);
+ if (response == null) {
+ throw new ResponseException("GET " + endpoint + " response null");
+ }
+ if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
+ String entity = EntityUtils.toString(response.getEntity());
+ List<String> indices =
+ JsonUtils.toList(entity, Map.class).stream()
+ .map(map -> map.get("index").toString())
+ .collect(Collectors.toList());
+
+ if (StringUtils.isNotEmpty(filterName)) {
+ indices =
+ indices.stream()
+ .filter(
+ index ->
+ index.toLowerCase()
+ .contains(filterName.toLowerCase()))
+ .collect(Collectors.toList());
+ }
+
+ return indices;
+ } else {
+ throw new ResponseException(
+ String.format(
+ "GET %s response status code=%d",
+ endpoint, response.getStatusLine().getStatusCode()));
+ }
+ } catch (IOException ex) {
+ throw new ResponseException(ex);
+ }
+ }
}