# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
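// Sample Eagle stream-pipeline DSL configuration (HOCON-style): three Kafka JMX
// metric streams are wired into an alert executor, a Siddhi-style aggregation
// query, a Kafka sink for persistence, and a console printer, all running as a
// local Storm topology named "dsl-topology".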
{
  config {
    envContextConfig {
      "env" : "storm"
      "mode" : "local"
      "topologyName" : "dsl-topology"
      "parallelismConfig" : {
        "kafkaMsgConsumer" : 1
      }
    }
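    // Alert executor configuration: executor parallelism, the policy partitioner
    // class, and needValidation (presumably whether policies are validated before
    // being loaded into the executor).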
    alertExecutorConfigs {
      defaultAlertExecutor {
        "parallelism" : 1
        "partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner"
        "needValidation" : "true"
      }
    }
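    // Site and data-source tags for this pipeline; presumably used by Eagle to
    // scope metadata such as which policies apply (sandbox / HADOOP here).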
    eagleProps {
      "site" : "sandbox"
      "dataSource" : "HADOOP"
    }
  }
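  // The dataflow section first declares the processing modules (Type.name { ... })
  // and then wires them together with "->" connection statements at the bottom.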
  dataflow {
    KafkaSource.JmxStreamOne {
      parallism = 1000
      topic = "metric_event_1"
      zkConnection = "localhost:2181"
      zkConnectionTimeoutMS = 15000
      consumerGroupId = "Consumer"
      fetchSize = 1048586
      transactionZKServers = "localhost"
      transactionZKPort = 2181
      transactionZKRoot = "/consumers"
      transactionStateUpdateMS = 2000
      deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
    }
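    // A minimal sketch of the JSON payload JsonMessageDeserializer is assumed to
    // receive on these topics, matching the stream schema declared in the
    // Aggregator query below (the metric name and value are hypothetical):
    //   {"timestamp": 1454460000000, "metric": "hadoop.memory.heap.used", "value": 150.0}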
    KafkaSource.JmxStreamTwo {
      parallism = 1000
      topic = "metric_event_2"
      zkConnection = "localhost:2181"
      zkConnectionTimeoutMS = 15000
      consumerGroupId = "Consumer"
      fetchSize = 1048586
      transactionZKServers = "localhost"
      transactionZKPort = 2181
      transactionZKRoot = "/consumers"
      transactionStateUpdateMS = 2000
      deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
    }
    KafkaSource.JmxStreamThree {
      parallism = 1000
      topic = "metric_event_3"
      zkConnection = "localhost:2181"
      zkConnectionTimeoutMS = 15000
      consumerGroupId = "Consumer"
      fetchSize = 1048586
      transactionZKServers = "localhost"
      transactionZKPort = 2181
      transactionZKRoot = "/consumers"
      transactionStateUpdateMS = 2000
      deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
    }
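    // JmxStreamTwo and JmxStreamThree mirror JmxStreamOne except for the Kafka
    // topic each consumes. The next modules are sinks: a console printer and a
    // Kafka sink that persists events to the "metric_event_persist" topic.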
    Console.printer {
      format = "%s"
    }
    KafkaSink.metricStore {
      topic = "metric_event_persist"
    }
    // KafkaSink.aggSink {
    //   topic = "metric_agg_persist"
    // }
    Alert.defaultAlertExecutor {
      // upStreamNames = [JmxStreamOne,JmxStreamTwo,JmxStreamThree]
      // alertExecutorId = defaultAlertExecutor
    }
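    // Alert module bound to the "defaultAlertExecutor" configured above; the
    // commented-out fields would pin the upstream stream names and executor id
    // explicitly, otherwise they are presumably derived from the module name and
    // the connection statements below.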
    Aggregator.Aggregator {
      sql = """
        define stream JmxStreamOne(eagleAlertContext object, timestamp long, metric string, value double);
        @info(name = "query")
        from JmxStreamOne[value > 100.0] select * insert into outputStream;
      """
    }
    JmxStreamOne -> Aggregator {}
    Aggregator -> printer {}
    // Aggregator -> aggSink {
    //   grouping = shuffle
    // }
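    // "A|B|C -> target" fans all three JMX streams into a single downstream
    // module; shuffle grouping distributes events randomly and evenly across
    // that module's tasks.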
    JmxStreamOne|JmxStreamTwo|JmxStreamThree -> defaultAlertExecutor {
      grouping = shuffle
    }
    JmxStreamOne|JmxStreamTwo|JmxStreamThree -> metricStore {
      grouping = shuffle
    }
    JmxStreamOne|JmxStreamTwo|JmxStreamThree -> printer {
      grouping = shuffle
    }
  }
}