# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# This is a drop-in replacement for the existing split/join enrichment topology.
# Instead of a fan-out/fan-in architecture, it adopts a data-parallelism strategy
# in which each message is fully enriched inside a single UnifiedEnrichmentBolt.
# This greatly simplifies the architecture and cuts down on network hops. Its
# performance characteristics are not yet known, so caveat emptor.

# Topology name plus the Storm/Metron settings passed in as topology config.
# Every value is a ${...} placeholder and is deliberately left unquoted, as in
# the original - presumably so a substituted number stays numeric
# (TODO confirm against how this file is rendered before parsing).
name: "enrichment"
config:
  topology.workers: ${enrichment.workers}
  topology.acker.executors: ${enrichment.acker.executors}
  topology.worker.childopts: ${topology.worker.childopts}
  topology.auto-credentials: ${topology.auto-credentials}
  topology.max.spout.pending: ${topology.max.spout.pending}
  # Either a number (e.g. 5) or a multiple of cores
  # (e.g. 5C = 5 times the number of cores).
  metron.threadpool.size: ${enrichment.threadpool.size}
  # FIXED or WORK_STEALING
  metron.threadpool.type: ${enrichment.threadpool.type}

# Reusable object definitions. Spouts, bolts and other components pull them in
# by id through `ref`. Order is kept as in the original: each component is
# declared before the first component that references it.
components:

  # ---------------------------------------------------------------------------
  # enrichment
  # ---------------------------------------------------------------------------

  # StellarAdapter configured by calling ofType("ENRICHMENT").
  - id: "stellarEnrichmentAdapter"
    className: "org.apache.metron.enrichment.adapters.stellar.StellarAdapter"
    configMethods:
      - name: "ofType"
        args:
          - "ENRICHMENT"

  # Any kafka props for the producer go here.
  # Referenced by all three KafkaWriter components in this section.
  - id: "kafkaWriterProps"
    className: "java.util.HashMap"
    configMethods:
      - name: "put"
        args:
          - "security.protocol"
          - "${kafka.security.protocol}"

  # Enrichment named "stellar", backed by stellarEnrichmentAdapter.
  - id: "stellarEnrichment"
    className: "org.apache.metron.enrichment.configuration.Enrichment"
    constructorArgs:
      - "stellar"
      - ref: "stellarEnrichmentAdapter"

  # GeoAdapter built with no constructor args and no config methods.
  - id: "geoEnrichmentAdapter"
    className: "org.apache.metron.enrichment.adapters.geo.GeoAdapter"

  # Enrichment named "geo", backed by geoEnrichmentAdapter.
  - id: "geoEnrichment"
    className: "org.apache.metron.enrichment.configuration.Enrichment"
    constructorArgs:
      - "geo"
      - ref: "geoEnrichmentAdapter"

  # Host adapter built from the known-hosts placeholder. The single quotes are
  # kept exactly as in the original - presumably because the substituted value
  # can itself contain double quotes (TODO confirm).
  - id: "hostEnrichmentAdapter"
    className: "org.apache.metron.enrichment.adapters.host.HostFromJSONListAdapter"
    constructorArgs:
      - '${enrichment.host.known_hosts}'

  # Enrichment named "host", backed by hostEnrichmentAdapter.
  - id: "hostEnrichment"
    className: "org.apache.metron.enrichment.configuration.Enrichment"
    constructorArgs:
      - "host"
      - ref: "hostEnrichmentAdapter"

  # HBase settings for the simple HBase enrichment: provider implementation,
  # table and column family, all supplied through placeholders.
  - id: "simpleHBaseEnrichmentConfig"
    className: "org.apache.metron.enrichment.adapters.simplehbase.SimpleHBaseConfig"
    configMethods:
      - name: "withProviderImpl"
        args:
          - "${hbase.provider.impl}"
      - name: "withHBaseTable"
        args:
          - "${enrichment.simple.hbase.table}"
      - name: "withHBaseCF"
        args:
          - "${enrichment.simple.hbase.cf}"

  # SimpleHBaseAdapter wired to the config component above.
  - id: "simpleHBaseEnrichmentAdapter"
    className: "org.apache.metron.enrichment.adapters.simplehbase.SimpleHBaseAdapter"
    configMethods:
      - name: "withConfig"
        args:
          - ref: "simpleHBaseEnrichmentConfig"

  # Enrichment named "hbaseEnrichment", backed by simpleHBaseEnrichmentAdapter.
  - id: "simpleHBaseEnrichment"
    className: "org.apache.metron.enrichment.configuration.Enrichment"
    constructorArgs:
      - "hbaseEnrichment"
      - ref: "simpleHBaseEnrichmentAdapter"

  # The list passed to enrichmentBolt via withEnrichments.
  # Insertion order: geo, host, simple HBase, stellar.
  - id: "enrichments"
    className: "java.util.ArrayList"
    configMethods:
      - name: "add"
        args:
          - ref: "geoEnrichment"
      - name: "add"
        args:
          - ref: "hostEnrichment"
      - name: "add"
        args:
          - ref: "simpleHBaseEnrichment"
      - name: "add"
        args:
          - ref: "stellarEnrichment"

  # enrichment error
  # KafkaWriter targeting the enrichment error topic.
  - id: "enrichmentErrorKafkaWriter"
    className: "org.apache.metron.writer.kafka.KafkaWriter"
    configMethods:
      - name: "withTopic"
        args:
          - "${enrichment.error.topic}"
      - name: "withZkQuorum"
        args:
          - "${kafka.zk}"
      - name: "withProducerConfigs"
        args:
          - ref: "kafkaWriterProps"

  # ---------------------------------------------------------------------------
  # threat intel
  # ---------------------------------------------------------------------------

  # Same adapter class as stellarEnrichmentAdapter, but ofType("THREAT_INTEL").
  - id: "stellarThreatIntelAdapter"
    className: "org.apache.metron.enrichment.adapters.stellar.StellarAdapter"
    configMethods:
      - name: "ofType"
        args:
          - "THREAT_INTEL"

  # Enrichment named "stellar" (the same name string as the enrichment-side
  # one), backed by stellarThreatIntelAdapter.
  - id: "stellarThreatIntelEnrichment"
    className: "org.apache.metron.enrichment.configuration.Enrichment"
    constructorArgs:
      - "stellar"
      - ref: "stellarThreatIntelAdapter"

  # HBase settings for threat intel: provider implementation, the tracker
  # table / column family and the lookup table / column family.
  - id: "simpleHBaseThreatIntelConfig"
    className: "org.apache.metron.enrichment.adapters.threatintel.ThreatIntelConfig"
    configMethods:
      - name: "withProviderImpl"
        args:
          - "${hbase.provider.impl}"
      - name: "withTrackerHBaseTable"
        args:
          - "${threat.intel.tracker.table}"
      - name: "withTrackerHBaseCF"
        args:
          - "${threat.intel.tracker.cf}"
      - name: "withHBaseTable"
        args:
          - "${threat.intel.simple.hbase.table}"
      - name: "withHBaseCF"
        args:
          - "${threat.intel.simple.hbase.cf}"

  # ThreatIntelAdapter wired to the config component above.
  - id: "simpleHBaseThreatIntelAdapter"
    className: "org.apache.metron.enrichment.adapters.threatintel.ThreatIntelAdapter"
    configMethods:
      - name: "withConfig"
        args:
          - ref: "simpleHBaseThreatIntelConfig"

  # Enrichment named "hbaseThreatIntel", backed by simpleHBaseThreatIntelAdapter.
  - id: "simpleHBaseThreatIntelEnrichment"
    className: "org.apache.metron.enrichment.configuration.Enrichment"
    constructorArgs:
      - "hbaseThreatIntel"
      - ref: "simpleHBaseThreatIntelAdapter"

  # The list passed to threatIntelBolt via withEnrichments.
  # Insertion order: simple HBase threat intel, then stellar.
  - id: "threatIntels"
    className: "java.util.ArrayList"
    configMethods:
      - name: "add"
        args:
          - ref: "simpleHBaseThreatIntelEnrichment"
      - name: "add"
        args:
          - ref: "stellarThreatIntelEnrichment"

  # threatintel error
  # KafkaWriter targeting the threat intel error topic.
  - id: "threatIntelErrorKafkaWriter"
    className: "org.apache.metron.writer.kafka.KafkaWriter"
    configMethods:
      - name: "withTopic"
        args:
          - "${threat.intel.error.topic}"
      - name: "withZkQuorum"
        args:
          - "${kafka.zk}"
      - name: "withProducerConfigs"
        args:
          - ref: "kafkaWriterProps"

  # indexing
  # KafkaWriter targeting the enrichment output topic; used by outputBolt.
  - id: "kafkaWriter"
    className: "org.apache.metron.writer.kafka.KafkaWriter"
    configMethods:
      - name: "withTopic"
        args:
          - "${enrichment.output.topic}"
      - name: "withZkQuorum"
        args:
          - "${kafka.zk}"
      - name: "withProducerConfigs"
        args:
          - ref: "kafkaWriterProps"

  # ---------------------------------------------------------------------------
  # kafka/zookeeper
  # ---------------------------------------------------------------------------

  # Any kafka props for the consumer go here.
  - id: "kafkaProps"
    className: "java.util.HashMap"
    configMethods:
      - name: "put"
        args:
          - "value.deserializer"
          - "org.apache.kafka.common.serialization.ByteArrayDeserializer"
      - name: "put"
        args:
          - "key.deserializer"
          - "org.apache.kafka.common.serialization.ByteArrayDeserializer"
      - name: "put"
        args:
          - "group.id"
          - "enrichments"
      - name: "put"
        args:
          - "security.protocol"
          - "${kafka.security.protocol}"

  # The fields to pull out of the kafka messages.
  - id: "fields"
    className: "java.util.ArrayList"
    configMethods:
      - name: "add"
        args:
          - "value"

  # Spout configuration builder. Constructor args, in order: consumer props,
  # input topic, ZooKeeper placeholder, field list. The first-poll offset
  # strategy is then set from ${kafka.start}.
  - id: "kafkaConfig"
    className: "org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder"
    constructorArgs:
      - ref: "kafkaProps"
      # topic name
      - "${enrichment.input.topic}"
      - "${kafka.zk}"
      - ref: "fields"
    configMethods:
      - name: "setFirstPollOffsetStrategy"
        args:
          - "${kafka.start}"

