/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.samza.config;

import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.samza.SamzaException;
import org.apache.samza.execution.JobPlanner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The configuration class for the KafkaConsumer used by a Samza Kafka system.
 * It is built from the subset of the global job config scoped to that system.
 */
public class KafkaConsumerConfig extends HashMap<String, Object> {
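// Illustrative usage sketch, not part of the original class. Assumptions: "kafka" is a
// hypothetical system name, and `config` is the job's resolved Config.
//
//   String clientId = KafkaConsumerConfig.createClientId("samza-consumer", config);
//   KafkaConsumerConfig consumerConfig =
//       KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, "kafka", clientId);
//   KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerConfig);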
public static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerConfig.class);
public static final String ZOOKEEPER_CONNECT = "zookeeper.connect";
private static final int FETCH_MAX_BYTES = 1024 * 1024;
private final String systemName;
/*
 * By default, KafkaConsumer fetches a large batch of available messages across all partitions.
 * This can cause memory pressure, so we cap the number of records returned by each poll().
 */
static final String DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS = "100";

private KafkaConsumerConfig(Map<String, Object> props, String systemName) {
super(props);
this.systemName = systemName;
}

/**
* Create Kafka consumer configs based on the system-scoped subset of the global config.
* @param config application config
* @param systemName system name
* @param clientId client id provided by the caller
* @return KafkaConsumerConfig
*/
public static KafkaConsumerConfig getKafkaSystemConsumerConfig(Config config, String systemName, String clientId) {
Config subConf = config.subset(String.format("systems.%s.consumer.", systemName), true);
final String groupId = createConsumerGroupId(config);
Map<String, Object> consumerProps = new HashMap<>(subConf);
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
// These are values Samza enforces; they cannot be overridden.
// Disable consumer auto-commit because Samza controls commits
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
// check if samza default offset value is defined
String systemOffsetDefault = new SystemConfig(config).getSystemOffsetDefault(systemName);
// Translate samza config value to kafka config value
String autoOffsetReset = getAutoOffsetResetValue((String) consumerProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), systemOffsetDefault);
LOG.info("setting auto.offset.reset for system {} to {}", systemName, autoOffsetReset);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
// if consumer bootstrap servers are not configured, get them from the producer configs
if (!subConf.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
String bootstrapServers =
config.get(String.format("systems.%s.producer.%s", systemName, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
if (StringUtils.isEmpty(bootstrapServers)) {
throw new SamzaException("Missing " + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + " config for " + systemName);
}
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
}
// Always use default partition assignment strategy. Do not allow override.
consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());
// The consumer is fully typed, and deserialization can be configured explicitly.
// If no deserializers are provided, default to byte[].
if (!consumerProps.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) {
LOG.info("setting key serialization for the consumer(for system {}) to ByteArrayDeserializer", systemName);
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
}
if (!consumerProps.containsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) {
LOG.info("setting value serialization for the consumer(for system {}) to ByteArrayDeserializer", systemName);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
}
// Apply the default max.poll.records only if the user has not provided a value
consumerProps.putIfAbsent(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS);
return new KafkaConsumerConfig(consumerProps, systemName);
}
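
/**
 * Returns the client id set on this config.
 * @throws SamzaException if the client id is missing or blank
 */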
public String getClientId() {
String clientId = (String) get(ConsumerConfig.CLIENT_ID_CONFIG);
if (StringUtils.isBlank(clientId)) {
throw new SamzaException("client Id is not set for consumer for system=" + systemName);
}
return clientId;
}
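
/**
 * Returns the configured "fetch.message.max.bytes" value, or the default of 1 MB if not set.
 */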
public int fetchMessageMaxBytes() {
String fetchSize = (String) get("fetch.message.max.bytes");
if (StringUtils.isBlank(fetchSize)) {
return FETCH_MAX_BYTES;
} else {
return Integer.parseInt(fetchSize);
}
}
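
/**
 * Returns the "zookeeper.connect" value from this config, or null if not set.
 */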
public String getZkConnect() {
return (String) get(ZOOKEEPER_CONNECT);
}

// The consumer group id must be unique per job; it is built as "<jobName>-<jobId>".
static String createConsumerGroupId(Config config) {
Pair<String, String> jobNameId = getJobNameAndId(config);
return String.format("%s-%s", jobNameId.getLeft(), jobNameId.getRight());
}

// The client id must be unique per job; non-word characters are replaced with underscores.
public static String createClientId(String prefix, Config config) {
Pair<String, String> jobNameId = getJobNameAndId(config);
String jobName = jobNameId.getLeft();
String jobId = jobNameId.getRight();
return String.format("%s-%s-%s", prefix.replaceAll("\\W", "_"), jobName.replaceAll("\\W", "_"),
jobId.replaceAll("\\W", "_"));
}
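
/**
 * Returns the job name and job id pair from the config.
 * @throws ConfigException if the job name is missing
 */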
public static Pair<String, String> getJobNameAndId(Config config) {
JobConfig jobConfig = new JobConfig(JobPlanner.generateSingleJobConfig(config));
String jobName = jobConfig.getName().orElseThrow(() -> new ConfigException("Missing job name"));
return new ImmutablePair<>(jobName, jobConfig.getJobId());
}

/**
 * If the Kafka consumer's auto.offset.reset setting is provided, use it.
 * If it uses the old (deprecated) values, translate them:
 * "largest" -> "latest"
 * "smallest" -> "earliest"
 *
 * If no setting is specified, derive the value from the Samza default offset setting.
 * If neither is defined, return "latest".
 * @param autoOffsetReset consumer auto.offset.reset config value
 * @param samzaOffsetDefault samza system default offset
 * @return the value to use for the "auto.offset.reset" consumer property
 */
static String getAutoOffsetResetValue(final String autoOffsetReset, final String samzaOffsetDefault) {
// valid kafka consumer values
final String kafkaOffsetLatest = "latest";
final String kafkaOffsetEarliest = "earliest";
final String kafkaOffsetNone = "none";
// if the value for KafkaConsumer is set - use it.
if (!StringUtils.isBlank(autoOffsetReset)) {
if (autoOffsetReset.equals(kafkaOffsetEarliest) || autoOffsetReset.equals(kafkaOffsetLatest)
|| autoOffsetReset.equals(kafkaOffsetNone)) {
return autoOffsetReset;
}
// translate old Kafka consumer values into new ones (SAMZA-1987 tracks removing this)
String newAutoOffsetReset;
switch (autoOffsetReset) {
case "largest":
newAutoOffsetReset = kafkaOffsetLatest;
LOG.warn("Using old (deprecated) value for kafka consumer config auto.offset.reset = {}. The right value should be {}", autoOffsetReset, kafkaOffsetLatest);
break;
case "smallest":
newAutoOffsetReset = kafkaOffsetEarliest;
LOG.warn("Using old (deprecated) value for kafka consumer config auto.offset.reset = {}. The right value should be {}", autoOffsetReset, kafkaOffsetEarliest);
break;
default:
throw new SamzaException("Using invalid value for kafka consumer config auto.offset.reset " + autoOffsetReset + ". See KafkaConsumer config for the correct values.");
}
LOG.info("Auto offset reset value converted from {} to {}", autoOffsetReset, newAutoOffsetReset);
return newAutoOffsetReset;
}
// in case kafka consumer configs are not provided we should match them to Samza's ones.
String newAutoOffsetReset = kafkaOffsetLatest;
if (!StringUtils.isBlank(samzaOffsetDefault)) {
switch (samzaOffsetDefault) {
case SystemConfig.SAMZA_SYSTEM_OFFSET_UPCOMING:
newAutoOffsetReset = kafkaOffsetLatest;
break;
case SystemConfig.SAMZA_SYSTEM_OFFSET_OLDEST:
newAutoOffsetReset = kafkaOffsetEarliest;
break;
default:
throw new SamzaException("Using invalid value for samza default offset config " + autoOffsetReset + ". See samza config for the correct values");
}
LOG.info("Auto offset reset value for KafkaConsumer for system {} converted from {}(samza) to {}", samzaOffsetDefault, newAutoOffsetReset);
}
return newAutoOffsetReset;
}
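
// Illustrative examples of the mapping above (derived from this method's logic):
//   getAutoOffsetResetValue("largest", null)  -> "latest"    (deprecated value translated)
//   getAutoOffsetResetValue(null, SystemConfig.SAMZA_SYSTEM_OFFSET_OLDEST) -> "earliest"
//   getAutoOffsetResetValue(null, null)       -> "latest"    (overall default)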
}