# blob: e1708957bcaf2aed9b5cd0ae89b31d1b92994e50 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Storm topology definition for the "pcap" telemetry pipeline. The layout
# (name / config / components / spouts / bolts / streams) appears to follow
# the Apache Storm Flux topology schema.
name: "pcap"

# Topology-level configuration. The entry must be nested under `config`;
# at column 0 it would be read as an unrelated top-level key and `config`
# itself would be null.
config:
  topology.workers: 1
# Reusable objects, each created once and referenced by `id` (through `ref`)
# from other components and from the spouts/bolts sections.
#
# `${...}` tokens are placeholders. Presumably they are substituted from a
# properties file before this document is parsed -- confirm against the
# deployment's launch command. Placeholders that are deliberately left
# unquoted rely on that: after substitution the value is typed by YAML
# (number / boolean) instead of being forced to a string.
components:
  # Packet parser handed to parserBolt through withMessageParser.
  - id: "parser"
    className: "org.apache.metron.parsing.parsers.PcapParser"
    configMethods:
      - name: "withTsPrecision"
        args: ["MICRO"]

  # Threat Intel
  # HBase table / column-family names for the tracker table and the IP
  # threat intel table, all taken from threat.intel.* placeholders.
  - id: "ipThreatIntelConfig"
    className: "org.apache.metron.threatintel.ThreatIntelConfig"
    configMethods:
      - name: "withTrackerHBaseTable"
        args:
          - "${threat.intel.tracker.table}"
      - name: "withTrackerHBaseCF"
        args:
          - "${threat.intel.tracker.cf}"
      - name: "withHBaseTable"
        args:
          - "${threat.intel.ip.table}"
      - name: "withHBaseCF"
        args:
          - "${threat.intel.ip.cf}"

  # Adapter built from ipThreatIntelConfig.
  - id: "ipThreatIntelAdapter"
    className: "org.apache.metron.threatintel.ThreatIntelAdapter"
    configMethods:
      - name: "withConfig"
        args:
          - ref: "ipThreatIntelConfig"

  # Enrichment named "ip": binds the two message/ip_*_addr fields to
  # ipThreatIntelAdapter.
  - id: "ipThreatIntelEnrichment"
    className: "org.apache.metron.domain.Enrichment"
    properties:
      - name: "name"
        value: "ip"
      - name: "fields"
        value: ["message/ip_src_addr", "message/ip_dst_addr"]
      - name: "adapter"
        ref: "ipThreatIntelAdapter"

  # List of threat intel enrichments, shared by threatIntelSplitBolt and
  # threatIntelJoinBolt.
  - id: "threatIntels"
    className: "java.util.ArrayList"
    configMethods:
      - name: "add"
        args:
          - ref: "ipThreatIntelEnrichment"

  # Enrichment
  # Disabled geo enrichment (JDBC-backed). Kept as a template; the nesting
  # below is preserved so the block can be re-enabled by stripping "# ".
  # - id: "jdbcConfig"
  #   className: "org.apache.metron.enrichment.adapters.jdbc.MySqlConfig"
  #   properties:
  #     - name: "host"
  #       value: "${mysql.ip}"
  #     - name: "port"
  #       value: ${mysql.port}
  #     - name: "username"
  #       value: "${mysql.username}"
  #     - name: "password"
  #       value: "${mysql.password}"
  #     - name: "table"
  #       value: "GEO"
  # - id: "geoEnrichmentAdapter"
  #   className: "org.apache.metron.enrichment.adapters.geo.GeoAdapter"
  #   configMethods:
  #     - name: "withJdbcConfig"
  #       args:
  #         - ref: "jdbcConfig"
  # - id: "geoEnrichment"
  #   className: "org.apache.metron.domain.Enrichment"
  #   properties:
  #     - name: "name"
  #       value: "geo"
  #     - name: "fields"
  #       value: ["ip_src_addr", "ip_dst_addr"]
  #     - name: "adapter"
  #       ref: "geoEnrichmentAdapter"

  # Host adapter constructed from the known_hosts placeholder value.
  - id: "hostEnrichmentAdapter"
    className: "org.apache.metron.enrichment.adapters.host.HostFromJSONListAdapter"
    constructorArgs:
      - '${org.apache.metron.enrichment.host.known_hosts}'

  # Enrichment named "host": binds ip_src_addr / ip_dst_addr to
  # hostEnrichmentAdapter.
  - id: "hostEnrichment"
    className: "org.apache.metron.domain.Enrichment"
    properties:
      - name: "name"
        value: "host"
      - name: "fields"
        value: ["ip_src_addr", "ip_dst_addr"]
      - name: "adapter"
        ref: "hostEnrichmentAdapter"

  # List of active enrichments, shared by parserBolt and joinBolt. Only the
  # host enrichment is added; the geo entry is disabled with the block above.
  - id: "enrichments"
    className: "java.util.ArrayList"
    configMethods:
      # - name: "add"
      #   args:
      #     - ref: "geoEnrichment"
      - name: "add"
        args:
          - ref: "hostEnrichment"

  # Index adapter instance referenced by both indexing bolts.
  - id: "indexAdapter"
    className: "org.apache.metron.indexing.adapters.ESTimedRotatingAdapter"

  # Metrics configuration passed to the indexing bolts. Every setProperty
  # call copies the placeholder of the same name into the configuration.
  - id: "metricConfig"
    className: "org.apache.commons.configuration.BaseConfiguration"
    configMethods:
      # Reporter settings (graphite / console / jmx).
      - name: "setProperty"
        args:
          - "org.apache.metron.metrics.reporter.graphite"
          - "${org.apache.metron.metrics.reporter.graphite}"
      - name: "setProperty"
        args:
          - "org.apache.metron.metrics.reporter.console"
          - "${org.apache.metron.metrics.reporter.console}"
      - name: "setProperty"
        args:
          - "org.apache.metron.metrics.reporter.jmx"
          - "${org.apache.metron.metrics.reporter.jmx}"
      # Graphite endpoint.
      - name: "setProperty"
        args:
          - "org.apache.metron.metrics.graphite.address"
          - "${org.apache.metron.metrics.graphite.address}"
      - name: "setProperty"
        args:
          - "org.apache.metron.metrics.graphite.port"
          - "${org.apache.metron.metrics.graphite.port}"
      # Per-bolt acks / emits / fails settings.
      - name: "setProperty"
        args:
          - "org.apache.metron.metrics.TelemetryParserBolt.acks"
          - "${org.apache.metron.metrics.TelemetryParserBolt.acks}"
      - name: "setProperty"
        args:
          - "org.apache.metron.metrics.TelemetryParserBolt.emits"
          - "${org.apache.metron.metrics.TelemetryParserBolt.emits}"
      - name: "setProperty"
        args:
          - "org.apache.metron.metrics.TelemetryParserBolt.fails"
          - "${org.apache.metron.metrics.TelemetryParserBolt.fails}"
      - name: "setProperty"
        args:
          - "org.apache.metron.metrics.GenericEnrichmentBolt.acks"
          - "${org.apache.metron.metrics.GenericEnrichmentBolt.acks}"
      - name: "setProperty"
        args:
          - "org.apache.metron.metrics.GenericEnrichmentBolt.emits"
          - "${org.apache.metron.metrics.GenericEnrichmentBolt.emits}"
      - name: "setProperty"
        args:
          - "org.apache.metron.metrics.GenericEnrichmentBolt.fails"
          - "${org.apache.metron.metrics.GenericEnrichmentBolt.fails}"
      - name: "setProperty"
        args:
          - "org.apache.metron.metrics.TelemetryIndexingBolt.acks"
          - "${org.apache.metron.metrics.TelemetryIndexingBolt.acks}"
      - name: "setProperty"
        args:
          - "org.apache.metron.metrics.TelemetryIndexingBolt.emits"
          - "${org.apache.metron.metrics.TelemetryIndexingBolt.emits}"
      - name: "setProperty"
        args:
          - "org.apache.metron.metrics.TelemetryIndexingBolt.fails"
          - "${org.apache.metron.metrics.TelemetryIndexingBolt.fails}"

  # ZooKeeper hosts for the Kafka spout, taken from kafka.zk.
  - id: "zkHosts"
    className: "storm.kafka.ZkHosts"
    constructorArgs:
      - "${kafka.zk}"

  # Kafka spout configuration. The topic placeholder is used twice: once as
  # the topic name and once as the spout id.
  - id: "kafkaConfig"
    className: "storm.kafka.SpoutConfig"
    constructorArgs:
      # zookeeper hosts
      - ref: "zkHosts"
      # topic name
      - "${spout.kafka.topic.pcap}"
      # zk root
      - ""
      # id
      - "${spout.kafka.topic.pcap}"
    properties:
      # NOTE(review): in storm-kafka a startOffsetTime of -1 corresponds to
      # kafka.api.OffsetRequest.LatestTime(). Together with forceFromStart
      # this presumably makes the spout ignore previously committed offsets
      # and begin at the newest messages on each deploy -- confirm intended.
      - name: "forceFromStart"
        value: true
      - name: "startOffsetTime"
        value: -1

  # HBase table settings consumed by hbaseBolt, all from bolt.hbase.*
  # placeholders.
  - id: "hbaseConfig"
    className: "org.apache.metron.hbase.TupleTableConfig"
    configMethods:
      - name: "withFields"
        args:
          - "${bolt.hbase.table.fields}"
      - name: "withTable"
        args:
          - "${bolt.hbase.table.name}"
      - name: "withRowKeyField"
        args:
          - "${bolt.hbase.table.key.tuple.field.name}"
      - name: "withTimestampField"
        args:
          - "${bolt.hbase.table.timestamp.tuple.field.name}"
      - name: "withBatch"
        args:
          # Deliberately unquoted so the substituted value is not a string
          # (presumably a boolean -- confirm against withBatch's signature).
          - ${bolt.hbase.enable.batching}
