blob: 8a1bba1d465fe72d7ddd918d7a1d573c83c5bcc3 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
name: "enrichment"
config:
topology.workers: ${enrichment.workers}
topology.acker.executors: ${enrichment.acker.executors}
topology.worker.childopts: ${topology.worker.childopts}
topology.auto-credentials: ${topology.auto-credentials}
topology.max.spout.pending: ${topology.max.spout.pending}
components:
# Enrichment
- id: "stellarEnrichmentAdapter"
className: "org.apache.metron.enrichment.adapters.stellar.StellarAdapter"
configMethods:
- name: "ofType"
args:
- "ENRICHMENT"
# Any kafka props for the producer go here.
- id: "kafkaWriterProps"
className: "java.util.HashMap"
configMethods:
- name: "put"
args:
- "security.protocol"
- "${kafka.security.protocol}"
- id: "stellarEnrichment"
className: "org.apache.metron.enrichment.configuration.Enrichment"
constructorArgs:
- "stellar"
- ref: "stellarEnrichmentAdapter"
- id: "geoEnrichmentAdapter"
className: "org.apache.metron.enrichment.adapters.geo.GeoAdapter"
- id: "geoEnrichment"
className: "org.apache.metron.enrichment.configuration.Enrichment"
constructorArgs:
- "geo"
- ref: "geoEnrichmentAdapter"
- id: "hostEnrichmentAdapter"
className: "org.apache.metron.enrichment.adapters.host.HostFromJSONListAdapter"
constructorArgs:
- '${enrichment.host.known_hosts}'
- id: "hostEnrichment"
className: "org.apache.metron.enrichment.configuration.Enrichment"
constructorArgs:
- "host"
- ref: "hostEnrichmentAdapter"
- id: "simpleHBaseEnrichmentConfig"
className: "org.apache.metron.enrichment.adapters.simplehbase.SimpleHBaseConfig"
configMethods:
- name: "withProviderImpl"
args:
- "${hbase.provider.impl}"
- name: "withHBaseTable"
args:
- "${enrichment.simple.hbase.table}"
- name: "withHBaseCF"
args:
- "${enrichment.simple.hbase.cf}"
- id: "simpleHBaseEnrichmentAdapter"
className: "org.apache.metron.enrichment.adapters.simplehbase.SimpleHBaseAdapter"
configMethods:
- name: "withConfig"
args:
- ref: "simpleHBaseEnrichmentConfig"
- id: "simpleHBaseEnrichment"
className: "org.apache.metron.enrichment.configuration.Enrichment"
constructorArgs:
- "hbaseEnrichment"
- ref: "simpleHBaseEnrichmentAdapter"
- id: "enrichments"
className: "java.util.ArrayList"
configMethods:
- name: "add"
args:
- ref: "geoEnrichment"
- name: "add"
args:
- ref: "hostEnrichment"
- name: "add"
args:
- ref: "simpleHBaseEnrichment"
- name: "add"
args:
- ref: "stellarEnrichment"
#enrichment error
- id: "enrichmentErrorKafkaWriter"
className: "org.apache.metron.writer.kafka.KafkaWriter"
configMethods:
- name: "withTopic"
args:
- "${enrichment.error.topic}"
- name: "withZkQuorum"
args:
- "${kafka.zk}"
- name: "withProducerConfigs"
args:
- ref: "kafkaWriterProps"
# Threat Intel
- id: "stellarThreatIntelAdapter"
className: "org.apache.metron.enrichment.adapters.stellar.StellarAdapter"
configMethods:
- name: "ofType"
args:
- "THREAT_INTEL"
- id: "stellarThreatIntelEnrichment"
className: "org.apache.metron.enrichment.configuration.Enrichment"
constructorArgs:
- "stellar"
- ref: "stellarThreatIntelAdapter"
- id: "simpleHBaseThreatIntelConfig"
className: "org.apache.metron.enrichment.adapters.threatintel.ThreatIntelConfig"
configMethods:
- name: "withProviderImpl"
args:
- "${hbase.provider.impl}"
- name: "withTrackerHBaseTable"
args:
- "${threat.intel.tracker.table}"
- name: "withTrackerHBaseCF"
args:
- "${threat.intel.tracker.cf}"
- name: "withHBaseTable"
args:
- "${threat.intel.simple.hbase.table}"
- name: "withHBaseCF"
args:
- "${threat.intel.simple.hbase.cf}"
- id: "simpleHBaseThreatIntelAdapter"
className: "org.apache.metron.enrichment.adapters.threatintel.ThreatIntelAdapter"
configMethods:
- name: "withConfig"
args:
- ref: "simpleHBaseThreatIntelConfig"
- id: "simpleHBaseThreatIntelEnrichment"
className: "org.apache.metron.enrichment.configuration.Enrichment"
constructorArgs:
- "hbaseThreatIntel"
- ref: "simpleHBaseThreatIntelAdapter"
- id: "threatIntels"
className: "java.util.ArrayList"
configMethods:
- name: "add"
args:
- ref: "simpleHBaseThreatIntelEnrichment"
- name: "add"
args:
- ref: "stellarThreatIntelEnrichment"
#threatintel error
- id: "threatIntelErrorKafkaWriter"
className: "org.apache.metron.writer.kafka.KafkaWriter"
configMethods:
- name: "withTopic"
args:
- "${threat.intel.error.topic}"
- name: "withZkQuorum"
args:
- "${kafka.zk}"
- name: "withProducerConfigs"
args:
- ref: "kafkaWriterProps"
#indexing
- id: "kafkaWriter"
className: "org.apache.metron.writer.kafka.KafkaWriter"
configMethods:
- name: "withTopic"
args:
- "${enrichment.output.topic}"
- name: "withZkQuorum"
args:
- "${kafka.zk}"
- name: "withProducerConfigs"
args:
- ref: "kafkaWriterProps"
#kafka/zookeeper
# Any kafka props for the consumer go here.
- id: "kafkaProps"
className: "java.util.HashMap"
configMethods:
- name: "put"
args:
- "value.deserializer"
- "org.apache.kafka.common.serialization.ByteArrayDeserializer"
- name: "put"
args:
- "key.deserializer"
- "org.apache.kafka.common.serialization.ByteArrayDeserializer"
- name: "put"
args:
- "group.id"
- "enrichments"
- name: "put"
args:
- "security.protocol"
- "${kafka.security.protocol}"
# The fields to pull out of the kafka messages
- id: "fields"
className: "java.util.ArrayList"
configMethods:
- name: "add"
args:
- "value"
- id: "kafkaConfig"
className: "org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder"
constructorArgs:
- ref: "kafkaProps"
# topic name
- "${enrichment.input.topic}"
- "${kafka.zk}"
- ref: "fields"
configMethods:
- name: "setFirstPollOffsetStrategy"
args:
- "${kafka.start}"
spouts:
- id: "kafkaSpout"
className: "org.apache.metron.storm.kafka.flux.StormKafkaSpout"
constructorArgs:
- ref: "kafkaConfig"
parallelism: ${kafka.spout.parallelism}
bolts:
# Enrichment Bolts
- id: "enrichmentSplitBolt"
className: "org.apache.metron.enrichment.bolt.EnrichmentSplitterBolt"
constructorArgs:
- "${kafka.zk}"
configMethods:
- name: "withEnrichments"
args:
- ref: "enrichments"
parallelism: ${enrichment.split.parallelism}
- id: "geoEnrichmentBolt"
className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
constructorArgs:
- "${kafka.zk}"
configMethods:
- name: "withEnrichment"
args:
- ref: "geoEnrichment"
- name: "withMaxCacheSize"
args: [10000]
- name: "withMaxTimeRetain"
args: [10]
- id: "stellarEnrichmentBolt"
className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
constructorArgs:
- "${kafka.zk}"
configMethods:
- name: "withEnrichment"
args:
- ref: "stellarEnrichment"
- name: "withMaxCacheSize"
args: [10000]
- name: "withMaxTimeRetain"
args: [10]
parallelism: ${enrichment.stellar.parallelism}
- id: "hostEnrichmentBolt"
className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
constructorArgs:
- "${kafka.zk}"
configMethods:
- name: "withEnrichment"
args:
- ref: "hostEnrichment"
- name: "withMaxCacheSize"
args: [10000]
- name: "withMaxTimeRetain"
args: [10]
- id: "simpleHBaseEnrichmentBolt"
className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
constructorArgs:
- "${kafka.zk}"
configMethods:
- name: "withEnrichment"
args:
- ref: "simpleHBaseEnrichment"
- name: "withMaxCacheSize"
args: [10000]
- name: "withMaxTimeRetain"
args: [10]
- id: "enrichmentJoinBolt"
className: "org.apache.metron.enrichment.bolt.EnrichmentJoinBolt"
constructorArgs:
- "${kafka.zk}"
configMethods:
- name: "withMaxCacheSize"
args: [${enrichment.join.cache.size}]
- name: "withMaxTimeRetain"
args: [10]
parallelism: ${enrichment.join.parallelism}
- id: "enrichmentErrorOutputBolt"
className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt"
constructorArgs:
- "${kafka.zk}"
- "ENRICHMENT"
configMethods:
- name: "withBulkMessageWriter"
args:
- ref: "enrichmentErrorKafkaWriter"
# Threat Intel Bolts
- id: "threatIntelSplitBolt"
className: "org.apache.metron.enrichment.bolt.ThreatIntelSplitterBolt"
constructorArgs:
- "${kafka.zk}"
configMethods:
- name: "withEnrichments"
args:
- ref: "threatIntels"
- name: "withMessageFieldName"
args: ["message"]
parallelism: ${threat.intel.split.parallelism}
- id: "simpleHBaseThreatIntelBolt"
className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
constructorArgs:
- "${kafka.zk}"
configMethods:
- name: "withEnrichment"
args:
- ref: "simpleHBaseThreatIntelEnrichment"
- name: "withMaxCacheSize"
args: [10000]
- name: "withMaxTimeRetain"
args: [10]
- id: "stellarThreatIntelBolt"
className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
constructorArgs:
- "${kafka.zk}"
configMethods:
- name: "withEnrichment"
args:
- ref: "stellarThreatIntelEnrichment"
- name: "withMaxCacheSize"
args: [10000]
- name: "withMaxTimeRetain"
args: [10]
parallelism: ${threat.intel.stellar.parallelism}
- id: "threatIntelJoinBolt"
className: "org.apache.metron.enrichment.bolt.ThreatIntelJoinBolt"
constructorArgs:
- "${kafka.zk}"
configMethods:
- name: "withMaxCacheSize"
args: [${threat.intel.join.cache.size}]
- name: "withMaxTimeRetain"
args: [10]
parallelism: ${threat.intel.join.parallelism}
- id: "threatIntelErrorOutputBolt"
className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt"
constructorArgs:
- "${kafka.zk}"
- "ENRICHMENT"
configMethods:
- name: "withBulkMessageWriter"
args:
- ref: "threatIntelErrorKafkaWriter"
# Indexing Bolts
- id: "outputBolt"
className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt"
constructorArgs:
- "${kafka.zk}"
- "ENRICHMENT"
configMethods:
- name: "withBulkMessageWriter"
args:
- ref: "kafkaWriter"
parallelism: ${kafka.writer.parallelism}
streams:
#parser
- name: "spout -> enrichmentSplit"
from: "kafkaSpout"
to: "enrichmentSplitBolt"
grouping:
type: LOCAL_OR_SHUFFLE
#enrichment
- name: "enrichmentSplit -> host"
from: "enrichmentSplitBolt"
to: "hostEnrichmentBolt"
grouping:
streamId: "host"
type: FIELDS
args: ["message"]
- name: "enrichmentSplit -> geo"
from: "enrichmentSplitBolt"
to: "geoEnrichmentBolt"
grouping:
streamId: "geo"
type: FIELDS
args: ["message"]
- name: "enrichmentSplit -> stellar"
from: "enrichmentSplitBolt"
to: "stellarEnrichmentBolt"
grouping:
streamId: "stellar"
type: FIELDS
args: ["message"]
- name: "enrichmentSplit -> simpleHBaseEnrichmentBolt"
from: "enrichmentSplitBolt"
to: "simpleHBaseEnrichmentBolt"
grouping:
streamId: "hbaseEnrichment"
type: FIELDS
args: ["message"]
- name: "splitter -> join"
from: "enrichmentSplitBolt"
to: "enrichmentJoinBolt"
grouping:
streamId: "message"
type: FIELDS
args: ["key"]
- name: "geo -> join"
from: "geoEnrichmentBolt"
to: "enrichmentJoinBolt"
grouping:
streamId: "geo"
type: FIELDS
args: ["key"]
- name: "stellar -> join"
from: "stellarEnrichmentBolt"
to: "enrichmentJoinBolt"
grouping:
streamId: "stellar"
type: FIELDS
args: ["key"]
- name: "simpleHBaseEnrichmentBolt -> join"
from: "simpleHBaseEnrichmentBolt"
to: "enrichmentJoinBolt"
grouping:
streamId: "hbaseEnrichment"
type: FIELDS
args: ["key"]
- name: "host -> join"
from: "hostEnrichmentBolt"
to: "enrichmentJoinBolt"
grouping:
streamId: "host"
type: FIELDS
args: ["key"]
# Error output
- name: "geoEnrichmentBolt -> enrichmentErrorOutputBolt"
from: "geoEnrichmentBolt"
to: "enrichmentErrorOutputBolt"
grouping:
streamId: "error"
type: LOCAL_OR_SHUFFLE
- name: "stellarEnrichmentBolt -> enrichmentErrorOutputBolt"
from: "stellarEnrichmentBolt"
to: "enrichmentErrorOutputBolt"
grouping:
streamId: "error"
type: LOCAL_OR_SHUFFLE
- name: "hostEnrichmentBolt -> enrichmentErrorOutputBolt"
from: "hostEnrichmentBolt"
to: "enrichmentErrorOutputBolt"
grouping:
streamId: "error"
type: LOCAL_OR_SHUFFLE
- name: "simpleHBaseEnrichmentBolt -> enrichmentErrorOutputBolt"
from: "simpleHBaseEnrichmentBolt"
to: "enrichmentErrorOutputBolt"
grouping:
streamId: "error"
type: LOCAL_OR_SHUFFLE
#threat intel
- name: "enrichmentJoin -> threatSplit"
from: "enrichmentJoinBolt"
to: "threatIntelSplitBolt"
grouping:
streamId: "message"
type: FIELDS
args: ["key"]
- name: "threatSplit -> simpleHBaseThreatIntel"
from: "threatIntelSplitBolt"
to: "simpleHBaseThreatIntelBolt"
grouping:
streamId: "hbaseThreatIntel"
type: FIELDS
args: ["message"]
- name: "threatSplit -> stellarThreatIntel"
from: "threatIntelSplitBolt"
to: "stellarThreatIntelBolt"
grouping:
streamId: "stellar"
type: FIELDS
args: ["message"]
- name: "simpleHBaseThreatIntel -> join"
from: "simpleHBaseThreatIntelBolt"
to: "threatIntelJoinBolt"
grouping:
streamId: "hbaseThreatIntel"
type: FIELDS
args: ["key"]
- name: "stellarThreatIntel -> join"
from: "stellarThreatIntelBolt"
to: "threatIntelJoinBolt"
grouping:
streamId: "stellar"
type: FIELDS
args: ["key"]
- name: "threatIntelSplit -> threatIntelJoin"
from: "threatIntelSplitBolt"
to: "threatIntelJoinBolt"
grouping:
streamId: "message"
type: FIELDS
args: ["key"]
#output
- name: "threatIntelJoin -> output"
from: "threatIntelJoinBolt"
to: "outputBolt"
grouping:
streamId: "message"
type: LOCAL_OR_SHUFFLE
# Error output
- name: "simpleHBaseThreatIntelBolt -> threatIntelErrorOutputBolt"
from: "simpleHBaseThreatIntelBolt"
to: "threatIntelErrorOutputBolt"
grouping:
streamId: "error"
type: LOCAL_OR_SHUFFLE
- name: "stellarThreatIntelBolt -> threatIntelErrorOutputBolt"
from: "stellarThreatIntelBolt"
to: "threatIntelErrorOutputBolt"
grouping:
streamId: "error"
type: LOCAL_OR_SHUFFLE