| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.metron.common.zookeeper; |
| |
| import java.nio.charset.StandardCharsets; |
| import org.adrianwalker.multilinestring.Multiline; |
| import org.apache.commons.io.IOUtils; |
| import org.apache.curator.framework.CuratorFramework; |
| import org.apache.metron.TestConstants; |
| import org.apache.metron.common.configuration.*; |
| import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig; |
| import org.apache.metron.common.configuration.profiler.ProfilerConfig; |
| import org.apache.metron.common.configuration.profiler.ProfilerConfigurations; |
| import org.apache.metron.common.utils.JSONUtils; |
| import org.apache.metron.integration.components.ZKServerComponent; |
| import org.junit.*; |
| |
| import java.io.File; |
| import java.io.FileInputStream; |
| import java.util.Map; |
| |
| import static org.apache.metron.integration.utils.TestUtils.assertEventually; |
| |
| public class ZKConfigurationsCacheIntegrationTest { |
| private CuratorFramework client; |
| |
| /** |
| { |
| "profiles": [ |
| { |
| "profile": "example2", |
| "foreach": "ip_src_addr", |
| "onlyif": "protocol == 'HTTP'", |
| "init": { |
| "total_bytes": 0.0 |
| }, |
| "update": { |
| "total_bytes": "total_bytes + bytes_in" |
| }, |
| "result": "total_bytes", |
| "expires": 30 |
| } |
| ] |
| } |
| */ |
| @Multiline |
| public static String profilerConfig; |
| /** |
| { |
| "hdfs" : { |
| "index": "yaf", |
| "batchSize": 1, |
| "enabled" : true |
| }, |
| "elasticsearch" : { |
| "index": "yaf", |
| "batchSize": 25, |
| "batchTimeout": 7, |
| "enabled" : false |
| }, |
| "solr" : { |
| "index": "yaf", |
| "batchSize": 5, |
| "enabled" : false |
| } |
| } |
| */ |
| @Multiline |
| public static String testIndexingConfig; |
| |
| /** |
| { |
| "enrichment": { |
| "fieldMap": { } |
| ,"fieldToTypeMap": { } |
| }, |
| "threatIntel": { |
| "fieldMap": { }, |
| "fieldToTypeMap": { }, |
| "triageConfig" : { } |
| } |
| } |
| */ |
| @Multiline |
| public static String testEnrichmentConfig; |
| |
| /** |
| { |
| "parserClassName":"org.apache.metron.parsers.bro.BasicBroParser", |
| "sensorTopic":"brop", |
| "parserConfig": {} |
| } |
| */ |
| @Multiline |
| public static String testParserConfig; |
| |
| /** |
| *{ |
| "es.clustername": "metron", |
| "es.ip": "localhost", |
| "es.port": 9300, |
| "es.date.format": "yyyy.MM.dd.HH" |
| } |
| */ |
| @Multiline |
| public static String globalConfig; |
| |
| public ConfigurationsCache cache; |
| |
| public ZKServerComponent zkComponent; |
| |
| @Before |
| public void setup() throws Exception { |
| zkComponent = new ZKServerComponent(); |
| zkComponent.start(); |
| client = ConfigurationsUtils.getClient(zkComponent.getConnectionString()); |
| client.start(); |
| cache = new ZKConfigurationsCache(client); |
| cache.start(); |
| { |
| //parser |
| byte[] config = IOUtils.toByteArray(new FileInputStream(new File(TestConstants.PARSER_CONFIGS_PATH + "/parsers/bro.json"))); |
| ConfigurationsUtils.writeSensorParserConfigToZookeeper("bro", config, client); |
| } |
| { |
| //indexing |
| byte[] config = IOUtils.toByteArray(new FileInputStream(new File(TestConstants.SAMPLE_CONFIG_PATH + "/indexing/test.json"))); |
| ConfigurationsUtils.writeSensorIndexingConfigToZookeeper("test", config, client); |
| } |
| { |
| //enrichments |
| byte[] config = IOUtils.toByteArray(new FileInputStream(new File(TestConstants.SAMPLE_CONFIG_PATH + "/enrichments/test.json"))); |
| ConfigurationsUtils.writeSensorEnrichmentConfigToZookeeper("test", config, client); |
| } |
| { |
| //enrichments |
| byte[] config = IOUtils.toByteArray(new FileInputStream(new File(TestConstants.SAMPLE_CONFIG_PATH + "/enrichments/test.json"))); |
| ConfigurationsUtils.writeSensorEnrichmentConfigToZookeeper("test", config, client); |
| } |
| { |
| //profiler |
| byte[] config = IOUtils.toByteArray(new FileInputStream(new File("src/test/resources/profiler/profiler.json"))); |
| ConfigurationsUtils.writeProfilerConfigToZookeeper( config, client); |
| } |
| { |
| //global config |
| byte[] config = IOUtils.toByteArray(new FileInputStream(new File(TestConstants.SAMPLE_CONFIG_PATH + "/global.json"))); |
| ConfigurationsUtils.writeGlobalConfigToZookeeper(config, client); |
| } |
| } |
| |
| @After |
| public void teardown() throws Exception { |
| if(cache != null) { |
| cache.close(); |
| } |
| if(client != null) { |
| client.close(); |
| } |
| if(zkComponent != null) { |
| zkComponent.stop(); |
| } |
| } |
| |
| |
| @Test |
| public void validateDelete() throws Exception { |
| client.delete().forPath(ConfigurationType.GLOBAL.getZookeeperRoot()); |
| client.delete().forPath(ConfigurationType.INDEXING.getZookeeperRoot() + "/test"); |
| client.delete().forPath(ConfigurationType.ENRICHMENT.getZookeeperRoot() + "/test"); |
| client.delete().forPath(ConfigurationType.PARSER.getZookeeperRoot() + "/bro"); |
| client.delete().forPath(ConfigurationType.PROFILER.getZookeeperRoot() ); |
| //global |
| { |
| IndexingConfigurations config = cache.get( IndexingConfigurations.class); |
| assertEventually(() -> Assert.assertNull(config.getGlobalConfig(false))); |
| } |
| //indexing |
| { |
| IndexingConfigurations config = cache.get( IndexingConfigurations.class); |
| assertEventually(() -> Assert.assertNull(config.getSensorIndexingConfig("test", false))); |
| assertEventually(() -> Assert.assertNull(config.getGlobalConfig(false))); |
| } |
| //enrichment |
| { |
| EnrichmentConfigurations config = cache.get( EnrichmentConfigurations.class); |
| assertEventually(() -> Assert.assertNull(config.getSensorEnrichmentConfig("test"))); |
| assertEventually(()-> Assert.assertNull(config.getGlobalConfig(false))); |
| } |
| //parser |
| { |
| ParserConfigurations config = cache.get( ParserConfigurations.class); |
| assertEventually(() -> Assert.assertNull(config.getSensorParserConfig("bro"))); |
| assertEventually(() -> Assert.assertNull(config.getGlobalConfig(false))); |
| } |
| //profiler |
| { |
| ProfilerConfigurations config = cache.get( ProfilerConfigurations.class); |
| assertEventually(() -> Assert.assertNull(config.getProfilerConfig())); |
| assertEventually(() -> Assert.assertNull(config.getGlobalConfig(false))); |
| } |
| } |
| |
| @Test |
| public void validateUpdate() throws Exception { |
| ConfigurationsUtils.writeSensorIndexingConfigToZookeeper("test", testIndexingConfig.getBytes( |
| StandardCharsets.UTF_8), client); |
| ConfigurationsUtils.writeGlobalConfigToZookeeper(globalConfig.getBytes(StandardCharsets.UTF_8), client); |
| ConfigurationsUtils.writeSensorEnrichmentConfigToZookeeper("test", testEnrichmentConfig.getBytes( |
| StandardCharsets.UTF_8), client); |
| ConfigurationsUtils.writeSensorParserConfigToZookeeper("bro", testParserConfig.getBytes( |
| StandardCharsets.UTF_8), client); |
| ConfigurationsUtils.writeProfilerConfigToZookeeper( profilerConfig.getBytes( |
| StandardCharsets.UTF_8), client); |
| //indexing |
| { |
| Map<String, Object> expectedConfig = JSONUtils.INSTANCE.load(testIndexingConfig, JSONUtils.MAP_SUPPLIER); |
| IndexingConfigurations config = cache.get( IndexingConfigurations.class); |
| assertEventually(() -> Assert.assertEquals(expectedConfig, config.getSensorIndexingConfig("test"))); |
| } |
| //enrichment |
| { |
| SensorEnrichmentConfig expectedConfig = JSONUtils.INSTANCE.load(testEnrichmentConfig, SensorEnrichmentConfig.class); |
| Map<String, Object> expectedGlobalConfig = JSONUtils.INSTANCE.load(globalConfig, JSONUtils.MAP_SUPPLIER); |
| EnrichmentConfigurations config = cache.get( EnrichmentConfigurations.class); |
| assertEventually(() -> Assert.assertEquals(expectedConfig, config.getSensorEnrichmentConfig("test"))); |
| assertEventually(() -> Assert.assertEquals(expectedGlobalConfig, config.getGlobalConfig())); |
| } |
| //parsers |
| { |
| SensorParserConfig expectedConfig = JSONUtils.INSTANCE.load(testParserConfig, SensorParserConfig.class); |
| ParserConfigurations config = cache.get( ParserConfigurations.class); |
| assertEventually(() -> Assert.assertEquals(expectedConfig, config.getSensorParserConfig("bro"))); |
| } |
| //profiler |
| { |
| ProfilerConfig expectedConfig = JSONUtils.INSTANCE.load(profilerConfig, ProfilerConfig.class); |
| ProfilerConfigurations config = cache.get( ProfilerConfigurations.class); |
| assertEventually(() -> Assert.assertEquals(expectedConfig, config.getProfilerConfig())); |
| } |
| } |
| |
| @Test |
| public void validateBaseWrite() throws Exception { |
| File globalConfigFile = new File(TestConstants.SAMPLE_CONFIG_PATH + "/global.json"); |
| Map<String, Object> expectedGlobalConfig = JSONUtils.INSTANCE.load(globalConfigFile, JSONUtils.MAP_SUPPLIER); |
| //indexing |
| { |
| File inFile = new File(TestConstants.SAMPLE_CONFIG_PATH + "/indexing/test.json"); |
| Map<String, Object> expectedConfig = JSONUtils.INSTANCE.load(inFile, JSONUtils.MAP_SUPPLIER); |
| IndexingConfigurations config = cache.get( IndexingConfigurations.class); |
| assertEventually(() -> Assert.assertEquals(expectedConfig, config.getSensorIndexingConfig("test"))); |
| assertEventually(() -> Assert.assertEquals(expectedGlobalConfig, config.getGlobalConfig())); |
| assertEventually(() -> Assert.assertNull(config.getSensorIndexingConfig("notthere", false))); |
| } |
| //enrichment |
| { |
| File inFile = new File(TestConstants.SAMPLE_CONFIG_PATH + "/enrichments/test.json"); |
| SensorEnrichmentConfig expectedConfig = JSONUtils.INSTANCE.load(inFile, SensorEnrichmentConfig.class); |
| EnrichmentConfigurations config = cache.get( EnrichmentConfigurations.class); |
| assertEventually(() -> Assert.assertEquals(expectedConfig, config.getSensorEnrichmentConfig("test"))); |
| assertEventually(() -> Assert.assertEquals(expectedGlobalConfig, config.getGlobalConfig())); |
| assertEventually(() -> Assert.assertNull(config.getSensorEnrichmentConfig("notthere"))); |
| } |
| //parsers |
| { |
| File inFile = new File(TestConstants.PARSER_CONFIGS_PATH + "/parsers/bro.json"); |
| SensorParserConfig expectedConfig = JSONUtils.INSTANCE.load(inFile, SensorParserConfig.class); |
| ParserConfigurations config = cache.get( ParserConfigurations.class); |
| assertEventually(() -> Assert.assertEquals(expectedConfig, config.getSensorParserConfig("bro"))); |
| assertEventually(() -> Assert.assertEquals(expectedGlobalConfig, config.getGlobalConfig())); |
| assertEventually(() -> Assert.assertNull(config.getSensorParserConfig("notthere"))); |
| } |
| //profiler |
| { |
| File inFile = new File("src/test/resources/profiler/profiler.json"); |
| ProfilerConfig expectedConfig = JSONUtils.INSTANCE.load(inFile, ProfilerConfig.class); |
| ProfilerConfigurations config = cache.get( ProfilerConfigurations.class); |
| assertEventually(() -> Assert.assertEquals(expectedConfig, config.getProfilerConfig())); |
| assertEventually(() -> Assert.assertEquals(expectedGlobalConfig, config.getGlobalConfig())); |
| } |
| } |
| } |