# Source blob: ef2143d8cb29f2139cf97a493792a0b2a2d3177b (repository-viewer header; not part of the topology definition)
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Name under which Flux submits this topology.
name: "enrichment"
# Storm topology-level settings; the keys below must be nested under `config`
# to be picked up as topology configuration.
config:
  topology.workers: 1
  # NOTE(review): zero acker executors turns off Storm's tuple tracking, so
  # tuples are acked as soon as they leave the spout and failures are never
  # replayed. Confirm that at-most-once delivery is acceptable here.
  topology.acker.executors: 0
# Reusable objects, built by id and injected elsewhere in this file via `ref`.
components:
# Enrichment
# MySQL connection settings used by the geo adapter; every value comes from a
# ${...} property placeholder.
- id: "jdbcConfig"
  className: "org.apache.metron.enrichment.adapters.jdbc.MySqlConfig"
  properties:
  - name: "host"
    value: "${mysql.ip}"
  - name: "port"
    # Deliberately left unquoted, presumably so the substituted value is
    # parsed as a number rather than a string -- confirm against MySqlConfig.
    value: ${mysql.port}
  - name: "username"
    value: "${mysql.username}"
  - name: "password"
    value: "${mysql.password}"
  - name: "table"
    value: "GEO"
# Geo adapter wired to the JDBC configuration above.
- id: "geoEnrichmentAdapter"
  className: "org.apache.metron.enrichment.adapters.geo.GeoAdapter"
  configMethods:
  - name: "withJdbcConfig"
    args:
    - ref: "jdbcConfig"
# Enrichment named "geo"; the same name is used as a streamId in `streams`.
- id: "geoEnrichment"
  className: "org.apache.metron.domain.Enrichment"
  constructorArgs:
  - "geo"
  - ref: "geoEnrichmentAdapter"
# Host adapter built from the known-hosts property.
- id: "hostEnrichmentAdapter"
  className: "org.apache.metron.enrichment.adapters.host.HostFromJSONListAdapter"
  constructorArgs:
  # Single-quoted, presumably because the substituted value is JSON that
  # itself contains double quotes -- TODO confirm.
  - '${org.apache.metron.enrichment.host.known_hosts}'
# Enrichment named "host"; the same name is used as a streamId in `streams`.
- id: "hostEnrichment"
  className: "org.apache.metron.domain.Enrichment"
  constructorArgs:
  - "host"
  - ref: "hostEnrichmentAdapter"
# HBase provider, table and column family for the simple HBase enrichment.
- id: "simpleHBaseEnrichmentConfig"
  className: "org.apache.metron.enrichment.adapters.simplehbase.SimpleHBaseConfig"
  configMethods:
  - name: "withProviderImpl"
    args:
    - "${hbase.provider.impl}"
  - name: "withHBaseTable"
    args:
    - "${enrichment.simple.hbase.table}"
  - name: "withHBaseCF"
    args:
    - "${enrichment.simple.hbase.cf}"
- id: "simpleHBaseEnrichmentAdapter"
  className: "org.apache.metron.enrichment.adapters.simplehbase.SimpleHBaseAdapter"
  configMethods:
  - name: "withConfig"
    args:
    - ref: "simpleHBaseEnrichmentConfig"
# Enrichment named "hbaseEnrichment"; the same name is used as a streamId in
# `streams`.
- id: "simpleHBaseEnrichment"
  className: "org.apache.metron.domain.Enrichment"
  constructorArgs:
  - "hbaseEnrichment"
  - ref: "simpleHBaseEnrichmentAdapter"
# List of all enrichments, handed to the enrichment splitter bolt.
- id: "enrichments"
  className: "java.util.ArrayList"
  configMethods:
  - name: "add"
    args:
    - ref: "geoEnrichment"
  - name: "add"
    args:
    - ref: "hostEnrichment"
  - name: "add"
    args:
    - ref: "simpleHBaseEnrichment"