# Topology sources. The single Kafka spout is built from the kafkaConfig
# component and feeds parserBolt (see the "spout -> parser" stream).
spouts:
  - id: "kafkaSpout"
    className: "storm.kafka.KafkaSpout"
    constructorArgs:
      - ref: "kafkaConfig"
# Processing steps. How they are wired together is defined in `streams`.
bolts:
  # Receives parserBolt's "raw" stream and is configured by hbaseConfig.
  # NOTE(review): the second constructor argument reuses the kafka.zk
  # placeholder that zkHosts also uses -- confirm this bolt is meant to
  # point at the same ZooKeeper quorum.
  - id: "hbaseBolt"
    className: "org.apache.metron.hbase.HBaseBolt"
    constructorArgs:
      - ref: "hbaseConfig"
      - "${kafka.zk}"

  # Entry bolt fed by kafkaSpout; given the parser component and the list of
  # active enrichments.
  - id: "parserBolt"
    className: "org.apache.metron.bolt.PcapParserBolt"
    configMethods:
      - name: "withMessageParser"
        args:
          - ref: "parser"
      - name: "withEnrichments"
        args:
          - ref: "enrichments"

  # Indexes the final "message" stream: index name "pcap_index", document
  # name "pcap_doc".
  - id: "indexingBolt"
    className: "org.apache.metron.indexing.TelemetryIndexingBolt"
    configMethods:
      - name: "withIndexIP"
        args:
          - "${es.ip}"
      - name: "withIndexPort"
        args:
          # Unquoted so the substituted port is a YAML number, not a string.
          - ${es.port}
      - name: "withClusterName"
        args:
          - "${es.clustername}"
      - name: "withIndexName"
        args:
          - "pcap_index"
      - name: "withIndexTimestamp"
        args:
          # NOTE(review): if this pattern is handled by
          # java.text.SimpleDateFormat, `hh` is the 12-hour clock, so AM and
          # PM hours would share an index suffix (`HH` is the 24-hour form).
          # Confirm before changing: it alters the generated index names.
          - "yyyy.MM.dd.hh"
      - name: "withDocumentName"
        args:
          - "pcap_doc"
      - name: "withBulk"
        args:
          - 1
      - name: "withIndexAdapter"
        args:
          - ref: "indexAdapter"
      - name: "withMetricConfiguration"
        args:
          - ref: "metricConfig"

  # Same bolt class as indexingBolt, fed by the "error" streams: index name
  # "error", document name "pcap_error", timestamp pattern "yyyy.MM".
  - id: "errorIndexingBolt"
    className: "org.apache.metron.indexing.TelemetryIndexingBolt"
    configMethods:
      - name: "withIndexIP"
        args:
          - "${es.ip}"
      - name: "withIndexPort"
        args:
          # Unquoted so the substituted port is a YAML number, not a string.
          - ${es.port}
      - name: "withClusterName"
        args:
          - "${es.clustername}"
      - name: "withIndexName"
        args:
          - "error"
      - name: "withIndexTimestamp"
        args:
          - "yyyy.MM"
      - name: "withDocumentName"
        args:
          - "pcap_error"
      - name: "withBulk"
        args:
          - 1
      - name: "withIndexAdapter"
        args:
          - ref: "indexAdapter"
      - name: "withMetricConfiguration"
        args:
          - ref: "metricConfig"

  # Threat Intel Bolts
  # Split -> per-enrichment bolt -> join, all driven by the threatIntels
  # list. The units of withMaxCacheSize / withMaxTimeRetain are not visible
  # in this file.
  - id: "threatIntelSplitBolt"
    className: "org.apache.metron.enrichment.EnrichmentSplitterBolt"
    configMethods:
      - name: "withEnrichments"
        args:
          - ref: "threatIntels"

  - id: "ipThreatIntelBolt"
    className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
    configMethods:
      - name: "withEnrichment"
        args:
          - ref: "ipThreatIntelEnrichment"
      - name: "withMaxCacheSize"
        args: [10000]
      - name: "withMaxTimeRetain"
        args: [10]

  - id: "threatIntelJoinBolt"
    className: "org.apache.metron.enrichment.bolt.EnrichmentJoinBolt"
    configMethods:
      - name: "withEnrichments"
        args:
          - ref: "threatIntels"
      - name: "withType"
        args:
          - "alerts"
      - name: "withMaxCacheSize"
        args: [10000]
      - name: "withMaxTimeRetain"
        args: [10]

  # Disabled together with the geoEnrichment component; strip "# " to
  # re-enable.
  # - id: "geoEnrichmentBolt"
  #   className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
  #   configMethods:
  #     - name: "withEnrichment"
  #       args:
  #         - ref: "geoEnrichment"
  #     - name: "withMaxCacheSize"
  #       args: [10000]
  #     - name: "withMaxTimeRetain"
  #       args: [10]

  # Applies the hostEnrichment component to parserBolt's "host" stream.
  - id: "hostEnrichmentBolt"
    className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
    configMethods:
      - name: "withEnrichment"
        args:
          - ref: "hostEnrichment"
      - name: "withMaxCacheSize"
        args: [10000]
      - name: "withMaxTimeRetain"
        args: [10]

  # Joins parserBolt's "message" stream with the enrichment results, using
  # the same enrichments list as parserBolt.
  - id: "joinBolt"
    className: "org.apache.metron.enrichment.bolt.EnrichmentJoinBolt"
    configMethods:
      - name: "withEnrichments"
        args:
          - ref: "enrichments"
      - name: "withMaxCacheSize"
        args: [10000]
      - name: "withMaxTimeRetain"
        args: [10]
