blob: d363e91331d020c65f1826b3998cbc588ab62263 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.metron.rest.config;
import kafka.admin.AdminUtils$;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.apache.commons.io.IOUtils;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.metron.common.configuration.ConfigurationsUtils;
import org.apache.metron.common.zookeeper.ConfigurationsCache;
import org.apache.metron.common.zookeeper.ZKConfigurationsCache;
import org.apache.metron.hbase.client.HBaseClient;
import org.apache.metron.hbase.mock.MockHBaseTableProvider;
import org.apache.metron.integration.ComponentRunner;
import org.apache.metron.integration.UnableToStartException;
import org.apache.metron.integration.components.KafkaComponent;
import org.apache.metron.integration.components.ZKServerComponent;
import org.apache.metron.job.manager.InMemoryJobManager;
import org.apache.metron.job.manager.JobManager;
import org.apache.metron.rest.RestException;
import org.apache.metron.rest.mock.*;
import org.apache.metron.rest.service.StormStatusService;
import org.apache.metron.rest.service.impl.CachedStormStatusServiceImpl;
import org.apache.metron.rest.service.impl.PcapToPdmlScriptWrapper;
import org.apache.metron.rest.service.impl.StormCLIWrapper;
import org.apache.metron.rest.user.UserSettingsClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.web.client.RestTemplate;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.*;
import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE;
@Configuration
@Profile(TEST_PROFILE)
public class TestConfig {
static {
MockHBaseTableProvider.addToCache("updates", "t");
}
@Bean
public Properties zkProperties() {
return new Properties();
}
@Bean
public ZKServerComponent zkServerComponent(Properties zkProperties) {
return new ZKServerComponent()
.withPostStartCallback((zkComponent) -> zkProperties.setProperty(ZKServerComponent.ZOOKEEPER_PROPERTY, zkComponent.getConnectionString()));
}
@Bean
public KafkaComponent kafkaWithZKComponent(Properties zkProperties) {
return new KafkaComponent().withTopologyProperties(zkProperties);
}
@Bean(initMethod = "start", destroyMethod="close")
public ConfigurationsCache cache(CuratorFramework client) {
return new ZKConfigurationsCache( client
, ZKConfigurationsCache.ConfiguredTypes.ENRICHMENT
, ZKConfigurationsCache.ConfiguredTypes.PARSER
, ZKConfigurationsCache.ConfiguredTypes.INDEXING
);
}
@Bean(destroyMethod = "stop")
public ComponentRunner componentRunner(ZKServerComponent zkServerComponent, KafkaComponent kafkaWithZKComponent) {
ComponentRunner runner = new ComponentRunner.Builder()
.withComponent("zk", zkServerComponent)
.withCustomShutdownOrder(new String[]{"search", "zk"})
.build();
try {
runner.start();
File globalConfigFile = new File("src/test/resources/zookeeper/global.json");
try(BufferedReader r = new BufferedReader(new InputStreamReader(new FileInputStream(globalConfigFile), StandardCharsets.UTF_8))){
String globalConfig = IOUtils.toString(r);
ConfigurationsUtils.writeGlobalConfigToZookeeper(globalConfig.getBytes(
StandardCharsets.UTF_8), zkServerComponent.getConnectionString());
} catch (Exception e) {
throw new IllegalStateException("Unable to upload global config", e);
}
} catch (UnableToStartException e) {
e.printStackTrace();
}
return runner;
}
@Bean(initMethod = "start", destroyMethod = "close")
public CuratorFramework client(ComponentRunner componentRunner) {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
ZKServerComponent zkServerComponent = componentRunner.getComponent("zk", ZKServerComponent.class);
return CuratorFrameworkFactory.newClient(zkServerComponent.getConnectionString(), retryPolicy);
}
@Bean(destroyMethod = "close")
public ZkClient zkClient(ComponentRunner componentRunner) {
ZKServerComponent zkServerComponent = componentRunner.getComponent("zk", ZKServerComponent.class);
return new ZkClient(zkServerComponent.getConnectionString(), 10000, 10000, ZKStringSerializer$.MODULE$);
}
@Bean
public ZkUtils zkUtils(ZkClient zkClient) {
return ZkUtils.apply(zkClient, false);
}
@Bean
public Map<String, Object> kafkaConsumer(KafkaComponent kafkaWithZKComponent) {
Map<String, Object> props = new HashMap<>();
props.put("bootstrap.servers", kafkaWithZKComponent.getBrokerList());
props.put("group.id", "metron-config");
props.put("enable.auto.commit", "false");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
return props;
}
@Bean
public ConsumerFactory<String, String> createConsumerFactory() {
return new DefaultKafkaConsumerFactory<>(kafkaConsumer(kafkaWithZKComponent(zkProperties())));
}
@Bean
public Map<String, Object> producerProperties(KafkaComponent kafkaWithZKComponent) {
Map<String, Object> producerConfig = new HashMap<>();
producerConfig.put("bootstrap.servers", kafkaWithZKComponent.getBrokerList());
producerConfig.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerConfig.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerConfig.put("request.required.acks", 1);
return producerConfig;
}
@Bean
public KafkaProducer kafkaProducer(KafkaComponent kafkaWithZKComponent) {
return new KafkaProducer<>(producerProperties(kafkaWithZKComponent));
}
@Bean
public StormCLIWrapper stormCLIClientWrapper() {
return new MockStormCLIClientWrapper();
}
@Bean
public RestTemplate restTemplate(StormCLIWrapper stormCLIClientWrapper) {
MockStormRestTemplate restTemplate = new MockStormRestTemplate();
restTemplate.setMockStormCLIClientWrapper((MockStormCLIClientWrapper) stormCLIClientWrapper);
return restTemplate;
}
@Bean
public AdminUtils$ adminUtils() {
return AdminUtils$.MODULE$;
}
@Bean()
public UserSettingsClient userSettingsClient() throws RestException, IOException {
return new UserSettingsClient(new MockHBaseTableProvider().addToCache("user_settings", "cf"), Bytes.toBytes("cf"));
}
@Bean()
public HBaseClient hBaseClient() throws RestException, IOException {
final String cf = "t";
final String cq = "v";
Table table = MockHBaseTableProvider.addToCache("enrichment_list", cf);
List<String> enrichmentTypes = new ArrayList<String>() {{
add("foo");
add("bar");
add("baz");
}};
for (String type : enrichmentTypes) {
Put put = new Put(Bytes.toBytes(type));
put.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cq), "{}".getBytes(StandardCharsets.UTF_8));
table.put(put);
}
return new HBaseClient(new MockHBaseTableProvider(), HBaseConfiguration.create(),
"enrichment_list");
}
@Bean
public JobManager jobManager() {
return new InMemoryJobManager();
}
@Bean
public MockPcapJob mockPcapJob() {
return new MockPcapJob();
}
@Bean
public PcapJobSupplier pcapJobSupplier(MockPcapJob mockPcapJob) {
MockPcapJobSupplier mockPcapJobSupplier = new MockPcapJobSupplier();
mockPcapJobSupplier.setMockPcapJob(mockPcapJob);
return mockPcapJobSupplier;
}
@Bean
public PcapToPdmlScriptWrapper pcapToPdmlScriptWrapper() {
return new MockPcapToPdmlScriptWrapper();
}
@Bean
public StormStatusService stormStatusService(
@Autowired @Qualifier("StormStatusServiceImpl") StormStatusService wrappedService) {
long maxCacheSize = 0L;
long maxCacheTimeoutSeconds = 0L;
return new CachedStormStatusServiceImpl(wrappedService, maxCacheSize, maxCacheTimeoutSeconds);
}
}