blob: cd9abe03d79c4c3fb1c15eb820296aa337f7d46b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gearpump.streaming.kafka.util;
import kafka.api.OffsetRequest;
import kafka.common.TopicAndPartition;
import kafka.consumer.ConsumerConfig;
import org.apache.gearpump.streaming.kafka.lib.source.DefaultKafkaMessageDecoder;
import org.apache.gearpump.streaming.kafka.lib.util.KafkaClient;
import org.apache.gearpump.streaming.kafka.lib.source.grouper.DefaultPartitionGrouper;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import java.io.Serializable;
import java.util.Properties;
/**
 * Kafka specific configs.
 *
 * <p>Declares the configuration keys understood by the Kafka source/sink in this package,
 * validates user supplied {@link Properties} against them through a {@link ConfigDef}, and
 * derives the property sets handed to the Kafka producer ({@link #getProducerConfig()}) and
 * the Kafka consumer ({@link #getConsumerConfig()}).
 *
 * <p>NOTE(review): this class is declared {@link Serializable} but its superclass
 * {@code AbstractConfig} is a third-party type; Java serialization needs an accessible
 * no-arg constructor on the first non-serializable superclass. Presumably that is why
 * {@link KafkaConfigFactory} exists - confirm instances are never serialized directly.
 */
public class KafkaConfig extends AbstractConfig implements Serializable {

  /** Definition (type, default, validator, doc) of every key accepted by this config. */
  private static final ConfigDef CONFIG;

  public static final String ZOOKEEPER_CONNECT_CONFIG = "zookeeper.connect";
  private static final String ZOOKEEPER_CONNECT_DOC =
      "Zookeeper connect string for Kafka topics management.";

  public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
  public static final String BOOTSTRAP_SERVERS_DOC = "A list of host/port pairs to use for "
      + "establishing the initial connection to the Kafka cluster. "
      + "The client will make use of all servers irrespective of which servers are specified "
      + "here for bootstrapping—this list only impacts the initial hosts used to discover "
      + "the full set of servers. This list should be in the form "
      + "<code>host1:port1,host2:port2,...</code>. Since these servers are just used for the "
      + "initial connection to discover the full cluster membership (which may change dynamically),"
      + " this list need not contain the full set of servers (you may want more than one, though, "
      + "in case a server is down).";

  public static final String CLIENT_ID_CONFIG = "client.id";
  public static final String CLIENT_ID_DOC = "An id string to pass to the server when making "
      + "requests. The purpose of this is to be able to track the source of requests beyond just "
      + "ip/port by allowing a logical application name to be included in server-side request "
      + "logging.";

  public static final String GROUP_ID_CONFIG = "group.id";
  public static final String GROUP_ID_DOC =
      "A string that uniquely identifies a set of consumers within the same consumer group";

  public static final String ENABLE_AUTO_COMMIT_CONFIG = "auto.commit.enable";
  public static final String ENABLE_AUTO_COMMIT_DOC =
      "If true the consumer's offset will be periodically committed in the background.";

  /** KafkaSource specific configs */
  public static final String CONSUMER_START_OFFSET_CONFIG = "consumer.start.offset";
  private static final String CONSUMER_START_OFFSET_DOC = "Kafka offset to start consume from. "
      + "This will be overwritten when checkpoint recover takes effect.";

  public static final String FETCH_THRESHOLD_CONFIG = "fetch.threshold";
  // Each fragment ends with a space so the concatenated sentence keeps its word boundaries.
  private static final String FETCH_THRESHOLD_DOC = "Kafka messages are fetched asynchronously "
      + "and put onto an internal queue. When the number of messages in the queue hit the "
      + "threshold, the fetch thread stops fetching, and goes to sleep. It starts fetching again "
      + "when the number falls below the threshold";

  public static final String FETCH_SLEEP_MS_CONFIG = "fetch.sleep.ms";
  private static final String FETCH_SLEEP_MS_DOC =
      "The amount of time to sleep when hitting fetch.threshold.";

