/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.eagle.metric.kafka;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import com.codahale.metrics.MetricRegistry;
import com.typesafe.config.Config;
import org.apache.commons.lang.time.DateUtils;
import org.apache.eagle.common.config.EagleConfigConstants;
import org.apache.eagle.metric.reportor.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
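
/**
 * Storm bolt that counts Kafka messages per user for a given site/topic and
 * reports the counts to the Eagle metric service at a configurable granularity.
 */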
public class KafkaMessageDistributionBolt extends BaseRichBolt {
private static final Logger LOG = LoggerFactory.getLogger(KafkaMessageDistributionBolt.class);
private static final long DEFAULT_METRIC_GRANULARITY = 60 * 1000;
private Config config;
private Map<String, String> baseMetricDimension;
private MetricRegistry registry;
private EagleMetricListener listener;
private long granularity;
private OutputCollector collector;
    public KafkaMessageDistributionBolt(Config config) {
this.config = config;
}
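
    /**
     * Reads the site/topic and Eagle service settings from the configuration and sets up
     * the metric registry, the reporting granularity, and the Eagle service metric listener.
     */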
@Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
String site = config.getString("dataSourceConfig.site");
String topic = config.getString("dataSourceConfig.topic");
this.baseMetricDimension = new HashMap<>();
this.baseMetricDimension.put("site", site);
this.baseMetricDimension.put("topic", topic);
registry = new MetricRegistry();
this.granularity = DEFAULT_METRIC_GRANULARITY;
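        // Override the default 1-minute granularity when kafkaDistributionDataIntervalMin (in minutes) is configured.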
if (config.hasPath("dataSourceConfig.kafkaDistributionDataIntervalMin")) {
this.granularity = config.getInt("dataSourceConfig.kafkaDistributionDataIntervalMin") * DateUtils.MILLIS_PER_MINUTE;
}
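        // Connection settings for the Eagle service that receives the reported metrics.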
String host = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST);
int port = config.getInt(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT);
String username = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME);
String password = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD);
listener = new EagleServiceReporterMetricListener(host, port, username, password);
}
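
    /**
     * Encodes the metric name together with the site/topic base dimensions and the
     * per-message user dimension into a single metric key.
     */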
public String generateMetricKey(String user) {
Map<String, String> dimensions = new HashMap<>();
dimensions.putAll(baseMetricDimension);
dimensions.put("user", user);
String metricName = "eagle.kafka.message.count";
        return MetricKeyCodeDecoder.codeMetricKey(metricName, dimensions);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
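        // No output fields: this bolt only reports metrics and does not emit downstream tuples.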
}
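
    /**
     * Expects tuples of (user, timestamp); registers a counter metric for a metric key the
     * first time it is seen and updates the existing counter afterwards, then acks the tuple.
     */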
@Override
public void execute(Tuple input) {
try {
String user = input.getString(0);
Long timestamp = input.getLong(1);
String metricKey = generateMetricKey(user);
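            // Register a new counter on the first occurrence of this metric key; otherwise update the existing one.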
if (registry.getMetrics().get(metricKey) == null) {
EagleCounterMetric metric = new EagleCounterMetric(timestamp, metricKey, 1.0, granularity);
metric.registerListener(listener);
registry.register(metricKey, metric);
            } else {
                EagleMetric metric = (EagleMetric) registry.getMetrics().get(metricKey);
                metric.update(1, timestamp);
                // TODO: decide whether stale metrics should be removed from the registry
            }
        } catch (Exception ex) {
            LOG.error("Failed to update kafka message count metric", ex);
        } finally {
            collector.ack(input);
        }
}
}