/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.system.kafka;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.KafkaConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.StreamConfig;
import org.apache.samza.config.SystemConfig;
import org.apache.samza.startpoint.Startpoint;
import org.apache.samza.startpoint.StartpointOldest;
import org.apache.samza.startpoint.StartpointSpecific;
import org.apache.samza.startpoint.StartpointTimestamp;
import org.apache.samza.startpoint.StartpointUpcoming;
import org.apache.samza.startpoint.StartpointVisitor;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.system.StreamValidationException;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.util.ExponentialSleepStrategy;
import org.apache.samza.util.KafkaUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.runtime.AbstractFunction0;
import scala.runtime.AbstractFunction1;
import scala.runtime.AbstractFunction2;
import scala.runtime.BoxedUnit;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
public class KafkaSystemAdmin implements SystemAdmin {
private static final Logger LOG = LoggerFactory.getLogger(KafkaSystemAdmin.class);
// Default exponential sleep strategy values
protected static final double DEFAULT_EXPONENTIAL_SLEEP_BACK_OFF_MULTIPLIER = 2.0;
protected static final long DEFAULT_EXPONENTIAL_SLEEP_INITIAL_DELAY_MS = 500;
protected static final long DEFAULT_EXPONENTIAL_SLEEP_MAX_DELAY_MS = 10000;
protected static final int MAX_RETRIES_ON_EXCEPTION = 5;
protected static final int DEFAULT_REPL_FACTOR = 2;
private static final int KAFKA_ADMIN_OPS_TIMEOUT_MS = 50000;
// used in TestRepartitionJoinWindowApp TODO - remove SAMZA-1945
@VisibleForTesting
public static volatile boolean deleteMessageCalled = false;
protected final String systemName;
protected final Config config;
// Custom properties to create a new coordinator stream.
private final Properties coordinatorStreamProperties;
// Replication factor for a new coordinator stream.
private final int coordinatorStreamReplicationFactor;
// Replication factor and kafka properties for changelog topic creation
private final Map<String, ChangelogInfo> changelogTopicMetaInformation;
// Kafka properties for intermediate topics creation
private final Map<String, Properties> intermediateStreamProperties;
// used for intermediate streams
protected final boolean deleteCommittedMessages;
// admin client for create/remove topics
final AdminClient adminClient;
private final AtomicBoolean stopped = new AtomicBoolean(false);
private final ThreadSafeKafkaConsumer threadSafeKafkaConsumer;
private final KafkaStartpointToOffsetResolver kafkaStartpointToOffsetResolver;
public KafkaSystemAdmin(String systemName, Config config, Consumer metadataConsumer) {
this.systemName = systemName;
this.config = config;
if (metadataConsumer == null) {
throw new SamzaException(
"Cannot construct KafkaSystemAdmin for system " + systemName + " with null metadataConsumer");
}
this.threadSafeKafkaConsumer = new ThreadSafeKafkaConsumer(metadataConsumer);
this.kafkaStartpointToOffsetResolver = new KafkaStartpointToOffsetResolver(threadSafeKafkaConsumer);
Properties props = createAdminClientProperties();
LOG.info("New admin client with props:" + props);
adminClient = AdminClient.create(props);
StreamConfig streamConfig = new StreamConfig(config);
KafkaConfig kafkaConfig = new KafkaConfig(config);
coordinatorStreamReplicationFactor = Integer.valueOf(kafkaConfig.getCoordinatorReplicationFactor());
coordinatorStreamProperties = getCoordinatorStreamProperties(kafkaConfig);
Map<String, String> storeToChangelog = kafkaConfig.getKafkaChangelogEnabledStores();
// Construct the meta information for each topic. If the replication factor is not defined,
// we use 2 (DEFAULT_REPL_FACTOR) as the number of replicas for the changelog stream.
changelogTopicMetaInformation = new HashMap<>();
for (Map.Entry<String, String> e : storeToChangelog.entrySet()) {
String storeName = e.getKey();
String topicName = e.getValue();
String replicationFactorStr = kafkaConfig.getChangelogStreamReplicationFactor(storeName);
int replicationFactor =
StringUtils.isEmpty(replicationFactorStr) ? DEFAULT_REPL_FACTOR : Integer.valueOf(replicationFactorStr);
ChangelogInfo changelogInfo =
new ChangelogInfo(replicationFactor, kafkaConfig.getChangelogKafkaProperties(storeName));
LOG.info(String.format("Creating topic meta information for topic: %s with replication factor: %s", topicName,
replicationFactor));
changelogTopicMetaInformation.put(topicName, changelogInfo);
}
// special flag to allow/enforce deleting of committed messages
SystemConfig systemConfig = new SystemConfig(config);
this.deleteCommittedMessages = systemConfig.deleteCommittedMessages(systemName);
intermediateStreamProperties = getIntermediateStreamProperties(config);
LOG.info(String.format("Created KafkaSystemAdmin for system %s", systemName));
}
@Override
public void start() {
// Please note: there is a slight inconsistency in the use of this class.
// Some of the functionality of this class may actually be used BEFORE start() is called.
// The SamzaContainer gets metadata (using this class) in SamzaContainer.apply,
// but this "start" actually gets called in SamzaContainer.run.
// review this usage (SAMZA-1888)
// Throw exception if start is called after stop
if (stopped.get()) {
throw new IllegalStateException("SamzaKafkaAdmin.start() is called after stop()");
}
}
@Override
public void stop() {
if (stopped.compareAndSet(false, true)) {
try {
threadSafeKafkaConsumer.close();
} catch (Exception e) {
LOG.warn(String.format("Exception occurred when closing consumer of system: %s.", systemName), e);
}
}
if (adminClient != null) {
adminClient.close();
}
}
@Override
public boolean isStopped() {
return stopped.get();
}
/**
* Note! This method does not populate SystemStreamMetadata for each stream with real data.
* Thus, this method should ONLY be used to get the number of partitions for each stream.
* It will throw NotImplementedException if anyone tries to access the actual metadata.
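*
* <p>A minimal usage sketch, where {@code admin} is a constructed KafkaSystemAdmin and the stream name
* and TTL below are hypothetical:
* <pre>{@code
* Map<String, SystemStreamMetadata> counts =
*     admin.getSystemStreamPartitionCounts(ImmutableSet.of("page-views"), 30000L);
* int partitionCount = counts.get("page-views").getSystemStreamPartitionMetadata().size();
* }</pre>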
* @param streamNames set of streams for which to get the partition counts
* @param cacheTTL cache TTL if caching the data
* @return a map keyed on stream name; only the partition count in each SystemStreamMetadata is meaningful
*/
@Override
public Map<String, SystemStreamMetadata> getSystemStreamPartitionCounts(Set<String> streamNames, long cacheTTL) {
// This optimization omits actual metadata for performance. Instead, we inject a dummy for all partitions.
final SystemStreamMetadata.SystemStreamPartitionMetadata dummySspm =
new SystemStreamMetadata.SystemStreamPartitionMetadata(null, null, null) {
String msg =
"getSystemStreamPartitionCounts does not populate SystemStreamMetadata info, only the number of partitions";
@Override
public String getOldestOffset() {
throw new NotImplementedException(msg);
}
@Override
public String getNewestOffset() {
throw new NotImplementedException(msg);
}
@Override
public String getUpcomingOffset() {
throw new NotImplementedException(msg);
}
};
ExponentialSleepStrategy strategy = new ExponentialSleepStrategy(DEFAULT_EXPONENTIAL_SLEEP_BACK_OFF_MULTIPLIER,
DEFAULT_EXPONENTIAL_SLEEP_INITIAL_DELAY_MS, DEFAULT_EXPONENTIAL_SLEEP_MAX_DELAY_MS);
Function1<ExponentialSleepStrategy.RetryLoop, Map<String, SystemStreamMetadata>> fetchMetadataOperation =
new AbstractFunction1<ExponentialSleepStrategy.RetryLoop, Map<String, SystemStreamMetadata>>() {
@Override
public Map<String, SystemStreamMetadata> apply(ExponentialSleepStrategy.RetryLoop loop) {
Map<String, SystemStreamMetadata> allMetadata = new HashMap<>();
streamNames.forEach(streamName -> {
Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> partitionMetadata = new HashMap<>();
List<PartitionInfo> partitionInfos = threadSafeKafkaConsumer.execute(consumer -> consumer.partitionsFor(streamName));
LOG.debug("Stream {} has partitions {}", streamName, partitionInfos);
partitionInfos.forEach(
partitionInfo -> partitionMetadata.put(new Partition(partitionInfo.partition()), dummySspm));
allMetadata.put(streamName, new SystemStreamMetadata(streamName, partitionMetadata));
});
loop.done();
return allMetadata;
}
};
Map<String, SystemStreamMetadata> result = strategy.run(fetchMetadataOperation,
new AbstractFunction2<Exception, ExponentialSleepStrategy.RetryLoop, BoxedUnit>() {
@Override
public BoxedUnit apply(Exception exception, ExponentialSleepStrategy.RetryLoop loop) {
if (loop.sleepCount() < MAX_RETRIES_ON_EXCEPTION) {
LOG.warn(String.format("Fetching systemstreampartition counts for: %s threw an exception. Retrying.",
streamNames), exception);
} else {
LOG.error(String.format("Fetching systemstreampartition counts for: %s threw an exception.", streamNames),
exception);
loop.done();
throw new SamzaException(exception);
}
return null;
}
}).get();
LOG.info("SystemStream partition counts for system {}: {}", systemName, result);
return result;
}
@Override
public Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets) {
// This is safe to do with Kafka, even if a topic is key-deduped. If the
// offset doesn't exist on a compacted topic, Kafka will return the first
// message AFTER the offset that was specified in the fetch request.
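// For example (hypothetical values): an input entry of {ssp -> "5"} maps to "6" in the result.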
return offsets.entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, (entry) -> String.valueOf(Long.valueOf(entry.getValue()) + 1)));
}
@Override
public Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames) {
return getSystemStreamMetadata(streamNames,
new ExponentialSleepStrategy(DEFAULT_EXPONENTIAL_SLEEP_BACK_OFF_MULTIPLIER,
DEFAULT_EXPONENTIAL_SLEEP_INITIAL_DELAY_MS, DEFAULT_EXPONENTIAL_SLEEP_MAX_DELAY_MS));
}
@Override
public Map<SystemStreamPartition, SystemStreamMetadata.SystemStreamPartitionMetadata> getSSPMetadata(
Set<SystemStreamPartition> ssps) {
return getSSPMetadata(ssps,
new ExponentialSleepStrategy(DEFAULT_EXPONENTIAL_SLEEP_BACK_OFF_MULTIPLIER,
DEFAULT_EXPONENTIAL_SLEEP_INITIAL_DELAY_MS, DEFAULT_EXPONENTIAL_SLEEP_MAX_DELAY_MS));
}
/**
* Given a set of SystemStreamPartition, fetch metadata from Kafka for each
* of them, and return a map from ssp to SystemStreamPartitionMetadata for
* each of them. The newest offset will be null for an empty
* SystemStreamPartition. On failure, this method blocks and retries a bounded
* number of times before throwing a SamzaException.
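*
* <p>A minimal usage sketch via the public overload, where {@code admin} is a constructed
* KafkaSystemAdmin and the system/stream names are hypothetical:
* <pre>{@code
* SystemStreamPartition ssp = new SystemStreamPartition("kafka", "page-views", new Partition(0));
* SystemStreamMetadata.SystemStreamPartitionMetadata sspMetadata =
*     admin.getSSPMetadata(ImmutableSet.of(ssp)).get(ssp);
* String newestOffset = sspMetadata.getNewestOffset();
* }</pre>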
* @param ssps a set of SystemStreamPartitions to fetch metadata for
* @param retryBackoff retry backoff strategy
* @return a map from SSP to SystemStreamPartitionMetadata containing its offsets
*/
Map<SystemStreamPartition, SystemStreamMetadata.SystemStreamPartitionMetadata> getSSPMetadata(
Set<SystemStreamPartition> ssps, ExponentialSleepStrategy retryBackoff) {
LOG.info("Fetching SSP metadata for: {}", ssps);
List<TopicPartition> topicPartitions = ssps.stream()
.map(ssp -> new TopicPartition(ssp.getStream(), ssp.getPartition().getPartitionId()))
.collect(Collectors.toList());
Function1<ExponentialSleepStrategy.RetryLoop, Map<SystemStreamPartition,
SystemStreamMetadata.SystemStreamPartitionMetadata>> fetchTopicPartitionMetadataOperation =
new AbstractFunction1<ExponentialSleepStrategy.RetryLoop, Map<SystemStreamPartition,
SystemStreamMetadata.SystemStreamPartitionMetadata>>() {
@Override
public Map<SystemStreamPartition, SystemStreamMetadata.SystemStreamPartitionMetadata> apply(
ExponentialSleepStrategy.RetryLoop loop) {
OffsetsMaps topicPartitionsMetadata = fetchTopicPartitionsMetadata(topicPartitions);
Map<SystemStreamPartition, SystemStreamMetadata.SystemStreamPartitionMetadata> sspToSSPMetadata = new HashMap<>();
for (SystemStreamPartition ssp : ssps) {
String oldestOffset = topicPartitionsMetadata.getOldestOffsets().get(ssp);
String newestOffset = topicPartitionsMetadata.getNewestOffsets().get(ssp);
String upcomingOffset = topicPartitionsMetadata.getUpcomingOffsets().get(ssp);
sspToSSPMetadata.put(ssp,
new SystemStreamMetadata.SystemStreamPartitionMetadata(oldestOffset, newestOffset, upcomingOffset));
}
loop.done();
return sspToSSPMetadata;
}
};
Function2<Exception, ExponentialSleepStrategy.RetryLoop, BoxedUnit> onExceptionRetryOperation =
new AbstractFunction2<Exception, ExponentialSleepStrategy.RetryLoop, BoxedUnit>() {
@Override
public BoxedUnit apply(Exception exception, ExponentialSleepStrategy.RetryLoop loop) {
if (loop.sleepCount() < MAX_RETRIES_ON_EXCEPTION) {
LOG.warn(
String.format("Fetching SSP metadata for: %s threw an exception. Retrying.", ssps), exception);
} else {
LOG.error(String.format("Fetching SSP metadata for: %s threw an exception.", ssps), exception);
loop.done();
throw new SamzaException(exception);
}
return null;
}
};
Function0<Map<SystemStreamPartition, SystemStreamMetadata.SystemStreamPartitionMetadata>> fallbackOperation =
new AbstractFunction0<Map<SystemStreamPartition, SystemStreamMetadata.SystemStreamPartitionMetadata>>() {
@Override
public Map<SystemStreamPartition, SystemStreamMetadata.SystemStreamPartitionMetadata> apply() {
throw new SamzaException("Failed to get SSP metadata");
}
};
return retryBackoff.run(fetchTopicPartitionMetadataOperation, onExceptionRetryOperation).getOrElse(fallbackOperation);
}
/**
* Given a set of stream names (topics), fetch metadata from Kafka for each
* stream, and return a map from stream name to SystemStreamMetadata for
* each stream. The newest offset will be null for an empty
* SystemStreamPartition. On failure, this method blocks and retries a bounded
* number of times before throwing a SamzaException.
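*
* <p>A minimal usage sketch via the public overload, where {@code admin} is a constructed
* KafkaSystemAdmin and the topic name is hypothetical:
* <pre>{@code
* SystemStreamMetadata metadata =
*     admin.getSystemStreamMetadata(ImmutableSet.of("page-views")).get("page-views");
* String newestOffsetOfPartition0 =
*     metadata.getSystemStreamPartitionMetadata().get(new Partition(0)).getNewestOffset();
* }</pre>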
*
* @param streamNames a set of strings of stream names/topics
* @param retryBackoff retry backoff strategy
* @return a map from topic to SystemStreamMetadata which has offsets for each partition
*/
public Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames,
ExponentialSleepStrategy retryBackoff) {
LOG.info("Fetching system stream metadata for {} from system {}", streamNames, systemName);
Function1<ExponentialSleepStrategy.RetryLoop, Map<String, SystemStreamMetadata>> fetchMetadataOperation =
new AbstractFunction1<ExponentialSleepStrategy.RetryLoop, Map<String, SystemStreamMetadata>>() {
@Override
public Map<String, SystemStreamMetadata> apply(ExponentialSleepStrategy.RetryLoop loop) {
Map<String, SystemStreamMetadata> metadata = fetchSystemStreamMetadata(streamNames);
loop.done();
return metadata;
}
};
Function2<Exception, ExponentialSleepStrategy.RetryLoop, BoxedUnit> onExceptionRetryOperation =
new AbstractFunction2<Exception, ExponentialSleepStrategy.RetryLoop, BoxedUnit>() {
@Override
public BoxedUnit apply(Exception exception, ExponentialSleepStrategy.RetryLoop loop) {
if (loop.sleepCount() < MAX_RETRIES_ON_EXCEPTION) {
LOG.warn(
String.format("Fetching system stream metadata for: %s threw an exception. Retrying.", streamNames),
exception);
} else {
LOG.error(String.format("Fetching system stream metadata for: %s threw an exception.", streamNames),
exception);
loop.done();
throw new SamzaException(exception);
}
return null;
}
};
Function0<Map<String, SystemStreamMetadata>> fallbackOperation =
new AbstractFunction0<Map<String, SystemStreamMetadata>>() {
@Override
public Map<String, SystemStreamMetadata> apply() {
throw new SamzaException("Failed to get system stream metadata");
}
};
return retryBackoff.run(fetchMetadataOperation, onExceptionRetryOperation).getOrElse(fallbackOperation);
}
/**
* Uses the kafka consumer to fetch the metadata for the {@code topicPartitions}.
*/
private OffsetsMaps fetchTopicPartitionsMetadata(List<TopicPartition> topicPartitions) {
Map<SystemStreamPartition, String> oldestOffsets = new HashMap<>();
Map<SystemStreamPartition, String> newestOffsets = new HashMap<>();
Map<SystemStreamPartition, String> upcomingOffsets = new HashMap<>();
final Map<TopicPartition, Long> oldestOffsetsWithLong = new HashMap<>();
final Map<TopicPartition, Long> upcomingOffsetsWithLong = new HashMap<>();
threadSafeKafkaConsumer.execute(consumer -> {
Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(topicPartitions);
LOG.debug("Beginning offsets for topic-partitions: {} is {}", topicPartitions, beginningOffsets);
oldestOffsetsWithLong.putAll(beginningOffsets);
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);
LOG.debug("End offsets for topic-partitions: {} is {}", topicPartitions, endOffsets);
upcomingOffsetsWithLong.putAll(endOffsets);
return Optional.empty();
});
oldestOffsetsWithLong.forEach((topicPartition, offset) -> oldestOffsets.put(KafkaUtil.toSystemStreamPartition(systemName, topicPartition), String.valueOf(offset)));
upcomingOffsetsWithLong.forEach((topicPartition, offset) -> {
upcomingOffsets.put(KafkaUtil.toSystemStreamPartition(systemName, topicPartition), String.valueOf(offset));
// Kafka's beginning offset corresponds to the offset of the oldest message.
// Kafka's end offset corresponds to the offset of the upcoming message, i.e. the newest offset + 1.
// When the upcoming offset is <= 0, the partition appears empty, so we set the oldest offset to 0 and leave the newest offset unset (null).
// When the upcoming offset is > 0, we subtract one from it to get the newest offset.
// In the normal case, the newest offset corresponds to the offset of the newest message in the stream;
// but this does not hold for a large (multi-record) message, where seeking to the newest offset may return nothing.
// For now, we keep the newest offset as is, consistent with the historical metadata structure.
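// Example (hypothetical): a partition holding messages at offsets 0..9 has beginning offset 0 and
// end offset 10, so we report oldest = "0", upcoming = "10", and newest = "9".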
if (offset <= 0) {
LOG.warn(
"Empty Kafka topic partition {} with upcoming offset {}. Skipping newest offset and setting oldest offset to 0 to consume from beginning",
topicPartition, offset);
oldestOffsets.put(KafkaUtil.toSystemStreamPartition(systemName, topicPartition), "0");
} else {
newestOffsets.put(KafkaUtil.toSystemStreamPartition(systemName, topicPartition), String.valueOf(offset - 1));
}
});
return new OffsetsMaps(oldestOffsets, newestOffsets, upcomingOffsets);
}
/**
* Fetch SystemStreamMetadata for each topic with the consumer
* @param topics set of topics to get metadata info for
* @return map of topic to SystemStreamMetadata
*/
private Map<String, SystemStreamMetadata> fetchSystemStreamMetadata(Set<String> topics) {
Map<SystemStreamPartition, String> allOldestOffsets = new HashMap<>();
Map<SystemStreamPartition, String> allNewestOffsets = new HashMap<>();
Map<SystemStreamPartition, String> allUpcomingOffsets = new HashMap<>();
LOG.info("Fetching SystemStreamMetadata for topics {} on system {}", topics, systemName);
topics.forEach(topic -> {
OffsetsMaps offsetsForTopic = threadSafeKafkaConsumer.execute(consumer -> {
List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
if (partitionInfos == null) {
String msg = String.format("Partition info not(yet?) available for system %s topic %s", systemName, topic);
throw new SamzaException(msg);
}
List<TopicPartition> topicPartitions = partitionInfos.stream()
.map(partitionInfo -> new TopicPartition(partitionInfo.topic(), partitionInfo.partition()))
.collect(Collectors.toList());
return fetchTopicPartitionsMetadata(topicPartitions);
});
allOldestOffsets.putAll(offsetsForTopic.getOldestOffsets());
allNewestOffsets.putAll(offsetsForTopic.getNewestOffsets());
allUpcomingOffsets.putAll(offsetsForTopic.getUpcomingOffsets());
});
return assembleMetadata(allOldestOffsets, allNewestOffsets, allUpcomingOffsets);
}
@Override
public Integer offsetComparator(String offset1, String offset2) {
if (offset1 == null || offset2 == null) {
return -1;
}
return Long.valueOf(offset1).compareTo(Long.valueOf(offset2));
}
@Override
public boolean createStream(StreamSpec streamSpec) {
LOG.info("Creating Kafka topic: {} on system: {}", streamSpec.getPhysicalName(), streamSpec.getSystemName());
final String replFactor = "replication.factor";
KafkaStreamSpec kafkaStreamSpec = toKafkaSpec(streamSpec);
String topicName = kafkaStreamSpec.getPhysicalName();
// create topic.
NewTopic newTopic = new NewTopic(topicName, kafkaStreamSpec.getPartitionCount(), (short) kafkaStreamSpec.getReplicationFactor());
// specify the configs
Map<String, String> streamConfig = new HashMap<>(kafkaStreamSpec.getConfig());
// HACK - replication.factor is invalid config for AdminClient.createTopics
if (streamConfig.containsKey(replFactor)) {
String repl = streamConfig.get(replFactor);
LOG.warn("Configuration {}={} for topic={} is invalid. Using kSpec repl factor {}",
replFactor, repl, kafkaStreamSpec.getPhysicalName(), kafkaStreamSpec.getReplicationFactor());
streamConfig.remove(replFactor);
}
newTopic.configs(new MapConfig(streamConfig));
CreateTopicsResult result = adminClient.createTopics(ImmutableSet.of(newTopic));
try {
result.all().get(KAFKA_ADMIN_OPS_TIMEOUT_MS, TimeUnit.MILLISECONDS);
} catch (Exception e) {
if (e instanceof TopicExistsException || e.getCause() instanceof TopicExistsException) {
LOG.info("Topic {} already exists.", topicName);
return false;
}
throw new SamzaException(String.format("Creation of topic %s failed.", topicName), e);
}
LOG.info("Successfully created topic {}", topicName);
DescribeTopicsResult desc = adminClient.describeTopics(ImmutableSet.of(topicName));
try {
TopicDescription td = desc.all().get(KAFKA_ADMIN_OPS_TIMEOUT_MS, TimeUnit.MILLISECONDS).get(topicName);
LOG.info("Topic {} created with {}", topicName, td);
return true;
} catch (Exception e) {
LOG.error("'Describe after create' failed for topic " + topicName, e);
return false;
}
}
@Override
public boolean clearStream(StreamSpec streamSpec) {
LOG.info("Creating Kafka topic: {} on system: {}", streamSpec.getPhysicalName(), streamSpec.getSystemName());
String topicName = streamSpec.getPhysicalName();
try {
DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(ImmutableSet.of(topicName));
deleteTopicsResult.all().get(KAFKA_ADMIN_OPS_TIMEOUT_MS, TimeUnit.MILLISECONDS);
} catch (Exception e) {
LOG.error("Failed to delete topic {} with exception {}.", topicName, e);
return false;
}
return true;
}
/**
* Converts a StreamSpec into a KafkaStreamSpec, with special handling for coordinator, changelog, checkpoint, and intermediate streams.
* @param spec a StreamSpec object
* @return KafkaStreamSpec object
*/
public KafkaStreamSpec toKafkaSpec(StreamSpec spec) {
KafkaStreamSpec kafkaSpec;
if (spec.isChangeLogStream()) {
String topicName = spec.getPhysicalName();
ChangelogInfo topicMeta = changelogTopicMetaInformation.get(topicName);
if (topicMeta == null) {
throw new StreamValidationException("Unable to find topic information for topic " + topicName);
}
kafkaSpec = new KafkaStreamSpec(spec.getId(), topicName, systemName, spec.getPartitionCount(),
topicMeta.getReplicationFactor(), topicMeta.getKafkaProperties());
} else if (spec.isCoordinatorStream()) {
kafkaSpec =
new KafkaStreamSpec(spec.getId(), spec.getPhysicalName(), systemName, 1, coordinatorStreamReplicationFactor,
coordinatorStreamProperties);
} else if (spec.isCheckpointStream()) {
Properties checkpointTopicProperties = new Properties();
checkpointTopicProperties.putAll(spec.getConfig());
kafkaSpec = KafkaStreamSpec.fromSpec(StreamSpec.createCheckpointStreamSpec(spec.getPhysicalName(), spec.getSystemName()))
.copyWithReplicationFactor(Integer.parseInt(new KafkaConfig(config).getCheckpointReplicationFactor().get()))
.copyWithProperties(checkpointTopicProperties);
} else if (intermediateStreamProperties.containsKey(spec.getId())) {
kafkaSpec = KafkaStreamSpec.fromSpec(spec);
Properties properties = kafkaSpec.getProperties();
properties.putAll(intermediateStreamProperties.get(spec.getId()));
kafkaSpec = kafkaSpec.copyWithProperties(properties);
} else {
kafkaSpec = KafkaStreamSpec.fromSpec(spec);
// use the system-level replication factor config if specified, else fall back to KafkaConfig's topic default replication factor
int replicationFactorFromSystemConfig = Integer.valueOf(
new KafkaConfig(config).getSystemDefaultReplicationFactor(spec.getSystemName(),
KafkaConfig.TOPIC_DEFAULT_REPLICATION_FACTOR()));
LOG.info("Using replication-factor: {} for StreamSpec: {}", replicationFactorFromSystemConfig, spec);
return new KafkaStreamSpec(kafkaSpec.getId(), kafkaSpec.getPhysicalName(), kafkaSpec.getSystemName(),
kafkaSpec.getPartitionCount(), replicationFactorFromSystemConfig, kafkaSpec.getProperties());
}
return kafkaSpec;
}
@Override
public void validateStream(StreamSpec streamSpec) throws StreamValidationException {
LOG.info("About to validate stream = " + streamSpec);
String streamName = streamSpec.getPhysicalName();
SystemStreamMetadata systemStreamMetadata =
getSystemStreamMetadata(Collections.singleton(streamName)).get(streamName);
if (systemStreamMetadata == null) {
throw new StreamValidationException(
"Failed to obtain metadata for stream " + streamName + ". Validation failed.");
}
int actualPartitionCounter = systemStreamMetadata.getSystemStreamPartitionMetadata().size();
int expectedPartitionCounter = streamSpec.getPartitionCount();
LOG.info("actualCount=" + actualPartitionCounter + "; expectedCount=" + expectedPartitionCounter);
if (actualPartitionCounter != expectedPartitionCounter) {
throw new StreamValidationException(
String.format("Mismatch of partitions for stream %s. Expected %d, got %d. Validation failed.", streamName,
expectedPartitionCounter, actualPartitionCounter));
}
}
/**
* Delete records up to (and including) the provided ssp offsets for
* all system stream partitions specified in the map.
* This only works with a Kafka cluster on version 0.11 or later, and is a no-op unless deleting committed messages is enabled for this system.
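*
* <p>A minimal usage sketch, where {@code admin} is a constructed KafkaSystemAdmin and the
* system/stream names and offset below are hypothetical:
* <pre>{@code
* SystemStreamPartition ssp = new SystemStreamPartition("kafka", "intermediate-topic", new Partition(0));
* // requests deletion of records with offsets 0..42 (inclusive) in partition 0
* admin.deleteMessages(ImmutableMap.of(ssp, "42"));
* }</pre>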
* @param offsets specifies up to what offsets the messages should be deleted
*/
@Override
public void deleteMessages(Map<SystemStreamPartition, String> offsets) {
if (deleteCommittedMessages) {
Map<TopicPartition, RecordsToDelete> recordsToDelete = offsets.entrySet()
.stream()
.collect(Collectors.toMap(entry ->
new TopicPartition(entry.getKey().getStream(), entry.getKey().getPartition().getPartitionId()),
entry -> RecordsToDelete.beforeOffset(Long.parseLong(entry.getValue()) + 1)));
adminClient.deleteRecords(recordsToDelete).all().whenComplete((ignored, exception) -> {
if (exception != null) {
LOG.error("Delete message failed for SSPs " + offsets.keySet() + " due to", exception);
}
});
deleteMessageCalled = true;
}
}
protected Properties createAdminClientProperties() {
// populate brokerList from either consumer or producer configs
Properties props = new Properties();
// include SSL settings if needed
props.putAll(config.subset(String.format("systems.%s.consumer.", systemName), true));
// validate brokerList
String brokerList = config.get(
String.format(KafkaConfig.CONSUMER_CONFIGS_CONFIG_KEY(), systemName, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
if (brokerList == null) {
brokerList = config.get(String.format(KafkaConfig.PRODUCER_CONFIGS_CONFIG_KEY(), systemName,
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
}
if (brokerList == null) {
throw new SamzaException(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + " is required for systemAdmin for system " + systemName);
}
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
return props;
}
@Override
public Set<SystemStream> getAllSystemStreams() {
Map<String, List<PartitionInfo>> topicToPartitionInfoMap = threadSafeKafkaConsumer.execute(consumer -> consumer.listTopics());
Set<SystemStream> systemStreams = topicToPartitionInfoMap.keySet()
.stream()
.map(topic -> new SystemStream(systemName, topic))
.collect(Collectors.toSet());
return systemStreams;
}
/**
* A helper method that takes oldest, newest, and upcoming offsets for each
* system stream partition, and creates a single map from stream name to
* SystemStreamMetadata.
*
* @param oldestOffsets map of SSP to oldest offset
* @param newestOffsets map of SSP to newest offset
* @param upcomingOffsets map of SSP to upcoming offset
* @return a {@link Map} from stream name to {@link SystemStreamMetadata}
*/
@VisibleForTesting
static Map<String, SystemStreamMetadata> assembleMetadata(Map<SystemStreamPartition, String> oldestOffsets,
Map<SystemStreamPartition, String> newestOffsets, Map<SystemStreamPartition, String> upcomingOffsets) {
HashSet<SystemStreamPartition> allSSPs = new HashSet<>();
allSSPs.addAll(oldestOffsets.keySet());
allSSPs.addAll(newestOffsets.keySet());
allSSPs.addAll(upcomingOffsets.keySet());
Map<String, SystemStreamMetadata> assembledMetadata = allSSPs.stream()
.collect(Collectors.groupingBy(SystemStreamPartition::getStream))
.entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, entry -> {
Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> partitionMetadata =
entry.getValue()
.stream()
.collect(Collectors.toMap(SystemStreamPartition::getPartition, ssp ->
new SystemStreamMetadata.SystemStreamPartitionMetadata(
oldestOffsets.getOrDefault(ssp, null),
newestOffsets.getOrDefault(ssp, null),
upcomingOffsets.get(ssp))));
return new SystemStreamMetadata(entry.getKey(), partitionMetadata);
}));
return assembledMetadata;
}
/**
* Fetch stream properties for all intermediate streams.
*
* @param config kafka system config
* @return a {@link Map} from {@code streamId} to stream {@link Properties}
*/
@VisibleForTesting
static Map<String, Properties> getIntermediateStreamProperties(Config config) {
Map<String, Properties> intermediateStreamProperties = Collections.emptyMap();
ApplicationConfig appConfig = new ApplicationConfig(config);
if (appConfig.getAppMode() == ApplicationConfig.ApplicationMode.BATCH) {
StreamConfig streamConfig = new StreamConfig(config);
intermediateStreamProperties = streamConfig.getStreamIds()
.stream()
.filter(streamConfig::getIsIntermediateStream)
.collect(Collectors.toMap(Function.identity(), streamId -> {
Properties properties = new Properties();
properties.putAll(streamConfig.getStreamProperties(streamId));
properties.putIfAbsent(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH()));
return properties;
}));
}
return intermediateStreamProperties;
}
private Properties getCoordinatorStreamProperties(KafkaConfig config) {
Properties coordinatorStreamProperties = new Properties();
coordinatorStreamProperties.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
coordinatorStreamProperties.put(TopicConfig.SEGMENT_BYTES_CONFIG, config.getCoordinatorSegmentBytes());
return coordinatorStreamProperties;
}
@Override
public String resolveStartpointToOffset(SystemStreamPartition systemStreamPartition, Startpoint startpoint) {
return startpoint.apply(systemStreamPartition, kafkaStartpointToOffsetResolver);
}
/**
* Container for metadata about offsets.
*/
private static class OffsetsMaps {
private final Map<SystemStreamPartition, String> oldestOffsets;
private final Map<SystemStreamPartition, String> newestOffsets;
private final Map<SystemStreamPartition, String> upcomingOffsets;
private OffsetsMaps(Map<SystemStreamPartition, String> oldestOffsets,
Map<SystemStreamPartition, String> newestOffsets, Map<SystemStreamPartition, String> upcomingOffsets) {
this.oldestOffsets = oldestOffsets;
this.newestOffsets = newestOffsets;
this.upcomingOffsets = upcomingOffsets;
}
private Map<SystemStreamPartition, String> getOldestOffsets() {
return oldestOffsets;
}
private Map<SystemStreamPartition, String> getNewestOffsets() {
return newestOffsets;
}
private Map<SystemStreamPartition, String> getUpcomingOffsets() {
return upcomingOffsets;
}
}
/**
* A helper class that represents changelog-related information.
*/
private static class ChangelogInfo {
final int replicationFactor;
final Properties kafkaProperties;
/**
* @param replicationFactor The number of replicas for the changelog stream
* @param kafkaProperties The kafka specific properties that need to be used for changelog stream creation
*/
ChangelogInfo(int replicationFactor, Properties kafkaProperties) {
this.replicationFactor = replicationFactor;
this.kafkaProperties = kafkaProperties;
}
public int getReplicationFactor() {
return replicationFactor;
}
public Properties getKafkaProperties() {
return kafkaProperties;
}
}
/**
* Offers a Kafka-specific implementation of {@link StartpointVisitor} that resolves
* different types of {@link Startpoint} to a Samza offset.
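*
* <p>A minimal usage sketch through {@link KafkaSystemAdmin#resolveStartpointToOffset}, assuming the
* no-arg and timestamp constructors of the Startpoint classes; names and values below are hypothetical:
* <pre>{@code
* SystemStreamPartition ssp = new SystemStreamPartition("kafka", "page-views", new Partition(0));
* String oldestOffset = admin.resolveStartpointToOffset(ssp, new StartpointOldest());
* String offsetAtTime = admin.resolveStartpointToOffset(ssp, new StartpointTimestamp(1600000000000L));
* }</pre>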
*/
@VisibleForTesting
static class KafkaStartpointToOffsetResolver implements StartpointVisitor<SystemStreamPartition, String> {
private final ThreadSafeKafkaConsumer threadSafeKafkaConsumer;
public KafkaStartpointToOffsetResolver(ThreadSafeKafkaConsumer threadSafeKafkaConsumer) {
this.threadSafeKafkaConsumer = threadSafeKafkaConsumer;
}
@VisibleForTesting
KafkaStartpointToOffsetResolver(Consumer consumer) {
this.threadSafeKafkaConsumer = new ThreadSafeKafkaConsumer(consumer);
}
@Override
public String visit(SystemStreamPartition systemStreamPartition, StartpointSpecific startpointSpecific) {
return startpointSpecific.getSpecificOffset();
}
@Override
public String visit(SystemStreamPartition systemStreamPartition, StartpointTimestamp startpointTimestamp) {
Preconditions.checkNotNull(startpointTimestamp, "Startpoint cannot be null");
Preconditions.checkNotNull(startpointTimestamp.getTimestampOffset(), "Timestamp field in startpoint cannot be null");
TopicPartition topicPartition = toTopicPartition(systemStreamPartition);
Map<TopicPartition, Long> topicPartitionToTimestamp = ImmutableMap.of(topicPartition, startpointTimestamp.getTimestampOffset());
LOG.info("Finding offset for timestamp: {} in topic partition: {}.", startpointTimestamp.getTimestampOffset(), topicPartition);
Map<TopicPartition, OffsetAndTimestamp> topicPartitionToOffsetTimestamps = threadSafeKafkaConsumer.execute(consumer -> consumer.offsetsForTimes(topicPartitionToTimestamp));
OffsetAndTimestamp offsetAndTimestamp = topicPartitionToOffsetTimestamps.get(topicPartition);
if (offsetAndTimestamp != null) {
return String.valueOf(offsetAndTimestamp.offset());
} else {
LOG.info("Offset for timestamp: {} does not exist for partition: {}. Falling back to end offset.", startpointTimestamp.getTimestampOffset(), topicPartition);
return getEndOffset(systemStreamPartition);
}
}
@Override
public String visit(SystemStreamPartition systemStreamPartition, StartpointOldest startpointOldest) {
TopicPartition topicPartition = toTopicPartition(systemStreamPartition);
Map<TopicPartition, Long> topicPartitionToOffsets = threadSafeKafkaConsumer.execute(consumer -> consumer.beginningOffsets(ImmutableSet.of(topicPartition)));
Long beginningOffset = topicPartitionToOffsets.get(topicPartition);
LOG.info("Beginning offset for topic partition: {} is {}.", topicPartition, beginningOffset);
return String.valueOf(beginningOffset);
}
@Override
public String visit(SystemStreamPartition systemStreamPartition, StartpointUpcoming startpointUpcoming) {
return getEndOffset(systemStreamPartition);
}
/**
* Converts the {@link SystemStreamPartition} to {@link TopicPartition}.
* @param systemStreamPartition the input system stream partition.
* @return the converted topic partition.
*/
static TopicPartition toTopicPartition(SystemStreamPartition systemStreamPartition) {
Preconditions.checkNotNull(systemStreamPartition);
Preconditions.checkNotNull(systemStreamPartition.getPartition());
Preconditions.checkNotNull(systemStreamPartition.getStream());
return new TopicPartition(systemStreamPartition.getStream(), systemStreamPartition.getPartition().getPartitionId());
}
/**
* Determines the end offset of the {@code systemStreamPartition}.
* @param systemStreamPartition represents the system stream partition.
* @return the end offset of the partition.
*/
private String getEndOffset(SystemStreamPartition systemStreamPartition) {
TopicPartition topicPartition = toTopicPartition(systemStreamPartition);
Map<TopicPartition, Long> topicPartitionToOffsets = threadSafeKafkaConsumer.execute(consumer -> consumer.endOffsets(ImmutableSet.of(topicPartition)));
Long endOffset = topicPartitionToOffsets.get(topicPartition);
LOG.info("End offset for topic partition: {} is {}.", topicPartition, endOffset);
return String.valueOf(endOffset);
}
}
/**
* Offers thread-safe operations over the vanilla {@link Consumer}.
*/
static class ThreadSafeKafkaConsumer {
private final Consumer kafkaConsumer;
ThreadSafeKafkaConsumer(Consumer kafkaConsumer) {
this.kafkaConsumer = kafkaConsumer;
}
/**
* Executes the given lambda function, composed of Kafka consumer operations, in a thread-safe manner
* and returns the result of the execution.
*
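* <p>A minimal usage sketch, mirroring how this class is used elsewhere in this file:
* <pre>{@code
* Map<String, List<PartitionInfo>> topicToPartitions =
*     threadSafeKafkaConsumer.execute(consumer -> consumer.listTopics());
* }</pre>
*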
* @param function accepts the kafka consumer as argument and returns a result after executing a
* sequence of operations on a kafka-broker.
* @param <T> the return type of the lambda function.
* @return the result of executing the lambda function.
*/
public <T> T execute(Function<Consumer, T> function) {
// Kafka consumer is not thread-safe
synchronized (kafkaConsumer) {
return function.apply(kafkaConsumer);
}
}
/**
* Closes the underlying kafka consumer.
*/
public void close() {
synchronized (kafkaConsumer) {
kafkaConsumer.close();
}
}
}
}