| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
# Name of this topology definition.
name: "profiler"

# Topology-level settings. Every value is a ${...} placeholder; nothing is
# hard-coded in this file.
# NOTE(review): placeholders are presumably substituted from a properties
# file before the YAML is parsed -- confirm against the launch command.
config:
  topology.workers: ${profiler.workers}
  topology.acker.executors: ${profiler.acker.executors}
  topology.worker.childopts: ${topology.worker.childopts}
  topology.auto-credentials: ${topology.auto-credentials}
  topology.message.timeout.secs: ${topology.message.timeout.secs}
  topology.max.spout.pending: ${topology.max.spout.pending}
  topology.testing.always.try.serialize: ${topology.testing.always.try.serialize}
  topology.fall.back.on.java.serialization: ${topology.fall.back.on.java.serialization}
  topology.kryo.register: ${topology.kryo.register}
| |
components:

# Row key builder; referenced by the "hbaseMapper" component.
- id: "rowKeyBuilder"
  className: "org.apache.metron.profiler.hbase.SaltyRowKeyBuilder"
  properties:
  - name: "saltDivisor"
    value: ${profiler.hbase.salt.divisor}
  configMethods:
  - name: "withPeriodDuration"
    # The duration placeholder is unquoted and the units placeholder is
    # quoted; presumably a numeric and a string argument -- keep as is.
    args:
    - ${profiler.period.duration}
    - "${profiler.period.duration.units}"
| |
# Column builder; referenced by the "hbaseMapper" component. Its single
# constructor argument is the configured HBase column family.
- id: "columnBuilder"
  className: "org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder"
  constructorArgs:
  - "${profiler.hbase.column.family}"
| |
# Mapper handed to the "hbaseBolt"; wired to the row key builder and the
# column builder components by reference.
- id: "hbaseMapper"
  className: "org.apache.metron.profiler.storm.ProfileHBaseMapper"
  properties:
  - name: "rowKeyBuilder"
    ref: "rowKeyBuilder"
  - name: "columnBuilder"
    ref: "columnBuilder"
| |
# Kafka properties for the consumer side (the input spout) go here; this map
# is the first constructor argument of the "kafkaConfig" component.
# Producer-side properties belong in "kafkaWriterProps" instead.
- id: "kafkaProps"
  className: "java.util.HashMap"
  configMethods:
  - name: "put"
    args:
    - "value.deserializer"
    - "org.apache.kafka.common.serialization.ByteArrayDeserializer"
  - name: "put"
    args:
    - "key.deserializer"
    - "org.apache.kafka.common.serialization.ByteArrayDeserializer"
  # Consumer group is fixed to "profiler" rather than taken from a placeholder.
  - name: "put"
    args:
    - "group.id"
    - "profiler"
  - name: "put"
    args:
    - "security.protocol"
    - "${kafka.security.protocol}"
| |
# The fields to pull out of each kafka message, added in this order.
# The list is the last constructor argument of the "kafkaConfig" component.
- id: "fields"
  className: "java.util.ArrayList"
  configMethods:
  - name: "add"
    args:
    - "value"
  - name: "add"
    args:
    - "topic"
  - name: "add"
    args:
    - "partition"
  - name: "add"
    args:
    - "offset"
  - name: "add"
    args:
    - "timestamp"
| |
# Spout configuration builder; passed to the "kafkaSpout" constructor.
- id: "kafkaConfig"
  className: "org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder"
  constructorArgs:
  # kafka consumer properties
  - ref: "kafkaProps"
  # input topic name
  - "${profiler.input.topic}"
  # zookeeper quorum
  - "${kafka.zk}"
  # fields to pull out of each kafka message
  - ref: "fields"
  configMethods:
  # Where to start reading the topic on the first poll.
  - name: "setFirstPollOffsetStrategy"
    args:
    - "${kafka.start}"
| |
# Producer configuration map given to "kafkaWriter" via withProducerConfigs.
- id: "kafkaWriterProps"
  className: "java.util.HashMap"
  configMethods:
  - name: "put"
    args:
    - "security.protocol"
    - "${kafka.security.protocol}"
| |
# Kafka writer used by the "kafkaBolt"; configured with the output topic, the
# zookeeper quorum and the producer properties map.
- id: "kafkaWriter"
  className: "org.apache.metron.writer.kafka.KafkaWriter"
  configMethods:
  - name: "withTopic"
    args:
    - "${profiler.output.topic}"
  - name: "withZkQuorum"
    args:
    - "${kafka.zk}"
  - name: "withProducerConfigs"
    args:
    - ref: "kafkaWriterProps"
| |
# Emitters registered on the "builderBolt" through its withEmitter calls.
# Both are created with no constructor arguments.
- id: "kafkaEmitter"
  className: "org.apache.metron.profiler.storm.KafkaEmitter"

- id: "hbaseEmitter"
  className: "org.apache.metron.profiler.storm.HBaseEmitter"