# Threat Intel
# HBase provider plus tracker and lookup table/column-family settings for the
# threat intel adapter; all values come from ${...} property placeholders.
- id: "simpleHBaseThreatIntelConfig"
  className: "org.apache.metron.enrichment.adapters.threatintel.ThreatIntelConfig"
  configMethods:
  - name: "withProviderImpl"
    args:
    - "${hbase.provider.impl}"
  - name: "withTrackerHBaseTable"
    args:
    - "${threat.intel.tracker.table}"
  - name: "withTrackerHBaseCF"
    args:
    - "${threat.intel.tracker.cf}"
  - name: "withHBaseTable"
    args:
    - "${threat.intel.simple.hbase.table}"
  - name: "withHBaseCF"
    args:
    - "${threat.intel.simple.hbase.cf}"
- id: "simpleHBaseThreatIntelAdapter"
  className: "org.apache.metron.enrichment.adapters.threatintel.ThreatIntelAdapter"
  configMethods:
  - name: "withConfig"
    args:
    - ref: "simpleHBaseThreatIntelConfig"
# Enrichment named "hbaseThreatIntel"; the same name is used as a streamId in
# `streams`.
- id: "simpleHBaseThreatIntelEnrichment"
  className: "org.apache.metron.domain.Enrichment"
  constructorArgs:
  - "hbaseThreatIntel"
  - ref: "simpleHBaseThreatIntelAdapter"
# List of all threat intel enrichments, handed to the threat intel splitter
# bolt.
- id: "threatIntels"
  className: "java.util.ArrayList"
  configMethods:
  - name: "add"
    args:
    - ref: "simpleHBaseThreatIntelEnrichment"
# HDFS output
# Output file naming: "enrichment-" prefix, ".json" extension, written under
# the ${index.hdfs.output} path.
- id: "fileNameFormat"
  className: "org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat"
  configMethods:
  - name: "withPrefix"
    args:
    - "enrichment-"
  - name: "withExtension"
    args:
    - ".json"
  - name: "withPath"
    args:
    - "${index.hdfs.output}"
# Rotation policy; class, count and units are all supplied by properties.
- id: "hdfsRotationPolicy"
  className: "${bolt.hdfs.rotation.policy}"
  constructorArgs:
  # Deliberately left unquoted, presumably so the substituted count is parsed
  # as a number rather than a string -- confirm against the policy class.
  - ${bolt.hdfs.rotation.policy.count}
  - "${bolt.hdfs.rotation.policy.units}"
# Indexing
# HDFS writer combining the file name format and rotation policy above.
- id: "hdfsWriter"
  className: "org.apache.metron.writer.hdfs.HdfsWriter"
  configMethods:
  - name: "withFileNameFormat"
    args:
    - ref: "fileNameFormat"
  - name: "withRotationPolicy"
    args:
    - ref: "hdfsRotationPolicy"
# Index writer whose implementation class is chosen by ${writer.class.name}.
- id: "indexWriter"
  className: "${writer.class.name}"
# Kafka / ZooKeeper
- id: "zkHosts"
  className: "storm.kafka.ZkHosts"
  constructorArgs:
  - "${kafka.zk}"
# Spout configuration for the "enrichments" topic.
- id: "kafkaConfig"
  className: "storm.kafka.SpoutConfig"
  constructorArgs:
  # zookeeper hosts
  - ref: "zkHosts"
  # topic name
  - "enrichments"
  # zk root
  - ""
  # id
  - "enrichments"
  properties:
  # NOTE(review): in storm-kafka, ignoreZkOffsets=true discards the offsets
  # saved in ZooKeeper, and startOffsetTime=-1 corresponds to Kafka's
  # "latest" offset. Together this means a restarted topology would skip
  # messages that arrived while it was down -- confirm this is intended.
  - name: "ignoreZkOffsets"
    value: true
  - name: "startOffsetTime"
    value: -1
# Single Kafka spout, built from the "kafkaConfig" component.
spouts:
- id: "kafkaSpout"
  className: "storm.kafka.KafkaSpout"
  constructorArgs:
  - ref: "kafkaConfig"