# Wiring between the spout and the bolts. Overall flow as defined below:
#
#   kafkaSpout -> parserBolt
#   parserBolt -(raw)-> hbaseBolt
#   parserBolt -(host)-> hostEnrichmentBolt -(host)-> joinBolt
#   parserBolt -(message)-> joinBolt
#   joinBolt -(message)-> threatIntelSplitBolt
#   threatIntelSplitBolt -(ip)-> ipThreatIntelBolt -(ip)-> threatIntelJoinBolt
#   threatIntelSplitBolt -(message)-> threatIntelJoinBolt
#   threatIntelJoinBolt -(message)-> indexingBolt
#   parserBolt / indexingBolt -(error)-> errorIndexingBolt
#
# Every FIELDS grouping partitions on the "key" field; the spout link and
# the error links use SHUFFLE.
streams:
  - name: "spout -> parser"
    from: "kafkaSpout"
    to: "parserBolt"
    grouping:
      type: SHUFFLE

  - name: "parser -> hbase"
    from: "parserBolt"
    to: "hbaseBolt"
    grouping:
      streamId: "raw"
      type: FIELDS
      args: ["key"]

  - name: "parser -> host"
    from: "parserBolt"
    to: "hostEnrichmentBolt"
    grouping:
      streamId: "host"
      type: FIELDS
      args: ["key"]

  # Disabled together with geoEnrichmentBolt.
  # - name: "parser -> geo"
  #   from: "parserBolt"
  #   to: "geoEnrichmentBolt"
  #   grouping:
  #     streamId: "geo"
  #     type: FIELDS
  #     args: ["key"]

  - name: "parser -> join"
    from: "parserBolt"
    to: "joinBolt"
    grouping:
      streamId: "message"
      type: FIELDS
      args: ["key"]

  # Disabled together with geoEnrichmentBolt.
  # - name: "geo -> join"
  #   from: "geoEnrichmentBolt"
  #   to: "joinBolt"
  #   grouping:
  #     streamId: "geo"
  #     type: FIELDS
  #     args: ["key"]

  - name: "host -> join"
    from: "hostEnrichmentBolt"
    to: "joinBolt"
    grouping:
      streamId: "host"
      type: FIELDS
      args: ["key"]

  # threat intel
  - name: "enrichmentJoin -> threatSplit"
    from: "joinBolt"
    to: "threatIntelSplitBolt"
    grouping:
      streamId: "message"
      type: FIELDS
      args: ["key"]

  - name: "threatSplit -> ip"
    from: "threatIntelSplitBolt"
    to: "ipThreatIntelBolt"
    grouping:
      streamId: "ip"
      type: FIELDS
      args: ["key"]

  - name: "ip -> join"
    from: "ipThreatIntelBolt"
    to: "threatIntelJoinBolt"
    grouping:
      streamId: "ip"
      type: FIELDS
      args: ["key"]

  - name: "threatIntelSplit -> threatIntelJoin"
    from: "threatIntelSplitBolt"
    to: "threatIntelJoinBolt"
    grouping:
      streamId: "message"
      type: FIELDS
      args: ["key"]

  # indexing
  - name: "threatIntelJoin -> indexing"
    from: "threatIntelJoinBolt"
    to: "indexingBolt"
    grouping:
      streamId: "message"
      type: FIELDS
      args: ["key"]

  # errors
  - name: "parser -> errors"
    from: "parserBolt"
    to: "errorIndexingBolt"
    grouping:
      streamId: "error"
      type: SHUFFLE

  - name: "indexing -> errors"
    from: "indexingBolt"
    to: "errorIndexingBolt"
    grouping:
      streamId: "error"
      type: SHUFFLE