blob: 45b05cfdc6bf13162846f0c3c6c08a6d00fa7264 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# This is a drop-in replacement for the existing split/join enrichment topology.
# Instead of a fan-out/fan-in architecture, it adopts a data-parallelism strategy
# in which each message is fully enriched inside a single UnifiedEnrichmentBolt
# (and then threat-intel enriched inside a second one). This greatly simplifies
# the architecture and cuts down on network hops. Its performance characteristics
# are not yet known, so caveat emptor.
# Name of this Flux topology.
name: 'enrichment'
config:
  # Storm topology settings. Every value is a ${...} placeholder that is
  # expected to be filled in from the topology properties when Flux loads
  # this file.
  topology.workers: ${enrichment.workers}
  topology.acker.executors: ${enrichment.acker.executors}
  topology.worker.childopts: ${topology.worker.childopts}
  topology.auto-credentials: ${topology.auto-credentials}
  topology.max.spout.pending: ${topology.max.spout.pending}
  # Either a number (e.g. 5) or a multiple of cores (e.g. 5C = 5 times the
  # number of cores).
  metron.threadpool.size: ${enrichment.threadpool.size}
  # FIXED or WORK_STEALING
  metron.threadpool.type: ${enrichment.threadpool.type}
components:
# ---- enrichment ----
# Stellar adapter switched to its ENRICHMENT type via ofType.
- id: 'stellarEnrichmentAdapter'
  className: 'org.apache.metron.enrichment.adapters.stellar.StellarAdapter'
  configMethods:
  - name: 'ofType'
    args: ['ENRICHMENT']
# Kafka producer properties handed to every KafkaWriter in this file via
# withProducerConfigs; add any further producer settings here.
- id: 'kafkaWriterProps'
  className: 'java.util.HashMap'
  configMethods:
  - name: 'put'
    args:
    - 'security.protocol'
    - "${kafka.security.protocol}"
# Enrichment named 'stellar', backed by stellarEnrichmentAdapter.
- id: 'stellarEnrichment'
  className: 'org.apache.metron.enrichment.configuration.Enrichment'
  constructorArgs:
  - 'stellar'
  - ref: 'stellarEnrichmentAdapter'
# Geo adapter (no-arg construction) and the enrichment named 'geo' that wraps it.
- id: 'geoEnrichmentAdapter'
  className: 'org.apache.metron.enrichment.adapters.geo.GeoAdapter'
- id: 'geoEnrichment'
  className: 'org.apache.metron.enrichment.configuration.Enrichment'
  constructorArgs:
  - 'geo'
  - ref: 'geoEnrichmentAdapter'
# Host adapter built from the known-hosts property, plus the enrichment named
# 'host' that wraps it.
- id: 'hostEnrichmentAdapter'
  className: 'org.apache.metron.enrichment.adapters.host.HostFromJSONListAdapter'
  constructorArgs:
  # Kept single-quoted: the adapter name suggests a JSON list (double quotes
  # inside) is substituted here — NOTE(review): confirm the property format.
  - '${enrichment.host.known_hosts}'
- id: 'hostEnrichment'
  className: 'org.apache.metron.enrichment.configuration.Enrichment'
  constructorArgs:
  - 'host'
  - ref: 'hostEnrichmentAdapter'
# HBase-backed enrichment: a config (provider, table, column family), the
# adapter that receives it, and the enrichment named 'hbaseEnrichment'.
- id: 'simpleHBaseEnrichmentConfig'
  className: 'org.apache.metron.enrichment.adapters.simplehbase.SimpleHBaseConfig'
  configMethods:
  - name: 'withProviderImpl'
    args:
    - "${hbase.provider.impl}"
  - name: 'withHBaseTable'
    args:
    - "${enrichment.simple.hbase.table}"
  - name: 'withHBaseCF'
    args:
    - "${enrichment.simple.hbase.cf}"