# The topology's only spout, constructed from the "kafkaConfig" component.
spouts:

  - id: "kafkaSpout"
    className: "org.apache.metron.storm.kafka.flux.StormKafkaSpout"
    constructorArgs:
      - ref: "kafkaConfig"
    # Spout-level setting (a sibling of constructorArgs, not one of its items).
    # Left unquoted so the substituted value keeps its type.
    parallelism: ${kafka.spout.parallelism}

# Bolt definitions: two UnifiedEnrichmentBolt instances (enrichment, then
# threat intel) and three BulkMessageWriterBolt instances (one error sink per
# stage plus the final output).
#
# Every `args` list is written as a block sequence. The original put unquoted
# ${...} placeholders inside flow sequences ([...]), where `{` and `}` are flow
# indicators, so the text was only parseable after placeholder substitution.
# It also mixed flow and block style arbitrarily. Placeholders for numeric
# arguments stay unquoted so the substituted value keeps its type.
bolts:

  # enrichment bolt
  - id: "enrichmentBolt"
    className: "org.apache.metron.enrichment.bolt.UnifiedEnrichmentBolt"
    constructorArgs:
      - "${kafka.zk}"
    configMethods:
      - name: "withEnrichments"
        args:
          - ref: "enrichments"
      - name: "withMaxCacheSize"
        args:
          - ${enrichment.cache.size}
      # NOTE(review): the unit of this value is not visible in this file.
      - name: "withMaxTimeRetain"
        args:
          - 10
      - name: "withCaptureCacheStats"
        args:
          - true
      - name: "withStrategy"
        args:
          - "ENRICHMENT"
      - name: "withMessageGetter"
        args:
          - "JSON_FROM_POSITION"
    parallelism: ${enrichment.parallelism}

  # Writer bolt fed by enrichmentBolt's "error" stream.
  # No parallelism is set here.
  - id: "enrichmentErrorOutputBolt"
    className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt"
    constructorArgs:
      - "${kafka.zk}"
      - "ENRICHMENT"
    configMethods:
      - name: "withBulkMessageWriter"
        args:
          - ref: "enrichmentErrorKafkaWriter"

  # threat intel bolts
  # Same bolt class as enrichmentBolt, but with the threatIntels list, the
  # THREAT_INTEL strategy, and a message getter that reads the tuple field
  # named "message".
  - id: "threatIntelBolt"
    className: "org.apache.metron.enrichment.bolt.UnifiedEnrichmentBolt"
    constructorArgs:
      - "${kafka.zk}"
    configMethods:
      - name: "withEnrichments"
        args:
          - ref: "threatIntels"
      - name: "withMaxCacheSize"
        args:
          - ${threat.intel.cache.size}
      # NOTE(review): the unit of this value is not visible in this file.
      - name: "withMaxTimeRetain"
        args:
          - 10
      - name: "withCaptureCacheStats"
        args:
          - true
      - name: "withStrategy"
        args:
          - "THREAT_INTEL"
      - name: "withMessageFieldName"
        args:
          - "message"
      - name: "withMessageGetter"
        args:
          - "JSON_FROM_FIELD_BY_REFERENCE"
    parallelism: ${threat.intel.parallelism}

  # Writer bolt fed by threatIntelBolt's "error" stream.
  # The second constructor arg is "ENRICHMENT", as in the original.
  # No parallelism is set here.
  - id: "threatIntelErrorOutputBolt"
    className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt"
    constructorArgs:
      - "${kafka.zk}"
      - "ENRICHMENT"
    configMethods:
      - name: "withBulkMessageWriter"
        args:
          - ref: "threatIntelErrorKafkaWriter"

  # output bolt
  # Writes threatIntelBolt's "message" stream through the "kafkaWriter"
  # component.
  - id: "outputBolt"
    className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt"
    constructorArgs:
      - "${kafka.zk}"
      - "ENRICHMENT"
    configMethods:
      - name: "withBulkMessageWriter"
        args:
          - ref: "kafkaWriter"
    parallelism: ${kafka.writer.parallelism}

