blob: e6fdb831eed46458c573b9f4d16cbf8b4a0ac42c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.eagle.app.messaging;
import org.apache.eagle.metadata.model.StreamSourceConfig;
public class KafkaStreamSourceConfig implements StreamSourceConfig {
private static final String DEFAULT_CONSUMER_GROUP_ID = "eagleKafkaSource";
private static final String DEFAULT_TRANSACTION_ZK_ROOT = "/consumers";
private static final Class<? extends backtype.storm.spout.Scheme> DEFAULT_KAFKA_SCHEMA = JsonSchema.class;
// Read Config
private String topicId;
private String brokerZkQuorum;
private String brokerZkBasePath;
private String transactionZkServers;
private int fetchSize = 1048576;
private String transactionZKRoot = DEFAULT_TRANSACTION_ZK_ROOT;
private String consumerGroupId = DEFAULT_CONSUMER_GROUP_ID;
private String brokerZkPath = "/brokers";
private long transactionStateUpdateMS = 2000;
private int startOffsetTime = -1;
private boolean forceFromStart = false;
private Class<? extends backtype.storm.spout.Scheme> schemaClass = DEFAULT_KAFKA_SCHEMA;
public String getBrokerZkQuorum() {
return brokerZkQuorum;
}
public void setBrokerZkQuorum(String brokerZkQuorum) {
this.brokerZkQuorum = brokerZkQuorum;
}
public String getBrokerZkBasePath() {
return brokerZkBasePath;
}
public void setBrokerZkBasePath(String brokerZkBasePath) {
this.brokerZkBasePath = brokerZkBasePath;
}
public int getFetchSize() {
return fetchSize;
}
public void setFetchSize(int fetchSize) {
this.fetchSize = fetchSize;
}
public String getTransactionZKRoot() {
return transactionZKRoot;
}
public void setTransactionZKRoot(String transactionZKRoot) {
this.transactionZKRoot = transactionZKRoot;
}
public String getConsumerGroupId() {
return consumerGroupId;
}
public void setConsumerGroupId(String consumerGroupId) {
this.consumerGroupId = consumerGroupId;
}
public String getBrokerZkPath() {
return brokerZkPath;
}
public void setBrokerZkPath(String brokerZkPath) {
this.brokerZkPath = brokerZkPath;
}
public String getTransactionZkServers() {
return transactionZkServers;
}
public void setTransactionZkServers(String transactionZkServers) {
this.transactionZkServers = transactionZkServers;
}
public long getTransactionStateUpdateMS() {
return transactionStateUpdateMS;
}
public void setTransactionStateUpdateMS(long transactionStateUpdateMS) {
this.transactionStateUpdateMS = transactionStateUpdateMS;
}
public int getStartOffsetTime() {
return startOffsetTime;
}
public void setStartOffsetTime(int startOffsetTime) {
this.startOffsetTime = startOffsetTime;
}
public boolean isForceFromStart() {
return forceFromStart;
}
public void setForceFromStart(boolean forceFromStart) {
this.forceFromStart = forceFromStart;
}
public Class<? extends backtype.storm.spout.Scheme> getSchemaClass() {
return schemaClass;
}
public void setSchemaClass(Class<? extends backtype.storm.spout.Scheme> schemaClass) {
this.schemaClass = schemaClass;
}
@Override
public String getType() {
return "KAFKA";
}
@Override
public Class<?> getSourceType() {
return KafkaStreamSource.class;
}
@Override
public Class<? extends StreamSourceConfig> getConfigType() {
return StreamSourceConfig.class;
}
public String getTopicId() {
return topicId;
}
public void setTopicId(String topicId) {
this.topicId = topicId;
}
}