blob: 2a0bc2be7f22999c9ec6c29fc8c069998b07a62b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gobblin.kafka.client;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.regex.Pattern;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.typesafe.config.Config;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistry;
import org.apache.gobblin.metrics.kafka.SchemaRegistryException;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.DatasetFilterUtils;
/**
 * A base {@link GobblinKafkaConsumerClient} that sets configurations shared by all
 * {@link GobblinKafkaConsumerClient}s.
 *
 * <p>The constructor reads the broker list, the fetch/socket timeouts, the minimum fetch size and the
 * schema-check flag from the supplied {@link Config}. Subclasses provide the actual topic listing via
 * {@link #getTopics()}.
 */
@Slf4j
public abstract class AbstractBaseKafkaConsumerClient implements GobblinKafkaConsumerClient {

  public static final String CONFIG_NAMESPACE = "source.kafka";
  public static final String CONFIG_PREFIX = CONFIG_NAMESPACE + ".";
  public static final String CONSUMER_CONFIG = "consumerConfig";
  public static final String CONFIG_KAFKA_FETCH_TIMEOUT_VALUE = CONFIG_PREFIX + "fetchTimeoutMillis";
  public static final int CONFIG_KAFKA_FETCH_TIMEOUT_VALUE_DEFAULT = 1000; // 1 second
  public static final String CONFIG_KAFKA_FETCH_REQUEST_MIN_BYTES = CONFIG_PREFIX + "fetchMinBytes";
  private static final int CONFIG_KAFKA_FETCH_REQUEST_MIN_BYTES_DEFAULT = 1024;
  public static final String CONFIG_KAFKA_SOCKET_TIMEOUT_VALUE = CONFIG_PREFIX + "socketTimeoutMillis";
  public static final int CONFIG_KAFKA_SOCKET_TIMEOUT_VALUE_DEFAULT = 30000; // 30 seconds
  public static final String CONFIG_ENABLE_SCHEMA_CHECK = CONFIG_PREFIX + "enableSchemaCheck";
  public static final boolean ENABLE_SCHEMA_CHECK_DEFAULT = false;

  /** Brokers read from {@link ConfigurationKeys#KAFKA_BROKERS}; never empty once construction succeeds. */
  protected final List<String> brokers;
  /** Value of {@link #CONFIG_KAFKA_FETCH_TIMEOUT_VALUE}; validated to be strictly less than {@link #socketTimeoutMillis}. */
  protected final int fetchTimeoutMillis;
  /** Value of {@link #CONFIG_KAFKA_FETCH_REQUEST_MIN_BYTES}. */
  protected final int fetchMinBytes;
  /** Value of {@link #CONFIG_KAFKA_SOCKET_TIMEOUT_VALUE}. */
  protected final int socketTimeoutMillis;
  /** The configuration this client was created with. */
  protected final Config config;
  /**
   * Schema registry used for the optional schema check. It is not set by the constructor; it stays
   * {@code null} until the first schema check resolves it (see {@link #isSchemaRegistryConfigured()}).
   */
  protected Optional<KafkaSchemaRegistry> schemaRegistry;
  /** Value of {@link #CONFIG_ENABLE_SCHEMA_CHECK}. */
  protected final boolean schemaCheckEnabled;

  /**
   * Reads the shared consumer settings from {@code config} and validates them.
   *
   * @param config the configuration to read the Kafka settings from
   * @throws IllegalArgumentException if no broker is configured, or if the fetch timeout is not strictly
   *         smaller than the socket timeout
   */
  public AbstractBaseKafkaConsumerClient(Config config) {
    this.config = config;
    this.brokers = ConfigUtils.getStringList(config, ConfigurationKeys.KAFKA_BROKERS);
    if (this.brokers.isEmpty()) {
      throw new IllegalArgumentException("Need to specify at least one Kafka broker.");
    }
    this.socketTimeoutMillis =
        ConfigUtils.getInt(config, CONFIG_KAFKA_SOCKET_TIMEOUT_VALUE, CONFIG_KAFKA_SOCKET_TIMEOUT_VALUE_DEFAULT);
    this.fetchTimeoutMillis =
        ConfigUtils.getInt(config, CONFIG_KAFKA_FETCH_TIMEOUT_VALUE, CONFIG_KAFKA_FETCH_TIMEOUT_VALUE_DEFAULT);
    this.fetchMinBytes =
        ConfigUtils.getInt(config, CONFIG_KAFKA_FETCH_REQUEST_MIN_BYTES, CONFIG_KAFKA_FETCH_REQUEST_MIN_BYTES_DEFAULT);
    Preconditions.checkArgument((this.fetchTimeoutMillis < this.socketTimeoutMillis),
        "Kafka Source configuration error: FetchTimeout " + this.fetchTimeoutMillis
            + " must be smaller than SocketTimeout " + this.socketTimeoutMillis);
    this.schemaCheckEnabled = ConfigUtils.getBoolean(config, CONFIG_ENABLE_SCHEMA_CHECK, ENABLE_SCHEMA_CHECK_DEFAULT);
  }

