blob: 3e0f9550a546592fb7f7a97a7a87c3f64670b543 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.processor.ConsumerRecordTimestampExtractor;
import org.apache.kafka.streams.processor.DefaultPartitionGrouper;
import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor;
import org.apache.kafka.streams.processor.internals.StreamThread;
import java.util.Map;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
/**
* Configuration for Kafka Streams. Documentation for these configurations can be found in the <a
* href="http://kafka.apache.org/documentation.html#streamsconfigs">Kafka documentation</a>
*/
public class StreamsConfig extends AbstractConfig {
private static final ConfigDef CONFIG;
/** <code>state.dir</code> */
public static final String STATE_DIR_CONFIG = "state.dir";
private static final String STATE_DIR_DOC = "Directory location for state store.";
/** <code>zookeeper.connect<code/> */
public static final String ZOOKEEPER_CONNECT_CONFIG = "zookeeper.connect";
private static final String ZOOKEEPER_CONNECT_DOC = "Zookeeper connect string for Kafka topics management.";
/** <code>commit.interval.ms</code> */
public static final String COMMIT_INTERVAL_MS_CONFIG = "commit.interval.ms";
private static final String COMMIT_INTERVAL_MS_DOC = "The frequency with which to save the position of the processor.";
/** <code>poll.ms</code> */
public static final String POLL_MS_CONFIG = "poll.ms";
private static final String POLL_MS_DOC = "The amount of time in milliseconds to block waiting for input.";
/** <code>num.stream.threads</code> */
public static final String NUM_STREAM_THREADS_CONFIG = "num.stream.threads";
private static final String NUM_STREAM_THREADS_DOC = "The number of threads to execute stream processing.";
/** <code>num.stream.threads</code> */
public static final String NUM_STANDBY_REPLICAS_CONFIG = "num.standby.replicas";
private static final String NUM_STANDBY_REPLICAS_DOC = "The number of standby replicas for each task.";
/** <code>buffered.records.per.partition</code> */
public static final String BUFFERED_RECORDS_PER_PARTITION_CONFIG = "buffered.records.per.partition";
private static final String BUFFERED_RECORDS_PER_PARTITION_DOC = "The maximum number of records to buffer per partition.";
/** <code>state.cleanup.delay</code> */
public static final String STATE_CLEANUP_DELAY_MS_CONFIG = "state.cleanup.delay.ms";
private static final String STATE_CLEANUP_DELAY_MS_DOC = "The amount of time in milliseconds to wait before deleting state when a partition has migrated.";
/** <code>timestamp.extractor</code> */
public static final String TIMESTAMP_EXTRACTOR_CLASS_CONFIG = "timestamp.extractor";
private static final String TIMESTAMP_EXTRACTOR_CLASS_DOC = "Timestamp extractor class that implements the <code>TimestampExtractor</code> interface.";
/** <code>partition.grouper</code> */
public static final String PARTITION_GROUPER_CLASS_CONFIG = "partition.grouper";
private static final String PARTITION_GROUPER_CLASS_DOC = "Partition grouper class that implements the <code>PartitionGrouper</code> interface.";
/** <code>application.id</code> */
public static final String APPLICATION_ID_CONFIG = "application.id";
public static final String APPLICATION_ID_DOC = "An identifier for the stream processing application. Must be unique within the Kafka cluster. It is used as 1) the default client-id prefix, 2) the group-id for membership management, 3) the changelog topic prefix.";
/** <code>replication.factor</code> */
public static final String REPLICATION_FACTOR_CONFIG = "replication.factor";
public static final String REPLICATION_FACTOR_DOC = "The replication factor for change log topics and repartition topics created by the stream processing application.";
/** <code>replication.factor</code> */
public static final String KEY_SERDE_CLASS_CONFIG = "key.serde";
public static final String KEY_SERDE_CLASS_DOC = "Serializer / deserializer class for key that implements the <code>Serde</code> interface.";
/** <code>replication.factor</code> */
public static final String VALUE_SERDE_CLASS_CONFIG = "value.serde";
public static final String VALUE_SERDE_CLASS_DOC = "Serializer / deserializer class for value that implements the <code>Serde</code> interface.";
/** <code>metrics.sample.window.ms</code> */
public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG;
/** <code>metrics.num.samples</code> */
public static final String METRICS_NUM_SAMPLES_CONFIG = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG;
/** <code>metric.reporters</code> */
public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG;
/** <code>bootstrap.servers</code> */
public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
/** <code>client.id</code> */
public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG;
static {
CONFIG = new ConfigDef().define(APPLICATION_ID_CONFIG, // required with no default value
Type.STRING,
Importance.HIGH,
StreamsConfig.APPLICATION_ID_DOC)
.define(BOOTSTRAP_SERVERS_CONFIG, // required with no default value
Type.STRING,
Importance.HIGH,
CommonClientConfigs.BOOSTRAP_SERVERS_DOC)
.define(CLIENT_ID_CONFIG,
Type.STRING,
"",
Importance.HIGH,
CommonClientConfigs.CLIENT_ID_DOC)
.define(ZOOKEEPER_CONNECT_CONFIG,
Type.STRING,
"",
Importance.HIGH,
StreamsConfig.ZOOKEEPER_CONNECT_DOC)
.define(STATE_DIR_CONFIG,
Type.STRING,
"/tmp/kafka-streams",
Importance.MEDIUM,
STATE_DIR_DOC)
.define(REPLICATION_FACTOR_CONFIG,
Type.INT,
1,
Importance.MEDIUM,
REPLICATION_FACTOR_DOC)
.define(TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
Type.CLASS,
ConsumerRecordTimestampExtractor.class.getName(),
Importance.MEDIUM,
TIMESTAMP_EXTRACTOR_CLASS_DOC)
.define(PARTITION_GROUPER_CLASS_CONFIG,
Type.CLASS,
DefaultPartitionGrouper.class.getName(),
Importance.MEDIUM,
PARTITION_GROUPER_CLASS_DOC)
.define(KEY_SERDE_CLASS_CONFIG,
Type.CLASS,
Serdes.ByteArraySerde.class.getName(),
Importance.MEDIUM,
KEY_SERDE_CLASS_DOC)
.define(VALUE_SERDE_CLASS_CONFIG,
Type.CLASS,
Serdes.ByteArraySerde.class.getName(),
Importance.MEDIUM,
VALUE_SERDE_CLASS_DOC)
.define(COMMIT_INTERVAL_MS_CONFIG,
Type.LONG,
30000,
Importance.LOW,
COMMIT_INTERVAL_MS_DOC)
.define(POLL_MS_CONFIG,
Type.LONG,
100,
Importance.LOW,
POLL_MS_DOC)
.define(NUM_STREAM_THREADS_CONFIG,
Type.INT,
1,
Importance.LOW,
NUM_STREAM_THREADS_DOC)
.define(NUM_STANDBY_REPLICAS_CONFIG,
Type.INT,
0,
Importance.LOW,
NUM_STANDBY_REPLICAS_DOC)
.define(BUFFERED_RECORDS_PER_PARTITION_CONFIG,
Type.INT,
1000,
Importance.LOW,
BUFFERED_RECORDS_PER_PARTITION_DOC)
.define(STATE_CLEANUP_DELAY_MS_CONFIG,
Type.LONG,
60000,
Importance.LOW,
STATE_CLEANUP_DELAY_MS_DOC)
.define(METRIC_REPORTER_CLASSES_CONFIG,
Type.LIST,
"",
Importance.LOW,
CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
.define(METRICS_SAMPLE_WINDOW_MS_CONFIG,
Type.LONG,
30000,
atLeast(0),
Importance.LOW,
CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC)
.define(METRICS_NUM_SAMPLES_CONFIG,
Type.INT,
2,
atLeast(1),
Importance.LOW,
CommonClientConfigs.METRICS_NUM_SAMPLES_DOC);
}
public static class InternalConfig {
public static final String STREAM_THREAD_INSTANCE = "__stream.thread.instance__";
}
public StreamsConfig(Map<?, ?> props) {
super(CONFIG, props);
}
public Map<String, Object> getConsumerConfigs(StreamThread streamThread, String groupId, String clientId) {
Map<String, Object> props = getBaseConsumerConfigs();
// add client id with stream client id prefix, and group id
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-consumer");
// add configs required for stream partition assignor
props.put(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE, streamThread);
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, getInt(REPLICATION_FACTOR_CONFIG));
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, getInt(NUM_STANDBY_REPLICAS_CONFIG));
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StreamPartitionAssignor.class.getName());
if (!getString(ZOOKEEPER_CONNECT_CONFIG).equals(""))
props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, getString(ZOOKEEPER_CONNECT_CONFIG));
return props;
}
public Map<String, Object> getRestoreConsumerConfigs(String clientId) {
Map<String, Object> props = getBaseConsumerConfigs();
// no need to set group id for a restore consumer
props.remove(ConsumerConfig.GROUP_ID_CONFIG);
// add client id with stream client id prefix
props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-restore-consumer");
return props;
}
private Map<String, Object> getBaseConsumerConfigs() {
Map<String, Object> props = this.originals();
// remove streams properties
removeStreamsSpecificConfigs(props);
// set consumer default property values
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
return props;
}
public Map<String, Object> getProducerConfigs(String clientId) {
Map<String, Object> props = this.originals();
// remove consumer properties that are not required for producers
props.remove(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
// remove streams properties
removeStreamsSpecificConfigs(props);
// set producer default property values
props.put(ProducerConfig.LINGER_MS_CONFIG, "100");
// add client id with stream client id prefix
props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-producer");
return props;
}
private void removeStreamsSpecificConfigs(Map<String, Object> props) {
props.remove(StreamsConfig.POLL_MS_CONFIG);
props.remove(StreamsConfig.STATE_DIR_CONFIG);
props.remove(StreamsConfig.APPLICATION_ID_CONFIG);
props.remove(StreamsConfig.KEY_SERDE_CLASS_CONFIG);
props.remove(StreamsConfig.VALUE_SERDE_CLASS_CONFIG);
props.remove(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG);
props.remove(StreamsConfig.REPLICATION_FACTOR_CONFIG);
props.remove(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG);
props.remove(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
props.remove(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG);
props.remove(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG);
props.remove(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG);
props.remove(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG);
props.remove(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);
props.remove(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE);
}
public Serde keySerde() {
return getConfiguredInstance(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serde.class);
}
public Serde valueSerde() {
return getConfiguredInstance(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serde.class);
}
public static void main(String[] args) {
System.out.println(CONFIG.toHtmlTable());
}
}