| |
# Window length passed to the "builderBolt" via withTumblingWindow.
# Constructor arguments: an unquoted value placeholder, then a quoted units
# placeholder.
- id: "windowDuration"
  className: "org.apache.storm.topology.base.BaseWindowedBolt$Duration"
  constructorArgs:
  - ${profiler.window.duration}
  - "${profiler.window.duration.units}"

# Window lag passed to the "builderBolt" via withLag; same argument shape as
# the window duration.
- id: "windowLag"
  className: "org.apache.storm.topology.base.BaseWindowedBolt$Duration"
  constructorArgs:
  - ${profiler.window.lag}
  - "${profiler.window.lag.units}"
| |
spouts:

# Single kafka spout, built from the "kafkaConfig" component; it is the
# source of the "spout -> splitter" stream.
- id: "kafkaSpout"
  className: "org.apache.metron.storm.kafka.flux.StormKafkaSpout"
  constructorArgs:
  - ref: "kafkaConfig"
  parallelism: ${profiler.spout.parallelism}
| |
bolts:

# First bolt after the spout; its only constructor argument is the zookeeper
# quorum placeholder.
- id: "splitterBolt"
  className: "org.apache.metron.profiler.storm.ProfileSplitterBolt"
  constructorArgs:
  - "${kafka.zk}"
  parallelism: ${profiler.splitter.parallelism}
| |
# Builder bolt: receives the splitter output and feeds both the hbase and the
# kafka writer bolts. The config methods are listed in the order they appear
# in the topology definition; do not reorder without checking the bolt class.
- id: "builderBolt"
  className: "org.apache.metron.profiler.storm.ProfileBuilderBolt"
  configMethods:
  - name: "withZookeeperUrl"
    args:
    - "${kafka.zk}"
  - name: "withPeriodDuration"
    args:
    - ${profiler.period.duration}
    - "${profiler.period.duration.units}"
  - name: "withProfileTimeToLive"
    args:
    - ${profiler.ttl}
    - "${profiler.ttl.units}"
  # withEmitter is invoked twice, once per emitter component.
  - name: "withEmitter"
    args:
    - ref: "kafkaEmitter"
  - name: "withEmitter"
    args:
    - ref: "hbaseEmitter"
  - name: "withTumblingWindow"
    args:
    - ref: "windowDuration"
  - name: "withLag"
    args:
    - ref: "windowLag"
  - name: "withMaxNumberOfRoutes"
    args:
    - ${profiler.max.routes.per.bolt}
  # "timestamp" is also one of the fields listed in the "fields" component.
  - name: "withTimestampField"
    args:
    - "timestamp"
  parallelism: ${profiler.builder.parallelism}
| |
# HBase writer bolt; destination of the "builder -> hbase" stream.
- id: "hbaseBolt"
  className: "org.apache.metron.hbase.bolt.HBaseBolt"
  constructorArgs:
  # target table, then the mapper component
  - "${profiler.hbase.table}"
  - ref: "hbaseMapper"
  configMethods:
  - name: "withTableProvider"
    args:
    - "${hbase.provider.impl}"
  - name: "withBatchSize"
    args:
    - ${profiler.hbase.batch}
  - name: "withFlushIntervalSecs"
    args:
    - ${profiler.hbase.flush.interval.seconds}
  parallelism: ${profiler.hbase.writer.parallelism}
| |
# Kafka writer bolt; destination of the "builder -> kafka" stream. It wraps
# the "kafkaWriter" component as its bulk message writer.
- id: "kafkaBolt"
  className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt"
  constructorArgs:
  - "${kafka.zk}"
  # NOTE(review): literal "PROFILER" second argument; meaning is defined by
  # BulkMessageWriterBolt -- confirm against that class.
  - "PROFILER"
  configMethods:
  - name: "withBulkMessageWriter"
    args:
    - ref: "kafkaWriter"
  parallelism: ${profiler.kafka.writer.parallelism}
| |
# Wiring between the spout and the bolts:
#   kafkaSpout -> splitterBolt -> builderBolt -> hbaseBolt
#                                             -> kafkaBolt
streams:

- name: "spout -> splitter"
  from: "kafkaSpout"
  to: "splitterBolt"
  grouping:
    type: LOCAL_OR_SHUFFLE

# Fields grouping on "entity" and "profile".
- name: "splitter -> builder"
  from: "splitterBolt"
  to: "builderBolt"
  grouping:
    type: FIELDS
    args:
    - "entity"
    - "profile"

# The builder's output is consumed on two named stream ids, "hbase" and
# "kafka", one per writer bolt.
- name: "builder -> hbase"
  from: "builderBolt"
  to: "hbaseBolt"
  grouping:
    streamId: "hbase"
    type: LOCAL_OR_SHUFFLE

- name: "builder -> kafka"
  from: "builderBolt"
  to: "kafkaBolt"
  grouping:
    streamId: "kafka"
    type: LOCAL_OR_SHUFFLE