# Wiring between the spout and the bolts. Every edge uses LOCAL_OR_SHUFFLE
# grouping; all edges except the first subscribe to a named stream
# ("message" or "error").
streams:

  # Kafka spout into the enrichment stage (no streamId given).
  - name: "spout -> enrichmentBolt"
    from: "kafkaSpout"
    to: "enrichmentBolt"
    grouping:
      type: LOCAL_OR_SHUFFLE

  # Enrichment stage "error" stream to its error writer.
  - name: "enrichmentBolt -> enrichmentErrorOutputBolt"
    from: "enrichmentBolt"
    to: "enrichmentErrorOutputBolt"
    grouping:
      type: LOCAL_OR_SHUFFLE
      streamId: "error"

  # Enrichment stage "message" stream into the threat intel stage.
  - name: "enrichmentBolt -> threatIntelBolt"
    from: "enrichmentBolt"
    to: "threatIntelBolt"
    grouping:
      type: LOCAL_OR_SHUFFLE
      streamId: "message"

  # Threat intel stage "message" stream to the output writer.
  - name: "threatIntelBolt -> output"
    from: "threatIntelBolt"
    to: "outputBolt"
    grouping:
      type: LOCAL_OR_SHUFFLE
      streamId: "message"

  # Threat intel stage "error" stream to its error writer.
  - name: "threatIntelBolt -> threatIntelErrorOutputBolt"
    from: "threatIntelBolt"
    to: "threatIntelErrorOutputBolt"
    grouping:
      type: LOCAL_OR_SHUFFLE
      streamId: "error"