- id: 'simpleHBaseEnrichmentAdapter'
  className: 'org.apache.metron.enrichment.adapters.simplehbase.SimpleHBaseAdapter'
  configMethods:
  - name: 'withConfig'
    args:
    - ref: 'simpleHBaseEnrichmentConfig'
- id: 'simpleHBaseEnrichment'
  className: 'org.apache.metron.enrichment.configuration.Enrichment'
  constructorArgs:
  - 'hbaseEnrichment'
  - ref: 'simpleHBaseEnrichmentAdapter'
# List of enrichments handed to enrichmentBolt via withEnrichments.
# Insertion order: geo, host, hbase, stellar.
- id: 'enrichments'
  className: 'java.util.ArrayList'
  configMethods:
  - name: 'add'
    args:
    - ref: 'geoEnrichment'
  - name: 'add'
    args:
    - ref: 'hostEnrichment'
  - name: 'add'
    args:
    - ref: 'simpleHBaseEnrichment'
  - name: 'add'
    args:
    - ref: 'stellarEnrichment'
# ---- enrichment errors ----
# Kafka writer targeting the enrichment error topic; used by
# enrichmentErrorOutputBolt.
- id: 'enrichmentErrorKafkaWriter'
  className: 'org.apache.metron.writer.kafka.KafkaWriter'
  configMethods:
  - name: 'withTopic'
    args:
    - "${enrichment.error.topic}"
  - name: 'withZkQuorum'
    args:
    - "${kafka.zk}"
  - name: 'withProducerConfigs'
    args:
    - ref: 'kafkaWriterProps'
# ---- threat intel ----
# Stellar adapter switched to its THREAT_INTEL type, plus the threat-intel
# enrichment named 'stellar' that wraps it.
- id: 'stellarThreatIntelAdapter'
  className: 'org.apache.metron.enrichment.adapters.stellar.StellarAdapter'
  configMethods:
  - name: 'ofType'
    args: ['THREAT_INTEL']
- id: 'stellarThreatIntelEnrichment'
  className: 'org.apache.metron.enrichment.configuration.Enrichment'
  constructorArgs:
  - 'stellar'
  - ref: 'stellarThreatIntelAdapter'
# HBase-backed threat intel: a config (provider, tracker table/CF, lookup
# table/CF), the adapter that receives it, and the enrichment named
# 'hbaseThreatIntel'.
- id: 'simpleHBaseThreatIntelConfig'
  className: 'org.apache.metron.enrichment.adapters.threatintel.ThreatIntelConfig'
  configMethods:
  - name: 'withProviderImpl'
    args:
    - "${hbase.provider.impl}"
  - name: 'withTrackerHBaseTable'
    args:
    - "${threat.intel.tracker.table}"
  - name: 'withTrackerHBaseCF'
    args:
    - "${threat.intel.tracker.cf}"
  - name: 'withHBaseTable'
    args:
    - "${threat.intel.simple.hbase.table}"
  - name: 'withHBaseCF'
    args:
    - "${threat.intel.simple.hbase.cf}"
- id: 'simpleHBaseThreatIntelAdapter'
  className: 'org.apache.metron.enrichment.adapters.threatintel.ThreatIntelAdapter'
  configMethods:
  - name: 'withConfig'
    args:
    - ref: 'simpleHBaseThreatIntelConfig'
- id: 'simpleHBaseThreatIntelEnrichment'
  className: 'org.apache.metron.enrichment.configuration.Enrichment'
  constructorArgs:
  - 'hbaseThreatIntel'
  - ref: 'simpleHBaseThreatIntelAdapter'
# List of threat-intel enrichments handed to threatIntelBolt via
# withEnrichments. Insertion order: hbase, stellar.
- id: 'threatIntels'
  className: 'java.util.ArrayList'
  configMethods:
  - name: 'add'
    args:
    - ref: 'simpleHBaseThreatIntelEnrichment'
  - name: 'add'
    args:
    - ref: 'stellarThreatIntelEnrichment'
