blob: 382ae1a6428db6aa110a5ed3ac557eb05f6d1b0a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.eagle.metric.kafka;
import backtype.storm.spout.Scheme;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.base.BaseRichSpout;
import com.typesafe.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;
import java.util.ArrayList;
import java.util.List;
/**
* Since 8/14/16.
*/
public class KafkaSourcedSpoutProvider {
private final static Logger LOG = LoggerFactory.getLogger(KafkaSourcedSpoutProvider.class);
public static BaseRichSpout getSpout(Config context, Scheme scheme) {
// Kafka topic
String topic = context.getString("dataSourceConfig.topic");
// Kafka consumer group id
String groupId = context.getString("dataSourceConfig.metricCollectionConsumerId");
// Kafka fetch size
int fetchSize = context.getInt("dataSourceConfig.fetchSize");
// Kafka deserializer class
String deserClsName = context.getString("dataSourceConfig.deserializerClass");
// Kafka broker zk connection
String zkConnString = context.getString("dataSourceConfig.zkQuorum");
// transaction zkRoot
String zkRoot = context.getString("dataSourceConfig.transactionZKRoot");
LOG.info(String.format("Use topic id: %s",topic));
String brokerZkPath = null;
if(context.hasPath("dataSourceConfig.brokerZkPath")) {
brokerZkPath = context.getString("dataSourceConfig.brokerZkPath");
}
BrokerHosts hosts;
if(brokerZkPath == null) {
hosts = new ZkHosts(zkConnString);
} else {
hosts = new ZkHosts(zkConnString, brokerZkPath);
}
SpoutConfig spoutConfig = new SpoutConfig(hosts,
topic,
zkRoot + "/" + topic,
groupId);
// transaction zkServers
String[] zkConnections = zkConnString.split(",");
List<String> zkHosts = new ArrayList<>();
for (String zkConnection : zkConnections) {
zkHosts.add(zkConnection.split(":")[0]);
}
Integer zkPort = Integer.valueOf(zkConnections[0].split(":")[1]);
spoutConfig.zkServers = zkHosts;
// transaction zkPort
spoutConfig.zkPort = zkPort;
// transaction update interval
spoutConfig.stateUpdateIntervalMs = context.getLong("dataSourceConfig.transactionStateUpdateMS");
// Kafka fetch size
spoutConfig.fetchSizeBytes = fetchSize;
spoutConfig.scheme = new SchemeAsMultiScheme(scheme);
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
return kafkaSpout;
}
}