  public static final String MESSAGE_DECODER_CLASS_CONFIG = "message.decoder.class";
  private static final String MESSAGE_DECODER_CLASS_DOC =
      "Message decoder class that implements the <code>MessageDecoder</code> interface.";

  public static final String PARTITION_GROUPER_CLASS_CONFIG = "partition.grouper";
  private static final String PARTITION_GROUPER_CLASS_DOC =
      "Partition grouper class that implements the <code>KafkaGrouper</code> interface.";

  public static final String REPLICATION_FACTOR_CONFIG = "replication.factor";
  public static final String REPLICATION_FACTOR_DOC =
      "The replication factor for checkpoint store topic.";

  public static final String CHECKPOINT_STORE_NAME_PREFIX_CONFIG = "checkpoint.store.name.prefix";
  public static final String CHECKPOINT_STORE_NAME_PREFIX_DOC = "Name prefix for checkpoint "
      + "store whose name will be of the form, namePrefix-sourceTopic-partitionId";

  // Must stay below the constants above: a static initializer may not forward-reference them.
  static {
    CONFIG = new ConfigDef()
        .define(BOOTSTRAP_SERVERS_CONFIG, // required with no default value
            ConfigDef.Type.LIST,
            ConfigDef.Importance.HIGH,
            BOOTSTRAP_SERVERS_DOC)
        .define(CLIENT_ID_CONFIG,
            ConfigDef.Type.STRING,
            "",
            ConfigDef.Importance.HIGH,
            CLIENT_ID_DOC)
        .define(GROUP_ID_CONFIG,
            ConfigDef.Type.STRING,
            "",
            ConfigDef.Importance.HIGH,
            GROUP_ID_DOC)
        .define(ZOOKEEPER_CONNECT_CONFIG,
            ConfigDef.Type.STRING,
            "",
            ConfigDef.Importance.HIGH,
            ZOOKEEPER_CONNECT_DOC)
        .define(REPLICATION_FACTOR_CONFIG,
            ConfigDef.Type.INT,
            1,
            ConfigDef.Range.atLeast(1),
            ConfigDef.Importance.MEDIUM,
            REPLICATION_FACTOR_DOC)
        .define(MESSAGE_DECODER_CLASS_CONFIG,
            ConfigDef.Type.CLASS,
            DefaultKafkaMessageDecoder.class.getName(),
            ConfigDef.Importance.MEDIUM,
            MESSAGE_DECODER_CLASS_DOC)
        .define(PARTITION_GROUPER_CLASS_CONFIG,
            ConfigDef.Type.CLASS,
            DefaultPartitionGrouper.class.getName(),
            ConfigDef.Importance.MEDIUM,
            PARTITION_GROUPER_CLASS_DOC)
        .define(FETCH_THRESHOLD_CONFIG,
            ConfigDef.Type.INT,
            10000,
            ConfigDef.Range.atLeast(0),
            ConfigDef.Importance.LOW,
            FETCH_THRESHOLD_DOC)
        .define(FETCH_SLEEP_MS_CONFIG,
            ConfigDef.Type.LONG,
            100L,
            ConfigDef.Range.atLeast(0),
            ConfigDef.Importance.LOW,
            FETCH_SLEEP_MS_DOC)
        .define(CONSUMER_START_OFFSET_CONFIG,
            ConfigDef.Type.LONG,
            OffsetRequest.EarliestTime(),
            // NOTE(review): -2 is presumably Kafka's "earliest" sentinel (and -1 "latest");
            // verify against kafka.api.OffsetRequest.
            ConfigDef.Range.atLeast(-2),
            ConfigDef.Importance.MEDIUM,
            CONSUMER_START_OFFSET_DOC)
        .define(ENABLE_AUTO_COMMIT_CONFIG,
            ConfigDef.Type.BOOLEAN,
            false,
            ConfigDef.Importance.MEDIUM,
            ENABLE_AUTO_COMMIT_DOC)
        .define(CHECKPOINT_STORE_NAME_PREFIX_CONFIG,
            ConfigDef.Type.STRING,
            "",
            ConfigDef.Importance.HIGH,
            CHECKPOINT_STORE_NAME_PREFIX_DOC);
  }

