METRON-1834: Migrate Elasticsearch from TransportClient to new Java REST API (mmiklavc via mmiklavc)
diff --git a/Upgrading.md b/Upgrading.md
index a0dd5d3..1da43fa 100644
--- a/Upgrading.md
+++ b/Upgrading.md
@@ -21,6 +21,13 @@
## 0.6.0 to 0.6.1
+### [METRON-1834: Migrate Elasticsearch from TransportClient to new Java REST API](https://issues.apache.org/jira/browse/METRON-1834)
+The Elasticsearch Java client has now been migrated from TransportClient to the new Java REST client. The motivation for this change
+is that TransportClient will be deprecated in Elasticsearch 7.0 and removed entirely in 8.0. See [ES Java API ](https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.6/client.html) for more details.
+The primary client-facing change for upgrades will be the new properties for configuring the new client. An explanation of the new properties
+as well as a mapping from the old properties to the new can be found in [metron-elasticsearch](metron-platform/metron-elasticsearch/README.md#Properties) under `es.client.settings`.
+
+
### [METRON-1855: Make unified enrichment topology the default and deprecate split-join](https://issues.apache.org/jira/browse/METRON-1855)
The unified enrichment topology will be the new default in this release,
and the split-join enrichment topology is now considered deprecated.
diff --git a/dependencies_with_url.csv b/dependencies_with_url.csv
index 7bc0e9b..bdc2508 100644
--- a/dependencies_with_url.csv
+++ b/dependencies_with_url.csv
@@ -487,3 +487,5 @@
com.zaxxer:HikariCP:jar:2.7.8:compile,ASLv2,https://github.com/brettwooldridge/HikariCP
org.hibernate.validator:hibernate-validator:jar:6.0.9.Final:compile,ASLv2,https://github.com/hibernate/hibernate-validator
com.github.palindromicity:simple-syslog-5424:jar:0.0.9:compile,ASLv2,https://github.com/palindromicity/simple-syslog-5424
+org.elasticsearch.client:elasticsearch-rest-high-level-client:jar:5.6.2:compile,ASLv2,https://github.com/elastic/elasticsearch/blob/master/LICENSE.txt
+org.elasticsearch.plugin:aggs-matrix-stats-client:jar:5.6.2:compile,ASLv2,https://github.com/elastic/elasticsearch/blob/master/LICENSE.txt
diff --git a/metron-deployment/Kerberos-manual-setup.md b/metron-deployment/Kerberos-manual-setup.md
index 4bd4516..6d40552 100644
--- a/metron-deployment/Kerberos-manual-setup.md
+++ b/metron-deployment/Kerberos-manual-setup.md
@@ -563,17 +563,16 @@
sudo -u hdfs hdfs dfs -chown metron:metron /apps/metron/elasticsearch/xpack-password
```
-1. New settings have been added to configure the Elasticsearch client. By default the client will run as the normal ES prebuilt transport client. If you enable X-Pack you should set the es.client.class as shown below.
+1. New settings have been added to configure the Elasticsearch client.
- Add the `es.client.settings` to global.json
+ Modify the `es.client.settings` key in global.json
```
$METRON_HOME/config/zookeeper/global.json ->
"es.client.settings" : {
- "es.client.class" : "org.elasticsearch.xpack.client.PreBuiltXPackTransportClient",
- "es.xpack.username" : "transport_client_user",
- "es.xpack.password.file" : "/apps/metron/elasticsearch/xpack-password"
+ "xpack.username" : "transport_client_user",
+ "xpack.password.file" : "/apps/metron/elasticsearch/xpack-password"
}
```
@@ -583,151 +582,6 @@
$METRON_HOME/bin/zk_load_configs.sh -m PUSH -i $METRON_HOME/config/zookeeper/ -z $ZOOKEEPER
```
-1. The last step before restarting the topology is to create a custom X-Pack shaded and relocated jar. This is up to you because of licensing restrictions, but here is a sample Maven pom file that should help.
-
- ```
- <?xml version="1.0" encoding="UTF-8"?>
- <!--
- Licensed to the Apache Software
- Foundation (ASF) under one or more contributor license agreements. See the
- NOTICE file distributed with this work for additional information regarding
- copyright ownership. The ASF licenses this file to You under the Apache License,
- Version 2.0 (the "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software distributed
- under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
- OR CONDITIONS OF ANY KIND, either express or implied. See the License for
- the specific language governing permissions and limitations under the License.
- -->
- <project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <groupId>org.elasticsearch</groupId>
- <artifactId>elasticsearch-xpack-shaded</artifactId>
- <name>elasticsearch-xpack-shaded</name>
- <packaging>jar</packaging>
- <version>5.6.2</version>
- <repositories>
- <repository>
- <id>elasticsearch-releases</id>
- <url>https://artifacts.elastic.co/maven</url>
- <releases>
- <enabled>true</enabled>
- </releases>
- <snapshots>
- <enabled>false</enabled>
- </snapshots>
- </repository>
- </repositories>
- <dependencies>
- <dependency>
- <groupId>org.elasticsearch.client</groupId>
- <artifactId>x-pack-transport</artifactId>
- <version>5.6.2</version>
- <exclusions>
- <exclusion>
- <groupId>com.fasterxml.jackson.dataformat</groupId>
- <artifactId>jackson-dataformat-yaml</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.fasterxml.jackson.dataformat</groupId>
- <artifactId>jackson-dataformat-cbor</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-core</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- </dependencies>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
- <version>3.2.0</version>
- <configuration>
- <createDependencyReducedPom>true</createDependencyReducedPom>
- </configuration>
- <executions>
- <execution>
- <phase>package</phase>
- <goals>
- <goal>shade</goal>
- </goals>
- <configuration>
- <filters>
- <filter>
- <artifact>*:*</artifact>
- <excludes>
- <exclude>META-INF/*.SF</exclude>
- <exclude>META-INF/*.DSA</exclude>
- <exclude>META-INF/*.RSA</exclude>
- </excludes>
- </filter>
- </filters>
- <relocations>
- <relocation>
- <pattern>io.netty</pattern>
- <shadedPattern>org.apache.metron.io.netty</shadedPattern>
- </relocation>
- <relocation>
- <pattern>org.apache.logging.log4j</pattern>
- <shadedPattern>org.apache.metron.logging.log4j</shadedPattern>
- </relocation>
- </relocations>
- <artifactSet>
- <excludes>
- <exclude>org.slf4j.impl*</exclude>
- <exclude>org.slf4j:slf4j-log4j*</exclude>
- </excludes>
- </artifactSet>
- <transformers>
- <transformer
- implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
- <resources>
- <resource>.yaml</resource>
- <resource>LICENSE.txt</resource>
- <resource>ASL2.0</resource>
- <resource>NOTICE.txt</resource>
- </resources>
- </transformer>
- <transformer
- implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
- <transformer
- implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
- <mainClass></mainClass>
- </transformer>
- </transformers>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
- </project>
- ```
-
-1. Once you've built the `elasticsearch-xpack-shaded-5.6.2.jar`, it needs to be made available to Storm when you submit the topology. Create a contrib directory for indexing and put the jar file in this directory.
-
- ```
- mkdir $METRON_HOME/indexing_contrib
- cp elasticsearch-xpack-shaded-5.6.2.jar $METRON_HOME/indexing_contrib/elasticsearch-xpack-shaded-5.6.2.jar
- ```
-
1. Now you can restart the Elasticsearch topology. Note, you should perform this step manually, as follows.
```
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml
index 81dda6c..e644b31 100644
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml
@@ -95,15 +95,6 @@
</value-attributes>
</property>
<property>
- <name>es_binary_port</name>
- <value>9300</value>
- <description>Elasticsearch binary port. (9300)</description>
- <display-name>Elasticsearch Binary Port</display-name>
- <value-attributes>
- <empty-value-valid>true</empty-value-valid>
- </value-attributes>
- </property>
- <property>
<name>es_http_port</name>
<value>9200</value>
<description>Elasticsearch HTTP port. (9200)</description>
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/metron_service.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/metron_service.py
index 9d15e93..a7074da 100644
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/metron_service.py
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/metron_service.py
@@ -583,8 +583,6 @@
missing.append("metron-env/es_cluster_name")
if not config['configurations']['metron-env']['es_hosts']:
missing.append("metron-env/es_hosts")
- if not config['configurations']['metron-env']['es_binary_port']:
- missing.append("metron-env/es_binary_port")
if not config['configurations']['metron-env']['es_date_format']:
missing.append("metron-env/es_date_format")
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
index 060dfe4..5635330 100755
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
@@ -106,9 +106,8 @@
es_cluster_name = config['configurations']['metron-env']['es_cluster_name']
es_hosts = config['configurations']['metron-env']['es_hosts']
es_host_list = es_hosts.split(",")
-es_binary_port = config['configurations']['metron-env']['es_binary_port']
-es_url = ",".join([host + ":" + es_binary_port for host in es_host_list])
es_http_port = config['configurations']['metron-env']['es_http_port']
+es_url = ",".join([host + ":" + es_http_port for host in es_host_list])
es_http_url = es_host_list[0] + ":" + es_http_port
es_date_format = config['configurations']['metron-env']['es_date_format']
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json
index 46c06dd..7f84f1d 100644
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json
@@ -489,10 +489,6 @@
"subsection-name": "subsection-index-settings"
},
{
- "config": "metron-env/es_binary_port",
- "subsection-name": "subsection-index-settings"
- },
- {
"config": "metron-env/es_http_port",
"subsection-name": "subsection-index-settings"
},
@@ -1006,12 +1002,6 @@
}
},
{
- "config": "metron-env/es_binary_port",
- "widget": {
- "type": "text-field"
- }
- },
- {
"config": "metron-env/es_http_port",
"widget": {
"type": "text-field"
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/MetaAlertServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/MetaAlertServiceImpl.java
index bd8419f..7581ef3 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/MetaAlertServiceImpl.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/MetaAlertServiceImpl.java
@@ -59,7 +59,7 @@
public SearchResponse getAllMetaAlertsForAlert(String guid) throws RestException {
try {
return dao.getAllMetaAlertsForAlert(guid);
- } catch (InvalidSearchException ise) {
+ } catch (IOException|InvalidSearchException ise) {
throw new RestException(ise.getMessage(), ise);
}
}
diff --git a/metron-platform/elasticsearch-shaded/pom.xml b/metron-platform/elasticsearch-shaded/pom.xml
index ccad3cb..7766e3d 100644
--- a/metron-platform/elasticsearch-shaded/pom.xml
+++ b/metron-platform/elasticsearch-shaded/pom.xml
@@ -30,40 +30,15 @@
<version>18.0</version>
</dependency>
<dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-common</artifactId>
+ <version>4.1.13.Final</version>
+ </dependency>
+ <dependency>
<groupId>org.elasticsearch.client</groupId>
- <artifactId>transport</artifactId>
+ <artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>${global_elasticsearch_version}</version>
- <exclusions>
- <exclusion>
- <groupId>com.fasterxml.jackson.dataformat</groupId>
- <artifactId>jackson-dataformat-smile</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.fasterxml.jackson.dataformat</groupId>
- <artifactId>jackson-dataformat-yaml</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.fasterxml.jackson.dataformat</groupId>
- <artifactId>jackson-dataformat-cbor</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-core</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
+ </dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
@@ -95,10 +70,6 @@
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
- <exclusion> <!-- this is causing a weird build error if not excluded - Error creating shaded jar: null: IllegalArgumentException -->
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-api</artifactId>
- </exclusion>
</exclusions>
</dependency>
<dependency>
@@ -155,10 +126,6 @@
<shadedPattern>org.apache.metron.io.netty</shadedPattern>
</relocation>
<relocation>
- <pattern>org.apache.logging.log4j</pattern>
- <shadedPattern>org.apache.metron.logging.log4j</shadedPattern>
- </relocation>
- <relocation>
<pattern>com.google.common</pattern>
<shadedPattern>org.apache.metron.guava.elasticsearch-shaded</shadedPattern>
</relocation>
diff --git a/metron-platform/elasticsearch-shaded/src/main/resources/META-INF/log4j-provider.properties b/metron-platform/elasticsearch-shaded/src/main/resources/META-INF/log4j-provider.properties
deleted file mode 100644
index c4bd3f0..0000000
--- a/metron-platform/elasticsearch-shaded/src/main/resources/META-INF/log4j-provider.properties
+++ /dev/null
@@ -1,18 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-LoggerContextFactory = org.apache.metron.logging.log4j.core.impl.Log4jContextFactory
-Log4jAPIVersion = 2.6.0
-FactoryPriority= 10
\ No newline at end of file
diff --git a/metron-platform/metron-common/README.md b/metron-platform/metron-common/README.md
index dac1974..d68259a 100644
--- a/metron-platform/metron-common/README.md
+++ b/metron-platform/metron-common/README.md
@@ -79,29 +79,30 @@
Various parts of our stack uses the global config are documented throughout the Metron documentation,
but a convenient index is provided here:
-| Property Name | Subsystem | Type | Ambari Property |
-|---------------------------------------------------------------------------------------------------------------------|---------------|------------|----------------------------|
-| [`es.clustername`](../metron-elasticsearch#esclustername) | Indexing | String | `es_cluster_name` |
-| [`es.ip`](../metron-elasticsearch#esip) | Indexing | String | `es_hosts` |
-| [`es.port`](../metron-elasticsearch#esport) | Indexing | String | `es_port` |
-| [`es.date.format`](../metron-elasticsearch#esdateformat) | Indexing | String | `es_date_format` |
-| [`fieldValidations`](#validation-framework) | Parsing | Object | N/A |
-| [`parser.error.topic`](../metron-parsers#parsererrortopic) | Parsing | String | N/A |
-| [`stellar.function.paths`](../../metron-stellar/stellar-common#stellarfunctionpaths) | Stellar | CSV String | N/A |
-| [`stellar.function.resolver.includes`](../../metron-stellar/stellar-common#stellarfunctionresolverincludesexcludes) | Stellar | CSV String | N/A |
-| [`stellar.function.resolver.excludes`](../../metron-stellar/stellar-common#stellarfunctionresolverincludesexcludes) | Stellar | CSV String | N/A |
-| [`profiler.period.duration`](../../metron-analytics/metron-profiler#profilerperiodduration) | Profiler | Integer | `profiler_period_duration` |
-| [`profiler.period.duration.units`](../../metron-analytics/metron-profiler#profilerperioddurationunits) | Profiler | String | `profiler_period_units` |
-| [`profiler.writer.batchSize`](../../metron-analytics/metron-profiler/#profilerwriterbatchsize) | Profiler | Integer | N/A |
-| [`profiler.writer.batchTimeout`](../../metron-analytics/metron-profiler/#profilerwriterbatchtimeout) | Profiler | Integer | N/A |
-| [`update.hbase.table`](../metron-indexing#updatehbasetable) | REST/Indexing | String | `update_hbase_table` |
-| [`update.hbase.cf`](../metron-indexing#updatehbasecf) | REST/Indexing | String | `update_hbase_cf` |
-| [`geo.hdfs.file`](../metron-enrichment#geohdfsfile) | Enrichment | String | `geo_hdfs_file` |
-| [`enrichment.writer.batchSize`](../metron-enrichment#enrichmentwriterbatchsize) | Enrichment | Integer | N/A |
-| [`enrichment.writer.batchTimeout`](../metron-enrichment#enrichmentwriterbatchtimeout) | Enrichment | Integer | N/A |
-| [`geo.hdfs.file`](../metron-enrichment#geohdfsfile) | Enrichment | String | `geo_hdfs_file` |
-| [`source.type.field`](../../metron-interface/metron-alerts#sourcetypefield) | UI | String | `source_type_field` |
-| [`threat.triage.score.field`](../../metron-interface/metron-alerts#threattriagescorefield) | UI | String | `threat_triage_score_field` |
+| Property Name | Subsystem | Type | Ambari Property |
+|---------------------------------------------------------------------------------------------------------------------|---------------|------------|------------------------------|
+| [`es.clustername`](../metron-elasticsearch#esclustername) | Indexing | String | `es_cluster_name` |
+| [`es.ip`](../metron-elasticsearch#esip) | Indexing | String | `es_hosts` |
+| [`es.port`](../metron-elasticsearch#esport) | Indexing | String | `es_port` |
+| [`es.date.format`](../metron-elasticsearch#esdateformat) | Indexing | String | `es_date_format` |
+| [`es.client.settings`](../metron-elasticsearch#esclientsettings) | Indexing | Object | N/A |
+| [`fieldValidations`](#validation-framework) | Parsing | Object | N/A |
+| [`parser.error.topic`](../metron-parsers#parsererrortopic) | Parsing | String | N/A |
+| [`stellar.function.paths`](../../metron-stellar/stellar-common#stellarfunctionpaths) | Stellar | CSV String | N/A |
+| [`stellar.function.resolver.includes`](../../metron-stellar/stellar-common#stellarfunctionresolverincludesexcludes) | Stellar | CSV String | N/A |
+| [`stellar.function.resolver.excludes`](../../metron-stellar/stellar-common#stellarfunctionresolverincludesexcludes) | Stellar | CSV String | N/A |
+| [`profiler.period.duration`](../../metron-analytics/metron-profiler#profilerperiodduration) | Profiler | Integer | `profiler_period_duration` |
+| [`profiler.period.duration.units`](../../metron-analytics/metron-profiler#profilerperioddurationunits) | Profiler | String | `profiler_period_units` |
+| [`profiler.writer.batchSize`](../../metron-analytics/metron-profiler/#profilerwriterbatchsize) | Profiler | Integer | N/A |
+| [`profiler.writer.batchTimeout`](../../metron-analytics/metron-profiler/#profilerwriterbatchtimeout) | Profiler | Integer | N/A |
+| [`update.hbase.table`](../metron-indexing#updatehbasetable) | REST/Indexing | String | `update_hbase_table` |
+| [`update.hbase.cf`](../metron-indexing#updatehbasecf) | REST/Indexing | String | `update_hbase_cf` |
+| [`geo.hdfs.file`](../metron-enrichment#geohdfsfile) | Enrichment | String | `geo_hdfs_file` |
+| [`enrichment.writer.batchSize`](../metron-enrichment#enrichmentwriterbatchsize) | Enrichment | Integer | N/A |
+| [`enrichment.writer.batchTimeout`](../metron-enrichment#enrichmentwriterbatchtimeout) | Enrichment | Integer | N/A |
+| [`geo.hdfs.file`](../metron-enrichment#geohdfsfile) | Enrichment | String | `geo_hdfs_file` |
+| [`source.type.field`](../../metron-interface/metron-alerts#sourcetypefield) | UI | String | `source_type_field` |
+| [`threat.triage.score.field`](../../metron-interface/metron-alerts#threattriagescorefield) | UI | String | `threat_triage_score_field` |
## Note Configs in Ambari
If a field is managed via ambari, you should change the field via
@@ -439,3 +440,4 @@
-p DIRECTORY, --hdp_home=DIRECTORY
HDP home directory
```
+`
diff --git a/metron-platform/metron-common/src/main/config/zookeeper/global.json b/metron-platform/metron-common/src/main/config/zookeeper/global.json
index 9e5402e..b638ca3 100644
--- a/metron-platform/metron-common/src/main/config/zookeeper/global.json
+++ b/metron-platform/metron-common/src/main/config/zookeeper/global.json
@@ -6,6 +6,5 @@
"update.hbase.table": "metron_update",
"update.hbase.cf": "t",
"es.client.settings": {
- "client.transport.ping_timeout": "500s"
}
}
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigOption.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigOption.java
index 6308f0a..14d5b69 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigOption.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigOption.java
@@ -30,6 +30,13 @@
return (s, o) -> o;
}
+ /**
+ * Returns true if the map contains the key for the defined config option
+ */
+ default boolean containsOption(Map<String, Object> map) {
+ return map.containsKey(getKey());
+ }
+
default void put(Map<String, Object> map, Object value) {
map.put(getKey(), value);
}
diff --git a/metron-platform/metron-elasticsearch/README.md b/metron-platform/metron-elasticsearch/README.md
index 177412e..463a0b8 100644
--- a/metron-platform/metron-elasticsearch/README.md
+++ b/metron-platform/metron-elasticsearch/README.md
@@ -59,6 +59,49 @@
roll hourly, whereas an `es.date.format` of `yyyy.MM.dd` would have the consequence that the indices would
roll daily.
+### `es.client.settings`
+
+This field in global config allows you to specify Elasticsearch REST client options. These are used in conjunction with the previously mentioned Elasticsearch properties
+when setting up client connections to an Elasticsearch cluster. The available properties should be supplied as an object map. Current available options are as follows:
+
+| Property Name | Type | Required? | Default Value | Description |
+|-------------------------------------|-----------|-----------|----------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| connection.timeout.millis | Integer | No | 1000 | Sets connection timeout. |
+| socket.timeout.millis | Integer | No | 30000 | Sets socket timeout. |
+| max.retry.timeout.millis | Integer | No | 30000 | Sets the maximum timeout (in milliseconds) to honour in case of multiple retries of the same request. |
+| num.client.connection.threads | Integer | No | 1 | Number of worker threads used by the connection manager. Defaults to Runtime.getRuntime().availableProcessors(). |
+| xpack.username | String | No | null | X-Pack username. |
+| xpack.password.file | String | No | null | 1-line HDFS file where the X-Pack password is set. |
+| ssl.enabled | Boolean | No | false | Turn on SSL connections. |
+| keystore.type | String | No | "jks" | Allows you to specify a keytstore type. See https://docs.oracle.com/javase/8/docs/technotes/guides/security/StandardNames.html#KeyStore for more details. |
+| keystore.path | String | No | null | Path to the Trust Store that holds your Elasticsearch certificate authorities and certificate. |
+| keystore.password.file | String | No | null | 1-line HDFS file where the keystore password is set. |
+
+__Note:__ The migration from Elasticsearch's TransportClient to the Java REST client has resulted in some existing properties to change. Below is a mapping of the old properties to the new ones:
+
+| Old Property Name | New Property Name |
+|----------------------------------------|-------------------------------------|
+| client.transport.ping_timeout | n/a |
+| n/a | connection.timeout.millis |
+| n/a | socket.timeout.millis |
+| n/a | max.retry.timeout.millis |
+| n/a | num.client.connection.threads |
+| es.client.class | n/a |
+| es.xpack.username | xpack.username |
+| es.xpack.password.file | xpack.password.file |
+| xpack.security.transport.ssl.enabled | ssl.enabled |
+| xpack.ssl.key | n/a |
+| xpack.ssl.certificate | n/a |
+| xpack.ssl.certificate_authorities | n/a |
+| n/a | keystore.type |
+| keystore.path | keystore.path |
+| n/a | keystore.password.file |
+
+__Notes:__
+* The transport client implementation provides for a 'xpack.security.user' property, however we never used this property directly. Rather, in order to secure the password we used custom properties for user/pass. These properties have been carried over as `xpack.username` and `xpack.password.file`.
+* See [https://www.elastic.co/guide/en/elasticsearch/client/java-rest/5.6/_common_configuration.html](https://www.elastic.co/guide/en/elasticsearch/client/java-rest/5.6/_common_configuration.html) for more specifics on the new client properties.
+* Other notes on JSSE - [https://docs.oracle.com/javase/8/docs/technotes/guides/security/jsse/JSSERefGuide.html](https://docs.oracle.com/javase/8/docs/technotes/guides/security/jsse/JSSERefGuide.html)
+
## Upgrading to 5.6.2
Users should be prepared to re-index when migrating from Elasticsearch 2.3.3 to 5.6.2. There are a number of template changes, most notably around
@@ -287,7 +330,7 @@
## Using Metron with Elasticsearch 5.6.2
-Although infrequent sometimes an internal field is added in Metron and existing templates must be updated. The following steps outlines how to do this, using `metron_alert` as an example.
+Although infrequent, sometimes an internal field is added in Metron and existing templates must be updated. The following steps outlines how to do this, using `metron_alert` as an example.
With the addition of the meta alert feature, there is a requirement that all sensors templates have a nested `metron_alert` field defined. This field is a dummy field. See [Ignoring Unmapped Fields](https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-sort.html#_ignoring_unmapped_fields) for more information
diff --git a/metron-platform/metron-elasticsearch/pom.xml b/metron-platform/metron-elasticsearch/pom.xml
index adc601a..e3cf840 100644
--- a/metron-platform/metron-elasticsearch/pom.xml
+++ b/metron-platform/metron-elasticsearch/pom.xml
@@ -73,6 +73,17 @@
</exclusions>
</dependency>
<dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpcore</artifactId>
+ <version>4.4.9</version>
+ </dependency>
+ <dependency>
+ <groupId>org.elasticsearch.plugin</groupId>
+ <artifactId>transport-netty4-client</artifactId>
+ <version>${global_elasticsearch_version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.metron</groupId>
<artifactId>metron-hbase</artifactId>
<version>${project.parent.version}</version>
@@ -206,22 +217,6 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-api</artifactId>
- <version>${global_log4j_core_version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-core</artifactId>
- <version>${global_log4j_core_version}</version>
- </dependency>
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava-testlib</artifactId>
- <version>${global_guava_version}</version>
- <scope>test</scope>
- </dependency>
</dependencies>
<build>
@@ -283,7 +278,6 @@
</excludes>
</artifactSet>
<transformers>
- <transformer implementation="org.atteo.classindex.ClassIndexTransformer"/>
<transformer
implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
<resources>
@@ -297,13 +291,15 @@
<!--transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer">
<addHeader>false</addHeader>
<projectName>${project.name}</projectName>
- </transformer-->
+ </transformer>-->
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass></mainClass>
</transformer>
+ <!-- ClassIndexTransformer needs to go LAST. For some reason it will clobber other transformers from operating when it is put first -->
+ <transformer implementation="org.atteo.classindex.ClassIndexTransformer"/>
</transformers>
</configuration>
</execution>
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/client/ElasticsearchClient.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/client/ElasticsearchClient.java
new file mode 100644
index 0000000..d62a7c0
--- /dev/null
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/client/ElasticsearchClient.java
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.elasticsearch.client;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.HttpEntity;
+import org.apache.http.entity.StringEntity;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.elasticsearch.utils.FieldMapping;
+import org.apache.metron.elasticsearch.utils.FieldProperties;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestHighLevelClient;
+
+/**
+ * Wrapper around the Elasticsearch REST clients. Exposes capabilities of the low and high-level clients.
+ * https://www.elastic.co/guide/en/elasticsearch/client/java-rest/5.6/java-rest-overview.html. Most, if not
+ * all of use in Metron would be focused through the high-level client. It handles marshaling/unmarshaling.
+ */
+public class ElasticsearchClient implements AutoCloseable{
+ private RestClient lowLevelClient;
+ private RestHighLevelClient highLevelClient;
+
+ /**
+ * Instantiate with ElasticsearchClientFactory.
+ *
+ * @param lowLevelClient
+ * @param highLevelClient
+ */
+ public ElasticsearchClient(RestClient lowLevelClient, RestHighLevelClient highLevelClient) {
+ this.lowLevelClient = lowLevelClient;
+ this.highLevelClient = highLevelClient;
+ }
+
+ /**
+ * Exposes an Elasticsearch low-level client. Prefer the high level client.
+ */
+ public RestClient getLowLevelClient() {
+ return lowLevelClient;
+ }
+
+ /**
+ * <p>
+ * Exposes an Elasticsearch high-level client. Prefer to use this client over the low-level client where possible. This client wraps the low-level
+ * client and exposes some additional sugar on top of the low level methods including marshaling/unmarshaling.
+ * </p>
+ * <p>
+ * Note, as of 5.6.2 it does NOT support index or cluster management operations.
+ * https://www.elastic.co/guide/en/elasticsearch/client/java-rest/5.6/_changing_the_application_8217_s_code.html
+ * <br>
+ * <i>Does not provide indices or cluster management APIs. Management operations can be executed by external scripts or using the low-level client.</i>
+ * </p>
+ * <p>
+ * Current supported ES API's seen here - https://www.elastic.co/guide/en/elasticsearch/client/java-rest/5.6/java-rest-high-supported-apis.html
+ * </p>
+ *
+ * <ul>
+ * <li>Single document APIs
+ * <ul>
+ * <li>Index API</li>
+ * <li>Get API</li>
+ * <li>Delete API</li>
+ * <li>Update API</li>
+ * </ul>
+ * </li>
+ * <li>Multi document APIs
+ * <ul>
+ * <li>Bulk API</li>
+ * </ul>
+ * </li>
+ * <li>Search APIs
+ * <ul>
+ * <li>Search API</li>
+ * <li>Search Scroll API</li>
+ * <li>Clear Scroll API</li>
+ * </ul>
+ * </li>
+ * <li>Miscellaneous APIs
+ * <ul>
+ * <li>Info API</li>
+ * </ul>
+ * </li>
+ * </ul>
+ */
+ public RestHighLevelClient getHighLevelClient() {
+ return highLevelClient;
+ }
+
+ /**
+ * Included as part of AutoCloseable because Elasticsearch recommends closing the client when not
+ * being used.
+ * https://www.elastic.co/guide/en/elasticsearch/client/java-rest/5.6/_changing_the_client_8217_s_initialization_code.html
+ * @throws IOException
+ */
+ @Override
+ public void close() throws IOException {
+ if (lowLevelClient != null) {
+ lowLevelClient.close();
+ }
+ }
+
+ /**
+ * https://www.elastic.co/guide/en/elasticsearch/reference/5.6/indices-put-mapping.html
+ * @param index
+ * @param mappingType https://www.elastic.co/guide/en/elasticsearch/reference/5.6/mapping.html#mapping-type
+ * @param mapping
+ * @throws IOException
+ */
+ public void putMapping(String index, String mappingType, String mapping) throws IOException {
+ HttpEntity entity = new StringEntity(mapping);
+ Response response = lowLevelClient.performRequest("PUT"
+ , "/" + index + "/_mapping/" + mappingType
+ , Collections.emptyMap()
+ , entity
+ );
+
+ if(response.getStatusLine().getStatusCode() != 200) {
+ String responseStr = IOUtils.toString(response.getEntity().getContent());
+ throw new IllegalStateException("Got a " + response.getStatusLine().getStatusCode() + " due to " + responseStr);
+ }
+ }
+
+ /**
+ * Gets ALL Elasticsearch indices, or null if status code returned is not OK 200.
+ */
+ public String[] getIndices() throws IOException {
+ Response response = lowLevelClient.performRequest("GET", "/_cat/indices");
+ if(response.getStatusLine().getStatusCode() == 200) {
+ String responseStr = IOUtils.toString(response.getEntity().getContent());
+ List<String> indices = new ArrayList<>();
+ for(String line : Splitter.on("\n").split(responseStr)) {
+ Iterable<String> splits = Splitter.on(" ").split(line.replaceAll("\\s+", " ").trim());
+ if(Iterables.size(splits) > 3) {
+ String index = Iterables.get(splits, 2, "");
+ if(!StringUtils.isEmpty(index)) {
+ indices.add(index.trim());
+ }
+ }
+ }
+ String[] ret = new String[indices.size()];
+ ret=indices.toArray(ret);
+ return ret;
+ }
+ return null;
+ }
+
+ /**
+ * Gets FieldMapping detail for a list of indices.
+ *
+ * @param indices get field mapppings for the provided indices
+ * @return mapping of index name to FieldMapping
+ */
+ public Map<String, FieldMapping> getMappingByIndex(String[] indices) throws IOException {
+ Map<String, FieldMapping> ret = new HashMap<>();
+ String indicesCsv = Joiner.on(",").join(indices);
+ Response response = lowLevelClient.performRequest("GET", "/" + indicesCsv + "/_mapping");
+ if(response.getStatusLine().getStatusCode() == 200) {
+ String responseStr = IOUtils.toString(response.getEntity().getContent());
+ Map<String, Object> indexToMapping = JSONUtils.INSTANCE.load(responseStr, JSONUtils.MAP_SUPPLIER);
+ for(Map.Entry<String, Object> index2Mapping : indexToMapping.entrySet()) {
+ String index = index2Mapping.getKey();
+ Map<String, Object> mappings = getInnerMap((Map<String, Object>)index2Mapping.getValue(), "mappings");
+ if(mappings.size() > 0) {
+ Map.Entry<String, Object> docMap = Iterables.getFirst(mappings.entrySet(), null);
+ if(docMap != null) {
+ Map<String, Object> fieldPropertiesMap = getInnerMap((Map<String, Object>)docMap.getValue(), "properties");
+ if(fieldPropertiesMap != null) {
+ FieldMapping mapping = new FieldMapping();
+ for (Map.Entry<String, Object> field2PropsKV : fieldPropertiesMap.entrySet()) {
+ if(field2PropsKV.getValue() != null) {
+ FieldProperties props = new FieldProperties((Map<String, Object>) field2PropsKV.getValue());
+ mapping.put(field2PropsKV.getKey(), props);
+ }
+ }
+ ret.put(index, mapping);
+ }
+ }
+ }
+ }
+ }
+ return ret;
+ }
+
+ /**
+ * Traverses the outer map to retrieve a leaf map by iteratively calling get(key) using the provided keys in order. e.g.
+ * for an outer map provided as follows:
+ * <pre>
+ * {
+ * "foo" : {
+ * "bar" : {
+ * "baz" : {
+ * "hello" : "world"
+ * }
+ * }
+ * }
+ * }
+ * </pre>
+ * calling getInnerMap(outerMap, new String[] { "foo", "bar", "baz" }) would return the following:
+ * <pre>
+ * {hello=world}
+ * </pre>
+ * @param outerMap Complex map of nested keys/values
+ * @param keys ordered list of keys to iterate over to grab a leaf mapping.
+ * @return leaf node, or innermost matching node from outerMap if no leaf exists
+ */
+ private Map<String, Object> getInnerMap(Map<String, Object> outerMap, String... keys) {
+ Map<String, Object> ret = outerMap;
+ if(keys.length == 0) {
+ return outerMap;
+ }
+ for(String key : keys) {
+ ret = (Map<String, Object>)ret.get(key);
+ if(ret == null) {
+ return ret;
+ }
+ }
+ return ret;
+ }
+
+}
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/client/ElasticsearchClientFactory.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/client/ElasticsearchClientFactory.java
new file mode 100644
index 0000000..4e0b2fe
--- /dev/null
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/client/ElasticsearchClientFactory.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.elasticsearch.client;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.invoke.MethodHandles;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.security.KeyManagementException;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.cert.CertificateException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import javax.net.ssl.SSLContext;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.nio.reactor.IOReactorConfig;
+import org.apache.http.ssl.SSLContextBuilder;
+import org.apache.http.ssl.SSLContexts;
+import org.apache.metron.elasticsearch.config.ElasticsearchClientConfig;
+import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
+import org.apache.metron.elasticsearch.utils.ElasticsearchUtils.HostnamePort;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Main entry point to create the ES client.
+ */
+public class ElasticsearchClientFactory {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private static final String ES_SETTINGS_KEY = "es.client.settings"; // es config key in global config
+
+ /**
+ * Creates an Elasticsearch client from settings provided via the global config.
+ *
+ * @return new client
+ */
+ public static ElasticsearchClient create(Map<String, Object> globalConfig) {
+ ElasticsearchClientConfig esClientConfig = new ElasticsearchClientConfig(
+ getEsSettings(globalConfig));
+ HttpHost[] httpHosts = getHttpHosts(globalConfig, esClientConfig.getConnectionScheme());
+ RestClientBuilder builder = RestClient.builder(httpHosts);
+
+ builder.setRequestConfigCallback(reqConfigBuilder -> {
+ // Modifies request config builder with connection and socket timeouts.
+ // https://www.elastic.co/guide/en/elasticsearch/client/java-rest/5.6/_timeouts.html
+ reqConfigBuilder.setConnectTimeout(esClientConfig.getConnectTimeoutMillis());
+ reqConfigBuilder.setSocketTimeout(esClientConfig.getSocketTimeoutMillis());
+ return reqConfigBuilder;
+ });
+ builder.setMaxRetryTimeoutMillis(esClientConfig.getMaxRetryTimeoutMillis());
+
+ builder.setHttpClientConfigCallback(clientBuilder -> {
+ clientBuilder.setDefaultIOReactorConfig(getIOReactorConfig(esClientConfig));
+ clientBuilder.setDefaultCredentialsProvider(getCredentialsProvider(esClientConfig));
+ clientBuilder.setSSLContext(getSSLContext(esClientConfig));
+ return clientBuilder;
+ });
+
+ RestClient lowLevelClient = builder.build();
+ RestHighLevelClient client = new RestHighLevelClient(lowLevelClient);
+ return new ElasticsearchClient(lowLevelClient, client);
+ }
+
+ private static Map<String, Object> getEsSettings(Map<String, Object> globalConfig) {
+ return (Map<String, Object>) globalConfig.getOrDefault(ES_SETTINGS_KEY, new HashMap<>());
+ }
+
+ private static HttpHost[] getHttpHosts(Map<String, Object> globalConfiguration, String scheme) {
+ List<HostnamePort> hps = ElasticsearchUtils.getIps(globalConfiguration);
+ HttpHost[] httpHosts = new HttpHost[hps.size()];
+ int i = 0;
+ for (HostnamePort hp : hps) {
+ httpHosts[i++] = new HttpHost(hp.hostname, hp.port, scheme);
+ }
+ return httpHosts;
+ }
+
+ /**
+ * Creates config with setting for num connection threads. Default is ES client default,
+ * which is 1 to num processors per the documentation.
+ * https://www.elastic.co/guide/en/elasticsearch/client/java-rest/5.6/_number_of_threads.html
+ */
+ private static IOReactorConfig getIOReactorConfig(ElasticsearchClientConfig esClientConfig) {
+ if (esClientConfig.getNumClientConnectionThreads().isPresent()) {
+ Integer numThreads = esClientConfig.getNumClientConnectionThreads().get();
+ LOG.info("Setting number of client connection threads: {}", numThreads);
+ return IOReactorConfig.custom().setIoThreadCount(numThreads).build();
+ } else {
+ return IOReactorConfig.DEFAULT;
+ }
+ }
+
+ private static CredentialsProvider getCredentialsProvider(
+ ElasticsearchClientConfig esClientConfig) {
+ Optional<Entry<String, String>> credentials = esClientConfig.getCredentials();
+ if (credentials.isPresent()) {
+ LOG.info(
+ "Found auth credentials - setting up user/pass authenticated client connection for ES.");
+ final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+ UsernamePasswordCredentials upcredentials = new UsernamePasswordCredentials(
+ credentials.get().getKey(), credentials.get().getValue());
+ credentialsProvider.setCredentials(AuthScope.ANY, upcredentials);
+ return credentialsProvider;
+ } else {
+ LOG.info(
+ "Elasticsearch client credentials not provided. Defaulting to non-authenticated client connection.");
+ return null;
+ }
+ }
+
+ /**
+ * <p>Setup connection encryption details (SSL) if applicable.
+ * If ssl.enabled=true, sets up SSL connection. If enabled, keystore.path is required. User can
+ * also optionally set keystore.password and keystore.type.
+ * https://www.elastic.co/guide/en/elasticsearch/client/java-rest/5.6/_encrypted_communication.html
+ * <p>
+ * <p>Other guidance on the HTTP Component library and configuring SSL connections.
+ * http://www.robinhowlett.com/blog/2016/01/05/everything-you-ever-wanted-to-know-about-ssl-but-were-afraid-to-ask.
+ * <p>
+ * <p>JSSE docs - https://docs.oracle.com/javase/8/docs/technotes/guides/security/jsse/JSSERefGuide.html
+ * <p>
+ * <p>Additional guidance for configuring Elasticsearch for SSL can be found here - https://www.elastic.co/guide/en/x-pack/5.6/ssl-tls.html
+ */
+ private static SSLContext getSSLContext(ElasticsearchClientConfig esClientConfig) {
+ if (esClientConfig.isSSLEnabled()) {
+ LOG.info("Configuring client for SSL connection.");
+ if (!esClientConfig.getKeyStorePath().isPresent()) {
+ throw new IllegalStateException("KeyStore path must be provided for SSL connection.");
+ }
+ Optional<String> optKeyStorePass = esClientConfig.getKeyStorePassword();
+ char[] keyStorePass = optKeyStorePass.map(String::toCharArray).orElse(null);
+ KeyStore trustStore = getStore(esClientConfig.getKeyStoreType(),
+ esClientConfig.getKeyStorePath().get(), keyStorePass);
+ try {
+ SSLContextBuilder sslBuilder = SSLContexts.custom().loadTrustMaterial(trustStore, null);
+ return sslBuilder.build();
+ } catch (NoSuchAlgorithmException | KeyStoreException | KeyManagementException e) {
+ throw new IllegalStateException("Unable to load truststore.", e);
+ }
+ }
+ return null;
+ }
+
+ private static KeyStore getStore(String type, Path path, char[] pass) {
+ KeyStore store;
+ try {
+ store = KeyStore.getInstance(type);
+ } catch (KeyStoreException e) {
+ throw new IllegalStateException("Unable to get keystore type '" + type + "'", e);
+ }
+ try (InputStream is = Files.newInputStream(path)) {
+ store.load(is, pass);
+ } catch (IOException | NoSuchAlgorithmException | CertificateException e) {
+ throw new IllegalStateException("Unable to load keystore from path '" + path + "'", e);
+ }
+ return store;
+ }
+
+}
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/config/ElasticsearchClientConfig.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/config/ElasticsearchClientConfig.java
new file mode 100644
index 0000000..2ca4763
--- /dev/null
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/config/ElasticsearchClientConfig.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.elasticsearch.config;
+
+import static java.lang.String.format;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.AbstractMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.commons.collections4.map.AbstractMapDecorator;
+import org.apache.commons.lang.StringUtils;
+import org.apache.metron.common.utils.HDFSUtils;
+
+/**
+ * Access configuration options for the ES client.
+ */
+public class ElasticsearchClientConfig extends AbstractMapDecorator<String, Object> {
+
+ private static final Integer THIRTY_SECONDS_IN_MILLIS = 30_000;
+ private static final Integer ONE_SECONDS_IN_MILLIS = 1_000;
+ private static final String DEFAULT_KEYSTORE_TYPE = "JKS";
+
+ /**
+ * Initialize config from provided settings Map.
+ *
+ * @param settings Map of config options from which to initialize.
+ */
+ public ElasticsearchClientConfig(Map<String, Object> settings) {
+ super(settings);
+ }
+
+ /**
+ * @return Connection timeout as specified by user, or default 1s as defined by the ES client.
+ */
+ public Integer getConnectTimeoutMillis() {
+ return ElasticsearchClientOptions.CONNECTION_TIMEOUT_MILLIS
+ .getOrDefault(this, Integer.class, ONE_SECONDS_IN_MILLIS);
+ }
+
+ /**
+ * @return socket timeout specified by user, or default 30s as defined by the ES client.
+ */
+ public Integer getSocketTimeoutMillis() {
+ return ElasticsearchClientOptions.SOCKET_TIMEOUT_MILLIS
+ .getOrDefault(this, Integer.class, THIRTY_SECONDS_IN_MILLIS);
+ }
+
+ /**
+ * @return max retry timeout specified by user, or default 30s as defined by the ES client.
+ */
+ public Integer getMaxRetryTimeoutMillis() {
+ return ElasticsearchClientOptions.MAX_RETRY_TIMEOUT_MILLIS
+ .getOrDefault(this, Integer.class, THIRTY_SECONDS_IN_MILLIS);
+ }
+
+ /**
+ * Elasticsearch X-Pack credentials.
+ *
+ * @return Username, password
+ */
+ public Optional<Map.Entry<String, String>> getCredentials() {
+ if (ElasticsearchClientOptions.XPACK_PASSWORD_FILE.containsOption(this)) {
+ if (!ElasticsearchClientOptions.XPACK_USERNAME.containsOption(this) ||
+ StringUtils.isEmpty(ElasticsearchClientOptions.XPACK_USERNAME.get(this, String.class))) {
+ throw new IllegalArgumentException(
+ "X-pack username is required when password supplied and cannot be empty");
+ }
+ String user = ElasticsearchClientOptions.XPACK_USERNAME.get(this, String.class);
+ String password = getPasswordFromFile(
+ ElasticsearchClientOptions.XPACK_PASSWORD_FILE.get(this, String.class));
+ if (user != null && password != null) {
+ return Optional.of(new AbstractMap.SimpleImmutableEntry<String, String>(user, password));
+ }
+ }
+ return Optional.empty();
+ }
+
+ /**
+ * Expects single password on first line.
+ */
+ private static String getPasswordFromFile(String hdfsPath) {
+ List<String> lines = readLines(hdfsPath);
+ if (lines.size() == 0) {
+ throw new IllegalArgumentException(format("No password found in file '%s'", hdfsPath));
+ }
+ return lines.get(0);
+ }
+
+ /**
+ * Read all lines from HDFS file.
+ *
+ * @param hdfsPath path to file
+ * @return lines
+ */
+ private static List<String> readLines(String hdfsPath) {
+ try {
+ return HDFSUtils.readFile(hdfsPath);
+ } catch (IOException e) {
+ throw new IllegalStateException(
+ format("Unable to read XPack password file from HDFS location '%s'", hdfsPath), e);
+ }
+ }
+
+ /**
+ * Determines if SSL is enabled from user-supplied config ssl.enabled.
+ */
+ public boolean isSSLEnabled() {
+ return ElasticsearchClientOptions.SSL_ENABLED.getOrDefault(this, Boolean.class, false);
+ }
+
+ /**
+ * http by default, https if ssl is enabled.
+ */
+ public String getConnectionScheme() {
+ return isSSLEnabled() ? "https" : "http";
+ }
+
+ /**
+ * @return Number of threads to use for client connection.
+ */
+ public Optional<Integer> getNumClientConnectionThreads() {
+ if (ElasticsearchClientOptions.NUM_CLIENT_CONNECTION_THREADS.containsOption(this)) {
+ return Optional
+ .of(ElasticsearchClientOptions.NUM_CLIENT_CONNECTION_THREADS.get(this, Integer.class));
+ }
+ return Optional.empty();
+ }
+
+ /**
+ * @return User-defined keystore type. Defaults to "JKS" if not defined.
+ */
+ public String getKeyStoreType() {
+ if (ElasticsearchClientOptions.KEYSTORE_TYPE.containsOption(this)
+ && StringUtils
+ .isNotEmpty(ElasticsearchClientOptions.KEYSTORE_TYPE.get(this, String.class))) {
+ return ElasticsearchClientOptions.KEYSTORE_TYPE.get(this, String.class);
+ }
+ return DEFAULT_KEYSTORE_TYPE;
+ }
+
+ /**
+ * Reads keystore password from the HDFS file defined by setting "keystore.password.file", if it
+ * exists.
+ *
+ * @return password if it exists, empty optional otherwise.
+ */
+ public Optional<String> getKeyStorePassword() {
+ if (ElasticsearchClientOptions.KEYSTORE_PASSWORD_FILE.containsOption(this)) {
+ String password = getPasswordFromFile(
+ ElasticsearchClientOptions.KEYSTORE_PASSWORD_FILE.get(this, String.class));
+ if (StringUtils.isNotEmpty(password)) {
+ return Optional.of(password);
+ }
+ }
+ return Optional.empty();
+ }
+
+ /**
+ * @return keystore path.
+ */
+ public Optional<Path> getKeyStorePath() {
+ if (ElasticsearchClientOptions.KEYSTORE_PATH.containsOption(this)) {
+ return Optional.of(Paths.get(ElasticsearchClientOptions.KEYSTORE_PATH.get(this, String.class)));
+ }
+ return Optional.empty();
+ }
+}
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/config/ElasticsearchClientOptions.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/config/ElasticsearchClientOptions.java
new file mode 100644
index 0000000..c92a34f
--- /dev/null
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/config/ElasticsearchClientOptions.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.elasticsearch.config;
+
+import org.apache.metron.common.configuration.ConfigOption;
+
+public enum ElasticsearchClientOptions implements ConfigOption {
+ CONNECTION_TIMEOUT_MILLIS("connection.timeout.millis"),
+ SOCKET_TIMEOUT_MILLIS("socket.timeout.millis"),
+ MAX_RETRY_TIMEOUT_MILLIS("max.retry.timeout.millis"),
+ NUM_CLIENT_CONNECTION_THREADS("num.client.connection.threads"),
+ // authentication
+ XPACK_USERNAME("xpack.username"),
+ XPACK_PASSWORD_FILE("xpack.password.file"),
+ // security/encryption
+ SSL_ENABLED("ssl.enabled"),
+ KEYSTORE_TYPE("keystore.type"),
+ KEYSTORE_PATH("keystore.path"),
+ KEYSTORE_PASSWORD_FILE("keystore.password.file");
+
+ private final String key;
+
+ ElasticsearchClientOptions(String key) {
+ this.key = key;
+ }
+
+ @Override
+ public String getKey() {
+ return key;
+ }
+
+ /**
+ * Convenience method for printing all options as their key representation.
+ */
+ public static void printOptions() {
+ String newLine = "";
+ for (ElasticsearchClientOptions opt : ElasticsearchClientOptions.values()) {
+ System.out.print(newLine);
+ System.out.print(opt.getKey());
+ newLine = System.lineSeparator();
+ }
+ }
+
+}
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDao.java
index 6a8cad8..cb44694 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDao.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDao.java
@@ -18,26 +18,23 @@
package org.apache.metron.elasticsearch.dao;
-import org.apache.metron.indexing.dao.ColumnMetadataDao;
-import org.apache.metron.indexing.dao.search.FieldType;
-import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
-import org.elasticsearch.client.AdminClient;
-import org.elasticsearch.cluster.metadata.MappingMetaData;
-import org.elasticsearch.common.collect.ImmutableOpenMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static org.apache.metron.elasticsearch.utils.ElasticsearchUtils.INDEX_NAME_DELIMITER;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
-
-import static org.apache.metron.elasticsearch.utils.ElasticsearchUtils.INDEX_NAME_DELIMITER;
+import org.apache.metron.elasticsearch.client.ElasticsearchClient;
+import org.apache.metron.elasticsearch.utils.FieldMapping;
+import org.apache.metron.elasticsearch.utils.FieldProperties;
+import org.apache.metron.indexing.dao.ColumnMetadataDao;
+import org.apache.metron.indexing.dao.search.FieldType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Responsible for retrieving column-level metadata for Elasticsearch search indices.
@@ -61,16 +58,13 @@
elasticsearchTypeMap = Collections.unmodifiableMap(fieldTypeMap);
}
- /**
- * An Elasticsearch administrative client.
- */
- private transient AdminClient adminClient;
+ private transient ElasticsearchClient esClient;
/**
- * @param adminClient The Elasticsearch admin client.
+ * @param esClient The Elasticsearch client.
*/
- public ElasticsearchColumnMetadataDao(AdminClient adminClient) {
- this.adminClient = adminClient;
+ public ElasticsearchColumnMetadataDao(ElasticsearchClient esClient) {
+ this.esClient = esClient;
}
@SuppressWarnings("unchecked")
@@ -82,51 +76,40 @@
String[] latestIndices = getLatestIndices(indices);
if (latestIndices.length > 0) {
- ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> mappings = adminClient
- .indices()
- .getMappings(new GetMappingsRequest().indices(latestIndices))
- .actionGet()
- .getMappings();
+
+ Map<String, FieldMapping> mappings = esClient.getMappingByIndex(latestIndices);
// for each index
- for (Object key : mappings.keys().toArray()) {
- String indexName = key.toString();
- ImmutableOpenMap<String, MappingMetaData> mapping = mappings.get(indexName);
+ for (Map.Entry<String, FieldMapping> kv : mappings.entrySet()) {
+ String indexName = kv.getKey();
+ FieldMapping mapping = kv.getValue();
// for each mapping in the index
- Iterator<String> mappingIterator = mapping.keysIt();
- while (mappingIterator.hasNext()) {
- MappingMetaData mappingMetaData = mapping.get(mappingIterator.next());
- Map<String, Object> sourceAsMap = mappingMetaData.getSourceAsMap();
- if (sourceAsMap.containsKey("properties")) {
- Map<String, Map<String, String>> map = (Map<String, Map<String, String>>) sourceAsMap.get("properties");
+ for(Map.Entry<String, FieldProperties> fieldToProperties : mapping.entrySet()) {
+ String field = fieldToProperties.getKey();
+ FieldProperties properties = fieldToProperties.getValue();
+ if (!fieldBlackList.contains(field)) {
+ FieldType type = toFieldType((String) properties.get("type"));
- // for each field in the mapping
- for (String field : map.keySet()) {
- if (!fieldBlackList.contains(field)) {
- FieldType type = toFieldType(map.get(field).get("type"));
+ if(!indexColumnMetadata.containsKey(field)) {
+ indexColumnMetadata.put(field, type);
- if(!indexColumnMetadata.containsKey(field)) {
- indexColumnMetadata.put(field, type);
+ // record the last index in which a field exists, to be able to print helpful error message on type mismatch
+ previousIndices.put(field, indexName);
- // record the last index in which a field exists, to be able to print helpful error message on type mismatch
- previousIndices.put(field, indexName);
-
- } else {
- FieldType previousType = indexColumnMetadata.get(field);
- if (!type.equals(previousType)) {
- String previousIndexName = previousIndices.get(field);
- LOG.error(String.format(
+ } else {
+ FieldType previousType = indexColumnMetadata.get(field);
+ if (!type.equals(previousType)) {
+ String previousIndexName = previousIndices.get(field);
+ LOG.error(String.format(
"Field type mismatch: %s.%s has type %s while %s.%s has type %s. Defaulting type to %s.",
indexName, field, type.getFieldType(),
previousIndexName, field, previousType.getFieldType(),
FieldType.OTHER.getFieldType()));
- indexColumnMetadata.put(field, FieldType.OTHER);
+ indexColumnMetadata.put(field, FieldType.OTHER);
- // the field is defined in multiple indices with different types; ignore the field as type has been set to OTHER
- fieldBlackList.add(field);
- }
- }
+ // the field is defined in multiple indices with different types; ignore the field as type has been set to OTHER
+ fieldBlackList.add(field);
}
}
}
@@ -166,15 +149,11 @@
* @param includeIndices The base names of the indices to include
* @return The latest version of a set of indices.
*/
- String[] getLatestIndices(List<String> includeIndices) {
+ String[] getLatestIndices(List<String> includeIndices) throws IOException {
LOG.debug("Getting latest indices; indices={}", includeIndices);
Map<String, String> latestIndices = new HashMap<>();
- String[] indices = adminClient
- .indices()
- .prepareGetIndex()
- .setFeatures()
- .get()
- .getIndices();
+
+ String[] indices = esClient.getIndices();
for (String index : indices) {
int prefixEnd = index.indexOf(INDEX_NAME_DELIMITER);
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
index 9f6e1a1..210e1ce 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
@@ -22,7 +22,8 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
+import org.apache.metron.elasticsearch.client.ElasticsearchClient;
+import org.apache.metron.elasticsearch.client.ElasticsearchClientFactory;
import org.apache.metron.indexing.dao.AccessConfig;
import org.apache.metron.indexing.dao.IndexDao;
import org.apache.metron.indexing.dao.RetrieveLatestDao;
@@ -38,7 +39,6 @@
import org.apache.metron.indexing.dao.update.OriginalNotFoundException;
import org.apache.metron.indexing.dao.update.PatchRequest;
import org.apache.metron.indexing.dao.update.ReplaceRequest;
-import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.index.query.QueryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,7 +47,7 @@
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private transient TransportClient client;
+ private transient ElasticsearchClient client;
private ElasticsearchSearchDao searchDao;
private ElasticsearchUpdateDao updateDao;
private ElasticsearchRetrieveLatestDao retrieveLatestDao;
@@ -64,7 +64,7 @@
private AccessConfig accessConfig;
- protected ElasticsearchDao(TransportClient client,
+ protected ElasticsearchDao(ElasticsearchClient client,
AccessConfig config,
ElasticsearchSearchDao searchDao,
ElasticsearchUpdateDao updateDao,
@@ -96,10 +96,9 @@
@Override
public synchronized void init(AccessConfig config) {
if (this.client == null) {
- this.client = ElasticsearchUtils
- .getClient(config.getGlobalConfigSupplier().get());
+ this.client = ElasticsearchClientFactory.create(config.getGlobalConfigSupplier().get());
this.accessConfig = config;
- this.columnMetadataDao = new ElasticsearchColumnMetadataDao(this.client.admin());
+ this.columnMetadataDao = new ElasticsearchColumnMetadataDao(this.client);
this.requestSubmitter = new ElasticsearchRequestSubmitter(this.client);
this.searchDao = new ElasticsearchSearchDao(client, accessConfig, columnMetadataDao,
requestSubmitter);
@@ -127,13 +126,13 @@
}
@Override
- public Document getLatest(final String guid, final String sensorType) {
+ public Document getLatest(final String guid, final String sensorType) throws IOException {
return retrieveLatestDao.getLatest(guid, sensorType);
}
@Override
public Iterable<Document> getAllLatest(
- final List<GetRequest> getRequests) {
+ final List<GetRequest> getRequests) throws IOException {
return retrieveLatestDao.getAllLatest(getRequests);
}
@@ -188,7 +187,7 @@
return this.updateDao.removeCommentFromAlert(request, latest);
}
- protected Optional<String> getIndexName(String guid, String sensorType) {
+ protected Optional<String> getIndexName(String guid, String sensorType) throws IOException {
return updateDao.getIndexName(guid, sensorType);
}
@@ -202,7 +201,7 @@
return searchDao.group(groupRequest, queryBuilder);
}
- public TransportClient getClient() {
+ public ElasticsearchClient getClient() {
return this.client;
}
}
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java
index fc0b20c..ac5417e 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java
@@ -176,7 +176,7 @@
}
@Override
- public SearchResponse getAllMetaAlertsForAlert(String guid) throws InvalidSearchException {
+ public SearchResponse getAllMetaAlertsForAlert(String guid) throws InvalidSearchException, IOException {
return metaAlertSearchDao.getAllMetaAlertsForAlert(guid);
}
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertSearchDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertSearchDao.java
index 00fc9d0..65bfa20 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertSearchDao.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertSearchDao.java
@@ -41,6 +41,8 @@
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.QueryStringQueryBuilder;
+import java.io.IOException;
+
public class ElasticsearchMetaAlertSearchDao implements MetaAlertSearchDao {
protected ElasticsearchDao elasticsearchDao;
@@ -89,7 +91,7 @@
}
@Override
- public SearchResponse getAllMetaAlertsForAlert(String guid) throws InvalidSearchException {
+ public SearchResponse getAllMetaAlertsForAlert(String guid) throws InvalidSearchException, IOException {
if (guid == null || guid.trim().isEmpty()) {
throw new InvalidSearchException("Guid cannot be empty");
}
@@ -104,7 +106,7 @@
).innerHit(new InnerHitBuilder())
)
.must(termQuery(MetaAlertConstants.STATUS_FIELD, MetaAlertStatus.ACTIVE.getStatusString()));
- return queryAllResults(elasticsearchDao.getClient(), qb, config.getMetaAlertIndex(),
+ return queryAllResults(elasticsearchDao.getClient().getHighLevelClient(), qb, config.getMetaAlertIndex(),
pageSize);
}
}
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertUpdateDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertUpdateDao.java
index 3b67891..2e9c855 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertUpdateDao.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertUpdateDao.java
@@ -199,7 +199,7 @@
* @param alertGuid The GUID of the child alert
* @return The Elasticsearch response containing the meta alerts
*/
- protected SearchResponse getMetaAlertsForAlert(String alertGuid) {
+ protected SearchResponse getMetaAlertsForAlert(String alertGuid) throws IOException {
QueryBuilder qb = boolQuery()
.must(
nestedQuery(
@@ -212,7 +212,7 @@
)
.must(termQuery(MetaAlertConstants.STATUS_FIELD, MetaAlertStatus.ACTIVE.getStatusString()));
return ElasticsearchUtils
- .queryAllResults(elasticsearchDao.getClient(), qb, getConfig().getMetaAlertIndex(),
+ .queryAllResults(elasticsearchDao.getClient().getHighLevelClient(), qb, getConfig().getMetaAlertIndex(),
pageSize);
}
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitter.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitter.java
index 0e0df21..c63532e 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitter.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitter.java
@@ -20,13 +20,12 @@
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.metron.elasticsearch.client.ElasticsearchClient;
import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
import org.apache.metron.indexing.dao.search.InvalidSearchException;
-import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
-import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.rest.RestStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,9 +42,9 @@
/**
* The Elasticsearch client.
*/
- private TransportClient client;
+ private ElasticsearchClient client;
- public ElasticsearchRequestSubmitter(TransportClient client) {
+ public ElasticsearchRequestSubmitter(ElasticsearchClient client) {
this.client = client;
}
@@ -60,12 +59,10 @@
// submit the search request
org.elasticsearch.action.search.SearchResponse esResponse;
try {
- esResponse = client
- .search(request)
- .actionGet();
+ esResponse = client.getHighLevelClient().search(request);
LOG.debug("Got Elasticsearch response; response={}", esResponse.toString());
- } catch (SearchPhaseExecutionException e) {
+ } catch (Exception e) {
String msg = String.format(
"Failed to execute search; error='%s', search='%s'",
ExceptionUtils.getRootCauseMessage(e),
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java
index f6bfeda..0c91007 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java
@@ -28,33 +28,35 @@
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
+
+import org.apache.metron.elasticsearch.client.ElasticsearchClient;
import org.apache.metron.indexing.dao.RetrieveLatestDao;
import org.apache.metron.indexing.dao.search.GetRequest;
import org.apache.metron.indexing.dao.update.Document;
-import org.elasticsearch.action.search.SearchRequestBuilder;
-import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.index.query.IdsQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
public class ElasticsearchRetrieveLatestDao implements RetrieveLatestDao {
- private TransportClient transportClient;
+ private ElasticsearchClient transportClient;
- public ElasticsearchRetrieveLatestDao(TransportClient transportClient) {
+ public ElasticsearchRetrieveLatestDao(ElasticsearchClient transportClient) {
this.transportClient = transportClient;
}
@Override
- public Document getLatest(String guid, String sensorType) {
+ public Document getLatest(String guid, String sensorType) throws IOException {
Optional<Document> doc = searchByGuid(guid, sensorType, hit -> toDocument(guid, hit));
return doc.orElse(null);
}
@Override
- public Iterable<Document> getAllLatest(List<GetRequest> getRequests) {
+ public Iterable<Document> getAllLatest(List<GetRequest> getRequests) throws IOException {
Collection<String> guids = new HashSet<>();
Collection<String> sensorTypes = new HashSet<>();
for (GetRequest getRequest : getRequests) {
@@ -80,7 +82,7 @@
}
<T> Optional<T> searchByGuid(String guid, String sensorType,
- Function<SearchHit, Optional<T>> callback) {
+ Function<SearchHit, Optional<T>> callback) throws IOException {
Collection<String> sensorTypes = sensorType != null ? Collections.singleton(sensorType) : null;
List<T> results = searchByGuids(Collections.singleton(guid), sensorTypes, callback);
if (results.size() > 0) {
@@ -96,7 +98,7 @@
* If more than one hit happens, the first one will be returned.
*/
<T> List<T> searchByGuids(Collection<String> guids, Collection<String> sensorTypes,
- Function<SearchHit, Optional<T>> callback) {
+ Function<SearchHit, Optional<T>> callback) throws IOException {
if (guids == null || guids.isEmpty()) {
return Collections.emptyList();
}
@@ -113,11 +115,13 @@
for (String guid : guids) {
query = idsQuery.addIds(guid);
}
+ SearchRequest request = new SearchRequest();
+ SearchSourceBuilder builder = new SearchSourceBuilder();
+ builder.query(query);
+ builder.size(guids.size());
+ request.source(builder);
- SearchRequestBuilder request = transportClient.prepareSearch()
- .setQuery(query)
- .setSize(guids.size());
- org.elasticsearch.action.search.SearchResponse response = request.get();
+ org.elasticsearch.action.search.SearchResponse response = transportClient.getHighLevelClient().search(request);
SearchHits hits = response.getHits();
List<T> results = new ArrayList<>();
for (SearchHit hit : hits) {
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchSearchDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchSearchDao.java
index 5cd0a4d..0b87e56 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchSearchDao.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchSearchDao.java
@@ -19,24 +19,19 @@
import static org.apache.metron.elasticsearch.utils.ElasticsearchUtils.INDEX_NAME_DELIMITER;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Iterables;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.function.Function;
+
+import org.apache.metron.elasticsearch.client.ElasticsearchClient;
import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
import org.apache.metron.indexing.dao.AccessConfig;
import org.apache.metron.indexing.dao.search.FieldType;
-import org.apache.metron.indexing.dao.search.GetRequest;
import org.apache.metron.indexing.dao.search.Group;
import org.apache.metron.indexing.dao.search.GroupOrder;
import org.apache.metron.indexing.dao.search.GroupOrderType;
@@ -50,16 +45,10 @@
import org.apache.metron.indexing.dao.search.SearchResult;
import org.apache.metron.indexing.dao.search.SortField;
import org.apache.metron.indexing.dao.search.SortOrder;
-import org.apache.metron.indexing.dao.update.Document;
-import org.elasticsearch.action.search.SearchRequestBuilder;
-import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.index.mapper.LegacyIpFieldMapper;
-import org.elasticsearch.index.query.IdsQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
-import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.QueryStringQueryBuilder;
import org.elasticsearch.search.SearchHit;
-import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
@@ -88,12 +77,12 @@
*/
private static final String SORT_MISSING_FIRST = "_first";
- private transient TransportClient client;
+ private transient ElasticsearchClient client;
private AccessConfig accessConfig;
private ElasticsearchColumnMetadataDao columnMetadataDao;
private ElasticsearchRequestSubmitter requestSubmitter;
- public ElasticsearchSearchDao(TransportClient client,
+ public ElasticsearchSearchDao(ElasticsearchClient client,
AccessConfig accessConfig,
ElasticsearchColumnMetadataDao columnMetadataDao,
ElasticsearchRequestSubmitter requestSubmitter) {
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java
index 6843ac7..c769b2f 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java
@@ -28,18 +28,19 @@
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
+
+import org.apache.metron.elasticsearch.client.ElasticsearchClient;
import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
import org.apache.metron.indexing.dao.AccessConfig;
import org.apache.metron.indexing.dao.search.AlertComment;
import org.apache.metron.indexing.dao.update.CommentAddRemoveRequest;
import org.apache.metron.indexing.dao.update.Document;
import org.apache.metron.indexing.dao.update.UpdateDao;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo;
-import org.elasticsearch.client.transport.TransportClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,11 +48,11 @@
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private transient TransportClient client;
+ private transient ElasticsearchClient client;
private AccessConfig accessConfig;
private ElasticsearchRetrieveLatestDao retrieveLatestDao;
- public ElasticsearchUpdateDao(TransportClient client,
+ public ElasticsearchUpdateDao(ElasticsearchClient client,
AccessConfig accessConfig,
ElasticsearchRetrieveLatestDao searchDao) {
this.client = client;
@@ -68,7 +69,7 @@
IndexRequest indexRequest = buildIndexRequest(update, sensorType, indexName);
try {
- IndexResponse response = client.index(indexRequest).get();
+ IndexResponse response = client.getHighLevelClient().index(indexRequest);
ShardInfo shardInfo = response.getShardInfo();
int failed = shardInfo.getFailed();
@@ -87,7 +88,7 @@
String indexPostfix = ElasticsearchUtils
.getIndexFormat(accessConfig.getGlobalConfigSupplier().get()).format(new Date());
- BulkRequestBuilder bulkRequestBuilder = client.prepareBulk();
+ BulkRequest bulkRequestBuilder = new BulkRequest();
// Get the indices we'll actually be using for each Document.
for (Map.Entry<Document, Optional<String>> updateEntry : updates.entrySet()) {
@@ -103,7 +104,7 @@
bulkRequestBuilder.add(indexRequest);
}
- BulkResponse bulkResponse = bulkRequestBuilder.get();
+ BulkResponse bulkResponse = client.getHighLevelClient().bulk(bulkRequestBuilder);
if (bulkResponse.hasFailures()) {
LOG.error("Bulk Request has failures: {}", bulkResponse.buildFailureMessage());
throw new IOException(
@@ -181,13 +182,13 @@
return update(newVersion, Optional.empty());
}
- protected String getIndexName(Document update, Optional<String> index, String indexPostFix) {
+ protected String getIndexName(Document update, Optional<String> index, String indexPostFix) throws IOException {
return index.orElse(getIndexName(update.getGuid(), update.getSensorType())
.orElse(ElasticsearchUtils.getIndexName(update.getSensorType(), indexPostFix, null))
);
}
- protected Optional<String> getIndexName(String guid, String sensorType) {
+ protected Optional<String> getIndexName(String guid, String sensorType) throws IOException {
return retrieveLatestDao.searchByGuid(guid,
sensorType,
hit -> Optional.ofNullable(hit.getIndex())
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java
index 98dc66d..47cbd98 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java
@@ -17,55 +17,35 @@
*/
package org.apache.metron.elasticsearch.utils;
-import static java.lang.String.format;
-
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.Set;
import java.util.stream.Collectors;
-import org.apache.commons.lang.StringUtils;
import org.apache.metron.common.configuration.writer.WriterConfiguration;
-import org.apache.metron.common.utils.HDFSUtils;
-import org.apache.metron.common.utils.ReflectionUtils;
import org.apache.metron.indexing.dao.search.SearchResponse;
import org.apache.metron.indexing.dao.search.SearchResult;
-import org.apache.metron.netty.utils.NettyRuntimeWrapper;
-import org.apache.metron.stellar.common.utils.ConversionUtils;
import org.codehaus.jackson.map.ObjectMapper;
-import org.elasticsearch.action.search.SearchRequestBuilder;
-import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.bytes.BytesReference;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.index.query.QueryBuilder;
-import org.elasticsearch.transport.client.PreBuiltTransportClient;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ElasticsearchUtils {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private static final String ES_CLIENT_CLASS_DEFAULT = "org.elasticsearch.transport.client.PreBuiltTransportClient";
- private static final String PWD_FILE_CONFIG_KEY = "es.xpack.password.file";
- private static final String USERNAME_CONFIG_KEY = "es.xpack.username";
- private static final String TRANSPORT_CLIENT_USER_KEY = "xpack.security.user";
-
-
private static ThreadLocal<Map<String, SimpleDateFormat>> DATE_FORMAT_CACHE
= ThreadLocal.withInitial(() -> new HashMap<>());
@@ -79,10 +59,6 @@
*/
public static final String INDEX_NAME_DELIMITER = "_index";
- public static SimpleDateFormat getIndexFormat(WriterConfiguration configurations) {
- return getIndexFormat(configurations.getGlobalConfig());
- }
-
public static SimpleDateFormat getIndexFormat(Map<String, Object> globalConfig) {
String format = (String) globalConfig.get("es.date.format");
return DATE_FORMAT_CACHE.get().computeIfAbsent(format, SimpleDateFormat::new);
@@ -103,135 +79,16 @@
return indexName;
}
- /**
- * Extracts the base index name from a full index name.
- *
- * For example, given an index named 'bro_index_2017.01.01.01', the base
- * index name is 'bro'.
- *
- * @param indexName The full index name including delimiter and date postfix.
- * @return The base index name.
- */
- public static String getBaseIndexName(String indexName) {
-
- String[] parts = indexName.split(INDEX_NAME_DELIMITER);
- if(parts.length < 1 || StringUtils.isEmpty(parts[0])) {
- String msg = format("Unexpected index name; index=%s, delimiter=%s", indexName, INDEX_NAME_DELIMITER);
- throw new IllegalStateException(msg);
- }
-
- return parts[0];
- }
-
- /**
- * Instantiates an Elasticsearch client based on es.client.class, if set. Defaults to
- * org.elasticsearch.transport.client.PreBuiltTransportClient.
- *
- * @param globalConfiguration Metron global config
- * @return
- */
- public static TransportClient getClient(Map<String, Object> globalConfiguration) {
- Set<String> customESSettings = new HashSet<>();
- customESSettings.addAll(Arrays.asList("es.client.class", USERNAME_CONFIG_KEY, PWD_FILE_CONFIG_KEY));
- Settings.Builder settingsBuilder = Settings.builder();
- Map<String, String> esSettings = getEsSettings(globalConfiguration);
- for (Map.Entry<String, String> entry : esSettings.entrySet()) {
- String key = entry.getKey();
- String value = entry.getValue();
- if (!customESSettings.contains(key)) {
- settingsBuilder.put(key, value);
- }
- }
- settingsBuilder.put("cluster.name", globalConfiguration.get("es.clustername"));
- settingsBuilder.put("client.transport.ping_timeout", esSettings.getOrDefault("client.transport.ping_timeout","500s"));
- setXPackSecurityOrNone(settingsBuilder, esSettings);
-
- try {
- LOG.info("Number of available processors in Netty: {}", NettyRuntimeWrapper.availableProcessors());
- // Netty sets available processors statically and if an attempt is made to set it more than
- // once an IllegalStateException is thrown by NettyRuntime.setAvailableProcessors(NettyRuntime.java:87)
- // https://discuss.elastic.co/t/getting-availableprocessors-is-already-set-to-1-rejecting-1-illegalstateexception-exception/103082
- // https://discuss.elastic.co/t/elasticsearch-5-4-1-availableprocessors-is-already-set/88036
- System.setProperty("es.set.netty.runtime.available.processors", "false");
- TransportClient client = createTransportClient(settingsBuilder.build(), esSettings);
- for (HostnamePort hp : getIps(globalConfiguration)) {
- client.addTransportAddress(
- new InetSocketTransportAddress(InetAddress.getByName(hp.hostname), hp.port)
- );
- }
- return client;
- } catch (UnknownHostException exception) {
- throw new RuntimeException(exception);
- }
- }
-
- private static Map<String, String> getEsSettings(Map<String, Object> config) {
- return ConversionUtils
- .convertMap((Map<String, Object>) config.getOrDefault("es.client.settings", new HashMap<String, Object>()),
- String.class);
- }
-
- /*
- * Append Xpack security settings (if any)
- */
- private static void setXPackSecurityOrNone(Settings.Builder settingsBuilder, Map<String, String> esSettings) {
-
- if (esSettings.containsKey(PWD_FILE_CONFIG_KEY)) {
-
- if (!esSettings.containsKey(USERNAME_CONFIG_KEY) || StringUtils.isEmpty(esSettings.get(USERNAME_CONFIG_KEY))) {
- throw new IllegalArgumentException("X-pack username is required and cannot be empty");
- }
-
- settingsBuilder.put(
- TRANSPORT_CLIENT_USER_KEY,
- esSettings.get(USERNAME_CONFIG_KEY) + ":" + getPasswordFromFile(esSettings.get(PWD_FILE_CONFIG_KEY))
- );
- }
- }
-
- /*
- * Single password on first line
- */
- private static String getPasswordFromFile(String hdfsPath) {
- List<String> lines = null;
- try {
- lines = HDFSUtils.readFile(hdfsPath);
- } catch (IOException e) {
- throw new IllegalArgumentException(
- format("Unable to read XPack password file from HDFS location '%s'", hdfsPath), e);
- }
- if (lines.size() == 0) {
- throw new IllegalArgumentException(format("No password found in file '%s'", hdfsPath));
- }
- return lines.get(0);
- }
-
- /**
- * Constructs ES transport client from the provided ES settings additional es config
- *
- * @param settings client settings
- * @param esSettings client type to instantiate
- * @return client with provided settings
- */
- private static TransportClient createTransportClient(Settings settings,
- Map<String, String> esSettings) {
- String esClientClassName = (String) esSettings
- .getOrDefault("es.client.class", ES_CLIENT_CLASS_DEFAULT);
- return ReflectionUtils
- .createInstance(esClientClassName, new Class[]{Settings.class, Class[].class},
- new Object[]{settings, new Class[0]});
- }
-
public static class HostnamePort {
- String hostname;
- Integer port;
+ public String hostname;
+ public Integer port;
public HostnamePort(String hostname, Integer port) {
this.hostname = hostname;
this.port = port;
}
}
- protected static List<HostnamePort> getIps(Map<String, Object> globalConfiguration) {
+ public static List<HostnamePort> getIps(Map<String, Object> globalConfiguration) {
Object ipObj = globalConfiguration.get("es.ip");
Object portObj = globalConfiguration.get("es.port");
if(ipObj == null) {
@@ -335,30 +192,29 @@
* @param qb A QueryBuilder that provides the query to be run.
* @return A SearchResponse containing the appropriate results.
*/
- public static SearchResponse queryAllResults(TransportClient transportClient,
+ public static SearchResponse queryAllResults(RestHighLevelClient transportClient,
QueryBuilder qb,
String index,
int pageSize
- ) {
- SearchRequestBuilder searchRequestBuilder = transportClient
- .prepareSearch(index)
- .addStoredField("*")
- .setFetchSource(true)
- .setQuery(qb)
- .setSize(pageSize);
- org.elasticsearch.action.search.SearchResponse esResponse = searchRequestBuilder
- .execute()
- .actionGet();
+ ) throws IOException {
+ org.elasticsearch.action.search.SearchRequest request = new org.elasticsearch.action.search.SearchRequest();
+ SearchSourceBuilder builder = new SearchSourceBuilder();
+ builder.query(qb);
+ builder.size(pageSize);
+ builder.fetchSource(true);
+ builder.storedField("*");
+ request.source(builder);
+ request.indices(index);
+
+ org.elasticsearch.action.search.SearchResponse esResponse = transportClient.search(request);
List<SearchResult> allResults = getSearchResults(esResponse);
long total = esResponse.getHits().getTotalHits();
if (total > pageSize) {
int pages = (int) (total / pageSize) + 1;
for (int i = 1; i < pages; i++) {
int from = i * pageSize;
- searchRequestBuilder.setFrom(from);
- esResponse = searchRequestBuilder
- .execute()
- .actionGet();
+ builder.from(from);
+ esResponse = transportClient.search(request);
allResults.addAll(getSearchResults(esResponse));
}
}
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/FieldMapping.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/FieldMapping.java
new file mode 100644
index 0000000..15bcb4c
--- /dev/null
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/FieldMapping.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.elasticsearch.utils;
+
+import org.apache.commons.collections4.map.AbstractMapDecorator;
+
+import java.util.HashMap;
+
+/**
+ * Typedef that maps Elasticsearch index name to properties.
+ */
+public class FieldMapping extends AbstractMapDecorator<String, FieldProperties>{
+ public FieldMapping() {
+ super(new HashMap<String, FieldProperties>());
+ }
+
+}
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/FieldProperties.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/FieldProperties.java
new file mode 100644
index 0000000..d116b40
--- /dev/null
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/FieldProperties.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.elasticsearch.utils;
+
+import org.apache.commons.collections4.map.AbstractMapDecorator;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Typedef that maps Elasticsearch field names to types.
+ */
+public class FieldProperties extends AbstractMapDecorator<String, Object> {
+ public FieldProperties() {
+ super(new HashMap<>());
+ }
+
+ public FieldProperties(Map<String, Object> m) {
+ super(m);
+ }
+}
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
index 4b8dd08..fbdd4fe 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
@@ -23,14 +23,15 @@
import org.apache.metron.common.field.FieldNameConverters;
import org.apache.metron.common.writer.BulkMessageWriter;
import org.apache.metron.common.writer.BulkWriterResponse;
+import org.apache.metron.elasticsearch.client.ElasticsearchClient;
+import org.apache.metron.elasticsearch.client.ElasticsearchClientFactory;
import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.Tuple;
import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.index.IndexRequestBuilder;
-import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.action.index.IndexRequest;
import org.json.simple.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,7 +54,7 @@
/**
* The Elasticsearch client.
*/
- private transient TransportClient client;
+ private transient ElasticsearchClient client;
/**
* A simple data formatter used to build the appropriate Elasticsearch index name.
@@ -65,7 +66,7 @@
public void init(Map stormConf, TopologyContext topologyContext, WriterConfiguration configurations) {
Map<String, Object> globalConfiguration = configurations.getGlobalConfig();
- client = ElasticsearchUtils.getClient(globalConfiguration);
+ client = ElasticsearchClientFactory.create(globalConfiguration);
dateFormat = ElasticsearchUtils.getIndexFormat(globalConfiguration);
}
@@ -76,7 +77,7 @@
FieldNameConverter fieldNameConverter = FieldNameConverters.create(sensorType, configurations);
final String indexPostfix = dateFormat.format(new Date());
- BulkRequestBuilder bulkRequest = client.prepareBulk();
+ BulkRequest bulkRequest = new BulkRequest();
for(JSONObject message: messages) {
JSONObject esDoc = new JSONObject();
@@ -85,22 +86,21 @@
}
String indexName = ElasticsearchUtils.getIndexName(sensorType, indexPostfix, configurations);
- IndexRequestBuilder indexRequestBuilder = client.prepareIndex(indexName, sensorType + "_doc");
- indexRequestBuilder = indexRequestBuilder.setSource(esDoc.toJSONString());
+ IndexRequest indexRequest = new IndexRequest(indexName, sensorType + "_doc");
+ indexRequest.source(esDoc.toJSONString());
String guid = (String)esDoc.get(Constants.GUID);
if(guid != null) {
- indexRequestBuilder.setId(guid);
+ indexRequest.id(guid);
}
Object ts = esDoc.get("timestamp");
if(ts != null) {
- indexRequestBuilder = indexRequestBuilder.setTimestamp(ts.toString());
+ indexRequest.timestamp(ts.toString());
}
-
- bulkRequest.add(indexRequestBuilder);
+ bulkRequest.add(indexRequest);
}
- BulkResponse bulkResponse = bulkRequest.execute().actionGet();
+ BulkResponse bulkResponse = client.getHighLevelClient().bulk(bulkRequest);
return buildWriteReponse(tuples, bulkResponse);
}
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDaoTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDaoTest.java
index 0a83ee0..c9389c0 100644
--- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDaoTest.java
+++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDaoTest.java
@@ -18,24 +18,22 @@
package org.apache.metron.elasticsearch.dao;
-import org.elasticsearch.action.ActionFuture;
-import org.elasticsearch.action.admin.indices.get.GetIndexRequestBuilder;
-import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
-import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
-import org.elasticsearch.client.AdminClient;
-import org.elasticsearch.client.IndicesAdminClient;
-import org.elasticsearch.cluster.metadata.MappingMetaData;
-import org.elasticsearch.common.collect.ImmutableOpenMap;
+import org.apache.metron.elasticsearch.client.ElasticsearchClient;
+import org.apache.metron.elasticsearch.utils.FieldMapping;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestHighLevelClient;
import org.junit.Test;
+import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import static org.junit.Assert.assertArrayEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
/**
* Tests the ElasticsearchColumnMetadata class.
@@ -47,7 +45,7 @@
* @return An object to test.
*/
public ElasticsearchColumnMetadataDao setup(String[] indices) {
- return setup(indices, ImmutableOpenMap.of());
+ return setup(indices, new HashMap<>());
}
/**
@@ -57,32 +55,23 @@
*/
public ElasticsearchColumnMetadataDao setup(
String[] indices,
- ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> mappings) {
+ Map<String, FieldMapping> mappings) {
+ ElasticsearchClient client = new ElasticsearchClient(mock(RestClient.class), mock(RestHighLevelClient.class)) {
+ @Override
+ public String[] getIndices() throws IOException {
+ return indices;
+ }
- AdminClient adminClient = mock(AdminClient.class);
- IndicesAdminClient indicesAdminClient = mock(IndicesAdminClient.class);
- GetIndexRequestBuilder getIndexRequestBuilder = mock(GetIndexRequestBuilder.class);
- GetIndexResponse getIndexResponse = mock(GetIndexResponse.class);
- ActionFuture getMappingsActionFuture = mock(ActionFuture.class);
- GetMappingsResponse getMappingsResponse = mock(GetMappingsResponse.class);
-
- // setup the mocks so that a set of indices are available to the DAO
- when(adminClient.indices()).thenReturn(indicesAdminClient);
- when(indicesAdminClient.prepareGetIndex()).thenReturn(getIndexRequestBuilder);
- when(getIndexRequestBuilder.setFeatures()).thenReturn(getIndexRequestBuilder);
- when(getIndexRequestBuilder.get()).thenReturn(getIndexResponse);
- when(getIndexResponse.getIndices()).thenReturn(indices);
-
- // setup the mocks so that a set of mappings are available to the DAO
- when(indicesAdminClient.getMappings(any())).thenReturn(getMappingsActionFuture);
- when(getMappingsActionFuture.actionGet()).thenReturn(getMappingsResponse);
- when(getMappingsResponse.getMappings()).thenReturn(mappings);
-
- return new ElasticsearchColumnMetadataDao(adminClient);
+ @Override
+ public Map<String, FieldMapping> getMappingByIndex(String[] indices) throws IOException {
+ return mappings;
+ }
+ };
+ return new ElasticsearchColumnMetadataDao(client);
}
@Test
- public void testGetOneLatestIndex() {
+ public void testGetOneLatestIndex() throws IOException {
// setup
String[] existingIndices = new String[] {
@@ -105,7 +94,7 @@
}
@Test
- public void testGetLatestIndices() {
+ public void testGetLatestIndices() throws IOException {
// setup
String[] existingIndices = new String[] {
"bro_index_2017.10.03.19",
@@ -127,7 +116,7 @@
}
@Test
- public void testLatestIndicesWhereNoneExist() {
+ public void testLatestIndicesWhereNoneExist() throws IOException {
// setup - there are no existing indices
String[] existingIndices = new String[] {};
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java
index 6c3c327..6dc01a4 100644
--- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java
+++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java
@@ -29,6 +29,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+
+import org.apache.metron.elasticsearch.client.ElasticsearchClient;
import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
import org.apache.metron.indexing.dao.AccessConfig;
import org.apache.metron.indexing.dao.search.FieldType;
@@ -37,7 +39,8 @@
import org.apache.metron.indexing.dao.search.SearchResponse;
import org.apache.metron.indexing.dao.search.SortField;
import org.apache.metron.indexing.dao.search.SortOrder;
-import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
@@ -90,7 +93,8 @@
requestSubmitter = mock(ElasticsearchRequestSubmitter.class);
when(requestSubmitter.submitSearch(any())).thenReturn(response);
- TransportClient client = mock(TransportClient.class);
+ RestHighLevelClient highLevel = mock(RestHighLevelClient.class);
+ ElasticsearchClient client = new ElasticsearchClient(mock(RestClient.class), highLevel);
// provides configuration
AccessConfig config = mock(AccessConfig.class);
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitterTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitterTest.java
index 07019c3..7a84588 100644
--- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitterTest.java
+++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitterTest.java
@@ -18,17 +18,19 @@
package org.apache.metron.elasticsearch.dao;
+import org.apache.metron.elasticsearch.client.ElasticsearchClient;
import org.apache.metron.indexing.dao.search.InvalidSearchException;
-import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
-import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.Index;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchShardTarget;
import org.junit.Test;
-import org.mockito.Mockito;
+
+import java.io.IOException;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Matchers.any;
@@ -39,21 +41,20 @@
private ElasticsearchRequestSubmitter submitter;
- public ElasticsearchRequestSubmitter setup(SearchResponse response) {
+ public ElasticsearchRequestSubmitter setup(SearchResponse response) throws IOException {
// mocks
- TransportClient client = mock(TransportClient.class);
- ActionFuture future = Mockito.mock(ActionFuture.class);
+ RestHighLevelClient highLevelClient = mock(RestHighLevelClient.class);
+ ElasticsearchClient client = new ElasticsearchClient(mock(RestClient.class), highLevelClient);
// the client should return the given search response
- when(client.search(any())).thenReturn(future);
- when(future.actionGet()).thenReturn(response);
+ when(highLevelClient.search(any())).thenReturn(response);
return new ElasticsearchRequestSubmitter(client);
}
@Test
- public void searchShouldSucceedWhenOK() throws InvalidSearchException {
+ public void searchShouldSucceedWhenOK() throws InvalidSearchException, IOException {
// mocks
SearchResponse response = mock(SearchResponse.class);
@@ -71,7 +72,7 @@
}
@Test(expected = InvalidSearchException.class)
- public void searchShouldFailWhenNotOK() throws InvalidSearchException {
+ public void searchShouldFailWhenNotOK() throws InvalidSearchException, IOException {
// mocks
SearchResponse response = mock(SearchResponse.class);
@@ -88,7 +89,7 @@
}
@Test
- public void searchShouldHandleShardFailure() throws InvalidSearchException {
+ public void searchShouldHandleShardFailure() throws InvalidSearchException, IOException {
// mocks
SearchResponse response = mock(SearchResponse.class);
SearchRequest request = new SearchRequest();
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDaoTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDaoTest.java
index 3b48a60..3b7f132 100644
--- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDaoTest.java
+++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDaoTest.java
@@ -18,30 +18,32 @@
package org.apache.metron.elasticsearch.dao;
+import static org.mockito.Mockito.mock;
+
+import org.apache.metron.elasticsearch.client.ElasticsearchClient;
import org.apache.metron.indexing.dao.AccessConfig;
import org.apache.metron.indexing.dao.UpdateDaoTest;
import org.apache.metron.indexing.dao.update.UpdateDao;
-import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestHighLevelClient;
import org.junit.Before;
-import static org.mockito.Mockito.mock;
-
/**
* This class returns the ElasticsearchUpdateDao implementation to be used in UpdateDaoTest. UpdateDaoTest contains a
* common set of tests that all Dao implementations must pass.
*/
public class ElasticsearchUpdateDaoTest extends UpdateDaoTest {
- private TransportClient client;
private AccessConfig accessConfig;
private ElasticsearchRetrieveLatestDao retrieveLatestDao;
private ElasticsearchUpdateDao updateDao;
@Before
public void setup() {
- client = mock(TransportClient.class);
accessConfig = new AccessConfig();
retrieveLatestDao = mock(ElasticsearchRetrieveLatestDao.class);
+ RestHighLevelClient highLevel = mock(RestHighLevelClient.class);
+ ElasticsearchClient client = new ElasticsearchClient(mock(RestClient.class), highLevel);
updateDao = new ElasticsearchUpdateDao(client, accessConfig, retrieveLatestDao);
}
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchMetaAlertIntegrationTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchMetaAlertIntegrationTest.java
index c05efc1..03b1639 100644
--- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchMetaAlertIntegrationTest.java
+++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchMetaAlertIntegrationTest.java
@@ -144,7 +144,7 @@
Map<String, Object> globalConfig = new HashMap<String, Object>() {
{
put("es.clustername", "metron");
- put("es.port", "9300");
+ put("es.port", "9200");
put("es.ip", "localhost");
put("es.date.format", DATE_FORMAT);
}
@@ -334,11 +334,8 @@
}
@Override
- protected void setupTypings() {
- ((ElasticsearchDao) esDao).getClient().admin().indices().preparePutMapping(INDEX)
- .setType("test_doc")
- .setSource(nestedAlertMapping)
- .get();
+ protected void setupTypings() throws IOException {
+ ((ElasticsearchDao) esDao).getClient().putMapping(INDEX, "test_doc", nestedAlertMapping);
}
@Override
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java
index 8187468..d03da0e 100644
--- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java
+++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java
@@ -18,6 +18,10 @@
package org.apache.metron.elasticsearch.integration;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
@@ -25,8 +29,13 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.http.HttpEntity;
+import org.apache.http.entity.ContentType;
+import org.apache.http.nio.entity.NStringEntity;
import org.apache.metron.common.Constants;
import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.elasticsearch.client.ElasticsearchClient;
+import org.apache.metron.elasticsearch.client.ElasticsearchClientFactory;
import org.apache.metron.elasticsearch.dao.ElasticsearchDao;
import org.apache.metron.elasticsearch.integration.components.ElasticSearchComponent;
import org.apache.metron.indexing.dao.AccessConfig;
@@ -39,10 +48,13 @@
import org.apache.metron.indexing.dao.search.SearchResponse;
import org.apache.metron.indexing.dao.search.SearchResult;
import org.apache.metron.integration.InMemoryComponent;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.index.IndexRequestBuilder;
-import org.elasticsearch.action.support.WriteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestHighLevelClient;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
@@ -57,33 +69,38 @@
private static String dateFormat = "yyyy.MM.dd.HH";
private static String broTemplatePath = "../../metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/bro_index.template";
private static String snortTemplatePath = "../../metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/snort_index.template";
- private static final int MAX_RETRIES = 10;
- private static final int SLEEP_MS = 500;
+ protected static final String BRO_INDEX = "bro_index_2017.01.01.01";
+ protected static final String SNORT_INDEX = "snort_index_2017.01.01.02";
+ protected static Map<String, Object> globalConfig;
+ protected static RestClient lowLevelClient;
+ protected static RestHighLevelClient highLevelClient;
protected static IndexDao dao;
@BeforeClass
public static void setup() throws Exception {
indexComponent = startIndex();
- dao = createDao();
+ globalConfig = new HashMap<String, Object>() {{
+ put("es.clustername", "metron");
+ put("es.port", "9200");
+ put("es.ip", "localhost");
+ put("es.date.format", dateFormat);
+ }};
+ ElasticsearchClient esClient = ElasticsearchClientFactory.create(globalConfig);
+ lowLevelClient = esClient.getLowLevelClient();
+ highLevelClient = esClient.getHighLevelClient();
+ dao = createDao(globalConfig);
// The data is all static for searches, so we can set it up beforehand, and it's faster
loadTestData();
}
- protected static IndexDao createDao() {
- AccessConfig config = new AccessConfig();
- config.setMaxSearchResults(100);
- config.setMaxSearchGroups(100);
- config.setGlobalConfigSupplier( () ->
- new HashMap<String, Object>() {{
- put("es.clustername", "metron");
- put("es.port", "9300");
- put("es.ip", "localhost");
- put("es.date.format", dateFormat);
- }}
- );
+ protected static IndexDao createDao(Map<String, Object> globalConfig) {
+ AccessConfig accessConfig = new AccessConfig();
+ accessConfig.setMaxSearchResults(100);
+ accessConfig.setMaxSearchGroups(100);
+ accessConfig.setGlobalConfigSupplier(() -> globalConfig);
IndexDao dao = new ElasticsearchDao();
- dao.init(config);
+ dao.init(accessConfig);
return dao;
}
@@ -97,51 +114,64 @@
}
protected static void loadTestData() throws ParseException, IOException {
- ElasticSearchComponent es = (ElasticSearchComponent) indexComponent;
-
+ // add bro template
JSONObject broTemplate = JSONUtils.INSTANCE.load(new File(broTemplatePath), JSONObject.class);
addTestFieldMappings(broTemplate, "bro_doc");
- es.getClient().admin().indices().prepareCreate("bro_index_2017.01.01.01")
- .addMapping("bro_doc", JSONUtils.INSTANCE.toJSON(broTemplate.get("mappings"), false)).get();
+ String broTemplateJson = JSONUtils.INSTANCE.toJSON(broTemplate, true);
+ HttpEntity broEntity = new NStringEntity(broTemplateJson, ContentType.APPLICATION_JSON);
+ Response response = lowLevelClient.performRequest("PUT", "/_template/bro_template", Collections.emptyMap(), broEntity);
+ assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
+ // add snort template
JSONObject snortTemplate = JSONUtils.INSTANCE.load(new File(snortTemplatePath), JSONObject.class);
addTestFieldMappings(snortTemplate, "snort_doc");
- es.getClient().admin().indices().prepareCreate("snort_index_2017.01.01.02")
- .addMapping("snort_doc", JSONUtils.INSTANCE.toJSON(snortTemplate.get("mappings"), false)).get();
+ String snortTemplateJson = JSONUtils.INSTANCE.toJSON(snortTemplate, true);
+ HttpEntity snortEntity = new NStringEntity(snortTemplateJson, ContentType.APPLICATION_JSON);
+ response = lowLevelClient.performRequest("PUT", "/_template/snort_template", Collections.emptyMap(), snortEntity);
+ assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
+ // create bro index
+ response = lowLevelClient.performRequest("PUT", BRO_INDEX);
+ assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
+ // create snort index
+ response = lowLevelClient.performRequest("PUT", SNORT_INDEX);
+ assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
- BulkRequestBuilder bulkRequest = es.getClient().prepareBulk()
- .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
- JSONArray broArray = (JSONArray) new JSONParser().parse(broData);
- for (Object o : broArray) {
- JSONObject jsonObject = (JSONObject) o;
- IndexRequestBuilder indexRequestBuilder = es.getClient()
- .prepareIndex("bro_index_2017.01.01.01", "bro_doc");
- indexRequestBuilder = indexRequestBuilder.setId((String) jsonObject.get("guid"));
- indexRequestBuilder = indexRequestBuilder.setSource(jsonObject.toJSONString());
- indexRequestBuilder = indexRequestBuilder
- .setTimestamp(jsonObject.get("timestamp").toString());
- bulkRequest.add(indexRequestBuilder);
+ JSONArray broRecords = (JSONArray) new JSONParser().parse(broData);
+
+ BulkRequest bulkRequest = new BulkRequest();
+ for (Object o : broRecords) {
+ JSONObject json = (JSONObject) o;
+ IndexRequest indexRequest = new IndexRequest(BRO_INDEX, "bro_doc", (String) json.get("guid"));
+ indexRequest.source(json);
+ indexRequest.timestamp(json.get("timestamp").toString());
+ bulkRequest.add(indexRequest);
}
- JSONArray snortArray = (JSONArray) new JSONParser().parse(snortData);
- for (Object o : snortArray) {
- JSONObject jsonObject = (JSONObject) o;
- IndexRequestBuilder indexRequestBuilder = es.getClient()
- .prepareIndex("snort_index_2017.01.01.02", "snort_doc");
- indexRequestBuilder = indexRequestBuilder.setId((String) jsonObject.get("guid"));
- indexRequestBuilder = indexRequestBuilder.setSource(jsonObject.toJSONString());
- indexRequestBuilder = indexRequestBuilder
- .setTimestamp(jsonObject.get("timestamp").toString());
- bulkRequest.add(indexRequestBuilder);
+ bulkRequest.setRefreshPolicy(RefreshPolicy.WAIT_UNTIL);
+ BulkResponse bulkResponse = highLevelClient.bulk(bulkRequest);
+ assertFalse(bulkResponse.hasFailures());
+ assertThat(bulkResponse.status().getStatus(), equalTo(200));
+
+ JSONArray snortRecords = (JSONArray) new JSONParser().parse(snortData);
+
+ bulkRequest = new BulkRequest();
+ for (Object o : snortRecords) {
+ JSONObject json = (JSONObject) o;
+ IndexRequest indexRequest = new IndexRequest(SNORT_INDEX, "snort_doc", (String) json.get("guid"));
+ indexRequest.source(json);
+ indexRequest.timestamp(json.get("timestamp").toString());
+ bulkRequest.add(indexRequest);
}
- BulkResponse bulkResponse = bulkRequest.execute().actionGet();
- if (bulkResponse.hasFailures()) {
- throw new RuntimeException("Failed to index test data");
- }
+ bulkRequest.setRefreshPolicy(RefreshPolicy.WAIT_UNTIL);
+ bulkResponse = highLevelClient.bulk(bulkRequest);
+ assertFalse(bulkResponse.hasFailures());
+ assertThat(bulkResponse.status().getStatus(), equalTo(200));
}
/**
* Add test fields to a template with defined types in case they are not defined in the sensor template shipped with Metron.
* This is useful for testing certain cases, for example faceting on fields of various types.
- * @param template
+ * Template follows this pattern:
+ * { "mappings" : { "xxx_doc" : { "properties" : { ... }}}}
+ * @param template - this method has side effects - template is modified with field mappings.
* @param docType
*/
private static void addTestFieldMappings(JSONObject template, String docType) {
@@ -167,12 +197,11 @@
dao.search(request);
}
-
-
@Override
public void returns_column_metadata_for_specified_indices() throws Exception {
// getColumnMetadata with only bro
{
+ Assert.assertEquals(262, dao.getColumnMetadata(Collections.singletonList("bro")).size());
Map<String, FieldType> fieldTypes = dao.getColumnMetadata(Collections.singletonList("bro"));
Assert.assertEquals(262, fieldTypes.size());
Assert.assertEquals(FieldType.KEYWORD, fieldTypes.get("method"));
@@ -187,11 +216,11 @@
Assert.assertEquals(FieldType.DOUBLE, fieldTypes.get("score"));
Assert.assertEquals(FieldType.BOOLEAN, fieldTypes.get("is_alert"));
Assert.assertEquals(FieldType.TEXT, fieldTypes.get("location_point"));
- Assert.assertEquals(FieldType.KEYWORD, fieldTypes.get("ttl"));
Assert.assertEquals(FieldType.OTHER, fieldTypes.get("metron_alert"));
}
// getColumnMetadata with only snort
{
+ Assert.assertEquals(32, dao.getColumnMetadata(Collections.singletonList("snort")).size());
Map<String, FieldType> fieldTypes = dao.getColumnMetadata(Collections.singletonList("snort"));
Assert.assertEquals(32, fieldTypes.size());
Assert.assertEquals(FieldType.KEYWORD, fieldTypes.get("sig_generator"));
@@ -213,6 +242,7 @@
@Override
public void returns_column_data_for_multiple_indices() throws Exception {
+ Assert.assertEquals(277, dao.getColumnMetadata(Arrays.asList("bro", "snort")).size());
Map<String, FieldType> fieldTypes = dao.getColumnMetadata(Arrays.asList("bro", "snort"));
Assert.assertEquals(277, fieldTypes.size());
@@ -273,9 +303,9 @@
@Override
protected String getIndexName(String sensorType) {
if ("bro".equals(sensorType)) {
- return "bro_index_2017.01.01.01";
+ return BRO_INDEX;
} else {
- return "snort_index_2017.01.01.02";
+ return SNORT_INDEX;
}
}
}
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java
index c5c0bc1..6f36790 100644
--- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java
+++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java
@@ -104,7 +104,7 @@
protected static Map<String, Object> createGlobalConfig() {
return new HashMap<String, Object>() {{
put("es.clustername", "metron");
- put("es.port", "9300");
+ put("es.port", "9200");
put("es.ip", "localhost");
put("es.date.format", dateFormat);
}};
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/components/ElasticSearchComponent.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/components/ElasticSearchComponent.java
index 45b4d60..3e14c00 100644
--- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/components/ElasticSearchComponent.java
+++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/components/ElasticSearchComponent.java
@@ -135,7 +135,7 @@
.put("path.data",dataDir.getAbsolutePath())
.put("path.home", indexDir.getAbsoluteFile())
.put("transport.type", "netty4")
- .put("http.enabled", "false");
+ .put("http.enabled", "true");
if (extraElasticSearchSettings != null) {
settingsBuilder = settingsBuilder.put(extraElasticSearchSettings);
@@ -277,7 +277,9 @@
@Override
public void stop() {
try {
- node.close();
+ if(node != null) {
+ node.close();
+ }
} catch (IOException e) {
throw new RuntimeException("Unable to stop node." , e);
}
diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/metaalert/MetaAlertSearchDao.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/metaalert/MetaAlertSearchDao.java
index e8b9f26..cbbe9ee 100644
--- a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/metaalert/MetaAlertSearchDao.java
+++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/metaalert/MetaAlertSearchDao.java
@@ -22,6 +22,8 @@
import org.apache.metron.indexing.dao.search.SearchDao;
import org.apache.metron.indexing.dao.search.SearchResponse;
+import java.io.IOException;
+
public interface MetaAlertSearchDao extends SearchDao {
/**
@@ -30,6 +32,6 @@
* @return All meta alerts with a child alert having the GUID
* @throws InvalidSearchException If a problem occurs with the search
*/
- SearchResponse getAllMetaAlertsForAlert(String guid) throws InvalidSearchException;
+ SearchResponse getAllMetaAlertsForAlert(String guid) throws InvalidSearchException, IOException;
}
diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/metaalert/MetaAlertIntegrationTest.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/metaalert/MetaAlertIntegrationTest.java
index 7e28853..24989b4 100644
--- a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/metaalert/MetaAlertIntegrationTest.java
+++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/metaalert/MetaAlertIntegrationTest.java
@@ -1095,7 +1095,7 @@
protected abstract long getMatchingMetaAlertCount(String fieldName, String fieldValue)
throws IOException, InterruptedException;
- protected abstract void setupTypings();
+ protected abstract void setupTypings() throws IOException;
// Get the base index name without any adjustments (e.g. without ES's "_index")
protected abstract String getTestIndexName();
diff --git a/metron-platform/metron-integration-test/src/main/config/zookeeper/global.json b/metron-platform/metron-integration-test/src/main/config/zookeeper/global.json
index 9292f72..f7d45a7 100644
--- a/metron-platform/metron-integration-test/src/main/config/zookeeper/global.json
+++ b/metron-platform/metron-integration-test/src/main/config/zookeeper/global.json
@@ -1,7 +1,7 @@
{
"es.clustername": "metron",
"es.ip": "localhost",
- "es.port": 9300,
+ "es.port": 9200,
"es.date.format": "yyyy.MM.dd.HH",
"solr.zookeeper": "localhost:2181",