# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Name of the topology described by this file.
name: "enrichment"

# Topology-level configuration map.
config:
  # Number of workers requested for the topology.
  topology.workers: 1

# Shared objects. Each entry gets an `id`; other components, spouts and bolts
# pull it in with `ref: "<id>"`.
components:
  # ---------------------------------------------------------------------------
  # Enrichment
  # ---------------------------------------------------------------------------
  # Each Enrichment is built from a string name plus an adapter reference.
  # The names used here ("geo", "host", "hbaseEnrichment") are the same
  # strings that appear as `streamId` values in the streams section.

  # Geo: uses the mock adapter from the integration-test utilities package.
  - id: "geoEnrichmentAdapter"
    className: "org.apache.metron.integration.util.mock.MockGeoAdapter"
  - id: "geoEnrichment"
    className: "org.apache.metron.domain.Enrichment"
    constructorArgs:
      - "geo"
      - ref: "geoEnrichmentAdapter"

  # Host: the adapter is constructed from the known_hosts property.
  # NOTE(review): the single quotes are kept exactly as in the original;
  # presumably the substituted value contains double quotes - confirm before
  # changing the quoting.
  - id: "hostEnrichmentAdapter"
    className: "org.apache.metron.enrichment.adapters.host.HostFromJSONListAdapter"
    constructorArgs:
      - '${org.apache.metron.enrichment.host.known_hosts}'
  - id: "hostEnrichment"
    className: "org.apache.metron.domain.Enrichment"
    constructorArgs:
      - "host"
      - ref: "hostEnrichmentAdapter"

  # Simple HBase: config object -> adapter -> Enrichment.
  # Provider implementation, table and column family all come from properties.
  - id: "simpleHBaseEnrichmentConfig"
    className: "org.apache.metron.enrichment.adapters.simplehbase.SimpleHBaseConfig"
    configMethods:
      - name: "withProviderImpl"
        args:
          - "${hbase.provider.impl}"
      - name: "withHBaseTable"
        args:
          - "${enrichment.simple.hbase.table}"
      - name: "withHBaseCF"
        args:
          - "${enrichment.simple.hbase.cf}"
  - id: "simpleHBaseEnrichmentAdapter"
    className: "org.apache.metron.enrichment.adapters.simplehbase.SimpleHBaseAdapter"
    configMethods:
      - name: "withConfig"
        args:
          - ref: "simpleHBaseEnrichmentConfig"
  - id: "simpleHBaseEnrichment"
    className: "org.apache.metron.domain.Enrichment"
    constructorArgs:
      - "hbaseEnrichment"
      - ref: "simpleHBaseEnrichmentAdapter"

  # List of every enrichment above; handed to enrichmentSplitBolt.
  - id: "enrichments"
    className: "java.util.ArrayList"
    configMethods:
      - name: "add"
        args:
          - ref: "geoEnrichment"
      - name: "add"
        args:
          - ref: "hostEnrichment"
      - name: "add"
        args:
          - ref: "simpleHBaseEnrichment"

  # ---------------------------------------------------------------------------
  # Threat Intel
  # ---------------------------------------------------------------------------
  # Same config -> adapter -> Enrichment chain, with an additional tracker
  # table / column family configured alongside the lookup table.
  - id: "simpleHBaseThreatIntelConfig"
    className: "org.apache.metron.enrichment.adapters.threatintel.ThreatIntelConfig"
    configMethods:
      - name: "withProviderImpl"
        args:
          - "${hbase.provider.impl}"
      - name: "withTrackerHBaseTable"
        args:
          - "${threat.intel.tracker.table}"
      - name: "withTrackerHBaseCF"
        args:
          - "${threat.intel.tracker.cf}"
      - name: "withHBaseTable"
        args:
          - "${threat.intel.simple.hbase.table}"
      - name: "withHBaseCF"
        args:
          - "${threat.intel.simple.hbase.cf}"
  - id: "simpleHBaseThreatIntelAdapter"
    className: "org.apache.metron.enrichment.adapters.threatintel.ThreatIntelAdapter"
    configMethods:
      - name: "withConfig"
        args:
          - ref: "simpleHBaseThreatIntelConfig"
  - id: "simpleHBaseThreatIntelEnrichment"
    className: "org.apache.metron.domain.Enrichment"
    constructorArgs:
      - "hbaseThreatIntel"
      - ref: "simpleHBaseThreatIntelAdapter"

  # List of threat-intel enrichments; handed to threatIntelSplitBolt.
  - id: "threatIntels"
    className: "java.util.ArrayList"
    configMethods:
      - name: "add"
        args:
          - ref: "simpleHBaseThreatIntelEnrichment"

  # ---------------------------------------------------------------------------
  # Indexing
  # ---------------------------------------------------------------------------
  # File naming for the HDFS writer: "enrichment-" prefix, ".json" extension,
  # output path taken from the index.hdfs.output property.
  - id: "fileNameFormat"
    className: "org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat"
    configMethods:
      - name: "withPrefix"
        args: ["enrichment-"]
      - name: "withExtension"
        args: [".json"]
      - name: "withPath"
        args:
          - "${index.hdfs.output}"
  - id: "hdfsWriter"
    className: "org.apache.metron.writer.hdfs.HdfsWriter"
    configMethods:
      - name: "withFileNameFormat"
        args:
          - ref: "fileNameFormat"
  # The index writer implementation is selected by the writer.class.name property.
  - id: "indexWriter"
    className: "${writer.class.name}"

  # ---------------------------------------------------------------------------
  # Kafka / ZooKeeper
  # ---------------------------------------------------------------------------
  - id: "zkHosts"
    className: "storm.kafka.ZkHosts"
    constructorArgs:
      - "${kafka.zk}"
  - id: "kafkaConfig"
    className: "storm.kafka.SpoutConfig"
    constructorArgs:
      # zookeeper hosts
      - ref: "zkHosts"
      # topic name
      - "enrichments"
      # zk root
      - ""
      # id
      - "enrichments"
    properties:
      - name: "ignoreZkOffsets"
        value: true
      # NOTE(review): -2 is presumably the "earliest offset" sentinel of the
      # Kafka offset API - confirm against the storm-kafka version in use.
      - name: "startOffsetTime"
        value: -2