# Every bolt in this file takes the ${kafka.zk} property as its single
# constructor argument.
bolts:
# Enrichment Bolts
# Splitter, configured with the full "enrichments" list.
- id: "enrichmentSplitBolt"
  className: "org.apache.metron.enrichment.bolt.EnrichmentSplitterBolt"
  constructorArgs:
  - "${kafka.zk}"
  configMethods:
  - name: "withEnrichments"
    args:
    - ref: "enrichments"
# One GenericEnrichmentBolt per enrichment. All share the same cache settings
# (max size 10000, max time retained 10; units are not visible in this file).
- id: "geoEnrichmentBolt"
  className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
  constructorArgs:
  - "${kafka.zk}"
  configMethods:
  - name: "withEnrichment"
    args:
    - ref: "geoEnrichment"
  - name: "withMaxCacheSize"
    args: [10000]
  - name: "withMaxTimeRetain"
    args: [10]
- id: "hostEnrichmentBolt"
  className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
  constructorArgs:
  - "${kafka.zk}"
  configMethods:
  - name: "withEnrichment"
    args:
    - ref: "hostEnrichment"
  - name: "withMaxCacheSize"
    args: [10000]
  - name: "withMaxTimeRetain"
    args: [10]
- id: "simpleHBaseEnrichmentBolt"
  className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
  constructorArgs:
  - "${kafka.zk}"
  configMethods:
  - name: "withEnrichment"
    args:
    - ref: "simpleHBaseEnrichment"
  - name: "withMaxCacheSize"
    args: [10000]
  - name: "withMaxTimeRetain"
    args: [10]
# Join bolt fed by the splitter and by each enrichment bolt (see `streams`).
- id: "enrichmentJoinBolt"
  className: "org.apache.metron.enrichment.bolt.EnrichmentJoinBolt"
  constructorArgs:
  - "${kafka.zk}"
  configMethods:
  - name: "withMaxCacheSize"
    args: [10000]
  - name: "withMaxTimeRetain"
    args: [10]
# Threat Intel Bolts
# Splitter, configured with the "threatIntels" list and the message field
# name "message".
- id: "threatIntelSplitBolt"
  className: "org.apache.metron.enrichment.bolt.ThreatIntelSplitterBolt"
  constructorArgs:
  - "${kafka.zk}"
  configMethods:
  - name: "withEnrichments"
    args:
    - ref: "threatIntels"
  - name: "withMessageFieldName"
    args: ["message"]
# Enrichment bolt for the HBase-backed threat intel enrichment.
- id: "simpleHBaseThreatIntelBolt"
  className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
  constructorArgs:
  - "${kafka.zk}"
  configMethods:
  - name: "withEnrichment"
    args:
    - ref: "simpleHBaseThreatIntelEnrichment"
  - name: "withMaxCacheSize"
    args: [10000]
  - name: "withMaxTimeRetain"
    args: [10]
# Join bolt fed by the threat intel splitter and the threat intel bolt (see
# `streams`).
- id: "threatIntelJoinBolt"
  className: "org.apache.metron.enrichment.bolt.ThreatIntelJoinBolt"
  constructorArgs:
  - "${kafka.zk}"
  configMethods:
  - name: "withMaxCacheSize"
    args: [10000]
  - name: "withMaxTimeRetain"
    args: [10]
# Indexing Bolts
# Two instances of the same bulk-writer bolt class, differing only in the
# writer component they are given.
# Writes through the property-selected "indexWriter".
- id: "indexingBolt"
  className: "org.apache.metron.bolt.BulkMessageWriterBolt"
  constructorArgs:
  - "${kafka.zk}"
  configMethods:
  - name: "withBulkMessageWriter"
    args:
    - ref: "indexWriter"
# Writes through the "hdfsWriter".
- id: "hdfsIndexingBolt"
  className: "org.apache.metron.bolt.BulkMessageWriterBolt"
  constructorArgs:
  - "${kafka.zk}"
  configMethods:
  - name: "withBulkMessageWriter"
    args:
    - ref: "hdfsWriter"
# Wiring between the spout and bolts. Each entry connects `from` to `to`
# using the given grouping; `streamId` selects a named output stream of the
# source.
streams:
# Parser
- name: "spout -> enrichmentSplit"
  from: "kafkaSpout"
  to: "enrichmentSplitBolt"
  grouping:
    type: SHUFFLE
