/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.metron.alerts.server;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import javax.xml.bind.DatatypeConverter;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.inject.Inject;
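
/**
 * Periodically searches Elasticsearch for alerts that have arrived since the
 * previous run, de-duplicates them by a SHA-256 hash of each hit's source
 * document, and forwards new alerts to a Kafka topic.
 *
 * A minimal scheduling sketch, assuming a Guice injector and a 30-second
 * interval (both hypothetical; this class only supplies the Runnable):
 *
 * <pre>
 * ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
 * scheduler.scheduleAtFixedRate(
 *     injector.getInstance(AlertsSearcher.class), 0L, 30L, TimeUnit.SECONDS);
 * </pre>
 */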
public class AlertsSearcher implements Runnable {

    private static final Logger logger = LoggerFactory.getLogger( AlertsSearcher.class );
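
    // Keys read from configProps (defaults in parentheses): homeDir,
    // elasticSearchHostName (localhost), elasticSearchHostPort (9300),
    // alertsIndexes (alerts), alertsTypes, alertQueryFieldName (alert.source),
    // alertsQueryFieldValue (*), kafkaBrokerHostName (localhost),
    // kafkaBrokerHostPort (9092), kafkaTopicName (test).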
    @Inject
    private Properties configProps;

    // TODO: inject Searcher module for either ElasticSearch or Solr...
    // TODO: inject MetronServiceFactory here
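
    // De-duplication cache keyed by the hash of an alert's source JSON. Entries
    // are added here but never evicted by this class, so any eviction policy
    // has to live elsewhere.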
    public final Map<String, AlertsFilterCacheEntry> alertsFilterCache = new HashMap<String, AlertsFilterCacheEntry>();

    private final MessageDigest md;

    public AlertsSearcher() throws NoSuchAlgorithmException
    {
        md = MessageDigest.getInstance("SHA-256");
    }
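
    /**
     * Performs one search pass: loads the last search time from
     * searcherState.properties under homeDir, queries Elasticsearch for alerts
     * in the window [lastSearchTime, now), forwards unseen hits to Kafka, and
     * finally persists the new search time for the next pass.
     */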
    @Override
    public void run() {
        try
        {
            logger.debug( "Running Elasticsearch search..." );
            long currentSearchTime = System.currentTimeMillis();
            long lastSearchTime = 0L;

            // Look for a marker that tells us the last time we ran...
            String homeDir = configProps.getProperty("homeDir");
            if( homeDir.endsWith( "/" )) {
                homeDir = homeDir.substring(0, homeDir.length() - 1);
            }
            logger.info( "using homeDir = " + homeDir );
            File searcherStateFile = new File( homeDir + "/searcherState.properties" );
            if( searcherStateFile.exists() )
            {
                logger.info( "found existing searcherState.properties file" );
                FileInputStream fis = null;
                try {
                    fis = new FileInputStream( searcherStateFile );
                    Properties searcherState = new Properties();
                    searcherState.load(fis);
                    lastSearchTime = Long.parseLong( searcherState.getProperty("lastSearchTime"));
                }
                catch (IOException e) {
                    // FileNotFoundException is a subclass of IOException, so one catch covers both.
                    logger.error( "Error reading lastSearchTime value from state file", e );
                }
                finally
                {
                    if( fis != null ) {
                        try {
                            fis.close();
                        } catch (IOException e) {
                            logger.error( "Probably ignorable error closing file stream: ", e );
                        }
                    }
                }
            }
            else
            {
                // Nothing to do here; we'll write out our lastSearchTime at the end.
                logger.info( "No existing searcherState.properties found" );
            }
            // Search for alerts newer than "lastSearchTime".
            Settings settings = ImmutableSettings.settingsBuilder()
                    .put("client.transport.sniff", true).build();
                    // .put("cluster.name", "elasticsearch").build();
            Client client = null;
            try
            {
                logger.info( "initializing elasticsearch client" );
                String elasticSearchHostName = configProps.getProperty( "elasticSearchHostName", "localhost" );
                int elasticSearchHostPort = Integer.parseInt(configProps.getProperty( "elasticSearchHostPort", "9300" ) );
                client = new TransportClient(settings).addTransportAddress(new InetSocketTransportAddress(elasticSearchHostName, elasticSearchHostPort));
                logger.info( "lastSearchTime: " + lastSearchTime );

                String alertsIndexes = configProps.getProperty( "alertsIndexes", "alerts" );
                String[] alertsIndexArray = alertsIndexes.split(",");
                String alertsTypes = configProps.getProperty( "alertsTypes", "" );
                String[] alertsTypesArray = alertsTypes.split( ",");
                String alertsQueryFieldName = configProps.getProperty( "alertQueryFieldName", "alert.source" );
                String alertsQueryFieldValue = configProps.getProperty( "alertsQueryFieldValue", "*" );
                logger.info( "alertsIndexes: " + alertsIndexes );

                // Match alerts whose query field matches the configured wildcard AND
                // whose timestamp falls in [lastSearchTime, now).
                SearchResponse response = client.prepareSearch( alertsIndexArray )
                        .setTypes( alertsTypesArray )
                        .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
                        .addField("_source")
                        .setQuery( QueryBuilders.boolQuery().must( QueryBuilders.wildcardQuery( alertsQueryFieldName, alertsQueryFieldValue ) )
                                .must( QueryBuilders.rangeQuery("message.timestamp").from(lastSearchTime).to(System.currentTimeMillis()).includeLower(true).includeUpper(false)))
                        .execute()
                        .actionGet();
                SearchHits hits = response.getHits();
                logger.debug( "Total hits: " + hits.getTotalHits());

                // For all hits, put the alert onto the Kafka topic.
                for( SearchHit hit : hits )
                {
                    // Hash this hit's source so we can recognize alerts we have already
                    // forwarded. Hex-encode the digest: raw digest bytes are not safe
                    // to use directly as a String.
                    String sourceData = hit.getSourceAsString();
                    String hash = DatatypeConverter.printHexBinary( md.digest(sourceData.getBytes(StandardCharsets.UTF_8)));
                    if( alertsFilterCache.containsKey(hash))
                    {
                        logger.warn( "We have already seen this Alert, so skipping..." );
                        continue;
                    }
                    alertsFilterCache.put(hash, new AlertsFilterCacheEntry( sourceData, System.currentTimeMillis() ));
                    doSenderWork(hit);
                }
            }
            finally
            {
                if( client != null )
                {
                    client.close();
                }
            }
            // Record the time we just searched.
            FileOutputStream fos = null;
            try {
                fos = new FileOutputStream( searcherStateFile );
                Properties searcherState = new Properties();
                searcherState.setProperty( "lastSearchTime", Long.toString(currentSearchTime));
                searcherState.store(fos, "");
            }
            catch (IOException e) {
                logger.error( "Error saving lastSearchTime: ", e );
            }
            finally {
                if( fos != null ) {
                    try {
                        fos.close();
                    }
                    catch (IOException e) {
                        logger.error( "Probably ignorable error closing file stream: ", e );
                    }
                }
            }
            logger.info( "Done with Elasticsearch search... " );
        }
        catch( Exception e )
        {
            logger.error( "Unexpected error while searching Elasticsearch index:", e );
        }
    }
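
    /**
     * Publishes a single search hit's source JSON to the configured Kafka topic
     * via the legacy Kafka 0.8 producer API. Note that a new producer is created
     * and torn down for every hit, which is simple but relatively expensive;
     * reusing one producer would be the obvious optimization.
     */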
    private void doSenderWork( SearchHit hit )
    {
        String kafkaBrokerHostName = configProps.getProperty("kafkaBrokerHostName", "localhost" );
        String kafkaBrokerHostPort = configProps.getProperty("kafkaBrokerHostPort", "9092" );
        String kafkaTopicName = configProps.getProperty("kafkaTopicName", "test" );
        logger.debug( "kafkaBrokerHostName: " + kafkaBrokerHostName );
        logger.debug( "kafkaBrokerHostPort: " + kafkaBrokerHostPort );
        logger.debug( "kafkaTopicName: " + kafkaTopicName );

        String sourceData = hit.getSourceAsString();
        logger.debug( "Source Data: " + sourceData );

        Properties props = new Properties();
        props.put("metadata.broker.list", kafkaBrokerHostName + ":" + kafkaBrokerHostPort );
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // props.put("partitioner.class", "example.producer.SimplePartitioner");
        props.put("request.required.acks", "1");

        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);
        try {
            // Forward the raw source JSON; the message key is left empty.
            KeyedMessage<String, String> data = new KeyedMessage<String, String>(kafkaTopicName, "", sourceData );
            producer.send(data);
        }
        finally {
            // Close the producer so each send does not leak network resources.
            producer.close();
        }
    }
}