# Topology sources.
spouts:
  # Test spout configured with a sample-data file and repeating enabled.
  # NOTE(review): none of the streams visible in this file reads from
  # "testingSpout" - confirm whether it is still needed.
  - id: "testingSpout"
    className: "org.apache.metron.test.spouts.GenericInternalTestSpout"
    parallelism: 1
    configMethods:
      - name: "withFilename"
        args:
          - "../Metron-Testing/src/main/resources/sample/data/SampleInput/YafExampleOutput"
      - name: "withRepeating"
        args: [true]
  # Kafka spout built from the "kafkaConfig" component; this is the spout the
  # first stream ("spout -> enrichmentSplit") starts from.
  - id: "kafkaSpout"
    className: "storm.kafka.KafkaSpout"
    constructorArgs:
      - ref: "kafkaConfig"
# Processing steps. Every bolt below is constructed with the ${kafka.zk}
# property as its single constructor argument.
# NOTE(review): the units of withMaxCacheSize / withMaxTimeRetain are not
# visible in this file - check the bolt classes before tuning them.
bolts:
  # ---------------------------------------------------------------------------
  # Enrichment bolts
  # ---------------------------------------------------------------------------
  # Splitter: receives the full "enrichments" list.
  - id: "enrichmentSplitBolt"
    className: "org.apache.metron.enrichment.bolt.EnrichmentSplitterBolt"
    constructorArgs:
      - "${kafka.zk}"
    configMethods:
      - name: "withEnrichments"
        args:
          - ref: "enrichments"

  # One GenericEnrichmentBolt per enrichment, all with the same cache settings.
  - id: "geoEnrichmentBolt"
    className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
    constructorArgs:
      - "${kafka.zk}"
    configMethods:
      - name: "withEnrichment"
        args:
          - ref: "geoEnrichment"
      - name: "withMaxCacheSize"
        args: [10000]
      - name: "withMaxTimeRetain"
        args: [10]
  - id: "hostEnrichmentBolt"
    className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
    constructorArgs:
      - "${kafka.zk}"
    configMethods:
      - name: "withEnrichment"
        args:
          - ref: "hostEnrichment"
      - name: "withMaxCacheSize"
        args: [10000]
      - name: "withMaxTimeRetain"
        args: [10]
  - id: "simpleHBaseEnrichmentBolt"
    className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
    constructorArgs:
      - "${kafka.zk}"
    configMethods:
      - name: "withEnrichment"
        args:
          - ref: "simpleHBaseEnrichment"
      - name: "withMaxCacheSize"
        args: [10000]
      - name: "withMaxTimeRetain"
        args: [10]

  # Join: target of the splitter's "message" stream and of every enrichment
  # bolt's output stream (see the streams section).
  - id: "enrichmentJoinBolt"
    className: "org.apache.metron.enrichment.bolt.EnrichmentJoinBolt"
    constructorArgs:
      - "${kafka.zk}"
    configMethods:
      - name: "withMaxCacheSize"
        args: [10000]
      - name: "withMaxTimeRetain"
        args: [10]

  # ---------------------------------------------------------------------------
  # Threat Intel bolts
  # ---------------------------------------------------------------------------
  # Splitter: receives the "threatIntels" list and the message field name.
  - id: "threatIntelSplitBolt"
    className: "org.apache.metron.enrichment.bolt.ThreatIntelSplitterBolt"
    constructorArgs:
      - "${kafka.zk}"
    configMethods:
      - name: "withEnrichments"
        args:
          - ref: "threatIntels"
      - name: "withMessageFieldName"
        args: ["message"]
  - id: "simpleHBaseThreatIntelBolt"
    className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
    constructorArgs:
      - "${kafka.zk}"
    configMethods:
      - name: "withEnrichment"
        args:
          - ref: "simpleHBaseThreatIntelEnrichment"
      - name: "withMaxCacheSize"
        args: [10000]
      - name: "withMaxTimeRetain"
        args: [10]
  - id: "threatIntelJoinBolt"
    className: "org.apache.metron.enrichment.bolt.ThreatIntelJoinBolt"
    constructorArgs:
      - "${kafka.zk}"
    configMethods:
      - name: "withMaxCacheSize"
        args: [10000]
      - name: "withMaxTimeRetain"
        args: [10]

  # ---------------------------------------------------------------------------
  # Indexing bolts
  # ---------------------------------------------------------------------------
  # Two instances of the same bolt class, differing only in the writer they
  # wrap: the property-selected "indexWriter" and the "hdfsWriter".
  - id: "indexingBolt"
    className: "org.apache.metron.bolt.BulkMessageWriterBolt"
    constructorArgs:
      - "${kafka.zk}"
    configMethods:
      - name: "withBulkMessageWriter"
        args:
          - ref: "indexWriter"
  - id: "hdfsIndexingBolt"
    className: "org.apache.metron.bolt.BulkMessageWriterBolt"
    constructorArgs:
      - "${kafka.zk}"
    configMethods:
      - name: "withBulkMessageWriter"
        args:
          - ref: "hdfsWriter"

