# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Name of the topology described by this file.
name: "batch_indexing"

# Topology-wide settings. Every value is a ${...} placeholder resolved from
# an external properties source.
# NOTE(review): the placeholders are intentionally left unquoted, exactly as
# in the original -- presumably so the substituted text keeps its native YAML
# type (number, list, ...). Confirm before adding quotes.
config:
  topology.workers: ${indexing.workers}
  topology.acker.executors: ${indexing.acker.executors}
  topology.worker.childopts: ${topology.worker.childopts}
  topology.auto-credentials: ${topology.auto-credentials}
  topology.max.spout.pending: ${topology.max.spout.pending}

# Reusable objects. Each entry is built from `className`, optionally with
# `constructorArgs`, then has its `configMethods` invoked in the order listed.
# Other entries refer to them through `ref: "<id>"`.
components:

  # --- HDFS output ----------------------------------------------------------

  # File naming for the HDFS output: "enrichment-" prefix, ".json" extension,
  # written under ${indexing.hdfs.output}.
  - id: "fileNameFormat"
    className: "org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat"
    configMethods:
      - name: "withPrefix"
        args:
          - "enrichment-"
      - name: "withExtension"
        args:
          - ".json"
      - name: "withPath"
        args:
          - "${indexing.hdfs.output}"

  # Rotation policy for the HDFS files. The implementing class, the count and
  # the units are all supplied through properties; the count is unquoted while
  # the units are passed as a quoted string.
  - id: "hdfsRotationPolicy"
    className: "${bolt.hdfs.rotation.policy}"
    constructorArgs:
      - ${bolt.hdfs.rotation.policy.count}
      - "${bolt.hdfs.rotation.policy.units}"

  # --- Indexing writers -----------------------------------------------------

  # HDFS writer wired to the file name format and rotation policy above.
  - id: "hdfsWriter"
    className: "org.apache.metron.writer.hdfs.HdfsWriter"
    configMethods:
      - name: "withFileNameFormat"
        args:
          - ref: "fileNameFormat"
      - name: "withRotationPolicy"
        args:
          - ref: "hdfsRotationPolicy"

  # Extra producer settings given to the Kafka writer below
  # (only the security protocol is set here).
  - id: "kafkaWriterProps"
    className: "java.util.HashMap"
    configMethods:
      - name: "put"
        args:
          - "security.protocol"
          - "${kafka.security.protocol}"

  # Kafka writer that targets ${indexing.error.topic}; it is the writer used
  # by the error bolt of this topology.
  - id: "kafkaWriter"
    className: "org.apache.metron.writer.kafka.KafkaWriter"
    configMethods:
      - name: "withTopic"
        args:
          - "${indexing.error.topic}"
      - name: "withZkQuorum"
        args:
          - "${kafka.zk}"
      - name: "withProducerConfigs"
        args:
          - ref: "kafkaWriterProps"

  # --- Kafka / ZooKeeper input ----------------------------------------------

  # Kafka properties handed to the spout builder ("kafkaConfig" below). The
  # keys set here are consumer-side ones: byte-array key/value deserializers,
  # the consumer group id and the security protocol. Add further Kafka
  # properties for the spout to this map.
  - id: "kafkaProps"
    className: "java.util.HashMap"
    configMethods:
      - name: "put"
        args:
          - "value.deserializer"
          - "org.apache.kafka.common.serialization.ByteArrayDeserializer"
      - name: "put"
        args:
          - "key.deserializer"
          - "org.apache.kafka.common.serialization.ByteArrayDeserializer"
      - name: "put"
        args:
          - "group.id"
          - "indexing-batch"
      - name: "put"
        args:
          - "security.protocol"
          - "${kafka.security.protocol}"

  # The fields to pull out of each Kafka message (just "value").
  - id: "fields"
    className: "java.util.ArrayList"
    configMethods:
      - name: "add"
        args:
          - "value"

  # Builder for the Kafka spout. Constructor arguments, in order:
  # Kafka properties, input topic, ZooKeeper quorum, fields to emit.
  - id: "kafkaConfig"
    className: "org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder"
    constructorArgs:
      - ref: "kafkaProps"
      # topic name
      - "${indexing.input.topic}"
      - "${kafka.zk}"
      - ref: "fields"
    configMethods:
      # Starting offset strategy comes from the ${kafka.start} property.
      - name: "setFirstPollOffsetStrategy"
        args:
          - "${kafka.start}"

# Topology sources. The single spout is constructed from the "kafkaConfig"
# component; its parallelism comes from a property and is left unquoted, as
# in the original.
spouts:
  - id: "kafkaSpout"
    className: "org.apache.metron.storm.kafka.flux.StormKafkaSpout"
    constructorArgs:
      - ref: "kafkaConfig"
    parallelism: ${kafka.spout.parallelism}

# Indexing bolts. Both are BulkMessageWriterBolt instances constructed with
# the ZooKeeper quorum and the literal "INDEXING"; they differ only in the
# writer they are given.
bolts:

  # Writes incoming messages through the "hdfsWriter" component.
  # NOTE(review): "DEFAULT_JSON_FROM_POSITION" is passed verbatim to
  # withMessageGetter; its exact meaning is defined by the bolt class, not by
  # this file.
  - id: "hdfsIndexingBolt"
    className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt"
    constructorArgs:
      - "${kafka.zk}"
      - "INDEXING"
    configMethods:
      - name: "withBulkMessageWriter"
        args:
          - ref: "hdfsWriter"
      - name: "withMessageGetter"
        args:
          - "DEFAULT_JSON_FROM_POSITION"
    parallelism: ${hdfs.writer.parallelism}

  # Writes through the "kafkaWriter" component, i.e. to the error topic.
  # No message getter and no parallelism are configured for this bolt.
  - id: "indexingErrorBolt"
    className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt"
    constructorArgs:
      - "${kafka.zk}"
      - "INDEXING"
    configMethods:
      - name: "withBulkMessageWriter"
        args:
          - ref: "kafkaWriter"

# Wiring between the spout and the bolts. Both links use the
# LOCAL_OR_SHUFFLE grouping.
streams:

  # Kafka spout output feeds the HDFS indexing bolt.
  - name: "spout -> hdfs"
    from: "kafkaSpout"
    to: "hdfsIndexingBolt"
    grouping:
      type: LOCAL_OR_SHUFFLE

  # Only the "error" stream of the HDFS indexing bolt is routed to the
  # error bolt.
  - name: "hdfsBolt -> errorIndexingBolt"
    from: "hdfsIndexingBolt"
    to: "indexingErrorBolt"
    grouping:
      streamId: "error"
      type: LOCAL_OR_SHUFFLE