  /**
   * Filter topics based on whitelist and blacklist patterns
   * and if {@link #schemaCheckEnabled}, also filter on whether schema is present in schema registry.
   *
   * @param blacklist - List of regex patterns that need to be blacklisted
   * @param whitelist - List of regex patterns that need to be whitelisted
   *
   * @return a new list holding the topics from {@link #getTopics()} that pass
   *         {@link DatasetFilterUtils#survived} for the given patterns and the schema check
   */
  @Override
  public List<KafkaTopic> getFilteredTopics(final List<Pattern> blacklist, final List<Pattern> whitelist) {
    return Lists.newArrayList(Iterables.filter(getTopics(), new Predicate<KafkaTopic>() {
      @Override
      public boolean apply(@Nonnull KafkaTopic kafkaTopic) {
        // The pattern filter runs first, so the schema registry is only queried for surviving topics.
        return DatasetFilterUtils.survived(kafkaTopic.getName(), blacklist, whitelist)
            && isSchemaPresent(kafkaTopic.getName());
      }
    }));
  }

  /**
   * Resolves {@link #schemaRegistry} on first use and reports whether a registry is available.
   * A registry is created only when both {@link KafkaSchemaRegistry#KAFKA_SCHEMA_REGISTRY_CLASS} and
   * {@link KafkaSchemaRegistry#KAFKA_SCHEMA_REGISTRY_URL} are present in {@link #config}.
   *
   * @return {@code true} if a schema registry is present
   */
  private boolean isSchemaRegistryConfigured() {
    if (this.schemaRegistry == null) {
      boolean registryConfigured = this.config.hasPath(KafkaSchemaRegistry.KAFKA_SCHEMA_REGISTRY_CLASS)
          && this.config.hasPath(KafkaSchemaRegistry.KAFKA_SCHEMA_REGISTRY_URL);
      this.schemaRegistry = registryConfigured
          ? Optional.of(KafkaSchemaRegistry.get(ConfigUtils.configToProperties(this.config)))
          : Optional.absent();
    }
    return this.schemaRegistry.isPresent();
  }

  /**
   * Decides whether a topic passes the schema check. The topic is rejected only when
   * {@link #schemaCheckEnabled} is set, a schema registry is configured, and the registry either returns
   * no latest schema for the topic or fails the lookup. In every other case the topic is accepted.
   *
   * @param topic the topic name to look up
   * @return {@code false} if the check is active and no schema could be obtained, {@code true} otherwise
   */
  private boolean isSchemaPresent(String topic) {
    if (!this.schemaCheckEnabled || !isSchemaRegistryConfigured()) {
      return true;
    }
    try {
      if (this.schemaRegistry.get().getLatestSchemaByTopic(topic) == null) {
        log.warn(String.format("Schema not found for topic %s skipping.", topic));
        return false;
      }
    } catch (SchemaRegistryException e) {
      // Keep the cause in the log: a failed lookup is not the same as a missing schema.
      log.warn(String.format("Schema lookup failed for topic %s skipping.", topic), e);
      return false;
    }
    return true;
  }

  /**
   * A helper method that returns the canonical metric name for a kafka metric. A typical canonicalized metric name would
   * be of the following format: "{metric-group}_{client-id}_{metric-name}". This method is invoked in
   * {@link GobblinKafkaConsumerClient#getMetrics()} implementations to convert KafkaMetric names to a Coda Hale metric
   * name. Note that the canonicalization is done on every invocation of the
   * {@link GobblinKafkaConsumerClient#getMetrics()} API.
   *
   * @param metricGroup the type of the Kafka metric e.g."consumer-fetch-manager-metrics", "consumer-coordinator-metrics" etc.
   * @param metricTags any tags associated with the Kafka metric, typically include the kafka client id, topic name, partition number etc.
   * @param metricName the name of the Kafka metric e.g. "records-lag-max", "fetch-throttle-time-max" etc.
   * @return the canonicalized metric name, with every space and every dot replaced by an underscore.
   */
  protected String canonicalMetricName(String metricGroup, Collection<String> metricTags, String metricName) {
    List<String> nameParts = new ArrayList<>();
    nameParts.add(metricGroup);
    nameParts.addAll(metricTags);
    nameParts.add(metricName);

    StringBuilder builder = new StringBuilder();
    for (String namePart : nameParts) {
      builder.append(namePart);
      builder.append(".");
    }
    // Remove the trailing dot.
    builder.setLength(builder.length() - 1);

    // String.replace is a literal (non-regex) replacement, so the dot must be given unescaped.
    // The previous "\\." argument searched for a backslash followed by a dot and never matched.
    return builder.toString().replace(' ', '_').replace('.', '_');
  }

  /**
   * Get a list of all kafka topics
   *
   * @return all topics known to this client, before any whitelist/blacklist or schema filtering
   */
  public abstract List<KafkaTopic> getTopics();
}