# Wiring between spouts and bolts. `from` / `to` are spout or bolt ids;
# `grouping.streamId` selects the named output stream of the `from` side.
streams:
  # ---------------------------------------------------------------------------
  # Ingest: Kafka spout feeds the enrichment splitter
  # ---------------------------------------------------------------------------
  - name: "spout -> enrichmentSplit"
    from: "kafkaSpout"
    to: "enrichmentSplitBolt"
    grouping:
      type: SHUFFLE

  # ---------------------------------------------------------------------------
  # Enrichment: splitter fans out per enrichment, everything meets at the join
  # ---------------------------------------------------------------------------
  - name: "enrichmentSplit -> host"
    from: "enrichmentSplitBolt"
    to: "hostEnrichmentBolt"
    grouping:
      type: FIELDS
      streamId: "host"
      args: ["key"]
  - name: "enrichmentSplit -> geo"
    from: "enrichmentSplitBolt"
    to: "geoEnrichmentBolt"
    grouping:
      type: FIELDS
      streamId: "geo"
      args: ["key"]
  - name: "enrichmentSplit -> simpleHBaseEnrichmentBolt"
    from: "enrichmentSplitBolt"
    to: "simpleHBaseEnrichmentBolt"
    grouping:
      type: FIELDS
      streamId: "hbaseEnrichment"
      args: ["key"]

  # The splitter's "message" stream goes straight to the join.
  - name: "splitter -> join"
    from: "enrichmentSplitBolt"
    to: "enrichmentJoinBolt"
    grouping:
      type: FIELDS
      streamId: "message"
      args: ["key"]
  - name: "geo -> join"
    from: "geoEnrichmentBolt"
    to: "enrichmentJoinBolt"
    grouping:
      type: FIELDS
      streamId: "geo"
      args: ["key"]
  - name: "simpleHBaseEnrichmentBolt -> join"
    from: "simpleHBaseEnrichmentBolt"
    to: "enrichmentJoinBolt"
    grouping:
      type: FIELDS
      streamId: "hbaseEnrichment"
      args: ["key"]
  - name: "host -> join"
    from: "hostEnrichmentBolt"
    to: "enrichmentJoinBolt"
    grouping:
      type: FIELDS
      streamId: "host"
      args: ["key"]

  # ---------------------------------------------------------------------------
  # Threat intel: same split / enrich / join shape, fed by the enrichment join
  # ---------------------------------------------------------------------------
  - name: "enrichmentJoin -> threatSplit"
    from: "enrichmentJoinBolt"
    to: "threatIntelSplitBolt"
    grouping:
      type: FIELDS
      streamId: "message"
      args: ["key"]
  - name: "threatSplit -> simpleHBaseThreatIntel"
    from: "threatIntelSplitBolt"
    to: "simpleHBaseThreatIntelBolt"
    grouping:
      type: FIELDS
      streamId: "hbaseThreatIntel"
      args: ["key"]
  - name: "simpleHBaseThreatIntel -> join"
    from: "simpleHBaseThreatIntelBolt"
    to: "threatIntelJoinBolt"
    grouping:
      type: FIELDS
      streamId: "hbaseThreatIntel"
      args: ["key"]
  - name: "threatIntelSplit -> threatIntelJoin"
    from: "threatIntelSplitBolt"
    to: "threatIntelJoinBolt"
    grouping:
      type: FIELDS
      streamId: "message"
      args: ["key"]

  # ---------------------------------------------------------------------------
  # Indexing: the threat-intel join output goes to both writers
  # ---------------------------------------------------------------------------
  - name: "threatIntelJoin -> indexing"
    from: "threatIntelJoinBolt"
    to: "indexingBolt"
    grouping:
      type: FIELDS
      streamId: "message"
      args: ["key"]
  - name: "threatIntelJoin -> hdfs"
    from: "threatIntelJoinBolt"
    to: "hdfsIndexingBolt"
    grouping:
      type: SHUFFLE
      streamId: "message"

  # NOTE(review): the name mentions "errorIndexingBolt", but no bolt with that
  # id is defined in this file and `to` points back at "indexingBolt", so the
  # bolt subscribes to its own "error" stream. Confirm this loop is intended.
  - name: "indexingBolt -> errorIndexingBolt"
    from: "indexingBolt"
    to: "indexingBolt"
    grouping:
      type: SHUFFLE
      streamId: "error"