blob: 3921967295dedc961f869f1a7e4e6e95d7a8cb13 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Topology name registered with Storm.
name: "profiler"

# Storm topology settings. Every ${...} placeholder is resolved by Flux
# property substitution when the topology is submitted.
config:
  topology.workers: ${profiler.workers}
  topology.acker.executors: ${profiler.acker.executors}
  topology.worker.childopts: ${topology.worker.childopts}
  topology.auto-credentials: ${topology.auto-credentials}
  topology.message.timeout.secs: ${topology.message.timeout.secs}
  topology.max.spout.pending: ${topology.max.spout.pending}
  topology.testing.always.try.serialize: ${topology.testing.always.try.serialize}
  topology.fall.back.on.java.serialization: ${topology.fall.back.on.java.serialization}
  topology.kryo.register: ${topology.kryo.register}
components:

# Row-key builder used by hbaseMapper; salt divisor and period duration
# come from the topology properties.
- id: "rowKeyBuilder"
  className: "org.apache.metron.profiler.hbase.SaltyRowKeyBuilder"
  properties:
  - name: "saltDivisor"
    value: ${profiler.hbase.salt.divisor}
  configMethods:
  - name: "withPeriodDuration"
    args:
    - ${profiler.period.duration}
    - "${profiler.period.duration.units}"
# Column builder used by hbaseMapper; its only constructor argument is the
# HBase column family (profiler.hbase.column.family).
- id: "columnBuilder"
  className: "org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder"
  constructorArgs:
  - "${profiler.hbase.column.family}"
# Mapper handed to hbaseBolt; wired to the rowKeyBuilder and
# columnBuilder components by reference.
- id: "hbaseMapper"
  className: "org.apache.metron.profiler.storm.ProfileHBaseMapper"
  properties:
  - name: "rowKeyBuilder"
    ref: "rowKeyBuilder"
  - name: "columnBuilder"
    ref: "columnBuilder"
# Kafka *consumer* properties for the input spout (passed to kafkaConfig):
# byte-array key/value deserializers, the consumer group id, and the
# security protocol. Producer settings for the output side live in
# kafkaWriterProps instead.
- id: "kafkaProps"
  className: "java.util.HashMap"
  configMethods:
  - name: "put"
    args:
    - "value.deserializer"
    - "org.apache.kafka.common.serialization.ByteArrayDeserializer"
  - name: "put"
    args:
    - "key.deserializer"
    - "org.apache.kafka.common.serialization.ByteArrayDeserializer"
  - name: "put"
    args:
    - "group.id"
    - "profiler"
  - name: "put"
    args:
    - "security.protocol"
    - "${kafka.security.protocol}"
# The fields the spout pulls out of each Kafka record. builderBolt is
# configured with withTimestampField("timestamp"), which names one of these.
- id: "fields"
  className: "java.util.ArrayList"
  configMethods:
  - name: "add"
    args:
    - "value"
  - name: "add"
    args:
    - "topic"
  - name: "add"
    args:
    - "partition"
  - name: "add"
    args:
    - "offset"
  - name: "add"
    args:
    - "timestamp"
# Spout configuration builder used by kafkaSpout.
# Constructor arguments, in order: consumer properties, input topic,
# ZooKeeper quorum, and the list of fields to emit.
- id: "kafkaConfig"
  className: "org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder"
  constructorArgs:
  # Kafka consumer properties
  - ref: "kafkaProps"
  # topic name
  - "${profiler.input.topic}"
  # zookeeper hosts
  - "${kafka.zk}"
  # fields to pull out of each Kafka record
  - ref: "fields"
  configMethods:
  # First-poll offset strategy, taken from the kafka.start property.
  - name: "setFirstPollOffsetStrategy"
    args:
    - "${kafka.start}"
# Producer properties for kafkaWriter (passed via withProducerConfigs).
- id: "kafkaWriterProps"
  className: "java.util.HashMap"
  configMethods:
  - name: "put"
    args:
    - "security.protocol"
    - "${kafka.security.protocol}"
# Kafka writer used by kafkaBolt; targets the topic named by the
# profiler.output.topic property.
- id: "kafkaWriter"
  className: "org.apache.metron.writer.kafka.KafkaWriter"
  configMethods:
  - name: "withTopic"
    args:
    - "${profiler.output.topic}"
  - name: "withZkQuorum"
    args:
    - "${kafka.zk}"
  - name: "withProducerConfigs"
    args:
    - ref: "kafkaWriterProps"
# Registered on builderBolt via withEmitter; presumably feeds the "kafka"
# stream consumed by kafkaBolt (confirm against KafkaEmitter).
- id: "kafkaEmitter"
  className: "org.apache.metron.profiler.storm.KafkaEmitter"
