blob: 919654cbcabb8520c2dd51c916b79a73be59e85e [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
name: "batch_indexing"
config:
topology.workers: ${indexing.workers}
topology.acker.executors: ${indexing.acker.executors}
topology.worker.childopts: ${topology.worker.childopts}
topology.auto-credentials: ${topology.auto-credentials}
topology.max.spout.pending: ${topology.max.spout.pending}
components:
- id: "fileNameFormat"
className: "org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat"
configMethods:
- name: "withPrefix"
args:
- "enrichment-"
- name: "withExtension"
args:
- ".json"
- name: "withPath"
args:
- "${indexing.hdfs.output}"
- id: "hdfsRotationPolicy"
className: "${bolt.hdfs.rotation.policy}"
constructorArgs:
- ${bolt.hdfs.rotation.policy.count}
- "${bolt.hdfs.rotation.policy.units}"
#indexing
- id: "hdfsWriter"
className: "org.apache.metron.writer.hdfs.HdfsWriter"
configMethods:
- name: "withFileNameFormat"
args:
- ref: "fileNameFormat"
- name: "withRotationPolicy"
args:
- ref: "hdfsRotationPolicy"
- id: "kafkaWriterProps"
className: "java.util.HashMap"
configMethods:
- name: "put"
args:
- "security.protocol"
- "${kafka.security.protocol}"
- id: "kafkaWriter"
className: "org.apache.metron.writer.kafka.KafkaWriter"
configMethods:
- name: "withTopic"
args:
- "${indexing.error.topic}"
- name: "withZkQuorum"
args:
- "${kafka.zk}"
- name: "withProducerConfigs"
args: [ref: "kafkaWriterProps"]
#kafka/zookeeper
# Any kafka props for the producer go here.
- id: "kafkaProps"
className: "java.util.HashMap"
configMethods:
- name: "put"
args:
- "value.deserializer"
- "org.apache.kafka.common.serialization.ByteArrayDeserializer"
- name: "put"
args:
- "key.deserializer"
- "org.apache.kafka.common.serialization.ByteArrayDeserializer"
- name: "put"
args:
- "group.id"
- "indexing-batch"
- name: "put"
args:
- "security.protocol"
- "${kafka.security.protocol}"
# The fields to pull out of the kafka messages
- id: "fields"
className: "java.util.ArrayList"
configMethods:
- name: "add"
args:
- "value"
- id: "kafkaConfig"
className: "org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder"
constructorArgs:
- ref: "kafkaProps"
# topic name
- "${indexing.input.topic}"
- "${kafka.zk}"
- ref: "fields"
configMethods:
- name: "setFirstPollOffsetStrategy"
args:
- "${kafka.start}"
spouts:
- id: "kafkaSpout"
className: "org.apache.metron.storm.kafka.flux.StormKafkaSpout"
constructorArgs:
- ref: "kafkaConfig"
parallelism: ${kafka.spout.parallelism}
bolts:
# Indexing Bolts
- id: "hdfsIndexingBolt"
className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt"
constructorArgs:
- "${kafka.zk}"
- "INDEXING"
configMethods:
- name: "withBulkMessageWriter"
args:
- ref: "hdfsWriter"
- name: "withMessageGetter"
args:
- "DEFAULT_JSON_FROM_POSITION"
parallelism: ${hdfs.writer.parallelism}
- id: "indexingErrorBolt"
className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt"
constructorArgs:
- "${kafka.zk}"
- "INDEXING"
configMethods:
- name: "withBulkMessageWriter"
args:
- ref: "kafkaWriter"
streams:
- name: "spout -> hdfs"
from: "kafkaSpout"
to: "hdfsIndexingBolt"
grouping:
type: LOCAL_OR_SHUFFLE
- name: "hdfsBolt -> errorIndexingBolt"
from: "hdfsIndexingBolt"
to: "indexingErrorBolt"
grouping:
streamId: "error"
type: LOCAL_OR_SHUFFLE