  /**
   * Creates a config from the given properties, validated against the keys defined above.
   *
   * @param props user supplied properties; {@code bootstrap.servers} has no default
   */
  public KafkaConfig(Properties props) {
    super(CONFIG, props);
  }

  /**
   * Builds the checkpoint store name suffix for a topic partition.
   *
   * @param tp topic and partition to build the suffix for
   * @return {@code <topic>-<partition>}
   */
  public static String getCheckpointStoreNameSuffix(TopicAndPartition tp) {
    return tp.topic() + "-" + tp.partition();
  }

  /**
   * Derives the properties to hand to a Kafka producer.
   *
   * @return a fresh copy of the original properties with the source specific and consumer
   *     specific keys removed
   */
  public Properties getProducerConfig() {
    Properties props = copyOriginals();
    removeSourceSpecificConfigs(props);
    removeConsumerSpecificConfigs(props);
    return props;
  }

  /**
   * Builds the name of the checkpoint store topic.
   *
   * @param suffix suffix to append, e.g. the result of
   *     {@link #getCheckpointStoreNameSuffix(TopicAndPartition)}
   * @return {@code <checkpoint.store.name.prefix>-<suffix>}
   */
  public String getKafkaStoreTopic(String suffix) {
    return getString(CHECKPOINT_STORE_NAME_PREFIX_CONFIG) + "-" + suffix;
  }

  /**
   * @return the factory obtained from {@code KafkaClient.factory()}
   */
  public KafkaClient.KafkaClientFactory getKafkaClientFactory() {
    return KafkaClient.factory();
  }

  /**
   * Derives the Kafka {@link ConsumerConfig}.
   *
   * <p>Starts from a copy of the original properties, drops the source specific and producer
   * specific keys, and fills in {@code group.id} with this config's (possibly default) value
   * when the caller did not supply one.
   *
   * @return consumer config built from the filtered properties
   */
  public ConsumerConfig getConsumerConfig() {
    Properties props = copyOriginals();
    removeSourceSpecificConfigs(props);
    removeProducerSpecificConfigs(props);
    // set consumer default property values
    if (!props.containsKey(GROUP_ID_CONFIG)) {
      props.put(GROUP_ID_CONFIG, getString(GROUP_ID_CONFIG));
    }
    return new ConsumerConfig(props);
  }

  /** Returns a new, independent {@link Properties} holding every original key/value pair. */
  private Properties copyOriginals() {
    Properties props = new Properties();
    props.putAll(this.originals());
    return props;
  }

  /**
   * Removes every key declared under "KafkaSource specific configs" so that none of them is
   * passed on to the Kafka producer or consumer.
   */
  private void removeSourceSpecificConfigs(Properties props) {
    // consumer.start.offset is source specific as well and must not leak to Kafka clients.
    props.remove(CONSUMER_START_OFFSET_CONFIG);
    props.remove(FETCH_SLEEP_MS_CONFIG);
    props.remove(FETCH_THRESHOLD_CONFIG);
    props.remove(PARTITION_GROUPER_CLASS_CONFIG);
    props.remove(MESSAGE_DECODER_CLASS_CONFIG);
    props.remove(REPLICATION_FACTOR_CONFIG);
    props.remove(CHECKPOINT_STORE_NAME_PREFIX_CONFIG);
  }

  /** Removes the keys that only the consumer side uses. */
  private void removeConsumerSpecificConfigs(Properties props) {
    props.remove(ZOOKEEPER_CONNECT_CONFIG);
    props.remove(GROUP_ID_CONFIG);
  }

  /** Removes the keys that only the producer side uses. */
  private void removeProducerSpecificConfigs(Properties props) {
    props.remove(BOOTSTRAP_SERVERS_CONFIG);
  }

  /**
   * Serializable factory that creates a {@link KafkaConfig} from {@link Properties}.
   */
  public static class KafkaConfigFactory implements Serializable {

    /**
     * @param props properties to build the config from
     * @return a new {@link KafkaConfig} wrapping {@code props}
     */
    public KafkaConfig getKafkaConfig(Properties props) {
      return new KafkaConfig(props);
    }
  }
}