# Registered on builderBolt via withEmitter; presumably feeds the "hbase"
# stream consumed by hbaseBolt (confirm against HBaseEmitter).
- id: "hbaseEmitter"
  className: "org.apache.metron.profiler.storm.HBaseEmitter"
# Tumbling window length for builderBolt (withTumblingWindow).
- id: "windowDuration"
  className: "org.apache.storm.topology.base.BaseWindowedBolt$Duration"
  constructorArgs:
  - ${profiler.window.duration}
  - "${profiler.window.duration.units}"
# Window lag for builderBolt (withLag).
- id: "windowLag"
  className: "org.apache.storm.topology.base.BaseWindowedBolt$Duration"
  constructorArgs:
  - ${profiler.window.lag}
  - "${profiler.window.lag.units}"
spouts:

# Reads the profiler input topic as configured by the kafkaConfig builder.
- id: "kafkaSpout"
  className: "org.apache.metron.storm.kafka.flux.StormKafkaSpout"
  constructorArgs:
  - ref: "kafkaConfig"
  parallelism: ${profiler.spout.parallelism}
bolts:

# First bolt after the spout; its only constructor argument is the
# ZooKeeper quorum. Output is fields-grouped into builderBolt.
- id: "splitterBolt"
  className: "org.apache.metron.profiler.storm.ProfileSplitterBolt"
  constructorArgs:
  - "${kafka.zk}"
  parallelism: ${profiler.splitter.parallelism}
# Windowed profile builder. Results go out through both registered
# emitters, which correspond to the "kafka" and "hbase" streams below.
- id: "builderBolt"
  className: "org.apache.metron.profiler.storm.ProfileBuilderBolt"
  configMethods:
  - name: "withZookeeperUrl"
    args:
    - "${kafka.zk}"
  - name: "withPeriodDuration"
    args:
    - ${profiler.period.duration}
    - "${profiler.period.duration.units}"
  - name: "withProfileTimeToLive"
    args:
    - ${profiler.ttl}
    - "${profiler.ttl.units}"
  # One withEmitter call per output destination.
  - name: "withEmitter"
    args:
    - ref: "kafkaEmitter"
  - name: "withEmitter"
    args:
    - ref: "hbaseEmitter"
  - name: "withTumblingWindow"
    args:
    - ref: "windowDuration"
  - name: "withLag"
    args:
    - ref: "windowLag"
  - name: "withMaxNumberOfRoutes"
    args:
    - ${profiler.max.routes.per.bolt}
  # "timestamp" is one of the fields the spout emits (see "fields").
  - name: "withTimestampField"
    args:
    - "timestamp"
  parallelism: ${profiler.builder.parallelism}
# Consumes builderBolt's "hbase" stream and writes to the profiler HBase
# table through hbaseMapper, with configurable batch size and flush interval.
- id: "hbaseBolt"
  className: "org.apache.metron.hbase.bolt.HBaseBolt"
  constructorArgs:
  - "${profiler.hbase.table}"
  - ref: "hbaseMapper"
  configMethods:
  - name: "withTableProvider"
    args:
    - "${hbase.provider.impl}"
  - name: "withBatchSize"
    args:
    - ${profiler.hbase.batch}
  - name: "withFlushIntervalSecs"
    args:
    - ${profiler.hbase.flush.interval.seconds}
  parallelism: ${profiler.hbase.writer.parallelism}
# Consumes builderBolt's "kafka" stream and publishes through kafkaWriter.
# Constructor arguments: ZooKeeper quorum and the "PROFILER" identifier.
- id: "kafkaBolt"
  className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt"
  constructorArgs:
  - "${kafka.zk}"
  - "PROFILER"
  configMethods:
  - name: "withBulkMessageWriter"
    args:
    - ref: "kafkaWriter"
  parallelism: ${profiler.kafka.writer.parallelism}
streams:

# Spout output is shuffled to the splitter, preferring tasks in the same
# worker process.
- name: "spout -> splitter"
  from: "kafkaSpout"
  to: "splitterBolt"
  grouping:
    type: LOCAL_OR_SHUFFLE
# Fields grouping on ("entity", "profile"): every tuple with the same
# entity/profile pair goes to the same builderBolt task.
- name: "splitter -> builder"
  from: "splitterBolt"
  to: "builderBolt"
  grouping:
    type: FIELDS
    args:
    - "entity"
    - "profile"
# Tuples on builderBolt's "hbase" stream go to the HBase writer.
- name: "builder -> hbase"
  from: "builderBolt"
  to: "hbaseBolt"
  grouping:
    type: LOCAL_OR_SHUFFLE
    streamId: "hbase"
# Tuples on builderBolt's "kafka" stream go to the Kafka writer.
- name: "builder -> kafka"
  from: "builderBolt"
  to: "kafkaBolt"
  grouping:
    type: LOCAL_OR_SHUFFLE
    streamId: "kafka"