# Enrichment
# The splitter fans out on one stream per enrichment ("host", "geo",
# "hbaseEnrichment") plus a "message" stream that goes straight to the join.
# Every hop is grouped on the "key" field.
- name: "enrichmentSplit -> host"
  from: "enrichmentSplitBolt"
  to: "hostEnrichmentBolt"
  grouping:
    streamId: "host"
    type: FIELDS
    args: ["key"]
- name: "enrichmentSplit -> geo"
  from: "enrichmentSplitBolt"
  to: "geoEnrichmentBolt"
  grouping:
    streamId: "geo"
    type: FIELDS
    args: ["key"]
- name: "enrichmentSplit -> simpleHBaseEnrichmentBolt"
  from: "enrichmentSplitBolt"
  to: "simpleHBaseEnrichmentBolt"
  grouping:
    streamId: "hbaseEnrichment"
    type: FIELDS
    args: ["key"]
- name: "splitter -> join"
  from: "enrichmentSplitBolt"
  to: "enrichmentJoinBolt"
  grouping:
    streamId: "message"
    type: FIELDS
    args: ["key"]
- name: "geo -> join"
  from: "geoEnrichmentBolt"
  to: "enrichmentJoinBolt"
  grouping:
    streamId: "geo"
    type: FIELDS
    args: ["key"]
- name: "simpleHBaseEnrichmentBolt -> join"
  from: "simpleHBaseEnrichmentBolt"
  to: "enrichmentJoinBolt"
  grouping:
    streamId: "hbaseEnrichment"
    type: FIELDS
    args: ["key"]
- name: "host -> join"
  from: "hostEnrichmentBolt"
  to: "enrichmentJoinBolt"
  grouping:
    streamId: "host"
    type: FIELDS
    args: ["key"]
# Threat intel
# The enrichment join feeds the threat intel splitter, which fans out on the
# "hbaseThreatIntel" stream and sends "message" straight to the threat intel
# join. Every hop is grouped on the "key" field.
- name: "enrichmentJoin -> threatSplit"
  from: "enrichmentJoinBolt"
  to: "threatIntelSplitBolt"
  grouping:
    streamId: "message"
    type: FIELDS
    args: ["key"]
- name: "threatSplit -> simpleHBaseThreatIntel"
  from: "threatIntelSplitBolt"
  to: "simpleHBaseThreatIntelBolt"
  grouping:
    streamId: "hbaseThreatIntel"
    type: FIELDS
    args: ["key"]
- name: "simpleHBaseThreatIntel -> join"
  from: "simpleHBaseThreatIntelBolt"
  to: "threatIntelJoinBolt"
  grouping:
    streamId: "hbaseThreatIntel"
    type: FIELDS
    args: ["key"]
- name: "threatIntelSplit -> threatIntelJoin"
  from: "threatIntelSplitBolt"
  to: "threatIntelJoinBolt"
  grouping:
    streamId: "message"
    type: FIELDS
    args: ["key"]
# Indexing
# The threat intel join output ("message") goes to both writer bolts: grouped
# on "key" for the index writer, shuffled for the HDFS writer.
- name: "threatIntelJoin -> indexing"
  from: "threatIntelJoinBolt"
  to: "indexingBolt"
  grouping:
    streamId: "message"
    type: FIELDS
    args: ["key"]
- name: "threatIntelJoin -> hdfs"
  from: "threatIntelJoinBolt"
  to: "hdfsIndexingBolt"
  grouping:
    streamId: "message"
    type: SHUFFLE
# NOTE(review): despite its name, this stream routes the "error" output of
# "indexingBolt" back into "indexingBolt" itself; no bolt with id
# "errorIndexingBolt" is defined in this file. Presumably errors are meant to
# be indexed by the same writer -- confirm, and check that a failure while
# writing an error cannot loop indefinitely.
- name: "indexingBolt -> errorIndexingBolt"
  from: "indexingBolt"
  to: "indexingBolt"
  grouping:
    streamId: "error"
    type: SHUFFLE