| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| name: "enrichment" |
| config: |
| topology.workers: ${enrichment.workers} |
| topology.acker.executors: ${enrichment.acker.executors} |
| topology.worker.childopts: ${topology.worker.childopts} |
| topology.auto-credentials: ${topology.auto-credentials} |
| topology.max.spout.pending: ${topology.max.spout.pending} |
| |
| components: |
| |
| # Enrichment |
| - id: "stellarEnrichmentAdapter" |
| className: "org.apache.metron.enrichment.adapters.stellar.StellarAdapter" |
| configMethods: |
| - name: "ofType" |
| args: |
| - "ENRICHMENT" |
| |
| # Any kafka props for the producer go here. |
| - id: "kafkaWriterProps" |
| className: "java.util.HashMap" |
| configMethods: |
| - name: "put" |
| args: |
| - "security.protocol" |
| - "${kafka.security.protocol}" |
| |
| - id: "stellarEnrichment" |
| className: "org.apache.metron.enrichment.configuration.Enrichment" |
| constructorArgs: |
| - "stellar" |
| - ref: "stellarEnrichmentAdapter" |
| |
| - id: "geoEnrichmentAdapter" |
| className: "org.apache.metron.enrichment.adapters.geo.GeoAdapter" |
| - id: "geoEnrichment" |
| className: "org.apache.metron.enrichment.configuration.Enrichment" |
| constructorArgs: |
| - "geo" |
| - ref: "geoEnrichmentAdapter" |
| - id: "hostEnrichmentAdapter" |
| className: "org.apache.metron.enrichment.adapters.host.HostFromJSONListAdapter" |
| constructorArgs: |
| - '${enrichment.host.known_hosts}' |
| - id: "hostEnrichment" |
| className: "org.apache.metron.enrichment.configuration.Enrichment" |
| constructorArgs: |
| - "host" |
| - ref: "hostEnrichmentAdapter" |
| |
| - id: "simpleHBaseEnrichmentConfig" |
| className: "org.apache.metron.enrichment.adapters.simplehbase.SimpleHBaseConfig" |
| configMethods: |
| - name: "withProviderImpl" |
| args: |
| - "${hbase.provider.impl}" |
| - name: "withHBaseTable" |
| args: |
| - "${enrichment.simple.hbase.table}" |
| - name: "withHBaseCF" |
| args: |
| - "${enrichment.simple.hbase.cf}" |
| - id: "simpleHBaseEnrichmentAdapter" |
| className: "org.apache.metron.enrichment.adapters.simplehbase.SimpleHBaseAdapter" |
| configMethods: |
| - name: "withConfig" |
| args: |
| - ref: "simpleHBaseEnrichmentConfig" |
| - id: "simpleHBaseEnrichment" |
| className: "org.apache.metron.enrichment.configuration.Enrichment" |
| constructorArgs: |
| - "hbaseEnrichment" |
| - ref: "simpleHBaseEnrichmentAdapter" |
| - id: "enrichments" |
| className: "java.util.ArrayList" |
| configMethods: |
| - name: "add" |
| args: |
| - ref: "geoEnrichment" |
| - name: "add" |
| args: |
| - ref: "hostEnrichment" |
| - name: "add" |
| args: |
| - ref: "simpleHBaseEnrichment" |
| - name: "add" |
| args: |
| - ref: "stellarEnrichment" |
| |
| #enrichment error |
| - id: "enrichmentErrorKafkaWriter" |
| className: "org.apache.metron.writer.kafka.KafkaWriter" |
| configMethods: |
| - name: "withTopic" |
| args: |
| - "${enrichment.error.topic}" |
| - name: "withZkQuorum" |
| args: |
| - "${kafka.zk}" |
| - name: "withProducerConfigs" |
| args: |
| - ref: "kafkaWriterProps" |
| |
| # Threat Intel |
| - id: "stellarThreatIntelAdapter" |
| className: "org.apache.metron.enrichment.adapters.stellar.StellarAdapter" |
| configMethods: |
| - name: "ofType" |
| args: |
| - "THREAT_INTEL" |
| - id: "stellarThreatIntelEnrichment" |
| className: "org.apache.metron.enrichment.configuration.Enrichment" |
| constructorArgs: |
| - "stellar" |
| - ref: "stellarThreatIntelAdapter" |
| - id: "simpleHBaseThreatIntelConfig" |
| className: "org.apache.metron.enrichment.adapters.threatintel.ThreatIntelConfig" |
| configMethods: |
| - name: "withProviderImpl" |
| args: |
| - "${hbase.provider.impl}" |
| - name: "withTrackerHBaseTable" |
| args: |
| - "${threat.intel.tracker.table}" |
| - name: "withTrackerHBaseCF" |
| args: |
| - "${threat.intel.tracker.cf}" |
| - name: "withHBaseTable" |
| args: |
| - "${threat.intel.simple.hbase.table}" |
| - name: "withHBaseCF" |
| args: |
| - "${threat.intel.simple.hbase.cf}" |
| - id: "simpleHBaseThreatIntelAdapter" |
| className: "org.apache.metron.enrichment.adapters.threatintel.ThreatIntelAdapter" |
| configMethods: |
| - name: "withConfig" |
| args: |
| - ref: "simpleHBaseThreatIntelConfig" |
| - id: "simpleHBaseThreatIntelEnrichment" |
| className: "org.apache.metron.enrichment.configuration.Enrichment" |
| constructorArgs: |
| - "hbaseThreatIntel" |
| - ref: "simpleHBaseThreatIntelAdapter" |
| |
| - id: "threatIntels" |
| className: "java.util.ArrayList" |
| configMethods: |
| - name: "add" |
| args: |
| - ref: "simpleHBaseThreatIntelEnrichment" |
| - name: "add" |
| args: |
| - ref: "stellarThreatIntelEnrichment" |
| |
| #threatintel error |
| - id: "threatIntelErrorKafkaWriter" |
| className: "org.apache.metron.writer.kafka.KafkaWriter" |
| configMethods: |
| - name: "withTopic" |
| args: |
| - "${threat.intel.error.topic}" |
| - name: "withZkQuorum" |
| args: |
| - "${kafka.zk}" |
| - name: "withProducerConfigs" |
| args: |
| - ref: "kafkaWriterProps" |
| #indexing |
| - id: "kafkaWriter" |
| className: "org.apache.metron.writer.kafka.KafkaWriter" |
| configMethods: |
| - name: "withTopic" |
| args: |
| - "${enrichment.output.topic}" |
| - name: "withZkQuorum" |
| args: |
| - "${kafka.zk}" |
| - name: "withProducerConfigs" |
| args: |
| - ref: "kafkaWriterProps" |
| |
| #kafka/zookeeper |
| # Any kafka props for the consumer go here. |
| - id: "kafkaProps" |
| className: "java.util.HashMap" |
| configMethods: |
| - name: "put" |
| args: |
| - "value.deserializer" |
| - "org.apache.kafka.common.serialization.ByteArrayDeserializer" |
| - name: "put" |
| args: |
| - "key.deserializer" |
| - "org.apache.kafka.common.serialization.ByteArrayDeserializer" |
| - name: "put" |
| args: |
| - "group.id" |
| - "enrichments" |
| - name: "put" |
| args: |
| - "security.protocol" |
| - "${kafka.security.protocol}" |
| |
| |
| # The fields to pull out of the kafka messages |
| - id: "fields" |
| className: "java.util.ArrayList" |
| configMethods: |
| - name: "add" |
| args: |
| - "value" |
| |
| - id: "kafkaConfig" |
| className: "org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder" |
| constructorArgs: |
| - ref: "kafkaProps" |
| # topic name |
| - "${enrichment.input.topic}" |
| - "${kafka.zk}" |
| - ref: "fields" |
| configMethods: |
| - name: "setFirstPollOffsetStrategy" |
| args: |
| - "${kafka.start}" |
| |
| |
| spouts: |
| - id: "kafkaSpout" |
| className: "org.apache.metron.storm.kafka.flux.StormKafkaSpout" |
| constructorArgs: |
| - ref: "kafkaConfig" |
| parallelism: ${kafka.spout.parallelism} |
| |
| bolts: |
| # Enrichment Bolts |
| - id: "enrichmentSplitBolt" |
| className: "org.apache.metron.enrichment.bolt.EnrichmentSplitterBolt" |
| constructorArgs: |
| - "${kafka.zk}" |
| configMethods: |
| - name: "withEnrichments" |
| args: |
| - ref: "enrichments" |
| parallelism: ${enrichment.split.parallelism} |
| |
| - id: "geoEnrichmentBolt" |
| className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt" |
| constructorArgs: |
| - "${kafka.zk}" |
| configMethods: |
| - name: "withEnrichment" |
| args: |
| - ref: "geoEnrichment" |
| - name: "withMaxCacheSize" |
| args: [10000] |
| - name: "withMaxTimeRetain" |
| args: [10] |
| |
| - id: "stellarEnrichmentBolt" |
| className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt" |
| constructorArgs: |
| - "${kafka.zk}" |
| configMethods: |
| - name: "withEnrichment" |
| args: |
| - ref: "stellarEnrichment" |
| - name: "withMaxCacheSize" |
| args: [10000] |
| - name: "withMaxTimeRetain" |
| args: [10] |
| parallelism: ${enrichment.stellar.parallelism} |
| |
| - id: "hostEnrichmentBolt" |
| className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt" |
| constructorArgs: |
| - "${kafka.zk}" |
| configMethods: |
| - name: "withEnrichment" |
| args: |
| - ref: "hostEnrichment" |
| - name: "withMaxCacheSize" |
| args: [10000] |
| - name: "withMaxTimeRetain" |
| args: [10] |
| |
| - id: "simpleHBaseEnrichmentBolt" |
| className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt" |
| constructorArgs: |
| - "${kafka.zk}" |
| configMethods: |
| - name: "withEnrichment" |
| args: |
| - ref: "simpleHBaseEnrichment" |
| - name: "withMaxCacheSize" |
| args: [10000] |
| - name: "withMaxTimeRetain" |
| args: [10] |
| |
| - id: "enrichmentJoinBolt" |
| className: "org.apache.metron.enrichment.bolt.EnrichmentJoinBolt" |
| constructorArgs: |
| - "${kafka.zk}" |
| configMethods: |
| - name: "withMaxCacheSize" |
| args: [${enrichment.join.cache.size}] |
| - name: "withMaxTimeRetain" |
| args: [10] |
| parallelism: ${enrichment.join.parallelism} |
| |
| - id: "enrichmentErrorOutputBolt" |
| className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt" |
| constructorArgs: |
| - "${kafka.zk}" |
| - "ENRICHMENT" |
| configMethods: |
| - name: "withBulkMessageWriter" |
| args: |
| - ref: "enrichmentErrorKafkaWriter" |
| |
| |
| # Threat Intel Bolts |
| - id: "threatIntelSplitBolt" |
| className: "org.apache.metron.enrichment.bolt.ThreatIntelSplitterBolt" |
| constructorArgs: |
| - "${kafka.zk}" |
| configMethods: |
| - name: "withEnrichments" |
| args: |
| - ref: "threatIntels" |
| - name: "withMessageFieldName" |
| args: ["message"] |
| parallelism: ${threat.intel.split.parallelism} |
| |
| - id: "simpleHBaseThreatIntelBolt" |
| className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt" |
| constructorArgs: |
| - "${kafka.zk}" |
| configMethods: |
| - name: "withEnrichment" |
| args: |
| - ref: "simpleHBaseThreatIntelEnrichment" |
| - name: "withMaxCacheSize" |
| args: [10000] |
| - name: "withMaxTimeRetain" |
| args: [10] |
| - id: "stellarThreatIntelBolt" |
| className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt" |
| constructorArgs: |
| - "${kafka.zk}" |
| configMethods: |
| - name: "withEnrichment" |
| args: |
| - ref: "stellarThreatIntelEnrichment" |
| - name: "withMaxCacheSize" |
| args: [10000] |
| - name: "withMaxTimeRetain" |
| args: [10] |
| parallelism: ${threat.intel.stellar.parallelism} |
| |
| - id: "threatIntelJoinBolt" |
| className: "org.apache.metron.enrichment.bolt.ThreatIntelJoinBolt" |
| constructorArgs: |
| - "${kafka.zk}" |
| configMethods: |
| - name: "withMaxCacheSize" |
| args: [${threat.intel.join.cache.size}] |
| - name: "withMaxTimeRetain" |
| args: [10] |
| parallelism: ${threat.intel.join.parallelism} |
| |
| - id: "threatIntelErrorOutputBolt" |
| className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt" |
| constructorArgs: |
| - "${kafka.zk}" |
| - "ENRICHMENT" |
| configMethods: |
| - name: "withBulkMessageWriter" |
| args: |
| - ref: "threatIntelErrorKafkaWriter" |
| |
| # Indexing Bolts |
| - id: "outputBolt" |
| className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt" |
| constructorArgs: |
| - "${kafka.zk}" |
| - "ENRICHMENT" |
| configMethods: |
| - name: "withBulkMessageWriter" |
| args: |
| - ref: "kafkaWriter" |
| parallelism: ${kafka.writer.parallelism} |
| |
| |
| streams: |
| #parser |
| - name: "spout -> enrichmentSplit" |
| from: "kafkaSpout" |
| to: "enrichmentSplitBolt" |
| grouping: |
| type: LOCAL_OR_SHUFFLE |
| |
| #enrichment |
| - name: "enrichmentSplit -> host" |
| from: "enrichmentSplitBolt" |
| to: "hostEnrichmentBolt" |
| grouping: |
| streamId: "host" |
| type: FIELDS |
| args: ["message"] |
| |
| - name: "enrichmentSplit -> geo" |
| from: "enrichmentSplitBolt" |
| to: "geoEnrichmentBolt" |
| grouping: |
| streamId: "geo" |
| type: FIELDS |
| args: ["message"] |
| |
| - name: "enrichmentSplit -> stellar" |
| from: "enrichmentSplitBolt" |
| to: "stellarEnrichmentBolt" |
| grouping: |
| streamId: "stellar" |
| type: FIELDS |
| args: ["message"] |
| |
| |
| - name: "enrichmentSplit -> simpleHBaseEnrichmentBolt" |
| from: "enrichmentSplitBolt" |
| to: "simpleHBaseEnrichmentBolt" |
| grouping: |
| streamId: "hbaseEnrichment" |
| type: FIELDS |
| args: ["message"] |
| |
| - name: "splitter -> join" |
| from: "enrichmentSplitBolt" |
| to: "enrichmentJoinBolt" |
| grouping: |
| streamId: "message" |
| type: FIELDS |
| args: ["key"] |
| |
| - name: "geo -> join" |
| from: "geoEnrichmentBolt" |
| to: "enrichmentJoinBolt" |
| grouping: |
| streamId: "geo" |
| type: FIELDS |
| args: ["key"] |
| |
| - name: "stellar -> join" |
| from: "stellarEnrichmentBolt" |
| to: "enrichmentJoinBolt" |
| grouping: |
| streamId: "stellar" |
| type: FIELDS |
| args: ["key"] |
| |
| - name: "simpleHBaseEnrichmentBolt -> join" |
| from: "simpleHBaseEnrichmentBolt" |
| to: "enrichmentJoinBolt" |
| grouping: |
| streamId: "hbaseEnrichment" |
| type: FIELDS |
| args: ["key"] |
| |
| - name: "host -> join" |
| from: "hostEnrichmentBolt" |
| to: "enrichmentJoinBolt" |
| grouping: |
| streamId: "host" |
| type: FIELDS |
| args: ["key"] |
| |
| # Error output |
| - name: "geoEnrichmentBolt -> enrichmentErrorOutputBolt" |
| from: "geoEnrichmentBolt" |
| to: "enrichmentErrorOutputBolt" |
| grouping: |
| streamId: "error" |
| type: LOCAL_OR_SHUFFLE |
| |
| - name: "stellarEnrichmentBolt -> enrichmentErrorOutputBolt" |
| from: "stellarEnrichmentBolt" |
| to: "enrichmentErrorOutputBolt" |
| grouping: |
| streamId: "error" |
| type: LOCAL_OR_SHUFFLE |
| |
| - name: "hostEnrichmentBolt -> enrichmentErrorOutputBolt" |
| from: "hostEnrichmentBolt" |
| to: "enrichmentErrorOutputBolt" |
| grouping: |
| streamId: "error" |
| type: LOCAL_OR_SHUFFLE |
| |
| - name: "simpleHBaseEnrichmentBolt -> enrichmentErrorOutputBolt" |
| from: "simpleHBaseEnrichmentBolt" |
| to: "enrichmentErrorOutputBolt" |
| grouping: |
| streamId: "error" |
| type: LOCAL_OR_SHUFFLE |
| |
| #threat intel |
| - name: "enrichmentJoin -> threatSplit" |
| from: "enrichmentJoinBolt" |
| to: "threatIntelSplitBolt" |
| grouping: |
| streamId: "message" |
| type: FIELDS |
| args: ["key"] |
| |
| - name: "threatSplit -> simpleHBaseThreatIntel" |
| from: "threatIntelSplitBolt" |
| to: "simpleHBaseThreatIntelBolt" |
| grouping: |
| streamId: "hbaseThreatIntel" |
| type: FIELDS |
| args: ["message"] |
| |
| - name: "threatSplit -> stellarThreatIntel" |
| from: "threatIntelSplitBolt" |
| to: "stellarThreatIntelBolt" |
| grouping: |
| streamId: "stellar" |
| type: FIELDS |
| args: ["message"] |
| |
| |
| - name: "simpleHBaseThreatIntel -> join" |
| from: "simpleHBaseThreatIntelBolt" |
| to: "threatIntelJoinBolt" |
| grouping: |
| streamId: "hbaseThreatIntel" |
| type: FIELDS |
| args: ["key"] |
| |
| - name: "stellarThreatIntel -> join" |
| from: "stellarThreatIntelBolt" |
| to: "threatIntelJoinBolt" |
| grouping: |
| streamId: "stellar" |
| type: FIELDS |
| args: ["key"] |
| |
| - name: "threatIntelSplit -> threatIntelJoin" |
| from: "threatIntelSplitBolt" |
| to: "threatIntelJoinBolt" |
| grouping: |
| streamId: "message" |
| type: FIELDS |
| args: ["key"] |
| #output |
| - name: "threatIntelJoin -> output" |
| from: "threatIntelJoinBolt" |
| to: "outputBolt" |
| grouping: |
| streamId: "message" |
| type: LOCAL_OR_SHUFFLE |
| |
| # Error output |
| - name: "simpleHBaseThreatIntelBolt -> threatIntelErrorOutputBolt" |
| from: "simpleHBaseThreatIntelBolt" |
| to: "threatIntelErrorOutputBolt" |
| grouping: |
| streamId: "error" |
| type: LOCAL_OR_SHUFFLE |
| |
| - name: "stellarThreatIntelBolt -> threatIntelErrorOutputBolt" |
| from: "stellarThreatIntelBolt" |
| to: "threatIntelErrorOutputBolt" |
| grouping: |
| streamId: "error" |
| type: LOCAL_OR_SHUFFLE |
| |