# ---- threat intel errors ----
# Kafka writer targeting the threat-intel error topic; used by
# threatIntelErrorOutputBolt.
- id: 'threatIntelErrorKafkaWriter'
  className: 'org.apache.metron.writer.kafka.KafkaWriter'
  configMethods:
  - name: 'withTopic'
    args:
    - "${threat.intel.error.topic}"
  - name: 'withZkQuorum'
    args:
    - "${kafka.zk}"
  - name: 'withProducerConfigs'
    args:
    - ref: 'kafkaWriterProps'
# ---- indexing ----
# Kafka writer targeting the enrichment output topic; used by outputBolt.
# Presumably this topic is what indexing consumes — confirm.
- id: 'kafkaWriter'
  className: 'org.apache.metron.writer.kafka.KafkaWriter'
  configMethods:
  - name: 'withTopic'
    args:
    - "${enrichment.output.topic}"
  - name: 'withZkQuorum'
    args:
    - "${kafka.zk}"
  - name: 'withProducerConfigs'
    args:
    - ref: 'kafkaWriterProps'
# ---- kafka/zookeeper ----
# Kafka consumer properties passed to kafkaConfig; add any further consumer
# settings here.
- id: 'kafkaProps'
  className: 'java.util.HashMap'
  configMethods:
  # Both keys and values are deserialized as raw byte arrays.
  - name: 'put'
    args:
    - 'value.deserializer'
    - 'org.apache.kafka.common.serialization.ByteArrayDeserializer'
  - name: 'put'
    args:
    - 'key.deserializer'
    - 'org.apache.kafka.common.serialization.ByteArrayDeserializer'
  - name: 'put'
    args:
    - 'group.id'
    - 'enrichments'
  - name: 'put'
    args:
    - 'security.protocol'
    - "${kafka.security.protocol}"
# Fields to extract from each Kafka record; only 'value' is requested.
- id: 'fields'
  className: 'java.util.ArrayList'
  configMethods:
  - name: 'add'
    args: ['value']
# Spout builder: consumer properties, input topic, the ${kafka.zk} quorum and
# the field list, with the first-poll offset strategy taken from ${kafka.start}.
- id: 'kafkaConfig'
  className: 'org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder'
  constructorArgs:
  - ref: 'kafkaProps'
  # input topic name
  - "${enrichment.input.topic}"
  - "${kafka.zk}"
  - ref: 'fields'
  configMethods:
  - name: 'setFirstPollOffsetStrategy'
    args:
    - "${kafka.start}"
spouts:
# Single Kafka spout, built from kafkaConfig.
- id: 'kafkaSpout'
  className: 'org.apache.metron.storm.kafka.flux.StormKafkaSpout'
  constructorArgs:
  - ref: 'kafkaConfig'
  parallelism: ${kafka.spout.parallelism}
bolts:
# ---- enrichment bolt ----
# UnifiedEnrichmentBolt configured with the 'enrichments' list, the ENRICHMENT
# strategy and the JSON_FROM_POSITION message getter.
# NOTE(review): the unit of withMaxTimeRetain (10) is not visible here — confirm.
- id: 'enrichmentBolt'
  className: 'org.apache.metron.enrichment.bolt.UnifiedEnrichmentBolt'
  constructorArgs:
  - "${kafka.zk}"
  configMethods:
  - name: 'withEnrichments'
    args:
    - ref: 'enrichments'
  - name: 'withMaxCacheSize'
    args:
    - ${enrichment.cache.size}
  - name: 'withMaxTimeRetain'
    args:
    - 10
  - name: 'withCaptureCacheStats'
    args:
    - true
  - name: 'withStrategy'
    args:
    - 'ENRICHMENT'
  - name: 'withMessageGetter'
    args:
    - 'JSON_FROM_POSITION'
  parallelism: ${enrichment.parallelism}
# Writes tuples received from enrichmentBolt's 'error' stream using
# enrichmentErrorKafkaWriter.
# NOTE(review): no parallelism is set here, unlike the other bolts — confirm
# the Flux default is intended.
- id: 'enrichmentErrorOutputBolt'
  className: 'org.apache.metron.writer.bolt.BulkMessageWriterBolt'
  constructorArgs:
  - "${kafka.zk}"
  - 'ENRICHMENT'
  configMethods:
  - name: 'withBulkMessageWriter'
    args:
    - ref: 'enrichmentErrorKafkaWriter'
# ---- threat intel bolts ----
# Second UnifiedEnrichmentBolt: uses the 'threatIntels' list and the
# THREAT_INTEL strategy. It reads the message from the tuple field named
# 'message' using the JSON_FROM_FIELD_BY_REFERENCE getter.
- id: 'threatIntelBolt'
  className: 'org.apache.metron.enrichment.bolt.UnifiedEnrichmentBolt'
  constructorArgs:
  - "${kafka.zk}"
  configMethods:
  - name: 'withEnrichments'
    args:
    - ref: 'threatIntels'
  - name: 'withMaxCacheSize'
    args:
    - ${threat.intel.cache.size}
  - name: 'withMaxTimeRetain'
    args:
    - 10
  - name: 'withCaptureCacheStats'
    args:
    - true
  - name: 'withStrategy'
    args:
    - 'THREAT_INTEL'
  - name: 'withMessageFieldName'
    args:
    - 'message'
  - name: 'withMessageGetter'
    args:
    - 'JSON_FROM_FIELD_BY_REFERENCE'
  parallelism: ${threat.intel.parallelism}
# Writes tuples received from threatIntelBolt's 'error' stream using
# threatIntelErrorKafkaWriter.
# NOTE(review): no parallelism is set here, unlike the other bolts — confirm
# the Flux default is intended.
- id: 'threatIntelErrorOutputBolt'
  className: 'org.apache.metron.writer.bolt.BulkMessageWriterBolt'
  constructorArgs:
  - "${kafka.zk}"
  - 'ENRICHMENT'
  configMethods:
  - name: 'withBulkMessageWriter'
    args:
    - ref: 'threatIntelErrorKafkaWriter'
# ---- output bolt ----
# Writes threatIntelBolt's 'message' stream to the enrichment output topic
# using kafkaWriter.
- id: 'outputBolt'
  className: 'org.apache.metron.writer.bolt.BulkMessageWriterBolt'
  constructorArgs:
  - "${kafka.zk}"
  - 'ENRICHMENT'
  configMethods:
  - name: 'withBulkMessageWriter'
    args:
    - ref: 'kafkaWriter'
  parallelism: ${kafka.writer.parallelism}
streams:
# Input: the Kafka spout feeds the enrichment bolt.
- name: 'spout -> enrichmentBolt'
  from: 'kafkaSpout'
  to: 'enrichmentBolt'
  grouping:
    type: 'LOCAL_OR_SHUFFLE'
# Enrichment errors: the 'error' stream goes to the enrichment error writer.
- name: 'enrichmentBolt -> enrichmentErrorOutputBolt'
  from: 'enrichmentBolt'
  to: 'enrichmentErrorOutputBolt'
  grouping:
    streamId: 'error'
    type: 'LOCAL_OR_SHUFFLE'
# Enriched messages: the 'message' stream goes on to threat intel.
- name: 'enrichmentBolt -> threatIntelBolt'
  from: 'enrichmentBolt'
  to: 'threatIntelBolt'
  grouping:
    streamId: 'message'
    type: 'LOCAL_OR_SHUFFLE'
# Final output: threat-intel 'message' tuples go to the output writer.
- name: 'threatIntelBolt -> output'
  from: 'threatIntelBolt'
  to: 'outputBolt'
  grouping:
    streamId: 'message'
    type: 'LOCAL_OR_SHUFFLE'
# Threat-intel errors: the 'error' stream goes to the threat-intel error writer.
- name: 'threatIntelBolt -> threatIntelErrorOutputBolt'
  from: 'threatIntelBolt'
  to: 'threatIntelErrorOutputBolt'
  grouping:
    streamId: 'error'
    type: 'LOCAL_OR_SHUFFLE'